• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jonathan.reams_2947

01 Dec 2023 08:08PM UTC coverage: 91.739% (+0.04%) from 91.695%
jonathan.reams_2947

Pull #7160

Evergreen

jbreams
allow handle_error to decide resumability
Pull Request #7160: Prevent resuming a session that has not been fully shut down

92428 of 169414 branches covered (0.0%)

315 of 349 new or added lines in 14 files covered. (90.26%)

80 existing lines in 14 files now uncovered.

232137 of 253041 relevant lines covered (91.74%)

6882826.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.2
/src/realm/sync/client.cpp
1

2
#include <memory>
3
#include <tuple>
4
#include <atomic>
5

6
#include "realm/sync/client_base.hpp"
7
#include "realm/sync/protocol.hpp"
8
#include "realm/util/optional.hpp"
9
#include <realm/sync/client.hpp>
10
#include <realm/sync/config.hpp>
11
#include <realm/sync/noinst/client_reset.hpp>
12
#include <realm/sync/noinst/client_history_impl.hpp>
13
#include <realm/sync/noinst/client_impl_base.hpp>
14
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
15
#include <realm/sync/subscriptions.hpp>
16
#include <realm/util/bind_ptr.hpp>
17
#include <realm/util/circular_buffer.hpp>
18
#include <realm/util/platform_info.hpp>
19
#include <realm/util/thread.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/util/value_reset_guard.hpp>
22
#include <realm/version.hpp>
23

24
namespace realm {
25
namespace sync {
26

27
namespace {
28
using namespace realm::util;
29

30

31
// clang-format off
32
using SessionImpl                     = ClientImpl::Session;
33
using SyncTransactCallback            = Session::SyncTransactCallback;
34
using ProgressHandler                 = Session::ProgressHandler;
35
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
36
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
37
using port_type                       = Session::port_type;
38
using connection_ident_type           = std::int_fast64_t;
39
using ProxyConfig                     = SyncConfig::ProxyConfig;
40
// clang-format on
41

42
} // unnamed namespace
43

44

45
// Life cycle states of a session wrapper:
46
//
47
//  - Uninitiated
48
//  - Unactualized
49
//  - Actualized
50
//  - Finalized
51
//
52
// The session wrapper moves from the Uninitiated to the Unactualized state when
53
// it is initiated, i.e., when initiate() is called. This may happen on any
54
// thread.
55
//
56
// The session wrapper moves from the Unactualized to the Actualized state when
57
// it is associated with a session object, i.e., when `m_sess` is made to refer
58
// to an object of type SessionImpl. This always happens on the event loop
59
// thread.
60
//
61
// The session wrapper moves from the Actualized to the Finalized state when it
62
// is dissociated from the session object. This happens in response to the
63
// session wrapper having been abandoned by the application. This always happens
64
// on the event loop thread.
65
//
66
// The session wrapper will exist in the Finalized state only while referenced
67
// from a post handler waiting to be executed.
68
//
69
// If the session wrapper is abandoned by the application while in the
70
// Uninitiated state, it will be destroyed immediately, since no post handlers
71
// can have been scheduled prior to initiation.
72
//
73
// If the session wrapper is abandoned while in the Unactualized state, it will
74
// move immediately to the Finalized state. This may happen on any thread.
75
//
76
// The moving of a session wrapper to, or from the Actualized state always
77
// happens on the event loop thread. All other state transitions may happen on
78
// any thread.
79
//
80
// NOTE: Activation of the session happens no later than during actualization,
81
// and initiation of deactivation happens no earlier than during
82
// finalization. See also activate_session() and initiate_session_deactivation()
83
// in ClientImpl::Connection.
84
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
85
public:
86
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
87
                   Session::Config);
88
    ~SessionWrapper() noexcept;
89

90
    ClientReplication& get_replication() noexcept;
91
    ClientImpl& get_client() noexcept;
92

93
    bool has_flx_subscription_store() const;
94
    SubscriptionStore* get_flx_subscription_store();
95
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
96

97
    MigrationStore* get_migration_store();
98

99
    void set_progress_handler(util::UniqueFunction<ProgressHandler>);
100
    void set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener>);
101

102
    void initiate();
103

104
    void force_close();
105

106
    void on_commit(version_type new_version) override;
107
    void cancel_reconnect_delay();
108

109
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
110
    bool wait_for_upload_complete_or_client_stopped();
111
    bool wait_for_download_complete_or_client_stopped();
112

113
    void refresh(std::string signed_access_token);
114

115
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
116

117
    // These are called from ClientImpl
118
    void actualize(ServerEndpoint);
119
    void finalize();
120
    void finalize_before_actualization() noexcept;
121

122
    util::Future<std::string> send_test_command(std::string body);
123

124
    void handle_pending_client_reset_acknowledgement();
125

126
    void update_subscription_version_info();
127

128
    std::string get_appservices_connection_id();
129

130
    void mark_unresumable();
131

132
private:
133
    ClientImpl& m_client;
134
    DBRef m_db;
135
    Replication* m_replication;
136

137
    const ProtocolEnvelope m_protocol_envelope;
138
    const std::string m_server_address;
139
    const port_type m_server_port;
140
    const std::string m_user_id;
141
    const SyncServerMode m_sync_mode;
142
    const std::string m_authorization_header_name;
143
    const std::map<std::string, std::string> m_custom_http_headers;
144
    const bool m_verify_servers_ssl_certificate;
145
    const bool m_simulate_integration_error;
146
    const Optional<std::string> m_ssl_trust_certificate_path;
147
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
148
    const size_t m_flx_bootstrap_batch_size_bytes;
149

150
    // This one is different from null when, and only when the session wrapper
151
    // is in ClientImpl::m_abandoned_session_wrappers.
152
    SessionWrapper* m_next = nullptr;
153

154
    // After initiation, these may only be accessed by the event loop thread.
155
    std::string m_http_request_path_prefix;
156
    std::string m_virt_path;
157
    std::string m_signed_access_token;
158

159
    util::Optional<ClientReset> m_client_reset_config;
160

161
    util::Optional<ProxyConfig> m_proxy_config;
162

163
    uint_fast64_t m_last_reported_uploadable_bytes = 0;
164
    util::UniqueFunction<ProgressHandler> m_progress_handler;
165
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
166

167
    std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
168
    bool m_in_debug_hook = false;
169

170
    SessionReason m_session_reason;
171

172
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
173
    int64_t m_flx_active_version = 0;
174
    int64_t m_flx_last_seen_version = 0;
175
    int64_t m_flx_pending_mark_version = 0;
176
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
177

178
    std::shared_ptr<MigrationStore> m_migration_store;
179

180
    bool m_initiated = false;
181

182
    // Set to true when this session wrapper is actualized (or when it is
183
    // finalized before proper actualization). It is then never modified again.
184
    //
185
    // A session specific post handler submitted after the initiation of the
186
    // session wrapper (initiate()) will always find that `m_actualized` is
187
    // true. This is the case, because the scheduling of such a post handler
188
    // will have been preceded by the triggering of
189
    // `ClientImpl::m_actualize_and_finalize` (in
190
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
191
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
192
    // before the post handler. If the session wrapper is no longer in
193
    // `ClientImpl::m_unactualized_session_wrappers` when
194
    // ClientImpl::actualize_and_finalize_session_wrappers() executes, it must
195
    // have been abandoned already, but in that case,
196
    // finalize_before_actualization() has already been called.
197
    bool m_actualized = false;
198

199
    bool m_force_closed = false;
200

201
    bool m_suspended = false;
202
    bool m_resumable = true;
203

204
    // Has the SessionWrapper been finalized?
205
    bool m_finalized = false;
206

207
    // Set to true when the first DOWNLOAD message is received to indicate that
208
    // the byte-level download progress parameters can be considered reasonably
209
    // reliable. Before that, a lot of time may have passed, so our record of
210
    // the download progress is likely completely out of date.
211
    bool m_reliable_download_progress = false;
212

213
    // Set to point to an activated session object during actualization of the
214
    // session wrapper. Set to null during finalization of the session
215
    // wrapper. Both modifications are guaranteed to be performed by the event
216
    // loop thread.
217
    //
218
    // If a session specific post handler, that is submitted after the
219
    // initiation of the session wrapper, sees that `m_sess` is null, it can
220
    // conclude that the session wrapper has been both abandoned and
221
    // finalized. This is true, because the scheduling of such a post handler
222
    // will have been preceded by the triggering of
223
    // `ClientImpl::m_actualize_and_finalize` (in
224
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
225
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
226
    // before the post handler, so the session wrapper must have been actualized
227
    // unless it was already abandoned by the application. If it was abandoned
228
    // before it was actualized, it will already have been finalized by
229
    // finalize_before_actualization().
230
    //
231
    // Must only be accessed from the event loop thread.
232
    SessionImpl* m_sess = nullptr;
233

234
    // These must only be accessed from the event loop thread.
235
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
236
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
237
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
238

239
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
240
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
241
    // event loop thread.
242
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
243
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
244
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
245

246
    void on_sync_progress();
247
    void on_upload_completion();
248
    void on_download_completion();
249
    void on_suspended(const SessionErrorInfo& error_info);
250
    void on_resumed();
251
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
252
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
253
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
254
    void on_flx_sync_version_complete(int64_t version);
255

256
    void report_progress(bool only_if_new_uploadable_data = false);
257

258
    friend class SessionWrapperStack;
259
    friend class ClientImpl::Session;
260
};
261

262

263
// ################ SessionWrapperStack ################
264

265
inline bool SessionWrapperStack::empty() const noexcept
266
{
×
267
    return !m_back;
×
268
}
×
269

270

271
// Pushes `w` onto the stack. The stack takes over the reference held by `w`
// and links the wrapper in through its intrusive `m_next` pointer, which must
// be null on entry.
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
{
    REALM_ASSERT(!w->m_next);
    SessionWrapper* const node = w.release();
    node->m_next = m_back;
    m_back = node;
}
9,702✔
277

278

279
// Removes and returns the most recently pushed wrapper. The returned bind_ptr
// adopts the reference the stack was holding. When the stack is empty the
// returned pointer is null.
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
{
    SessionWrapper* const top = m_back;
    if (top) {
        m_back = top->m_next;
        // Unlink the wrapper so that `m_next` is null again once it has left the stack.
        top->m_next = nullptr;
    }
    return util::bind_ptr<SessionWrapper>{top, util::bind_ptr_base::adopt_tag{}};
}
24,294✔
288

289

290
inline void SessionWrapperStack::clear() noexcept
291
{
24,064✔
292
    while (m_back) {
24,064✔
293
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
294
        m_back = w->m_next;
×
295
    }
×
296
}
24,064✔
297

298

299
inline SessionWrapperStack::SessionWrapperStack(SessionWrapperStack&& q) noexcept
300
    : m_back{q.m_back}
301
{
302
    q.m_back = nullptr;
303
}
304

305

306
// Releases every wrapper still held by the stack.
SessionWrapperStack::~SessionWrapperStack()
{
    clear();
}
24,064✔
310

311

312
// ################ ClientImpl ################
313

314

315
ClientImpl::~ClientImpl()
{
    // No mutex locking is needed here: no other thread is allowed to be
    // accessing this client or any of its subobjects at this time.
    shutdown_and_wait();

    // The client must be stopped by now. Session wrappers are removed from
    // m_unactualized_session_wrappers as they are abandoned, so that container
    // is expected to be empty.
    REALM_ASSERT(m_stopped);
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
}
9,474✔
326

327

328
void ClientImpl::cancel_reconnect_delay()
329
{
1,728✔
330
    // Thread safety required
896✔
331
    post([this](Status status) {
1,728✔
332
        if (status == ErrorCodes::OperationAborted)
1,728✔
333
            return;
×
334
        else if (!status.is_ok())
1,728✔
335
            throw Exception(status);
×
336

896✔
337
        for (auto& p : m_server_slots) {
1,728✔
338
            ServerSlot& slot = p.second;
1,728✔
339
            if (m_one_connection_per_session) {
1,728✔
340
                REALM_ASSERT(!slot.connection);
×
341
                for (const auto& p : slot.alt_connections) {
×
342
                    ClientImpl::Connection& conn = *p.second;
×
343
                    conn.resume_active_sessions(); // Throws
×
344
                    conn.cancel_reconnect_delay(); // Throws
×
345
                }
×
346
            }
×
347
            else {
1,728✔
348
                REALM_ASSERT(slot.alt_connections.empty());
1,728✔
349
                if (slot.connection) {
1,728✔
350
                    ClientImpl::Connection& conn = *slot.connection;
1,724✔
351
                    conn.resume_active_sessions(); // Throws
1,724✔
352
                    conn.cancel_reconnect_delay(); // Throws
1,724✔
353
                }
1,724✔
354
                else {
4✔
355
                    slot.reconnect_info.reset();
4✔
356
                }
4✔
357
            }
1,728✔
358
        }
1,728✔
359
    }); // Throws
1,728✔
360
}
1,728✔
361

362

363
void ClientImpl::voluntary_disconnect_all_connections()
364
{
12✔
365
    auto done_pf = util::make_promise_future<void>();
12✔
366
    post([this, promise = std::move(done_pf.promise)](Status status) mutable {
12✔
367
        if (status == ErrorCodes::OperationAborted) {
12✔
368
            return;
×
369
        }
×
370

6✔
371
        REALM_ASSERT(status.is_ok());
12✔
372

6✔
373
        try {
12✔
374
            for (auto& p : m_server_slots) {
12✔
375
                ServerSlot& slot = p.second;
12✔
376
                if (m_one_connection_per_session) {
12✔
377
                    REALM_ASSERT(!slot.connection);
×
378
                    for (const auto& p : slot.alt_connections) {
×
379
                        ClientImpl::Connection& conn = *p.second;
×
380
                        if (conn.get_state() == ConnectionState::disconnected) {
×
381
                            continue;
×
382
                        }
×
383
                        conn.voluntary_disconnect();
×
384
                    }
×
385
                }
×
386
                else {
12✔
387
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
388
                    if (!slot.connection) {
12✔
389
                        continue;
×
390
                    }
×
391
                    ClientImpl::Connection& conn = *slot.connection;
12✔
392
                    if (conn.get_state() == ConnectionState::disconnected) {
12✔
393
                        continue;
×
394
                    }
×
395
                    conn.voluntary_disconnect();
12✔
396
                }
12✔
397
            }
12✔
398
        }
12✔
399
        catch (...) {
6✔
400
            promise.set_error(exception_to_status());
×
401
            return;
×
402
        }
×
403
        promise.emplace_value();
12✔
404
    });
12✔
405
    done_pf.future.get();
12✔
406
}
12✔
407

408

409
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
410
{
372✔
411
    // Thread safety required
36✔
412

36✔
413
    {
372✔
414
        std::lock_guard lock{m_mutex};
372✔
415
        m_sessions_terminated = false;
372✔
416
    }
372✔
417

36✔
418
    // The technique employed here relies on the fact that
36✔
419
    // actualize_and_finalize_session_wrappers() must get to execute at least
36✔
420
    // once before the post handler submitted below gets to execute, but still
36✔
421
    // at a time where all session wrappers, that are abandoned prior to the
36✔
422
    // execution of wait_for_session_terminations_or_client_stopped(), have been
36✔
423
    // added to `m_abandoned_session_wrappers`.
36✔
424
    //
36✔
425
    // To see that this is the case, consider a session wrapper that was
36✔
426
    // abandoned before wait_for_session_terminations_or_client_stopped() was
36✔
427
    // invoked. Then the session wrapper will have been added to
36✔
428
    // `m_abandoned_session_wrappers`, and an invocation of
36✔
429
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
36✔
430
    // guarantees mentioned in the documentation of Trigger then ensure
36✔
431
    // that at least one execution of actualize_and_finalize_session_wrappers()
36✔
432
    // will happen after the session wrapper has been added to
36✔
433
    // `m_abandoned_session_wrappers`, but before the post handler submitted
36✔
434
    // below gets to execute.
36✔
435
    post([this](Status status) mutable {
372✔
436
        if (status == ErrorCodes::OperationAborted)
372✔
437
            return;
×
438
        else if (!status.is_ok())
372✔
439
            throw Exception(status);
×
440

36✔
441
        std::lock_guard lock{m_mutex};
372✔
442
        m_sessions_terminated = true;
372✔
443
        m_wait_or_client_stopped_cond.notify_all();
372✔
444
    }); // Throws
372✔
445

36✔
446
    bool completion_condition_was_satisfied;
372✔
447
    {
372✔
448
        std::unique_lock lock{m_mutex};
372✔
449
        while (!m_sessions_terminated && !m_stopped)
718✔
450
            m_wait_or_client_stopped_cond.wait(lock);
346✔
451
        completion_condition_was_satisfied = !m_stopped;
372✔
452
    }
372✔
453
    return completion_condition_was_satisfied;
372✔
454
}
372✔
455

456

457
void ClientImpl::drain_connections_on_loop()
458
{
9,474✔
459
    post([this](Status status) mutable {
9,474✔
460
        REALM_ASSERT(status.is_ok());
9,474✔
461
        drain_connections();
9,474✔
462
    });
9,474✔
463
}
9,474✔
464

465
void ClientImpl::shutdown_and_wait()
466
{
10,226✔
467
    shutdown();
10,226✔
468
    std::unique_lock lock{m_drain_mutex};
10,226✔
469
    if (m_drained) {
10,226✔
470
        return;
748✔
471
    }
748✔
472

4,670✔
473
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,478✔
474
    m_drain_cv.wait(lock, [&] {
20,658✔
475
        return m_num_connections == 0 && m_outstanding_posts == 0;
20,658✔
476
    });
20,658✔
477

4,670✔
478
    m_drained = true;
9,478✔
479
}
9,478✔
480

481
void ClientImpl::shutdown() noexcept
482
{
19,778✔
483
    {
19,778✔
484
        std::lock_guard lock{m_mutex};
19,778✔
485
        if (m_stopped)
19,778✔
486
            return;
10,304✔
487
        m_stopped = true;
9,474✔
488
        m_wait_or_client_stopped_cond.notify_all();
9,474✔
489
    }
9,474✔
490

4,668✔
491
    drain_connections_on_loop();
9,474✔
492
}
9,474✔
493

494

495
// Records `wrapper` together with its server endpoint as awaiting
// actualization, and triggers `m_actualize_and_finalize` unless a run was
// already flagged as needed.
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint)
{
    // Thread safety required.

    std::lock_guard lock{m_mutex};
    REALM_ASSERT(m_actualize_and_finalize);
    m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
    const bool already_requested = m_actualize_and_finalize_needed;
    m_actualize_and_finalize_needed = true;
    // The conditional triggering must happen before the mutex is released. If
    // two threads call register_unactualized_session_wrapper() roughly
    // concurrently, only the first one is guaranteed to be asked to retrigger,
    // yet that retriggering must have happened before the other thread returns
    // from register_unactualized_session_wrapper().
    //
    // A similar argument applies when two threads call
    // register_abandoned_session_wrapper(), and when one thread calls one of
    // them while another thread calls the other.
    if (!already_requested)
        m_actualize_and_finalize->trigger();
}
9,874✔
516

517

518
// Handles a session wrapper that the application has abandoned. A wrapper that
// is still awaiting actualization is finalized on the spot; any other wrapper
// is pushed onto `m_abandoned_session_wrappers` and `m_actualize_and_finalize`
// is triggered unless a run was already flagged as needed.
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    // Thread safety required.

    std::lock_guard lock{m_mutex};
    REALM_ASSERT(m_actualize_and_finalize);

    // A session wrapper that has not yet been actualized (on the event loop
    // thread) can be finalized immediately. This ensures that we will
    // generally not actualize a session wrapper that has already been
    // abandoned.
    if (m_unactualized_session_wrappers.erase(wrapper.get()) != 0) {
        wrapper->finalize_before_actualization();
        return;
    }

    m_abandoned_session_wrappers.push(std::move(wrapper));
    const bool already_requested = m_actualize_and_finalize_needed;
    m_actualize_and_finalize_needed = true;
    // The conditional triggering must happen before the mutex is released. See
    // the implementation of register_unactualized_session_wrapper() for
    // details.
    if (!already_requested)
        m_actualize_and_finalize->trigger();
}
9,702✔
544

545

546
// Must be called from the event loop thread.
547
void ClientImpl::actualize_and_finalize_session_wrappers()
548
{
14,592✔
549
    std::map<SessionWrapper*, ServerEndpoint> unactualized_session_wrappers;
14,592✔
550
    SessionWrapperStack abandoned_session_wrappers;
14,592✔
551
    bool stopped;
14,592✔
552
    {
14,592✔
553
        std::lock_guard lock{m_mutex};
14,592✔
554
        m_actualize_and_finalize_needed = false;
14,592✔
555
        swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
14,592✔
556
        swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
14,592✔
557
        stopped = m_stopped;
14,592✔
558
    }
14,592✔
559
    // Note, we need to finalize old session wrappers before we actualize new
7,042✔
560
    // ones. This ensures that deactivation of old sessions is initiated before
7,042✔
561
    // new session are activated. This, in turn, ensures that the server does
7,042✔
562
    // not see two overlapping sessions for the same local Realm file.
7,042✔
563
    while (util::bind_ptr<SessionWrapper> wrapper = abandoned_session_wrappers.pop())
24,294✔
564
        wrapper->finalize(); // Throws
9,702✔
565
    if (stopped) {
14,592✔
566
        for (auto& p : unactualized_session_wrappers) {
396✔
567
            SessionWrapper& wrapper = *p.first;
4✔
568
            wrapper.finalize_before_actualization();
4✔
569
        }
4✔
570
        return;
756✔
571
    }
756✔
572
    for (auto& p : unactualized_session_wrappers) {
13,836✔
573
        SessionWrapper& wrapper = *p.first;
9,698✔
574
        ServerEndpoint server_endpoint = std::move(p.second);
9,698✔
575
        wrapper.actualize(std::move(server_endpoint)); // Throws
9,698✔
576
    }
9,698✔
577
}
13,836✔
578

579

580
// Returns a connection for `endpoint`. The slot's existing connection is
// reused when there is one, connections are shared between sessions, and no
// proxy is configured. Otherwise a new connection is created, stored in the
// slot, counted in `m_num_connections`, and `was_created` is set to true.
// `was_created` is left untouched when an existing connection is returned.
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
                                                   const std::string& authorization_header_name,
                                                   const std::map<std::string, std::string>& custom_http_headers,
                                                   bool verify_servers_ssl_certificate,
                                                   Optional<std::string> ssl_trust_certificate_path,
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
{
    // Look up the slot for this endpoint, creating it on first use.
    ServerSlot& server_slot =
        m_server_slots
            .try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()))
            .first->second; // Throws

    // TODO: enable multiplexing with proxies
    const bool reuse_existing = server_slot.connection && !m_one_connection_per_session && !proxy_config;
    if (reuse_existing) {
        // Use preexisting connection
        REALM_ASSERT(server_slot.alt_connections.empty());
        return *server_slot.connection;
    }

    // Create a new connection
    REALM_ASSERT(!server_slot.connection);
    connection_ident_type ident = m_prev_connection_ident + 1;
    auto new_conn = std::make_unique<ClientImpl::Connection>(
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
        std::move(proxy_config), server_slot.reconnect_info); // Throws
    ClientImpl::Connection& conn = *new_conn;
    if (m_one_connection_per_session) {
        server_slot.alt_connections[ident] = std::move(new_conn); // Throws
    }
    else {
        server_slot.connection = std::move(new_conn);
    }

    // The identifier is committed only after the connection has been stored.
    m_prev_connection_ident = ident;
    was_created = true;
    {
        std::lock_guard lk(m_drain_mutex);
        ++m_num_connections;
    }
    return conn;
}
2,520✔
621

622

623
// Removes `conn` from the server slot of its endpoint, decrements
// `m_num_connections`, and notifies `m_drain_cv`. When connections are shared
// between sessions, the connection's reconnect info is saved in the slot
// before the connection is released.
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
{
    auto i = m_server_slots.find(conn.get_server_endpoint());
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
    ServerSlot& server_slot = i->second;

    if (m_one_connection_per_session) {
        REALM_ASSERT(!server_slot.connection);
        auto j = server_slot.alt_connections.find(conn.get_ident());
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
        REALM_ASSERT(&*j->second == &conn);
        server_slot.alt_connections.erase(j);
    }
    else {
        REALM_ASSERT(server_slot.alt_connections.empty());
        REALM_ASSERT(&*server_slot.connection == &conn);
        // Copy the reconnect info out before the connection is released.
        server_slot.reconnect_info = conn.get_reconnect_info();
        server_slot.connection.reset();
    }

    std::lock_guard lk(m_drain_mutex);
    REALM_ASSERT(m_num_connections);
    --m_num_connections;
    m_drain_cv.notify_all();
}
2,520✔
651

652

653
// ################ SessionImpl ################
654

655
void SessionImpl::force_close()
656
{
136✔
657
    // Allow force_close() if session is active or hasn't been activated yet.
66✔
658
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
136!
659
        m_wrapper.force_close();
136✔
660
    }
136✔
661
}
136✔
662

663
void SessionImpl::on_connection_state_changed(ConnectionState state,
664
                                              const util::Optional<SessionErrorInfo>& error_info)
665
{
10,670✔
666
    // Only used to report errors back to the SyncSession while the Session is active
5,312✔
667
    if (m_state == SessionImpl::Active) {
10,670✔
668
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
10,670✔
669
    }
10,670✔
670
}
10,670✔
671

672

673
const std::string& SessionImpl::get_virt_path() const noexcept
674
{
6,920✔
675
    // Can only be called if the session is active or being activated
3,412✔
676
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
6,920!
677
    return m_wrapper.m_virt_path;
6,920✔
678
}
6,920✔
679

680
const std::string& SessionImpl::get_realm_path() const noexcept
681
{
9,966✔
682
    // Can only be called if the session is active or being activated
4,804✔
683
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
9,966✔
684
    return m_wrapper.m_db->get_path();
9,966✔
685
}
9,966✔
686

687
// Returns a copy of the DB reference held by the session wrapper.
DBRef SessionImpl::get_db() const noexcept
{
    // Only valid while the session is active or being activated
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    DBRef db = m_wrapper.m_db;
    return db;
}
23,718✔
693

694
// Returns the client replication object obtained from the session wrapper.
ClientReplication& SessionImpl::get_repl() const noexcept
{
    // Only valid while the session is active or being activated
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    ClientReplication& repl = m_wrapper.get_replication();
    return repl;
}
111,724✔
700

701
// Returns the history of the replication object returned by get_repl(), and
// is therefore subject to the same state assertion as get_repl().
ClientHistory& SessionImpl::get_history() const noexcept
{
    ClientReplication& repl = get_repl();
    return repl.get_history();
}
110,220✔
705

706
// Returns a mutable reference to the optional client reset configuration
// stored in the session wrapper.
util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
{
    // Only valid while the session is active or being activated
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    auto& client_reset_config = m_wrapper.m_client_reset_config;
    return client_reset_config;
}
12,952✔
712

713
// Returns the session reason stored in the session wrapper.
SessionReason SessionImpl::get_session_reason() noexcept
{
    // Only valid while the session is active or being activated
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    const SessionReason reason = m_wrapper.m_session_reason;
    return reason;
}
1,418✔
719

720
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
721
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
722
{
42,626✔
723
    // Ignore the call if the session is not active
22,736✔
724
    if (m_state != State::Active) {
42,626✔
725
        return;
×
726
    }
×
727

22,736✔
728
    try {
42,626✔
729
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
42,626!
730
        if (simulate_integration_error) {
42,626✔
731
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
732
        }
×
733
        version_type client_version;
42,626✔
734
        if (REALM_LIKELY(!get_client().is_dry_run())) {
42,628✔
735
            VersionInfo version_info;
42,628✔
736
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
42,628✔
737
            client_version = version_info.realm_version;
42,628✔
738
        }
42,628✔
739
        else {
2,147,483,647✔
740
            // Fake it for "dry run" mode
2,147,483,647✔
741
            client_version = m_last_version_available + 1;
2,147,483,647✔
742
        }
2,147,483,647✔
743
        on_changesets_integrated(client_version, progress); // Throws
42,626✔
744
    }
42,626✔
745
    catch (const IntegrationException& e) {
22,748✔
746
        on_integration_failure(e);
24✔
747
    }
24✔
748
    m_wrapper.on_sync_progress(); // Throws
42,626✔
749
}
42,626✔
750

751

752
void SessionImpl::on_upload_completion()
753
{
14,672✔
754
    // Ignore the call if the session is not active
7,248✔
755
    if (m_state == State::Active) {
14,672✔
756
        m_wrapper.on_upload_completion(); // Throws
14,672✔
757
    }
14,672✔
758
}
14,672✔
759

760

761
void SessionImpl::on_download_completion()
762
{
15,488✔
763
    // Ignore the call if the session is not active
7,642✔
764
    if (m_state == State::Active) {
15,488✔
765
        m_wrapper.on_download_completion(); // Throws
15,488✔
766
    }
15,488✔
767
}
15,488✔
768

769

770
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
771
{
924✔
772
    // Ignore the call if the session is not active
484✔
773
    if (m_state == State::Active) {
924✔
774
        m_wrapper.on_suspended(error_info); // Throws
924✔
775
    }
924✔
776
}
924✔
777

778

779
void SessionImpl::on_resumed()
780
{
378✔
781
    // Ignore the call if the session is not active
214✔
782
    if (m_state == State::Active) {
378✔
783
        m_wrapper.on_resumed(); // Throws
378✔
784
    }
378✔
785
}
378✔
786

787
void SessionImpl::handle_pending_client_reset_acknowledgement()
788
{
290✔
789
    // Ignore the call if the session is not active
144✔
790
    if (m_state == State::Active) {
290✔
791
        m_wrapper.handle_pending_client_reset_acknowledgement();
290✔
792
    }
290✔
793
}
290✔
794

795
void SessionImpl::update_subscription_version_info()
796
{
272✔
797
    // Ignore the call if the session is not active
136✔
798
    if (m_state == State::Active) {
272✔
799
        m_wrapper.update_subscription_version_info();
272✔
800
    }
272✔
801
}
272✔
802

803
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
804
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
805
{
44,282✔
806
    // Ignore the call if the session is not active
23,564✔
807
    if (m_state != State::Active) {
44,282✔
808
        return false;
×
809
    }
×
810

23,564✔
811
    if (is_steady_state_download_message(batch_state, query_version)) {
44,282✔
812
        return false;
42,628✔
813
    }
42,628✔
814

826✔
815
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
1,654✔
816
    util::Optional<SyncProgress> maybe_progress;
1,654✔
817
    if (batch_state == DownloadBatchState::LastInBatch) {
1,654✔
818
        maybe_progress = progress;
1,496✔
819
    }
1,496✔
820

826✔
821
    bool new_batch = false;
1,654✔
822
    try {
1,654✔
823
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
1,654✔
824
    }
1,654✔
825
    catch (const LogicError& ex) {
826✔
826
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
827
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
828
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
829
                                    ProtocolError::bad_changeset_size);
×
830
            on_integration_failure(ex);
×
831
            return true;
×
832
        }
×
833
        throw;
×
834
    }
×
835

828✔
836
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
828✔
837
    // bootstrapping.
828✔
838
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
1,656✔
839
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
40✔
840
    }
40✔
841

828✔
842
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
1,656✔
843
                                       batch_state, received_changesets.size());
1,656✔
844
    if (hook_action == SyncClientHookAction::EarlyReturn) {
1,656✔
845
        return true;
12✔
846
    }
12✔
847
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
1,644✔
848

822✔
849
    if (batch_state == DownloadBatchState::MoreToCome) {
1,644✔
850
        return true;
156✔
851
    }
156✔
852

744✔
853
    try {
1,488✔
854
        process_pending_flx_bootstrap();
1,488✔
855
    }
1,488✔
856
    catch (const IntegrationException& e) {
748✔
857
        on_integration_failure(e);
8✔
858
    }
8✔
859

744✔
860
    return true;
1,488✔
861
}
1,488✔
862

863

864
void SessionImpl::process_pending_flx_bootstrap()
865
{
11,182✔
866
    // Ignore the call if not a flx session or session is not active
5,412✔
867
    if (!m_is_flx_sync_session || m_state != State::Active) {
11,182✔
868
        return;
8,640✔
869
    }
8,640✔
870
    // Should never be called if session is not active
1,272✔
871
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
2,542✔
872
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,542✔
873
    if (!bootstrap_store->has_pending()) {
2,542✔
874
        return;
1,038✔
875
    }
1,038✔
876

752✔
877
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,504✔
878
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,504✔
879
                "changeset size: %3)",
1,504✔
880
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,504✔
881
                pending_batch_stats.pending_changeset_bytes);
1,504✔
882
    auto& history = get_repl().get_history();
1,504✔
883
    VersionInfo new_version;
1,504✔
884
    SyncProgress progress;
1,504✔
885
    int64_t query_version = -1;
1,504✔
886
    size_t changesets_processed = 0;
1,504✔
887

752✔
888
    // Used to commit each batch after it was transformed.
752✔
889
    TransactionRef transact = get_db()->start_write();
1,504✔
890
    while (bootstrap_store->has_pending()) {
3,124✔
891
        auto start_time = std::chrono::steady_clock::now();
1,636✔
892
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
1,636✔
893
        if (!pending_batch.progress) {
1,636✔
894
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
895
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
4✔
896
            // bootstrap store requires a write transaction itself.
4✔
897
            transact->close();
8✔
898
            bootstrap_store->clear();
8✔
899
            return;
8✔
900
        }
8✔
901

814✔
902
        auto batch_state =
1,628✔
903
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
1,560✔
904
        uint64_t downloadable_bytes = 0;
1,628✔
905
        query_version = pending_batch.query_version;
1,628✔
906
        bool simulate_integration_error =
1,628✔
907
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
1,628✔
908
        if (simulate_integration_error) {
1,628✔
909
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
8✔
910
        }
8✔
911

810✔
912
        history.integrate_server_changesets(
1,620✔
913
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
1,620✔
914
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
1,618✔
915
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
1,616✔
916
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
1,616✔
917
            });
1,616✔
918
        progress = *pending_batch.progress;
1,620✔
919
        changesets_processed += pending_batch.changesets.size();
1,620✔
920
        auto duration = std::chrono::steady_clock::now() - start_time;
1,620✔
921

810✔
922
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
1,620✔
923
                                      batch_state, pending_batch.changesets.size());
1,620✔
924
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
1,620✔
925

810✔
926
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
1,620✔
927
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
1,620✔
928
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
1,620✔
929
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
1,620✔
930
                    pending_batch.remaining_changesets);
1,620✔
931
    }
1,620✔
932
    on_changesets_integrated(new_version.realm_version, progress);
1,496✔
933

744✔
934
    REALM_ASSERT_3(query_version, !=, -1);
1,488✔
935
    m_wrapper.on_sync_progress();
1,488✔
936
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,488✔
937

744✔
938
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,488✔
939
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,488✔
940
    // NoAction/EarlyReturn are both valid no-op actions to take here.
744✔
941
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,488✔
942
}
1,488✔
943

944
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
945
{
16✔
946
    // Ignore the call if the session is not active
8✔
947
    if (m_state == State::Active) {
16✔
948
        m_wrapper.on_flx_sync_error(version, err_msg);
16✔
949
    }
16✔
950
}
16✔
951

952
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
953
{
1,524✔
954
    // Ignore the call if the session is not active
762✔
955
    if (m_state == State::Active) {
1,524✔
956
        m_wrapper.on_flx_sync_progress(version, batch_state);
1,524✔
957
    }
1,524✔
958
}
1,524✔
959

960
// Returns the subscription store pointer provided by the session wrapper.
//
// Asserts that the session is active.
SubscriptionStore* SessionImpl::get_flx_subscription_store()
{
    REALM_ASSERT_EX(m_state == State::Active, m_state);
    SubscriptionStore* store = m_wrapper.get_flx_subscription_store();
    return store;
}
13,582✔
966

967
// Returns the migration store pointer provided by the session wrapper.
//
// Asserts that the session is active.
MigrationStore* SessionImpl::get_migration_store()
{
    REALM_ASSERT_EX(m_state == State::Active, m_state);
    MigrationStore* store = m_wrapper.get_migration_store();
    return store;
}
59,358✔
973

974
void SessionImpl::on_flx_sync_version_complete(int64_t version)
975
{
232✔
976
    // Ignore the call if the session is not active
116✔
977
    if (m_state == State::Active) {
232✔
978
        m_wrapper.on_flx_sync_version_complete(version);
232✔
979
    }
232✔
980
}
232✔
981

982
// Invokes the wrapper's debug hook with `data` and carries out the action it
// requests.
//
// - Re-entrant calls (made while the hook is already running) return NoAction
//   without invoking the hook.
// - SuspendWithRetryableError: feeds a non-fatal, transient error into
//   receive_error_message() and returns EarlyReturn.
// - TriggerReconnect: voluntarily disconnects the connection and returns
//   EarlyReturn.
// - Any other action is returned to the caller unchanged.
//
// Asserts that the session is active.
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    // Make sure we don't call the debug hook recursively.
    if (m_wrapper.m_in_debug_hook) {
        return SyncClientHookAction::NoAction;
    }
    m_wrapper.m_in_debug_hook = true;
    // Clears the re-entrancy flag on every exit path from this function.
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
        m_wrapper.m_in_debug_hook = false;
    });

    const auto action = m_wrapper.m_debug_hook(data);

    if (action == realm::SyncClientHookAction::SuspendWithRetryableError) {
        SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false},
                                  ProtocolErrorInfo::Action::Transient);

        auto err_processing_err = receive_error_message(err_info);
        REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
        return SyncClientHookAction::EarlyReturn;
    }

    if (action == realm::SyncClientHookAction::TriggerReconnect) {
        get_connection().voluntary_disconnect();
        return SyncClientHookAction::EarlyReturn;
    }

    return action;
}
956✔
1014

1015
// Convenience overload: packs a download-related event into SyncClientHookData
// and dispatches it to the debug hook.
//
// Returns NoAction without invoking anything when no hook is installed or when
// the session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
                                                  int64_t query_version, DownloadBatchState batch_state,
                                                  size_t num_changesets)
{
    // Fast path: no hook installed.
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
        return SyncClientHookAction::NoAction;
    }
    if (REALM_UNLIKELY(m_state != State::Active)) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.progress = progress;
    hook_data.query_version = query_version;
    hook_data.batch_state = batch_state;
    hook_data.num_changesets = num_changesets;

    return call_debug_hook(hook_data);
}
806✔
1035

1036
// Convenience overload: packs an error-related event into SyncClientHookData
// and dispatches it to the debug hook. The hook data carries the session's
// current progress, a steady-state batch state, zero changesets, query version
// zero, and a pointer to `error_info`.
//
// Returns NoAction without invoking anything when no hook is installed or when
// the session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
{
    // Fast path: no hook installed.
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
        return SyncClientHookAction::NoAction;
    }
    if (REALM_UNLIKELY(m_state != State::Active)) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.error_info = &error_info;
    hook_data.progress = m_progress;
    hook_data.batch_state = DownloadBatchState::SteadyState;
    hook_data.query_version = 0;
    hook_data.num_changesets = 0;

    return call_debug_hook(hook_data);
}
148✔
1055

1056
// Reports whether a DOWNLOAD message should be treated as a steady-state
// download rather than as part of a FLX bootstrap. That is the case when
//   - the batch state is SteadyState, or
//   - the session is not a FLX session, or
//   - the message is the last in its batch and its query version equals the
//     wrapper's active FLX version.
//
// Asserts that the session is active.
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    // Evaluated left to right with short-circuiting, so the active version is
    // only consulted for FLX sessions with a non-steady-state batch.
    const bool steady_state =
        batch_state == DownloadBatchState::SteadyState || !m_is_flx_sync_session ||
        (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version);
    return steady_state;
}
3,312✔
1075

1076
// Queues a test command to be sent to the server and returns a future for the
// server's response.
//
// `body` must be a JSON object with a string "command" field; if it has more
// than one field, an "args" field must be present. The returned future is
// resolved with an error Status when
//   - the session is not active (RuntimeError),
//   - `body` is not valid JSON or violates the rules above (LogicError), or
//   - the posted task is invoked with a non-OK status (e.g. operation aborted).
//
// The command itself is enqueued from a task posted to the client's event
// loop.
util::Future<std::string> SessionImpl::send_test_command(std::string body)
{
    if (m_state != State::Active) {
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
    }

    try {
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
            return Status{ErrorCodes::LogicError,
                          "Must supply command name in \"command\" field of test command json object"};
        }
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
        }
    }
    catch (const nlohmann::json::parse_error& e) {
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
    }

    auto pf = util::make_promise_future<std::string>();

    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
        // Includes operation_aborted
        if (!status.is_ok()) {
            // The promise is fulfilled here, so stop: continuing would touch
            // session state and queue a command whose promise has already
            // been resolved.
            promise.set_error(status);
            return;
        }

        auto id = ++m_last_pending_test_command_ident;
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
        ensure_enlisted_to_send();
    });

    // pf.future is a member of a local, so it has to be moved out explicitly.
    return std::move(pf.future);
}
60✔
1110

1111
// ################ SessionWrapper ################
1112

1113
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1114
// provides a link to the ClientImpl::Session, which exchanges messages with the server through the
1115
// ClientImpl::Connection that owns that ClientImpl::Session.
1116
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1117
                               std::shared_ptr<MigrationStore> migration_store, Session::Config config)
1118
    : m_client{client}
1119
    , m_db(std::move(db))
1120
    , m_replication(m_db->get_replication())
1121
    , m_protocol_envelope{config.protocol_envelope}
1122
    , m_server_address{std::move(config.server_address)}
1123
    , m_server_port{config.server_port}
1124
    , m_user_id(std::move(config.user_id))
1125
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
1126
    , m_authorization_header_name{config.authorization_header_name}
1127
    , m_custom_http_headers{config.custom_http_headers}
1128
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
1129
    , m_simulate_integration_error{config.simulate_integration_error}
1130
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
1131
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
1132
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
1133
    , m_http_request_path_prefix{std::move(config.service_identifier)}
1134
    , m_virt_path{std::move(config.realm_identifier)}
1135
    , m_signed_access_token{std::move(config.signed_user_token)}
1136
    , m_client_reset_config{std::move(config.client_reset_config)}
1137
    , m_proxy_config{config.proxy_config} // Throws
1138
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
1139
    , m_session_reason(config.session_reason)
1140
    , m_flx_subscription_store(std::move(flx_sub_store))
1141
    , m_migration_store(std::move(migration_store))
1142
{
11,026✔
1143
    REALM_ASSERT(m_db);
11,026✔
1144
    REALM_ASSERT(m_db->get_replication());
11,026✔
1145
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
11,026✔
1146
    if (m_client_reset_config) {
11,026✔
1147
        m_session_reason = SessionReason::ClientReset;
348✔
1148
    }
348✔
1149

5,340✔
1150
    update_subscription_version_info();
11,026✔
1151
}
11,026✔
1152

1153
// If the wrapper still holds a DB and was actualized, unregisters it as a
// commit listener and releases the DB's sync agent.
SessionWrapper::~SessionWrapper() noexcept
{
    if (!m_db || !m_actualized) {
        return;
    }
    m_db->remove_commit_listener(this);
    m_db->release_sync_agent();
}
11,026✔
1160

1161

1162
// Returns the stored replication object as a ClientReplication.
// The constructor asserts (via dynamic_cast) that the DB's replication object
// has this dynamic type.
inline ClientReplication& SessionWrapper::get_replication() noexcept
{
    REALM_ASSERT(m_db);
    auto& repl = static_cast<ClientReplication&>(*m_replication);
    return repl;
}
111,724✔
1167

1168

1169
// Returns the client implementation this wrapper was created with.
inline ClientImpl& SessionWrapper::get_client() noexcept
{
    ClientImpl& client = m_client;
    return client;
}
72✔
1173

1174
bool SessionWrapper::has_flx_subscription_store() const
1175
{
1,524✔
1176
    return static_cast<bool>(m_flx_subscription_store);
1,524✔
1177
}
1,524✔
1178

1179
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1180
{
16✔
1181
    REALM_ASSERT(!m_finalized);
16✔
1182
    auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(version);
16✔
1183
    mut_subs.update_state(SubscriptionSet::State::Error, err_msg);
16✔
1184
    mut_subs.commit();
16✔
1185
}
16✔
1186

1187
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1188
{
1,716✔
1189
    REALM_ASSERT(!m_finalized);
1,716✔
1190
    m_flx_last_seen_version = version;
1,716✔
1191
    m_flx_active_version = version;
1,716✔
1192
}
1,716✔
1193

1194
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1195
{
1,524✔
1196
    if (!has_flx_subscription_store()) {
1,524✔
1197
        return;
×
1198
    }
×
1199
    REALM_ASSERT(!m_finalized);
1,524✔
1200
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,524✔
1201
    REALM_ASSERT(new_version >= m_flx_active_version);
1,524✔
1202
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,524✔
1203

762✔
1204
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1,524✔
1205

762✔
1206
    switch (batch_state) {
1,524✔
1207
        case DownloadBatchState::SteadyState:
✔
1208
            // Cannot be called with this value.
1209
            REALM_UNREACHABLE();
1210
        case DownloadBatchState::LastInBatch:
1,484✔
1211
            if (m_flx_active_version == new_version) {
1,484✔
1212
                return;
×
1213
            }
×
1214
            on_flx_sync_version_complete(new_version);
1,484✔
1215
            if (new_version == 0) {
1,484✔
1216
                new_state = SubscriptionSet::State::Complete;
688✔
1217
            }
688✔
1218
            else {
796✔
1219
                new_state = SubscriptionSet::State::AwaitingMark;
796✔
1220
                m_flx_pending_mark_version = new_version;
796✔
1221
            }
796✔
1222
            break;
1,484✔
1223
        case DownloadBatchState::MoreToCome:
762✔
1224
            if (m_flx_last_seen_version == new_version) {
40✔
1225
                return;
×
1226
            }
×
1227

20✔
1228
            m_flx_last_seen_version = new_version;
40✔
1229
            new_state = SubscriptionSet::State::Bootstrapping;
40✔
1230
            break;
40✔
1231
    }
1,524✔
1232

762✔
1233
    auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(new_version);
1,524✔
1234
    mut_subs.update_state(new_state);
1,524✔
1235
    mut_subs.commit();
1,524✔
1236
}
1,524✔
1237

1238
// Returns a non-owning pointer to the FLX subscription store (may be null).
//
// Asserts that the wrapper has not been finalized.
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
{
    REALM_ASSERT(!m_finalized);
    SubscriptionStore* store = m_flx_subscription_store.get();
    return store;
}
15,122✔
1243

1244
// Returns a non-owning pointer to the pending bootstrap store.
//
// Asserts that the wrapper has not been finalized.
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
{
    REALM_ASSERT(!m_finalized);
    PendingBootstrapStore* store = m_flx_pending_bootstrap_store.get();
    return store;
}
4,198✔
1249

1250
// Returns a non-owning pointer to the migration store.
//
// Asserts that the wrapper has not been finalized.
MigrationStore* SessionWrapper::get_migration_store()
{
    REALM_ASSERT(!m_finalized);
    MigrationStore* store = m_migration_store.get();
    return store;
}
59,356✔
1255

1256
// Installs the progress handler, replacing any previously installed one.
//
// Asserts that the wrapper has not been initiated yet.
inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
    REALM_ASSERT(!m_initiated);
    // Take ownership of the caller's handler.
    m_progress_handler = std::move(handler);
}
3,510✔
1261

1262

1263
inline void
1264
SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
1265
{
11,114✔
1266
    REALM_ASSERT(!m_initiated);
11,114✔
1267
    m_connection_state_change_listener = std::move(listener);
11,114✔
1268
}
11,114✔
1269

1270

1271
void SessionWrapper::initiate()
1272
{
9,874✔
1273
    REALM_ASSERT(!m_initiated);
9,874✔
1274
    ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode};
9,874✔
1275
    m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
9,874✔
1276
    m_initiated = true;
9,874✔
1277
    m_db->add_commit_listener(this);
9,874✔
1278
}
9,874✔
1279

1280

1281
// Commit notification: posts a task to the client that lets the session
// recognize `new_version` and then reports progress (only if there is new
// uploadable data).
//
// Nothing is posted once the wrapper is finalized or force-closed. The posted
// task returns silently on OperationAborted, throws on any other error status,
// and does nothing if the session no longer exists.
void SessionWrapper::on_commit(version_type new_version)
{
    // Thread safety required
    REALM_ASSERT(m_initiated);

    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
        return;
    }

    // The task holds a counted reference to this wrapper.
    m_client.post([self = util::bind_ptr<SessionWrapper>{this}, new_version](Status status) {
        if (status == ErrorCodes::OperationAborted) {
            return;
        }
        if (!status.is_ok()) {
            throw Exception(status);
        }

        REALM_ASSERT(self->m_actualized);
        if (REALM_UNLIKELY(!self->m_sess)) {
            return; // Already finalized
        }
        self->m_sess->recognize_sync_version(new_version); // Throws
        const bool only_if_new_uploadable_data = true;
        self->report_progress(only_if_new_uploadable_data); // Throws
    });
}
106,406✔
1306

1307

1308
void SessionWrapper::cancel_reconnect_delay()
1309
{
12✔
1310
    // Thread safety required
6✔
1311
    REALM_ASSERT(m_initiated);
12✔
1312

6✔
1313
    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
12✔
1314
        return;
×
1315
    }
×
1316

6✔
1317
    util::bind_ptr<SessionWrapper> self{this};
12✔
1318
    m_client.post([self = std::move(self)](Status status) {
12✔
1319
        if (status == ErrorCodes::OperationAborted)
12✔
1320
            return;
×
1321
        else if (!status.is_ok())
12✔
1322
            throw Exception(status);
×
1323

6✔
1324
        REALM_ASSERT(self->m_actualized);
12✔
1325
        if (REALM_UNLIKELY(!self->m_sess))
12✔
1326
            return; // Already finalized
6✔
1327
        SessionImpl& sess = *self->m_sess;
12✔
1328
        if (!self->m_resumable) {
12✔
1329
            sess.logger.debug("Cannot resume a session that has received a fatal error");
4✔
1330
            return;
4✔
1331
        }
4✔
1332

4✔
1333
        sess.cancel_resumption_delay(); // Throws
8✔
1334
        ClientImpl::Connection& conn = sess.get_connection();
8✔
1335
        conn.cancel_reconnect_delay(); // Throws
8✔
1336
    });                                // Throws
8✔
1337
}
12✔
1338

1339
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1340
                                    WaitOperCompletionHandler handler)
1341
{
4,404✔
1342
    REALM_ASSERT(upload_completion || download_completion);
4,404✔
1343
    REALM_ASSERT(m_initiated);
4,404✔
1344
    REALM_ASSERT(!m_finalized);
4,404✔
1345

2,114✔
1346
    util::bind_ptr<SessionWrapper> self{this};
4,404✔
1347
    m_client.post([self = std::move(self), handler = std::move(handler), upload_completion,
4,404✔
1348
                   download_completion](Status status) mutable {
4,404✔
1349
        if (status == ErrorCodes::OperationAborted)
4,404✔
1350
            return;
×
1351
        else if (!status.is_ok())
4,404✔
1352
            throw Exception(status);
×
1353

2,114✔
1354
        REALM_ASSERT(self->m_actualized);
4,404✔
1355
        if (REALM_UNLIKELY(!self->m_sess)) {
4,404✔
1356
            // Already finalized
46✔
1357
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
82✔
1358
            return;
82✔
1359
        }
82✔
1360
        if (upload_completion) {
4,322✔
1361
            if (download_completion) {
2,288✔
1362
                // Wait for upload and download completion
136✔
1363
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
272✔
1364
            }
272✔
1365
            else {
2,016✔
1366
                // Wait for upload completion only
916✔
1367
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
2,016✔
1368
            }
2,016✔
1369
        }
2,288✔
1370
        else {
2,034✔
1371
            // Wait for download completion only
1,016✔
1372
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
2,034✔
1373
        }
2,034✔
1374
        SessionImpl& sess = *self->m_sess;
4,322✔
1375
        if (upload_completion)
4,322✔
1376
            sess.request_upload_completion_notification(); // Throws
2,288✔
1377
        if (download_completion)
4,322✔
1378
            sess.request_download_completion_notification(); // Throws
2,306✔
1379
    });                                                      // Throws
4,322✔
1380
}
4,404✔
1381

1382

1383
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1384
{
12,996✔
1385
    // Thread safety required
6,498✔
1386
    REALM_ASSERT(m_initiated);
12,996✔
1387
    REALM_ASSERT(!m_finalized);
12,996✔
1388

6,498✔
1389
    std::int_fast64_t target_mark;
12,996✔
1390
    {
12,996✔
1391
        std::lock_guard lock{m_client.m_mutex};
12,996✔
1392
        target_mark = ++m_target_upload_mark;
12,996✔
1393
    }
12,996✔
1394

6,498✔
1395
    util::bind_ptr<SessionWrapper> self{this};
12,996✔
1396
    m_client.post([self = std::move(self), target_mark](Status status) {
12,996✔
1397
        if (status == ErrorCodes::OperationAborted)
12,996✔
1398
            return;
×
1399
        else if (!status.is_ok())
12,996✔
1400
            throw Exception(status);
×
1401

6,498✔
1402
        REALM_ASSERT(self->m_actualized);
12,996✔
1403
        // The session wrapper may already have been finalized. This can only
6,498✔
1404
        // happen if it was abandoned, but in that case, the call of
6,498✔
1405
        // wait_for_upload_complete_or_client_stopped() must have returned
6,498✔
1406
        // already.
6,498✔
1407
        if (REALM_UNLIKELY(!self->m_sess))
12,996✔
1408
            return;
6,508✔
1409
        if (target_mark > self->m_staged_upload_mark) {
12,986✔
1410
            self->m_staged_upload_mark = target_mark;
12,986✔
1411
            SessionImpl& sess = *self->m_sess;
12,986✔
1412
            sess.request_upload_completion_notification(); // Throws
12,986✔
1413
        }
12,986✔
1414
    }); // Throws
12,986✔
1415

6,498✔
1416
    bool completion_condition_was_satisfied;
12,996✔
1417
    {
12,996✔
1418
        std::unique_lock lock{m_client.m_mutex};
12,996✔
1419
        while (m_reached_upload_mark < target_mark && !m_client.m_stopped)
33,004✔
1420
            m_client.m_wait_or_client_stopped_cond.wait(lock);
20,008✔
1421
        completion_condition_was_satisfied = !m_client.m_stopped;
12,996✔
1422
    }
12,996✔
1423
    return completion_condition_was_satisfied;
12,996✔
1424
}
12,996✔
1425

1426

1427
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1428
{
10,084✔
1429
    // Thread safety required
5,044✔
1430
    REALM_ASSERT(m_initiated);
10,084✔
1431
    REALM_ASSERT(!m_finalized);
10,084✔
1432

5,044✔
1433
    std::int_fast64_t target_mark;
10,084✔
1434
    {
10,084✔
1435
        std::lock_guard lock{m_client.m_mutex};
10,084✔
1436
        target_mark = ++m_target_download_mark;
10,084✔
1437
    }
10,084✔
1438

5,044✔
1439
    util::bind_ptr<SessionWrapper> self{this};
10,084✔
1440
    m_client.post([self = std::move(self), target_mark](Status status) {
10,082✔
1441
        if (status == ErrorCodes::OperationAborted)
10,082✔
1442
            return;
×
1443
        else if (!status.is_ok())
10,082✔
1444
            throw Exception(status);
×
1445

5,044✔
1446
        REALM_ASSERT(self->m_actualized);
10,082✔
1447
        // The session wrapper may already have been finalized. This can only
5,044✔
1448
        // happen if it was abandoned, but in that case, the call of
5,044✔
1449
        // wait_for_download_complete_or_client_stopped() must have returned
5,044✔
1450
        // already.
5,044✔
1451
        if (REALM_UNLIKELY(!self->m_sess))
10,082✔
1452
            return;
5,074✔
1453
        if (target_mark > self->m_staged_download_mark) {
10,022✔
1454
            self->m_staged_download_mark = target_mark;
10,022✔
1455
            SessionImpl& sess = *self->m_sess;
10,022✔
1456
            sess.request_download_completion_notification(); // Throws
10,022✔
1457
        }
10,022✔
1458
    }); // Throws
10,022✔
1459

5,044✔
1460
    bool completion_condition_was_satisfied;
10,084✔
1461
    {
10,084✔
1462
        std::unique_lock lock{m_client.m_mutex};
10,084✔
1463
        while (m_reached_download_mark < target_mark && !m_client.m_stopped)
20,568✔
1464
            m_client.m_wait_or_client_stopped_cond.wait(lock);
10,484✔
1465
        completion_condition_was_satisfied = !m_client.m_stopped;
10,084✔
1466
    }
10,084✔
1467
    return completion_condition_was_satisfied;
10,084✔
1468
}
10,084✔
1469

1470

1471
void SessionWrapper::refresh(std::string signed_access_token)
1472
{
208✔
1473
    // Thread safety required
104✔
1474
    REALM_ASSERT(m_initiated);
208✔
1475
    REALM_ASSERT(!m_finalized);
208✔
1476

104✔
1477
    m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) {
208✔
1478
        if (status == ErrorCodes::OperationAborted)
208✔
1479
            return;
×
1480
        else if (!status.is_ok())
208✔
1481
            throw Exception(status);
×
1482

104✔
1483
        REALM_ASSERT(self->m_actualized);
208✔
1484
        if (REALM_UNLIKELY(!self->m_sess))
208✔
1485
            return; // Already finalized
104✔
1486
        self->m_signed_access_token = std::move(token);
208✔
1487
        SessionImpl& sess = *self->m_sess;
208✔
1488
        ClientImpl::Connection& conn = sess.get_connection();
208✔
1489
        // FIXME: This only makes sense when each session uses a separate connection.
104✔
1490
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
208✔
1491
        sess.cancel_resumption_delay();                                                          // Throws
208✔
1492
        conn.cancel_reconnect_delay();                                                           // Throws
208✔
1493
    });
208✔
1494
}
208✔
1495

1496

1497
// Gives up the caller's reference to `wrapper`. A wrapper that was initiated
// is handed over to its client as an abandoned session wrapper; one that was
// never initiated is simply released when `wrapper` goes out of scope.
inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    if (!wrapper->m_initiated)
        return;

    // Fetch the client before `wrapper` is moved from.
    ClientImpl& owner = wrapper->m_client;
    owner.register_abandoned_session_wrapper(std::move(wrapper));
}
11,026✔
1504

1505

1506
// Must be called from event loop thread
1507
void SessionWrapper::actualize(ServerEndpoint endpoint)
1508
{
9,698✔
1509
    REALM_ASSERT(!m_actualized);
9,698✔
1510
    REALM_ASSERT(!m_sess);
9,698✔
1511
    // Cannot be actualized if it's already been finalized or force closed
4,670✔
1512
    REALM_ASSERT(!m_finalized);
9,698✔
1513
    REALM_ASSERT(!m_force_closed);
9,698✔
1514
    try {
9,698✔
1515
        m_db->claim_sync_agent();
9,698✔
1516
    }
9,698✔
1517
    catch (const MultipleSyncAgents&) {
4,672✔
1518
        finalize_before_actualization();
4✔
1519
        throw;
4✔
1520
    }
4✔
1521
    auto sync_mode = endpoint.server_mode;
9,694✔
1522

4,668✔
1523
    bool was_created = false;
9,694✔
1524
    ClientImpl::Connection& conn = m_client.get_connection(
9,694✔
1525
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
9,694✔
1526
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
9,694✔
1527
        was_created); // Throws
9,694✔
1528
    try {
9,694✔
1529
        // FIXME: This only makes sense when each session uses a separate connection.
4,668✔
1530
        conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
9,694✔
1531
        std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
9,694✔
1532
        if (sync_mode == SyncServerMode::FLX) {
9,694✔
1533
            m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1,054✔
1534
        }
1,054✔
1535

4,668✔
1536
        sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
9,694✔
1537
        m_sess = sess.get();
9,694✔
1538
        conn.activate_session(std::move(sess)); // Throws
9,694✔
1539
    }
9,694✔
1540
    catch (...) {
4,668✔
1541
        if (was_created)
×
1542
            m_client.remove_connection(conn);
×
1543

1544
        finalize_before_actualization();
×
1545
        throw;
×
1546
    }
×
1547

4,668✔
1548
    m_actualized = true;
9,694✔
1549
    if (was_created)
9,694✔
1550
        conn.activate(); // Throws
2,516✔
1551

4,668✔
1552
    if (m_connection_state_change_listener) {
9,694✔
1553
        ConnectionState state = conn.get_state();
9,674✔
1554
        if (state != ConnectionState::disconnected) {
9,674✔
1555
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,034✔
1556
            if (state == ConnectionState::connected)
7,034✔
1557
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
6,684✔
1558
        }
7,034✔
1559
    }
9,674✔
1560

4,668✔
1561
    if (!m_client_reset_config)
9,694✔
1562
        report_progress(); // Throws
9,348✔
1563
}
9,694✔
1564

1565
void SessionWrapper::force_close()
1566
{
136✔
1567
    if (m_force_closed || m_finalized) {
136✔
1568
        return;
×
1569
    }
×
1570
    REALM_ASSERT(m_actualized);
136✔
1571
    REALM_ASSERT(m_sess);
136✔
1572
    m_force_closed = true;
136✔
1573

66✔
1574
    ClientImpl::Connection& conn = m_sess->get_connection();
136✔
1575
    conn.initiate_session_deactivation(m_sess); // Throws
136✔
1576

66✔
1577
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
66✔
1578
    m_flx_pending_bootstrap_store.reset();
136✔
1579
    // Clear the subscription and migration store refs since they are owned by SyncSession
66✔
1580
    m_flx_subscription_store.reset();
136✔
1581
    m_migration_store.reset();
136✔
1582
    m_sess = nullptr;
136✔
1583
    // Everything is being torn down, no need to report connection state anymore
66✔
1584
    m_connection_state_change_listener = {};
136✔
1585
}
136✔
1586

1587
// Must be called from event loop thread
1588
void SessionWrapper::finalize()
1589
{
9,702✔
1590
    REALM_ASSERT(m_actualized);
9,702✔
1591

4,672✔
1592
    // Already finalized?
4,672✔
1593
    if (m_finalized) {
9,702✔
1594
        return;
×
1595
    }
×
1596

4,672✔
1597
    // Must be before marking as finalized as we expect m_finalized == false in on_change()
4,672✔
1598
    m_db->remove_commit_listener(this);
9,702✔
1599

4,672✔
1600
    m_finalized = true;
9,702✔
1601

4,672✔
1602
    if (!m_force_closed) {
9,702✔
1603
        REALM_ASSERT(m_sess);
9,556✔
1604
        ClientImpl::Connection& conn = m_sess->get_connection();
9,556✔
1605
        conn.initiate_session_deactivation(m_sess); // Throws
9,556✔
1606

4,600✔
1607
        // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
4,600✔
1608
        m_flx_pending_bootstrap_store.reset();
9,556✔
1609
        // Clear the subscription and migration store refs since they are owned by SyncSession
4,600✔
1610
        m_flx_subscription_store.reset();
9,556✔
1611
        m_migration_store.reset();
9,556✔
1612
        m_sess = nullptr;
9,556✔
1613
    }
9,556✔
1614

4,672✔
1615
    // The Realm file can be closed now, as no access to the Realm file is
4,672✔
1616
    // supposed to happen on behalf of a session after initiation of
4,672✔
1617
    // deactivation.
4,672✔
1618
    m_db->release_sync_agent();
9,702✔
1619
    m_db = nullptr;
9,702✔
1620

4,672✔
1621
    // All outstanding wait operations must be canceled
4,672✔
1622
    while (!m_upload_completion_handlers.empty()) {
10,084✔
1623
        auto handler = std::move(m_upload_completion_handlers.back());
382✔
1624
        m_upload_completion_handlers.pop_back();
382✔
1625
        handler(
382✔
1626
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
382✔
1627
    }
382✔
1628
    while (!m_download_completion_handlers.empty()) {
9,856✔
1629
        auto handler = std::move(m_download_completion_handlers.back());
154✔
1630
        m_download_completion_handlers.pop_back();
154✔
1631
        handler(
154✔
1632
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
154✔
1633
    }
154✔
1634
    while (!m_sync_completion_handlers.empty()) {
9,712✔
1635
        auto handler = std::move(m_sync_completion_handlers.back());
10✔
1636
        m_sync_completion_handlers.pop_back();
10✔
1637
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
10✔
1638
    }
10✔
1639
}
9,702✔
1640

1641

1642
// Must be called only when an unactualized session wrapper becomes abandoned.
1643
//
1644
// Called with a lock on `m_client.m_mutex`.
1645
inline void SessionWrapper::finalize_before_actualization() noexcept
1646
{
180✔
1647
    REALM_ASSERT(!m_sess);
180✔
1648
    m_actualized = true;
180✔
1649
    m_force_closed = true;
180✔
1650
}
180✔
1651

1652

1653
inline void SessionWrapper::on_sync_progress()
1654
{
44,110✔
1655
    REALM_ASSERT(!m_finalized);
44,110✔
1656
    m_reliable_download_progress = true;
44,110✔
1657
    report_progress(); // Throws
44,110✔
1658
}
44,110✔
1659

1660

1661
void SessionWrapper::on_upload_completion()
1662
{
14,672✔
1663
    REALM_ASSERT(!m_finalized);
14,672✔
1664
    while (!m_upload_completion_handlers.empty()) {
16,384✔
1665
        auto handler = std::move(m_upload_completion_handlers.back());
1,712✔
1666
        m_upload_completion_handlers.pop_back();
1,712✔
1667
        handler(Status::OK()); // Throws
1,712✔
1668
    }
1,712✔
1669
    while (!m_sync_completion_handlers.empty()) {
14,856✔
1670
        auto handler = std::move(m_sync_completion_handlers.back());
184✔
1671
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
184✔
1672
        m_sync_completion_handlers.pop_back();
184✔
1673
    }
184✔
1674
    std::lock_guard lock{m_client.m_mutex};
14,672✔
1675
    if (m_staged_upload_mark > m_reached_upload_mark) {
14,672✔
1676
        m_reached_upload_mark = m_staged_upload_mark;
12,964✔
1677
        m_client.m_wait_or_client_stopped_cond.notify_all();
12,964✔
1678
    }
12,964✔
1679
}
14,672✔
1680

1681

1682
void SessionWrapper::on_download_completion()
1683
{
15,488✔
1684
    while (!m_download_completion_handlers.empty()) {
17,552✔
1685
        auto handler = std::move(m_download_completion_handlers.back());
2,064✔
1686
        m_download_completion_handlers.pop_back();
2,064✔
1687
        handler(Status::OK()); // Throws
2,064✔
1688
    }
2,064✔
1689
    while (!m_sync_completion_handlers.empty()) {
15,566✔
1690
        auto handler = std::move(m_sync_completion_handlers.back());
78✔
1691
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
78✔
1692
        m_sync_completion_handlers.pop_back();
78✔
1693
    }
78✔
1694

7,642✔
1695
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
15,488✔
1696
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
682✔
1697
                             m_flx_pending_mark_version);
682✔
1698
        auto mutable_subs = m_flx_subscription_store->get_mutable_by_version(m_flx_pending_mark_version);
682✔
1699
        mutable_subs.update_state(SubscriptionSet::State::Complete);
682✔
1700
        mutable_subs.commit();
682✔
1701
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
682✔
1702
    }
682✔
1703

7,642✔
1704
    std::lock_guard lock{m_client.m_mutex};
15,488✔
1705
    if (m_staged_download_mark > m_reached_download_mark) {
15,488✔
1706
        m_reached_download_mark = m_staged_download_mark;
9,954✔
1707
        m_client.m_wait_or_client_stopped_cond.notify_all();
9,954✔
1708
    }
9,954✔
1709
}
15,488✔
1710

1711

1712
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1713
{
924✔
1714
    REALM_ASSERT(!m_finalized);
924✔
1715
    m_suspended = true;
924✔
1716
    if (m_connection_state_change_listener) {
924✔
1717
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
924✔
1718
    }
924✔
1719
}
924✔
1720

1721

1722
void SessionWrapper::on_resumed()
1723
{
378✔
1724
    REALM_ASSERT(!m_finalized);
378✔
1725
    m_suspended = false;
378✔
1726
    if (m_connection_state_change_listener) {
378✔
1727
        ClientImpl::Connection& conn = m_sess->get_connection();
378✔
1728
        if (conn.get_state() != ConnectionState::disconnected) {
378✔
1729
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
378✔
1730
            if (conn.get_state() == ConnectionState::connected)
378✔
1731
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
378✔
1732
        }
378✔
1733
    }
378✔
1734
}
378✔
1735

1736

1737
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1738
                                                 const util::Optional<SessionErrorInfo>& error_info)
1739
{
10,670✔
1740
    if (m_connection_state_change_listener) {
10,670✔
1741
        if (!m_suspended)
10,658✔
1742
            m_connection_state_change_listener(state, error_info); // Throws
10,654✔
1743
    }
10,658✔
1744
}
10,670✔
1745

1746

1747
void SessionWrapper::report_progress(bool only_if_new_uploadable_data)
1748
{
159,014✔
1749
    REALM_ASSERT(!m_finalized);
159,014✔
1750
    REALM_ASSERT(m_sess);
159,014✔
1751

81,354✔
1752
    if (!m_progress_handler)
159,014✔
1753
        return;
113,218✔
1754

21,022✔
1755
    std::uint_fast64_t downloaded_bytes = 0;
45,796✔
1756
    std::uint_fast64_t downloadable_bytes = 0;
45,796✔
1757
    std::uint_fast64_t uploaded_bytes = 0;
45,796✔
1758
    std::uint_fast64_t uploadable_bytes = 0;
45,796✔
1759
    std::uint_fast64_t snapshot_version = 0;
45,796✔
1760
    ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
45,796✔
1761
                                             uploadable_bytes, snapshot_version);
45,796✔
1762

21,022✔
1763
    // If this progress notification was triggered by a commit being made we
21,022✔
1764
    // only want to send it if the uploadable bytes has actually increased,
21,022✔
1765
    // and not if it was an empty commit.
21,022✔
1766
    if (only_if_new_uploadable_data && m_last_reported_uploadable_bytes == uploadable_bytes)
45,796✔
1767
        return;
32,078✔
1768
    m_last_reported_uploadable_bytes = uploadable_bytes;
13,718✔
1769

5,788✔
1770
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
5,788✔
1771
    // is only the remaining to download. This is confusing, so make them use
5,788✔
1772
    // the same units.
5,788✔
1773
    std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;
13,718✔
1774

5,788✔
1775
    m_sess->logger.debug("Progress handler called, downloaded = %1, "
13,718✔
1776
                         "downloadable(total) = %2, uploaded = %3, "
13,718✔
1777
                         "uploadable = %4, reliable_download_progress = %5, "
13,718✔
1778
                         "snapshot version = %6",
13,718✔
1779
                         downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
13,718✔
1780
                         m_reliable_download_progress, snapshot_version);
13,718✔
1781

5,788✔
1782
    // FIXME: Why is this boolean status communicated to the application as
5,788✔
1783
    // a 64-bit integer? Also, the name `progress_version` is confusing.
5,788✔
1784
    std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
10,122✔
1785
    m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
13,718✔
1786
                       snapshot_version);
13,718✔
1787
}
13,718✔
1788

1789
// Passes a test command to the session object. If there is no session object,
// the returned future is built from a RuntimeError status instead.
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
{
    if (m_sess)
        return m_sess->send_test_command(std::move(body));

    return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
}
68✔
1797

1798
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1799
{
290✔
1800
    REALM_ASSERT(!m_finalized);
290✔
1801

144✔
1802
    auto pending_reset = _impl::client_reset::has_pending_reset(*m_db->start_frozen());
290✔
1803
    REALM_ASSERT(pending_reset);
290✔
1804
    m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
290✔
1805
                        pending_reset->time);
290✔
1806
    async_wait_for(true, true, [self = util::bind_ptr(this), pending_reset = *pending_reset](Status status) {
290✔
1807
        if (status == ErrorCodes::OperationAborted) {
290✔
1808
            return;
146✔
1809
        }
146✔
1810
        auto& logger = self->m_sess->logger;
144✔
1811
        if (!status.is_ok()) {
144✔
1812
            logger.error("Error while tracking client reset acknowledgement: %1", status);
×
1813
            return;
×
1814
        }
×
1815

72✔
1816
        auto wt = self->m_db->start_write();
144✔
1817
        auto cur_pending_reset = _impl::client_reset::has_pending_reset(*wt);
144✔
1818
        if (!cur_pending_reset) {
144✔
1819
            logger.debug(
×
1820
                "Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
×
1821
                pending_reset.type, pending_reset.time);
×
1822
            return;
×
1823
        }
×
1824
        else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
144✔
1825
            logger.debug(
×
1826
                "Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
×
1827
                pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
×
1828
        }
×
1829
        else {
144✔
1830
            logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
144✔
1831
                         "Removing cycle detection tracker.",
144✔
1832
                         pending_reset.type, pending_reset.time);
144✔
1833
        }
144✔
1834
        _impl::client_reset::remove_pending_client_resets(*wt);
144✔
1835
        wt->commit();
144✔
1836
    });
144✔
1837
}
290✔
1838

1839
void SessionWrapper::update_subscription_version_info()
1840
{
11,298✔
1841
    if (!m_flx_subscription_store)
11,298✔
1842
        return;
10,144✔
1843
    auto versions_info = m_flx_subscription_store->get_version_info();
1,154✔
1844
    m_flx_active_version = versions_info.active;
1,154✔
1845
    m_flx_pending_mark_version = versions_info.pending_mark;
1,154✔
1846
}
1,154✔
1847

1848
std::string SessionWrapper::get_appservices_connection_id()
1849
{
72✔
1850
    auto pf = util::make_promise_future<std::string>();
72✔
1851
    REALM_ASSERT(m_initiated);
72✔
1852

36✔
1853
    util::bind_ptr<SessionWrapper> self(this);
72✔
1854
    get_client().post([self, promise = std::move(pf.promise)](Status status) mutable {
72✔
1855
        if (!status.is_ok()) {
72✔
1856
            promise.set_error(status);
×
1857
            return;
×
1858
        }
×
1859

36✔
1860
        if (!self->m_sess) {
72✔
1861
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1862
            return;
×
1863
        }
×
1864

36✔
1865
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
1866
    });
72✔
1867

36✔
1868
    return pf.future.get();
72✔
1869
}
72✔
1870

1871
void SessionWrapper::mark_unresumable()
1872
{
202✔
1873
    m_resumable = false;
202✔
1874
}
202✔
1875

1876
// ################ ClientImpl::Connection ################
1877

1878
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1879
                                   const std::string& authorization_header_name,
1880
                                   const std::map<std::string, std::string>& custom_http_headers,
1881
                                   bool verify_servers_ssl_certificate,
1882
                                   Optional<std::string> ssl_trust_certificate_path,
1883
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1884
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1885
    : logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(ident), client.logger_ptr)} // Throws
1886
    , logger{*logger_ptr}
1887
    , m_client{client}
1888
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1889
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1890
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1891
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1892
    , m_reconnect_info{reconnect_info}
1893
    , m_session_history{}
1894
    , m_ident{ident}
1895
    , m_server_endpoint{std::move(endpoint)}
1896
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1897
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1898
{
2,520✔
1899
    m_on_idle = m_client.create_trigger([this](Status status) {
2,524✔
1900
        if (status == ErrorCodes::OperationAborted)
2,524✔
1901
            return;
×
1902
        else if (!status.is_ok())
2,524✔
1903
            throw Exception(status);
×
1904

1,192✔
1905
        REALM_ASSERT(m_activated);
2,524✔
1906
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,524✔
1907
            on_idle(); // Throws
2,520✔
1908
            // Connection object may be destroyed now.
1,192✔
1909
        }
2,520✔
1910
    });
2,524✔
1911
}
2,520✔
1912

1913
// Returns the identifier this connection was constructed with.
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
{
    return m_ident;
}
12✔
1917

1918

1919
// Returns the server endpoint this connection was constructed with.
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
{
    return m_server_endpoint;
}
2,520✔
1923

1924
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1925
                                                        const std::string& signed_access_token)
1926
{
9,900✔
1927
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
9,900✔
1928
    m_signed_access_token = signed_access_token;           // Throws (copy)
9,900✔
1929
}
9,900✔
1930

1931

1932
void ClientImpl::Connection::resume_active_sessions()
1933
{
1,724✔
1934
    auto handler = [=](ClientImpl::Session& sess) {
3,444✔
1935
        sess.cancel_resumption_delay(); // Throws
3,444✔
1936
    };
3,444✔
1937
    for_each_active_session(std::move(handler)); // Throws
1,724✔
1938
}
1,724✔
1939

1940
void ClientImpl::Connection::on_idle()
1941
{
2,520✔
1942
    logger.debug("Destroying connection object");
2,520✔
1943
    ClientImpl& client = get_client();
2,520✔
1944
    client.remove_connection(*this);
2,520✔
1945
    // NOTE: This connection object is now destroyed!
1,192✔
1946
}
2,520✔
1947

1948

1949
std::string ClientImpl::Connection::get_http_request_path() const
1950
{
3,354✔
1951
    using namespace std::string_view_literals;
3,354✔
1952
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
2,147,485,277✔
1953

1,630✔
1954
    std::string path;
3,354✔
1955
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,354✔
1956
    path += m_http_request_path_prefix;
3,354✔
1957
    path += param;
3,354✔
1958
    path += m_signed_access_token;
3,354✔
1959

1,630✔
1960
    return path;
3,354✔
1961
}
3,354✔
1962

1963

1964
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
1965
{
2,520✔
1966
    std::ostringstream out;
2,520✔
1967
    out.imbue(std::locale::classic());
2,520✔
1968
    out << "Connection[" << ident << "]: "; // Throws
2,520✔
1969
    return out.str();                       // Throws
2,520✔
1970
}
2,520✔
1971

1972

1973
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
1974
                                                            util::Optional<SessionErrorInfo> error_info)
1975
{
10,002✔
1976
    if (m_force_closed) {
10,002✔
1977
        return;
2,218✔
1978
    }
2,218✔
1979
    auto handler = [=](ClientImpl::Session& sess) {
10,594✔
1980
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
10,594✔
1981
        sess_2.on_connection_state_changed(state, error_info); // Throws
10,594✔
1982
    };
10,594✔
1983
    for_each_active_session(std::move(handler)); // Throws
7,784✔
1984
}
7,784✔
1985

1986

1987
Client::Client(Config config)
1988
    : m_impl{new ClientImpl{std::move(config)}} // Throws
1989
{
9,474✔
1990
}
9,474✔
1991

1992

1993
Client::Client(Client&& client) noexcept
1994
    : m_impl{std::move(client.m_impl)}
1995
{
×
1996
}
×
1997

1998

1999
// Defined out of line in this translation unit; member cleanup is implicit.
Client::~Client() noexcept = default;
9,474✔
2000

2001

2002
void Client::shutdown() noexcept
2003
{
9,552✔
2004
    m_impl->shutdown();
9,552✔
2005
}
9,552✔
2006

2007
void Client::shutdown_and_wait()
2008
{
752✔
2009
    m_impl->shutdown_and_wait();
752✔
2010
}
752✔
2011

2012
void Client::cancel_reconnect_delay()
2013
{
1,728✔
2014
    m_impl->cancel_reconnect_delay();
1,728✔
2015
}
1,728✔
2016

2017
void Client::voluntary_disconnect_all_connections()
2018
{
12✔
2019
    m_impl->voluntary_disconnect_all_connections();
12✔
2020
}
12✔
2021

2022
bool Client::wait_for_session_terminations_or_client_stopped()
2023
{
372✔
2024
    return m_impl.get()->wait_for_session_terminations_or_client_stopped();
372✔
2025
}
372✔
2026

2027

2028
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2029
                                  port_type& port, std::string& path) const
2030
{
3,434✔
2031
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
3,434✔
2032
}
3,434✔
2033

2034

2035
// Creates the SessionWrapper that backs this session and stores it in `m_impl`
// as a raw pointer that carries one reference.
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
{
    util::bind_ptr<SessionWrapper> wrapper{new SessionWrapper{*client.m_impl, std::move(db),
                                                              std::move(flx_sub_store), std::move(migration_store),
                                                              std::move(config)}}; // Throws
    // The reference count passed back to the application is implicitly
    // owned by a naked pointer. This is done to avoid exposing
    // implementation details through the header file (that is, through the
    // Session object).
    m_impl = wrapper.release();
}
11,026✔
2047

2048

2049
// Hands the progress handler over to the session wrapper.
void Session::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
    m_impl->set_progress_handler(std::move(handler)); // Throws
}
3,510✔
2053

2054

2055
// Hands the connection state change listener over to the session wrapper.
void Session::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
{
    m_impl->set_connection_state_change_listener(std::move(listener)); // Throws
}
11,114✔
2059

2060

2061
void Session::bind()
2062
{
9,874✔
2063
    m_impl->initiate(); // Throws
9,874✔
2064
}
9,874✔
2065

2066

2067
// Passes `new_version` to the session wrapper's on_commit().
void Session::nonsync_transact_notify(version_type new_version)
{
    m_impl->on_commit(new_version); // Throws
}
15,342✔
2071

2072

2073
void Session::cancel_reconnect_delay()
2074
{
12✔
2075
    m_impl->cancel_reconnect_delay(); // Throws
12✔
2076
}
12✔
2077

2078

2079
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2080
{
4,114✔
2081
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,114✔
2082
}
4,114✔
2083

2084

2085
bool Session::wait_for_upload_complete_or_client_stopped()
2086
{
12,996✔
2087
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,996✔
2088
}
12,996✔
2089

2090

2091
bool Session::wait_for_download_complete_or_client_stopped()
2092
{
10,084✔
2093
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,084✔
2094
}
10,084✔
2095

2096

2097
void Session::refresh(const std::string& signed_access_token)
2098
{
208✔
2099
    m_impl->refresh(signed_access_token); // Throws
208✔
2100
}
208✔
2101

2102

2103
void Session::abandon() noexcept
2104
{
11,026✔
2105
    REALM_ASSERT(m_impl);
11,026✔
2106
    // Reabsorb the ownership assigned to the applications naked pointer by
5,340✔
2107
    // Session constructor
5,340✔
2108
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
11,026✔
2109
    SessionWrapper::abandon(std::move(wrapper));
11,026✔
2110
}
11,026✔
2111

2112
// Forwards to SessionWrapper::send_test_command() and returns its future.
util::Future<std::string> Session::send_test_command(std::string body)
{
    return m_impl->send_test_command(std::move(body));
}
68✔
2116

2117
std::string Session::get_appservices_connection_id()
2118
{
72✔
2119
    return m_impl->get_appservices_connection_id();
72✔
2120
}
72✔
2121

2122
void Session::mark_unresumable()
2123
{
202✔
2124
    m_impl->mark_unresumable();
202✔
2125
}
202✔
2126

2127
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2128
{
×
2129
    switch (proxyType) {
×
2130
        case ProxyConfig::Type::HTTP:
×
2131
            return os << "HTTP";
×
2132
        case ProxyConfig::Type::HTTPS:
×
2133
            return os << "HTTPS";
×
2134
    }
×
2135
    REALM_TERMINATE("Invalid Proxy Type object.");
2136
}
×
2137

2138
} // namespace sync
2139
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc