• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1816

04 Nov 2023 12:29AM UTC coverage: 91.648% (-0.01%) from 91.66%
1816

push

Evergreen

web-flow
Use a single write transaction for DiscardLocal client resets on FLX realms (#7110)

Updating the subscription store in a separate write transaction from the
recovery means that we temporarily commit an invalid state. If the application
crashes between committing the client reset diff and updating the subscription
store, the next launch of the application would try to use the now-invalid
pending subscriptions that should have been discarded.

92128 of 168844 branches covered (0.0%)

141 of 146 new or added lines in 7 files covered. (96.58%)

84 existing lines in 15 files now uncovered.

230681 of 251702 relevant lines covered (91.65%)

6383138.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/src/realm/sync/client.cpp
1

2
#include <memory>
3
#include <tuple>
4
#include <atomic>
5

6
#include "realm/sync/client_base.hpp"
7
#include "realm/sync/protocol.hpp"
8
#include "realm/util/optional.hpp"
9
#include <realm/sync/client.hpp>
10
#include <realm/sync/config.hpp>
11
#include <realm/sync/noinst/client_reset.hpp>
12
#include <realm/sync/noinst/client_history_impl.hpp>
13
#include <realm/sync/noinst/client_impl_base.hpp>
14
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
15
#include <realm/sync/subscriptions.hpp>
16
#include <realm/util/bind_ptr.hpp>
17
#include <realm/util/circular_buffer.hpp>
18
#include <realm/util/platform_info.hpp>
19
#include <realm/util/thread.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/util/value_reset_guard.hpp>
22
#include <realm/version.hpp>
23

24
namespace realm {
25
namespace sync {
26

27
namespace {
28
using namespace realm::util;
29

30

31
// clang-format off
32
using SessionImpl                     = ClientImpl::Session;
33
using SyncTransactCallback            = Session::SyncTransactCallback;
34
using ProgressHandler                 = Session::ProgressHandler;
35
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
36
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
37
using port_type                       = Session::port_type;
38
using connection_ident_type           = std::int_fast64_t;
39
using ProxyConfig                     = SyncConfig::ProxyConfig;
40
// clang-format on
41

42
} // unnamed namespace
43

44

45
// Life cycle states of a session wrapper:
46
//
47
//  - Uninitiated
48
//  - Unactualized
49
//  - Actualized
50
//  - Finalized
51
//
52
// The session wrapper moves from the Uninitiated to the Unactualized state when
53
// it is initiated, i.e., when initiate() is called. This may happen on any
54
// thread.
55
//
56
// The session wrapper moves from the Unactualized to the Actualized state when
57
// it is associated with a session object, i.e., when `m_sess` is made to refer
58
// to an object of type SessionImpl. This always happens on the event loop
59
// thread.
60
//
61
// The session wrapper moves from the Actualized to the Finalized state when it
62
// is dissociated from the session object. This happens in response to the
63
// session wrapper having been abandoned by the application. This always happens
64
// on the event loop thread.
65
//
66
// The session wrapper will exist in the Finalized state only while referenced
67
// from a post handler waiting to be executed.
68
//
69
// If the session wrapper is abandoned by the application while in the
70
// Uninitiated state, it will be destroyed immediately, since no post handlers
71
// can have been scheduled prior to initiation.
72
//
73
// If the session wrapper is abandoned while in the Unactualized state, it will
74
// move immediately to the Finalized state. This may happen on any thread.
75
//
76
// The moving of a session wrapper to, or from the Actualized state always
77
// happens on the event loop thread. All other state transitions may happen on
78
// any thread.
79
//
80
// NOTE: Activation of the session happens no later than during actualization,
81
// and initiation of deactivation happens no earlier than during
82
// finalization. See also activate_session() and initiate_session_deactivation()
83
// in ClientImpl::Connection.
84
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
85
public:
86
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
87
                   Session::Config);
88
    ~SessionWrapper() noexcept;
89

90
    ClientReplication& get_replication() noexcept;
91
    ClientImpl& get_client() noexcept;
92

93
    bool has_flx_subscription_store() const;
94
    SubscriptionStore* get_flx_subscription_store();
95
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
96

97
    MigrationStore* get_migration_store();
98

99
    void set_progress_handler(util::UniqueFunction<ProgressHandler>);
100
    void set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener>);
101

102
    void initiate();
103

104
    void force_close();
105

106
    void on_commit(version_type new_version) override;
107
    void cancel_reconnect_delay();
108

109
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
110
    bool wait_for_upload_complete_or_client_stopped();
111
    bool wait_for_download_complete_or_client_stopped();
112

113
    void refresh(std::string signed_access_token);
114

115
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
116

117
    // These are called from ClientImpl
118
    void actualize(ServerEndpoint);
119
    void finalize();
120
    void finalize_before_actualization() noexcept;
121

122
    util::Future<std::string> send_test_command(std::string body);
123

124
    void handle_pending_client_reset_acknowledgement();
125

126
    void update_subscription_version_info();
127

128
    std::string get_appservices_connection_id();
129

130
private:
131
    ClientImpl& m_client;
132
    DBRef m_db;
133
    Replication* m_replication;
134

135
    const ProtocolEnvelope m_protocol_envelope;
136
    const std::string m_server_address;
137
    const port_type m_server_port;
138
    const std::string m_user_id;
139
    const SyncServerMode m_sync_mode;
140
    const std::string m_authorization_header_name;
141
    const std::map<std::string, std::string> m_custom_http_headers;
142
    const bool m_verify_servers_ssl_certificate;
143
    const bool m_simulate_integration_error;
144
    const Optional<std::string> m_ssl_trust_certificate_path;
145
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
146
    const size_t m_flx_bootstrap_batch_size_bytes;
147

148
    // This one is different from null when, and only when the session wrapper
149
    // is in ClientImpl::m_abandoned_session_wrappers.
150
    SessionWrapper* m_next = nullptr;
151

152
    // After initiation, these may only be accessed by the event loop thread.
153
    std::string m_http_request_path_prefix;
154
    std::string m_virt_path;
155
    std::string m_signed_access_token;
156

157
    util::Optional<ClientReset> m_client_reset_config;
158

159
    util::Optional<ProxyConfig> m_proxy_config;
160

161
    uint_fast64_t m_last_reported_uploadable_bytes = 0;
162
    util::UniqueFunction<ProgressHandler> m_progress_handler;
163
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
164

165
    std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
166
    bool m_in_debug_hook = false;
167

168
    SessionReason m_session_reason;
169

170
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
171
    int64_t m_flx_active_version = 0;
172
    int64_t m_flx_last_seen_version = 0;
173
    int64_t m_flx_pending_mark_version = 0;
174
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
175

176
    std::shared_ptr<MigrationStore> m_migration_store;
177

178
    bool m_initiated = false;
179

180
    // Set to true when this session wrapper is actualized (or when it is
181
    // finalized before proper actualization). It is then never modified again.
182
    //
183
    // A session specific post handler submitted after the initiation of the
184
    // session wrapper (initiate()) will always find that `m_actualized` is
185
    // true. This is the case, because the scheduling of such a post handler
186
    // will have been preceded by the triggering of
187
    // `ClientImpl::m_actualize_and_finalize` (in
188
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
189
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
190
    // before the post handler. If the session wrapper is no longer in
191
    // `ClientImpl::m_unactualized_session_wrappers` when
192
    // ClientImpl::actualize_and_finalize_session_wrappers() executes, it must
193
    // have been abandoned already, but in that case,
194
    // finalize_before_actualization() has already been called.
195
    bool m_actualized = false;
196

197
    bool m_force_closed = false;
198

199
    bool m_suspended = false;
200

201
    // Has the SessionWrapper been finalized?
202
    bool m_finalized = false;
203

204
    // Set to true when the first DOWNLOAD message is received to indicate that
205
    // the byte-level download progress parameters can be considered reasonably
206
    // reliable. Before that, a lot of time may have passed, so our record of
207
    // the download progress is likely completely out of date.
208
    bool m_reliable_download_progress = false;
209

210
    // Set to point to an activated session object during actualization of the
211
    // session wrapper. Set to null during finalization of the session
212
    // wrapper. Both modifications are guaranteed to be performed by the event
213
    // loop thread.
214
    //
215
    // If a session specific post handler, that is submitted after the
216
    // initiation of the session wrapper, sees that `m_sess` is null, it can
217
    // conclude that the session wrapper has been both abandoned and
218
    // finalized. This is true, because the scheduling of such a post handler
219
    // will have been preceded by the triggering of
220
    // `ClientImpl::m_actualize_and_finalize` (in
221
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
222
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
223
    // before the post handler, so the session wrapper must have been actualized
224
    // unless it was already abandoned by the application. If it was abandoned
225
    // before it was actualized, it will already have been finalized by
226
    // finalize_before_actualization().
227
    //
228
    // Must only be accessed from the event loop thread.
229
    SessionImpl* m_sess = nullptr;
230

231
    // These must only be accessed from the event loop thread.
232
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
233
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
234
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
235

236
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
237
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
238
    // event loop thread.
239
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
240
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
241
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
242

243
    void on_sync_progress();
244
    void on_upload_completion();
245
    void on_download_completion();
246
    void on_suspended(const SessionErrorInfo& error_info);
247
    void on_resumed();
248
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
249
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
250
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
251
    void on_flx_sync_version_complete(int64_t version);
252

253
    void report_progress(bool only_if_new_uploadable_data = false);
254

255
    friend class SessionWrapperStack;
256
    friend class ClientImpl::Session;
257
};
258

259

260
// ################ SessionWrapperStack ################
261

262
inline bool SessionWrapperStack::empty() const noexcept
263
{
×
264
    return !m_back;
×
265
}
×
266

267

268
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
269
{
9,632✔
270
    REALM_ASSERT(!w->m_next);
9,632✔
271
    w->m_next = m_back;
9,632✔
272
    m_back = w.release();
9,632✔
273
}
9,632✔
274

275

276
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
277
{
25,268✔
278
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
25,268✔
279
    if (m_back) {
25,268✔
280
        m_back = m_back->m_next;
9,630✔
281
        w->m_next = nullptr;
9,630✔
282
    }
9,630✔
283
    return w;
25,268✔
284
}
25,268✔
285

286

287
inline void SessionWrapperStack::clear() noexcept
288
{
24,614✔
289
    while (m_back) {
24,614✔
290
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
291
        m_back = w->m_next;
×
292
    }
×
293
}
24,614✔
294

295

296
inline SessionWrapperStack::SessionWrapperStack(SessionWrapperStack&& q) noexcept
297
    : m_back{q.m_back}
298
{
299
    q.m_back = nullptr;
300
}
301

302

303
SessionWrapperStack::~SessionWrapperStack()
304
{
24,614✔
305
    clear();
24,614✔
306
}
24,614✔
307

308

309
// ################ ClientImpl ################
310

311

312
ClientImpl::~ClientImpl()
313
{
8,978✔
314
    // Since no other thread is allowed to be accessing this client or any of
4,420✔
315
    // its subobjects at this time, no mutex locking is necessary.
4,420✔
316

4,420✔
317
    shutdown_and_wait();
8,978✔
318
    // Session wrappers are removed from m_unactualized_session_wrappers as they
4,420✔
319
    // are abandoned.
4,420✔
320
    REALM_ASSERT(m_stopped);
8,978✔
321
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
8,978✔
322
}
8,978✔
323

324

325
void ClientImpl::cancel_reconnect_delay()
326
{
1,768✔
327
    // Thread safety required
946✔
328
    post([this](Status status) {
1,768✔
329
        if (status == ErrorCodes::OperationAborted)
1,768✔
330
            return;
×
331
        else if (!status.is_ok())
1,768✔
332
            throw Exception(status);
×
333

946✔
334
        for (auto& p : m_server_slots) {
1,768✔
335
            ServerSlot& slot = p.second;
1,768✔
336
            if (m_one_connection_per_session) {
1,768✔
337
                REALM_ASSERT(!slot.connection);
×
338
                for (const auto& p : slot.alt_connections) {
×
339
                    ClientImpl::Connection& conn = *p.second;
×
340
                    conn.resume_active_sessions(); // Throws
×
341
                    conn.cancel_reconnect_delay(); // Throws
×
342
                }
×
343
            }
×
344
            else {
1,768✔
345
                REALM_ASSERT(slot.alt_connections.empty());
1,768✔
346
                if (slot.connection) {
1,768✔
347
                    ClientImpl::Connection& conn = *slot.connection;
1,764✔
348
                    conn.resume_active_sessions(); // Throws
1,764✔
349
                    conn.cancel_reconnect_delay(); // Throws
1,764✔
350
                }
1,764✔
351
                else {
4✔
352
                    slot.reconnect_info.reset();
4✔
353
                }
4✔
354
            }
1,768✔
355
        }
1,768✔
356
    }); // Throws
1,768✔
357
}
1,768✔
358

359

360
void ClientImpl::voluntary_disconnect_all_connections()
361
{
12✔
362
    auto done_pf = util::make_promise_future<void>();
12✔
363
    post([this, promise = std::move(done_pf.promise)](Status status) mutable {
12✔
364
        if (status == ErrorCodes::OperationAborted) {
12✔
365
            return;
×
366
        }
×
367

6✔
368
        REALM_ASSERT(status.is_ok());
12✔
369

6✔
370
        try {
12✔
371
            for (auto& p : m_server_slots) {
12✔
372
                ServerSlot& slot = p.second;
12✔
373
                if (m_one_connection_per_session) {
12✔
374
                    REALM_ASSERT(!slot.connection);
×
375
                    for (const auto& p : slot.alt_connections) {
×
376
                        ClientImpl::Connection& conn = *p.second;
×
377
                        if (conn.get_state() == ConnectionState::disconnected) {
×
378
                            continue;
×
379
                        }
×
380
                        conn.voluntary_disconnect();
×
381
                    }
×
382
                }
×
383
                else {
12✔
384
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
385
                    if (!slot.connection) {
12✔
386
                        continue;
×
387
                    }
×
388
                    ClientImpl::Connection& conn = *slot.connection;
12✔
389
                    if (conn.get_state() == ConnectionState::disconnected) {
12✔
390
                        continue;
×
391
                    }
×
392
                    conn.voluntary_disconnect();
12✔
393
                }
12✔
394
            }
12✔
395
        }
12✔
396
        catch (...) {
6✔
397
            promise.set_error(exception_to_status());
×
398
            return;
×
399
        }
×
400
        promise.emplace_value();
12✔
401
    });
12✔
402
    done_pf.future.get();
12✔
403
}
12✔
404

405

406
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
407
{
368✔
408
    // Thread safety required
34✔
409

34✔
410
    {
368✔
411
        std::lock_guard lock{m_mutex};
368✔
412
        m_sessions_terminated = false;
368✔
413
    }
368✔
414

34✔
415
    // The technique employed here relies on the fact that
34✔
416
    // actualize_and_finalize_session_wrappers() must get to execute at least
34✔
417
    // once before the post handler submitted below gets to execute, but still
34✔
418
    // at a time where all session wrappers, that are abandoned prior to the
34✔
419
    // execution of wait_for_session_terminations_or_client_stopped(), have been
34✔
420
    // added to `m_abandoned_session_wrappers`.
34✔
421
    //
34✔
422
    // To see that this is the case, consider a session wrapper that was
34✔
423
    // abandoned before wait_for_session_terminations_or_client_stopped() was
34✔
424
    // invoked. Then the session wrapper will have been added to
34✔
425
    // `m_abandoned_session_wrappers`, and an invocation of
34✔
426
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
34✔
427
    // guarantees mentioned in the documentation of Trigger then ensure
34✔
428
    // that at least one execution of actualize_and_finalize_session_wrappers()
34✔
429
    // will happen after the session wrapper has been added to
34✔
430
    // `m_abandoned_session_wrappers`, but before the post handler submitted
34✔
431
    // below gets to execute.
34✔
432
    post([this](Status status) mutable {
368✔
433
        if (status == ErrorCodes::OperationAborted)
368✔
434
            return;
×
435
        else if (!status.is_ok())
368✔
436
            throw Exception(status);
×
437

34✔
438
        std::lock_guard lock{m_mutex};
368✔
439
        m_sessions_terminated = true;
368✔
440
        m_wait_or_client_stopped_cond.notify_all();
368✔
441
    }); // Throws
368✔
442

34✔
443
    bool completion_condition_was_satisfied;
368✔
444
    {
368✔
445
        std::unique_lock lock{m_mutex};
368✔
446
        while (!m_sessions_terminated && !m_stopped)
694✔
447
            m_wait_or_client_stopped_cond.wait(lock);
326✔
448
        completion_condition_was_satisfied = !m_stopped;
368✔
449
    }
368✔
450
    return completion_condition_was_satisfied;
368✔
451
}
368✔
452

453

454
void ClientImpl::drain_connections_on_loop()
455
{
8,978✔
456
    post([this](Status status) mutable {
8,978✔
457
        REALM_ASSERT(status.is_ok());
8,978✔
458
        drain_connections();
8,978✔
459
    });
8,978✔
460
}
8,978✔
461

462
void ClientImpl::shutdown_and_wait()
463
{
9,738✔
464
    shutdown();
9,738✔
465
    std::unique_lock lock{m_drain_mutex};
9,738✔
466
    if (m_drained) {
9,738✔
467
        return;
756✔
468
    }
756✔
469

4,422✔
470
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
8,982✔
471
    m_drain_cv.wait(lock, [&] {
19,854✔
472
        return m_num_connections == 0 && m_outstanding_posts == 0;
19,854✔
473
    });
19,854✔
474

4,422✔
475
    m_drained = true;
8,982✔
476
}
8,982✔
477

478
void ClientImpl::shutdown() noexcept
479
{
18,794✔
480
    {
18,794✔
481
        std::lock_guard lock{m_mutex};
18,794✔
482
        if (m_stopped)
18,794✔
483
            return;
9,816✔
484
        m_stopped = true;
8,978✔
485
        m_wait_or_client_stopped_cond.notify_all();
8,978✔
486
    }
8,978✔
487

4,420✔
488
    drain_connections_on_loop();
8,978✔
489
}
8,978✔
490

491

492
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint)
493
{
9,798✔
494
    // Thread safety required.
4,724✔
495

4,724✔
496
    std::lock_guard lock{m_mutex};
9,798✔
497
    REALM_ASSERT(m_actualize_and_finalize);
9,798✔
498
    m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
9,798✔
499
    bool retrigger = !m_actualize_and_finalize_needed;
9,798✔
500
    m_actualize_and_finalize_needed = true;
9,798✔
501
    // The conditional triggering needs to happen before releasing the mutex,
4,724✔
502
    // because if two threads call register_unactualized_session_wrapper()
4,724✔
503
    // roughly concurrently, then only the first one is guaranteed to be asked
4,724✔
504
    // to retrigger, but that retriggering must have happened before the other
4,724✔
505
    // thread returns from register_unactualized_session_wrapper().
4,724✔
506
    //
4,724✔
507
    // Note that a similar argument applies when two threads call
4,724✔
508
    // register_abandoned_session_wrapper(), and when one thread calls one of
4,724✔
509
    // them and another thread calls the other.
4,724✔
510
    if (retrigger)
9,798✔
511
        m_actualize_and_finalize->trigger();
6,756✔
512
}
9,798✔
513

514

515
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
516
{
9,800✔
517
    // Thread safety required.
4,726✔
518

4,726✔
519
    std::lock_guard lock{m_mutex};
9,800✔
520
    REALM_ASSERT(m_actualize_and_finalize);
9,800✔
521

4,726✔
522
    // If the session wrapper has not yet been actualized (on the event loop
4,726✔
523
    // thread), it can be immediately finalized. This ensures that we will
4,726✔
524
    // generally not actualize a session wrapper that has already been
4,726✔
525
    // abandoned.
4,726✔
526
    auto i = m_unactualized_session_wrappers.find(wrapper.get());
9,800✔
527
    if (i != m_unactualized_session_wrappers.end()) {
9,800✔
528
        m_unactualized_session_wrappers.erase(i);
168✔
529
        wrapper->finalize_before_actualization();
168✔
530
        return;
168✔
531
    }
168✔
532
    m_abandoned_session_wrappers.push(std::move(wrapper));
9,632✔
533
    bool retrigger = !m_actualize_and_finalize_needed;
9,632✔
534
    m_actualize_and_finalize_needed = true;
9,632✔
535
    // The conditional triggering needs to happen before releasing the
4,640✔
536
    // mutex. See implementation of register_unactualized_session_wrapper() for
4,640✔
537
    // details.
4,640✔
538
    if (retrigger)
9,632✔
539
        m_actualize_and_finalize->trigger();
8,880✔
540
}
9,632✔
541

542

543
// Must be called from the event loop thread.
544
void ClientImpl::actualize_and_finalize_session_wrappers()
545
{
15,638✔
546
    std::map<SessionWrapper*, ServerEndpoint> unactualized_session_wrappers;
15,638✔
547
    SessionWrapperStack abandoned_session_wrappers;
15,638✔
548
    bool stopped;
15,638✔
549
    {
15,638✔
550
        std::lock_guard lock{m_mutex};
15,638✔
551
        m_actualize_and_finalize_needed = false;
15,638✔
552
        swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
15,638✔
553
        swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
15,638✔
554
        stopped = m_stopped;
15,638✔
555
    }
15,638✔
556
    // Note, we need to finalize old session wrappers before we actualize new
7,124✔
557
    // ones. This ensures that deactivation of old sessions is initiated before
7,124✔
558
    // new sessions are activated. This, in turn, ensures that the server does
7,124✔
559
    // not see two overlapping sessions for the same local Realm file.
7,124✔
560
    while (util::bind_ptr<SessionWrapper> wrapper = abandoned_session_wrappers.pop())
25,268✔
561
        wrapper->finalize(); // Throws
9,630✔
562
    if (stopped) {
15,638✔
563
        for (auto& p : unactualized_session_wrappers) {
416✔
564
            SessionWrapper& wrapper = *p.first;
4✔
565
            wrapper.finalize_before_actualization();
4✔
566
        }
4✔
567
        return;
780✔
568
    }
780✔
569
    for (auto& p : unactualized_session_wrappers) {
14,858✔
570
        SessionWrapper& wrapper = *p.first;
9,628✔
571
        ServerEndpoint server_endpoint = std::move(p.second);
9,628✔
572
        wrapper.actualize(std::move(server_endpoint)); // Throws
9,628✔
573
    }
9,628✔
574
}
14,858✔
575

576

577
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
578
                                                   const std::string& authorization_header_name,
579
                                                   const std::map<std::string, std::string>& custom_http_headers,
580
                                                   bool verify_servers_ssl_certificate,
581
                                                   Optional<std::string> ssl_trust_certificate_path,
582
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
583
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
584
{
9,624✔
585
    auto&& [server_slot_it, inserted] =
9,624✔
586
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
9,624✔
587
    ServerSlot& server_slot = server_slot_it->second; // Throws
9,624✔
588

4,636✔
589
    // TODO: enable multiplexing with proxies
4,636✔
590
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
9,624✔
591
        // Use preexisting connection
3,474✔
592
        REALM_ASSERT(server_slot.alt_connections.empty());
7,162✔
593
        return *server_slot.connection;
7,162✔
594
    }
7,162✔
595

1,162✔
596
    // Create a new connection
1,162✔
597
    REALM_ASSERT(!server_slot.connection);
2,462✔
598
    connection_ident_type ident = m_prev_connection_ident + 1;
2,462✔
599
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,462✔
600
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,462✔
601
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,462✔
602
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,462✔
603
    ClientImpl::Connection& conn = *conn_2;
2,462✔
604
    if (!m_one_connection_per_session) {
2,462✔
605
        server_slot.connection = std::move(conn_2);
2,450✔
606
    }
2,450✔
607
    else {
12✔
608
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
12✔
609
    }
12✔
610
    m_prev_connection_ident = ident;
2,462✔
611
    was_created = true;
2,462✔
612
    {
2,462✔
613
        std::lock_guard lk(m_drain_mutex);
2,462✔
614
        ++m_num_connections;
2,462✔
615
    }
2,462✔
616
    return conn;
2,462✔
617
}
2,462✔
618

619

620
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
621
{
2,460✔
622
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,460✔
623
    auto i = m_server_slots.find(endpoint);
2,460✔
624
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,460✔
625
    ServerSlot& server_slot = i->second;
2,460✔
626
    if (!m_one_connection_per_session) {
2,460✔
627
        REALM_ASSERT(server_slot.alt_connections.empty());
2,448✔
628
        REALM_ASSERT(&*server_slot.connection == &conn);
2,448✔
629
        server_slot.reconnect_info = conn.get_reconnect_info();
2,448✔
630
        server_slot.connection.reset();
2,448✔
631
    }
2,448✔
632
    else {
12✔
633
        REALM_ASSERT(!server_slot.connection);
12✔
634
        connection_ident_type ident = conn.get_ident();
12✔
635
        auto j = server_slot.alt_connections.find(ident);
12✔
636
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
12✔
637
        REALM_ASSERT(&*j->second == &conn);
12✔
638
        server_slot.alt_connections.erase(j);
12✔
639
    }
12✔
640

1,160✔
641
    {
2,460✔
642
        std::lock_guard lk(m_drain_mutex);
2,460✔
643
        REALM_ASSERT(m_num_connections);
2,460✔
644
        --m_num_connections;
2,460✔
645
        m_drain_cv.notify_all();
2,460✔
646
    }
2,460✔
647
}
2,460✔
648

649

650
// ################ SessionImpl ################
651

652
void SessionImpl::force_close()
653
{
156✔
654
    // Allow force_close() if session is active or hasn't been activated yet.
78✔
655
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
156!
656
        m_wrapper.force_close();
156✔
657
    }
156✔
658
}
156✔
659

660
void SessionImpl::on_connection_state_changed(ConnectionState state,
661
                                              const util::Optional<SessionErrorInfo>& error_info)
662
{
10,628✔
663
    // Only used to report errors back to the SyncSession while the Session is active
5,372✔
664
    if (m_state == SessionImpl::Active) {
10,628✔
665
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
10,628✔
666
    }
10,628✔
667
}
10,628✔
668

669

670
const std::string& SessionImpl::get_virt_path() const noexcept
671
{
7,262✔
672
    // Can only be called if the session is active or being activated
3,560✔
673
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
7,262!
674
    return m_wrapper.m_virt_path;
7,262✔
675
}
7,262✔
676

677
const std::string& SessionImpl::get_realm_path() const noexcept
678
{
9,892✔
679
    // Can only be called if the session is active or being activated
4,770✔
680
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
9,892✔
681
    return m_wrapper.m_db->get_path();
9,892✔
682
}
9,892✔
683

684
DBRef SessionImpl::get_db() const noexcept
685
{
23,306✔
686
    // Can only be called if the session is active or being activated
12,882✔
687
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
23,306✔
688
    return m_wrapper.m_db;
23,306✔
689
}
23,306✔
690

691
ClientReplication& SessionImpl::access_realm()
692
{
111,088✔
693
    // Can only be called if the session is active or being activated
57,110✔
694
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
111,088✔
695
    return m_wrapper.get_replication();
111,088✔
696
}
111,088✔
697

698
util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
699
{
9,624✔
700
    // Can only be called if the session is active or being activated
4,636✔
701
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
9,624✔
702
    return m_wrapper.m_client_reset_config;
9,624✔
703
}
9,624✔
704

705
SessionReason SessionImpl::get_session_reason() noexcept
706
{
1,350✔
707
    // Can only be called if the session is active or being activated
698✔
708
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,350!
709
    return m_wrapper.m_session_reason;
1,350✔
710
}
1,350✔
711

712
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
713
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
714
{
42,678✔
715
    // Ignore the call if the session is not active
22,992✔
716
    if (m_state != State::Active) {
42,678✔
717
        return;
×
718
    }
×
719

22,992✔
720
    try {
42,678✔
721
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
42,678!
722
        if (simulate_integration_error) {
42,678✔
723
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
724
        }
×
725
        version_type client_version;
42,678✔
726
        if (REALM_LIKELY(!get_client().is_dry_run())) {
42,678✔
727
            VersionInfo version_info;
42,678✔
728
            ClientReplication& repl = access_realm(); // Throws
42,678✔
729
            integrate_changesets(repl, progress, downloadable_bytes, changesets, version_info,
42,678✔
730
                                 batch_state); // Throws
42,678✔
731
            client_version = version_info.realm_version;
42,678✔
732
        }
42,678✔
UNCOV
733
        else {
×
734
            // Fake it for "dry run" mode
UNCOV
735
            client_version = m_last_version_available + 1;
×
UNCOV
736
        }
×
737
        on_changesets_integrated(client_version, progress); // Throws
42,678✔
738
    }
42,678✔
739
    catch (const IntegrationException& e) {
23,004✔
740
        on_integration_failure(e);
24✔
741
    }
24✔
742
    m_wrapper.on_sync_progress(); // Throws
42,678✔
743
}
42,678✔
744

745

746
void SessionImpl::on_upload_completion()
747
{
14,672✔
748
    // Ignore the call if the session is not active
7,250✔
749
    if (m_state == State::Active) {
14,672✔
750
        m_wrapper.on_upload_completion(); // Throws
14,672✔
751
    }
14,672✔
752
}
14,672✔
753

754

755
void SessionImpl::on_download_completion()
756
{
15,628✔
757
    // Ignore the call if the session is not active
7,596✔
758
    if (m_state == State::Active) {
15,628✔
759
        m_wrapper.on_download_completion(); // Throws
15,628✔
760
    }
15,628✔
761
}
15,628✔
762

763

764
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
765
{
896✔
766
    // Ignore the call if the session is not active
466✔
767
    if (m_state == State::Active) {
896✔
768
        m_wrapper.on_suspended(error_info); // Throws
896✔
769
    }
896✔
770
}
896✔
771

772

773
void SessionImpl::on_resumed()
774
{
366✔
775
    // Ignore the call if the session is not active
204✔
776
    if (m_state == State::Active) {
366✔
777
        m_wrapper.on_resumed(); // Throws
366✔
778
    }
366✔
779
}
366✔
780

781
void SessionImpl::handle_pending_client_reset_acknowledgement()
782
{
290✔
783
    // Ignore the call if the session is not active
144✔
784
    if (m_state == State::Active) {
290✔
785
        m_wrapper.handle_pending_client_reset_acknowledgement();
290✔
786
    }
290✔
787
}
290✔
788

789
void SessionImpl::update_subscription_version_info()
790
{
268✔
791
    // Ignore the call if the session is not active
134✔
792
    if (m_state == State::Active) {
268✔
793
        m_wrapper.update_subscription_version_info();
268✔
794
    }
268✔
795
}
268✔
796

797
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
798
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
799
{
44,260✔
800
    // Ignore the call if the session is not active
23,784✔
801
    if (m_state != State::Active) {
44,260✔
802
        return false;
×
803
    }
×
804

23,784✔
805
    if (is_steady_state_download_message(batch_state, query_version)) {
44,260✔
806
        return false;
42,678✔
807
    }
42,678✔
808

792✔
809
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
1,582✔
810
    util::Optional<SyncProgress> maybe_progress;
1,582✔
811
    if (batch_state == DownloadBatchState::LastInBatch) {
1,582✔
812
        maybe_progress = progress;
1,422✔
813
    }
1,422✔
814

792✔
815
    bool new_batch = false;
1,582✔
816
    try {
1,582✔
817
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
1,582✔
818
    }
1,582✔
819
    catch (const LogicError& ex) {
792✔
820
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
821
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
822
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
823
                                    ProtocolError::bad_changeset_size);
×
824
            on_integration_failure(ex);
×
825
            return true;
×
826
        }
×
827
        throw;
×
828
    }
×
829

792✔
830
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
792✔
831
    // bootstrapping.
792✔
832
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
1,582✔
833
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
40✔
834
    }
40✔
835

792✔
836
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
1,582✔
837
                                       batch_state, received_changesets.size());
1,582✔
838
    if (hook_action == SyncClientHookAction::EarlyReturn) {
1,582✔
839
        return true;
12✔
840
    }
12✔
841
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
1,570✔
842

786✔
843
    if (batch_state == DownloadBatchState::MoreToCome) {
1,570✔
844
        return true;
156✔
845
    }
156✔
846

708✔
847
    try {
1,414✔
848
        process_pending_flx_bootstrap();
1,414✔
849
    }
1,414✔
850
    catch (const IntegrationException& e) {
712✔
851
        on_integration_failure(e);
8✔
852
    }
8✔
853

708✔
854
    return true;
1,414✔
855
}
1,414✔
856

857

858
void SessionImpl::process_pending_flx_bootstrap()
859
{
11,036✔
860
    // Ignore the call if not a flx session or session is not active
5,342✔
861
    if (!m_is_flx_sync_session || m_state != State::Active) {
11,036✔
862
        return;
8,618✔
863
    }
8,618✔
864
    // Should never be called if session is not active
1,210✔
865
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
2,418✔
866
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,418✔
867
    if (!bootstrap_store->has_pending()) {
2,418✔
868
        return;
988✔
869
    }
988✔
870

716✔
871
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,430✔
872
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,430✔
873
                "changeset size: %3)",
1,430✔
874
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,430✔
875
                pending_batch_stats.pending_changeset_bytes);
1,430✔
876
    auto& history = access_realm().get_history();
1,430✔
877
    VersionInfo new_version;
1,430✔
878
    SyncProgress progress;
1,430✔
879
    int64_t query_version = -1;
1,430✔
880
    size_t changesets_processed = 0;
1,430✔
881

716✔
882
    // Used to commit each batch after it was transformed.
716✔
883
    TransactionRef transact = get_db()->start_write();
1,430✔
884
    while (bootstrap_store->has_pending()) {
2,976✔
885
        auto start_time = std::chrono::steady_clock::now();
1,562✔
886
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
1,562✔
887
        if (!pending_batch.progress) {
1,562✔
888
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
889
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
4✔
890
            // bootstrap store requires a write transaction itself.
4✔
891
            transact->close();
8✔
892
            bootstrap_store->clear();
8✔
893
            return;
8✔
894
        }
8✔
895

778✔
896
        auto batch_state =
1,554✔
897
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
1,486✔
898
        uint64_t downloadable_bytes = 0;
1,554✔
899
        query_version = pending_batch.query_version;
1,554✔
900
        bool simulate_integration_error =
1,554✔
901
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
1,554✔
902
        if (simulate_integration_error) {
1,554✔
903
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
8✔
904
        }
8✔
905

774✔
906
        history.integrate_server_changesets(
1,546✔
907
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
1,546✔
908
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
1,544✔
909
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
1,542✔
910
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
1,542✔
911
            });
1,542✔
912
        progress = *pending_batch.progress;
1,546✔
913
        changesets_processed += pending_batch.changesets.size();
1,546✔
914
        auto duration = std::chrono::steady_clock::now() - start_time;
1,546✔
915

774✔
916
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
1,546✔
917
                                      batch_state, pending_batch.changesets.size());
1,546✔
918
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
1,546✔
919

774✔
920
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
1,546✔
921
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
1,546✔
922
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
1,546✔
923
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
1,546✔
924
                    pending_batch.remaining_changesets);
1,546✔
925
    }
1,546✔
926
    on_changesets_integrated(new_version.realm_version, progress);
1,422✔
927

708✔
928
    REALM_ASSERT_3(query_version, !=, -1);
1,414✔
929
    m_wrapper.on_sync_progress();
1,414✔
930
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,414✔
931

708✔
932
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,414✔
933
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,414✔
934
    // NoAction/EarlyReturn are both valid no-op actions to take here.
708✔
935
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,414✔
936
}
1,414✔
937

938
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
939
{
16✔
940
    // Ignore the call if the session is not active
8✔
941
    if (m_state == State::Active) {
16✔
942
        m_wrapper.on_flx_sync_error(version, err_msg);
16✔
943
    }
16✔
944
}
16✔
945

946
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
947
{
1,450✔
948
    // Ignore the call if the session is not active
726✔
949
    if (m_state == State::Active) {
1,450✔
950
        m_wrapper.on_flx_sync_progress(version, batch_state);
1,450✔
951
    }
1,450✔
952
}
1,450✔
953

954
// Returns the subscription store pointer obtained from the session wrapper.
// Asserts that the session is active.
SubscriptionStore* SessionImpl::get_flx_subscription_store()
{
    REALM_ASSERT_EX(m_state == State::Active, m_state);
    SubscriptionStore* store = m_wrapper.get_flx_subscription_store();
    return store;
}
13,492✔
960

961
// Returns the migration store pointer obtained from the session wrapper.
// Asserts that the session is active.
MigrationStore* SessionImpl::get_migration_store()
{
    REALM_ASSERT_EX(m_state == State::Active, m_state);
    MigrationStore* store = m_wrapper.get_migration_store();
    return store;
}
59,230✔
967

968
void SessionImpl::on_flx_sync_version_complete(int64_t version)
969
{
232✔
970
    // Ignore the call if the session is not active
116✔
971
    if (m_state == State::Active) {
232✔
972
        m_wrapper.on_flx_sync_version_complete(version);
232✔
973
    }
232✔
974
}
232✔
975

976
// Invokes the wrapper's debug hook with `data` and acts on the result.
//
// Re-entrant calls (the hook triggering another hook call) are short-circuited
// with NoAction. SuspendWithRetryableError and TriggerReconnect are handled
// here and reported to the caller as EarlyReturn; every other action is
// returned unchanged. Asserts that the session is active.
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
    // Should never be called if session is not active
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    // Make sure we don't call the debug hook recursively.
    if (m_wrapper.m_in_debug_hook) {
        return SyncClientHookAction::NoAction;
    }
    m_wrapper.m_in_debug_hook = true;
    // Clears the re-entrancy flag on every exit path.
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
        m_wrapper.m_in_debug_hook = false;
    });

    const auto action = m_wrapper.m_debug_hook(data);

    if (action == realm::SyncClientHookAction::SuspendWithRetryableError) {
        SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
        err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;

        auto err_processing_err = receive_error_message(err_info);
        REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
        return SyncClientHookAction::EarlyReturn;
    }

    if (action == realm::SyncClientHookAction::TriggerReconnect) {
        get_connection().voluntary_disconnect();
        return SyncClientHookAction::EarlyReturn;
    }

    return action;
}
846✔
1008

1009
// Convenience overload: packs the download-related arguments into a
// SyncClientHookData and dispatches it to the debug hook.
//
// Returns NoAction without calling anything when no hook is installed or the
// session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
                                                  int64_t query_version, DownloadBatchState batch_state,
                                                  size_t num_changesets)
{
    if (REALM_LIKELY(!m_wrapper.m_debug_hook))
        return SyncClientHookAction::NoAction;
    if (REALM_UNLIKELY(m_state != State::Active))
        return SyncClientHookAction::NoAction;

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.query_version = query_version;
    hook_data.batch_state = batch_state;
    hook_data.progress = progress;
    hook_data.num_changesets = num_changesets;

    return call_debug_hook(hook_data);
}
774✔
1029

1030
// Convenience overload for error events: builds a SyncClientHookData that
// carries a pointer to `error_info` plus the session's current progress, with
// the batch state fixed to SteadyState and the changeset count and query
// version set to zero, then dispatches it to the debug hook.
//
// Returns NoAction without calling anything when no hook is installed or the
// session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
{
    if (REALM_LIKELY(!m_wrapper.m_debug_hook))
        return SyncClientHookAction::NoAction;
    if (REALM_UNLIKELY(m_state != State::Active))
        return SyncClientHookAction::NoAction;

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.error_info = &error_info;
    hook_data.progress = m_progress;
    hook_data.batch_state = DownloadBatchState::SteadyState;
    hook_data.num_changesets = 0;
    hook_data.query_version = 0;

    return call_debug_hook(hook_data);
}
70✔
1049

1050
// Decides whether a DOWNLOAD message should be treated as steady state.
//
// That is the case when the batch state is SteadyState, when the session is
// not an FLX session, or when the message is the last in its batch and its
// query version equals the wrapper's active FLX version. Everything else
// yields false. Asserts that the session is active.
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
    // Should never be called if session is not active
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    if (batch_state == DownloadBatchState::SteadyState || !m_is_flx_sync_session) {
        return true;
    }

    // If this is a steady state DOWNLOAD, no need for special handling.
    const bool is_last_in_batch = (batch_state == DownloadBatchState::LastInBatch);
    return is_last_in_batch && query_version == m_wrapper.m_flx_active_version;
}
3,172✔
1069

1070
// Queues a test command to be sent to the server and returns a future for
// its response.
//
// `body` must be a JSON document with a string "command" field; when it has
// more than one field, one of them must be "args". Validation failures, and
// calling this on a session that is not active, are reported through an
// already-failed future rather than by throwing.
//
// The command is enqueued from a callback posted to the client. If that
// callback is invoked with a non-OK status (this includes the operation being
// aborted), the future is failed with that status and nothing is enqueued.
util::Future<std::string> SessionImpl::send_test_command(std::string body)
{
    if (m_state != State::Active) {
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
    }

    try {
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
            return Status{ErrorCodes::LogicError,
                          "Must supply command name in \"command\" field of test command json object"};
        }
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
        }
    }
    catch (const nlohmann::json::parse_error& e) {
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
    }

    auto pf = util::make_promise_future<std::string>();

    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
        // Includes operation_aborted
        if (!status.is_ok()) {
            // Stop here: the promise has been fulfilled with the error, so it
            // must not also be moved into the pending command list, and no
            // command should be scheduled for sending.
            promise.set_error(status);
            return;
        }

        auto id = ++m_last_pending_test_command_ident;
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
        ensure_enlisted_to_send();
    });

    // `pf.future` is a member of a local, so it has to be moved explicitly.
    return std::move(pf.future);
}
44✔
1104

1105
// ################ SessionWrapper ################
1106

1107
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1108
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1109
// the ClientImpl::Connection that owns the ClientImpl::Session.
1110
// Constructs a session wrapper from the client, the DB, the optional FLX
// subscription store, the migration store and the session configuration.
//
// The sync mode is FLX when `flx_sub_store` is non-null and PBS otherwise.
// Most configuration fields are moved out of `config`; the remaining ones are
// copied.
//
// NOTE(review): `m_replication` is initialized through `m_db->get_replication()`
// in the initializer list, i.e. `m_db` is dereferenced before the
// `REALM_ASSERT(m_db)` in the body gets a chance to run.
// NOTE(review): `flx_sub_store` is tested when initializing `m_sync_mode` and
// moved from when initializing `m_flx_subscription_store`; this presumably
// relies on `m_sync_mode` being declared first in the class — confirm against
// the header.
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1111
                               std::shared_ptr<MigrationStore> migration_store, Session::Config config)
1112
    : m_client{client}
1113
    , m_db(std::move(db))
1114
    , m_replication(m_db->get_replication())
1115
    , m_protocol_envelope{config.protocol_envelope}
1116
    , m_server_address{std::move(config.server_address)}
1117
    , m_server_port{config.server_port}
1118
    , m_user_id(std::move(config.user_id))
1119
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
1120
    , m_authorization_header_name{config.authorization_header_name}
1121
    , m_custom_http_headers{config.custom_http_headers}
1122
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
1123
    , m_simulate_integration_error{config.simulate_integration_error}
1124
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
1125
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
1126
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
1127
    , m_http_request_path_prefix{std::move(config.service_identifier)}
1128
    , m_virt_path{std::move(config.realm_identifier)}
1129
    , m_signed_access_token{std::move(config.signed_user_token)}
1130
    , m_client_reset_config{std::move(config.client_reset_config)}
1131
    , m_proxy_config{config.proxy_config} // Throws
1132
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
1133
    , m_session_reason(config.session_reason)
1134
    , m_flx_subscription_store(std::move(flx_sub_store))
1135
    , m_migration_store(std::move(migration_store))
1136
{
10,952✔
1137
    // The DB must be present and its replication object must be a
    // ClientReplication.
    REALM_ASSERT(m_db);
10,952✔
1138
    REALM_ASSERT(m_db->get_replication());
10,952✔
1139
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
10,952✔
1140

5,302✔
1141
    update_subscription_version_info();
10,952✔
1142
}
10,952✔
1143

1144
// On destruction of a wrapper that has a DB and was actualized, removes this
// object as a commit listener and releases the sync agent on the DB.
SessionWrapper::~SessionWrapper() noexcept
{
    if (!m_db || !m_actualized)
        return;

    m_db->remove_commit_listener(this);
    m_db->release_sync_agent();
}
10,952✔
1151

1152

1153
// Returns the stored replication object viewed as a ClientReplication.
// Asserts that the DB reference is set.
inline ClientReplication& SessionWrapper::get_replication() noexcept
{
    REALM_ASSERT(m_db);
    auto& client_replication = static_cast<ClientReplication&>(*m_replication);
    return client_replication;
}
111,086✔
1158

1159

1160
// Returns the client this wrapper was created with.
inline ClientImpl& SessionWrapper::get_client() noexcept
{
    ClientImpl& client = m_client;
    return client;
}
72✔
1164

1165
bool SessionWrapper::has_flx_subscription_store() const
1166
{
1,450✔
1167
    return static_cast<bool>(m_flx_subscription_store);
1,450✔
1168
}
1,450✔
1169

1170
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1171
{
16✔
1172
    REALM_ASSERT(!m_finalized);
16✔
1173
    auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(version);
16✔
1174
    mut_subs.update_state(SubscriptionSet::State::Error, err_msg);
16✔
1175
    mut_subs.commit();
16✔
1176
}
16✔
1177

1178
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1179
{
1,642✔
1180
    REALM_ASSERT(!m_finalized);
1,642✔
1181
    m_flx_last_seen_version = version;
1,642✔
1182
    m_flx_active_version = version;
1,642✔
1183
}
1,642✔
1184

1185
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1186
{
1,450✔
1187
    if (!has_flx_subscription_store()) {
1,450✔
1188
        return;
×
1189
    }
×
1190
    REALM_ASSERT(!m_finalized);
1,450✔
1191
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,450✔
1192
    REALM_ASSERT(new_version >= m_flx_active_version);
1,450✔
1193
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,450✔
1194

726✔
1195
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1,450✔
1196

726✔
1197
    switch (batch_state) {
1,450✔
1198
        case DownloadBatchState::SteadyState:
✔
1199
            // Cannot be called with this value.
1200
            REALM_UNREACHABLE();
1201
        case DownloadBatchState::LastInBatch:
1,410✔
1202
            if (m_flx_active_version == new_version) {
1,410✔
1203
                return;
×
1204
            }
×
1205
            on_flx_sync_version_complete(new_version);
1,410✔
1206
            if (new_version == 0) {
1,410✔
1207
                new_state = SubscriptionSet::State::Complete;
640✔
1208
            }
640✔
1209
            else {
770✔
1210
                new_state = SubscriptionSet::State::AwaitingMark;
770✔
1211
                m_flx_pending_mark_version = new_version;
770✔
1212
            }
770✔
1213
            break;
1,410✔
1214
        case DownloadBatchState::MoreToCome:
726✔
1215
            if (m_flx_last_seen_version == new_version) {
40✔
1216
                return;
×
1217
            }
×
1218

20✔
1219
            m_flx_last_seen_version = new_version;
40✔
1220
            new_state = SubscriptionSet::State::Bootstrapping;
40✔
1221
            break;
40✔
1222
    }
1,450✔
1223

726✔
1224
    auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(new_version);
1,450✔
1225
    mut_subs.update_state(new_state);
1,450✔
1226
    mut_subs.commit();
1,450✔
1227
}
1,450✔
1228

1229
// Returns a non-owning pointer to the FLX subscription store (may be null
// when none was supplied). Asserts that the wrapper has not been finalized.
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
{
    REALM_ASSERT(!m_finalized);
    SubscriptionStore* store = m_flx_subscription_store.get();
    return store;
}
14,958✔
1234

1235
// Returns a non-owning pointer to the pending bootstrap store.
// Asserts that the wrapper has not been finalized.
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
{
    REALM_ASSERT(!m_finalized);
    PendingBootstrapStore* store = m_flx_pending_bootstrap_store.get();
    return store;
}
4,000✔
1240

1241
// Returns a non-owning pointer to the migration store.
// Asserts that the wrapper has not been finalized.
MigrationStore* SessionWrapper::get_migration_store()
{
    REALM_ASSERT(!m_finalized);
    MigrationStore* store = m_migration_store.get();
    return store;
}
59,230✔
1246

1247
// Installs the progress handler, taking ownership of `handler`.
// Must be called before the wrapper is initiated (asserted).
inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
    REALM_ASSERT(!m_initiated);

    m_progress_handler = std::move(handler);
}
3,420✔
1252

1253

1254
inline void
1255
SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
1256
{
11,048✔
1257
    REALM_ASSERT(!m_initiated);
11,048✔
1258
    m_connection_state_change_listener = std::move(listener);
11,048✔
1259
}
11,048✔
1260

1261

1262
void SessionWrapper::initiate()
1263
{
9,800✔
1264
    REALM_ASSERT(!m_initiated);
9,800✔
1265
    ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode};
9,800✔
1266
    m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
9,800✔
1267
    m_initiated = true;
9,800✔
1268
    m_db->add_commit_listener(this);
9,800✔
1269
}
9,800✔
1270

1271

1272
// Commit listener callback: schedules work on the client so that the session
// learns about `new_version` and progress is reported.
//
// Nothing is scheduled once the wrapper is finalized or force-closed. The
// posted callback keeps the wrapper alive through a bind_ptr, silently drops
// aborted operations, rethrows any other failure status, and does nothing if
// the session object is already gone.
void SessionWrapper::on_commit(version_type new_version)
{
    // Thread safety required
    REALM_ASSERT(m_initiated);

    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
        return;
    }

    m_client.post([self = util::bind_ptr<SessionWrapper>{this}, new_version](Status status) {
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        REALM_ASSERT(self->m_actualized);
        if (REALM_UNLIKELY(!self->m_sess))
            return; // Already finalized

        self->m_sess->recognize_sync_version(new_version); // Throws
        const bool only_if_new_uploadable_data = true;
        self->report_progress(only_if_new_uploadable_data); // Throws
    });
}
105,106✔
1297

1298

1299
void SessionWrapper::cancel_reconnect_delay()
1300
{
12✔
1301
    // Thread safety required
6✔
1302
    REALM_ASSERT(m_initiated);
12✔
1303

6✔
1304
    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
12✔
1305
        return;
×
1306
    }
×
1307

6✔
1308
    util::bind_ptr<SessionWrapper> self{this};
12✔
1309
    m_client.post([self = std::move(self)](Status status) {
12✔
1310
        if (status == ErrorCodes::OperationAborted)
12✔
1311
            return;
×
1312
        else if (!status.is_ok())
12✔
1313
            throw Exception(status);
×
1314

6✔
1315
        REALM_ASSERT(self->m_actualized);
12✔
1316
        if (REALM_UNLIKELY(!self->m_sess))
12✔
1317
            return; // Already finalized
6✔
1318
        SessionImpl& sess = *self->m_sess;
12✔
1319
        sess.cancel_resumption_delay(); // Throws
12✔
1320
        ClientImpl::Connection& conn = sess.get_connection();
12✔
1321
        conn.cancel_reconnect_delay(); // Throws
12✔
1322
    });                                // Throws
12✔
1323
}
12✔
1324

1325
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1326
                                    WaitOperCompletionHandler handler)
1327
{
4,348✔
1328
    REALM_ASSERT(upload_completion || download_completion);
4,348✔
1329
    REALM_ASSERT(m_initiated);
4,348✔
1330
    REALM_ASSERT(!m_finalized);
4,348✔
1331

2,086✔
1332
    util::bind_ptr<SessionWrapper> self{this};
4,348✔
1333
    m_client.post([self = std::move(self), handler = std::move(handler), upload_completion,
4,348✔
1334
                   download_completion](Status status) mutable {
4,348✔
1335
        if (status == ErrorCodes::OperationAborted)
4,348✔
1336
            return;
×
1337
        else if (!status.is_ok())
4,348✔
1338
            throw Exception(status);
×
1339

2,086✔
1340
        REALM_ASSERT(self->m_actualized);
4,348✔
1341
        if (REALM_UNLIKELY(!self->m_sess)) {
4,348✔
1342
            // Already finalized
40✔
1343
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
74✔
1344
            return;
74✔
1345
        }
74✔
1346
        if (upload_completion) {
4,274✔
1347
            if (download_completion) {
2,270✔
1348
                // Wait for upload and download completion
136✔
1349
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
274✔
1350
            }
274✔
1351
            else {
1,996✔
1352
                // Wait for upload completion only
908✔
1353
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
1,996✔
1354
            }
1,996✔
1355
        }
2,270✔
1356
        else {
2,004✔
1357
            // Wait for download completion only
1,002✔
1358
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
2,004✔
1359
        }
2,004✔
1360
        SessionImpl& sess = *self->m_sess;
4,274✔
1361
        if (upload_completion)
4,274✔
1362
            sess.request_upload_completion_notification(); // Throws
2,270✔
1363
        if (download_completion)
4,274✔
1364
            sess.request_download_completion_notification(); // Throws
2,278✔
1365
    });                                                      // Throws
4,274✔
1366
}
4,348✔
1367

1368

1369
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1370
{
12,996✔
1371
    // Thread safety required
6,498✔
1372
    REALM_ASSERT(m_initiated);
12,996✔
1373
    REALM_ASSERT(!m_finalized);
12,996✔
1374

6,498✔
1375
    std::int_fast64_t target_mark;
12,996✔
1376
    {
12,996✔
1377
        std::lock_guard lock{m_client.m_mutex};
12,996✔
1378
        target_mark = ++m_target_upload_mark;
12,996✔
1379
    }
12,996✔
1380

6,498✔
1381
    util::bind_ptr<SessionWrapper> self{this};
12,996✔
1382
    m_client.post([self = std::move(self), target_mark](Status status) {
12,996✔
1383
        if (status == ErrorCodes::OperationAborted)
12,996✔
1384
            return;
×
1385
        else if (!status.is_ok())
12,996✔
1386
            throw Exception(status);
×
1387

6,498✔
1388
        REALM_ASSERT(self->m_actualized);
12,996✔
1389
        // The session wrapper may already have been finalized. This can only
6,498✔
1390
        // happen if it was abandoned, but in that case, the call of
6,498✔
1391
        // wait_for_upload_complete_or_client_stopped() must have returned
6,498✔
1392
        // already.
6,498✔
1393
        if (REALM_UNLIKELY(!self->m_sess))
12,996✔
1394
            return;
6,506✔
1395
        if (target_mark > self->m_staged_upload_mark) {
12,984✔
1396
            self->m_staged_upload_mark = target_mark;
12,984✔
1397
            SessionImpl& sess = *self->m_sess;
12,984✔
1398
            sess.request_upload_completion_notification(); // Throws
12,984✔
1399
        }
12,984✔
1400
    }); // Throws
12,984✔
1401

6,498✔
1402
    bool completion_condition_was_satisfied;
12,996✔
1403
    {
12,996✔
1404
        std::unique_lock lock{m_client.m_mutex};
12,996✔
1405
        while (m_reached_upload_mark < target_mark && !m_client.m_stopped)
32,776✔
1406
            m_client.m_wait_or_client_stopped_cond.wait(lock);
19,780✔
1407
        completion_condition_was_satisfied = !m_client.m_stopped;
12,996✔
1408
    }
12,996✔
1409
    return completion_condition_was_satisfied;
12,996✔
1410
}
12,996✔
1411

1412

1413
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1414
{
10,092✔
1415
    // Thread safety required
5,048✔
1416
    REALM_ASSERT(m_initiated);
10,092✔
1417
    REALM_ASSERT(!m_finalized);
10,092✔
1418

5,048✔
1419
    std::int_fast64_t target_mark;
10,092✔
1420
    {
10,092✔
1421
        std::lock_guard lock{m_client.m_mutex};
10,092✔
1422
        target_mark = ++m_target_download_mark;
10,092✔
1423
    }
10,092✔
1424

5,048✔
1425
    util::bind_ptr<SessionWrapper> self{this};
10,092✔
1426
    m_client.post([self = std::move(self), target_mark](Status status) {
10,092✔
1427
        if (status == ErrorCodes::OperationAborted)
10,092✔
1428
            return;
×
1429
        else if (!status.is_ok())
10,092✔
1430
            throw Exception(status);
×
1431

5,048✔
1432
        REALM_ASSERT(self->m_actualized);
10,092✔
1433
        // The session wrapper may already have been finalized. This can only
5,048✔
1434
        // happen if it was abandoned, but in that case, the call of
5,048✔
1435
        // wait_for_download_complete_or_client_stopped() must have returned
5,048✔
1436
        // already.
5,048✔
1437
        if (REALM_UNLIKELY(!self->m_sess))
10,092✔
1438
            return;
5,078✔
1439
        if (target_mark > self->m_staged_download_mark) {
10,032✔
1440
            self->m_staged_download_mark = target_mark;
10,032✔
1441
            SessionImpl& sess = *self->m_sess;
10,032✔
1442
            sess.request_download_completion_notification(); // Throws
10,032✔
1443
        }
10,032✔
1444
    }); // Throws
10,032✔
1445

5,048✔
1446
    bool completion_condition_was_satisfied;
10,092✔
1447
    {
10,092✔
1448
        std::unique_lock lock{m_client.m_mutex};
10,092✔
1449
        while (m_reached_download_mark < target_mark && !m_client.m_stopped)
20,656✔
1450
            m_client.m_wait_or_client_stopped_cond.wait(lock);
10,564✔
1451
        completion_condition_was_satisfied = !m_client.m_stopped;
10,092✔
1452
    }
10,092✔
1453
    return completion_condition_was_satisfied;
10,092✔
1454
}
10,092✔
1455

1456

1457
void SessionWrapper::refresh(std::string signed_access_token)
1458
{
208✔
1459
    // Thread safety required
104✔
1460
    REALM_ASSERT(m_initiated);
208✔
1461
    REALM_ASSERT(!m_finalized);
208✔
1462

104✔
1463
    m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) {
208✔
1464
        if (status == ErrorCodes::OperationAborted)
208✔
1465
            return;
×
1466
        else if (!status.is_ok())
208✔
1467
            throw Exception(status);
×
1468

104✔
1469
        REALM_ASSERT(self->m_actualized);
208✔
1470
        if (REALM_UNLIKELY(!self->m_sess))
208✔
1471
            return; // Already finalized
104✔
1472
        self->m_signed_access_token = std::move(token);
208✔
1473
        SessionImpl& sess = *self->m_sess;
208✔
1474
        ClientImpl::Connection& conn = sess.get_connection();
208✔
1475
        // FIXME: This only makes sense when each session uses a separate connection.
104✔
1476
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
208✔
1477
        sess.cancel_resumption_delay();                                                          // Throws
208✔
1478
        conn.cancel_reconnect_delay();                                                           // Throws
208✔
1479
    });
208✔
1480
}
208✔
1481

1482

1483
// Hands an initiated wrapper over to its client as abandoned. A wrapper that
// was never initiated is not registered; the reference passed in is simply
// dropped when `wrapper` goes out of scope.
inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    if (!wrapper->m_initiated)
        return;

    // Fetch the client before `wrapper` is moved from.
    ClientImpl& owner = wrapper->m_client;
    owner.register_abandoned_session_wrapper(std::move(wrapper));
}
1490

1491

1492
// Must be called from event loop thread
1493
void SessionWrapper::actualize(ServerEndpoint endpoint)
1494
{
9,626✔
1495
    REALM_ASSERT(!m_actualized);
9,626✔
1496
    REALM_ASSERT(!m_sess);
9,626✔
1497
    // Cannot be actualized if it's already been finalized or force closed
4,638✔
1498
    REALM_ASSERT(!m_finalized);
9,626✔
1499
    REALM_ASSERT(!m_force_closed);
9,626✔
1500
    try {
9,626✔
1501
        m_db->claim_sync_agent();
9,626✔
1502
    }
9,626✔
1503
    catch (const MultipleSyncAgents&) {
4,640✔
1504
        finalize_before_actualization();
4✔
1505
        throw;
4✔
1506
    }
4✔
1507
    auto sync_mode = endpoint.server_mode;
9,624✔
1508

4,636✔
1509
    bool was_created = false;
9,624✔
1510
    ClientImpl::Connection& conn = m_client.get_connection(
9,624✔
1511
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
9,624✔
1512
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
9,624✔
1513
        was_created); // Throws
9,624✔
1514
    try {
9,624✔
1515
        // FIXME: This only makes sense when each session uses a separate connection.
4,636✔
1516
        conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
9,624✔
1517
        std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
9,624✔
1518
        if (sync_mode == SyncServerMode::FLX) {
9,624✔
1519
            m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1,004✔
1520
        }
1,004✔
1521

4,636✔
1522
        sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
9,624✔
1523
        m_sess = sess.get();
9,624✔
1524
        conn.activate_session(std::move(sess)); // Throws
9,624✔
1525
    }
9,624✔
1526
    catch (...) {
4,636✔
1527
        if (was_created)
×
1528
            m_client.remove_connection(conn);
×
1529

1530
        finalize_before_actualization();
×
1531
        throw;
×
1532
    }
×
1533

4,636✔
1534
    m_actualized = true;
9,624✔
1535
    if (was_created)
9,624✔
1536
        conn.activate(); // Throws
2,462✔
1537

4,636✔
1538
    if (m_connection_state_change_listener) {
9,624✔
1539
        ConnectionState state = conn.get_state();
9,606✔
1540
        if (state != ConnectionState::disconnected) {
9,606✔
1541
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,018✔
1542
            if (state == ConnectionState::connected)
7,018✔
1543
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
6,726✔
1544
        }
7,018✔
1545
    }
9,606✔
1546

4,636✔
1547
    if (!m_client_reset_config)
9,624✔
1548
        report_progress(); // Throws
9,288✔
1549
}
9,624✔
1550

1551
void SessionWrapper::force_close()
1552
{
156✔
1553
    if (m_force_closed || m_finalized) {
156✔
1554
        return;
×
1555
    }
×
1556
    REALM_ASSERT(m_actualized);
156✔
1557
    REALM_ASSERT(m_sess);
156✔
1558
    m_force_closed = true;
156✔
1559

78✔
1560
    ClientImpl::Connection& conn = m_sess->get_connection();
156✔
1561
    conn.initiate_session_deactivation(m_sess); // Throws
156✔
1562

78✔
1563
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
78✔
1564
    m_flx_pending_bootstrap_store.reset();
156✔
1565
    // Clear the subscription and migration store refs since they are owned by SyncSession
78✔
1566
    m_flx_subscription_store.reset();
156✔
1567
    m_migration_store.reset();
156✔
1568
    m_sess = nullptr;
156✔
1569
    // Everything is being torn down, no need to report connection state anymore
78✔
1570
    m_connection_state_change_listener = {};
156✔
1571
}
156✔
1572

1573
// Must be called from event loop thread
1574
void SessionWrapper::finalize()
1575
{
9,630✔
1576
    REALM_ASSERT(m_actualized);
9,630✔
1577

4,640✔
1578
    // Already finalized?
4,640✔
1579
    if (m_finalized) {
9,630✔
1580
        return;
×
1581
    }
×
1582

4,640✔
1583
    // Must be before marking as finalized as we expect m_finalized == false in on_change()
4,640✔
1584
    m_db->remove_commit_listener(this);
9,630✔
1585

4,640✔
1586
    m_finalized = true;
9,630✔
1587

4,640✔
1588
    if (!m_force_closed) {
9,630✔
1589
        REALM_ASSERT(m_sess);
9,466✔
1590
        ClientImpl::Connection& conn = m_sess->get_connection();
9,466✔
1591
        conn.initiate_session_deactivation(m_sess); // Throws
9,466✔
1592

4,558✔
1593
        // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
4,558✔
1594
        m_flx_pending_bootstrap_store.reset();
9,466✔
1595
        // Clear the subscription and migration store refs since they are owned by SyncSession
4,558✔
1596
        m_flx_subscription_store.reset();
9,466✔
1597
        m_migration_store.reset();
9,466✔
1598
        m_sess = nullptr;
9,466✔
1599
    }
9,466✔
1600

4,640✔
1601
    // The Realm file can be closed now, as no access to the Realm file is
4,640✔
1602
    // supposed to happen on behalf of a session after initiation of
4,640✔
1603
    // deactivation.
4,640✔
1604
    m_db->release_sync_agent();
9,630✔
1605
    m_db = nullptr;
9,630✔
1606

4,640✔
1607
    // All outstanding wait operations must be canceled
4,640✔
1608
    while (!m_upload_completion_handlers.empty()) {
10,002✔
1609
        auto handler = std::move(m_upload_completion_handlers.back());
372✔
1610
        m_upload_completion_handlers.pop_back();
372✔
1611
        handler(
372✔
1612
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
372✔
1613
    }
372✔
1614
    while (!m_download_completion_handlers.empty()) {
9,790✔
1615
        auto handler = std::move(m_download_completion_handlers.back());
160✔
1616
        m_download_completion_handlers.pop_back();
160✔
1617
        handler(
160✔
1618
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
160✔
1619
    }
160✔
1620
    while (!m_sync_completion_handlers.empty()) {
9,640✔
1621
        auto handler = std::move(m_sync_completion_handlers.back());
10✔
1622
        m_sync_completion_handlers.pop_back();
10✔
1623
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
10✔
1624
    }
10✔
1625
}
9,630✔
1626

1627

1628
// Must be called only when an unactualized session wrapper becomes abandoned.
1629
//
1630
// Called with a lock on `m_client.m_mutex`.
1631
inline void SessionWrapper::finalize_before_actualization() noexcept
1632
{
176✔
1633
    REALM_ASSERT(!m_sess);
176✔
1634
    m_actualized = true;
176✔
1635
    m_force_closed = true;
176✔
1636
}
176✔
1637

1638

1639
inline void SessionWrapper::on_sync_progress()
1640
{
44,088✔
1641
    REALM_ASSERT(!m_finalized);
44,088✔
1642
    m_reliable_download_progress = true;
44,088✔
1643
    report_progress(); // Throws
44,088✔
1644
}
44,088✔
1645

1646

1647
void SessionWrapper::on_upload_completion()
1648
{
14,672✔
1649
    REALM_ASSERT(!m_finalized);
14,672✔
1650
    while (!m_upload_completion_handlers.empty()) {
16,380✔
1651
        auto handler = std::move(m_upload_completion_handlers.back());
1,708✔
1652
        m_upload_completion_handlers.pop_back();
1,708✔
1653
        handler(Status::OK()); // Throws
1,708✔
1654
    }
1,708✔
1655
    while (!m_sync_completion_handlers.empty()) {
14,852✔
1656
        auto handler = std::move(m_sync_completion_handlers.back());
180✔
1657
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
180✔
1658
        m_sync_completion_handlers.pop_back();
180✔
1659
    }
180✔
1660
    std::lock_guard lock{m_client.m_mutex};
14,672✔
1661
    if (m_staged_upload_mark > m_reached_upload_mark) {
14,672✔
1662
        m_reached_upload_mark = m_staged_upload_mark;
12,964✔
1663
        m_client.m_wait_or_client_stopped_cond.notify_all();
12,964✔
1664
    }
12,964✔
1665
}
14,672✔
1666

1667

1668
void SessionWrapper::on_download_completion()
1669
{
15,628✔
1670
    while (!m_download_completion_handlers.empty()) {
17,652✔
1671
        auto handler = std::move(m_download_completion_handlers.back());
2,024✔
1672
        m_download_completion_handlers.pop_back();
2,024✔
1673
        handler(Status::OK()); // Throws
2,024✔
1674
    }
2,024✔
1675
    while (!m_sync_completion_handlers.empty()) {
15,712✔
1676
        auto handler = std::move(m_sync_completion_handlers.back());
84✔
1677
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
84✔
1678
        m_sync_completion_handlers.pop_back();
84✔
1679
    }
84✔
1680

7,596✔
1681
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
15,628✔
1682
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
656✔
1683
                             m_flx_pending_mark_version);
656✔
1684
        auto mutable_subs = m_flx_subscription_store->get_mutable_by_version(m_flx_pending_mark_version);
656✔
1685
        mutable_subs.update_state(SubscriptionSet::State::Complete);
656✔
1686
        mutable_subs.commit();
656✔
1687
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
656✔
1688
    }
656✔
1689

7,596✔
1690
    std::lock_guard lock{m_client.m_mutex};
15,628✔
1691
    if (m_staged_download_mark > m_reached_download_mark) {
15,628✔
1692
        m_reached_download_mark = m_staged_download_mark;
9,962✔
1693
        m_client.m_wait_or_client_stopped_cond.notify_all();
9,962✔
1694
    }
9,962✔
1695
}
15,628✔
1696

1697

1698
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1699
{
896✔
1700
    REALM_ASSERT(!m_finalized);
896✔
1701
    m_suspended = true;
896✔
1702
    if (m_connection_state_change_listener) {
896✔
1703
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
896✔
1704
    }
896✔
1705
}
896✔
1706

1707

1708
void SessionWrapper::on_resumed()
1709
{
366✔
1710
    REALM_ASSERT(!m_finalized);
366✔
1711
    m_suspended = false;
366✔
1712
    if (m_connection_state_change_listener) {
366✔
1713
        ClientImpl::Connection& conn = m_sess->get_connection();
366✔
1714
        if (conn.get_state() != ConnectionState::disconnected) {
366✔
1715
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
366✔
1716
            if (conn.get_state() == ConnectionState::connected)
366✔
1717
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
366✔
1718
        }
366✔
1719
    }
366✔
1720
}
366✔
1721

1722

1723
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1724
                                                 const util::Optional<SessionErrorInfo>& error_info)
1725
{
10,630✔
1726
    if (m_connection_state_change_listener) {
10,630✔
1727
        if (!m_suspended)
10,620✔
1728
            m_connection_state_change_listener(state, error_info); // Throws
10,618✔
1729
    }
10,620✔
1730
}
10,630✔
1731

1732

1733
void SessionWrapper::report_progress(bool only_if_new_uploadable_data)
1734
{
157,578✔
1735
    REALM_ASSERT(!m_finalized);
157,578✔
1736
    REALM_ASSERT(m_sess);
157,578✔
1737

81,116✔
1738
    if (!m_progress_handler)
157,578✔
1739
        return;
113,164✔
1740

20,384✔
1741
    std::uint_fast64_t downloaded_bytes = 0;
44,414✔
1742
    std::uint_fast64_t downloadable_bytes = 0;
44,414✔
1743
    std::uint_fast64_t uploaded_bytes = 0;
44,414✔
1744
    std::uint_fast64_t uploadable_bytes = 0;
44,414✔
1745
    std::uint_fast64_t snapshot_version = 0;
44,414✔
1746
    ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
44,414✔
1747
                                             uploadable_bytes, snapshot_version);
44,414✔
1748

20,384✔
1749
    // If this progress notification was triggered by a commit being made we
20,384✔
1750
    // only want to send it if the uploadable bytes has actually increased,
20,384✔
1751
    // and not if it was an empty commit.
20,384✔
1752
    if (only_if_new_uploadable_data && m_last_reported_uploadable_bytes == uploadable_bytes)
44,414✔
1753
        return;
30,930✔
1754
    m_last_reported_uploadable_bytes = uploadable_bytes;
13,484✔
1755

5,682✔
1756
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
5,682✔
1757
    // is only the remaining to download. This is confusing, so make them use
5,682✔
1758
    // the same units.
5,682✔
1759
    std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;
13,484✔
1760

5,682✔
1761
    m_sess->logger.debug("Progress handler called, downloaded = %1, "
13,484✔
1762
                         "downloadable(total) = %2, uploaded = %3, "
13,484✔
1763
                         "uploadable = %4, reliable_download_progress = %5, "
13,484✔
1764
                         "snapshot version = %6",
13,484✔
1765
                         downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
13,484✔
1766
                         m_reliable_download_progress, snapshot_version);
13,484✔
1767

5,682✔
1768
    // FIXME: Why is this boolean status communicated to the application as
5,682✔
1769
    // a 64-bit integer? Also, the name `progress_version` is confusing.
5,682✔
1770
    std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
9,976✔
1771
    m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
13,484✔
1772
                       snapshot_version);
13,484✔
1773
}
13,484✔
1774

1775
// Forwards a test command to the underlying session. When no session object
// is attached, a RuntimeError status is returned instead.
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
{
    if (m_sess)
        return m_sess->send_test_command(std::move(body));

    return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
}
1783

1784
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1785
{
290✔
1786
    REALM_ASSERT(!m_finalized);
290✔
1787

144✔
1788
    auto pending_reset = _impl::client_reset::has_pending_reset(*m_db->start_frozen());
290✔
1789
    REALM_ASSERT(pending_reset);
290✔
1790
    m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
290✔
1791
                        pending_reset->time);
290✔
1792
    util::bind_ptr<SessionWrapper> self(this);
290✔
1793
    async_wait_for(true, true, [self = std::move(self), pending_reset = *pending_reset](Status status) {
290✔
1794
        if (status == ErrorCodes::OperationAborted) {
290✔
1795
            return;
150✔
1796
        }
150✔
1797
        auto& logger = self->m_sess->logger;
140✔
1798
        if (!status.is_ok()) {
140✔
1799
            logger.error("Error while tracking client reset acknowledgement: %1", status);
×
1800
            return;
×
1801
        }
×
1802

70✔
1803
        auto wt = self->m_db->start_write();
140✔
1804
        auto cur_pending_reset = _impl::client_reset::has_pending_reset(*wt);
140✔
1805
        if (!cur_pending_reset) {
140✔
1806
            logger.debug(
×
1807
                "Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
×
1808
                pending_reset.type, pending_reset.time);
×
1809
            return;
×
1810
        }
×
1811
        else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
140✔
1812
            logger.debug(
×
1813
                "Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
×
1814
                pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
×
1815
        }
×
1816
        else {
140✔
1817
            logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
140✔
1818
                         "Removing cycle detection tracker.",
140✔
1819
                         pending_reset.type, pending_reset.time);
140✔
1820
        }
140✔
1821
        _impl::client_reset::remove_pending_client_resets(*wt);
140✔
1822
        wt->commit();
140✔
1823
    });
140✔
1824
}
290✔
1825

1826
void SessionWrapper::update_subscription_version_info()
1827
{
11,220✔
1828
    if (!m_flx_subscription_store)
11,220✔
1829
        return;
10,116✔
1830
    auto versions_info = m_flx_subscription_store->get_version_info();
1,104✔
1831
    m_flx_active_version = versions_info.active;
1,104✔
1832
    m_flx_pending_mark_version = versions_info.pending_mark;
1,104✔
1833
}
1,104✔
1834

1835
std::string SessionWrapper::get_appservices_connection_id()
1836
{
72✔
1837
    auto pf = util::make_promise_future<std::string>();
72✔
1838
    REALM_ASSERT(m_initiated);
72✔
1839

36✔
1840
    util::bind_ptr<SessionWrapper> self(this);
72✔
1841
    get_client().post([self, promise = std::move(pf.promise)](Status status) mutable {
72✔
1842
        if (!status.is_ok()) {
72✔
1843
            promise.set_error(status);
×
1844
            return;
×
1845
        }
×
1846

36✔
1847
        if (!self->m_sess) {
72✔
1848
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1849
            return;
×
1850
        }
×
1851

36✔
1852
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
1853
    });
72✔
1854

36✔
1855
    return pf.future.get();
72✔
1856
}
72✔
1857

1858
// ################ ClientImpl::Connection ################
1859

1860
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1861
                                   const std::string& authorization_header_name,
1862
                                   const std::map<std::string, std::string>& custom_http_headers,
1863
                                   bool verify_servers_ssl_certificate,
1864
                                   Optional<std::string> ssl_trust_certificate_path,
1865
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1866
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1867
    : logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(ident), client.logger_ptr)} // Throws
1868
    , logger{*logger_ptr}
1869
    , m_client{client}
1870
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1871
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1872
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1873
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1874
    , m_reconnect_info{reconnect_info}
1875
    , m_session_history{}
1876
    , m_ident{ident}
1877
    , m_server_endpoint{std::move(endpoint)}
1878
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1879
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1880
{
2,462✔
1881
    m_on_idle = m_client.create_trigger([this](Status status) {
2,464✔
1882
        if (status == ErrorCodes::OperationAborted)
2,464✔
1883
            return;
×
1884
        else if (!status.is_ok())
2,464✔
1885
            throw Exception(status);
×
1886

1,162✔
1887
        REALM_ASSERT(m_activated);
2,464✔
1888
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,464✔
1889
            on_idle(); // Throws
2,460✔
1890
            // Connection object may be destroyed now.
1,160✔
1891
        }
2,460✔
1892
    });
2,464✔
1893
}
2,462✔
1894

1895
// Returns the identifier this connection was constructed with.
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
{
    return m_ident;
}
1899

1900

1901
// Returns a reference to the server endpoint stored in this connection.
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
{
    return m_server_endpoint;
}
1905

1906
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1907
                                                        const std::string& signed_access_token)
1908
{
9,832✔
1909
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
9,832✔
1910
    m_signed_access_token = signed_access_token;           // Throws (copy)
9,832✔
1911
}
9,832✔
1912

1913

1914
void ClientImpl::Connection::resume_active_sessions()
1915
{
1,764✔
1916
    auto handler = [=](ClientImpl::Session& sess) {
3,524✔
1917
        sess.cancel_resumption_delay(); // Throws
3,524✔
1918
    };
3,524✔
1919
    for_each_active_session(std::move(handler)); // Throws
1,764✔
1920
}
1,764✔
1921

1922
void ClientImpl::Connection::on_idle()
1923
{
2,460✔
1924
    logger.debug("Destroying connection object");
2,460✔
1925
    ClientImpl& client = get_client();
2,460✔
1926
    client.remove_connection(*this);
2,460✔
1927
    // NOTE: This connection object is now destroyed!
1,160✔
1928
}
2,460✔
1929

1930

1931
std::string ClientImpl::Connection::get_http_request_path() const
1932
{
3,320✔
1933
    using namespace std::string_view_literals;
3,320✔
1934
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,320✔
1935

1,622✔
1936
    std::string path;
3,320✔
1937
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,320✔
1938
    path += m_http_request_path_prefix;
3,320✔
1939
    path += param;
3,320✔
1940
    path += m_signed_access_token;
3,320✔
1941

1,622✔
1942
    return path;
3,320✔
1943
}
3,320✔
1944

1945

1946
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
1947
{
2,462✔
1948
    std::ostringstream out;
2,462✔
1949
    out.imbue(std::locale::classic());
2,462✔
1950
    out << "Connection[" << ident << "]: "; // Throws
2,462✔
1951
    return out.str();                       // Throws
2,462✔
1952
}
2,462✔
1953

1954

1955
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
1956
                                                            util::Optional<SessionErrorInfo> error_info)
1957
{
9,842✔
1958
    if (m_force_closed) {
9,842✔
1959
        return;
2,168✔
1960
    }
2,168✔
1961
    auto handler = [=](ClientImpl::Session& sess) {
10,556✔
1962
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
10,556✔
1963
        sess_2.on_connection_state_changed(state, error_info); // Throws
10,556✔
1964
    };
10,556✔
1965
    for_each_active_session(std::move(handler)); // Throws
7,674✔
1966
}
7,674✔
1967

1968

1969
Client::Client(Config config)
1970
    : m_impl{new ClientImpl{std::move(config)}} // Throws
1971
{
8,978✔
1972
}
8,978✔
1973

1974

1975
Client::Client(Client&& client) noexcept
1976
    : m_impl{std::move(client.m_impl)}
1977
{
×
1978
}
×
1979

1980

1981
// Defined out of line; the body is intentionally empty.
Client::~Client() noexcept
{
}
1982

1983

1984
void Client::shutdown() noexcept
1985
{
9,056✔
1986
    m_impl->shutdown();
9,056✔
1987
}
9,056✔
1988

1989
void Client::shutdown_and_wait()
1990
{
760✔
1991
    m_impl->shutdown_and_wait();
760✔
1992
}
760✔
1993

1994
void Client::cancel_reconnect_delay()
1995
{
1,768✔
1996
    m_impl->cancel_reconnect_delay();
1,768✔
1997
}
1,768✔
1998

1999
void Client::voluntary_disconnect_all_connections()
2000
{
12✔
2001
    m_impl->voluntary_disconnect_all_connections();
12✔
2002
}
12✔
2003

2004
bool Client::wait_for_session_terminations_or_client_stopped()
2005
{
368✔
2006
    return m_impl.get()->wait_for_session_terminations_or_client_stopped();
368✔
2007
}
368✔
2008

2009

2010
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2011
                                  port_type& port, std::string& path) const
2012
{
3,344✔
2013
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
3,344✔
2014
}
3,344✔
2015

2016

2017
// Creates the session wrapper backing this session and keeps it alive through
// a naked pointer stored in m_impl.
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
{
    util::bind_ptr<SessionWrapper> wrapper{new SessionWrapper{*client.m_impl, std::move(db),
                                                              std::move(flx_sub_store), std::move(migration_store),
                                                              std::move(config)}}; // Throws
    // The reference count passed back to the application is implicitly
    // owned by a naked pointer. This is done to avoid exposing
    // implementation details through the header file (that is, through the
    // Session object).
    m_impl = wrapper.release();
}
2029

2030

2031
// Hands the progress handler over to the session wrapper.
void Session::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
    auto& wrapper = *m_impl;
    wrapper.set_progress_handler(std::move(handler)); // Throws
}
2035

2036

2037
// Hands the connection state change listener over to the session wrapper.
void Session::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
{
    auto& wrapper = *m_impl;
    wrapper.set_connection_state_change_listener(std::move(listener)); // Throws
}
2041

2042

2043
void Session::bind()
2044
{
9,800✔
2045
    m_impl->initiate(); // Throws
9,800✔
2046
}
9,800✔
2047

2048

2049
// Passes `new_version` on to the session wrapper's on_commit().
void Session::nonsync_transact_notify(version_type new_version)
{
    auto& wrapper = *m_impl;
    wrapper.on_commit(new_version); // Throws
}
2053

2054

2055
void Session::cancel_reconnect_delay()
2056
{
12✔
2057
    m_impl->cancel_reconnect_delay(); // Throws
12✔
2058
}
12✔
2059

2060

2061
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2062
{
4,058✔
2063
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,058✔
2064
}
4,058✔
2065

2066

2067
bool Session::wait_for_upload_complete_or_client_stopped()
2068
{
12,996✔
2069
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,996✔
2070
}
12,996✔
2071

2072

2073
bool Session::wait_for_download_complete_or_client_stopped()
2074
{
10,092✔
2075
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,092✔
2076
}
10,092✔
2077

2078

2079
void Session::refresh(const std::string& signed_access_token)
2080
{
208✔
2081
    m_impl->refresh(signed_access_token); // Throws
208✔
2082
}
208✔
2083

2084

2085
void Session::abandon() noexcept
2086
{
10,952✔
2087
    REALM_ASSERT(m_impl);
10,952✔
2088
    // Reabsorb the ownership assigned to the applications naked pointer by
5,302✔
2089
    // Session constructor
5,302✔
2090
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
10,952✔
2091
    SessionWrapper::abandon(std::move(wrapper));
10,952✔
2092
}
10,952✔
2093

2094
// Forwards the test command body to the session wrapper and returns the
// resulting future.
util::Future<std::string> Session::send_test_command(std::string body)
{
    auto& wrapper = *m_impl;
    return wrapper.send_test_command(std::move(body));
}
2098

2099
std::string Session::get_appservices_connection_id()
2100
{
72✔
2101
    return m_impl->get_appservices_connection_id();
72✔
2102
}
72✔
2103

2104
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2105
{
×
2106
    switch (proxyType) {
×
2107
        case ProxyConfig::Type::HTTP:
×
2108
            return os << "HTTP";
×
2109
        case ProxyConfig::Type::HTTPS:
×
2110
            return os << "HTTPS";
×
2111
    }
×
2112
    REALM_TERMINATE("Invalid Proxy Type object.");
2113
}
×
2114

2115
} // namespace sync
2116
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc