• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2312

13 May 2024 08:25AM UTC coverage: 90.834% (+0.03%) from 90.809%
2312

push

Evergreen

web-flow
[bindgen] Adding a `app_user_as_sync_user` helper (#7684)

* Adding app_user_as_sync_user

* Update CHANGELOG.md

Co-authored-by: Kenneth Geisshirt <kenneth.geisshirt@mongodb.com>

---------

Co-authored-by: Kenneth Geisshirt <kenneth.geisshirt@mongodb.com>

102108 of 181070 branches covered (56.39%)

214614 of 236270 relevant lines covered (90.83%)

5903299.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.73
/src/realm/sync/client.cpp
1

2
#include <memory>
3
#include <tuple>
4
#include <atomic>
5

6
#include "realm/sync/client_base.hpp"
7
#include "realm/sync/protocol.hpp"
8
#include "realm/util/optional.hpp"
9
#include <realm/sync/client.hpp>
10
#include <realm/sync/config.hpp>
11
#include <realm/sync/noinst/client_reset.hpp>
12
#include <realm/sync/noinst/client_history_impl.hpp>
13
#include <realm/sync/noinst/client_impl_base.hpp>
14
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
15
#include <realm/sync/subscriptions.hpp>
16
#include <realm/util/bind_ptr.hpp>
17
#include <realm/util/circular_buffer.hpp>
18
#include <realm/util/platform_info.hpp>
19
#include <realm/util/thread.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/util/value_reset_guard.hpp>
22
#include <realm/version.hpp>
23

24
namespace realm {
25
namespace sync {
26

27
namespace {
28
using namespace realm::util;
29

30

31
// clang-format off
32
using SessionImpl                     = ClientImpl::Session;
33
using SyncTransactCallback            = Session::SyncTransactCallback;
34
using ProgressHandler                 = Session::ProgressHandler;
35
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
36
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
37
using port_type                       = Session::port_type;
38
using connection_ident_type           = std::int_fast64_t;
39
using ProxyConfig                     = SyncConfig::ProxyConfig;
40
// clang-format on
41

42
} // unnamed namespace
43

44

45
// Life cycle states of a session wrapper:
46
//
47
//  - Uninitiated
48
//  - Unactualized
49
//  - Actualized
50
//  - Finalized
51
//
52
// The session wrapper moves from the Uninitiated to the Unactualized state when
53
// it is initiated, i.e., when initiate() is called. This may happen on any
54
// thread.
55
//
56
// The session wrapper moves from the Unactualized to the Actualized state when
57
// it is associated with a session object, i.e., when `m_sess` is made to refer
58
// to an object of type SessionImpl. This always happens on the event loop
59
// thread.
60
//
61
// The session wrapper moves from the Actualized to the Finalized state when it
62
// is dissociated from the session object. This happens in response to the
63
// session wrapper having been abandoned by the application. This always happens
64
// on the event loop thread.
65
//
66
// The session wrapper will exist in the Finalized state only while referenced
67
// from a post handler waiting to be executed.
68
//
69
// If the session wrapper is abandoned by the application while in the
70
// Uninitiated state, it will be destroyed immediately, since no post handlers
71
// can have been scheduled prior to initiation.
72
//
73
// If the session wrapper is abandoned while in the Unactivated state, it will
74
// move immediately to the Finalized state. This may happen on any thread.
75
//
76
// The moving of a session wrapper to, or from the Actualized state always
77
// happen on the event loop thread. All other state transitions may happen on
78
// any thread.
79
//
80
// NOTE: Activation of the session happens no later than during actualization,
81
// and initiation of deactivation happens no earlier than during
82
// finalization. See also activate_session() and initiate_session_deactivation()
83
// in ClientImpl::Connection.
84
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
85
public:
86
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
87
                   Session::Config);
88
    ~SessionWrapper() noexcept;
89

90
    ClientReplication& get_replication() noexcept;
91
    ClientImpl& get_client() noexcept;
92

93
    bool has_flx_subscription_store() const;
94
    SubscriptionStore* get_flx_subscription_store();
95
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
96

97
    MigrationStore* get_migration_store();
98

99
    void set_progress_handler(util::UniqueFunction<ProgressHandler>);
100
    void set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener>);
101

102
    void initiate();
103

104
    void force_close();
105

106
    void on_commit(version_type new_version) override;
107
    void cancel_reconnect_delay();
108

109
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
110
    bool wait_for_upload_complete_or_client_stopped();
111
    bool wait_for_download_complete_or_client_stopped();
112

113
    void refresh(std::string_view signed_access_token);
114

115
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
116

117
    // These are called from ClientImpl
118
    void actualize(ServerEndpoint);
119
    void finalize();
120
    void finalize_before_actualization() noexcept;
121

122
    util::Future<std::string> send_test_command(std::string body);
123

124
    void handle_pending_client_reset_acknowledgement();
125

126
    void update_subscription_version_info();
127

128
    std::string get_appservices_connection_id();
129

130
protected:
131
    friend class ClientImpl;
132

133
    // m_initiated/m_abandoned is used to check that we aren't trying to update immutable properties like the progress
134
    // handler or connection state listener after we've bound the session. We read the variable a bunch in
135
    // REALM_ASSERTS on the event loop and on the user's thread, but we only set it once and while we're registering
136
    // the session wrapper to be actualized. This function gets called from
137
    // ClientImpl::register_unactualized_session_wrapper() to synchronize updating this variable on the main thread
138
    // with reading the variable on the event loop.
139
    void mark_initiated();
140
    void mark_abandoned();
141

142
private:
143
    ClientImpl& m_client;
144
    DBRef m_db;
145
    Replication* m_replication;
146

147
    const ProtocolEnvelope m_protocol_envelope;
148
    const std::string m_server_address;
149
    const port_type m_server_port;
150
    const bool m_server_verified;
151
    const std::string m_user_id;
152
    const SyncServerMode m_sync_mode;
153
    const std::string m_authorization_header_name;
154
    const std::map<std::string, std::string> m_custom_http_headers;
155
    const bool m_verify_servers_ssl_certificate;
156
    const bool m_simulate_integration_error;
157
    const Optional<std::string> m_ssl_trust_certificate_path;
158
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
159
    const size_t m_flx_bootstrap_batch_size_bytes;
160

161
    // This one is different from null when, and only when the session wrapper
162
    // is in ClientImpl::m_abandoned_session_wrappers.
163
    SessionWrapper* m_next = nullptr;
164

165
    // After initiation, these may only be accessed by the event loop thread.
166
    std::string m_http_request_path_prefix;
167
    std::string m_virt_path;
168
    std::string m_signed_access_token;
169

170
    util::Optional<ClientReset> m_client_reset_config;
171

172
    util::Optional<ProxyConfig> m_proxy_config;
173

174
    struct ReportedProgress {
175
        uint64_t snapshot = 0;
176
        uint64_t uploaded = 0;
177
        uint64_t uploadable = 0;
178
        uint64_t downloaded = 0;
179
        uint64_t downloadable = 0;
180
        uint64_t final_uploaded = 0;
181
        uint64_t final_downloaded = 0;
182
    } m_reported_progress;
183

184
    util::UniqueFunction<ProgressHandler> m_progress_handler;
185
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
186

187
    std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
188
    bool m_in_debug_hook = false;
189

190
    SessionReason m_session_reason;
191

192
    const uint64_t m_schema_version;
193

194
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
195
    int64_t m_flx_active_version = 0;
196
    int64_t m_flx_last_seen_version = 0;
197
    int64_t m_flx_pending_mark_version = 0;
198
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
199

200
    std::shared_ptr<MigrationStore> m_migration_store;
201

202
    bool m_initiated = false;
203

204
    // Set to true when this session wrapper is actualized (or when it is
205
    // finalized before proper actualization). It is then never modified again.
206
    //
207
    // A session specific post handler submitted after the initiation of the
208
    // session wrapper (initiate()) will always find that `m_actualized` is
209
    // true. This is the case, because the scheduling of such a post handler
210
    // will have been preceded by the triggering of
211
    // `ClientImpl::m_actualize_and_finalize` (in
212
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
213
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
214
    // before the post handler. If the session wrapper is no longer in
215
    // `ClientImpl::m_unactualized_session_wrappers` when
216
    // ClientImpl::actualize_and_finalize_session_wrappers() executes, it must
217
    // have been abandoned already, but in that case,
218
    // finalize_before_actualization() has already been called.
219
    bool m_actualized = false;
220

221
    bool m_force_closed = false;
222

223
    bool m_suspended = false;
224

225
    // Set when the session has been abandoned, but before it's been finalized.
226
    bool m_abandoned = false;
227
    // Has the SessionWrapper been finalized?
228
    bool m_finalized = false;
229

230
    // Set to true when the first DOWNLOAD message is received to indicate that
231
    // the byte-level download progress parameters can be considered reasonably
232
    // reliable. Before that, a lot of time may have passed, so our record of
233
    // the download progress is likely completely out of date.
234
    bool m_reliable_download_progress = false;
235

236
    std::optional<double> m_download_estimate;
237
    std::optional<uint64_t> m_bootstrap_store_bytes;
238

239
    // Set to point to an activated session object during actualization of the
240
    // session wrapper. Set to null during finalization of the session
241
    // wrapper. Both modifications are guaranteed to be performed by the event
242
    // loop thread.
243
    //
244
    // If a session specific post handler, that is submitted after the
245
    // initiation of the session wrapper, sees that `m_sess` is null, it can
246
    // conclude that the session wrapper has been both abandoned and
247
    // finalized. This is true, because the scheduling of such a post handler
248
    // will have been preceded by the triggering of
249
    // `ClientImpl::m_actualize_and_finalize` (in
250
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
251
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
252
    // before the post handler, so the session wrapper must have been actualized
253
    // unless it was already abandoned by the application. If it was abandoned
254
    // before it was actualized, it will already have been finalized by
255
    // finalize_before_actualization().
256
    //
257
    // Must only be accessed from the event loop thread.
258
    SessionImpl* m_sess = nullptr;
259

260
    // These must only be accessed from the event loop thread.
261
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
262
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
263
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
264

265
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
266
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
267
    // event loop thread.
268
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
269
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
270
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
271

272
    void on_upload_progress(bool only_if_new_uploadable_data = false);
273
    void on_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes = {});
274
    void on_upload_completion();
275
    void on_download_completion();
276
    void on_suspended(const SessionErrorInfo& error_info);
277
    void on_resumed();
278
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
279
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
280
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
281
    void on_flx_sync_version_complete(int64_t version);
282

283
    void init_progress_handler();
284
    // only_if_new_uploadable_data can be true only if is_download is false
285
    void report_progress(bool is_download, bool only_if_new_uploadable_data = false);
286

287
    friend class SessionWrapperStack;
288
    friend class ClientImpl::Session;
289
};
290

291

292
// ################ SessionWrapperStack ################
293

294
inline bool SessionWrapperStack::empty() const noexcept
295
{
×
296
    return !m_back;
×
297
}
×
298

299

300
// Pushes `w` onto the stack. The stack takes over the reference held by `w`
// and links the wrapper intrusively through its `m_next` member, so the
// wrapper must not already be linked into a stack.
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
{
    REALM_ASSERT(!w->m_next);
    // From here on the stack owns the reference that `w` was holding.
    SessionWrapper* node = w.release();
    node->m_next = m_back;
    m_back = node;
}
10,376✔
306

307

308
// Removes the most recently pushed wrapper and hands its reference back to the
// caller. Returns a null pointer when the stack is empty.
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
{
    SessionWrapper* top = m_back;
    if (top) {
        // Unlink the wrapper so that it can be pushed onto a stack again later.
        m_back = top->m_next;
        top->m_next = nullptr;
    }
    // Adopt (rather than add) a reference: the stack's reference moves to the caller.
    return util::bind_ptr<SessionWrapper>{top, util::bind_ptr_base::adopt_tag{}};
}
25,610✔
317

318

319
inline void SessionWrapperStack::clear() noexcept
320
{
25,156✔
321
    while (m_back) {
25,156✔
322
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
323
        m_back = w->m_next;
×
324
    }
×
325
}
25,156✔
326

327

328
inline SessionWrapperStack::SessionWrapperStack(SessionWrapperStack&& q) noexcept
329
    : m_back{q.m_back}
330
{
331
    q.m_back = nullptr;
332
}
333

334

335
// Releases the references to any wrappers that are still on the stack.
SessionWrapperStack::~SessionWrapperStack()
{
    this->clear();
}
25,156✔
339

340

341
// ################ ClientImpl ################
342

343
// Shuts the client down and waits for it to drain before the object goes away.
ClientImpl::~ClientImpl()
{
    // No locking is done here: by the time the destructor runs, no other
    // thread is allowed to access this client or any of its subobjects.

    shutdown_and_wait();

    // The client must now be stopped, and since session wrappers are removed
    // from m_unactualized_session_wrappers as they are abandoned, none may be
    // left at this point.
    REALM_ASSERT(m_stopped);
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
}
9,918✔
354

355

356
void ClientImpl::cancel_reconnect_delay()
357
{
2,148✔
358
    // Thread safety required
359
    post([this](Status status) {
2,148✔
360
        if (status == ErrorCodes::OperationAborted)
2,148✔
361
            return;
×
362
        else if (!status.is_ok())
2,148✔
363
            throw Exception(status);
×
364

365
        for (auto& p : m_server_slots) {
2,148✔
366
            ServerSlot& slot = p.second;
2,148✔
367
            if (m_one_connection_per_session) {
2,148✔
368
                REALM_ASSERT(!slot.connection);
×
369
                for (const auto& p : slot.alt_connections) {
×
370
                    ClientImpl::Connection& conn = *p.second;
×
371
                    conn.resume_active_sessions(); // Throws
×
372
                    conn.cancel_reconnect_delay(); // Throws
×
373
                }
×
374
            }
×
375
            else {
2,148✔
376
                REALM_ASSERT(slot.alt_connections.empty());
2,148✔
377
                if (slot.connection) {
2,148✔
378
                    ClientImpl::Connection& conn = *slot.connection;
2,144✔
379
                    conn.resume_active_sessions(); // Throws
2,144✔
380
                    conn.cancel_reconnect_delay(); // Throws
2,144✔
381
                }
2,144✔
382
                else {
4✔
383
                    slot.reconnect_info.reset();
4✔
384
                }
4✔
385
            }
2,148✔
386
        }
2,148✔
387
    }); // Throws
2,148✔
388
}
2,148✔
389

390

391
void ClientImpl::voluntary_disconnect_all_connections()
392
{
12✔
393
    auto done_pf = util::make_promise_future<void>();
12✔
394
    post([this, promise = std::move(done_pf.promise)](Status status) mutable {
12✔
395
        if (status == ErrorCodes::OperationAborted) {
12✔
396
            return;
×
397
        }
×
398

399
        REALM_ASSERT(status.is_ok());
12✔
400

401
        try {
12✔
402
            for (auto& p : m_server_slots) {
12✔
403
                ServerSlot& slot = p.second;
12✔
404
                if (m_one_connection_per_session) {
12✔
405
                    REALM_ASSERT(!slot.connection);
×
406
                    for (const auto& p : slot.alt_connections) {
×
407
                        ClientImpl::Connection& conn = *p.second;
×
408
                        if (conn.get_state() == ConnectionState::disconnected) {
×
409
                            continue;
×
410
                        }
×
411
                        conn.voluntary_disconnect();
×
412
                    }
×
413
                }
×
414
                else {
12✔
415
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
416
                    if (!slot.connection) {
12✔
417
                        continue;
×
418
                    }
×
419
                    ClientImpl::Connection& conn = *slot.connection;
12✔
420
                    if (conn.get_state() == ConnectionState::disconnected) {
12✔
421
                        continue;
×
422
                    }
×
423
                    conn.voluntary_disconnect();
12✔
424
                }
12✔
425
            }
12✔
426
        }
12✔
427
        catch (...) {
12✔
428
            promise.set_error(exception_to_status());
×
429
            return;
×
430
        }
×
431
        promise.emplace_value();
12✔
432
    });
12✔
433
    done_pf.future.get();
12✔
434
}
12✔
435

436

437
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
438
{
9,480✔
439
    // Thread safety required
440

441
    {
9,480✔
442
        std::lock_guard lock{m_mutex};
9,480✔
443
        m_sessions_terminated = false;
9,480✔
444
    }
9,480✔
445

446
    // The technique employed here relies on the fact that
447
    // actualize_and_finalize_session_wrappers() must get to execute at least
448
    // once before the post handler submitted below gets to execute, but still
449
    // at a time where all session wrappers, that are abandoned prior to the
450
    // execution of wait_for_session_terminations_or_client_stopped(), have been
451
    // added to `m_abandoned_session_wrappers`.
452
    //
453
    // To see that this is the case, consider a session wrapper that was
454
    // abandoned before wait_for_session_terminations_or_client_stopped() was
455
    // invoked. Then the session wrapper will have been added to
456
    // `m_abandoned_session_wrappers`, and an invocation of
457
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
458
    // guarantees mentioned in the documentation of Trigger then ensure
459
    // that at least one execution of actualize_and_finalize_session_wrappers()
460
    // will happen after the session wrapper has been added to
461
    // `m_abandoned_session_wrappers`, but before the post handler submitted
462
    // below gets to execute.
463
    post([this](Status status) mutable {
9,480✔
464
        if (status == ErrorCodes::OperationAborted)
9,480✔
465
            return;
×
466
        else if (!status.is_ok())
9,480✔
467
            throw Exception(status);
×
468

469
        std::lock_guard lock{m_mutex};
9,480✔
470
        m_sessions_terminated = true;
9,480✔
471
        m_wait_or_client_stopped_cond.notify_all();
9,480✔
472
    }); // Throws
9,480✔
473

474
    bool completion_condition_was_satisfied;
9,480✔
475
    {
9,480✔
476
        std::unique_lock lock{m_mutex};
9,480✔
477
        while (!m_sessions_terminated && !m_stopped)
18,952✔
478
            m_wait_or_client_stopped_cond.wait(lock);
9,472✔
479
        completion_condition_was_satisfied = !m_stopped;
9,480✔
480
    }
9,480✔
481
    return completion_condition_was_satisfied;
9,480✔
482
}
9,480✔
483

484

485
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
486
// Returns a future that is fulfilled by a posted task: with a value when the
// task runs with an OK status, and with that status as the error otherwise.
util::Future<void> ClientImpl::notify_session_terminated()
{
    auto pf = util::make_promise_future<void>();
    post([promise = std::move(pf.promise)](Status status) mutable {
        // Every non-OK status, operation_aborted included, is forwarded as an error.
        if (status.is_ok())
            promise.emplace_value();
        else
            promise.set_error(status);
    });

    return std::move(pf.future);
}
56✔
501

502
void ClientImpl::drain_connections_on_loop()
503
{
9,918✔
504
    post([this](Status status) mutable {
9,918✔
505
        REALM_ASSERT(status.is_ok());
9,904✔
506
        drain_connections();
9,904✔
507
    });
9,904✔
508
}
9,918✔
509

510
void ClientImpl::shutdown_and_wait()
511
{
10,690✔
512
    shutdown();
10,690✔
513
    std::unique_lock lock{m_drain_mutex};
10,690✔
514
    if (m_drained) {
10,690✔
515
        return;
768✔
516
    }
768✔
517

518
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,922✔
519
    m_drain_cv.wait(lock, [&] {
19,830✔
520
        return m_num_connections == 0 && m_outstanding_posts == 0;
19,830✔
521
    });
19,830✔
522

523
    m_drained = true;
9,922✔
524
}
9,922✔
525

526
void ClientImpl::shutdown() noexcept
527
{
20,686✔
528
    {
20,686✔
529
        std::lock_guard lock{m_mutex};
20,686✔
530
        if (m_stopped)
20,686✔
531
            return;
10,768✔
532
        m_stopped = true;
9,918✔
533
        m_wait_or_client_stopped_cond.notify_all();
9,918✔
534
    }
9,918✔
535

536
    drain_connections_on_loop();
×
537
}
9,918✔
538

539

540
// Marks `wrapper` as initiated, records it together with its endpoint as
// awaiting actualization, and triggers `m_actualize_and_finalize`.
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint)
{
    // Thread safety required.
    std::unique_lock lock{m_mutex};
    REALM_ASSERT(m_actualize_and_finalize);
    wrapper->mark_initiated();
    m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
    lock.unlock();

    // Triggered after the mutex has been released.
    m_actualize_and_finalize->trigger();
}
10,560✔
551

552

553
// Marks `wrapper` as abandoned. A wrapper that is still waiting to be
// actualized is finalized on the spot; any other wrapper is queued in
// `m_abandoned_session_wrappers` and `m_actualize_and_finalize` is triggered.
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    // Thread safety required.
    std::unique_lock lock{m_mutex};
    REALM_ASSERT(m_actualize_and_finalize);
    wrapper->mark_abandoned();

    // A session wrapper that has not yet been actualized (on the event loop
    // thread) can be finalized immediately. This ensures that we will
    // generally not actualize a session wrapper that has already been
    // abandoned.
    const auto pending = m_unactualized_session_wrappers.find(wrapper.get());
    if (pending != m_unactualized_session_wrappers.end()) {
        m_unactualized_session_wrappers.erase(pending);
        wrapper->finalize_before_actualization();
        return;
    }

    m_abandoned_session_wrappers.push(std::move(wrapper));
    lock.unlock();

    // Triggered after the mutex has been released.
    m_actualize_and_finalize->trigger();
}
10,376✔
575

576

577
// Must be called from the event loop thread.
578
void ClientImpl::actualize_and_finalize_session_wrappers()
579
{
15,250✔
580
    std::map<SessionWrapper*, ServerEndpoint> unactualized_session_wrappers;
15,250✔
581
    SessionWrapperStack abandoned_session_wrappers;
15,250✔
582
    bool stopped;
15,250✔
583
    {
15,250✔
584
        std::lock_guard lock{m_mutex};
15,250✔
585
        swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
15,250✔
586
        swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
15,250✔
587
        stopped = m_stopped;
15,250✔
588
    }
15,250✔
589
    // Note, we need to finalize old session wrappers before we actualize new
590
    // ones. This ensures that deactivation of old sessions is initiated before
591
    // new session are activated. This, in turn, ensures that the server does
592
    // not see two overlapping sessions for the same local Realm file.
593
    while (util::bind_ptr<SessionWrapper> wrapper = abandoned_session_wrappers.pop())
25,612✔
594
        wrapper->finalize(); // Throws
10,362✔
595
    if (stopped) {
15,250✔
596
        for (auto& p : unactualized_session_wrappers) {
664✔
597
            SessionWrapper& wrapper = *p.first;
4✔
598
            wrapper.finalize_before_actualization();
4✔
599
        }
4✔
600
        return;
664✔
601
    }
664✔
602
    for (auto& p : unactualized_session_wrappers) {
14,586✔
603
        SessionWrapper& wrapper = *p.first;
10,372✔
604
        ServerEndpoint server_endpoint = std::move(p.second);
10,372✔
605
        wrapper.actualize(std::move(server_endpoint)); // Throws
10,372✔
606
    }
10,372✔
607
}
14,586✔
608

609

610
// Returns the connection to be used for `endpoint`, creating one if needed.
//
// The connection already stored in the endpoint's server slot is returned when
// there is one, `m_one_connection_per_session` is false, and no proxy
// configuration was passed. Otherwise a new connection is created, stored in
// the slot (as the slot's connection, or among its alternative connections
// when `m_one_connection_per_session` is true), and counted in
// `m_num_connections`.
//
// `was_created` is an output parameter. It is set to true when this call
// created a new connection and to false when a preexisting one is returned, so
// the caller never has to pre-initialize it.
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
                                                   const std::string& authorization_header_name,
                                                   const std::map<std::string, std::string>& custom_http_headers,
                                                   bool verify_servers_ssl_certificate,
                                                   Optional<std::string> ssl_trust_certificate_path,
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
{
    // Give the out-parameter a defined value on every path, including the one
    // that reuses an existing connection.
    was_created = false;

    // try_emplace() leaves an already existing slot untouched.
    ServerSlot& server_slot =
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()))
            .first->second; // Throws

    // TODO: enable multiplexing with proxies
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
        // Use preexisting connection
        REALM_ASSERT(server_slot.alt_connections.empty());
        return *server_slot.connection;
    }

    // Create a new connection
    // NOTE(review): if a proxy is configured while the slot already holds a
    // shared connection, the condition above is false and this assertion is
    // reached with a non-null connection - confirm that this cannot happen.
    REALM_ASSERT(!server_slot.connection);
    connection_ident_type ident = m_prev_connection_ident + 1;
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
        std::move(proxy_config), server_slot.reconnect_info); // Throws
    ClientImpl::Connection& conn = *conn_2;
    if (!m_one_connection_per_session) {
        server_slot.connection = std::move(conn_2);
    }
    else {
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
    }
    // The identifier is only consumed once the connection has been stored.
    m_prev_connection_ident = ident;
    was_created = true;
    {
        std::lock_guard lk(m_drain_mutex);
        ++m_num_connections;
    }
    return conn;
}
10,368✔
651

652

653
// Removes `conn` from the server slot it is registered in, decrements
// `m_num_connections`, and notifies the waiters on `m_drain_cv`.
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
{
    auto i = m_server_slots.find(conn.get_server_endpoint());
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
    ServerSlot& server_slot = i->second;

    if (m_one_connection_per_session) {
        // The connection lives among the slot's alternative connections,
        // keyed by its identifier.
        REALM_ASSERT(!server_slot.connection);
        auto j = server_slot.alt_connections.find(conn.get_ident());
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
        REALM_ASSERT(&*j->second == &conn);
        server_slot.alt_connections.erase(j);
    }
    else {
        REALM_ASSERT(server_slot.alt_connections.empty());
        REALM_ASSERT(&*server_slot.connection == &conn);
        // Save the reconnect info in the slot before the connection is released.
        server_slot.reconnect_info = conn.get_reconnect_info();
        server_slot.connection.reset();
    }

    std::lock_guard lk(m_drain_mutex);
    REALM_ASSERT(m_num_connections);
    --m_num_connections;
    m_drain_cv.notify_all();
}
2,768✔
681

682

683
// ################ SessionImpl ################
684

685
void SessionImpl::force_close()
686
{
102✔
687
    // Allow force_close() if session is active or hasn't been activated yet.
688
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
102!
689
        m_wrapper.force_close();
102✔
690
    }
102✔
691
}
102✔
692

693
void SessionImpl::on_connection_state_changed(ConnectionState state,
694
                                              const util::Optional<SessionErrorInfo>& error_info)
695
{
12,600✔
696
    // Only used to report errors back to the SyncSession while the Session is active
697
    if (m_state == SessionImpl::Active) {
12,600✔
698
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
12,600✔
699
    }
12,600✔
700
}
12,600✔
701

702

703
const std::string& SessionImpl::get_virt_path() const noexcept
704
{
7,376✔
705
    // Can only be called if the session is active or being activated
706
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
7,376✔
707
    return m_wrapper.m_virt_path;
7,376✔
708
}
7,376✔
709

710
const std::string& SessionImpl::get_realm_path() const noexcept
711
{
10,664✔
712
    // Can only be called if the session is active or being activated
713
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,664✔
714
    return m_wrapper.m_db->get_path();
10,664✔
715
}
10,664✔
716

717
// Returns a copy of the wrapper's DB reference (m_db).
// Asserts that the session is either active or not yet activated.
DBRef SessionImpl::get_db() const noexcept
{
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    return m_wrapper.m_db;
}
26,210✔
723

724
// Returns the client replication object obtained from the session wrapper.
// Asserts that the session is either active or not yet activated.
ClientReplication& SessionImpl::get_repl() const noexcept
{
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    return m_wrapper.get_replication();
}
120,890✔
730

731
// Returns the history owned by the replication object from get_repl(); the
// state assertion performed by get_repl() therefore applies here as well.
ClientHistory& SessionImpl::get_history() const noexcept
{
    ClientReplication& repl = get_repl();
    return repl.get_history();
}
118,926✔
735

736
// Returns a mutable reference to the wrapper's optional client reset
// configuration (m_client_reset_config).
// Asserts that the session is either active or not yet activated.
util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
{
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    return m_wrapper.m_client_reset_config;
}
14,048✔
742

743
// Returns the session reason stored in the wrapper (m_session_reason).
// Asserts that the session is either active or not yet activated.
SessionReason SessionImpl::get_session_reason() noexcept
{
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    return m_wrapper.m_session_reason;
}
1,496✔
749

750
uint64_t SessionImpl::get_schema_version() noexcept
751
{
1,496✔
752
    // Can only be called if the session is active or being activated
753
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,496!
754
    return m_wrapper.m_schema_version;
1,496✔
755
}
1,496✔
756

757
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
758
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
759
{
46,008✔
760
    // Ignore the call if the session is not active
761
    if (m_state != State::Active) {
46,008✔
762
        return;
×
763
    }
×
764

765
    try {
46,008✔
766
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
46,008!
767
        if (simulate_integration_error) {
46,008✔
768
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
769
        }
×
770
        version_type client_version;
46,008✔
771
        if (REALM_LIKELY(!get_client().is_dry_run())) {
46,008✔
772
            VersionInfo version_info;
46,008✔
773
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
46,008✔
774
            client_version = version_info.realm_version;
46,008✔
775
        }
46,008✔
776
        else {
×
777
            // Fake it for "dry run" mode
778
            client_version = m_last_version_available + 1;
×
779
        }
×
780
        on_changesets_integrated(client_version, progress, !changesets.empty()); // Throws
46,008✔
781
    }
46,008✔
782
    catch (const IntegrationException& e) {
46,008✔
783
        on_integration_failure(e);
24✔
784
    }
24✔
785
}
46,008✔
786

787

788
void SessionImpl::on_upload_completion()
789
{
15,002✔
790
    // Ignore the call if the session is not active
791
    if (m_state == State::Active) {
15,002✔
792
        m_wrapper.on_upload_completion(); // Throws
15,002✔
793
    }
15,002✔
794
}
15,002✔
795

796

797
void SessionImpl::on_download_completion()
798
{
16,362✔
799
    // Ignore the call if the session is not active
800
    if (m_state == State::Active) {
16,362✔
801
        m_wrapper.on_download_completion(); // Throws
16,362✔
802
    }
16,362✔
803
}
16,362✔
804

805

806
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
807
{
670✔
808
    // Ignore the call if the session is not active
809
    if (m_state == State::Active) {
670✔
810
        m_wrapper.on_suspended(error_info); // Throws
670✔
811
    }
670✔
812
}
670✔
813

814

815
void SessionImpl::on_resumed()
816
{
62✔
817
    // Ignore the call if the session is not active
818
    if (m_state == State::Active) {
62✔
819
        m_wrapper.on_resumed(); // Throws
62✔
820
    }
62✔
821
}
62✔
822

823
void SessionImpl::handle_pending_client_reset_acknowledgement()
824
{
314✔
825
    // Ignore the call if the session is not active
826
    if (m_state == State::Active) {
314✔
827
        m_wrapper.handle_pending_client_reset_acknowledgement();
314✔
828
    }
314✔
829
}
314✔
830

831
void SessionImpl::update_subscription_version_info()
832
{
296✔
833
    // Ignore the call if the session is not active
834
    if (m_state == State::Active) {
296✔
835
        m_wrapper.update_subscription_version_info();
296✔
836
    }
296✔
837
}
296✔
838

839
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
840
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
841
{
48,200✔
842
    // Ignore the call if the session is not active
843
    if (m_state != State::Active) {
48,200✔
844
        return false;
×
845
    }
×
846

847
    if (is_steady_state_download_message(batch_state, query_version)) {
48,200✔
848
        return false;
46,008✔
849
    }
46,008✔
850

851
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,192✔
852
    util::Optional<SyncProgress> maybe_progress;
2,192✔
853
    if (batch_state == DownloadBatchState::LastInBatch) {
2,192✔
854
        maybe_progress = progress;
1,952✔
855
    }
1,952✔
856

857
    bool new_batch = false;
2,192✔
858
    try {
2,192✔
859
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
2,192✔
860
    }
2,192✔
861
    catch (const LogicError& ex) {
2,192✔
862
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
863
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
864
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
865
                                    ProtocolError::bad_changeset_size);
×
866
            on_integration_failure(ex);
×
867
            return true;
×
868
        }
×
869
        throw;
×
870
    }
×
871

872
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
873
    // bootstrapping.
874
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
2,192✔
875
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
56✔
876
    }
56✔
877

878
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
2,192✔
879
                                       batch_state, received_changesets.size());
2,192✔
880
    if (hook_action == SyncClientHookAction::EarlyReturn) {
2,192✔
881
        return true;
12✔
882
    }
12✔
883
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
2,180✔
884

885
    if (batch_state == DownloadBatchState::MoreToCome) {
2,180✔
886
        notify_download_progress(bootstrap_store->pending_stats().pending_changeset_bytes);
236✔
887
        return true;
236✔
888
    }
236✔
889
    else {
1,944✔
890
        // FIXME (#7451) this variable is not needed in principle, and bootstrap store bytes could be passed just
891
        // through notify_download_progress, but since it is needed in report_progress, and it is also called on
892
        // upload progress for now until progress is reported separately. As soon as we understand here that there
893
        // are no more changesets for bootstrap store, and we want to process bootstrap, we don't need to notify
894
        // intermediate progress - so reset these bytes to not accidentally double report them.
895
        m_wrapper.m_bootstrap_store_bytes.reset();
1,944✔
896
    }
1,944✔
897

898
    try {
1,944✔
899
        process_pending_flx_bootstrap();
1,944✔
900
    }
1,944✔
901
    catch (const IntegrationException& e) {
1,944✔
902
        on_integration_failure(e);
12✔
903
    }
12✔
904
    catch (...) {
1,944✔
905
        on_integration_failure(IntegrationException(exception_to_status()));
×
906
    }
×
907

908
    return true;
1,944✔
909
}
1,944✔
910

911

912
void SessionImpl::process_pending_flx_bootstrap()
913
{
12,310✔
914
    // Ignore the call if not a flx session or session is not active
915
    if (!m_is_flx_sync_session || m_state != State::Active) {
12,310✔
916
        return;
8,834✔
917
    }
8,834✔
918
    // Should never be called if session is not active
919
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
3,476✔
920
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
3,476✔
921
    if (!bootstrap_store->has_pending()) {
3,476✔
922
        return;
1,510✔
923
    }
1,510✔
924

925
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,966✔
926
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,966✔
927
                "changeset size: %3)",
1,966✔
928
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,966✔
929
                pending_batch_stats.pending_changeset_bytes);
1,966✔
930
    auto& history = get_repl().get_history();
1,966✔
931
    VersionInfo new_version;
1,966✔
932
    SyncProgress progress;
1,966✔
933
    int64_t query_version = -1;
1,966✔
934
    size_t changesets_processed = 0;
1,966✔
935

936
    // Used to commit each batch after it was transformed.
937
    TransactionRef transact = get_db()->start_write();
1,966✔
938
    while (bootstrap_store->has_pending()) {
4,130✔
939
        auto start_time = std::chrono::steady_clock::now();
2,176✔
940
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
2,176✔
941
        if (!pending_batch.progress) {
2,176✔
942
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
943
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
944
            // bootstrap store requires a write transaction itself.
945
            transact->close();
8✔
946
            bootstrap_store->clear();
8✔
947
            return;
8✔
948
        }
8✔
949

950
        auto batch_state =
2,168✔
951
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
2,168✔
952
        uint64_t downloadable_bytes = 0;
2,168✔
953
        query_version = pending_batch.query_version;
2,168✔
954
        bool simulate_integration_error =
2,168✔
955
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
2,168✔
956
        if (simulate_integration_error) {
2,168✔
957
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
4✔
958
        }
4✔
959

960
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
2,164✔
961
                        batch_state, pending_batch.changesets.size());
2,164✔
962

963
        history.integrate_server_changesets(
2,164✔
964
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
2,164✔
965
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
2,164✔
966
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
2,152✔
967
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
2,152✔
968
            });
2,152✔
969
        progress = *pending_batch.progress;
2,164✔
970
        changesets_processed += pending_batch.changesets.size();
2,164✔
971
        auto duration = std::chrono::steady_clock::now() - start_time;
2,164✔
972

973
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
2,164✔
974
                                      batch_state, pending_batch.changesets.size());
2,164✔
975
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
2,164✔
976

977
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
2,164✔
978
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
2,164✔
979
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
2,164✔
980
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
2,164✔
981
                    pending_batch.remaining_changesets);
2,164✔
982
    }
2,164✔
983
    on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0);
1,954✔
984

985
    REALM_ASSERT_3(query_version, !=, -1);
1,954✔
986
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,954✔
987

988
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,954✔
989
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,954✔
990
    // NoAction/EarlyReturn are both valid no-op actions to take here.
991
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,954✔
992
}
1,954✔
993

994
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
995
{
20✔
996
    // Ignore the call if the session is not active
997
    if (m_state == State::Active) {
20✔
998
        m_wrapper.on_flx_sync_error(version, err_msg);
20✔
999
    }
20✔
1000
}
20✔
1001

1002
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
1003
{
1,996✔
1004
    // Ignore the call if the session is not active
1005
    if (m_state == State::Active) {
1,996✔
1006
        m_wrapper.on_flx_sync_progress(version, batch_state);
1,996✔
1007
    }
1,996✔
1008
}
1,996✔
1009

1010
// Returns the FLX subscription store pointer provided by the session wrapper.
// Asserts that the session is active.
SubscriptionStore* SessionImpl::get_flx_subscription_store()
{
    REALM_ASSERT_EX(m_state == State::Active, m_state);
    return m_wrapper.get_flx_subscription_store();
}
18,598✔
1016

1017
// Returns the migration store pointer provided by the session wrapper.
// Asserts that the session is active.
MigrationStore* SessionImpl::get_migration_store()
{
    REALM_ASSERT_EX(m_state == State::Active, m_state);
    return m_wrapper.get_migration_store();
}
68,494✔
1023

1024
void SessionImpl::on_flx_sync_version_complete(int64_t version)
1025
{
300✔
1026
    // Ignore the call if the session is not active
1027
    if (m_state == State::Active) {
300✔
1028
        m_wrapper.on_flx_sync_version_complete(version);
300✔
1029
    }
300✔
1030
}
300✔
1031

1032
// Invokes the wrapper's debug hook with `data` and carries out the action it
// requests.
//
// A call made while the hook is already running returns NoAction without
// invoking the hook again. SuspendWithRetryableError and TriggerReconnect are
// executed here and reported to the caller as EarlyReturn; any other action is
// returned unchanged. Asserts that the session is active.
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
    // Should never be called if session is not active
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    // Make sure we don't call the debug hook recursively.
    if (m_wrapper.m_in_debug_hook) {
        return SyncClientHookAction::NoAction;
    }
    m_wrapper.m_in_debug_hook = true;
    // Clears the flag again on every exit path from this function.
    auto reset_in_hook_flag = util::make_scope_exit([&]() noexcept {
        m_wrapper.m_in_debug_hook = false;
    });

    auto requested = m_wrapper.m_debug_hook(data);

    if (requested == realm::SyncClientHookAction::SuspendWithRetryableError) {
        // Feed a synthetic, non-fatal, transient error through the regular error path.
        SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
        err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;

        auto err_processing_err = receive_error_message(err_info);
        REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
        return SyncClientHookAction::EarlyReturn;
    }

    if (requested == realm::SyncClientHookAction::TriggerReconnect) {
        get_connection().voluntary_disconnect();
        return SyncClientHookAction::EarlyReturn;
    }

    return requested;
}
2,152✔
1064

1065
// Convenience overload: packs the arguments into a SyncClientHookData and
// calls the hook with it. Returns NoAction without doing anything when no
// debug hook is installed or the session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
                                                  int64_t query_version, DownloadBatchState batch_state,
                                                  size_t num_changesets)
{
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
        return SyncClientHookAction::NoAction;
    }
    if (REALM_UNLIKELY(m_state != State::Active)) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.query_version = query_version;
    hook_data.batch_state = batch_state;
    hook_data.num_changesets = num_changesets;
    hook_data.progress = progress;

    return call_debug_hook(hook_data);
}
2,012✔
1085

1086
// Convenience overload for error events: builds a SyncClientHookData that
// carries the current session progress (m_progress), a steady-state batch
// state, zero changesets, query version 0 and a pointer to `error_info`, then
// calls the hook with it. Returns NoAction without doing anything when no
// debug hook is installed or the session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
{
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
        return SyncClientHookAction::NoAction;
    }
    if (REALM_UNLIKELY(m_state != State::Active)) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.error_info = &error_info;
    hook_data.progress = m_progress;
    hook_data.batch_state = DownloadBatchState::SteadyState;
    hook_data.query_version = 0;
    hook_data.num_changesets = 0;

    return call_debug_hook(hook_data);
}
168✔
1105

1106
// Tells whether a DOWNLOAD message is a steady-state download.
//
// That is the case when the batch state is SteadyState, when this is not an
// FLX session, or when the message is the last in its batch and its query
// version equals the wrapper's active FLX version. Asserts that the session is
// active.
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
    // Should never be called if session is not active
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    if (batch_state == DownloadBatchState::SteadyState || !m_is_flx_sync_session) {
        return true;
    }

    // If this is a steady state DOWNLOAD, no need for special handling.
    return batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version;
}
5,800✔
1125

1126
void SessionImpl::init_progress_handler()
1127
{
10,664✔
1128
    if (m_state != State::Unactivated && m_state != State::Active)
10,664✔
1129
        return;
×
1130

1131
    m_wrapper.init_progress_handler();
10,664✔
1132
}
10,664✔
1133

1134
void SessionImpl::enable_progress_notifications()
1135
{
46,592✔
1136
    m_wrapper.m_reliable_download_progress = true;
46,592✔
1137
}
46,592✔
1138

1139
void SessionImpl::notify_upload_progress()
1140
{
34,062✔
1141
    if (m_state != State::Active)
34,062✔
1142
        return;
×
1143

1144
    m_wrapper.on_upload_progress();
34,062✔
1145
}
34,062✔
1146

1147
void SessionImpl::update_download_estimate(double download_estimate)
1148
{
3,594✔
1149
    if (m_state != State::Active)
3,594✔
1150
        return;
×
1151

1152
    m_wrapper.m_download_estimate = download_estimate;
3,594✔
1153
}
3,594✔
1154

1155
void SessionImpl::notify_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes)
1156
{
27,926✔
1157
    if (m_state != State::Active)
27,926✔
1158
        return;
×
1159

1160
    m_wrapper.on_download_progress(bootstrap_store_bytes); // Throws
27,926✔
1161
}
27,926✔
1162

1163
// Validates a JSON test command and queues it for sending on this session.
//
// `body` must be a JSON object with a string "command" field and, optionally,
// an "args" field; no other fields are accepted. The returned future is
// already resolved with an error status when the session is not active
// (RuntimeError) or when `body` fails validation (LogicError). Otherwise the
// command is queued from a callback posted to the client, and the promise
// stored with the pending command is the one backing the returned future; if
// the posted callback is invoked with a non-OK status, that status becomes the
// future's error.
util::Future<std::string> SessionImpl::send_test_command(std::string body)
{
    if (m_state != State::Active) {
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
    }

    try {
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
            return Status{ErrorCodes::LogicError,
                          "Must supply command name in \"command\" field of test command json object"};
        }
        // "command" is known to be present here, so the object may hold exactly one field, or
        // exactly two if the second one is "args". Anything beyond that is an unknown field.
        const bool has_args = json_body.find("args") != json_body.end();
        const size_t allowed_fields = has_args ? 2 : 1;
        if (json_body.size() > allowed_fields) {
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
        }
    }
    catch (const nlohmann::json::parse_error& e) {
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
    }

    auto pf = util::make_promise_future<std::string>();

    // NOTE(review): the callback captures `this`; presumably the client guarantees the session
    // outlives posted callbacks - confirm.
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
        // Includes operation_aborted
        if (!status.is_ok()) {
            promise.set_error(status);
            return;
        }

        auto id = ++m_last_pending_test_command_ident;
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
        ensure_enlisted_to_send();
    });

    return std::move(pf.future);
}
60✔
1199

1200
// ################ SessionWrapper ################
1201

1202
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1203
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1204
// the ClientImpl::Connection that owns the ClientImpl::Session.
1205
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1206
                               std::shared_ptr<MigrationStore> migration_store, Session::Config config)
1207
    : m_client{client}
5,684✔
1208
    , m_db(std::move(db))
5,684✔
1209
    , m_replication(m_db->get_replication())
5,684✔
1210
    , m_protocol_envelope{config.protocol_envelope}
5,684✔
1211
    , m_server_address{std::move(config.server_address)}
5,684✔
1212
    , m_server_port{config.server_port}
5,684✔
1213
    , m_server_verified{config.server_verified}
5,684✔
1214
    , m_user_id(std::move(config.user_id))
5,684✔
1215
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
5,684✔
1216
    , m_authorization_header_name{config.authorization_header_name}
5,684✔
1217
    , m_custom_http_headers{config.custom_http_headers}
5,684✔
1218
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
5,684✔
1219
    , m_simulate_integration_error{config.simulate_integration_error}
5,684✔
1220
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
5,684✔
1221
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
5,684✔
1222
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
5,684✔
1223
    , m_http_request_path_prefix{std::move(config.service_identifier)}
5,684✔
1224
    , m_virt_path{std::move(config.realm_identifier)}
5,684✔
1225
    , m_signed_access_token{std::move(config.signed_user_token)}
5,684✔
1226
    , m_client_reset_config{std::move(config.client_reset_config)}
5,684✔
1227
    , m_proxy_config{config.proxy_config} // Throws
5,684✔
1228
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
5,684✔
1229
    , m_session_reason(config.session_reason)
5,684✔
1230
    , m_schema_version(config.schema_version)
5,684✔
1231
    , m_flx_subscription_store(std::move(flx_sub_store))
5,684✔
1232
    , m_migration_store(std::move(migration_store))
5,684✔
1233
{
11,712✔
1234
    REALM_ASSERT(m_db);
11,712✔
1235
    REALM_ASSERT(m_db->get_replication());
11,712✔
1236
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
11,712✔
1237
    if (m_client_reset_config) {
11,712✔
1238
        m_session_reason = SessionReason::ClientReset;
380✔
1239
    }
380✔
1240
}
11,712✔
1241

1242
// If the wrapper still holds a DB and was actualized, removes itself as a
// commit listener and releases the DB's sync agent.
SessionWrapper::~SessionWrapper() noexcept
{
    if (!m_db || !m_actualized) {
        return;
    }
    m_db->remove_commit_listener(this);
    m_db->release_sync_agent();
}
11,700✔
1249

1250

1251
// Returns the stored replication object as a ClientReplication.
// Asserts that the wrapper still holds a DB.
inline ClientReplication& SessionWrapper::get_replication() noexcept
{
    REALM_ASSERT(m_db);
    return static_cast<ClientReplication&>(*m_replication);
}
120,888✔
1256

1257

1258
// Returns the client this wrapper was created with.
inline ClientImpl& SessionWrapper::get_client() noexcept
{
    return m_client;
}
72✔
1262

1263
bool SessionWrapper::has_flx_subscription_store() const
1264
{
1,996✔
1265
    return static_cast<bool>(m_flx_subscription_store);
1,996✔
1266
}
1,996✔
1267

1268
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1269
{
20✔
1270
    REALM_ASSERT(!m_finalized);
20✔
1271
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
20✔
1272
}
20✔
1273

1274
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1275
{
2,240✔
1276
    REALM_ASSERT(!m_finalized);
2,240✔
1277
    m_flx_last_seen_version = version;
2,240✔
1278
    m_flx_active_version = version;
2,240✔
1279
}
2,240✔
1280

1281
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1282
{
1,996✔
1283
    if (!has_flx_subscription_store()) {
1,996✔
1284
        return;
×
1285
    }
×
1286
    REALM_ASSERT(!m_finalized);
1,996✔
1287
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,996✔
1288
    REALM_ASSERT(new_version >= m_flx_active_version);
1,996✔
1289
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,996✔
1290

1291
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1,996✔
1292

1293
    switch (batch_state) {
1,996✔
1294
        case DownloadBatchState::SteadyState:
✔
1295
            // Cannot be called with this value.
1296
            REALM_UNREACHABLE();
1297
        case DownloadBatchState::LastInBatch:
1,940✔
1298
            if (m_flx_active_version == new_version) {
1,940✔
1299
                return;
×
1300
            }
×
1301
            on_flx_sync_version_complete(new_version);
1,940✔
1302
            if (new_version == 0) {
1,940✔
1303
                new_state = SubscriptionSet::State::Complete;
916✔
1304
            }
916✔
1305
            else {
1,024✔
1306
                new_state = SubscriptionSet::State::AwaitingMark;
1,024✔
1307
                m_flx_pending_mark_version = new_version;
1,024✔
1308
            }
1,024✔
1309
            break;
1,940✔
1310
        case DownloadBatchState::MoreToCome:
56✔
1311
            if (m_flx_last_seen_version == new_version) {
56✔
1312
                return;
×
1313
            }
×
1314

1315
            m_flx_last_seen_version = new_version;
56✔
1316
            new_state = SubscriptionSet::State::Bootstrapping;
56✔
1317
            break;
56✔
1318
    }
1,996✔
1319

1320
    get_flx_subscription_store()->update_state(new_version, new_state);
1,996✔
1321
}
1,996✔
1322

1323
// Returns a non-owning pointer to the FLX subscription store.
// Asserts that the wrapper is not finalized.
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
{
    REALM_ASSERT(!m_finalized);
    return m_flx_subscription_store.get();
}
20,614✔
1328

1329
// Returns a non-owning pointer to the pending FLX bootstrap store.
// Asserts that the wrapper is not finalized.
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
{
    REALM_ASSERT(!m_finalized);
    return m_flx_pending_bootstrap_store.get();
}
5,666✔
1334

1335
// Returns a non-owning pointer to the migration store.
// Asserts that the wrapper is not finalized.
MigrationStore* SessionWrapper::get_migration_store()
{
    REALM_ASSERT(!m_finalized);
    return m_migration_store.get();
}
68,492✔
1340

1341
inline void SessionWrapper::mark_initiated()
1342
{
10,560✔
1343
    REALM_ASSERT(!m_initiated);
10,560✔
1344
    REALM_ASSERT(!m_abandoned);
10,560✔
1345
    m_initiated = true;
10,560✔
1346
}
10,560✔
1347

1348

1349
inline void SessionWrapper::mark_abandoned()
1350
{
10,560✔
1351
    REALM_ASSERT(!m_abandoned);
10,560✔
1352
    m_abandoned = true;
10,560✔
1353
}
10,560✔
1354

1355

1356
// Installs the progress handler, taking ownership of `handler`.
// Asserts that the wrapper has not been initiated yet.
inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
    REALM_ASSERT(!m_initiated);
    m_progress_handler = std::move(handler);
}
4,164✔
1361

1362

1363
inline void
1364
SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
1365
{
11,816✔
1366
    REALM_ASSERT(!m_initiated);
11,816✔
1367
    m_connection_state_change_listener = std::move(listener);
11,816✔
1368
}
11,816✔
1369

1370

1371
void SessionWrapper::initiate()
1372
{
10,560✔
1373
    ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port,
10,560✔
1374
                                   m_user_id,           m_sync_mode,      m_server_verified};
10,560✔
1375
    m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
10,560✔
1376
    m_db->add_commit_listener(this);
10,560✔
1377
}
10,560✔
1378

1379

1380
// Commit listener callback: posts a task to the client that lets the session
// recognise the new version and then reports upload progress.
//
// The posted task holds a bind_ptr to this wrapper. It returns early when the
// post was aborted, when the wrapper is finalized or force-closed, or when
// there is no session any more; any other non-OK status is thrown as an
// Exception. Asserts that the wrapper has been initiated.
void SessionWrapper::on_commit(version_type new_version)
{
    // Thread safety required
    REALM_ASSERT(m_initiated);

    util::bind_ptr<SessionWrapper> self{this};
    m_client.post([self = std::move(self), new_version](Status status) {
        if (status == ErrorCodes::OperationAborted) {
            return;
        }
        if (!status.is_ok()) {
            throw Exception(status);
        }

        REALM_ASSERT(self->m_actualized);
        if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) {
            return;
        }

        if (REALM_UNLIKELY(!self->m_sess)) {
            return; // Already finalized
        }
        SessionImpl& session = *self->m_sess;
        session.recognize_sync_version(new_version);                        // Throws
        self->on_upload_progress(/* only_if_new_uploadable_data = */ true); // Throws
    });
}
114,966✔
1404

1405

1406
void SessionWrapper::cancel_reconnect_delay()
1407
{
20✔
1408
    // Thread safety required
1409
    REALM_ASSERT(m_initiated);
20✔
1410

1411
    util::bind_ptr<SessionWrapper> self{this};
20✔
1412
    m_client.post([self = std::move(self)](Status status) {
20✔
1413
        if (status == ErrorCodes::OperationAborted)
20✔
1414
            return;
×
1415
        else if (!status.is_ok())
20✔
1416
            throw Exception(status);
×
1417

1418
        REALM_ASSERT(self->m_actualized);
20✔
1419
        if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) {
20✔
1420
            return;
×
1421
        }
×
1422

1423
        if (REALM_UNLIKELY(!self->m_sess))
20✔
1424
            return; // Already finalized
×
1425
        SessionImpl& sess = *self->m_sess;
20✔
1426
        sess.cancel_resumption_delay(); // Throws
20✔
1427
        ClientImpl::Connection& conn = sess.get_connection();
20✔
1428
        conn.cancel_reconnect_delay(); // Throws
20✔
1429
    });                                // Throws
20✔
1430
}
20✔
1431

1432
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1433
                                    WaitOperCompletionHandler handler)
1434
{
5,306✔
1435
    REALM_ASSERT(upload_completion || download_completion);
5,306✔
1436
    REALM_ASSERT(m_initiated);
5,306✔
1437

1438
    util::bind_ptr<SessionWrapper> self{this};
5,306✔
1439
    m_client.post([self = std::move(self), handler = std::move(handler), upload_completion,
5,306✔
1440
                   download_completion](Status status) mutable {
5,306✔
1441
        if (status == ErrorCodes::OperationAborted)
5,306✔
1442
            return;
×
1443
        else if (!status.is_ok())
5,306✔
1444
            throw Exception(status);
×
1445

1446
        REALM_ASSERT(self->m_actualized);
5,306✔
1447
        if (REALM_UNLIKELY(!self->m_sess)) {
5,306✔
1448
            // Already finalized
1449
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
64✔
1450
            return;
64✔
1451
        }
64✔
1452
        if (upload_completion) {
5,242✔
1453
            if (download_completion) {
2,674✔
1454
                // Wait for upload and download completion
1455
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
306✔
1456
            }
306✔
1457
            else {
2,368✔
1458
                // Wait for upload completion only
1459
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
2,368✔
1460
            }
2,368✔
1461
        }
2,674✔
1462
        else {
2,568✔
1463
            // Wait for download completion only
1464
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
2,568✔
1465
        }
2,568✔
1466
        SessionImpl& sess = *self->m_sess;
5,242✔
1467
        if (upload_completion)
5,242✔
1468
            sess.request_upload_completion_notification(); // Throws
2,674✔
1469
        if (download_completion)
5,242✔
1470
            sess.request_download_completion_notification(); // Throws
2,874✔
1471
    });                                                      // Throws
5,242✔
1472
}
5,306✔
1473

1474

1475
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1476
{
13,024✔
1477
    // Thread safety required
1478
    REALM_ASSERT(m_initiated);
13,024✔
1479
    REALM_ASSERT(!m_abandoned);
13,024✔
1480

1481
    std::int_fast64_t target_mark;
13,024✔
1482
    {
13,024✔
1483
        std::lock_guard lock{m_client.m_mutex};
13,024✔
1484
        target_mark = ++m_target_upload_mark;
13,024✔
1485
    }
13,024✔
1486

1487
    util::bind_ptr<SessionWrapper> self{this};
13,024✔
1488
    m_client.post([self = std::move(self), target_mark](Status status) {
13,024✔
1489
        if (status == ErrorCodes::OperationAborted)
13,024✔
1490
            return;
×
1491
        else if (!status.is_ok())
13,024✔
1492
            throw Exception(status);
×
1493

1494
        REALM_ASSERT(self->m_actualized);
13,024✔
1495
        REALM_ASSERT(!self->m_finalized);
13,024✔
1496
        // The session wrapper may already have been finalized. This can only
1497
        // happen if it was abandoned, but in that case, the call of
1498
        // wait_for_upload_complete_or_client_stopped() must have returned
1499
        // already.
1500
        if (REALM_UNLIKELY(!self->m_sess))
13,024✔
1501
            return;
24✔
1502
        if (target_mark > self->m_staged_upload_mark) {
13,000✔
1503
            self->m_staged_upload_mark = target_mark;
13,000✔
1504
            SessionImpl& sess = *self->m_sess;
13,000✔
1505
            sess.request_upload_completion_notification(); // Throws
13,000✔
1506
        }
13,000✔
1507
    }); // Throws
13,000✔
1508

1509
    bool completion_condition_was_satisfied;
13,024✔
1510
    {
13,024✔
1511
        std::unique_lock lock{m_client.m_mutex};
13,024✔
1512
        while (m_reached_upload_mark < target_mark && !m_client.m_stopped)
32,860✔
1513
            m_client.m_wait_or_client_stopped_cond.wait(lock);
19,836✔
1514
        completion_condition_was_satisfied = !m_client.m_stopped;
13,024✔
1515
    }
13,024✔
1516
    return completion_condition_was_satisfied;
13,024✔
1517
}
13,024✔
1518

1519

1520
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1521
{
10,120✔
1522
    // Thread safety required
1523
    REALM_ASSERT(m_initiated);
10,120✔
1524
    REALM_ASSERT(!m_abandoned);
10,120✔
1525

1526
    std::int_fast64_t target_mark;
10,120✔
1527
    {
10,120✔
1528
        std::lock_guard lock{m_client.m_mutex};
10,120✔
1529
        target_mark = ++m_target_download_mark;
10,120✔
1530
    }
10,120✔
1531

1532
    util::bind_ptr<SessionWrapper> self{this};
10,120✔
1533
    m_client.post([self = std::move(self), target_mark](Status status) {
10,120✔
1534
        if (status == ErrorCodes::OperationAborted)
10,120✔
1535
            return;
×
1536
        else if (!status.is_ok())
10,120✔
1537
            throw Exception(status);
×
1538

1539
        REALM_ASSERT(self->m_actualized);
10,120✔
1540
        REALM_ASSERT(!self->m_finalized);
10,120✔
1541
        // The session wrapper may already have been finalized. This can only
1542
        // happen if it was abandoned, but in that case, the call of
1543
        // wait_for_download_complete_or_client_stopped() must have returned
1544
        // already.
1545
        if (REALM_UNLIKELY(!self->m_sess))
10,120✔
1546
            return;
60✔
1547
        if (target_mark > self->m_staged_download_mark) {
10,060✔
1548
            self->m_staged_download_mark = target_mark;
10,058✔
1549
            SessionImpl& sess = *self->m_sess;
10,058✔
1550
            sess.request_download_completion_notification(); // Throws
10,058✔
1551
        }
10,058✔
1552
    }); // Throws
10,060✔
1553

1554
    bool completion_condition_was_satisfied;
10,120✔
1555
    {
10,120✔
1556
        std::unique_lock lock{m_client.m_mutex};
10,120✔
1557
        while (m_reached_download_mark < target_mark && !m_client.m_stopped)
20,484✔
1558
            m_client.m_wait_or_client_stopped_cond.wait(lock);
10,364✔
1559
        completion_condition_was_satisfied = !m_client.m_stopped;
10,120✔
1560
    }
10,120✔
1561
    return completion_condition_was_satisfied;
10,120✔
1562
}
10,120✔
1563

1564

1565
void SessionWrapper::refresh(std::string_view signed_access_token)
1566
{
216✔
1567
    // Thread safety required
1568
    REALM_ASSERT(m_initiated);
216✔
1569
    REALM_ASSERT(!m_abandoned);
216✔
1570

1571
    m_client.post([self = util::bind_ptr(this), token = std::string(signed_access_token)](Status status) {
216✔
1572
        if (status == ErrorCodes::OperationAborted)
216✔
1573
            return;
×
1574
        else if (!status.is_ok())
216✔
1575
            throw Exception(status);
×
1576

1577
        REALM_ASSERT(self->m_actualized);
216✔
1578
        if (REALM_UNLIKELY(!self->m_sess))
216✔
1579
            return; // Already finalized
×
1580
        self->m_signed_access_token = std::move(token);
216✔
1581
        SessionImpl& sess = *self->m_sess;
216✔
1582
        ClientImpl::Connection& conn = sess.get_connection();
216✔
1583
        // FIXME: This only makes sense when each session uses a separate connection.
1584
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
216✔
1585
        sess.cancel_resumption_delay();                                                          // Throws
216✔
1586
        conn.cancel_reconnect_delay();                                                           // Throws
216✔
1587
    });
216✔
1588
}
216✔
1589

1590

1591
// Hands an initiated wrapper over to its client as "abandoned". A wrapper
// that was never initiated is simply released when `wrapper` goes out of
// scope.
inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    if (!wrapper->m_initiated)
        return;

    // Bind the client reference before `wrapper` is moved from.
    ClientImpl& owner = wrapper->m_client;
    owner.register_abandoned_session_wrapper(std::move(wrapper));
}
11,712✔
1598

1599

1600
// Must be called from event loop thread
1601
void SessionWrapper::actualize(ServerEndpoint endpoint)
1602
{
10,372✔
1603
    REALM_ASSERT_DEBUG(m_initiated);
10,372✔
1604
    REALM_ASSERT(!m_actualized);
10,372✔
1605
    REALM_ASSERT(!m_sess);
10,372✔
1606
    // Cannot be actualized if it's already been finalized or force closed
1607
    REALM_ASSERT(!m_finalized);
10,372✔
1608
    REALM_ASSERT(!m_force_closed);
10,372✔
1609
    try {
10,372✔
1610
        m_db->claim_sync_agent();
10,372✔
1611
    }
10,372✔
1612
    catch (const MultipleSyncAgents&) {
10,372✔
1613
        finalize_before_actualization();
4✔
1614
        throw;
4✔
1615
    }
4✔
1616
    auto sync_mode = endpoint.server_mode;
10,368✔
1617

1618
    bool was_created = false;
10,368✔
1619
    ClientImpl::Connection& conn = m_client.get_connection(
10,368✔
1620
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
10,368✔
1621
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
10,368✔
1622
        was_created); // Throws
10,368✔
1623
    try {
10,368✔
1624
        // FIXME: This only makes sense when each session uses a separate connection.
1625
        conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
10,368✔
1626
        std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
10,368✔
1627
        if (sync_mode == SyncServerMode::FLX) {
10,368✔
1628
            m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1,530✔
1629
        }
1,530✔
1630

1631
        sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
10,368✔
1632
        m_sess = sess.get();
10,368✔
1633
        conn.activate_session(std::move(sess)); // Throws
10,368✔
1634
    }
10,368✔
1635
    catch (...) {
10,368✔
1636
        if (was_created)
×
1637
            m_client.remove_connection(conn);
×
1638

1639
        // finalize_before_actualization() expects m_sess to be nullptr, but it's possible that we
1640
        // reached its assignment above before throwing. Unset it here so we get a clean unhandled
1641
        // exception failure instead of a REALM_ASSERT in finalize_before_actualization().
1642
        m_sess = nullptr;
×
1643
        finalize_before_actualization();
×
1644
        throw;
×
1645
    }
×
1646

1647
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
1648
    // session cannot change the state of the bootstrap store at the same time.
1649
    update_subscription_version_info();
10,366✔
1650

1651
    m_actualized = true;
10,366✔
1652
    if (was_created)
10,366✔
1653
        conn.activate(); // Throws
2,776✔
1654

1655
    if (m_connection_state_change_listener) {
10,366✔
1656
        ConnectionState state = conn.get_state();
10,354✔
1657
        if (state != ConnectionState::disconnected) {
10,354✔
1658
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,430✔
1659
            if (state == ConnectionState::connected)
7,430✔
1660
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
5,864✔
1661
        }
7,430✔
1662
    }
10,354✔
1663

1664
    if (!m_client_reset_config)
10,366✔
1665
        on_upload_progress(/* only_if_new_uploadable_data = */ true); // Throws
9,992✔
1666
}
10,366✔
1667

1668
void SessionWrapper::force_close()
1669
{
102✔
1670
    if (m_force_closed || m_finalized) {
102✔
1671
        return;
×
1672
    }
×
1673
    REALM_ASSERT(m_actualized);
102✔
1674
    REALM_ASSERT(m_sess);
102✔
1675
    m_force_closed = true;
102✔
1676

1677
    ClientImpl::Connection& conn = m_sess->get_connection();
102✔
1678
    conn.initiate_session_deactivation(m_sess); // Throws
102✔
1679

1680
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1681
    m_flx_pending_bootstrap_store.reset();
102✔
1682
    // Clear the subscription and migration store refs since they are owned by SyncSession
1683
    m_flx_subscription_store.reset();
102✔
1684
    m_migration_store.reset();
102✔
1685
    m_sess = nullptr;
102✔
1686
    // Everything is being torn down, no need to report connection state anymore
1687
    m_connection_state_change_listener = {};
102✔
1688
}
102✔
1689

1690
// Must be called from event loop thread
1691
void SessionWrapper::finalize()
1692
{
10,362✔
1693
    REALM_ASSERT(m_actualized);
10,362✔
1694
    REALM_ASSERT(m_abandoned);
10,362✔
1695

1696
    // Already finalized?
1697
    if (m_finalized) {
10,362✔
1698
        return;
×
1699
    }
×
1700

1701
    // Must be before marking as finalized as we expect m_finalized == false in on_change()
1702
    m_db->remove_commit_listener(this);
10,362✔
1703

1704
    m_finalized = true;
10,362✔
1705

1706
    if (!m_force_closed) {
10,362✔
1707
        REALM_ASSERT(m_sess);
10,252✔
1708
        ClientImpl::Connection& conn = m_sess->get_connection();
10,252✔
1709
        conn.initiate_session_deactivation(m_sess); // Throws
10,252✔
1710

1711
        // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1712
        m_flx_pending_bootstrap_store.reset();
10,252✔
1713
        // Clear the subscription and migration store refs since they are owned by SyncSession
1714
        m_flx_subscription_store.reset();
10,252✔
1715
        m_migration_store.reset();
10,252✔
1716
        m_sess = nullptr;
10,252✔
1717
    }
10,252✔
1718

1719
    // The Realm file can be closed now, as no access to the Realm file is
1720
    // supposed to happen on behalf of a session after initiation of
1721
    // deactivation.
1722
    m_db->release_sync_agent();
10,362✔
1723
    m_db = nullptr;
10,362✔
1724

1725
    // All outstanding wait operations must be canceled
1726
    while (!m_upload_completion_handlers.empty()) {
10,800✔
1727
        auto handler = std::move(m_upload_completion_handlers.back());
438✔
1728
        m_upload_completion_handlers.pop_back();
438✔
1729
        handler(
438✔
1730
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
438✔
1731
    }
438✔
1732
    while (!m_download_completion_handlers.empty()) {
10,590✔
1733
        auto handler = std::move(m_download_completion_handlers.back());
228✔
1734
        m_download_completion_handlers.pop_back();
228✔
1735
        handler(
228✔
1736
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
228✔
1737
    }
228✔
1738
    while (!m_sync_completion_handlers.empty()) {
10,374✔
1739
        auto handler = std::move(m_sync_completion_handlers.back());
12✔
1740
        m_sync_completion_handlers.pop_back();
12✔
1741
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
12✔
1742
    }
12✔
1743
}
10,362✔
1744

1745

1746
// Must be called only when an unactualized session wrapper becomes abandoned.
1747
//
1748
// Called with a lock on `m_client.m_mutex`.
1749
inline void SessionWrapper::finalize_before_actualization() noexcept
1750
{
192✔
1751
    REALM_ASSERT(!m_sess);
192✔
1752
    m_actualized = true;
192✔
1753
    m_force_closed = true;
192✔
1754
}
192✔
1755

1756
inline void SessionWrapper::on_upload_progress(bool only_if_new_uploadable_data)
1757
{
158,148✔
1758
    REALM_ASSERT(!m_finalized);
158,148✔
1759
    report_progress(/* is_download = */ false, only_if_new_uploadable_data); // Throws
158,148✔
1760
}
158,148✔
1761

1762
inline void SessionWrapper::on_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes)
1763
{
27,926✔
1764
    REALM_ASSERT(!m_finalized);
27,926✔
1765
    m_bootstrap_store_bytes = bootstrap_store_bytes;
27,926✔
1766
    report_progress(/* is_download = */ true); // Throws
27,926✔
1767
}
27,926✔
1768

1769

1770
void SessionWrapper::on_upload_completion()
1771
{
15,002✔
1772
    REALM_ASSERT(!m_finalized);
15,002✔
1773
    while (!m_upload_completion_handlers.empty()) {
17,028✔
1774
        auto handler = std::move(m_upload_completion_handlers.back());
2,026✔
1775
        m_upload_completion_handlers.pop_back();
2,026✔
1776
        handler(Status::OK()); // Throws
2,026✔
1777
    }
2,026✔
1778
    while (!m_sync_completion_handlers.empty()) {
15,200✔
1779
        auto handler = std::move(m_sync_completion_handlers.back());
198✔
1780
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
198✔
1781
        m_sync_completion_handlers.pop_back();
198✔
1782
    }
198✔
1783
    std::lock_guard lock{m_client.m_mutex};
15,002✔
1784
    if (m_staged_upload_mark > m_reached_upload_mark) {
15,002✔
1785
        m_reached_upload_mark = m_staged_upload_mark;
12,988✔
1786
        m_client.m_wait_or_client_stopped_cond.notify_all();
12,988✔
1787
    }
12,988✔
1788
}
15,002✔
1789

1790

1791
void SessionWrapper::on_download_completion()
1792
{
16,362✔
1793
    while (!m_download_completion_handlers.empty()) {
18,888✔
1794
        auto handler = std::move(m_download_completion_handlers.back());
2,526✔
1795
        m_download_completion_handlers.pop_back();
2,526✔
1796
        handler(Status::OK()); // Throws
2,526✔
1797
    }
2,526✔
1798
    while (!m_sync_completion_handlers.empty()) {
16,458✔
1799
        auto handler = std::move(m_sync_completion_handlers.back());
96✔
1800
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
96✔
1801
        m_sync_completion_handlers.pop_back();
96✔
1802
    }
96✔
1803

1804
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
16,362✔
1805
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
894✔
1806
                             m_flx_pending_mark_version);
894✔
1807
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
894✔
1808
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
894✔
1809
    }
894✔
1810

1811
    std::lock_guard lock{m_client.m_mutex};
16,362✔
1812
    if (m_staged_download_mark > m_reached_download_mark) {
16,362✔
1813
        m_reached_download_mark = m_staged_download_mark;
9,990✔
1814
        m_client.m_wait_or_client_stopped_cond.notify_all();
9,990✔
1815
    }
9,990✔
1816
}
16,362✔
1817

1818

1819
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1820
{
670✔
1821
    REALM_ASSERT(!m_finalized);
670✔
1822
    m_suspended = true;
670✔
1823
    if (m_connection_state_change_listener) {
670✔
1824
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
670✔
1825
    }
670✔
1826
}
670✔
1827

1828

1829
void SessionWrapper::on_resumed()
1830
{
62✔
1831
    REALM_ASSERT(!m_finalized);
62✔
1832
    m_suspended = false;
62✔
1833
    if (m_connection_state_change_listener) {
62✔
1834
        ClientImpl::Connection& conn = m_sess->get_connection();
62✔
1835
        if (conn.get_state() != ConnectionState::disconnected) {
62✔
1836
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
56✔
1837
            if (conn.get_state() == ConnectionState::connected)
56✔
1838
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
48✔
1839
        }
56✔
1840
    }
62✔
1841
}
62✔
1842

1843

1844
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1845
                                                 const util::Optional<SessionErrorInfo>& error_info)
1846
{
12,602✔
1847
    if (m_connection_state_change_listener) {
12,602✔
1848
        if (!m_suspended)
12,586✔
1849
            m_connection_state_change_listener(state, error_info); // Throws
12,564✔
1850
    }
12,586✔
1851
}
12,602✔
1852

1853
void SessionWrapper::init_progress_handler()
1854
{
10,664✔
1855
    uint64_t unused = 0;
10,664✔
1856
    ClientHistory::get_upload_download_bytes(m_db.get(), m_reported_progress.final_downloaded, unused,
10,664✔
1857
                                             m_reported_progress.final_uploaded, unused, unused);
10,664✔
1858
}
10,664✔
1859

1860
void SessionWrapper::report_progress(bool is_download, bool only_if_new_uploadable_data)
1861
{
186,072✔
1862
    REALM_ASSERT(!m_finalized);
186,072✔
1863
    REALM_ASSERT(m_sess);
186,072✔
1864
    REALM_ASSERT(!(only_if_new_uploadable_data && is_download));
186,072✔
1865

1866
    if (!m_progress_handler)
186,072✔
1867
        return;
130,890✔
1868

1869
    // Ignore progress messages from before we first receive a DOWNLOAD message
1870
    if (!m_reliable_download_progress)
55,182✔
1871
        return;
27,548✔
1872

1873
    ReportedProgress p = m_reported_progress;
27,634✔
1874
    ClientHistory::get_upload_download_bytes(m_db.get(), p.downloaded, p.downloadable, p.uploaded, p.uploadable,
27,634✔
1875
                                             p.snapshot);
27,634✔
1876

1877
    // If this progress notification was triggered by a commit being made we
1878
    // only want to send it if the uploadable bytes has actually increased,
1879
    // and not if it was an empty commit.
1880
    if (only_if_new_uploadable_data && m_reported_progress.uploadable == p.uploadable)
27,634✔
1881
        return;
18,726✔
1882

1883
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
1884
    // is only the remaining to download. This is confusing, so make them use
1885
    // the same units.
1886
    p.downloadable += p.downloaded;
8,908✔
1887

1888
    bool is_completed = false;
8,908✔
1889
    if (is_download) {
8,908✔
1890
        if (m_download_estimate)
3,558✔
1891
            is_completed = *m_download_estimate >= 1.0;
1,414✔
1892
        else
2,144✔
1893
            is_completed = p.downloaded == p.downloadable;
2,144✔
1894
    }
3,558✔
1895
    else {
5,350✔
1896
        is_completed = p.uploaded == p.uploadable;
5,350✔
1897
    }
5,350✔
1898

1899
    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
9,832✔
1900
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
9,832✔
1901
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);
9,832✔
1902

1903
        // The effect of this calculation is that if new bytes are added for download/upload,
1904
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
1905
        // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync
1906
        // before progress has reached 1.0.
1907
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
1908
        // Example for upload progress reported:
1909
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0
1910

1911
        double progress_estimate = 1.0;
9,832✔
1912
        if (final_transferred < transferable && transferred < transferable)
9,832✔
1913
            progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred);
4,114✔
1914
        return progress_estimate;
9,832✔
1915
    };
9,832✔
1916

1917
    double upload_estimate = 1.0, download_estimate = 1.0;
8,908✔
1918

1919
    // calculate estimate for both download/upload since the progress is reported all at once
1920
    if (!is_completed || is_download)
8,908✔
1921
        upload_estimate = calculate_progress(p.uploaded, p.uploadable, p.final_uploaded);
6,060✔
1922

1923
    // download estimate only known for flx
1924
    if (m_download_estimate) {
8,908✔
1925
        download_estimate = *m_download_estimate;
3,104✔
1926

1927
        // ... bootstrap store bytes should be null after initial sync when every changeset integrated immediately
1928
        if (m_bootstrap_store_bytes)
3,104✔
1929
            p.downloaded += *m_bootstrap_store_bytes;
236✔
1930

1931
        // FIXME for flx with download estimate these bytes are not known
1932
        // provide some sensible value for non-streaming version of object-store callbacks
1933
        // until these field are completely removed from the api after pbs deprecation
1934
        p.downloadable = p.downloaded;
3,104✔
1935
        if (0.01 <= download_estimate && download_estimate <= 0.99)
3,104✔
1936
            if (p.downloaded > p.final_downloaded)
252✔
1937
                p.downloadable =
252✔
1938
                    p.final_downloaded + uint64_t((p.downloaded - p.final_downloaded) / download_estimate);
252✔
1939
    }
3,104✔
1940
    else {
5,804✔
1941
        if (!is_completed || !is_download)
5,804✔
1942
            download_estimate = calculate_progress(p.downloaded, p.downloadable, p.final_downloaded);
3,772✔
1943
    }
5,804✔
1944

1945
    if (is_completed) {
8,908✔
1946
        if (is_download)
6,010✔
1947
            p.final_downloaded = p.downloaded;
3,162✔
1948
        else
2,848✔
1949
            p.final_uploaded = p.uploaded;
2,848✔
1950
    }
6,010✔
1951

1952
    m_reported_progress = p;
8,908✔
1953

1954
    if (m_sess->logger.would_log(Logger::Level::debug)) {
8,908✔
1955
        auto to_str = [](double d) {
17,296✔
1956
            std::ostringstream ss;
17,296✔
1957
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
1958
            ss << std::fixed << std::setprecision(4) << d;
17,296✔
1959
            return ss.str();
17,296✔
1960
        };
17,296✔
1961
        m_sess->logger.debug("Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
8,648✔
1962
                             "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7",
8,648✔
1963
                             p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
8,648✔
1964
                             to_str(upload_estimate), p.snapshot);
8,648✔
1965
    }
8,648✔
1966

1967
    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
8,908✔
1968
                       upload_estimate);
8,908✔
1969
}
8,908✔
1970

1971
// Forwards a test command to the live session and returns its future. If no
// session exists, the returned future instead carries a RuntimeError status.
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
{
    if (m_sess) {
        return m_sess->send_test_command(std::move(body));
    }

    return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
}
60✔
1979

1980
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1981
{
314✔
1982
    REALM_ASSERT(!m_finalized);
314✔
1983

1984
    auto pending_reset = _impl::client_reset::has_pending_reset(*m_db->start_frozen());
314✔
1985
    REALM_ASSERT(pending_reset);
314✔
1986
    m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
314✔
1987
                        pending_reset->time);
314✔
1988
    async_wait_for(true, true, [self = util::bind_ptr(this), pending_reset = *pending_reset](Status status) {
314✔
1989
        if (status == ErrorCodes::OperationAborted) {
314✔
1990
            return;
138✔
1991
        }
138✔
1992
        auto& logger = self->m_sess->logger;
176✔
1993
        if (!status.is_ok()) {
176✔
1994
            logger.error("Error while tracking client reset acknowledgement: %1", status);
×
1995
            return;
×
1996
        }
×
1997

1998
        auto wt = self->m_db->start_write();
176✔
1999
        auto cur_pending_reset = _impl::client_reset::has_pending_reset(*wt);
176✔
2000
        if (!cur_pending_reset) {
176✔
2001
            logger.debug(
×
2002
                "Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
×
2003
                pending_reset.type, pending_reset.time);
×
2004
            return;
×
2005
        }
×
2006
        else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
176✔
2007
            logger.debug(
×
2008
                "Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
×
2009
                pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
×
2010
        }
×
2011
        else {
176✔
2012
            logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
176✔
2013
                         "Removing cycle detection tracker.",
176✔
2014
                         pending_reset.type, pending_reset.time);
176✔
2015
        }
176✔
2016
        _impl::client_reset::remove_pending_client_resets(*wt);
176✔
2017
        wt->commit();
176✔
2018
    });
176✔
2019
}
314✔
2020

2021
void SessionWrapper::update_subscription_version_info()
2022
{
10,662✔
2023
    if (!m_flx_subscription_store)
10,662✔
2024
        return;
9,020✔
2025
    auto versions_info = m_flx_subscription_store->get_version_info();
1,642✔
2026
    m_flx_active_version = versions_info.active;
1,642✔
2027
    m_flx_pending_mark_version = versions_info.pending_mark;
1,642✔
2028
}
1,642✔
2029

2030
std::string SessionWrapper::get_appservices_connection_id()
2031
{
72✔
2032
    auto pf = util::make_promise_future<std::string>();
72✔
2033
    REALM_ASSERT(m_initiated);
72✔
2034

2035
    util::bind_ptr<SessionWrapper> self(this);
72✔
2036
    get_client().post([self, promise = std::move(pf.promise)](Status status) mutable {
72✔
2037
        if (!status.is_ok()) {
72✔
2038
            promise.set_error(status);
×
2039
            return;
×
2040
        }
×
2041

2042
        if (!self->m_sess) {
72✔
2043
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
2044
            return;
×
2045
        }
×
2046

2047
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
2048
    });
72✔
2049

2050
    return pf.future.get();
72✔
2051
}
72✔
2052

2053
// ################ ClientImpl::Connection ################
2054

2055
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
2056
                                   const std::string& authorization_header_name,
2057
                                   const std::map<std::string, std::string>& custom_http_headers,
2058
                                   bool verify_servers_ssl_certificate,
2059
                                   Optional<std::string> ssl_trust_certificate_path,
2060
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
2061
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
2062
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
1,324✔
2063
                                                      client.logger_ptr)} // Throws
1,324✔
2064
    , logger{*logger_ptr}
1,324✔
2065
    , m_client{client}
1,324✔
2066
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1,324✔
2067
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1,324✔
2068
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1,324✔
2069
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1,324✔
2070
    , m_reconnect_info{reconnect_info}
1,324✔
2071
    , m_session_history{}
1,324✔
2072
    , m_ident{ident}
1,324✔
2073
    , m_server_endpoint{std::move(endpoint)}
1,324✔
2074
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1,324✔
2075
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1,324✔
2076
{
2,780✔
2077
    m_on_idle = m_client.create_trigger([this](Status status) {
2,780✔
2078
        if (status == ErrorCodes::OperationAborted)
2,772✔
2079
            return;
×
2080
        else if (!status.is_ok())
2,772✔
2081
            throw Exception(status);
×
2082

2083
        REALM_ASSERT(m_activated);
2,772✔
2084
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,772✔
2085
            on_idle(); // Throws
2,762✔
2086
            // Connection object may be destroyed now.
2087
        }
2,762✔
2088
    });
2,772✔
2089
}
2,780✔
2090

2091
// Returns the identifier this connection was constructed with.
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
{
    return m_ident;
}
12✔
2095

2096

2097
// Returns a reference to the server endpoint stored in this connection. The
// reference refers to a member of this object.
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
{
    return m_server_endpoint;
}
2,768✔
2101

2102
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
2103
                                                        const std::string& signed_access_token)
2104
{
10,582✔
2105
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
10,582✔
2106
    m_signed_access_token = signed_access_token;           // Throws (copy)
10,582✔
2107
}
10,582✔
2108

2109

2110
void ClientImpl::Connection::resume_active_sessions()
2111
{
2,144✔
2112
    auto handler = [=](ClientImpl::Session& sess) {
4,284✔
2113
        sess.cancel_resumption_delay(); // Throws
4,284✔
2114
    };
4,284✔
2115
    for_each_active_session(std::move(handler)); // Throws
2,144✔
2116
}
2,144✔
2117

2118
void ClientImpl::Connection::on_idle()
2119
{
2,762✔
2120
    logger.debug(util::LogCategory::session, "Destroying connection object");
2,762✔
2121
    ClientImpl& client = get_client();
2,762✔
2122
    client.remove_connection(*this);
2,762✔
2123
    // NOTE: This connection object is now destroyed!
2124
}
2,762✔
2125

2126

2127
std::string ClientImpl::Connection::get_http_request_path() const
2128
{
3,870✔
2129
    using namespace std::string_view_literals;
3,870✔
2130
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,870✔
2131

2132
    std::string path;
3,870✔
2133
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,870✔
2134
    path += m_http_request_path_prefix;
3,870✔
2135
    path += param;
3,870✔
2136
    path += m_signed_access_token;
3,870✔
2137

2138
    return path;
3,870✔
2139
}
3,870✔
2140

2141

2142
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
2143
{
2,780✔
2144
    std::ostringstream out;
2,780✔
2145
    out.imbue(std::locale::classic());
2,780✔
2146
    out << "Connection[" << ident << "]: "; // Throws
2,780✔
2147
    return out.str();                       // Throws
2,780✔
2148
}
2,780✔
2149

2150

2151
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
2152
                                                            util::Optional<SessionErrorInfo> error_info)
2153
{
11,398✔
2154
    if (m_force_closed) {
11,398✔
2155
        return;
2,398✔
2156
    }
2,398✔
2157
    auto handler = [=](ClientImpl::Session& sess) {
12,446✔
2158
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
12,446✔
2159
        sess_2.on_connection_state_changed(state, error_info); // Throws
12,446✔
2160
    };
12,446✔
2161
    for_each_active_session(std::move(handler)); // Throws
9,000✔
2162
}
9,000✔
2163

2164

2165
Client::Client(Config config)
2166
    : m_impl{new ClientImpl{std::move(config)}} // Throws
4,890✔
2167
{
9,918✔
2168
}
9,918✔
2169

2170

2171
Client::Client(Client&& client) noexcept
2172
    : m_impl{std::move(client.m_impl)}
2173
{
×
2174
}
×
2175

2176

2177
// Destructor, defined out of line in this translation unit.
Client::~Client() noexcept = default;
9,918✔
2178

2179

2180
void Client::shutdown() noexcept
2181
{
9,996✔
2182
    m_impl->shutdown();
9,996✔
2183
}
9,996✔
2184

2185
void Client::shutdown_and_wait()
2186
{
772✔
2187
    m_impl->shutdown_and_wait();
772✔
2188
}
772✔
2189

2190
void Client::cancel_reconnect_delay()
2191
{
2,148✔
2192
    m_impl->cancel_reconnect_delay();
2,148✔
2193
}
2,148✔
2194

2195
void Client::voluntary_disconnect_all_connections()
2196
{
12✔
2197
    m_impl->voluntary_disconnect_all_connections();
12✔
2198
}
12✔
2199

2200
bool Client::wait_for_session_terminations_or_client_stopped()
2201
{
9,480✔
2202
    return m_impl->wait_for_session_terminations_or_client_stopped();
9,480✔
2203
}
9,480✔
2204

2205
// Forwards to notify_session_terminated() on the implementation object and
// returns the future it produces.
util::Future<void> Client::notify_session_terminated()
{
    return m_impl->notify_session_terminated();
}
56✔
2209

2210
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2211
                                  port_type& port, std::string& path) const
2212
{
4,088✔
2213
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
4,088✔
2214
}
4,088✔
2215

2216

2217
// Creates the SessionWrapper backing this session. All arguments except
// `client` are moved into the wrapper.
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
{
    util::bind_ptr<SessionWrapper> wrapper(new SessionWrapper{*client.m_impl, std::move(db),
                                                              std::move(flx_sub_store), std::move(migration_store),
                                                              std::move(config)}); // Throws
    // The reference count passed back to the application is implicitly
    // owned by a naked pointer. This is done to avoid exposing
    // implementation details through the header file (that is, through the
    // Session object). Ownership is reabsorbed in Session::abandon().
    m_impl = wrapper.release();
}
11,712✔
2229

2230

2231
// Moves `handler` into set_progress_handler() on the session wrapper.
void Session::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
    m_impl->set_progress_handler(std::move(handler)); // Throws
}
4,164✔
2235

2236

2237
// Moves `listener` into set_connection_state_change_listener() on the session
// wrapper.
void Session::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
{
    m_impl->set_connection_state_change_listener(std::move(listener)); // Throws
}
11,816✔
2241

2242

2243
void Session::bind()
2244
{
10,560✔
2245
    m_impl->initiate(); // Throws
10,560✔
2246
}
10,560✔
2247

2248

2249
// Reports `new_version` to the session wrapper through on_commit().
void Session::nonsync_transact_notify(version_type new_version)
{
    m_impl->on_commit(new_version); // Throws
}
17,774✔
2253

2254

2255
void Session::cancel_reconnect_delay()
2256
{
20✔
2257
    m_impl->cancel_reconnect_delay(); // Throws
20✔
2258
}
20✔
2259

2260

2261
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2262
{
4,992✔
2263
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,992✔
2264
}
4,992✔
2265

2266

2267
bool Session::wait_for_upload_complete_or_client_stopped()
2268
{
13,024✔
2269
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
13,024✔
2270
}
13,024✔
2271

2272

2273
bool Session::wait_for_download_complete_or_client_stopped()
2274
{
10,120✔
2275
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,120✔
2276
}
10,120✔
2277

2278

2279
void Session::refresh(std::string_view signed_access_token)
2280
{
216✔
2281
    m_impl->refresh(signed_access_token); // Throws
216✔
2282
}
216✔
2283

2284

2285
void Session::abandon() noexcept
2286
{
11,712✔
2287
    REALM_ASSERT(m_impl);
11,712✔
2288
    // Reabsorb the ownership assigned to the applications naked pointer by
2289
    // Session constructor
2290
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
11,712✔
2291
    SessionWrapper::abandon(std::move(wrapper));
11,712✔
2292
}
11,712✔
2293

2294
// Moves `body` into send_test_command() on the session wrapper and returns
// the future it produces.
util::Future<std::string> Session::send_test_command(std::string body)
{
    return m_impl->send_test_command(std::move(body));
}
60✔
2298

2299
std::string Session::get_appservices_connection_id()
2300
{
72✔
2301
    return m_impl->get_appservices_connection_id();
72✔
2302
}
72✔
2303

2304
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2305
{
×
2306
    switch (proxyType) {
×
2307
        case ProxyConfig::Type::HTTP:
×
2308
            return os << "HTTP";
×
2309
        case ProxyConfig::Type::HTTPS:
×
2310
            return os << "HTTPS";
×
2311
    }
×
2312
    REALM_TERMINATE("Invalid Proxy Type object.");
2313
}
×
2314

2315
} // namespace sync
2316
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc