• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1772

20 Oct 2023 07:34PM UTC coverage: 91.61% (+0.04%) from 91.567%
1772

push

Evergreen

web-flow
Added sync socket result enum for sync socket callback handlers in C API (#7015)

* Added sync socket result enum for sync socket callback handlers in C API
* Updated changelog
* CAPI: timer callbacks are now released by canceled/complete function
* Updated c_api tests to use all sync socket c_api functions
* Additional updates to sync socket c api test
* Added CAPI write callback manager to manage async write callbacks
* Pass error codes up to default socket provider for async_write_binary() callbacks
* Removed async write callback manager from CAPI
* Updated changelog after release
* clang format and updates from review
* Update async_write_binary() error handling to not throw exception
* Updated a few comments
* Another comment update
* Updates from review

94360 of 173622 branches covered (0.0%)

111 of 150 new or added lines in 8 files covered. (74.0%)

66 existing lines in 17 files now uncovered.

230703 of 251832 relevant lines covered (91.61%)

6730695.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.75
/src/realm/sync/client.cpp
1

2
#include <memory>
3
#include <tuple>
4
#include <atomic>
5

6
#include "realm/sync/client_base.hpp"
7
#include "realm/sync/protocol.hpp"
8
#include "realm/util/optional.hpp"
9
#include <realm/sync/client.hpp>
10
#include <realm/sync/config.hpp>
11
#include <realm/sync/noinst/client_reset.hpp>
12
#include <realm/sync/noinst/client_history_impl.hpp>
13
#include <realm/sync/noinst/client_impl_base.hpp>
14
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
15
#include <realm/sync/subscriptions.hpp>
16
#include <realm/util/bind_ptr.hpp>
17
#include <realm/util/circular_buffer.hpp>
18
#include <realm/util/platform_info.hpp>
19
#include <realm/util/thread.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/util/value_reset_guard.hpp>
22
#include <realm/version.hpp>
23

24
namespace realm {
25
namespace sync {
26

27
namespace {
28
using namespace realm::util;
29

30

31
// clang-format off
32
using SessionImpl                     = ClientImpl::Session;
33
using SyncTransactCallback            = Session::SyncTransactCallback;
34
using ProgressHandler                 = Session::ProgressHandler;
35
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
36
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
37
using port_type                       = Session::port_type;
38
using connection_ident_type           = std::int_fast64_t;
39
using ProxyConfig                     = SyncConfig::ProxyConfig;
40
// clang-format on
41

42
} // unnamed namespace
43

44

45
// Life cycle states of a session wrapper:
46
//
47
//  - Uninitiated
48
//  - Unactualized
49
//  - Actualized
50
//  - Finalized
51
//
52
// The session wrapper moves from the Uninitiated to the Unactualized state when
53
// it is initiated, i.e., when initiate() is called. This may happen on any
54
// thread.
55
//
56
// The session wrapper moves from the Unactualized to the Actualized state when
57
// it is associated with a session object, i.e., when `m_sess` is made to refer
58
// to an object of type SessionImpl. This always happens on the event loop
59
// thread.
60
//
61
// The session wrapper moves from the Actualized to the Finalized state when it
62
// is dissociated from the session object. This happens in response to the
63
// session wrapper having been abandoned by the application. This always happens
64
// on the event loop thread.
65
//
66
// The session wrapper will exist in the Finalized state only while referenced
67
// from a post handler waiting to be executed.
68
//
69
// If the session wrapper is abandoned by the application while in the
70
// Uninitiated state, it will be destroyed immediately, since no post handlers
71
// can have been scheduled prior to initiation.
72
//
73
// If the session wrapper is abandoned while in the Unactualized state, it will
74
// move immediately to the Finalized state. This may happen on any thread.
75
//
76
// The moving of a session wrapper to, or from the Actualized state always
77
// happens on the event loop thread. All other state transitions may happen on
78
// any thread.
79
//
80
// NOTE: Activation of the session happens no later than during actualization,
81
// and initiation of deactivation happens no earlier than during
82
// finalization. See also activate_session() and initiate_session_deactivation()
83
// in ClientImpl::Connection.
84
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
85
public:
86
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
87
                   Session::Config);
88
    ~SessionWrapper() noexcept;
89

90
    ClientReplication& get_replication() noexcept;
91
    ClientImpl& get_client() noexcept;
92

93
    bool has_flx_subscription_store() const;
94
    SubscriptionStore* get_flx_subscription_store();
95
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
96

97
    MigrationStore* get_migration_store();
98

99
    void set_progress_handler(util::UniqueFunction<ProgressHandler>);
100
    void set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener>);
101

102
    void initiate();
103

104
    void force_close();
105

106
    void on_commit(version_type new_version) override;
107
    void cancel_reconnect_delay();
108

109
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
110
    bool wait_for_upload_complete_or_client_stopped();
111
    bool wait_for_download_complete_or_client_stopped();
112

113
    void refresh(std::string signed_access_token);
114

115
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
116

117
    // These are called from ClientImpl
118
    void actualize(ServerEndpoint);
119
    void finalize();
120
    void finalize_before_actualization() noexcept;
121

122
    util::Future<std::string> send_test_command(std::string body);
123

124
    void handle_pending_client_reset_acknowledgement();
125

126
    std::string get_appservices_connection_id();
127

128
private:
129
    ClientImpl& m_client;
130
    DBRef m_db;
131
    Replication* m_replication;
132

133
    const ProtocolEnvelope m_protocol_envelope;
134
    const std::string m_server_address;
135
    const port_type m_server_port;
136
    const std::string m_user_id;
137
    const SyncServerMode m_sync_mode;
138
    const std::string m_authorization_header_name;
139
    const std::map<std::string, std::string> m_custom_http_headers;
140
    const bool m_verify_servers_ssl_certificate;
141
    const bool m_simulate_integration_error;
142
    const Optional<std::string> m_ssl_trust_certificate_path;
143
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
144
    const size_t m_flx_bootstrap_batch_size_bytes;
145

146
    // This one is different from null when, and only when the session wrapper
147
    // is in ClientImpl::m_abandoned_session_wrappers.
148
    SessionWrapper* m_next = nullptr;
149

150
    // After initiation, these may only be accessed by the event loop thread.
151
    std::string m_http_request_path_prefix;
152
    std::string m_virt_path;
153
    std::string m_signed_access_token;
154

155
    util::Optional<ClientReset> m_client_reset_config;
156

157
    util::Optional<ProxyConfig> m_proxy_config;
158

159
    uint_fast64_t m_last_reported_uploadable_bytes = 0;
160
    util::UniqueFunction<ProgressHandler> m_progress_handler;
161
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
162

163
    std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
164
    bool m_in_debug_hook = false;
165

166
    SessionReason m_session_reason;
167

168
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
169
    int64_t m_flx_active_version = 0;
170
    int64_t m_flx_last_seen_version = 0;
171
    int64_t m_flx_pending_mark_version = 0;
172
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
173

174
    std::shared_ptr<MigrationStore> m_migration_store;
175

176
    bool m_initiated = false;
177

178
    // Set to true when this session wrapper is actualized (or when it is
179
    // finalized before proper actualization). It is then never modified again.
180
    //
181
    // A session specific post handler submitted after the initiation of the
182
    // session wrapper (initiate()) will always find that `m_actualized` is
183
    // true. This is the case, because the scheduling of such a post handler
184
    // will have been preceded by the triggering of
185
    // `ClientImpl::m_actualize_and_finalize` (in
186
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
187
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
188
    // before the post handler. If the session wrapper is no longer in
189
    // `ClientImpl::m_unactualized_session_wrappers` when
190
    // ClientImpl::actualize_and_finalize_session_wrappers() executes, it must
191
    // have been abandoned already, but in that case,
192
    // finalize_before_actualization() has already been called.
193
    bool m_actualized = false;
194

195
    bool m_force_closed = false;
196

197
    bool m_suspended = false;
198

199
    // Has the SessionWrapper been finalized?
200
    bool m_finalized = false;
201

202
    // Set to true when the first DOWNLOAD message is received to indicate that
203
    // the byte-level download progress parameters can be considered reasonable
204
    // reliable. Before that, a lot of time may have passed, so our record of
205
    // the download progress is likely completely out of date.
206
    bool m_reliable_download_progress = false;
207

208
    // Set to point to an activated session object during actualization of the
209
    // session wrapper. Set to null during finalization of the session
210
    // wrapper. Both modifications are guaranteed to be performed by the event
211
    // loop thread.
212
    //
213
    // If a session specific post handler, that is submitted after the
214
    // initiation of the session wrapper, sees that `m_sess` is null, it can
215
    // conclude that the session wrapper has been both abandoned and
216
    // finalized. This is true, because the scheduling of such a post handler
217
    // will have been preceded by the triggering of
218
    // `ClientImpl::m_actualize_and_finalize` (in
219
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
220
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
221
    // before the post handler, so the session wrapper must have been actualized
222
    // unless it was already abandoned by the application. If it was abandoned
223
    // before it was actualized, it will already have been finalized by
224
    // finalize_before_actualization().
225
    //
226
    // Must only be accessed from the event loop thread.
227
    SessionImpl* m_sess = nullptr;
228

229
    // These must only be accessed from the event loop thread.
230
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
231
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
232
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
233

234
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
235
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
236
    // event loop thread.
237
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
238
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
239
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
240

241
    void on_sync_progress();
242
    void on_upload_completion();
243
    void on_download_completion();
244
    void on_suspended(const SessionErrorInfo& error_info);
245
    void on_resumed();
246
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
247
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
248
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
249
    void on_flx_sync_version_complete(int64_t version);
250

251
    void report_progress(bool only_if_new_uploadable_data = false);
252

253
    friend class SessionWrapperStack;
254
    friend class ClientImpl::Session;
255
};
256

257

258
// ################ SessionWrapperStack ################
259

260
inline bool SessionWrapperStack::empty() const noexcept
261
{
×
262
    return !m_back;
×
263
}
×
264

265

266
/// Links `w` in as the new top of the stack. The reference held by `w` is
/// handed over to the stack (via release()), so the count is not changed.
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
{
    // The wrapper must not already be linked in.
    REALM_ASSERT(!w->m_next);
    SessionWrapper* const node = w.release();
    node->m_next = m_back;
    m_back = node;
}
9,626✔
272

273

274
/// Unlinks the top wrapper and returns it; the returned pointer adopts the
/// reference that the stack was holding. Returns a null pointer when the
/// stack is empty.
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
{
    SessionWrapper* const top = m_back;
    if (top) {
        m_back = top->m_next;
        // Clear the link so that the wrapper is no longer marked as stacked.
        top->m_next = nullptr;
    }
    return util::bind_ptr<SessionWrapper>{top, util::bind_ptr_base::adopt_tag{}};
}
24,734✔
283

284

285
inline void SessionWrapperStack::clear() noexcept
286
{
24,082✔
287
    while (m_back) {
24,082✔
288
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
289
        m_back = w->m_next;
×
290
    }
×
291
}
24,082✔
292

293

294
inline SessionWrapperStack::SessionWrapperStack(SessionWrapperStack&& q) noexcept
295
    : m_back{q.m_back}
296
{
297
    q.m_back = nullptr;
298
}
299

300

301
/// Releases any wrappers that are still linked into the stack.
SessionWrapperStack::~SessionWrapperStack()
{
    clear();
}
24,082✔
305

306

307
// ################ ClientImpl ################
308

309

310
/// Stops the client and blocks until it has been drained.
ClientImpl::~ClientImpl()
{
    // No mutex locking is needed here, because no other thread is allowed to
    // access this client, or any of its subobjects, at this time.
    shutdown_and_wait();

    // Session wrappers are removed from m_unactualized_session_wrappers as they
    // are abandoned, so none can be left at this point.
    REALM_ASSERT(m_stopped);
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
}
8,974✔
321

322

323
void ClientImpl::cancel_reconnect_delay()
324
{
1,780✔
325
    // Thread safety required
1,086✔
326
    post([this](Status status) {
1,780✔
327
        if (status == ErrorCodes::OperationAborted)
1,780✔
328
            return;
×
329
        else if (!status.is_ok())
1,780✔
330
            throw Exception(status);
×
331

1,086✔
332
        for (auto& p : m_server_slots) {
1,780✔
333
            ServerSlot& slot = p.second;
1,780✔
334
            if (m_one_connection_per_session) {
1,780✔
335
                REALM_ASSERT(!slot.connection);
×
336
                for (const auto& p : slot.alt_connections) {
×
337
                    ClientImpl::Connection& conn = *p.second;
×
338
                    conn.resume_active_sessions(); // Throws
×
339
                    conn.cancel_reconnect_delay(); // Throws
×
340
                }
×
341
            }
×
342
            else {
1,780✔
343
                REALM_ASSERT(slot.alt_connections.empty());
1,780✔
344
                if (slot.connection) {
1,780✔
345
                    ClientImpl::Connection& conn = *slot.connection;
1,776✔
346
                    conn.resume_active_sessions(); // Throws
1,776✔
347
                    conn.cancel_reconnect_delay(); // Throws
1,776✔
348
                }
1,776✔
349
                else {
4✔
350
                    slot.reconnect_info.reset();
4✔
351
                }
4✔
352
            }
1,780✔
353
        }
1,780✔
354
    }); // Throws
1,780✔
355
}
1,780✔
356

357

358
void ClientImpl::voluntary_disconnect_all_connections()
359
{
12✔
360
    auto done_pf = util::make_promise_future<void>();
12✔
361
    post([this, promise = std::move(done_pf.promise)](Status status) mutable {
12✔
362
        if (status == ErrorCodes::OperationAborted) {
12✔
363
            return;
×
364
        }
×
365

6✔
366
        REALM_ASSERT(status.is_ok());
12✔
367

6✔
368
        try {
12✔
369
            for (auto& p : m_server_slots) {
12✔
370
                ServerSlot& slot = p.second;
12✔
371
                if (m_one_connection_per_session) {
12✔
372
                    REALM_ASSERT(!slot.connection);
×
373
                    for (const auto& p : slot.alt_connections) {
×
374
                        ClientImpl::Connection& conn = *p.second;
×
375
                        if (conn.get_state() == ConnectionState::disconnected) {
×
376
                            continue;
×
377
                        }
×
378
                        conn.voluntary_disconnect();
×
379
                    }
×
380
                }
×
381
                else {
12✔
382
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
383
                    if (!slot.connection) {
12✔
384
                        continue;
×
385
                    }
×
386
                    ClientImpl::Connection& conn = *slot.connection;
12✔
387
                    if (conn.get_state() == ConnectionState::disconnected) {
12✔
388
                        continue;
×
389
                    }
×
390
                    conn.voluntary_disconnect();
12✔
391
                }
12✔
392
            }
12✔
393
        }
12✔
394
        catch (...) {
6✔
395
            promise.set_error(exception_to_status());
×
396
            return;
×
397
        }
×
398
        promise.emplace_value();
12✔
399
    });
12✔
400
    done_pf.future.get();
12✔
401
}
12✔
402

403

404
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
405
{
368✔
406
    // Thread safety required
34✔
407

34✔
408
    {
368✔
409
        std::lock_guard lock{m_mutex};
368✔
410
        m_sessions_terminated = false;
368✔
411
    }
368✔
412

34✔
413
    // The technique employed here relies on the fact that
34✔
414
    // actualize_and_finalize_session_wrappers() must get to execute at least
34✔
415
    // once before the post handler submitted below gets to execute, but still
34✔
416
    // at a time where all session wrappers, that are abandoned prior to the
34✔
417
    // execution of wait_for_session_terminations_or_client_stopped(), have been
34✔
418
    // added to `m_abandoned_session_wrappers`.
34✔
419
    //
34✔
420
    // To see that this is the case, consider a session wrapper that was
34✔
421
    // abandoned before wait_for_session_terminations_or_client_stopped() was
34✔
422
    // invoked. Then the session wrapper will have been added to
34✔
423
    // `m_abandoned_session_wrappers`, and an invocation of
34✔
424
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
34✔
425
    // guarantees mentioned in the documentation of Trigger then ensure
34✔
426
    // that at least one execution of actualize_and_finalize_session_wrappers()
34✔
427
    // will happen after the session wrapper has been added to
34✔
428
    // `m_abandoned_session_wrappers`, but before the post handler submitted
34✔
429
    // below gets to execute.
34✔
430
    post([this](Status status) mutable {
368✔
431
        if (status == ErrorCodes::OperationAborted)
368✔
432
            return;
×
433
        else if (!status.is_ok())
368✔
434
            throw Exception(status);
×
435

34✔
436
        std::lock_guard lock{m_mutex};
368✔
437
        m_sessions_terminated = true;
368✔
438
        m_wait_or_client_stopped_cond.notify_all();
368✔
439
    }); // Throws
368✔
440

34✔
441
    bool completion_condition_was_satisfied;
368✔
442
    {
368✔
443
        std::unique_lock lock{m_mutex};
368✔
444
        while (!m_sessions_terminated && !m_stopped)
710✔
445
            m_wait_or_client_stopped_cond.wait(lock);
342✔
446
        completion_condition_was_satisfied = !m_stopped;
368✔
447
    }
368✔
448
    return completion_condition_was_satisfied;
368✔
449
}
368✔
450

451

452
void ClientImpl::drain_connections_on_loop()
453
{
8,974✔
454
    post([this](Status status) mutable {
8,974✔
455
        REALM_ASSERT(status.is_ok());
8,974✔
456
        drain_connections();
8,974✔
457
    });
8,974✔
458
}
8,974✔
459

460
void ClientImpl::shutdown_and_wait()
461
{
9,734✔
462
    shutdown();
9,734✔
463
    std::unique_lock lock{m_drain_mutex};
9,734✔
464
    if (m_drained) {
9,734✔
465
        return;
756✔
466
    }
756✔
467

4,420✔
468
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
8,978✔
469
    m_drain_cv.wait(lock, [&] {
19,740✔
470
        return m_num_connections == 0 && m_outstanding_posts == 0;
19,740✔
471
    });
19,740✔
472

4,420✔
473
    m_drained = true;
8,978✔
474
}
8,978✔
475

476
void ClientImpl::shutdown() noexcept
477
{
18,786✔
478
    {
18,786✔
479
        std::lock_guard lock{m_mutex};
18,786✔
480
        if (m_stopped)
18,786✔
481
            return;
9,812✔
482
        m_stopped = true;
8,974✔
483
        m_wait_or_client_stopped_cond.notify_all();
8,974✔
484
    }
8,974✔
485

4,418✔
486
    drain_connections_on_loop();
8,974✔
487
}
8,974✔
488

489

490
/// Records `wrapper` (together with its server endpoint) as waiting to be
/// actualized, and triggers `m_actualize_and_finalize` unless a run was
/// already flagged as needed.
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint)
{
    // Thread safety required.

    std::lock_guard lock{m_mutex};
    REALM_ASSERT(m_actualize_and_finalize);
    m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws

    const bool was_already_needed = m_actualize_and_finalize_needed;
    m_actualize_and_finalize_needed = true;

    // The conditional triggering has to take place while the mutex is still
    // held. If two threads call register_unactualized_session_wrapper()
    // roughly concurrently, only the first one is guaranteed to be asked to
    // retrigger, but that retriggering must have happened before the other
    // thread returns from register_unactualized_session_wrapper().
    //
    // A similar argument applies when two threads call
    // register_abandoned_session_wrapper(), and when one thread calls one of
    // them and another thread calls the other.
    if (!was_already_needed)
        m_actualize_and_finalize->trigger();
}
9,790✔
511

512

513
/// Handles a session wrapper that the application has abandoned. A wrapper
/// that is still waiting for actualization is finalized on the spot; any
/// other wrapper is queued in `m_abandoned_session_wrappers` and
/// `m_actualize_and_finalize` is triggered unless a run was already flagged
/// as needed.
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    // Thread safety required.

    std::lock_guard lock{m_mutex};
    REALM_ASSERT(m_actualize_and_finalize);

    // A session wrapper that has not yet been actualized (on the event loop
    // thread) can be finalized immediately. This ensures that we will
    // generally not actualize a session wrapper that has already been
    // abandoned.
    const bool was_unactualized = m_unactualized_session_wrappers.erase(wrapper.get()) != 0;
    if (was_unactualized) {
        wrapper->finalize_before_actualization();
        return;
    }

    m_abandoned_session_wrappers.push(std::move(wrapper));
    const bool was_already_needed = m_actualize_and_finalize_needed;
    m_actualize_and_finalize_needed = true;

    // The conditional triggering has to take place while the mutex is still
    // held. See the implementation of register_unactualized_session_wrapper()
    // for details.
    if (!was_already_needed)
        m_actualize_and_finalize->trigger();
}
9,624✔
539

540

541
// Must be called from the event loop thread.
542
void ClientImpl::actualize_and_finalize_session_wrappers()
543
{
15,108✔
544
    std::map<SessionWrapper*, ServerEndpoint> unactualized_session_wrappers;
15,108✔
545
    SessionWrapperStack abandoned_session_wrappers;
15,108✔
546
    bool stopped;
15,108✔
547
    {
15,108✔
548
        std::lock_guard lock{m_mutex};
15,108✔
549
        m_actualize_and_finalize_needed = false;
15,108✔
550
        swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
15,108✔
551
        swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
15,108✔
552
        stopped = m_stopped;
15,108✔
553
    }
15,108✔
554
    // Note, we need to finalize old session wrappers before we actualize new
7,142✔
555
    // ones. This ensures that deactivation of old sessions is initiated before
7,142✔
556
    // new session are activated. This, in turn, ensures that the server does
7,142✔
557
    // not see two overlapping sessions for the same local Realm file.
7,142✔
558
    while (util::bind_ptr<SessionWrapper> wrapper = abandoned_session_wrappers.pop())
24,730✔
559
        wrapper->finalize(); // Throws
9,622✔
560
    if (stopped) {
15,108✔
561
        for (auto& p : unactualized_session_wrappers) {
402✔
562
            SessionWrapper& wrapper = *p.first;
4✔
563
            wrapper.finalize_before_actualization();
4✔
564
        }
4✔
565
        return;
762✔
566
    }
762✔
567
    for (auto& p : unactualized_session_wrappers) {
14,346✔
568
        SessionWrapper& wrapper = *p.first;
9,622✔
569
        ServerEndpoint server_endpoint = std::move(p.second);
9,622✔
570
        wrapper.actualize(std::move(server_endpoint)); // Throws
9,622✔
571
    }
9,622✔
572
}
14,346✔
573

574

575
/// Returns a connection for `endpoint`: the server slot's existing connection
/// when one is present, multiplexing is enabled and no proxy is configured;
/// otherwise a newly created connection.
///
/// `was_created` is set to true when a new connection is created and is left
/// untouched otherwise.
/// NOTE(review): callers presumably initialize `was_created` to false before
/// the call - confirm at the call sites.
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
                                                   const std::string& authorization_header_name,
                                                   const std::map<std::string, std::string>& custom_http_headers,
                                                   bool verify_servers_ssl_certificate,
                                                   Optional<std::string> ssl_trust_certificate_path,
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
{
    // Look up the slot for this endpoint, creating it on first use.
    auto emplace_result =
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
    ServerSlot& server_slot = emplace_result.first->second; // Throws

    // TODO: enable multiplexing with proxies
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
        // Use preexisting connection
        REALM_ASSERT(server_slot.alt_connections.empty());
        return *server_slot.connection;
    }

    // Create a new connection
    REALM_ASSERT(!server_slot.connection);
    const connection_ident_type ident = m_prev_connection_ident + 1;
    auto new_conn = std::make_unique<ClientImpl::Connection>(
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
        std::move(proxy_config), server_slot.reconnect_info); // Throws
    // Keep a reference, because ownership is moved into the slot below.
    ClientImpl::Connection& conn = *new_conn;
    if (m_one_connection_per_session) {
        server_slot.alt_connections[ident] = std::move(new_conn); // Throws
    }
    else {
        server_slot.connection = std::move(new_conn);
    }
    m_prev_connection_ident = ident;
    was_created = true;
    {
        std::lock_guard lk(m_drain_mutex);
        ++m_num_connections;
    }
    return conn;
}
2,458✔
616

617

618
/// Destroys `conn` by removing it from its server slot, then decrements the
/// connection count and notifies `m_drain_cv`. In multiplexed mode the
/// connection's reconnect info is copied into the slot first.
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
{
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
    auto i = m_server_slots.find(endpoint);
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
    ServerSlot& server_slot = i->second;

    if (m_one_connection_per_session) {
        REALM_ASSERT(!server_slot.connection);
        const connection_ident_type ident = conn.get_ident();
        auto j = server_slot.alt_connections.find(ident);
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
        REALM_ASSERT(&*j->second == &conn);
        server_slot.alt_connections.erase(j);
    }
    else {
        REALM_ASSERT(server_slot.alt_connections.empty());
        REALM_ASSERT(&*server_slot.connection == &conn);
        // Copy the reconnect info into the slot before the connection goes away.
        server_slot.reconnect_info = conn.get_reconnect_info();
        server_slot.connection.reset();
    }

    std::lock_guard lk(m_drain_mutex);
    REALM_ASSERT(m_num_connections);
    --m_num_connections;
    m_drain_cv.notify_all();
}
2,456✔
646

647

648
// ################ SessionImpl ################
649

650
void SessionImpl::force_close()
651
{
150✔
652
    // Allow force_close() if session is active or hasn't been activated yet.
74✔
653
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
150!
654
        m_wrapper.force_close();
150✔
655
    }
150✔
656
}
150✔
657

658
void SessionImpl::on_connection_state_changed(ConnectionState state,
659
                                              const util::Optional<SessionErrorInfo>& error_info)
660
{
10,660✔
661
    // Only used to report errors back to the SyncSession while the Session is active
5,804✔
662
    if (m_state == SessionImpl::Active) {
10,660✔
663
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
10,660✔
664
    }
10,660✔
665
}
10,660✔
666

667

668
const std::string& SessionImpl::get_virt_path() const noexcept
669
{
6,626✔
670
    // Can only be called if the session is active or being activated
3,612✔
671
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
6,626!
672
    return m_wrapper.m_virt_path;
6,626✔
673
}
6,626✔
674

675
const std::string& SessionImpl::get_realm_path() const noexcept
676
{
9,886✔
677
    // Can only be called if the session is active or being activated
4,766✔
678
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
9,886✔
679
    return m_wrapper.m_db->get_path();
9,886✔
680
}
9,886✔
681

682
/// Returns the DB reference held by the session wrapper.
DBRef SessionImpl::get_db() const noexcept
{
    // Precondition: the session is active or is in the process of being
    // activated.
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    return m_wrapper.m_db;
}
75,862✔
688

689
/// Returns the replication object obtained from the session wrapper.
ClientReplication& SessionImpl::access_realm()
{
    // Precondition: the session is active or is in the process of being
    // activated.
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    return m_wrapper.get_replication();
}
108,114✔
695

696
/// Returns a mutable reference to the session wrapper's optional client reset
/// configuration.
util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
{
    // Precondition: the session is active or is in the process of being
    // activated.
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    return m_wrapper.m_client_reset_config;
}
9,618✔
702

703
/// Returns the session reason stored in the session wrapper.
SessionReason SessionImpl::get_session_reason() noexcept
{
    // Precondition: the session is active or is in the process of being
    // activated.
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
    return m_wrapper.m_session_reason;
}
1,354✔
709

710
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
711
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
712
{
40,824✔
713
    // Ignore the call if the session is not active
22,712✔
714
    if (m_state != State::Active) {
40,824✔
715
        return;
×
716
    }
×
717

22,712✔
718
    try {
40,824✔
719
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
40,824!
720
        if (simulate_integration_error) {
40,824✔
721
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
722
        }
×
723
        version_type client_version;
40,824✔
724
        if (REALM_LIKELY(!get_client().is_dry_run())) {
40,824✔
725
            VersionInfo version_info;
40,824✔
726
            ClientReplication& repl = access_realm(); // Throws
40,824✔
727
            integrate_changesets(repl, progress, downloadable_bytes, changesets, version_info,
40,824✔
728
                                 batch_state); // Throws
40,824✔
729
            client_version = version_info.realm_version;
40,824✔
730
        }
40,824✔
UNCOV
731
        else {
×
732
            // Fake it for "dry run" mode
UNCOV
733
            client_version = m_last_version_available + 1;
×
UNCOV
734
        }
×
735
        on_changesets_integrated(client_version, progress); // Throws
40,824✔
736
    }
40,824✔
737
    catch (const IntegrationException& e) {
22,724✔
738
        on_integration_failure(e);
24✔
739
    }
24✔
740
    m_wrapper.on_sync_progress(); // Throws
40,822✔
741
}
40,820✔
742

743

744
void SessionImpl::on_upload_completion()
745
{
14,656✔
746
    // Ignore the call if the session is not active
7,246✔
747
    if (m_state == State::Active) {
14,656✔
748
        m_wrapper.on_upload_completion(); // Throws
14,656✔
749
    }
14,656✔
750
}
14,656✔
751

752

753
void SessionImpl::on_download_completion()
754
{
15,386✔
755
    // Ignore the call if the session is not active
7,594✔
756
    if (m_state == State::Active) {
15,386✔
757
        m_wrapper.on_download_completion(); // Throws
15,386✔
758
    }
15,386✔
759
}
15,386✔
760

761

762
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
763
{
906✔
764
    // Ignore the call if the session is not active
470✔
765
    if (m_state == State::Active) {
906✔
766
        m_wrapper.on_suspended(error_info); // Throws
906✔
767
    }
906✔
768
}
906✔
769

770

771
void SessionImpl::on_resumed()
772
{
382✔
773
    // Ignore the call if the session is not active
210✔
774
    if (m_state == State::Active) {
382✔
775
        m_wrapper.on_resumed(); // Throws
382✔
776
    }
382✔
777
}
382✔
778

779
void SessionImpl::handle_pending_client_reset_acknowledgement()
780
{
292✔
781
    // Ignore the call if the session is not active
146✔
782
    if (m_state == State::Active) {
292✔
783
        m_wrapper.handle_pending_client_reset_acknowledgement();
292✔
784
    }
292✔
785
}
292✔
786

787

788
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
789
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
790
{
42,390✔
791
    // Ignore the call if the session is not active
23,496✔
792
    if (m_state != State::Active) {
42,390✔
793
        return false;
×
794
    }
×
795

23,496✔
796
    if (is_steady_state_download_message(batch_state, query_version)) {
42,390✔
797
        return false;
40,824✔
798
    }
40,824✔
799

784✔
800
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
1,566✔
801
    util::Optional<SyncProgress> maybe_progress;
1,566✔
802
    if (batch_state == DownloadBatchState::LastInBatch) {
1,566✔
803
        maybe_progress = progress;
1,406✔
804
    }
1,406✔
805

784✔
806
    bool new_batch = false;
1,566✔
807
    try {
1,566✔
808
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
1,566✔
809
    }
1,566✔
810
    catch (const LogicError& ex) {
784✔
811
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
812
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
813
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
814
                                    ProtocolError::bad_changeset_size);
×
815
            on_integration_failure(ex);
×
816
            return true;
×
817
        }
×
818
        throw;
×
819
    }
×
820

784✔
821
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
784✔
822
    // bootstrapping.
784✔
823
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
1,566✔
824
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
40✔
825
    }
40✔
826

784✔
827
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
1,566✔
828
                                       batch_state, received_changesets.size());
1,566✔
829
    if (hook_action == SyncClientHookAction::EarlyReturn) {
1,566✔
830
        return true;
12✔
831
    }
12✔
832
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
1,554✔
833

778✔
834
    if (batch_state == DownloadBatchState::MoreToCome) {
1,554✔
835
        return true;
156✔
836
    }
156✔
837

700✔
838
    try {
1,398✔
839
        process_pending_flx_bootstrap();
1,398✔
840
    }
1,398✔
841
    catch (const IntegrationException& e) {
704✔
842
        on_integration_failure(e);
8✔
843
    }
8✔
844

700✔
845
    return true;
1,398✔
846
}
1,398✔
847

848

849
void SessionImpl::process_pending_flx_bootstrap()
850
{
11,012✔
851
    // Ignore the call if not a flx session or session is not active
5,330✔
852
    if (!m_is_flx_sync_session || m_state != State::Active) {
11,012✔
853
        return;
8,620✔
854
    }
8,620✔
855
    // Should never be called if session is not active
1,196✔
856
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
2,392✔
857
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,392✔
858
    if (!bootstrap_store->has_pending()) {
2,392✔
859
        return;
976✔
860
    }
976✔
861

708✔
862
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,416✔
863
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,416✔
864
                "changeset size: %3)",
1,416✔
865
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,416✔
866
                pending_batch_stats.pending_changeset_bytes);
1,416✔
867
    auto& history = access_realm().get_history();
1,416✔
868
    VersionInfo new_version;
1,416✔
869
    SyncProgress progress;
1,416✔
870
    int64_t query_version = -1;
1,416✔
871
    size_t changesets_processed = 0;
1,416✔
872

708✔
873
    // Used to commit each batch after it was transformed.
708✔
874
    TransactionRef transact = get_db()->start_write();
1,416✔
875
    while (bootstrap_store->has_pending()) {
2,946✔
876
        auto start_time = std::chrono::steady_clock::now();
1,546✔
877
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
1,546✔
878
        if (!pending_batch.progress) {
1,546✔
879
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
880
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
4✔
881
            // bootstrap store requires a write transaction itself.
4✔
882
            transact->close();
8✔
883
            bootstrap_store->clear();
8✔
884
            return;
8✔
885
        }
8✔
886

770✔
887
        auto batch_state =
1,538✔
888
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
1,470✔
889
        uint64_t downloadable_bytes = 0;
1,538✔
890
        query_version = pending_batch.query_version;
1,538✔
891
        bool simulate_integration_error =
1,538✔
892
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
1,538✔
893
        if (simulate_integration_error) {
1,538✔
894
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
8✔
895
        }
8✔
896

766✔
897
        history.integrate_server_changesets(
1,530✔
898
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
1,530✔
899
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
1,528✔
900
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
1,526✔
901
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
1,526✔
902
            });
1,526✔
903
        progress = *pending_batch.progress;
1,530✔
904
        changesets_processed += pending_batch.changesets.size();
1,530✔
905
        auto duration = std::chrono::steady_clock::now() - start_time;
1,530✔
906

766✔
907
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
1,530✔
908
                                      batch_state, pending_batch.changesets.size());
1,530✔
909
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
1,530✔
910

766✔
911
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
1,530✔
912
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
1,530✔
913
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
1,530✔
914
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
1,530✔
915
                    pending_batch.remaining_changesets);
1,530✔
916
    }
1,530✔
917
    on_changesets_integrated(new_version.realm_version, progress);
1,408✔
918

700✔
919
    REALM_ASSERT_3(query_version, !=, -1);
1,400✔
920
    m_wrapper.on_sync_progress();
1,400✔
921
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,400✔
922

700✔
923
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,400✔
924
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,400✔
925
    // NoAction/EarlyReturn are both valid no-op actions to take here.
700✔
926
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,400✔
927
}
1,400✔
928

929
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
930
{
16✔
931
    // Ignore the call if the session is not active
8✔
932
    if (m_state == State::Active) {
16✔
933
        m_wrapper.on_flx_sync_error(version, err_msg);
16✔
934
    }
16✔
935
}
16✔
936

937
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
938
{
1,434✔
939
    // Ignore the call if the session is not active
718✔
940
    if (m_state == State::Active) {
1,434✔
941
        m_wrapper.on_flx_sync_progress(version, batch_state);
1,434✔
942
    }
1,434✔
943
}
1,434✔
944

945
// Returns the subscription store pointer obtained from the session wrapper.
SubscriptionStore* SessionImpl::get_flx_subscription_store()
{
    // Precondition: the session is active.
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    SubscriptionStore* const store = m_wrapper.get_flx_subscription_store();
    return store;
}
13,330✔
951

952
// Returns the migration store pointer obtained from the session wrapper.
MigrationStore* SessionImpl::get_migration_store()
{
    // Precondition: the session is active.
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    MigrationStore* const store = m_wrapper.get_migration_store();
    return store;
}
57,222✔
958

959
void SessionImpl::on_flx_sync_version_complete(int64_t version)
960
{
104✔
961
    // Ignore the call if the session is not active
52✔
962
    if (m_state == State::Active) {
104✔
963
        m_wrapper.on_flx_sync_version_complete(version);
104✔
964
    }
104✔
965
}
104✔
966

967
// Invokes the wrapper's debug hook with `data` and acts on the result.
//
// Returns NoAction without invoking the hook when a hook call is already in
// progress. SuspendWithRetryableError and TriggerReconnect are handled here
// and reported to the caller as EarlyReturn; any other action is returned
// unchanged.
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
    // Precondition: the session is active.
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    // Re-entrant calls are turned into no-ops so the hook never runs recursively.
    if (m_wrapper.m_in_debug_hook) {
        return SyncClientHookAction::NoAction;
    }
    m_wrapper.m_in_debug_hook = true;
    // Clears the in-progress flag again on every exit path below.
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
        m_wrapper.m_in_debug_hook = false;
    });

    auto requested = m_wrapper.m_debug_hook(data);

    if (requested == realm::SyncClientHookAction::SuspendWithRetryableError) {
        // Feed a non-fatal, transient error into the regular error handling path.
        SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
        err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;

        auto err_processing_err = receive_error_message(err_info);
        REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
        return SyncClientHookAction::EarlyReturn;
    }

    if (requested == realm::SyncClientHookAction::TriggerReconnect) {
        get_connection().voluntary_disconnect();
        return SyncClientHookAction::EarlyReturn;
    }

    return requested;
}
846✔
999

1000
// Builds a SyncClientHookData from the download-related arguments and passes
// it to call_debug_hook(const SyncClientHookData&).
//
// Returns NoAction without building anything when no debug hook is set or the
// session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
                                                  int64_t query_version, DownloadBatchState batch_state,
                                                  size_t num_changesets)
{
    // Common case: no hook is set.
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
        return SyncClientHookAction::NoAction;
    }
    // Hooks are not invoked for a session that is not active.
    if (REALM_UNLIKELY(m_state != State::Active)) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.progress = progress;
    hook_data.query_version = query_version;
    hook_data.batch_state = batch_state;
    hook_data.num_changesets = num_changesets;

    return call_debug_hook(hook_data);
}
778✔
1020

1021
// Builds a SyncClientHookData describing an error event and passes it to
// call_debug_hook(const SyncClientHookData&).
//
// The data carries the session's current m_progress, a SteadyState batch
// state, zero changesets, query version 0 and a pointer to `error_info`.
// Returns NoAction without building anything when no debug hook is set or the
// session is not active.
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
{
    // Common case: no hook is set.
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
        return SyncClientHookAction::NoAction;
    }
    // Hooks are not invoked for a session that is not active.
    if (REALM_UNLIKELY(m_state != State::Active)) {
        return SyncClientHookAction::NoAction;
    }

    SyncClientHookData hook_data;
    hook_data.event = event;
    hook_data.error_info = &error_info;
    hook_data.progress = m_progress;
    hook_data.batch_state = DownloadBatchState::SteadyState;
    hook_data.query_version = 0;
    hook_data.num_changesets = 0;

    return call_debug_hook(hook_data);
}
70✔
1040

1041
// Decides whether a DOWNLOAD message should be treated as steady state.
//
// Returns true when the batch state is SteadyState, when this is not a FLX
// session, or when the message is the last in its batch and its query version
// equals the wrapper's currently active FLX version. Returns false otherwise.
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
    // Precondition: the session is active.
    REALM_ASSERT_EX(m_state == State::Active, m_state);

    if (batch_state == DownloadBatchState::SteadyState || !m_is_flx_sync_session) {
        return true;
    }

    // A final batch for the already-active query version needs no special handling.
    const bool last_in_batch = (batch_state == DownloadBatchState::LastInBatch);
    return last_in_batch && query_version == m_wrapper.m_flx_active_version;
}
3,140✔
1060

1061
// Validates a JSON test command and queues it for sending.
//
// `body` must parse as JSON and contain a string-valued "command" field; when
// the object has more than one field, an "args" field must be present.
//
// Returns a future that is already failed with:
//   - RuntimeError when the session is not active,
//   - LogicError when the JSON is invalid or does not satisfy the rules above.
// Otherwise the command is queued from a callback posted to the client, and
// the returned future is tied to the promise stored with the queued command.
// If the posted callback is invoked with a non-OK status, the future is failed
// with that status and nothing is queued.
util::Future<std::string> SessionImpl::send_test_command(std::string body)
{
    if (m_state != State::Active) {
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
    }

    try {
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
            return Status{ErrorCodes::LogicError,
                          "Must supply command name in \"command\" field of test command json object"};
        }
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
        }
    }
    catch (const nlohmann::json::parse_error& e) {
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
    }

    auto pf = util::make_promise_future<std::string>();

    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
        // Includes operation_aborted
        if (!status.is_ok()) {
            promise.set_error(status);
            // The promise is fulfilled; it must not be queued or reused below.
            return;
        }

        auto id = ++m_last_pending_test_command_ident;
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
        ensure_enlisted_to_send();
    });

    // pf.future is a member of `pf`, so an explicit move is required here.
    return std::move(pf.future);
}
44✔
1095

1096
// ################ SessionWrapper ################
1097

1098
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
// provides a link to the ClientImpl::Session, which creates the messages sent to the server and receives
// the messages coming from it, together with the ClientImpl::Connection that owns the ClientImpl::Session.
1101
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1102
                               std::shared_ptr<MigrationStore> migration_store, Session::Config config)
1103
    : m_client{client}
1104
    , m_db(std::move(db))
1105
    , m_replication(m_db->get_replication())
1106
    , m_protocol_envelope{config.protocol_envelope}
1107
    , m_server_address{std::move(config.server_address)}
1108
    , m_server_port{config.server_port}
1109
    , m_user_id(std::move(config.user_id))
1110
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
1111
    , m_authorization_header_name{config.authorization_header_name}
1112
    , m_custom_http_headers{config.custom_http_headers}
1113
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
1114
    , m_simulate_integration_error{config.simulate_integration_error}
1115
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
1116
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
1117
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
1118
    , m_http_request_path_prefix{std::move(config.service_identifier)}
1119
    , m_virt_path{std::move(config.realm_identifier)}
1120
    , m_signed_access_token{std::move(config.signed_user_token)}
1121
    , m_client_reset_config{std::move(config.client_reset_config)}
1122
    , m_proxy_config{config.proxy_config} // Throws
1123
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
1124
    , m_session_reason(config.session_reason)
1125
    , m_flx_subscription_store(std::move(flx_sub_store))
1126
    , m_migration_store(std::move(migration_store))
1127
{
10,942✔
1128
    REALM_ASSERT(m_db);
10,942✔
1129
    REALM_ASSERT(m_db->get_replication());
10,942✔
1130
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
10,942✔
1131

5,298✔
1132
    if (m_flx_subscription_store) {
10,942✔
1133
        auto versions_info = m_flx_subscription_store->get_version_info();
996✔
1134
        m_flx_active_version = versions_info.active;
996✔
1135
        m_flx_pending_mark_version = versions_info.pending_mark;
996✔
1136
    }
996✔
1137
}
10,942✔
1138

1139
// If the wrapper still holds a DB and was actualized, removes itself as a
// commit listener and releases the DB's sync agent.
SessionWrapper::~SessionWrapper() noexcept
{
    const bool needs_cleanup = m_db && m_actualized;
    if (!needs_cleanup) {
        return;
    }
    m_db->remove_commit_listener(this);
    m_db->release_sync_agent();
}
10,942✔
1146

1147

1148
// Returns the stored replication object, downcast to ClientReplication.
inline ClientReplication& SessionWrapper::get_replication() noexcept
{
    // The wrapper must still hold its DB.
    REALM_ASSERT(m_db);

    auto& replication = static_cast<ClientReplication&>(*m_replication);
    return replication;
}
108,114✔
1153

1154

1155
// Returns the ClientImpl this wrapper was constructed with.
inline ClientImpl& SessionWrapper::get_client() noexcept
{
    ClientImpl& client = m_client;
    return client;
}
72✔
1159

1160
bool SessionWrapper::has_flx_subscription_store() const
1161
{
1,434✔
1162
    return static_cast<bool>(m_flx_subscription_store);
1,434✔
1163
}
1,434✔
1164

1165
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1166
{
16✔
1167
    REALM_ASSERT(!m_finalized);
16✔
1168
    auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(version);
16✔
1169
    mut_subs.update_state(SubscriptionSet::State::Error, err_msg);
16✔
1170
    mut_subs.commit();
16✔
1171
}
16✔
1172

1173
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1174
{
1,498✔
1175
    REALM_ASSERT(!m_finalized);
1,498✔
1176
    m_flx_last_seen_version = version;
1,498✔
1177
    m_flx_active_version = version;
1,498✔
1178
}
1,498✔
1179

1180
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1181
{
1,434✔
1182
    if (!has_flx_subscription_store()) {
1,434✔
1183
        return;
×
1184
    }
×
1185
    REALM_ASSERT(!m_finalized);
1,434✔
1186
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,434✔
1187
    REALM_ASSERT(new_version >= m_flx_active_version);
1,434✔
1188
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,434✔
1189

718✔
1190
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1,434✔
1191

718✔
1192
    switch (batch_state) {
1,434✔
1193
        case DownloadBatchState::SteadyState:
✔
1194
            // Cannot be called with this value.
1195
            REALM_UNREACHABLE();
×
1196
        case DownloadBatchState::LastInBatch:
1,394✔
1197
            if (m_flx_active_version == new_version) {
1,394✔
1198
                return;
×
1199
            }
×
1200
            on_flx_sync_version_complete(new_version);
1,394✔
1201
            if (new_version == 0) {
1,394✔
1202
                new_state = SubscriptionSet::State::Complete;
632✔
1203
            }
632✔
1204
            else {
762✔
1205
                new_state = SubscriptionSet::State::AwaitingMark;
762✔
1206
                m_flx_pending_mark_version = new_version;
762✔
1207
            }
762✔
1208
            break;
1,394✔
1209
        case DownloadBatchState::MoreToCome:
718✔
1210
            if (m_flx_last_seen_version == new_version) {
40✔
1211
                return;
×
1212
            }
×
1213

20✔
1214
            m_flx_last_seen_version = new_version;
40✔
1215
            new_state = SubscriptionSet::State::Bootstrapping;
40✔
1216
            break;
40✔
1217
    }
1,434✔
1218

718✔
1219
    auto mut_subs = get_flx_subscription_store()->get_mutable_by_version(new_version);
1,434✔
1220
    mut_subs.update_state(new_state);
1,434✔
1221
    mut_subs.commit();
1,434✔
1222
}
1,434✔
1223

1224
// Returns a non-owning pointer to the FLX subscription store held by this wrapper.
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
{
    // Must not be called after the wrapper has been finalized.
    REALM_ASSERT(!m_finalized);

    SubscriptionStore* const store = m_flx_subscription_store.get();
    return store;
}
14,780✔
1229

1230
// Returns a non-owning pointer to the pending bootstrap store held by this wrapper.
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
{
    // Must not be called after the wrapper has been finalized.
    REALM_ASSERT(!m_finalized);

    PendingBootstrapStore* const store = m_flx_pending_bootstrap_store.get();
    return store;
}
3,956✔
1235

1236
// Returns a non-owning pointer to the migration store held by this wrapper.
MigrationStore* SessionWrapper::get_migration_store()
{
    // Must not be called after the wrapper has been finalized.
    REALM_ASSERT(!m_finalized);

    MigrationStore* const store = m_migration_store.get();
    return store;
}
57,222✔
1241

1242
// Installs the progress handler by moving `handler` into the wrapper.
inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
    // Handlers may only be installed before initiate() has been called.
    REALM_ASSERT(!m_initiated);

    m_progress_handler = std::move(handler);
}
3,410✔
1247

1248

1249
inline void
1250
SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
1251
{
11,038✔
1252
    REALM_ASSERT(!m_initiated);
11,038✔
1253
    m_connection_state_change_listener = std::move(listener);
11,038✔
1254
}
11,038✔
1255

1256

1257
void SessionWrapper::initiate()
1258
{
9,790✔
1259
    REALM_ASSERT(!m_initiated);
9,790✔
1260
    ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode};
9,790✔
1261
    m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
9,790✔
1262
    m_initiated = true;
9,790✔
1263
    m_db->add_commit_listener(this);
9,790✔
1264
}
9,790✔
1265

1266

1267
// Commit listener callback: posts a task to the client that tells the session
// about the new version and reports progress.
//
// Returns without posting when the wrapper is finalized or force-closed. The
// posted task holds a bind_ptr to this wrapper. It returns silently on
// OperationAborted or when the session is already gone, and throws an
// Exception built from the status for any other non-OK status.
void SessionWrapper::on_commit(version_type new_version)
{
    // Thread safety required
    REALM_ASSERT(m_initiated);

    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
        return;
    }

    m_client.post([self = util::bind_ptr<SessionWrapper>{this}, new_version](Status status) {
        if (status == ErrorCodes::OperationAborted) {
            return;
        }
        if (!status.is_ok()) {
            throw Exception(status);
        }

        REALM_ASSERT(self->m_actualized);
        if (REALM_UNLIKELY(!self->m_sess)) {
            return; // Already finalized
        }

        SessionImpl& session = *self->m_sess;
        session.recognize_sync_version(new_version); // Throws
        const bool only_if_new_uploadable_data = true;
        self->report_progress(only_if_new_uploadable_data); // Throws
    });
}
103,154✔
1292

1293

1294
void SessionWrapper::cancel_reconnect_delay()
1295
{
12✔
1296
    // Thread safety required
6✔
1297
    REALM_ASSERT(m_initiated);
12✔
1298

6✔
1299
    if (REALM_UNLIKELY(m_finalized || m_force_closed)) {
12✔
1300
        return;
×
1301
    }
×
1302

6✔
1303
    util::bind_ptr<SessionWrapper> self{this};
12✔
1304
    m_client.post([self = std::move(self)](Status status) {
12✔
1305
        if (status == ErrorCodes::OperationAborted)
12✔
1306
            return;
×
1307
        else if (!status.is_ok())
12✔
1308
            throw Exception(status);
×
1309

6✔
1310
        REALM_ASSERT(self->m_actualized);
12✔
1311
        if (REALM_UNLIKELY(!self->m_sess))
12✔
1312
            return; // Already finalized
6✔
1313
        SessionImpl& sess = *self->m_sess;
12✔
1314
        sess.cancel_resumption_delay(); // Throws
12✔
1315
        ClientImpl::Connection& conn = sess.get_connection();
12✔
1316
        conn.cancel_reconnect_delay(); // Throws
12✔
1317
    });                                // Throws
12✔
1318
}
12✔
1319

1320
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1321
                                    WaitOperCompletionHandler handler)
1322
{
4,334✔
1323
    REALM_ASSERT(upload_completion || download_completion);
4,334✔
1324
    REALM_ASSERT(m_initiated);
4,334✔
1325
    REALM_ASSERT(!m_finalized);
4,334✔
1326

2,080✔
1327
    util::bind_ptr<SessionWrapper> self{this};
4,334✔
1328
    m_client.post([self = std::move(self), handler = std::move(handler), upload_completion,
4,334✔
1329
                   download_completion](Status status) mutable {
4,334✔
1330
        if (status == ErrorCodes::OperationAborted)
4,334✔
1331
            return;
×
1332
        else if (!status.is_ok())
4,334✔
1333
            throw Exception(status);
×
1334

2,080✔
1335
        REALM_ASSERT(self->m_actualized);
4,334✔
1336
        if (REALM_UNLIKELY(!self->m_sess)) {
4,334✔
1337
            // Already finalized
38✔
1338
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
72✔
1339
            return;
72✔
1340
        }
72✔
1341
        if (upload_completion) {
4,262✔
1342
            if (download_completion) {
2,266✔
1343
                // Wait for upload and download completion
138✔
1344
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
276✔
1345
            }
276✔
1346
            else {
1,990✔
1347
                // Wait for upload completion only
906✔
1348
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
1,990✔
1349
            }
1,990✔
1350
        }
2,266✔
1351
        else {
1,996✔
1352
            // Wait for download completion only
998✔
1353
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
1,996✔
1354
        }
1,996✔
1355
        SessionImpl& sess = *self->m_sess;
4,262✔
1356
        if (upload_completion)
4,262✔
1357
            sess.request_upload_completion_notification(); // Throws
2,266✔
1358
        if (download_completion)
4,262✔
1359
            sess.request_download_completion_notification(); // Throws
2,272✔
1360
    });                                                      // Throws
4,262✔
1361
}
4,334✔
1362

1363

1364
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1365
{
12,996✔
1366
    // Thread safety required
6,498✔
1367
    REALM_ASSERT(m_initiated);
12,996✔
1368
    REALM_ASSERT(!m_finalized);
12,996✔
1369

6,498✔
1370
    std::int_fast64_t target_mark;
12,996✔
1371
    {
12,996✔
1372
        std::lock_guard lock{m_client.m_mutex};
12,996✔
1373
        target_mark = ++m_target_upload_mark;
12,996✔
1374
    }
12,996✔
1375

6,498✔
1376
    util::bind_ptr<SessionWrapper> self{this};
12,996✔
1377
    m_client.post([self = std::move(self), target_mark](Status status) {
12,996✔
1378
        if (status == ErrorCodes::OperationAborted)
12,996✔
1379
            return;
×
1380
        else if (!status.is_ok())
12,996✔
1381
            throw Exception(status);
×
1382

6,498✔
1383
        REALM_ASSERT(self->m_actualized);
12,996✔
1384
        // The session wrapper may already have been finalized. This can only
6,498✔
1385
        // happen if it was abandoned, but in that case, the call of
6,498✔
1386
        // wait_for_upload_complete_or_client_stopped() must have returned
6,498✔
1387
        // already.
6,498✔
1388
        if (REALM_UNLIKELY(!self->m_sess))
12,996✔
1389
            return;
6,508✔
1390
        if (target_mark > self->m_staged_upload_mark) {
12,984✔
1391
            self->m_staged_upload_mark = target_mark;
12,984✔
1392
            SessionImpl& sess = *self->m_sess;
12,984✔
1393
            sess.request_upload_completion_notification(); // Throws
12,984✔
1394
        }
12,984✔
1395
    }); // Throws
12,984✔
1396

6,498✔
1397
    bool completion_condition_was_satisfied;
12,996✔
1398
    {
12,996✔
1399
        std::unique_lock lock{m_client.m_mutex};
12,996✔
1400
        while (m_reached_upload_mark < target_mark && !m_client.m_stopped)
33,116✔
1401
            m_client.m_wait_or_client_stopped_cond.wait(lock);
20,120✔
1402
        completion_condition_was_satisfied = !m_client.m_stopped;
12,996✔
1403
    }
12,996✔
1404
    return completion_condition_was_satisfied;
12,996✔
1405
}
12,996✔
1406

1407

1408
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1409
{
10,092✔
1410
    // Thread safety required
5,048✔
1411
    REALM_ASSERT(m_initiated);
10,092✔
1412
    REALM_ASSERT(!m_finalized);
10,092✔
1413

5,048✔
1414
    std::int_fast64_t target_mark;
10,092✔
1415
    {
10,092✔
1416
        std::lock_guard lock{m_client.m_mutex};
10,092✔
1417
        target_mark = ++m_target_download_mark;
10,092✔
1418
    }
10,092✔
1419

5,048✔
1420
    util::bind_ptr<SessionWrapper> self{this};
10,092✔
1421
    m_client.post([self = std::move(self), target_mark](Status status) {
10,092✔
1422
        if (status == ErrorCodes::OperationAborted)
10,092✔
1423
            return;
×
1424
        else if (!status.is_ok())
10,092✔
1425
            throw Exception(status);
×
1426

5,048✔
1427
        REALM_ASSERT(self->m_actualized);
10,092✔
1428
        // The session wrapper may already have been finalized. This can only
5,048✔
1429
        // happen if it was abandoned, but in that case, the call of
5,048✔
1430
        // wait_for_download_complete_or_client_stopped() must have returned
5,048✔
1431
        // already.
5,048✔
1432
        if (REALM_UNLIKELY(!self->m_sess))
10,092✔
1433
            return;
5,078✔
1434
        if (target_mark > self->m_staged_download_mark) {
10,032✔
1435
            self->m_staged_download_mark = target_mark;
10,032✔
1436
            SessionImpl& sess = *self->m_sess;
10,032✔
1437
            sess.request_download_completion_notification(); // Throws
10,032✔
1438
        }
10,032✔
1439
    }); // Throws
10,032✔
1440

5,048✔
1441
    bool completion_condition_was_satisfied;
10,092✔
1442
    {
10,092✔
1443
        std::unique_lock lock{m_client.m_mutex};
10,092✔
1444
        while (m_reached_download_mark < target_mark && !m_client.m_stopped)
20,584✔
1445
            m_client.m_wait_or_client_stopped_cond.wait(lock);
10,492✔
1446
        completion_condition_was_satisfied = !m_client.m_stopped;
10,092✔
1447
    }
10,092✔
1448
    return completion_condition_was_satisfied;
10,092✔
1449
}
10,092✔
1450

1451

1452
void SessionWrapper::refresh(std::string signed_access_token)
1453
{
208✔
1454
    // Thread safety required
104✔
1455
    REALM_ASSERT(m_initiated);
208✔
1456
    REALM_ASSERT(!m_finalized);
208✔
1457

104✔
1458
    m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) {
208✔
1459
        if (status == ErrorCodes::OperationAborted)
208✔
1460
            return;
×
1461
        else if (!status.is_ok())
208✔
1462
            throw Exception(status);
×
1463

104✔
1464
        REALM_ASSERT(self->m_actualized);
208✔
1465
        if (REALM_UNLIKELY(!self->m_sess))
208✔
1466
            return; // Already finalized
104✔
1467
        self->m_signed_access_token = std::move(token);
208✔
1468
        SessionImpl& sess = *self->m_sess;
208✔
1469
        ClientImpl::Connection& conn = sess.get_connection();
208✔
1470
        // FIXME: This only makes sense when each session uses a separate connection.
104✔
1471
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
208✔
1472
        sess.cancel_resumption_delay();                                                          // Throws
208✔
1473
        conn.cancel_reconnect_delay();                                                           // Throws
208✔
1474
    });
208✔
1475
}
208✔
1476

1477

1478
// Hands an initiated wrapper over to its client via
// `register_abandoned_session_wrapper()`. A wrapper that was never initiated
// is simply dropped when `wrapper` goes out of scope.
inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
{
    if (!wrapper->m_initiated)
        return;

    // Fetch the client reference before `wrapper` is moved from.
    ClientImpl& owner = wrapper->m_client;
    owner.register_abandoned_session_wrapper(std::move(wrapper));
}
10,940✔
1485

1486

1487
// Must be called from event loop thread
1488
void SessionWrapper::actualize(ServerEndpoint endpoint)
1489
{
9,622✔
1490
    REALM_ASSERT(!m_actualized);
9,622✔
1491
    REALM_ASSERT(!m_sess);
9,622✔
1492
    // Cannot be actualized if it's already been finalized or force closed
4,634✔
1493
    REALM_ASSERT(!m_finalized);
9,622✔
1494
    REALM_ASSERT(!m_force_closed);
9,622✔
1495
    try {
9,622✔
1496
        m_db->claim_sync_agent();
9,622✔
1497
    }
9,622✔
1498
    catch (const MultipleSyncAgents&) {
4,636✔
1499
        finalize_before_actualization();
4✔
1500
        throw;
4✔
1501
    }
4✔
1502
    auto sync_mode = endpoint.server_mode;
9,618✔
1503

4,632✔
1504
    bool was_created = false;
9,618✔
1505
    ClientImpl::Connection& conn = m_client.get_connection(
9,618✔
1506
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
9,618✔
1507
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
9,618✔
1508
        was_created); // Throws
9,618✔
1509
    try {
9,618✔
1510
        // FIXME: This only makes sense when each session uses a separate connection.
4,632✔
1511
        conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
9,618✔
1512
        std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
9,618✔
1513
        if (sync_mode == SyncServerMode::FLX) {
9,618✔
1514
            m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
992✔
1515
        }
992✔
1516

4,632✔
1517
        sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
9,618✔
1518
        m_sess = sess.get();
9,618✔
1519
        conn.activate_session(std::move(sess)); // Throws
9,618✔
1520
    }
9,618✔
1521
    catch (...) {
4,632✔
1522
        if (was_created)
×
1523
            m_client.remove_connection(conn);
×
1524

1525
        finalize_before_actualization();
×
1526
        throw;
×
1527
    }
×
1528

4,630✔
1529
    m_actualized = true;
9,614✔
1530
    if (was_created)
9,614✔
1531
        conn.activate(); // Throws
2,456✔
1532

4,630✔
1533
    if (m_connection_state_change_listener) {
9,614✔
1534
        ConnectionState state = conn.get_state();
9,596✔
1535
        if (state != ConnectionState::disconnected) {
9,596✔
1536
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,016✔
1537
            if (state == ConnectionState::connected)
7,016✔
1538
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
6,742✔
1539
        }
7,016✔
1540
    }
9,596✔
1541

4,630✔
1542
    if (!m_client_reset_config)
9,614✔
1543
        report_progress(); // Throws
9,276✔
1544
}
9,614✔
1545

1546
void SessionWrapper::force_close()
1547
{
150✔
1548
    if (m_force_closed || m_finalized) {
150✔
1549
        return;
×
1550
    }
×
1551
    REALM_ASSERT(m_actualized);
150✔
1552
    REALM_ASSERT(m_sess);
150✔
1553
    m_force_closed = true;
150✔
1554

74✔
1555
    ClientImpl::Connection& conn = m_sess->get_connection();
150✔
1556
    conn.initiate_session_deactivation(m_sess); // Throws
150✔
1557

74✔
1558
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
74✔
1559
    m_flx_pending_bootstrap_store.reset();
150✔
1560
    // Clear the subscription and migration store refs since they are owned by SyncSession
74✔
1561
    m_flx_subscription_store.reset();
150✔
1562
    m_migration_store.reset();
150✔
1563
    m_sess = nullptr;
150✔
1564
    // Everything is being torn down, no need to report connection state anymore
74✔
1565
    m_connection_state_change_listener = {};
150✔
1566
}
150✔
1567

1568
// Must be called from event loop thread
1569
void SessionWrapper::finalize()
1570
{
9,622✔
1571
    REALM_ASSERT(m_actualized);
9,622✔
1572

4,634✔
1573
    // Already finalized?
4,634✔
1574
    if (m_finalized) {
9,622✔
1575
        return;
×
1576
    }
×
1577

4,634✔
1578
    // Must be before marking as finalized as we expect m_finalized == false in on_change()
4,634✔
1579
    m_db->remove_commit_listener(this);
9,622✔
1580

4,634✔
1581
    m_finalized = true;
9,622✔
1582

4,634✔
1583
    if (!m_force_closed) {
9,622✔
1584
        REALM_ASSERT(m_sess);
9,462✔
1585
        ClientImpl::Connection& conn = m_sess->get_connection();
9,462✔
1586
        conn.initiate_session_deactivation(m_sess); // Throws
9,462✔
1587

4,556✔
1588
        // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
4,556✔
1589
        m_flx_pending_bootstrap_store.reset();
9,462✔
1590
        // Clear the subscription and migration store refs since they are owned by SyncSession
4,556✔
1591
        m_flx_subscription_store.reset();
9,462✔
1592
        m_migration_store.reset();
9,462✔
1593
        m_sess = nullptr;
9,462✔
1594
    }
9,462✔
1595

4,634✔
1596
    // The Realm file can be closed now, as no access to the Realm file is
4,634✔
1597
    // supposed to happen on behalf of a session after initiation of
4,634✔
1598
    // deactivation.
4,634✔
1599
    m_db->release_sync_agent();
9,622✔
1600
    m_db = nullptr;
9,622✔
1601

4,634✔
1602
    // All outstanding wait operations must be canceled
4,634✔
1603
    while (!m_upload_completion_handlers.empty()) {
9,994✔
1604
        auto handler = std::move(m_upload_completion_handlers.back());
372✔
1605
        m_upload_completion_handlers.pop_back();
372✔
1606
        handler(
372✔
1607
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
372✔
1608
    }
372✔
1609
    while (!m_download_completion_handlers.empty()) {
9,782✔
1610
        auto handler = std::move(m_download_completion_handlers.back());
160✔
1611
        m_download_completion_handlers.pop_back();
160✔
1612
        handler(
160✔
1613
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
160✔
1614
    }
160✔
1615
    while (!m_sync_completion_handlers.empty()) {
9,634✔
1616
        auto handler = std::move(m_sync_completion_handlers.back());
12✔
1617
        m_sync_completion_handlers.pop_back();
12✔
1618
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
12✔
1619
    }
12✔
1620
}
9,622✔
1621

1622

1623
// Must be called only when an unactualized session wrapper becomes abandoned.
1624
//
1625
// Called with a lock on `m_client.m_mutex`.
1626
inline void SessionWrapper::finalize_before_actualization() noexcept
1627
{
172✔
1628
    REALM_ASSERT(!m_sess);
172✔
1629
    m_actualized = true;
172✔
1630
    m_force_closed = true;
172✔
1631
}
172✔
1632

1633

1634
inline void SessionWrapper::on_sync_progress()
1635
{
42,216✔
1636
    REALM_ASSERT(!m_finalized);
42,216✔
1637
    m_reliable_download_progress = true;
42,216✔
1638
    report_progress(); // Throws
42,216✔
1639
}
42,216✔
1640

1641

1642
void SessionWrapper::on_upload_completion()
1643
{
14,656✔
1644
    REALM_ASSERT(!m_finalized);
14,656✔
1645
    while (!m_upload_completion_handlers.empty()) {
16,358✔
1646
        auto handler = std::move(m_upload_completion_handlers.back());
1,702✔
1647
        m_upload_completion_handlers.pop_back();
1,702✔
1648
        handler(Status::OK()); // Throws
1,702✔
1649
    }
1,702✔
1650
    while (!m_sync_completion_handlers.empty()) {
14,836✔
1651
        auto handler = std::move(m_sync_completion_handlers.back());
180✔
1652
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
180✔
1653
        m_sync_completion_handlers.pop_back();
180✔
1654
    }
180✔
1655
    std::lock_guard lock{m_client.m_mutex};
14,656✔
1656
    if (m_staged_upload_mark > m_reached_upload_mark) {
14,656✔
1657
        m_reached_upload_mark = m_staged_upload_mark;
12,958✔
1658
        m_client.m_wait_or_client_stopped_cond.notify_all();
12,958✔
1659
    }
12,958✔
1660
}
14,656✔
1661

1662

1663
void SessionWrapper::on_download_completion()
1664
{
15,386✔
1665
    while (!m_download_completion_handlers.empty()) {
17,402✔
1666
        auto handler = std::move(m_download_completion_handlers.back());
2,016✔
1667
        m_download_completion_handlers.pop_back();
2,016✔
1668
        handler(Status::OK()); // Throws
2,016✔
1669
    }
2,016✔
1670
    while (!m_sync_completion_handlers.empty()) {
15,470✔
1671
        auto handler = std::move(m_sync_completion_handlers.back());
84✔
1672
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
84✔
1673
        m_sync_completion_handlers.pop_back();
84✔
1674
    }
84✔
1675

7,594✔
1676
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
15,386✔
1677
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
652✔
1678
                             m_flx_pending_mark_version);
652✔
1679
        auto mutable_subs = m_flx_subscription_store->get_mutable_by_version(m_flx_pending_mark_version);
652✔
1680
        mutable_subs.update_state(SubscriptionSet::State::Complete);
652✔
1681
        mutable_subs.commit();
652✔
1682
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
652✔
1683
    }
652✔
1684

7,594✔
1685
    std::lock_guard lock{m_client.m_mutex};
15,386✔
1686
    if (m_staged_download_mark > m_reached_download_mark) {
15,386✔
1687
        m_reached_download_mark = m_staged_download_mark;
9,962✔
1688
        m_client.m_wait_or_client_stopped_cond.notify_all();
9,962✔
1689
    }
9,962✔
1690
}
15,386✔
1691

1692

1693
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1694
{
906✔
1695
    REALM_ASSERT(!m_finalized);
906✔
1696
    m_suspended = true;
906✔
1697
    if (m_connection_state_change_listener) {
906✔
1698
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
906✔
1699
    }
906✔
1700
}
906✔
1701

1702

1703
void SessionWrapper::on_resumed()
1704
{
382✔
1705
    REALM_ASSERT(!m_finalized);
382✔
1706
    m_suspended = false;
382✔
1707
    if (m_connection_state_change_listener) {
382✔
1708
        ClientImpl::Connection& conn = m_sess->get_connection();
382✔
1709
        if (conn.get_state() != ConnectionState::disconnected) {
382✔
1710
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
382✔
1711
            if (conn.get_state() == ConnectionState::connected)
382✔
1712
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
382✔
1713
        }
382✔
1714
    }
382✔
1715
}
382✔
1716

1717

1718
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1719
                                                 const util::Optional<SessionErrorInfo>& error_info)
1720
{
10,664✔
1721
    if (m_connection_state_change_listener) {
10,664✔
1722
        if (!m_suspended)
10,652✔
1723
            m_connection_state_change_listener(state, error_info); // Throws
10,652✔
1724
    }
10,652✔
1725
}
10,664✔
1726

1727

1728
void SessionWrapper::report_progress(bool only_if_new_uploadable_data)
1729
{
153,748✔
1730
    REALM_ASSERT(!m_finalized);
153,748✔
1731
    REALM_ASSERT(m_sess);
153,748✔
1732

80,516✔
1733
    if (!m_progress_handler)
153,748✔
1734
        return;
109,414✔
1735

20,374✔
1736
    std::uint_fast64_t downloaded_bytes = 0;
44,334✔
1737
    std::uint_fast64_t downloadable_bytes = 0;
44,334✔
1738
    std::uint_fast64_t uploaded_bytes = 0;
44,334✔
1739
    std::uint_fast64_t uploadable_bytes = 0;
44,334✔
1740
    std::uint_fast64_t snapshot_version = 0;
44,334✔
1741
    ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
44,334✔
1742
                                             uploadable_bytes, snapshot_version);
44,334✔
1743

20,374✔
1744
    // If this progress notification was triggered by a commit being made we
20,374✔
1745
    // only want to send it if the uploadable bytes has actually increased,
20,374✔
1746
    // and not if it was an empty commit.
20,374✔
1747
    if (only_if_new_uploadable_data && m_last_reported_uploadable_bytes == uploadable_bytes)
44,334✔
1748
        return;
30,884✔
1749
    m_last_reported_uploadable_bytes = uploadable_bytes;
13,450✔
1750

5,678✔
1751
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
5,678✔
1752
    // is only the remaining to download. This is confusing, so make them use
5,678✔
1753
    // the same units.
5,678✔
1754
    std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;
13,450✔
1755

5,678✔
1756
    m_sess->logger.debug("Progress handler called, downloaded = %1, "
13,450✔
1757
                         "downloadable(total) = %2, uploaded = %3, "
13,450✔
1758
                         "uploadable = %4, reliable_download_progress = %5, "
13,450✔
1759
                         "snapshot version = %6",
13,450✔
1760
                         downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
13,450✔
1761
                         m_reliable_download_progress, snapshot_version);
13,450✔
1762

5,678✔
1763
    // FIXME: Why is this boolean status communicated to the application as
5,678✔
1764
    // a 64-bit integer? Also, the name `progress_version` is confusing.
5,678✔
1765
    std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
9,960✔
1766
    m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
13,450✔
1767
                       snapshot_version);
13,450✔
1768
}
13,450✔
1769

1770
// Passes a test command body on to the session object. If no session object
// is attached, a `RuntimeError` status is returned instead.
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
{
    if (m_sess)
        return m_sess->send_test_command(std::move(body));

    return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
}
52✔
1778

1779
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1780
{
292✔
1781
    REALM_ASSERT(!m_finalized);
292✔
1782

146✔
1783
    auto pending_reset = [&] {
292✔
1784
        auto ft = m_db->start_frozen();
292✔
1785
        return _impl::client_reset::has_pending_reset(ft);
292✔
1786
    }();
292✔
1787
    REALM_ASSERT(pending_reset);
292✔
1788
    m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
292✔
1789
                        pending_reset->time);
292✔
1790
    util::bind_ptr<SessionWrapper> self(this);
292✔
1791
    async_wait_for(true, true, [self = std::move(self), pending_reset = *pending_reset](Status status) {
292✔
1792
        if (status == ErrorCodes::OperationAborted) {
292✔
1793
            return;
152✔
1794
        }
152✔
1795
        auto& logger = self->m_sess->logger;
140✔
1796
        if (!status.is_ok()) {
140✔
1797
            logger.error("Error while tracking client reset acknowledgement: %1", status);
×
1798
            return;
×
1799
        }
×
1800

70✔
1801
        auto wt = self->m_db->start_write();
140✔
1802
        auto cur_pending_reset = _impl::client_reset::has_pending_reset(wt);
140✔
1803
        if (!cur_pending_reset) {
140✔
1804
            logger.debug(
×
1805
                "Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
×
1806
                pending_reset.type, pending_reset.time);
×
1807
            return;
×
1808
        }
×
1809
        else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
140✔
1810
            logger.debug(
×
1811
                "Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
×
1812
                pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
×
1813
        }
×
1814
        else {
140✔
1815
            logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
140✔
1816
                         "Removing cycle detection tracker.",
140✔
1817
                         pending_reset.type, pending_reset.time);
140✔
1818
        }
140✔
1819
        _impl::client_reset::remove_pending_client_resets(wt);
140✔
1820
        wt->commit();
140✔
1821
    });
140✔
1822
}
292✔
1823

1824
std::string SessionWrapper::get_appservices_connection_id()
1825
{
72✔
1826
    auto pf = util::make_promise_future<std::string>();
72✔
1827
    REALM_ASSERT(m_initiated);
72✔
1828

36✔
1829
    util::bind_ptr<SessionWrapper> self(this);
72✔
1830
    get_client().post([self, promise = std::move(pf.promise)](Status status) mutable {
72✔
1831
        if (!status.is_ok()) {
72✔
1832
            promise.set_error(status);
×
1833
            return;
×
1834
        }
×
1835

36✔
1836
        if (!self->m_sess) {
72✔
1837
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1838
            return;
×
1839
        }
×
1840

36✔
1841
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
1842
    });
72✔
1843

36✔
1844
    return pf.future.get();
72✔
1845
}
72✔
1846

1847
// ################ ClientImpl::Connection ################
1848

1849
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1850
                                   const std::string& authorization_header_name,
1851
                                   const std::map<std::string, std::string>& custom_http_headers,
1852
                                   bool verify_servers_ssl_certificate,
1853
                                   Optional<std::string> ssl_trust_certificate_path,
1854
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1855
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1856
    : logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(ident), client.logger_ptr)} // Throws
1857
    , logger{*logger_ptr}
1858
    , m_client{client}
1859
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1860
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1861
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1862
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1863
    , m_reconnect_info{reconnect_info}
1864
    , m_session_history{}
1865
    , m_ident{ident}
1866
    , m_server_endpoint{std::move(endpoint)}
1867
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1868
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1869
{
2,458✔
1870
    m_on_idle = m_client.create_trigger([this](Status status) {
2,462✔
1871
        if (status == ErrorCodes::OperationAborted)
2,462✔
1872
            return;
×
1873
        else if (!status.is_ok())
2,462✔
1874
            throw Exception(status);
×
1875

1,162✔
1876
        REALM_ASSERT(m_activated);
2,462✔
1877
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,462✔
1878
            on_idle(); // Throws
2,456✔
1879
            // Connection object may be destroyed now.
1,160✔
1880
        }
2,456✔
1881
    });
2,462✔
1882
}
2,458✔
1883

1884
// Returns the identifier this connection was constructed with.
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
{
    return m_ident;
}
12✔
1888

1889

1890
// Returns a reference to the server endpoint stored in this connection.
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
{
    return m_server_endpoint;
}
2,456✔
1894

1895
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1896
                                                        const std::string& signed_access_token)
1897
{
9,826✔
1898
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
9,826✔
1899
    m_signed_access_token = signed_access_token;           // Throws (copy)
9,826✔
1900
}
9,826✔
1901

1902

1903
void ClientImpl::Connection::resume_active_sessions()
1904
{
1,776✔
1905
    auto handler = [=](ClientImpl::Session& sess) {
3,548✔
1906
        sess.cancel_resumption_delay(); // Throws
3,548✔
1907
    };
3,548✔
1908
    for_each_active_session(std::move(handler)); // Throws
1,776✔
1909
}
1,776✔
1910

1911
void ClientImpl::Connection::on_idle()
1912
{
2,456✔
1913
    logger.debug("Destroying connection object");
2,456✔
1914
    ClientImpl& client = get_client();
2,456✔
1915
    client.remove_connection(*this);
2,456✔
1916
    // NOTE: This connection object is now destroyed!
1,160✔
1917
}
2,456✔
1918

1919

1920
std::string ClientImpl::Connection::get_http_request_path() const
1921
{
3,320✔
1922
    using namespace std::string_view_literals;
3,320✔
1923
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
3,318✔
1924

1,690✔
1925
    std::string path;
3,320✔
1926
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,320✔
1927
    path += m_http_request_path_prefix;
3,320✔
1928
    path += param;
3,320✔
1929
    path += m_signed_access_token;
3,320✔
1930

1,690✔
1931
    return path;
3,320✔
1932
}
3,320✔
1933

1934

1935
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
1936
{
2,458✔
1937
    std::ostringstream out;
2,458✔
1938
    out.imbue(std::locale::classic());
2,458✔
1939
    out << "Connection[" << ident << "]: "; // Throws
2,458✔
1940
    return out.str();                       // Throws
2,458✔
1941
}
2,458✔
1942

1943

1944
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
1945
                                                            util::Optional<SessionErrorInfo> error_info)
1946
{
9,844✔
1947
    if (m_force_closed) {
9,844✔
1948
        return;
2,156✔
1949
    }
2,156✔
1950
    auto handler = [=](ClientImpl::Session& sess) {
10,590✔
1951
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
10,590✔
1952
        sess_2.on_connection_state_changed(state, error_info); // Throws
10,590✔
1953
    };
10,590✔
1954
    for_each_active_session(std::move(handler)); // Throws
7,688✔
1955
}
7,688✔
1956

1957

1958
Client::Client(Config config)
1959
    : m_impl{new ClientImpl{std::move(config)}} // Throws
1960
{
8,974✔
1961
}
8,974✔
1962

1963

1964
Client::Client(Client&& client) noexcept
1965
    : m_impl{std::move(client.m_impl)}
1966
{
×
1967
}
×
1968

1969

1970
// Destructor: no explicit work; members are destroyed as usual.
Client::~Client() noexcept
{
}
8,974✔
1971

1972

1973
void Client::shutdown() noexcept
1974
{
9,052✔
1975
    m_impl->shutdown();
9,052✔
1976
}
9,052✔
1977

1978
void Client::shutdown_and_wait()
1979
{
760✔
1980
    m_impl->shutdown_and_wait();
760✔
1981
}
760✔
1982

1983
void Client::cancel_reconnect_delay()
1984
{
1,780✔
1985
    m_impl->cancel_reconnect_delay();
1,780✔
1986
}
1,780✔
1987

1988
void Client::voluntary_disconnect_all_connections()
1989
{
12✔
1990
    m_impl->voluntary_disconnect_all_connections();
12✔
1991
}
12✔
1992

1993
bool Client::wait_for_session_terminations_or_client_stopped()
1994
{
368✔
1995
    return m_impl.get()->wait_for_session_terminations_or_client_stopped();
368✔
1996
}
368✔
1997

1998

1999
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2000
                                  port_type& port, std::string& path) const
2001
{
3,334✔
2002
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
3,334✔
2003
}
3,334✔
2004

2005

2006
// Creates the `SessionWrapper` backing this session and stores it in
// `m_impl` as a raw pointer.
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
{
    util::bind_ptr<SessionWrapper> wrapper{new SessionWrapper{*client.m_impl, std::move(db),
                                                              std::move(flx_sub_store), std::move(migration_store),
                                                              std::move(config)}}; // Throws
    // The reference count passed back to the application is implicitly
    // owned by a naked pointer. This is done to avoid exposing
    // implementation details through the header file (that is, through the
    // Session object).
    m_impl = wrapper.release();
}
10,942✔
2018

2019

2020
// Hands the progress handler over to the session wrapper.
void Session::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
{
    m_impl->set_progress_handler(std::move(handler)); // Throws
}
3,410✔
2024

2025

2026
// Hands the connection state change listener over to the session wrapper.
void Session::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
{
    m_impl->set_connection_state_change_listener(std::move(listener)); // Throws
}
11,038✔
2030

2031

2032
void Session::bind()
2033
{
9,790✔
2034
    m_impl->initiate(); // Throws
9,790✔
2035
}
9,790✔
2036

2037

2038
// Passes `new_version` to the session wrapper's `on_commit()`.
void Session::nonsync_transact_notify(version_type new_version)
{
    m_impl->on_commit(new_version); // Throws
}
14,798✔
2042

2043

2044
void Session::cancel_reconnect_delay()
2045
{
12✔
2046
    m_impl->cancel_reconnect_delay(); // Throws
12✔
2047
}
12✔
2048

2049

2050
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2051
{
4,042✔
2052
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,042✔
2053
}
4,042✔
2054

2055

2056
bool Session::wait_for_upload_complete_or_client_stopped()
2057
{
12,996✔
2058
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,996✔
2059
}
12,996✔
2060

2061

2062
bool Session::wait_for_download_complete_or_client_stopped()
2063
{
10,092✔
2064
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,092✔
2065
}
10,092✔
2066

2067

2068
void Session::refresh(const std::string& signed_access_token)
2069
{
208✔
2070
    m_impl->refresh(signed_access_token); // Throws
208✔
2071
}
208✔
2072

2073

2074
void Session::abandon() noexcept
2075
{
10,940✔
2076
    REALM_ASSERT(m_impl);
10,940✔
2077
    // Reabsorb the ownership assigned to the applications naked pointer by
5,296✔
2078
    // Session constructor
5,296✔
2079
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
10,940✔
2080
    SessionWrapper::abandon(std::move(wrapper));
10,940✔
2081
}
10,940✔
2082

2083
// Forwards the test command body to the session wrapper and returns the
// future it produces.
util::Future<std::string> Session::send_test_command(std::string body)
{
    return m_impl->send_test_command(std::move(body));
}
52✔
2087

2088
std::string Session::get_appservices_connection_id()
2089
{
72✔
2090
    return m_impl->get_appservices_connection_id();
72✔
2091
}
72✔
2092

2093
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2094
{
×
2095
    switch (proxyType) {
×
2096
        case ProxyConfig::Type::HTTP:
×
2097
            return os << "HTTP";
×
2098
        case ProxyConfig::Type::HTTPS:
×
2099
            return os << "HTTPS";
×
2100
    }
×
2101
    REALM_TERMINATE("Invalid Proxy Type object.");
×
2102
}
×
2103

2104
} // namespace sync
2105
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc