• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / michael.wilkersonbarker_976

05 Mar 2024 06:32PM UTC coverage: 90.896% (-0.04%) from 90.936%
michael.wilkersonbarker_976

Pull #7416

Evergreen

michael-wb
Added thread-safe comment to DeadlineTimer
Pull Request #7416: RCORE-1987 network::Service does not start waiting on timers if no other events are currently active

93900 of 173116 branches covered (54.24%)

238313 of 262182 relevant lines covered (90.9%)

5950091.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.07
/src/realm/sync/client.cpp
1

2
#include <memory>
3
#include <tuple>
4
#include <atomic>
5

6
#include "realm/sync/client_base.hpp"
7
#include "realm/sync/protocol.hpp"
8
#include "realm/util/optional.hpp"
9
#include <realm/sync/client.hpp>
10
#include <realm/sync/config.hpp>
11
#include <realm/sync/noinst/client_reset.hpp>
12
#include <realm/sync/noinst/client_history_impl.hpp>
13
#include <realm/sync/noinst/client_impl_base.hpp>
14
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
15
#include <realm/sync/subscriptions.hpp>
16
#include <realm/util/bind_ptr.hpp>
17
#include <realm/util/circular_buffer.hpp>
18
#include <realm/util/platform_info.hpp>
19
#include <realm/util/thread.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/util/value_reset_guard.hpp>
22
#include <realm/version.hpp>
23

24
namespace realm {
25
namespace sync {
26

27
namespace {
28
using namespace realm::util;
29

30

31
// clang-format off
32
using SessionImpl                     = ClientImpl::Session;
33
using SyncTransactCallback            = Session::SyncTransactCallback;
34
using ProgressHandler                 = Session::ProgressHandler;
35
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
36
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
37
using port_type                       = Session::port_type;
38
using connection_ident_type           = std::int_fast64_t;
39
using ProxyConfig                     = SyncConfig::ProxyConfig;
40
// clang-format on
41

42
} // unnamed namespace
43

44

45
// Life cycle states of a session wrapper:
46
//
47
//  - Uninitiated
48
//  - Unactualized
49
//  - Actualized
50
//  - Finalized
51
//
52
// The session wrapper moves from the Uninitiated to the Unactualized state when
53
// it is initiated, i.e., when initiate() is called. This may happen on any
54
// thread.
55
//
56
// The session wrapper moves from the Unactualized to the Actualized state when
57
// it is associated with a session object, i.e., when `m_sess` is made to refer
58
// to an object of type SessionImpl. This always happens on the event loop
59
// thread.
60
//
61
// The session wrapper moves from the Actualized to the Finalized state when it
62
// is dissociated from the session object. This happens in response to the
63
// session wrapper having been abandoned by the application. This always happens
64
// on the event loop thread.
65
//
66
// The session wrapper will exist in the Finalized state only while referenced
67
// from a post handler waiting to be executed.
68
//
69
// If the session wrapper is abandoned by the application while in the
70
// Uninitiated state, it will be destroyed immediately, since no post handlers
71
// can have been scheduled prior to initiation.
72
//
73
// If the session wrapper is abandoned while in the Unactivated state, it will
74
// move immediately to the Finalized state. This may happen on any thread.
75
//
76
// The moving of a session wrapper to, or from the Actualized state always
77
// happen on the event loop thread. All other state transitions may happen on
78
// any thread.
79
//
80
// NOTE: Activation of the session happens no later than during actualization,
81
// and initiation of deactivation happens no earlier than during
82
// finalization. See also activate_session() and initiate_session_deactivation()
83
// in ClientImpl::Connection.
84
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
85
public:
86
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
87
                   Session::Config);
88
    ~SessionWrapper() noexcept;
89

90
    ClientReplication& get_replication() noexcept;
91
    ClientImpl& get_client() noexcept;
92

93
    bool has_flx_subscription_store() const;
94
    SubscriptionStore* get_flx_subscription_store();
95
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
96

97
    MigrationStore* get_migration_store();
98

99
    void set_progress_handler(util::UniqueFunction<ProgressHandler>);
100
    void set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener>);
101

102
    void initiate();
103

104
    void force_close();
105

106
    void on_commit(version_type new_version) override;
107
    void cancel_reconnect_delay();
108

109
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
110
    bool wait_for_upload_complete_or_client_stopped();
111
    bool wait_for_download_complete_or_client_stopped();
112

113
    void refresh(std::string signed_access_token);
114

115
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
116

117
    // These are called from ClientImpl
118
    void actualize(ServerEndpoint);
119
    void finalize();
120
    void finalize_before_actualization() noexcept;
121

122
    util::Future<std::string> send_test_command(std::string body);
123

124
    void handle_pending_client_reset_acknowledgement();
125

126
    void update_subscription_version_info();
127

128
    std::string get_appservices_connection_id();
129

130
protected:
131
    friend class ClientImpl;
132

133
    // m_initiated/m_abandoned is used to check that we aren't trying to update immutable properties like the progress
134
    // handler or connection state listener after we've bound the session. We read the variable a bunch in
135
    // REALM_ASSERTS on the event loop and on the user's thread, but we only set it once and while we're registering
136
    // the session wrapper to be actualized. This function gets called from
137
    // ClientImpl::register_unactualized_session_wrapper() to synchronize updating this variable on the main thread
138
    // with reading the variable on the event loop.
139
    void mark_initiated();
140
    void mark_abandoned();
141

142
private:
143
    ClientImpl& m_client;
144
    DBRef m_db;
145
    Replication* m_replication;
146

147
    const ProtocolEnvelope m_protocol_envelope;
148
    const std::string m_server_address;
149
    const port_type m_server_port;
150
    const std::string m_user_id;
151
    const SyncServerMode m_sync_mode;
152
    const std::string m_authorization_header_name;
153
    const std::map<std::string, std::string> m_custom_http_headers;
154
    const bool m_verify_servers_ssl_certificate;
155
    const bool m_simulate_integration_error;
156
    const Optional<std::string> m_ssl_trust_certificate_path;
157
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
158
    const size_t m_flx_bootstrap_batch_size_bytes;
159

160
    // This one is different from null when, and only when the session wrapper
161
    // is in ClientImpl::m_abandoned_session_wrappers.
162
    SessionWrapper* m_next = nullptr;
163

164
    // After initiation, these may only be accessed by the event loop thread.
165
    std::string m_http_request_path_prefix;
166
    std::string m_virt_path;
167
    std::string m_signed_access_token;
168

169
    util::Optional<ClientReset> m_client_reset_config;
170

171
    util::Optional<ProxyConfig> m_proxy_config;
172

173
    uint_fast64_t m_last_reported_uploadable_bytes = 0;
174
    util::UniqueFunction<ProgressHandler> m_progress_handler;
175
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
176

177
    std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
178
    bool m_in_debug_hook = false;
179

180
    SessionReason m_session_reason;
181

182
    const uint64_t m_schema_version;
183

184
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
185
    int64_t m_flx_active_version = 0;
186
    int64_t m_flx_last_seen_version = 0;
187
    int64_t m_flx_pending_mark_version = 0;
188
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
189

190
    std::shared_ptr<MigrationStore> m_migration_store;
191

192
    bool m_initiated = false;
193

194
    // Set to true when this session wrapper is actualized (or when it is
195
    // finalized before proper actualization). It is then never modified again.
196
    //
197
    // A session specific post handler submitted after the initiation of the
198
    // session wrapper (initiate()) will always find that `m_actualized` is
199
    // true. This is the case, because the scheduling of such a post handler
200
    // will have been preceded by the triggering of
201
    // `ClientImpl::m_actualize_and_finalize` (in
202
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
203
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
204
    // before the post handler. If the session wrapper is no longer in
205
    // `ClientImpl::m_unactualized_session_wrappers` when
206
    // ClientImpl::actualize_and_finalize_session_wrappers() executes, it must
207
    // have been abandoned already, but in that case,
208
    // finalize_before_actualization() has already been called.
209
    bool m_actualized = false;
210

211
    bool m_force_closed = false;
212

213
    bool m_suspended = false;
214

215
    // Set when the session has been abandoned, but before it's been finalized.
216
    bool m_abandoned = false;
217
    // Has the SessionWrapper been finalized?
218
    bool m_finalized = false;
219

220
    // Set to true when the first DOWNLOAD message is received to indicate that
221
    // the byte-level download progress parameters can be considered reasonable
222
    // reliable. Before that, a lot of time may have passed, so our record of
223
    // the download progress is likely completely out of date.
224
    bool m_reliable_download_progress = false;
225

226
    // Set to point to an activated session object during actualization of the
227
    // session wrapper. Set to null during finalization of the session
228
    // wrapper. Both modifications are guaranteed to be performed by the event
229
    // loop thread.
230
    //
231
    // If a session specific post handler, that is submitted after the
232
    // initiation of the session wrapper, sees that `m_sess` is null, it can
233
    // conclude that the session wrapper has been both abandoned and
234
    // finalized. This is true, because the scheduling of such a post handler
235
    // will have been preceded by the triggering of
236
    // `ClientImpl::m_actualize_and_finalize` (in
237
    // ClientImpl::register_unactualized_session_wrapper()), which ensures that
238
    // ClientImpl::actualize_and_finalize_session_wrappers() gets to execute
239
    // before the post handler, so the session wrapper must have been actualized
240
    // unless it was already abandoned by the application. If it was abandoned
241
    // before it was actualized, it will already have been finalized by
242
    // finalize_before_actualization().
243
    //
244
    // Must only be accessed from the event loop thread.
245
    SessionImpl* m_sess = nullptr;
246

247
    // These must only be accessed from the event loop thread.
248
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
249
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
250
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
251

252
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
253
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
254
    // event loop thread.
255
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
256
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
257
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
258

259
    void on_sync_progress();
260
    void on_upload_completion();
261
    void on_download_completion();
262
    void on_suspended(const SessionErrorInfo& error_info);
263
    void on_resumed();
264
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
265
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
266
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
267
    void on_flx_sync_version_complete(int64_t version);
268

269
    void report_progress(bool only_if_new_uploadable_data = false);
270

271
    friend class SessionWrapperStack;
272
    friend class ClientImpl::Session;
273
};
274

275

276
// ################ SessionWrapperStack ################
277

278
inline bool SessionWrapperStack::empty() const noexcept
279
{
×
280
    return !m_back;
×
281
}
×
282

283

284
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
285
{
10,058✔
286
    REALM_ASSERT(!w->m_next);
10,058✔
287
    w->m_next = m_back;
10,058✔
288
    m_back = w.release();
10,058✔
289
}
10,058✔
290

291

292
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
293
{
26,568✔
294
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
26,568✔
295
    if (m_back) {
26,568✔
296
        m_back = m_back->m_next;
10,058✔
297
        w->m_next = nullptr;
10,058✔
298
    }
10,058✔
299
    return w;
26,568✔
300
}
26,568✔
301

302

303
inline void SessionWrapperStack::clear() noexcept
304
{
26,106✔
305
    while (m_back) {
26,106✔
306
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
307
        m_back = w->m_next;
×
308
    }
×
309
}
26,106✔
310

311

312
inline SessionWrapperStack::SessionWrapperStack(SessionWrapperStack&& q) noexcept
313
    : m_back{q.m_back}
314
{
315
    q.m_back = nullptr;
316
}
317

318

319
SessionWrapperStack::~SessionWrapperStack()
320
{
26,106✔
321
    clear();
26,106✔
322
}
26,106✔
323

324

325
// ################ ClientImpl ################
326

327
ClientImpl::~ClientImpl()
328
{
9,598✔
329
    // Since no other thread is allowed to be accessing this client or any of
4,730✔
330
    // its subobjects at this time, no mutex locking is necessary.
4,730✔
331

4,730✔
332
    shutdown_and_wait();
9,598✔
333
    // Session wrappers are removed from m_unactualized_session_wrappers as they
4,730✔
334
    // are abandoned.
4,730✔
335
    REALM_ASSERT(m_stopped);
9,598✔
336
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
9,598✔
337
}
9,598✔
338

339

340
void ClientImpl::cancel_reconnect_delay()
341
{
1,572✔
342
    // Thread safety required
894✔
343
    post([this](Status status) {
1,572✔
344
        if (status == ErrorCodes::OperationAborted)
1,572✔
345
            return;
×
346
        else if (!status.is_ok())
1,572✔
347
            throw Exception(status);
×
348

894✔
349
        for (auto& p : m_server_slots) {
1,572✔
350
            ServerSlot& slot = p.second;
1,572✔
351
            if (m_one_connection_per_session) {
1,572✔
352
                REALM_ASSERT(!slot.connection);
×
353
                for (const auto& p : slot.alt_connections) {
×
354
                    ClientImpl::Connection& conn = *p.second;
×
355
                    conn.resume_active_sessions(); // Throws
×
356
                    conn.cancel_reconnect_delay(); // Throws
×
357
                }
×
358
            }
×
359
            else {
1,572✔
360
                REALM_ASSERT(slot.alt_connections.empty());
1,572✔
361
                if (slot.connection) {
1,572✔
362
                    ClientImpl::Connection& conn = *slot.connection;
1,568✔
363
                    conn.resume_active_sessions(); // Throws
1,568✔
364
                    conn.cancel_reconnect_delay(); // Throws
1,568✔
365
                }
1,568✔
366
                else {
4✔
367
                    slot.reconnect_info.reset();
4✔
368
                }
4✔
369
            }
1,572✔
370
        }
1,572✔
371
    }); // Throws
1,572✔
372
}
1,572✔
373

374

375
void ClientImpl::voluntary_disconnect_all_connections()
376
{
12✔
377
    auto done_pf = util::make_promise_future<void>();
12✔
378
    post([this, promise = std::move(done_pf.promise)](Status status) mutable {
12✔
379
        if (status == ErrorCodes::OperationAborted) {
12✔
380
            return;
×
381
        }
×
382

6✔
383
        REALM_ASSERT(status.is_ok());
12✔
384

6✔
385
        try {
12✔
386
            for (auto& p : m_server_slots) {
12✔
387
                ServerSlot& slot = p.second;
12✔
388
                if (m_one_connection_per_session) {
12✔
389
                    REALM_ASSERT(!slot.connection);
×
390
                    for (const auto& p : slot.alt_connections) {
×
391
                        ClientImpl::Connection& conn = *p.second;
×
392
                        if (conn.get_state() == ConnectionState::disconnected) {
×
393
                            continue;
×
394
                        }
×
395
                        conn.voluntary_disconnect();
×
396
                    }
×
397
                }
×
398
                else {
12✔
399
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
400
                    if (!slot.connection) {
12✔
401
                        continue;
×
402
                    }
×
403
                    ClientImpl::Connection& conn = *slot.connection;
12✔
404
                    if (conn.get_state() == ConnectionState::disconnected) {
12✔
405
                        continue;
×
406
                    }
×
407
                    conn.voluntary_disconnect();
12✔
408
                }
12✔
409
            }
12✔
410
        }
12✔
411
        catch (...) {
6✔
412
            promise.set_error(exception_to_status());
×
413
            return;
×
414
        }
×
415
        promise.emplace_value();
12✔
416
    });
12✔
417
    done_pf.future.get();
12✔
418
}
12✔
419

420

421
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
422
{
9,146✔
423
    // Thread safety required
4,354✔
424

4,354✔
425
    {
9,146✔
426
        std::lock_guard lock{m_mutex};
9,146✔
427
        m_sessions_terminated = false;
9,146✔
428
    }
9,146✔
429

4,354✔
430
    // The technique employed here relies on the fact that
4,354✔
431
    // actualize_and_finalize_session_wrappers() must get to execute at least
4,354✔
432
    // once before the post handler submitted below gets to execute, but still
4,354✔
433
    // at a time where all session wrappers, that are abandoned prior to the
4,354✔
434
    // execution of wait_for_session_terminations_or_client_stopped(), have been
4,354✔
435
    // added to `m_abandoned_session_wrappers`.
4,354✔
436
    //
4,354✔
437
    // To see that this is the case, consider a session wrapper that was
4,354✔
438
    // abandoned before wait_for_session_terminations_or_client_stopped() was
4,354✔
439
    // invoked. Then the session wrapper will have been added to
4,354✔
440
    // `m_abandoned_session_wrappers`, and an invocation of
4,354✔
441
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
4,354✔
442
    // guarantees mentioned in the documentation of Trigger then ensure
4,354✔
443
    // that at least one execution of actualize_and_finalize_session_wrappers()
4,354✔
444
    // will happen after the session wrapper has been added to
4,354✔
445
    // `m_abandoned_session_wrappers`, but before the post handler submitted
4,354✔
446
    // below gets to execute.
4,354✔
447
    post([this](Status status) mutable {
9,146✔
448
        if (status == ErrorCodes::OperationAborted)
9,146✔
449
            return;
×
450
        else if (!status.is_ok())
9,146✔
451
            throw Exception(status);
×
452

4,354✔
453
        std::lock_guard lock{m_mutex};
9,146✔
454
        m_sessions_terminated = true;
9,146✔
455
        m_wait_or_client_stopped_cond.notify_all();
9,146✔
456
    }); // Throws
9,146✔
457

4,354✔
458
    bool completion_condition_was_satisfied;
9,146✔
459
    {
9,146✔
460
        std::unique_lock lock{m_mutex};
9,146✔
461
        while (!m_sessions_terminated && !m_stopped)
18,284✔
462
            m_wait_or_client_stopped_cond.wait(lock);
9,138✔
463
        completion_condition_was_satisfied = !m_stopped;
9,146✔
464
    }
9,146✔
465
    return completion_condition_was_satisfied;
9,146✔
466
}
9,146✔
467

468

469
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
470
util::Future<void> ClientImpl::notify_session_terminated()
471
{
44✔
472
    auto pf = util::make_promise_future<void>();
44✔
473
    post([promise = std::move(pf.promise)](Status status) mutable {
44✔
474
        // Includes operation_aborted
22✔
475
        if (!status.is_ok()) {
44✔
476
            promise.set_error(status);
×
477
            return;
×
478
        }
×
479

22✔
480
        promise.emplace_value();
44✔
481
    });
44✔
482

22✔
483
    return std::move(pf.future);
44✔
484
}
44✔
485

486
void ClientImpl::drain_connections_on_loop()
487
{
9,598✔
488
    post([this](Status status) mutable {
9,598✔
489
        REALM_ASSERT(status.is_ok());
9,598✔
490
        drain_connections();
9,598✔
491
    });
9,598✔
492
}
9,598✔
493

494
void ClientImpl::shutdown_and_wait()
495
{
10,354✔
496
    shutdown();
10,354✔
497
    std::unique_lock lock{m_drain_mutex};
10,354✔
498
    if (m_drained) {
10,354✔
499
        return;
752✔
500
    }
752✔
501

4,732✔
502
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
9,602✔
503
    m_drain_cv.wait(lock, [&] {
20,504✔
504
        return m_num_connections == 0 && m_outstanding_posts == 0;
20,504✔
505
    });
20,504✔
506

4,732✔
507
    m_drained = true;
9,602✔
508
}
9,602✔
509

510
void ClientImpl::shutdown() noexcept
511
{
20,030✔
512
    {
20,030✔
513
        std::lock_guard lock{m_mutex};
20,030✔
514
        if (m_stopped)
20,030✔
515
            return;
10,432✔
516
        m_stopped = true;
9,598✔
517
        m_wait_or_client_stopped_cond.notify_all();
9,598✔
518
    }
9,598✔
519

4,730✔
520
    drain_connections_on_loop();
9,598✔
521
}
9,598✔
522

523

524
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper, ServerEndpoint endpoint)
525
{
10,224✔
526
    // Thread safety required.
4,940✔
527
    {
10,224✔
528
        std::lock_guard lock{m_mutex};
10,224✔
529
        REALM_ASSERT(m_actualize_and_finalize);
10,224✔
530
        wrapper->mark_initiated();
10,224✔
531
        m_unactualized_session_wrappers.emplace(wrapper, std::move(endpoint)); // Throws
10,224✔
532
    }
10,224✔
533
    m_actualize_and_finalize->trigger();
10,224✔
534
}
10,224✔
535

536

537
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
538
{
10,224✔
539
    // Thread safety required.
4,940✔
540
    {
10,224✔
541
        std::lock_guard lock{m_mutex};
10,224✔
542
        REALM_ASSERT(m_actualize_and_finalize);
10,224✔
543
        wrapper->mark_abandoned();
10,224✔
544

4,940✔
545
        // If the session wrapper has not yet been actualized (on the event loop
4,940✔
546
        // thread), it can be immediately finalized. This ensures that we will
4,940✔
547
        // generally not actualize a session wrapper that has already been
4,940✔
548
        // abandoned.
4,940✔
549
        auto i = m_unactualized_session_wrappers.find(wrapper.get());
10,224✔
550
        if (i != m_unactualized_session_wrappers.end()) {
10,224✔
551
            m_unactualized_session_wrappers.erase(i);
166✔
552
            wrapper->finalize_before_actualization();
166✔
553
            return;
166✔
554
        }
166✔
555
        m_abandoned_session_wrappers.push(std::move(wrapper));
10,058✔
556
    }
10,058✔
557
    m_actualize_and_finalize->trigger();
10,058✔
558
}
10,058✔
559

560

561
// Must be called from the event loop thread.
562
void ClientImpl::actualize_and_finalize_session_wrappers()
563
{
16,510✔
564
    std::map<SessionWrapper*, ServerEndpoint> unactualized_session_wrappers;
16,510✔
565
    SessionWrapperStack abandoned_session_wrappers;
16,510✔
566
    bool stopped;
16,510✔
567
    {
16,510✔
568
        std::lock_guard lock{m_mutex};
16,510✔
569
        swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
16,510✔
570
        swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
16,510✔
571
        stopped = m_stopped;
16,510✔
572
    }
16,510✔
573
    // Note, we need to finalize old session wrappers before we actualize new
8,392✔
574
    // ones. This ensures that deactivation of old sessions is initiated before
8,392✔
575
    // new session are activated. This, in turn, ensures that the server does
8,392✔
576
    // not see two overlapping sessions for the same local Realm file.
8,392✔
577
    while (util::bind_ptr<SessionWrapper> wrapper = abandoned_session_wrappers.pop())
26,566✔
578
        wrapper->finalize(); // Throws
10,056✔
579
    if (stopped) {
16,510✔
580
        for (auto& p : unactualized_session_wrappers) {
326✔
581
            SessionWrapper& wrapper = *p.first;
4✔
582
            wrapper.finalize_before_actualization();
4✔
583
        }
4✔
584
        return;
642✔
585
    }
642✔
586
    for (auto& p : unactualized_session_wrappers) {
15,868✔
587
        SessionWrapper& wrapper = *p.first;
10,054✔
588
        ServerEndpoint server_endpoint = std::move(p.second);
10,054✔
589
        wrapper.actualize(std::move(server_endpoint)); // Throws
10,054✔
590
    }
10,054✔
591
}
15,868✔
592

593

594
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
595
                                                   const std::string& authorization_header_name,
596
                                                   const std::map<std::string, std::string>& custom_http_headers,
597
                                                   bool verify_servers_ssl_certificate,
598
                                                   Optional<std::string> ssl_trust_certificate_path,
599
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
600
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
601
{
10,050✔
602
    auto&& [server_slot_it, inserted] =
10,050✔
603
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
10,050✔
604
    ServerSlot& server_slot = server_slot_it->second; // Throws
10,050✔
605

4,848✔
606
    // TODO: enable multiplexing with proxies
4,848✔
607
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
10,050✔
608
        // Use preexisting connection
3,592✔
609
        REALM_ASSERT(server_slot.alt_connections.empty());
7,404✔
610
        return *server_slot.connection;
7,404✔
611
    }
7,404✔
612

1,256✔
613
    // Create a new connection
1,256✔
614
    REALM_ASSERT(!server_slot.connection);
2,646✔
615
    connection_ident_type ident = m_prev_connection_ident + 1;
2,646✔
616
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,646✔
617
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,646✔
618
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,646✔
619
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,646✔
620
    ClientImpl::Connection& conn = *conn_2;
2,646✔
621
    if (!m_one_connection_per_session) {
2,646✔
622
        server_slot.connection = std::move(conn_2);
2,634✔
623
    }
2,634✔
624
    else {
12✔
625
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
12✔
626
    }
12✔
627
    m_prev_connection_ident = ident;
2,646✔
628
    was_created = true;
2,646✔
629
    {
2,646✔
630
        std::lock_guard lk(m_drain_mutex);
2,646✔
631
        ++m_num_connections;
2,646✔
632
    }
2,646✔
633
    return conn;
2,646✔
634
}
2,646✔
635

636

637
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
638
{
2,646✔
639
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,646✔
640
    auto i = m_server_slots.find(endpoint);
2,646✔
641
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,646✔
642
    ServerSlot& server_slot = i->second;
2,646✔
643
    if (!m_one_connection_per_session) {
2,646✔
644
        REALM_ASSERT(server_slot.alt_connections.empty());
2,632✔
645
        REALM_ASSERT(&*server_slot.connection == &conn);
2,632✔
646
        server_slot.reconnect_info = conn.get_reconnect_info();
2,632✔
647
        server_slot.connection.reset();
2,632✔
648
    }
2,632✔
649
    else {
14✔
650
        REALM_ASSERT(!server_slot.connection);
14✔
651
        connection_ident_type ident = conn.get_ident();
14✔
652
        auto j = server_slot.alt_connections.find(ident);
14✔
653
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
14✔
654
        REALM_ASSERT(&*j->second == &conn);
14✔
655
        server_slot.alt_connections.erase(j);
14✔
656
    }
14✔
657

1,256✔
658
    {
2,646✔
659
        std::lock_guard lk(m_drain_mutex);
2,646✔
660
        REALM_ASSERT(m_num_connections);
2,646✔
661
        --m_num_connections;
2,646✔
662
        m_drain_cv.notify_all();
2,646✔
663
    }
2,646✔
664
}
2,646✔
665

666

667
// ################ SessionImpl ################
668

669
void SessionImpl::force_close()
670
{
104✔
671
    // Allow force_close() if session is active or hasn't been activated yet.
52✔
672
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
104!
673
        m_wrapper.force_close();
104✔
674
    }
104✔
675
}
104✔
676

677
void SessionImpl::on_connection_state_changed(ConnectionState state,
678
                                              const util::Optional<SessionErrorInfo>& error_info)
679
{
10,504✔
680
    // Only used to report errors back to the SyncSession while the Session is active
5,470✔
681
    if (m_state == SessionImpl::Active) {
10,506✔
682
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
10,506✔
683
    }
10,506✔
684
}
10,504✔
685

686

687
const std::string& SessionImpl::get_virt_path() const noexcept
688
{
7,282✔
689
    // Can only be called if the session is active or being activated
3,026✔
690
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
7,282!
691
    return m_wrapper.m_virt_path;
7,282✔
692
}
7,282✔
693

694
const std::string& SessionImpl::get_realm_path() const noexcept
695
{
10,334✔
696
    // Can only be called if the session is active or being activated
4,990✔
697
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,334✔
698
    return m_wrapper.m_db->get_path();
10,334✔
699
}
10,334✔
700

701
DBRef SessionImpl::get_db() const noexcept
702
{
23,594✔
703
    // Can only be called if the session is active or being activated
12,962✔
704
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
23,594!
705
    return m_wrapper.m_db;
23,594✔
706
}
23,594✔
707

708
ClientReplication& SessionImpl::get_repl() const noexcept
709
{
114,004✔
710
    // Can only be called if the session is active or being activated
57,974✔
711
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
114,004✔
712
    return m_wrapper.get_replication();
114,004✔
713
}
114,004✔
714

715
ClientHistory& SessionImpl::get_history() const noexcept
716
{
112,222✔
717
    return get_repl().get_history();
112,222✔
718
}
112,222✔
719

720
util::Optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
721
{
13,498✔
722
    // Can only be called if the session is active or being activated
6,490✔
723
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
13,498✔
724
    return m_wrapper.m_client_reset_config;
13,498✔
725
}
13,498✔
726

727
SessionReason SessionImpl::get_session_reason() noexcept
728
{
1,354✔
729
    // Can only be called if the session is active or being activated
676✔
730
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,354!
731
    return m_wrapper.m_session_reason;
1,354✔
732
}
1,354✔
733

734
uint64_t SessionImpl::get_schema_version() noexcept
735
{
1,354✔
736
    // Can only be called if the session is active or being activated
676✔
737
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,354!
738
    return m_wrapper.m_schema_version;
1,354✔
739
}
1,354✔
740

741
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
742
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
743
{
43,046✔
744
    // Ignore the call if the session is not active
22,898✔
745
    if (m_state != State::Active) {
43,046✔
746
        return;
×
747
    }
×
748

22,898✔
749
    try {
43,046✔
750
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
43,046!
751
        if (simulate_integration_error) {
43,046✔
752
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
753
        }
×
754
        version_type client_version;
43,046✔
755
        if (REALM_LIKELY(!get_client().is_dry_run())) {
43,046✔
756
            VersionInfo version_info;
43,046✔
757
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
43,046✔
758
            client_version = version_info.realm_version;
43,046✔
759
        }
43,046✔
760
        else {
×
761
            // Fake it for "dry run" mode
762
            client_version = m_last_version_available + 1;
×
763
        }
×
764
        on_changesets_integrated(client_version, progress); // Throws
43,046✔
765
    }
43,046✔
766
    catch (const IntegrationException& e) {
22,910✔
767
        on_integration_failure(e);
24✔
768
    }
24✔
769
    m_wrapper.on_sync_progress(); // Throws
43,046✔
770
}
43,046✔
771

772

773
void SessionImpl::on_upload_completion()
774
{
14,826✔
775
    // Ignore the call if the session is not active
7,328✔
776
    if (m_state == State::Active) {
14,826✔
777
        m_wrapper.on_upload_completion(); // Throws
14,826✔
778
    }
14,826✔
779
}
14,826✔
780

781

782
void SessionImpl::on_download_completion()
783
{
16,720✔
784
    // Ignore the call if the session is not active
7,846✔
785
    if (m_state == State::Active) {
16,720✔
786
        m_wrapper.on_download_completion(); // Throws
16,720✔
787
    }
16,720✔
788
}
16,720✔
789

790

791
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
792
{
672✔
793
    // Ignore the call if the session is not active
330✔
794
    if (m_state == State::Active) {
672✔
795
        m_wrapper.on_suspended(error_info); // Throws
672✔
796
    }
672✔
797
}
672✔
798

799

800
void SessionImpl::on_resumed()
801
{
66✔
802
    // Ignore the call if the session is not active
30✔
803
    if (m_state == State::Active) {
66✔
804
        m_wrapper.on_resumed(); // Throws
66✔
805
    }
66✔
806
}
66✔
807

808
void SessionImpl::handle_pending_client_reset_acknowledgement()
809
{
302✔
810
    // Ignore the call if the session is not active
150✔
811
    if (m_state == State::Active) {
302✔
812
        m_wrapper.handle_pending_client_reset_acknowledgement();
302✔
813
    }
302✔
814
}
302✔
815

816
void SessionImpl::update_subscription_version_info()
817
{
284✔
818
    // Ignore the call if the session is not active
142✔
819
    if (m_state == State::Active) {
284✔
820
        m_wrapper.update_subscription_version_info();
284✔
821
    }
284✔
822
}
284✔
823

824
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
825
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
826
{
44,990✔
827
    // Ignore the call if the session is not active
23,870✔
828
    if (m_state != State::Active) {
44,990✔
829
        return false;
×
830
    }
×
831

23,870✔
832
    if (is_steady_state_download_message(batch_state, query_version)) {
44,990✔
833
        return false;
43,048✔
834
    }
43,048✔
835

972✔
836
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
1,942✔
837
    util::Optional<SyncProgress> maybe_progress;
1,942✔
838
    if (batch_state == DownloadBatchState::LastInBatch) {
1,942✔
839
        maybe_progress = progress;
1,766✔
840
    }
1,766✔
841

972✔
842
    bool new_batch = false;
1,942✔
843
    try {
1,942✔
844
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
1,942✔
845
    }
1,942✔
846
    catch (const LogicError& ex) {
972✔
847
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
848
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
849
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
850
                                    ProtocolError::bad_changeset_size);
×
851
            on_integration_failure(ex);
×
852
            return true;
×
853
        }
×
854
        throw;
×
855
    }
×
856

972✔
857
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
972✔
858
    // bootstrapping.
972✔
859
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
1,946✔
860
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
44✔
861
    }
44✔
862

972✔
863
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
1,946✔
864
                                       batch_state, received_changesets.size());
1,946✔
865
    if (hook_action == SyncClientHookAction::EarlyReturn) {
1,946✔
866
        return true;
12✔
867
    }
12✔
868
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
1,934✔
869

966✔
870
    if (batch_state == DownloadBatchState::MoreToCome) {
1,934✔
871
        return true;
176✔
872
    }
176✔
873

878✔
874
    try {
1,758✔
875
        process_pending_flx_bootstrap();
1,758✔
876
    }
1,758✔
877
    catch (const IntegrationException& e) {
888✔
878
        on_integration_failure(e);
20✔
879
    }
20✔
880
    catch (...) {
878✔
881
        on_integration_failure(IntegrationException(exception_to_status()));
×
882
    }
×
883

878✔
884
    return true;
1,758✔
885
}
1,758✔
886

887

888
void SessionImpl::process_pending_flx_bootstrap()
889
{
11,804✔
890
    // Ignore the call if not a flx session or session is not active
5,724✔
891
    if (!m_is_flx_sync_session || m_state != State::Active) {
11,804✔
892
        return;
8,704✔
893
    }
8,704✔
894
    // Should never be called if session is not active
1,550✔
895
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
3,100✔
896
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
3,100✔
897
    if (!bootstrap_store->has_pending()) {
3,100✔
898
        return;
1,322✔
899
    }
1,322✔
900

888✔
901
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,778✔
902
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,778✔
903
                "changeset size: %3)",
1,778✔
904
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,778✔
905
                pending_batch_stats.pending_changeset_bytes);
1,778✔
906
    auto& history = get_repl().get_history();
1,778✔
907
    VersionInfo new_version;
1,778✔
908
    SyncProgress progress;
1,778✔
909
    int64_t query_version = -1;
1,778✔
910
    size_t changesets_processed = 0;
1,778✔
911

888✔
912
    // Used to commit each batch after it was transformed.
888✔
913
    TransactionRef transact = get_db()->start_write();
1,778✔
914
    while (bootstrap_store->has_pending()) {
3,696✔
915
        auto start_time = std::chrono::steady_clock::now();
1,930✔
916
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
1,930✔
917
        if (!pending_batch.progress) {
1,930✔
918
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
8✔
919
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
4✔
920
            // bootstrap store requires a write transaction itself.
4✔
921
            transact->close();
8✔
922
            bootstrap_store->clear();
8✔
923
            return;
8✔
924
        }
8✔
925

960✔
926
        auto batch_state =
1,922✔
927
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
1,842✔
928
        uint64_t downloadable_bytes = 0;
1,922✔
929
        query_version = pending_batch.query_version;
1,922✔
930
        bool simulate_integration_error =
1,922✔
931
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
1,922✔
932
        if (simulate_integration_error) {
1,922✔
933
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
4✔
934
        }
4✔
935

958✔
936
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
1,918✔
937
                        batch_state, pending_batch.changesets.size());
1,918✔
938

958✔
939
        history.integrate_server_changesets(
1,918✔
940
            *pending_batch.progress, &downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
1,918✔
941
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
1,908✔
942
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
1,898✔
943
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
1,898✔
944
            });
1,898✔
945
        progress = *pending_batch.progress;
1,918✔
946
        changesets_processed += pending_batch.changesets.size();
1,918✔
947
        auto duration = std::chrono::steady_clock::now() - start_time;
1,918✔
948

958✔
949
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
1,918✔
950
                                      batch_state, pending_batch.changesets.size());
1,918✔
951
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
1,918✔
952

958✔
953
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
1,918✔
954
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
1,918✔
955
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
1,918✔
956
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
1,918✔
957
                    pending_batch.remaining_changesets);
1,918✔
958
    }
1,918✔
959
    on_changesets_integrated(new_version.realm_version, progress);
1,772✔
960

882✔
961
    REALM_ASSERT_3(query_version, !=, -1);
1,766✔
962
    m_wrapper.on_sync_progress();
1,766✔
963
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
1,766✔
964

882✔
965
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,766✔
966
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,766✔
967
    // NoAction/EarlyReturn are both valid no-op actions to take here.
882✔
968
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,766✔
969
}
1,766✔
970

971
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
972
{
20✔
973
    // Ignore the call if the session is not active
10✔
974
    if (m_state == State::Active) {
20✔
975
        m_wrapper.on_flx_sync_error(version, err_msg);
20✔
976
    }
20✔
977
}
20✔
978

979
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
980
{
1,790✔
981
    // Ignore the call if the session is not active
894✔
982
    if (m_state == State::Active) {
1,790✔
983
        m_wrapper.on_flx_sync_progress(version, batch_state);
1,790✔
984
    }
1,790✔
985
}
1,790✔
986

987
SubscriptionStore* SessionImpl::get_flx_subscription_store()
988
{
16,168✔
989
    // Should never be called if session is not active
8,190✔
990
    REALM_ASSERT_EX(m_state == State::Active, m_state);
16,168✔
991
    return m_wrapper.get_flx_subscription_store();
16,168✔
992
}
16,168✔
993

994
MigrationStore* SessionImpl::get_migration_store()
995
{
62,632✔
996
    // Should never be called if session is not active
32,934✔
997
    REALM_ASSERT_EX(m_state == State::Active, m_state);
62,632✔
998
    return m_wrapper.get_migration_store();
62,632✔
999
}
62,632✔
1000

1001
void SessionImpl::on_flx_sync_version_complete(int64_t version)
1002
{
288✔
1003
    // Ignore the call if the session is not active
144✔
1004
    if (m_state == State::Active) {
288✔
1005
        m_wrapper.on_flx_sync_version_complete(version);
288✔
1006
    }
288✔
1007
}
288✔
1008

1009
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
1010
{
1,990✔
1011
    // Should never be called if session is not active
992✔
1012
    REALM_ASSERT_EX(m_state == State::Active, m_state);
1,990✔
1013

992✔
1014
    // Make sure we don't call the debug hook recursively.
992✔
1015
    if (m_wrapper.m_in_debug_hook) {
1,990✔
1016
        return SyncClientHookAction::NoAction;
24✔
1017
    }
24✔
1018
    m_wrapper.m_in_debug_hook = true;
1,966✔
1019
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
1,966✔
1020
        m_wrapper.m_in_debug_hook = false;
1,966✔
1021
    });
1,966✔
1022

980✔
1023
    auto action = m_wrapper.m_debug_hook(data);
1,966✔
1024
    switch (action) {
1,966✔
1025
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
12✔
1026
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
12✔
1027
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
12✔
1028

6✔
1029
            auto err_processing_err = receive_error_message(err_info);
12✔
1030
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
12✔
1031
            return SyncClientHookAction::EarlyReturn;
12✔
1032
        }
×
1033
        case realm::SyncClientHookAction::TriggerReconnect: {
24✔
1034
            get_connection().voluntary_disconnect();
24✔
1035
            return SyncClientHookAction::EarlyReturn;
24✔
1036
        }
×
1037
        default:
1,922✔
1038
            return action;
1,922✔
1039
    }
1,966✔
1040
}
1,966✔
1041

1042
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
1043
                                                  int64_t query_version, DownloadBatchState batch_state,
1044
                                                  size_t num_changesets)
1045
{
114,244✔
1046
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
114,244✔
1047
        return SyncClientHookAction::NoAction;
112,416✔
1048
    }
112,416✔
1049
    if (REALM_UNLIKELY(m_state != State::Active)) {
1,828✔
1050
        return SyncClientHookAction::NoAction;
×
1051
    }
×
1052

908✔
1053
    SyncClientHookData data;
1,828✔
1054
    data.event = event;
1,828✔
1055
    data.batch_state = batch_state;
1,828✔
1056
    data.progress = progress;
1,828✔
1057
    data.num_changesets = num_changesets;
1,828✔
1058
    data.query_version = query_version;
1,828✔
1059

908✔
1060
    return call_debug_hook(data);
1,828✔
1061
}
1,828✔
1062

1063
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
1064
{
1,370✔
1065
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
1,370✔
1066
        return SyncClientHookAction::NoAction;
1,198✔
1067
    }
1,198✔
1068
    if (REALM_UNLIKELY(m_state != State::Active)) {
172✔
1069
        return SyncClientHookAction::NoAction;
×
1070
    }
×
1071

84✔
1072
    SyncClientHookData data;
172✔
1073
    data.event = event;
172✔
1074
    data.batch_state = DownloadBatchState::SteadyState;
172✔
1075
    data.progress = m_progress;
172✔
1076
    data.num_changesets = 0;
172✔
1077
    data.query_version = 0;
172✔
1078
    data.error_info = &error_info;
172✔
1079

84✔
1080
    return call_debug_hook(data);
172✔
1081
}
172✔
1082

1083
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
1084
{
90,002✔
1085
    // Should never be called if session is not active
47,748✔
1086
    REALM_ASSERT_EX(m_state == State::Active, m_state);
90,002✔
1087
    if (batch_state == DownloadBatchState::SteadyState) {
90,002✔
1088
        return true;
43,046✔
1089
    }
43,046✔
1090

24,850✔
1091
    if (!m_is_flx_sync_session) {
46,956✔
1092
        return true;
42,010✔
1093
    }
42,010✔
1094

2,468✔
1095
    // If this is a steady state DOWNLOAD, no need for special handling.
2,468✔
1096
    if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
4,946✔
1097
        return true;
1,040✔
1098
    }
1,040✔
1099

1,950✔
1100
    return false;
3,906✔
1101
}
3,906✔
1102

1103
util::Future<std::string> SessionImpl::send_test_command(std::string body)
1104
{
60✔
1105
    if (m_state != State::Active) {
60✔
1106
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
×
1107
    }
×
1108

30✔
1109
    try {
60✔
1110
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
60✔
1111
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
60✔
1112
            return Status{ErrorCodes::LogicError,
4✔
1113
                          "Must supply command name in \"command\" field of test command json object"};
4✔
1114
        }
4✔
1115
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
56✔
1116
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
×
1117
        }
×
1118
    }
4✔
1119
    catch (const nlohmann::json::parse_error& e) {
4✔
1120
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
4✔
1121
    }
4✔
1122

26✔
1123
    auto pf = util::make_promise_future<std::string>();
52✔
1124

26✔
1125
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
52✔
1126
        // Includes operation_aborted
26✔
1127
        if (!status.is_ok()) {
52✔
1128
            promise.set_error(status);
×
1129
            return;
×
1130
        }
×
1131

26✔
1132
        auto id = ++m_last_pending_test_command_ident;
52✔
1133
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
52✔
1134
        ensure_enlisted_to_send();
52✔
1135
    });
52✔
1136

26✔
1137
    return std::move(pf.future);
52✔
1138
}
52✔
1139

1140
// ################ SessionWrapper ################
1141

1142
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1143
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1144
// the ClientImpl::Connection that owns the ClientImpl::Session.
1145
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1146
                               std::shared_ptr<MigrationStore> migration_store, Session::Config config)
1147
    : m_client{client}
1148
    , m_db(std::move(db))
1149
    , m_replication(m_db->get_replication())
1150
    , m_protocol_envelope{config.protocol_envelope}
1151
    , m_server_address{std::move(config.server_address)}
1152
    , m_server_port{config.server_port}
1153
    , m_user_id(std::move(config.user_id))
1154
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
1155
    , m_authorization_header_name{config.authorization_header_name}
1156
    , m_custom_http_headers{config.custom_http_headers}
1157
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
1158
    , m_simulate_integration_error{config.simulate_integration_error}
1159
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
1160
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
1161
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
1162
    , m_http_request_path_prefix{std::move(config.service_identifier)}
1163
    , m_virt_path{std::move(config.realm_identifier)}
1164
    , m_signed_access_token{std::move(config.signed_user_token)}
1165
    , m_client_reset_config{std::move(config.client_reset_config)}
1166
    , m_proxy_config{config.proxy_config} // Throws
1167
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
1168
    , m_session_reason(config.session_reason)
1169
    , m_schema_version(config.schema_version)
1170
    , m_flx_subscription_store(std::move(flx_sub_store))
1171
    , m_migration_store(std::move(migration_store))
1172
{
11,376✔
1173
    REALM_ASSERT(m_db);
11,376✔
1174
    REALM_ASSERT(m_db->get_replication());
11,376✔
1175
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
11,376✔
1176
    if (m_client_reset_config) {
11,376✔
1177
        m_session_reason = SessionReason::ClientReset;
368✔
1178
    }
368✔
1179
}
11,376✔
1180

1181
SessionWrapper::~SessionWrapper() noexcept
1182
{
11,376✔
1183
    if (m_db && m_actualized) {
11,376✔
1184
        m_db->remove_commit_listener(this);
166✔
1185
        m_db->release_sync_agent();
166✔
1186
    }
166✔
1187
}
11,376✔
1188

1189

1190
inline ClientReplication& SessionWrapper::get_replication() noexcept
1191
{
114,006✔
1192
    REALM_ASSERT(m_db);
114,006✔
1193
    return static_cast<ClientReplication&>(*m_replication);
114,006✔
1194
}
114,006✔
1195

1196

1197
inline ClientImpl& SessionWrapper::get_client() noexcept
1198
{
72✔
1199
    return m_client;
72✔
1200
}
72✔
1201

1202
bool SessionWrapper::has_flx_subscription_store() const
1203
{
1,790✔
1204
    return static_cast<bool>(m_flx_subscription_store);
1,790✔
1205
}
1,790✔
1206

1207
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1208
{
20✔
1209
    REALM_ASSERT(!m_finalized);
20✔
1210
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
20✔
1211
}
20✔
1212

1213
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1214
{
2,034✔
1215
    REALM_ASSERT(!m_finalized);
2,034✔
1216
    m_flx_last_seen_version = version;
2,034✔
1217
    m_flx_active_version = version;
2,034✔
1218
}
2,034✔
1219

1220
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
1221
{
1,790✔
1222
    if (!has_flx_subscription_store()) {
1,790✔
1223
        return;
×
1224
    }
×
1225
    REALM_ASSERT(!m_finalized);
1,790✔
1226
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,790✔
1227
    REALM_ASSERT(new_version >= m_flx_active_version);
1,790✔
1228
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,790✔
1229

894✔
1230
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
1,790✔
1231

894✔
1232
    switch (batch_state) {
1,790✔
1233
        case DownloadBatchState::SteadyState:
✔
1234
            // Cannot be called with this value.
1235
            REALM_UNREACHABLE();
1236
        case DownloadBatchState::LastInBatch:
1,746✔
1237
            if (m_flx_active_version == new_version) {
1,746✔
1238
                return;
×
1239
            }
×
1240
            on_flx_sync_version_complete(new_version);
1,746✔
1241
            if (new_version == 0) {
1,746✔
1242
                new_state = SubscriptionSet::State::Complete;
820✔
1243
            }
820✔
1244
            else {
926✔
1245
                new_state = SubscriptionSet::State::AwaitingMark;
926✔
1246
                m_flx_pending_mark_version = new_version;
926✔
1247
            }
926✔
1248
            break;
1,746✔
1249
        case DownloadBatchState::MoreToCome:
894✔
1250
            if (m_flx_last_seen_version == new_version) {
44✔
1251
                return;
×
1252
            }
×
1253

22✔
1254
            m_flx_last_seen_version = new_version;
44✔
1255
            new_state = SubscriptionSet::State::Bootstrapping;
44✔
1256
            break;
44✔
1257
    }
1,790✔
1258

894✔
1259
    get_flx_subscription_store()->update_state(new_version, new_state);
1,790✔
1260
}
1,790✔
1261

1262
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
1263
{
17,978✔
1264
    REALM_ASSERT(!m_finalized);
17,978✔
1265
    return m_flx_subscription_store.get();
17,978✔
1266
}
17,978✔
1267

1268
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
1269
{
5,046✔
1270
    REALM_ASSERT(!m_finalized);
5,046✔
1271
    return m_flx_pending_bootstrap_store.get();
5,046✔
1272
}
5,046✔
1273

1274
MigrationStore* SessionWrapper::get_migration_store()
1275
{
62,632✔
1276
    REALM_ASSERT(!m_finalized);
62,632✔
1277
    return m_migration_store.get();
62,632✔
1278
}
62,632✔
1279

1280
inline void SessionWrapper::mark_initiated()
1281
{
10,224✔
1282
    REALM_ASSERT(!m_initiated);
10,224✔
1283
    REALM_ASSERT(!m_abandoned);
10,224✔
1284
    m_initiated = true;
10,224✔
1285
}
10,224✔
1286

1287

1288
inline void SessionWrapper::mark_abandoned()
1289
{
10,224✔
1290
    REALM_ASSERT(!m_abandoned);
10,224✔
1291
    m_abandoned = true;
10,224✔
1292
}
10,224✔
1293

1294

1295
inline void SessionWrapper::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
1296
{
3,852✔
1297
    REALM_ASSERT(!m_initiated);
3,852✔
1298
    m_progress_handler = std::move(handler);
3,852✔
1299
}
3,852✔
1300

1301

1302
inline void
1303
SessionWrapper::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
1304
{
11,472✔
1305
    REALM_ASSERT(!m_initiated);
11,472✔
1306
    m_connection_state_change_listener = std::move(listener);
11,472✔
1307
}
11,472✔
1308

1309

1310
void SessionWrapper::initiate()
1311
{
10,224✔
1312
    ServerEndpoint server_endpoint{m_protocol_envelope, m_server_address, m_server_port, m_user_id, m_sync_mode};
10,224✔
1313
    m_client.register_unactualized_session_wrapper(this, std::move(server_endpoint)); // Throws
10,224✔
1314
    m_db->add_commit_listener(this);
10,224✔
1315
}
10,224✔
1316

1317

1318
void SessionWrapper::on_commit(version_type new_version)
1319
{
108,986✔
1320
    // Thread safety required
55,088✔
1321
    REALM_ASSERT(m_initiated);
108,986✔
1322

55,088✔
1323
    util::bind_ptr<SessionWrapper> self{this};
108,986✔
1324
    m_client.post([self = std::move(self), new_version](Status status) {
109,004✔
1325
        if (status == ErrorCodes::OperationAborted)
109,004✔
1326
            return;
×
1327
        else if (!status.is_ok())
109,004✔
1328
            throw Exception(status);
×
1329

55,088✔
1330
        REALM_ASSERT(self->m_actualized);
109,004✔
1331
        if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) {
109,004✔
1332
            return;
926✔
1333
        }
926✔
1334

54,702✔
1335
        if (REALM_UNLIKELY(!self->m_sess))
108,078✔
1336
            return; // Already finalized
54,702✔
1337
        SessionImpl& sess = *self->m_sess;
108,078✔
1338
        sess.recognize_sync_version(new_version); // Throws
108,078✔
1339
        bool only_if_new_uploadable_data = true;
108,078✔
1340
        self->report_progress(only_if_new_uploadable_data); // Throws
108,078✔
1341
    });
108,078✔
1342
}
108,986✔
1343

1344

1345
void SessionWrapper::cancel_reconnect_delay()
1346
{
20✔
1347
    // Thread safety required
10✔
1348
    REALM_ASSERT(m_initiated);
20✔
1349

10✔
1350
    util::bind_ptr<SessionWrapper> self{this};
20✔
1351
    m_client.post([self = std::move(self)](Status status) {
20✔
1352
        if (status == ErrorCodes::OperationAborted)
20✔
1353
            return;
×
1354
        else if (!status.is_ok())
20✔
1355
            throw Exception(status);
×
1356

10✔
1357
        REALM_ASSERT(self->m_actualized);
20✔
1358
        if (REALM_UNLIKELY(self->m_finalized || self->m_force_closed)) {
20✔
1359
            return;
×
1360
        }
×
1361

10✔
1362
        if (REALM_UNLIKELY(!self->m_sess))
20✔
1363
            return; // Already finalized
10✔
1364
        SessionImpl& sess = *self->m_sess;
20✔
1365
        sess.cancel_resumption_delay(); // Throws
20✔
1366
        ClientImpl::Connection& conn = sess.get_connection();
20✔
1367
        conn.cancel_reconnect_delay(); // Throws
20✔
1368
    });                                // Throws
20✔
1369
}
20✔
1370

1371
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1372
                                    WaitOperCompletionHandler handler)
1373
{
4,850✔
1374
    REALM_ASSERT(upload_completion || download_completion);
4,850✔
1375
    REALM_ASSERT(m_initiated);
4,850✔
1376

2,338✔
1377
    util::bind_ptr<SessionWrapper> self{this};
4,850✔
1378
    m_client.post([self = std::move(self), handler = std::move(handler), upload_completion,
4,850✔
1379
                   download_completion](Status status) mutable {
4,850✔
1380
        if (status == ErrorCodes::OperationAborted)
4,850✔
1381
            return;
×
1382
        else if (!status.is_ok())
4,850✔
1383
            throw Exception(status);
×
1384

2,338✔
1385
        REALM_ASSERT(self->m_actualized);
4,850✔
1386
        if (REALM_UNLIKELY(!self->m_sess)) {
4,850✔
1387
            // Already finalized
36✔
1388
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
68✔
1389
            return;
68✔
1390
        }
68✔
1391
        if (upload_completion) {
4,782✔
1392
            if (download_completion) {
2,488✔
1393
                // Wait for upload and download completion
146✔
1394
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
294✔
1395
            }
294✔
1396
            else {
2,194✔
1397
                // Wait for upload completion only
1,008✔
1398
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
2,194✔
1399
            }
2,194✔
1400
        }
2,488✔
1401
        else {
2,294✔
1402
            // Wait for download completion only
1,148✔
1403
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
2,294✔
1404
        }
2,294✔
1405
        SessionImpl& sess = *self->m_sess;
4,782✔
1406
        if (upload_completion)
4,782✔
1407
            sess.request_upload_completion_notification(); // Throws
2,488✔
1408
        if (download_completion)
4,782✔
1409
            sess.request_download_completion_notification(); // Throws
2,588✔
1410
    });                                                      // Throws
4,782✔
1411
}
4,850✔
1412

1413

1414
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1415
{
12,992✔
1416
    // Thread safety required
6,496✔
1417
    REALM_ASSERT(m_initiated);
12,992✔
1418
    REALM_ASSERT(!m_abandoned);
12,992✔
1419

6,496✔
1420
    std::int_fast64_t target_mark;
12,992✔
1421
    {
12,992✔
1422
        std::lock_guard lock{m_client.m_mutex};
12,992✔
1423
        target_mark = ++m_target_upload_mark;
12,992✔
1424
    }
12,992✔
1425

6,496✔
1426
    util::bind_ptr<SessionWrapper> self{this};
12,992✔
1427
    m_client.post([self = std::move(self), target_mark](Status status) {
12,992✔
1428
        if (status == ErrorCodes::OperationAborted)
12,992✔
1429
            return;
×
1430
        else if (!status.is_ok())
12,992✔
1431
            throw Exception(status);
×
1432

6,496✔
1433
        REALM_ASSERT(self->m_actualized);
12,992✔
1434
        REALM_ASSERT(!self->m_finalized);
12,992✔
1435
        // The session wrapper may already have been finalized. This can only
6,496✔
1436
        // happen if it was abandoned, but in that case, the call of
6,496✔
1437
        // wait_for_upload_complete_or_client_stopped() must have returned
6,496✔
1438
        // already.
6,496✔
1439
        if (REALM_UNLIKELY(!self->m_sess))
12,992✔
1440
            return;
6,500✔
1441
        if (target_mark > self->m_staged_upload_mark) {
12,982✔
1442
            self->m_staged_upload_mark = target_mark;
12,982✔
1443
            SessionImpl& sess = *self->m_sess;
12,982✔
1444
            sess.request_upload_completion_notification(); // Throws
12,982✔
1445
        }
12,982✔
1446
    }); // Throws
12,982✔
1447

6,496✔
1448
    bool completion_condition_was_satisfied;
12,992✔
1449
    {
12,992✔
1450
        std::unique_lock lock{m_client.m_mutex};
12,992✔
1451
        while (m_reached_upload_mark < target_mark && !m_client.m_stopped)
33,044✔
1452
            m_client.m_wait_or_client_stopped_cond.wait(lock);
20,052✔
1453
        completion_condition_was_satisfied = !m_client.m_stopped;
12,992✔
1454
    }
12,992✔
1455
    return completion_condition_was_satisfied;
12,992✔
1456
}
12,992✔
1457

1458

1459
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1460
{
10,088✔
1461
    // Thread safety required
5,046✔
1462
    REALM_ASSERT(m_initiated);
10,088✔
1463
    REALM_ASSERT(!m_abandoned);
10,088✔
1464

5,046✔
1465
    std::int_fast64_t target_mark;
10,088✔
1466
    {
10,088✔
1467
        std::lock_guard lock{m_client.m_mutex};
10,088✔
1468
        target_mark = ++m_target_download_mark;
10,088✔
1469
    }
10,088✔
1470

5,046✔
1471
    util::bind_ptr<SessionWrapper> self{this};
10,088✔
1472
    m_client.post([self = std::move(self), target_mark](Status status) {
10,088✔
1473
        if (status == ErrorCodes::OperationAborted)
10,088✔
1474
            return;
×
1475
        else if (!status.is_ok())
10,088✔
1476
            throw Exception(status);
×
1477

5,046✔
1478
        REALM_ASSERT(self->m_actualized);
10,088✔
1479
        REALM_ASSERT(!self->m_finalized);
10,088✔
1480
        // The session wrapper may already have been finalized. This can only
5,046✔
1481
        // happen if it was abandoned, but in that case, the call of
5,046✔
1482
        // wait_for_download_complete_or_client_stopped() must have returned
5,046✔
1483
        // already.
5,046✔
1484
        if (REALM_UNLIKELY(!self->m_sess))
10,088✔
1485
            return;
5,076✔
1486
        if (target_mark > self->m_staged_download_mark) {
10,028✔
1487
            self->m_staged_download_mark = target_mark;
10,026✔
1488
            SessionImpl& sess = *self->m_sess;
10,026✔
1489
            sess.request_download_completion_notification(); // Throws
10,026✔
1490
        }
10,026✔
1491
    }); // Throws
10,028✔
1492

5,046✔
1493
    bool completion_condition_was_satisfied;
10,088✔
1494
    {
10,088✔
1495
        std::unique_lock lock{m_client.m_mutex};
10,088✔
1496
        while (m_reached_download_mark < target_mark && !m_client.m_stopped)
20,592✔
1497
            m_client.m_wait_or_client_stopped_cond.wait(lock);
10,504✔
1498
        completion_condition_was_satisfied = !m_client.m_stopped;
10,088✔
1499
    }
10,088✔
1500
    return completion_condition_was_satisfied;
10,088✔
1501
}
10,088✔
1502

1503

1504
void SessionWrapper::refresh(std::string signed_access_token)
1505
{
208✔
1506
    // Thread safety required
104✔
1507
    REALM_ASSERT(m_initiated);
208✔
1508
    REALM_ASSERT(!m_abandoned);
208✔
1509

104✔
1510
    m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) {
208✔
1511
        if (status == ErrorCodes::OperationAborted)
208✔
1512
            return;
×
1513
        else if (!status.is_ok())
208✔
1514
            throw Exception(status);
×
1515

104✔
1516
        REALM_ASSERT(self->m_actualized);
208✔
1517
        if (REALM_UNLIKELY(!self->m_sess))
208✔
1518
            return; // Already finalized
104✔
1519
        self->m_signed_access_token = std::move(token);
208✔
1520
        SessionImpl& sess = *self->m_sess;
208✔
1521
        ClientImpl::Connection& conn = sess.get_connection();
208✔
1522
        // FIXME: This only makes sense when each session uses a separate connection.
104✔
1523
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
208✔
1524
        sess.cancel_resumption_delay();                                                          // Throws
208✔
1525
        conn.cancel_reconnect_delay();                                                           // Throws
208✔
1526
    });
208✔
1527
}
208✔
1528

1529

1530
inline void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
1531
{
11,376✔
1532
    if (wrapper->m_initiated) {
11,376✔
1533
        ClientImpl& client = wrapper->m_client;
10,224✔
1534
        client.register_abandoned_session_wrapper(std::move(wrapper));
10,224✔
1535
    }
10,224✔
1536
}
11,376✔
1537

1538

1539
// Must be called from event loop thread
1540
void SessionWrapper::actualize(ServerEndpoint endpoint)
1541
{
10,054✔
1542
    REALM_ASSERT_DEBUG(m_initiated);
10,054✔
1543
    REALM_ASSERT(!m_actualized);
10,054✔
1544
    REALM_ASSERT(!m_sess);
10,054✔
1545
    // Cannot be actualized if it's already been finalized or force closed
4,850✔
1546
    REALM_ASSERT(!m_finalized);
10,054✔
1547
    REALM_ASSERT(!m_force_closed);
10,054✔
1548
    try {
10,054✔
1549
        m_db->claim_sync_agent();
10,054✔
1550
    }
10,054✔
1551
    catch (const MultipleSyncAgents&) {
4,852✔
1552
        finalize_before_actualization();
4✔
1553
        throw;
4✔
1554
    }
4✔
1555
    auto sync_mode = endpoint.server_mode;
10,050✔
1556

4,848✔
1557
    bool was_created = false;
10,050✔
1558
    ClientImpl::Connection& conn = m_client.get_connection(
10,050✔
1559
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
10,050✔
1560
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
10,050✔
1561
        was_created); // Throws
10,050✔
1562
    try {
10,050✔
1563
        // FIXME: This only makes sense when each session uses a separate connection.
4,848✔
1564
        conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
10,050✔
1565
        std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
10,050✔
1566
        if (sync_mode == SyncServerMode::FLX) {
10,050✔
1567
            m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
1,342✔
1568
        }
1,342✔
1569

4,848✔
1570
        sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
10,050✔
1571
        m_sess = sess.get();
10,050✔
1572
        conn.activate_session(std::move(sess)); // Throws
10,050✔
1573
    }
10,050✔
1574
    catch (...) {
4,848✔
1575
        if (was_created)
×
1576
            m_client.remove_connection(conn);
×
1577

1578
        // finalize_before_actualization() expects m_sess to be nullptr, but it's possible that we
1579
        // reached its assignment above before throwing. Unset it here so we get a clean unhandled
1580
        // exception failure instead of a REALM_ASSERT in finalize_before_actualization().
1581
        m_sess = nullptr;
×
1582
        finalize_before_actualization();
×
1583
        throw;
×
1584
    }
×
1585

4,846✔
1586
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
4,846✔
1587
    // session cannot change the state of the bootstrap store at the same time.
4,846✔
1588
    update_subscription_version_info();
10,046✔
1589

4,846✔
1590
    m_actualized = true;
10,046✔
1591
    if (was_created)
10,046✔
1592
        conn.activate(); // Throws
2,640✔
1593

4,846✔
1594
    if (m_connection_state_change_listener) {
10,046✔
1595
        ConnectionState state = conn.get_state();
10,028✔
1596
        if (state != ConnectionState::disconnected) {
10,028✔
1597
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,240✔
1598
            if (state == ConnectionState::connected)
7,240✔
1599
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
5,498✔
1600
        }
7,240✔
1601
    }
10,028✔
1602

4,846✔
1603
    if (!m_client_reset_config)
10,046✔
1604
        report_progress(); // Throws
9,678✔
1605
}
10,046✔
1606

1607
void SessionWrapper::force_close()
1608
{
104✔
1609
    if (m_force_closed || m_finalized) {
104✔
1610
        return;
×
1611
    }
×
1612
    REALM_ASSERT(m_actualized);
104✔
1613
    REALM_ASSERT(m_sess);
104✔
1614
    m_force_closed = true;
104✔
1615

52✔
1616
    ClientImpl::Connection& conn = m_sess->get_connection();
104✔
1617
    conn.initiate_session_deactivation(m_sess); // Throws
104✔
1618

52✔
1619
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
52✔
1620
    m_flx_pending_bootstrap_store.reset();
104✔
1621
    // Clear the subscription and migration store refs since they are owned by SyncSession
52✔
1622
    m_flx_subscription_store.reset();
104✔
1623
    m_migration_store.reset();
104✔
1624
    m_sess = nullptr;
104✔
1625
    // Everything is being torn down, no need to report connection state anymore
52✔
1626
    m_connection_state_change_listener = {};
104✔
1627
}
104✔
1628

1629
// Must be called from event loop thread
1630
void SessionWrapper::finalize()
1631
{
10,056✔
1632
    REALM_ASSERT(m_actualized);
10,056✔
1633
    REALM_ASSERT(m_abandoned);
10,056✔
1634

4,852✔
1635
    // Already finalized?
4,852✔
1636
    if (m_finalized) {
10,056✔
1637
        return;
×
1638
    }
×
1639

4,852✔
1640
    // Must be before marking as finalized as we expect m_finalized == false in on_change()
4,852✔
1641
    m_db->remove_commit_listener(this);
10,056✔
1642

4,852✔
1643
    m_finalized = true;
10,056✔
1644

4,852✔
1645
    if (!m_force_closed) {
10,056✔
1646
        REALM_ASSERT(m_sess);
9,946✔
1647
        ClientImpl::Connection& conn = m_sess->get_connection();
9,946✔
1648
        conn.initiate_session_deactivation(m_sess); // Throws
9,946✔
1649

4,796✔
1650
        // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
4,796✔
1651
        m_flx_pending_bootstrap_store.reset();
9,946✔
1652
        // Clear the subscription and migration store refs since they are owned by SyncSession
4,796✔
1653
        m_flx_subscription_store.reset();
9,946✔
1654
        m_migration_store.reset();
9,946✔
1655
        m_sess = nullptr;
9,946✔
1656
    }
9,946✔
1657

4,852✔
1658
    // The Realm file can be closed now, as no access to the Realm file is
4,852✔
1659
    // supposed to happen on behalf of a session after initiation of
4,852✔
1660
    // deactivation.
4,852✔
1661
    m_db->release_sync_agent();
10,056✔
1662
    m_db = nullptr;
10,056✔
1663

4,852✔
1664
    // All outstanding wait operations must be canceled
4,852✔
1665
    while (!m_upload_completion_handlers.empty()) {
10,476✔
1666
        auto handler = std::move(m_upload_completion_handlers.back());
420✔
1667
        m_upload_completion_handlers.pop_back();
420✔
1668
        handler(
420✔
1669
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
420✔
1670
    }
420✔
1671
    while (!m_download_completion_handlers.empty()) {
10,258✔
1672
        auto handler = std::move(m_download_completion_handlers.back());
202✔
1673
        m_download_completion_handlers.pop_back();
202✔
1674
        handler(
202✔
1675
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
202✔
1676
    }
202✔
1677
    while (!m_sync_completion_handlers.empty()) {
10,066✔
1678
        auto handler = std::move(m_sync_completion_handlers.back());
10✔
1679
        m_sync_completion_handlers.pop_back();
10✔
1680
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
10✔
1681
    }
10✔
1682
}
10,056✔
1683

1684

1685
// Must be called only when an unactualized session wrapper becomes abandoned.
1686
//
1687
// Called with a lock on `m_client.m_mutex`.
1688
inline void SessionWrapper::finalize_before_actualization() noexcept
1689
{
174✔
1690
    REALM_ASSERT(!m_sess);
174✔
1691
    m_actualized = true;
174✔
1692
    m_force_closed = true;
174✔
1693
}
174✔
1694

1695

1696
inline void SessionWrapper::on_sync_progress()
1697
{
44,788✔
1698
    REALM_ASSERT(!m_finalized);
44,788✔
1699
    m_reliable_download_progress = true;
44,788✔
1700
    report_progress(); // Throws
44,788✔
1701
}
44,788✔
1702

1703

1704
void SessionWrapper::on_upload_completion()
1705
{
14,826✔
1706
    REALM_ASSERT(!m_finalized);
14,826✔
1707
    while (!m_upload_completion_handlers.empty()) {
16,688✔
1708
        auto handler = std::move(m_upload_completion_handlers.back());
1,862✔
1709
        m_upload_completion_handlers.pop_back();
1,862✔
1710
        handler(Status::OK()); // Throws
1,862✔
1711
    }
1,862✔
1712
    while (!m_sync_completion_handlers.empty()) {
15,022✔
1713
        auto handler = std::move(m_sync_completion_handlers.back());
196✔
1714
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
196✔
1715
        m_sync_completion_handlers.pop_back();
196✔
1716
    }
196✔
1717
    std::lock_guard lock{m_client.m_mutex};
14,826✔
1718
    if (m_staged_upload_mark > m_reached_upload_mark) {
14,826✔
1719
        m_reached_upload_mark = m_staged_upload_mark;
12,962✔
1720
        m_client.m_wait_or_client_stopped_cond.notify_all();
12,962✔
1721
    }
12,962✔
1722
}
14,826✔
1723

1724

1725
void SessionWrapper::on_download_completion()
1726
{
16,720✔
1727
    while (!m_download_completion_handlers.empty()) {
19,008✔
1728
        auto handler = std::move(m_download_completion_handlers.back());
2,288✔
1729
        m_download_completion_handlers.pop_back();
2,288✔
1730
        handler(Status::OK()); // Throws
2,288✔
1731
    }
2,288✔
1732
    while (!m_sync_completion_handlers.empty()) {
16,808✔
1733
        auto handler = std::move(m_sync_completion_handlers.back());
88✔
1734
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
88✔
1735
        m_sync_completion_handlers.pop_back();
88✔
1736
    }
88✔
1737

7,846✔
1738
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
16,720✔
1739
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
804✔
1740
                             m_flx_pending_mark_version);
804✔
1741
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
804✔
1742
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
804✔
1743
    }
804✔
1744

7,846✔
1745
    std::lock_guard lock{m_client.m_mutex};
16,720✔
1746
    if (m_staged_download_mark > m_reached_download_mark) {
16,720✔
1747
        m_reached_download_mark = m_staged_download_mark;
9,956✔
1748
        m_client.m_wait_or_client_stopped_cond.notify_all();
9,956✔
1749
    }
9,956✔
1750
}
16,720✔
1751

1752

1753
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1754
{
672✔
1755
    REALM_ASSERT(!m_finalized);
672✔
1756
    m_suspended = true;
672✔
1757
    if (m_connection_state_change_listener) {
672✔
1758
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
672✔
1759
    }
672✔
1760
}
672✔
1761

1762

1763
void SessionWrapper::on_resumed()
1764
{
66✔
1765
    REALM_ASSERT(!m_finalized);
66✔
1766
    m_suspended = false;
66✔
1767
    if (m_connection_state_change_listener) {
66✔
1768
        ClientImpl::Connection& conn = m_sess->get_connection();
66✔
1769
        if (conn.get_state() != ConnectionState::disconnected) {
66✔
1770
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
62✔
1771
            if (conn.get_state() == ConnectionState::connected)
62✔
1772
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
54✔
1773
        }
62✔
1774
    }
66✔
1775
}
66✔
1776

1777

1778
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1779
                                                 const util::Optional<SessionErrorInfo>& error_info)
1780
{
10,508✔
1781
    if (m_connection_state_change_listener) {
10,508✔
1782
        if (!m_suspended)
10,496✔
1783
            m_connection_state_change_listener(state, error_info); // Throws
10,476✔
1784
    }
10,496✔
1785
}
10,508✔
1786

1787

1788
void SessionWrapper::report_progress(bool only_if_new_uploadable_data)
1789
{
162,550✔
1790
    REALM_ASSERT(!m_finalized);
162,550✔
1791
    REALM_ASSERT(m_sess);
162,550✔
1792

83,132✔
1793
    if (!m_progress_handler)
162,550✔
1794
        return;
113,618✔
1795

22,712✔
1796
    std::uint_fast64_t downloaded_bytes = 0;
48,932✔
1797
    std::uint_fast64_t downloadable_bytes = 0;
48,932✔
1798
    std::uint_fast64_t uploaded_bytes = 0;
48,932✔
1799
    std::uint_fast64_t uploadable_bytes = 0;
48,932✔
1800
    std::uint_fast64_t snapshot_version = 0;
48,932✔
1801
    ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
48,932✔
1802
                                             uploadable_bytes, snapshot_version);
48,932✔
1803

22,712✔
1804
    // If this progress notification was triggered by a commit being made we
22,712✔
1805
    // only want to send it if the uploadable bytes has actually increased,
22,712✔
1806
    // and not if it was an empty commit.
22,712✔
1807
    if (only_if_new_uploadable_data && m_last_reported_uploadable_bytes == uploadable_bytes)
48,932✔
1808
        return;
34,382✔
1809
    m_last_reported_uploadable_bytes = uploadable_bytes;
14,550✔
1810

6,228✔
1811
    // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
6,228✔
1812
    // is only the remaining to download. This is confusing, so make them use
6,228✔
1813
    // the same units.
6,228✔
1814
    std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes;
14,550✔
1815

6,228✔
1816
    m_sess->logger.debug("Progress handler called, downloaded = %1, "
14,550✔
1817
                         "downloadable(total) = %2, uploaded = %3, "
14,550✔
1818
                         "uploadable = %4, reliable_download_progress = %5, "
14,550✔
1819
                         "snapshot version = %6",
14,550✔
1820
                         downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes,
14,550✔
1821
                         m_reliable_download_progress, snapshot_version);
14,550✔
1822

6,228✔
1823
    // FIXME: Why is this boolean status communicated to the application as
6,228✔
1824
    // a 64-bit integer? Also, the name `progress_version` is confusing.
6,228✔
1825
    std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0);
10,810✔
1826
    m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version,
14,550✔
1827
                       snapshot_version);
14,550✔
1828
}
14,550✔
1829

1830
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
1831
{
60✔
1832
    if (!m_sess) {
60✔
1833
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
×
1834
    }
×
1835

30✔
1836
    return m_sess->send_test_command(std::move(body));
60✔
1837
}
60✔
1838

1839
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1840
{
302✔
1841
    REALM_ASSERT(!m_finalized);
302✔
1842

150✔
1843
    auto pending_reset = _impl::client_reset::has_pending_reset(*m_db->start_frozen());
302✔
1844
    REALM_ASSERT(pending_reset);
302✔
1845
    m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
302✔
1846
                        pending_reset->time);
302✔
1847
    async_wait_for(true, true, [self = util::bind_ptr(this), pending_reset = *pending_reset](Status status) {
302✔
1848
        if (status == ErrorCodes::OperationAborted) {
302✔
1849
            return;
138✔
1850
        }
138✔
1851
        auto& logger = self->m_sess->logger;
164✔
1852
        if (!status.is_ok()) {
164✔
1853
            logger.error("Error while tracking client reset acknowledgement: %1", status);
×
1854
            return;
×
1855
        }
×
1856

82✔
1857
        auto wt = self->m_db->start_write();
164✔
1858
        auto cur_pending_reset = _impl::client_reset::has_pending_reset(*wt);
164✔
1859
        if (!cur_pending_reset) {
164✔
1860
            logger.debug(
×
1861
                "Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
×
1862
                pending_reset.type, pending_reset.time);
×
1863
            return;
×
1864
        }
×
1865
        else if (cur_pending_reset->type != pending_reset.type || cur_pending_reset->time != pending_reset.time) {
164✔
1866
            logger.debug(
×
1867
                "Was going to remove client reset tracker for type \"%1\" from %2, but found type \"%3\" from %4.",
×
1868
                pending_reset.type, pending_reset.time, cur_pending_reset->type, cur_pending_reset->time);
×
1869
        }
×
1870
        else {
164✔
1871
            logger.debug("Client reset of type \"%1\" from %2 has been acknowledged by the server. "
164✔
1872
                         "Removing cycle detection tracker.",
164✔
1873
                         pending_reset.type, pending_reset.time);
164✔
1874
        }
164✔
1875
        _impl::client_reset::remove_pending_client_resets(*wt);
164✔
1876
        wt->commit();
164✔
1877
    });
164✔
1878
}
302✔
1879

1880
void SessionWrapper::update_subscription_version_info()
1881
{
10,330✔
1882
    if (!m_flx_subscription_store)
10,330✔
1883
        return;
8,884✔
1884
    auto versions_info = m_flx_subscription_store->get_version_info();
1,446✔
1885
    m_flx_active_version = versions_info.active;
1,446✔
1886
    m_flx_pending_mark_version = versions_info.pending_mark;
1,446✔
1887
}
1,446✔
1888

1889
std::string SessionWrapper::get_appservices_connection_id()
1890
{
72✔
1891
    auto pf = util::make_promise_future<std::string>();
72✔
1892
    REALM_ASSERT(m_initiated);
72✔
1893

36✔
1894
    util::bind_ptr<SessionWrapper> self(this);
72✔
1895
    get_client().post([self, promise = std::move(pf.promise)](Status status) mutable {
72✔
1896
        if (!status.is_ok()) {
72✔
1897
            promise.set_error(status);
×
1898
            return;
×
1899
        }
×
1900

36✔
1901
        if (!self->m_sess) {
72✔
1902
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1903
            return;
×
1904
        }
×
1905

36✔
1906
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
72✔
1907
    });
72✔
1908

36✔
1909
    return pf.future.get();
72✔
1910
}
72✔
1911

1912
// ################ ClientImpl::Connection ################
1913

1914
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1915
                                   const std::string& authorization_header_name,
1916
                                   const std::map<std::string, std::string>& custom_http_headers,
1917
                                   bool verify_servers_ssl_certificate,
1918
                                   Optional<std::string> ssl_trust_certificate_path,
1919
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1920
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1921
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
1922
                                                      client.logger_ptr)} // Throws
1923
    , logger{*logger_ptr}
1924
    , m_client{client}
1925
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1926
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1927
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1928
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1929
    , m_reconnect_info{reconnect_info}
1930
    , m_session_history{}
1931
    , m_ident{ident}
1932
    , m_server_endpoint{std::move(endpoint)}
1933
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1934
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1935
{
2,646✔
1936
    m_on_idle = m_client.create_trigger([this](Status status) {
2,648✔
1937
        if (status == ErrorCodes::OperationAborted)
2,648✔
1938
            return;
×
1939
        else if (!status.is_ok())
2,648✔
1940
            throw Exception(status);
×
1941

1,256✔
1942
        REALM_ASSERT(m_activated);
2,648✔
1943
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
2,648✔
1944
            on_idle(); // Throws
2,646✔
1945
            // Connection object may be destroyed now.
1,256✔
1946
        }
2,646✔
1947
    });
2,648✔
1948
}
2,646✔
1949

1950
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
1951
{
12✔
1952
    return m_ident;
12✔
1953
}
12✔
1954

1955

1956
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
1957
{
2,646✔
1958
    return m_server_endpoint;
2,646✔
1959
}
2,646✔
1960

1961
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1962
                                                        const std::string& signed_access_token)
1963
{
10,258✔
1964
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
10,258✔
1965
    m_signed_access_token = signed_access_token;           // Throws (copy)
10,258✔
1966
}
10,258✔
1967

1968

1969
void ClientImpl::Connection::resume_active_sessions()
1970
{
1,568✔
1971
    auto handler = [=](ClientImpl::Session& sess) {
3,132✔
1972
        sess.cancel_resumption_delay(); // Throws
3,132✔
1973
    };
3,132✔
1974
    for_each_active_session(std::move(handler)); // Throws
1,568✔
1975
}
1,568✔
1976

1977
void ClientImpl::Connection::on_idle()
1978
{
2,646✔
1979
    logger.debug(util::LogCategory::session, "Destroying connection object");
2,646✔
1980
    ClientImpl& client = get_client();
2,646✔
1981
    client.remove_connection(*this);
2,646✔
1982
    // NOTE: This connection object is now destroyed!
1,256✔
1983
}
2,646✔
1984

1985

1986
std::string ClientImpl::Connection::get_http_request_path() const
1987
{
3,396✔
1988
    using namespace std::string_view_literals;
3,396✔
1989
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
2,147,485,363✔
1990

1,680✔
1991
    std::string path;
3,396✔
1992
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
3,396✔
1993
    path += m_http_request_path_prefix;
3,396✔
1994
    path += param;
3,396✔
1995
    path += m_signed_access_token;
3,396✔
1996

1,680✔
1997
    return path;
3,396✔
1998
}
3,396✔
1999

2000

2001
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
2002
{
2,646✔
2003
    std::ostringstream out;
2,646✔
2004
    out.imbue(std::locale::classic());
2,646✔
2005
    out << "Connection[" << ident << "]: "; // Throws
2,646✔
2006
    return out.str();                       // Throws
2,646✔
2007
}
2,646✔
2008

2009

2010
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
2011
                                                            util::Optional<SessionErrorInfo> error_info)
2012
{
10,090✔
2013
    if (m_force_closed) {
10,090✔
2014
        return;
2,328✔
2015
    }
2,328✔
2016
    auto handler = [=](ClientImpl::Session& sess) {
10,358✔
2017
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
10,358✔
2018
        sess_2.on_connection_state_changed(state, error_info); // Throws
10,358✔
2019
    };
10,358✔
2020
    for_each_active_session(std::move(handler)); // Throws
7,762✔
2021
}
7,762✔
2022

2023

2024
Client::Client(Config config)
2025
    : m_impl{new ClientImpl{std::move(config)}} // Throws
2026
{
9,598✔
2027
}
9,598✔
2028

2029

2030
Client::Client(Client&& client) noexcept
2031
    : m_impl{std::move(client.m_impl)}
2032
{
×
2033
}
×
2034

2035

2036
Client::~Client() noexcept {}
9,598✔
2037

2038

2039
void Client::shutdown() noexcept
2040
{
9,676✔
2041
    m_impl->shutdown();
9,676✔
2042
}
9,676✔
2043

2044
void Client::shutdown_and_wait()
2045
{
756✔
2046
    m_impl->shutdown_and_wait();
756✔
2047
}
756✔
2048

2049
void Client::cancel_reconnect_delay()
2050
{
1,572✔
2051
    m_impl->cancel_reconnect_delay();
1,572✔
2052
}
1,572✔
2053

2054
void Client::voluntary_disconnect_all_connections()
2055
{
12✔
2056
    m_impl->voluntary_disconnect_all_connections();
12✔
2057
}
12✔
2058

2059
bool Client::wait_for_session_terminations_or_client_stopped()
2060
{
9,146✔
2061
    return m_impl->wait_for_session_terminations_or_client_stopped();
9,146✔
2062
}
9,146✔
2063

2064
util::Future<void> Client::notify_session_terminated()
2065
{
44✔
2066
    return m_impl->notify_session_terminated();
44✔
2067
}
44✔
2068

2069
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
2070
                                  port_type& port, std::string& path) const
2071
{
3,776✔
2072
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
3,776✔
2073
}
3,776✔
2074

2075

2076
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
2077
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
2078
{
11,376✔
2079
    util::bind_ptr<SessionWrapper> sess;
11,376✔
2080
    sess.reset(new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
11,376✔
2081
                                  std::move(config)}); // Throws
11,376✔
2082
    // The reference count passed back to the application is implicitly
5,516✔
2083
    // owned by a naked pointer. This is done to avoid exposing
5,516✔
2084
    // implementation details through the header file (that is, through the
5,516✔
2085
    // Session object).
5,516✔
2086
    m_impl = sess.release();
11,376✔
2087
}
11,376✔
2088

2089

2090
void Session::set_progress_handler(util::UniqueFunction<ProgressHandler> handler)
2091
{
3,852✔
2092
    m_impl->set_progress_handler(std::move(handler)); // Throws
3,852✔
2093
}
3,852✔
2094

2095

2096
void Session::set_connection_state_change_listener(util::UniqueFunction<ConnectionStateChangeListener> listener)
2097
{
11,472✔
2098
    m_impl->set_connection_state_change_listener(std::move(listener)); // Throws
11,472✔
2099
}
11,472✔
2100

2101

2102
void Session::bind()
2103
{
10,224✔
2104
    m_impl->initiate(); // Throws
10,224✔
2105
}
10,224✔
2106

2107

2108
void Session::nonsync_transact_notify(version_type new_version)
2109
{
16,072✔
2110
    m_impl->on_commit(new_version); // Throws
16,072✔
2111
}
16,072✔
2112

2113

2114
void Session::cancel_reconnect_delay()
2115
{
20✔
2116
    m_impl->cancel_reconnect_delay(); // Throws
20✔
2117
}
20✔
2118

2119

2120
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2121
{
4,548✔
2122
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
4,548✔
2123
}
4,548✔
2124

2125

2126
bool Session::wait_for_upload_complete_or_client_stopped()
2127
{
12,992✔
2128
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
12,992✔
2129
}
12,992✔
2130

2131

2132
bool Session::wait_for_download_complete_or_client_stopped()
2133
{
10,088✔
2134
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
10,088✔
2135
}
10,088✔
2136

2137

2138
void Session::refresh(const std::string& signed_access_token)
2139
{
208✔
2140
    m_impl->refresh(signed_access_token); // Throws
208✔
2141
}
208✔
2142

2143

2144
void Session::abandon() noexcept
2145
{
11,376✔
2146
    REALM_ASSERT(m_impl);
11,376✔
2147
    // Reabsorb the ownership assigned to the applications naked pointer by
5,516✔
2148
    // Session constructor
5,516✔
2149
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
11,376✔
2150
    SessionWrapper::abandon(std::move(wrapper));
11,376✔
2151
}
11,376✔
2152

2153
util::Future<std::string> Session::send_test_command(std::string body)
2154
{
60✔
2155
    return m_impl->send_test_command(std::move(body));
60✔
2156
}
60✔
2157

2158
std::string Session::get_appservices_connection_id()
2159
{
72✔
2160
    return m_impl->get_appservices_connection_id();
72✔
2161
}
72✔
2162

2163
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2164
{
×
2165
    switch (proxyType) {
×
2166
        case ProxyConfig::Type::HTTP:
×
2167
            return os << "HTTP";
×
2168
        case ProxyConfig::Type::HTTPS:
×
2169
            return os << "HTTPS";
×
2170
    }
×
2171
    REALM_TERMINATE("Invalid Proxy Type object.");
2172
}
×
2173

2174
} // namespace sync
2175
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc