• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2458

01 Jul 2024 04:59PM UTC coverage: 91.147% (+0.1%) from 91.001%
2458

push

Evergreen

web-flow
Merge pull request #7796 from realm/tg/upload-completion

RCORE-2160 Make upload completion reporting multiprocess-compatible

103762 of 182414 branches covered (56.88%)

136 of 139 new or added lines in 8 files covered. (97.84%)

52 existing lines in 11 files now uncovered.

216754 of 237806 relevant lines covered (91.15%)

5777851.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.37
/src/realm/sync/client.cpp
1
#include <realm/sync/client.hpp>
2

3
#include <realm/sync/config.hpp>
4
#include <realm/sync/noinst/client_impl_base.hpp>
5
#include <realm/sync/noinst/client_reset.hpp>
6
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
7
#include <realm/sync/noinst/pending_reset_store.hpp>
8
#include <realm/sync/protocol.hpp>
9
#include <realm/sync/subscriptions.hpp>
10
#include <realm/util/bind_ptr.hpp>
11

12
namespace realm::sync {
13
namespace {
14
using namespace realm::util;
15

16

17
// clang-format off
18
using SessionImpl                     = ClientImpl::Session;
19
using SyncTransactCallback            = Session::SyncTransactCallback;
20
using ProgressHandler                 = Session::ProgressHandler;
21
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
22
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
23
using port_type                       = Session::port_type;
24
using connection_ident_type           = std::int_fast64_t;
25
using ProxyConfig                     = SyncConfig::ProxyConfig;
26
// clang-format on
27

28
} // unnamed namespace
29

30

31
// Life cycle states of a session wrapper:
32
//
33
// The session wrapper begins life with an associated Client, but no underlying
34
// SessionImpl. On construction, it begins the actualization process by posting
35
// a job to the client's event loop. That job will set `m_sess` to a session impl
36
// and then set `m_actualized = true`. Once this happens `m_actualized` will
37
// never change again.
38
//
39
// When the external reference to the session (`sync::Session`, which in
40
// non-test code is always owned by a `SyncSession`) is destroyed, the wrapper
41
// begins finalization. If the wrapper has not yet been actualized this takes
42
// place immediately and `m_finalized = true` is set directly on the calling
43
// thread. If it has been actualized, a job is posted to the client's event loop
44
// which will tear down the session and then set `m_finalized = true`. Regardless
45
// of whether or not the session has been actualized, `m_abandoned = true` is
46
// immediately set when the external reference is released.
47
//
48
// When the associated Client is destroyed it calls force_close() on all
49
// actualized wrappers from its event loop. This causes the wrapper to tear down
50
// the session, but not not make it proceed to the finalized state. In normal
51
// usage the client will outlive all sessions, but in tests getting the teardown
52
// correct and race-free can be tricky so we permit either order.
53
//
54
// The wrapper will exist with `m_abandoned = true` and `m_finalized = false`
55
// only while waiting for finalization to happen. It will exist with
56
// `m_finalized = true` only while there are pending post handlers yet to be
57
// executed.
58
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
59
public:
60
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
61
                   Session::Config&&);
62
    ~SessionWrapper() noexcept;
63

64
    ClientReplication& get_replication() noexcept;
65
    ClientImpl& get_client() noexcept;
66

67
    bool has_flx_subscription_store() const;
68
    SubscriptionStore* get_flx_subscription_store();
69
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
70

71
    MigrationStore* get_migration_store();
72

73
    // Immediately initiate deactivation of the wrapped session. Sets m_closed
74
    // but *not* m_finalized.
75
    // Must be called from event loop thread.
76
    void force_close();
77

78
    // Can be called from any thread.
79
    void on_commit(version_type new_version) override;
80
    // Can be called from any thread.
81
    void cancel_reconnect_delay();
82

83
    // Can be called from any thread.
84
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
85
    // Can be called from any thread.
86
    bool wait_for_upload_complete_or_client_stopped();
87
    // Can be called from any thread.
88
    bool wait_for_download_complete_or_client_stopped();
89

90
    // Can be called from any thread.
91
    void refresh(std::string_view signed_access_token);
92

93
    // Can be called from any thread.
94
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
95

96
    // These are called from ClientImpl
97
    // Must be called from event loop thread.
98
    void actualize();
99
    void finalize();
100
    void finalize_before_actualization() noexcept;
101

102
    // Can be called from any thread.
103
    util::Future<std::string> send_test_command(std::string body);
104

105
    void handle_pending_client_reset_acknowledgement();
106

107
    void update_subscription_version_info();
108

109
    // Can be called from any thread.
110
    std::string get_appservices_connection_id();
111

112
    // Can be called from any thread, but inherently cannot be called
113
    // concurrently with calls to any of the other non-confined functions.
114
    bool mark_abandoned();
115

116
private:
117
    ClientImpl& m_client;
118
    DBRef m_db;
119
    Replication* m_replication;
120

121
    const ProtocolEnvelope m_protocol_envelope;
122
    const std::string m_server_address;
123
    const port_type m_server_port;
124
    const bool m_server_verified;
125
    const std::string m_user_id;
126
    const SyncServerMode m_sync_mode;
127
    const std::string m_authorization_header_name;
128
    const std::map<std::string, std::string> m_custom_http_headers;
129
    const bool m_verify_servers_ssl_certificate;
130
    const bool m_simulate_integration_error;
131
    const std::optional<std::string> m_ssl_trust_certificate_path;
132
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
133
    const size_t m_flx_bootstrap_batch_size_bytes;
134
    const std::string m_http_request_path_prefix;
135
    const std::string m_virt_path;
136
    const std::optional<ProxyConfig> m_proxy_config;
137

138
    // This one is different from null when, and only when the session wrapper
139
    // is in ClientImpl::m_abandoned_session_wrappers.
140
    SessionWrapper* m_next = nullptr;
141

142
    // These may only be accessed by the event loop thread.
143
    std::string m_signed_access_token;
144
    std::optional<ClientReset> m_client_reset_config;
145

146
    struct ReportedProgress {
147
        uint64_t snapshot;
148
        uint64_t uploaded;
149
        uint64_t uploadable;
150
        uint64_t downloaded;
151
        uint64_t downloadable;
152

153
        // Does not check snapshot
154
        bool operator==(const ReportedProgress& p) const noexcept
155
        {
22,798✔
156
            return uploaded == p.uploaded && uploadable == p.uploadable && downloaded == p.downloaded &&
22,798✔
157
                   downloadable == p.downloadable;
22,798✔
158
        }
22,798✔
159
    };
160
    std::optional<ReportedProgress> m_reported_progress;
161
    uint64_t m_final_uploaded = 0;
162
    uint64_t m_final_downloaded = 0;
163

164
    const util::UniqueFunction<ProgressHandler> m_progress_handler;
165
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
166

167
    const util::UniqueFunction<SyncClientHookAction(SyncClientHookData const&)> m_debug_hook;
168
    bool m_in_debug_hook = false;
169

170
    const SessionReason m_session_reason;
171

172
    const uint64_t m_schema_version;
173

174
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
175
    int64_t m_flx_active_version = 0;
176
    int64_t m_flx_last_seen_version = 0;
177
    int64_t m_flx_pending_mark_version = 0;
178
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
179

180
    std::shared_ptr<MigrationStore> m_migration_store;
181

182
    // Set to true when this session wrapper is actualized (i.e. the wrapped
183
    // session is created), or when the wrapper is finalized before actualization.
184
    // It is then never modified again.
185
    //
186
    // Actualization is scheduled during the construction of SessionWrapper, and
187
    // so a session specific post handler will always find that `m_actualized`
188
    // is true as the handler will always be run after the actualization job.
189
    // This holds even if the wrapper is finalized or closed before actualization.
190
    bool m_actualized = false;
191

192
    // Set to true when session deactivation is begun, either via force_close()
193
    // or finalize().
194
    bool m_closed = false;
195

196
    // Set to true in on_suspended() and then false in on_resumed(). Used to
197
    // suppress spurious connection state and error reporting while the session
198
    // is already in an error state.
199
    bool m_suspended = false;
200

201
    // Set when the session has been abandoned. After this point none of the
202
    // public API functions should be called again.
203
    bool m_abandoned = false;
204
    // Has the SessionWrapper been finalized?
205
    bool m_finalized = false;
206

207
    // Set to true when the first DOWNLOAD message is received to indicate that
208
    // the byte-level download progress parameters can be considered reasonable
209
    // reliable. Before that, a lot of time may have passed, so our record of
210
    // the download progress is likely completely out of date.
211
    bool m_reliable_download_progress = false;
212

213
    // Set to point to an activated session object during actualization of the
214
    // session wrapper. Set to null during finalization of the session
215
    // wrapper. Both modifications are guaranteed to be performed by the event
216
    // loop thread.
217
    //
218
    // If a session specific post handler, that is submitted after the
219
    // initiation of the session wrapper, sees that `m_sess` is null, it can
220
    // conclude that the session wrapper has either been force closed or has
221
    // been both abandoned and finalized.
222
    //
223
    // Must only be accessed from the event loop thread.
224
    SessionImpl* m_sess = nullptr;
225

226
    // These must only be accessed from the event loop thread.
227
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
228
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
229
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
230

231
    // `m_target_*load_mark` and `m_reached_*load_mark` are protected by
232
    // `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
233
    // event loop thread.
234
    std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
235
    std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
236
    std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
237

238
    void on_upload_completion();
239
    void on_download_completion();
240
    void on_suspended(const SessionErrorInfo& error_info);
241
    void on_resumed();
242
    void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
243
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
244
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
245
    void on_flx_sync_version_complete(int64_t version);
246

247
    void init_progress_handler();
248
    void report_progress();
249

250
    friend class SessionWrapperStack;
251
    friend class ClientImpl::Session;
252
};
253

254

9,784✔
255
// ################ SessionWrapperStack ################
9,784✔
256

9,784✔
257
inline bool SessionWrapperStack::empty() const noexcept
258
{
10,060✔
259
    return !m_back;
10,060✔
260
}
19,812✔
261

9,752✔
262

9,752✔
263
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
9,752✔
264
{
20,190✔
265
    REALM_ASSERT(!w->m_next);
10,438✔
266
    w->m_next = m_back;
10,438✔
267
    m_back = w.release();
10,438✔
268
}
38,982✔
269

28,544✔
270

28,544✔
271
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
9,712✔
272
{
39,992✔
273
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
39,992✔
274
    if (m_back) {
58,824✔
275
        m_back = m_back->m_next;
38,936✔
276
        w->m_next = nullptr;
10,392✔
277
    }
10,392✔
278
    return w;
30,280✔
279
}
40,064✔
280

9,784✔
281

282
inline void SessionWrapperStack::clear() noexcept
283
{
10,060✔
284
    while (m_back) {
19,844✔
285
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
286
        m_back = w->m_next;
287
    }
288
}
14,952✔
289

4,892✔
290

4,980✔
291
inline bool SessionWrapperStack::erase(SessionWrapper* w) noexcept
88✔
292
{
5,326✔
293
    SessionWrapper** p = &m_back;
10,130✔
294
    while (*p && *p != w) {
10,194✔
295
        p = &(*p)->m_next;
4,956✔
296
    }
132✔
297
    if (!*p) {
5,272✔
298
        return false;
5,232✔
299
    }
10,090✔
300
    *p = w->m_next;
40✔
301
    util::bind_ptr<SessionWrapper>{w, util::bind_ptr_base::adopt_tag{}};
40✔
302
    return true;
40✔
303
}
15,022✔
304

9,784✔
305

9,784✔
306
SessionWrapperStack::~SessionWrapperStack()
307
{
10,060✔
308
    clear();
10,060✔
309
}
10,060✔
310

311

4,898✔
312
// ################ ClientImpl ################
313

314
ClientImpl::~ClientImpl()
315
{
9,934✔
316
    // Since no other thread is allowed to be accessing this client or any of
317
    // its subobjects at this time, no mutex locking is necessary.
318

4,898✔
319
    shutdown_and_wait();
9,934✔
320
    // Session wrappers are removed from m_unactualized_session_wrappers as they
4,898✔
321
    // are abandoned.
4,898✔
322
    REALM_ASSERT(m_stopped);
5,036✔
323
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
5,036✔
324
    REALM_ASSERT(m_abandoned_session_wrappers.empty());
5,036✔
325
}
5,926✔
326

327

890✔
328
void ClientImpl::cancel_reconnect_delay()
890✔
329
{
1,900✔
330
    // Thread safety required
890✔
331
    post([this] {
1,010!
332
        for (auto& p : m_server_slots) {
1,010✔
333
            ServerSlot& slot = p.second;
1,010✔
334
            if (m_one_connection_per_session) {
1,010✔
335
                REALM_ASSERT(!slot.connection);
×
336
                for (const auto& p : slot.alt_connections) {
×
337
                    ClientImpl::Connection& conn = *p.second;
×
338
                    conn.resume_active_sessions(); // Throws
890✔
339
                    conn.cancel_reconnect_delay(); // Throws
890✔
340
                }
890✔
341
            }
888✔
342
            else {
1,898✔
343
                REALM_ASSERT(slot.alt_connections.empty());
1,898✔
344
                if (slot.connection) {
1,898✔
345
                    ClientImpl::Connection& conn = *slot.connection;
1,010✔
346
                    conn.resume_active_sessions(); // Throws
1,010✔
347
                    conn.cancel_reconnect_delay(); // Throws
1,010✔
348
                }
1,898✔
349
                else {
892✔
350
                    slot.reconnect_info.reset();
892✔
351
                }
892✔
352
            }
1,010✔
353
        }
1,010✔
354
    }); // Throws
1,010✔
355
}
1,016✔
356

6✔
357

6✔
358
void ClientImpl::voluntary_disconnect_all_connections()
6✔
359
{
12✔
360
    auto done_pf = util::make_promise_future<void>();
12✔
361
    post([this, promise = std::move(done_pf.promise)]() mutable {
12✔
362
        try {
6!
363
            for (auto& p : m_server_slots) {
6✔
364
                ServerSlot& slot = p.second;
6✔
365
                if (m_one_connection_per_session) {
6✔
366
                    REALM_ASSERT(!slot.connection);
×
367
                    for (const auto& [_, conn] : slot.alt_connections) {
6!
368
                        conn->voluntary_disconnect();
6✔
369
                    }
6✔
370
                }
6✔
371
                else {
12✔
372
                    REALM_ASSERT(slot.alt_connections.empty());
12✔
373
                    if (slot.connection) {
12✔
374
                        slot.connection->voluntary_disconnect();
12✔
375
                    }
12✔
376
                }
6✔
377
            }
6✔
378
        }
6✔
379
        catch (...) {
12✔
380
            promise.set_error(exception_to_status());
6✔
381
            return;
6✔
382
        }
6✔
383
        promise.emplace_value();
6✔
384
    });
6✔
385
    done_pf.future.get();
6✔
386
}
4,552✔
387

388

389
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
4,546✔
390
{
9,532✔
391
    // Thread safety required
4,546✔
392

4,546✔
393
    {
4,986✔
394
        util::CheckedLockGuard lock{m_mutex};
4,986✔
395
        m_sessions_terminated = false;
4,986✔
396
    }
4,986✔
397

398
    // The technique employed here relies on the fact that
399
    // actualize_and_finalize_session_wrappers() must get to execute at least
400
    // once before the post handler submitted below gets to execute, but still
401
    // at a time where all session wrappers, that are abandoned prior to the
402
    // execution of wait_for_session_terminations_or_client_stopped(), have been
403
    // added to `m_abandoned_session_wrappers`.
404
    //
405
    // To see that this is the case, consider a session wrapper that was
406
    // abandoned before wait_for_session_terminations_or_client_stopped() was
407
    // invoked. Then the session wrapper will have been added to
408
    // `m_abandoned_session_wrappers`, and an invocation of
409
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
410
    // guarantees mentioned in the documentation of Trigger then ensure
411
    // that at least one execution of actualize_and_finalize_session_wrappers()
4,546✔
412
    // will happen after the session wrapper has been added to
4,546✔
413
    // `m_abandoned_session_wrappers`, but before the post handler submitted
4,546✔
414
    // below gets to execute.
4,546✔
415
    post([this] {
9,532✔
416
        {
9,532✔
417
            util::CheckedLockGuard lock{m_mutex};
9,532✔
418
            m_sessions_terminated = true;
4,986✔
419
        }
9,532✔
420
        m_wait_or_client_stopped_cond.notify_all();
9,532✔
421
    }); // Throws
9,532✔
422

9,092✔
423
    bool completion_condition_was_satisfied;
14,078✔
424
    {
14,078✔
425
        util::CheckedUniqueLock lock{m_mutex};
9,532✔
426
        m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_mutex) {
14,510✔
427
            return m_sessions_terminated || m_stopped;
14,510✔
428
        });
14,510✔
429
        completion_condition_was_satisfied = !m_stopped;
4,986✔
430
    }
4,986✔
431
    return completion_condition_was_satisfied;
4,986✔
432
}
4,986✔
433

28✔
434

28✔
435
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
28✔
436
util::Future<void> ClientImpl::notify_session_terminated()
437
{
56✔
438
    auto pf = util::make_promise_future<void>();
28✔
439
    post([promise = std::move(pf.promise)](Status status) mutable {
28✔
440
        // Includes operation_aborted
441
        if (!status.is_ok()) {
28✔
442
            promise.set_error(status);
28✔
443
            return;
28✔
444
        }
445

28✔
446
        promise.emplace_value();
56✔
447
    });
28✔
448

449
    return std::move(pf.future);
4,926✔
450
}
4,926✔
451

4,892✔
452
void ClientImpl::drain_connections_on_loop()
4,892✔
453
{
9,928✔
454
    post([this](Status status) {
9,934✔
455
        REALM_ASSERT(status.is_ok());
5,030✔
456
        drain_connections();
5,030✔
457
    });
10,316✔
458
}
10,322✔
459

5,286✔
460
void ClientImpl::shutdown_and_wait()
5,286✔
461
{
5,808✔
462
    shutdown();
5,808✔
463
    util::CheckedUniqueLock lock{m_drain_mutex};
5,420✔
464
    if (m_drained) {
10,318✔
465
        return;
10,170✔
466
    }
10,170✔
467

9,786✔
468
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
5,036✔
469
    m_drain_cv.wait(lock.native_handle(), [&]() REQUIRES(m_drain_mutex) {
10,980✔
470
        return m_num_connections == 0 && m_outstanding_posts == 0;
10,980✔
471
    });
6,082✔
472

473
    m_drained = true;
15,262✔
474
}
15,262✔
475

10,226✔
476
void ClientImpl::shutdown() noexcept
10,226✔
477
{
15,824✔
478
    {
15,394✔
479
        util::CheckedLockGuard lock{m_mutex};
15,394✔
480
        if (m_stopped)
10,496✔
481
            return;
5,460✔
482
        m_stopped = true;
9,934✔
483
    }
9,934✔
484
    m_wait_or_client_stopped_cond.notify_all();
485

486
    drain_connections_on_loop();
5,036✔
487
}
9,930✔
488

489

4,894✔
490
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
4,894✔
491
{
5,240✔
492
    // Thread safety required.
493
    {
10,134✔
494
        util::CheckedLockGuard lock{m_mutex};
5,240✔
495
        // We can't actualize the session if we've already been stopped, so
496
        // just finalize it immediately.
497
        if (m_stopped) {
5,240✔
498
            wrapper->finalize_before_actualization();
4,894✔
499
            return;
4,894✔
500
        }
4,894✔
501

502
        REALM_ASSERT(m_actualize_and_finalize);
10,134✔
503
        m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
5,240✔
504
    }
5,240✔
505
    m_actualize_and_finalize->trigger();
506
}
10,134✔
507

508

4,894✔
509
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
4,894✔
510
{
10,134✔
511
    // Thread safety required.
512
    {
5,240✔
513
        util::CheckedLockGuard lock{m_mutex};
10,134✔
514
        REALM_ASSERT(m_actualize_and_finalize);
5,242✔
515
        // The wrapper may have already been finalized before being abandoned
516
        // if we were stopped when it was created.
517
        if (wrapper->mark_abandoned())
5,240✔
518
            return;
2✔
519

520
        // If the session wrapper has not yet been actualized (on the event loop
4,892✔
521
        // thread), it can be immediately finalized. This ensures that we will
34✔
522
        // generally not actualize a session wrapper that has already been
34✔
523
        // abandoned.
34✔
524
        if (m_unactualized_session_wrappers.erase(wrapper.get())) {
10,096✔
525
            wrapper->finalize_before_actualization();
4,898✔
526
            return;
40✔
527
        }
4,898✔
528
        m_abandoned_session_wrappers.push(std::move(wrapper));
5,198✔
529
    }
5,198✔
530
    m_actualize_and_finalize->trigger();
531
}
5,198✔
532

6,988✔
533

534
// Must be called from the event loop thread.
535
void ClientImpl::actualize_and_finalize_session_wrappers()
536
{
7,350✔
537
    // We need to pop from the wrapper stacks while holding the lock to ensure
538
    // that all updates to `SessionWrapper:m_next` are thread-safe, but then
539
    // release the lock before finalizing or actualizing because those functions
540
    // invoke user callbacks which may try to access the client and reacquire
541
    // the lock.
542
    //
543
    // Finalization must always happen before actualization because we may be
544
    // finalizing and actualizing sessions for the same Realm file, and
16,700✔
545
    // actualizing first would result in overlapping sessions. Because we're
16,698✔
546
    // releasing the lock new sessions may come in as we're looping, so we need
16,698✔
547
    // a single loop that checks both fields.
16,698✔
548
    while (true) {
34,436✔
549
        bool finalize = true;
34,434✔
550
        bool stopped;
34,434✔
551
        util::bind_ptr<SessionWrapper> wrapper;
34,434✔
552
        {
29,582✔
553
            util::CheckedLockGuard lock{m_mutex};
29,582✔
554
            wrapper = m_abandoned_session_wrappers.pop();
29,582✔
555
            if (!wrapper) {
34,434✔
556
                wrapper = m_unactualized_session_wrappers.pop();
29,244✔
557
                finalize = false;
29,244✔
558
            }
19,532✔
559
            stopped = m_stopped;
27,448✔
560
        }
22,588✔
561
        if (!wrapper)
22,596✔
562
            break;
7,350✔
563
        if (finalize)
15,246✔
564
            wrapper->finalize(); // Throws
10,050✔
565
        else if (stopped)
14,908✔
566
            wrapper->finalize_before_actualization();
6,990✔
567
        else
5,194✔
568
            wrapper->actualize(); // Throws
5,194✔
569
    }
10,388✔
570
}
7,350✔
571

572

573
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
574
                                                   const std::string& authorization_header_name,
575
                                                   const std::map<std::string, std::string>& custom_http_headers,
576
                                                   bool verify_servers_ssl_certificate,
4,856✔
577
                                                   Optional<std::string> ssl_trust_certificate_path,
4,856✔
578
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
4,856✔
579
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
4,856✔
580
{
5,194✔
581
    auto&& [server_slot_it, inserted] =
5,194✔
582
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
10,050✔
583
    ServerSlot& server_slot = server_slot_it->second; // Throws
5,194✔
584

3,526✔
585
    // TODO: enable multiplexing with proxies
3,526✔
586
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
8,720✔
587
        // Use preexisting connection
588
        REALM_ASSERT(server_slot.alt_connections.empty());
3,734✔
589
        return *server_slot.connection;
5,064✔
590
    }
5,064✔
591

1,330✔
592
    // Create a new connection
1,330✔
593
    REALM_ASSERT(!server_slot.connection);
2,790✔
594
    connection_ident_type ident = m_prev_connection_ident + 1;
2,790✔
595
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
2,790✔
596
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
2,790✔
597
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
2,784✔
598
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,784✔
599
    ClientImpl::Connection& conn = *conn_2;
1,466✔
600
    if (!m_one_connection_per_session) {
1,466✔
601
        server_slot.connection = std::move(conn_2);
1,462✔
602
    }
2,786✔
603
    else {
1,334✔
604
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
1,334✔
605
    }
1,334✔
606
    m_prev_connection_ident = ident;
2,790✔
607
    was_created = true;
2,790✔
608
    {
2,790✔
609
        util::CheckedLockGuard lk(m_drain_mutex);
6,316✔
610
        ++m_num_connections;
1,460✔
611
    }
1,460✔
612
    return conn;
1,460✔
613
}
6,518✔
614

1,324✔
615

1,324✔
616
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
1,324✔
617
{
2,780✔
618
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,780✔
619
    auto i = m_server_slots.find(endpoint);
2,774✔
620
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
2,774✔
621
    ServerSlot& server_slot = i->second;
2,774✔
622
    if (!m_one_connection_per_session) {
2,774✔
623
        REALM_ASSERT(server_slot.alt_connections.empty());
2,768✔
624
        REALM_ASSERT(&*server_slot.connection == &conn);
1,456✔
625
        server_slot.reconnect_info = conn.get_reconnect_info();
1,456✔
626
        server_slot.connection.reset();
1,456✔
627
    }
1,456✔
628
    else {
12✔
629
        REALM_ASSERT(!server_slot.connection);
12✔
630
        connection_ident_type ident = conn.get_ident();
12✔
631
        auto j = server_slot.alt_connections.find(ident);
12✔
632
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
6✔
633
        REALM_ASSERT(&*j->second == &conn);
1,330✔
634
        server_slot.alt_connections.erase(j);
1,330✔
635
    }
1,330✔
636

1,324✔
637
    bool notify;
2,780✔
638
    {
2,780✔
639
        util::CheckedLockGuard lk(m_drain_mutex);
2,780✔
640
        REALM_ASSERT(m_num_connections);
2,476✔
641
        notify = --m_num_connections <= 0;
2,476✔
642
    }
2,780✔
643
    if (notify) {
1,456✔
644
        m_drain_cv.notify_all();
1,150✔
645
    }
1,150✔
646
}
1,456✔
647

648

52✔
649
// ################ SessionImpl ################
650

52!
651
void SessionImpl::force_close()
52✔
652
{
102✔
653
    // Allow force_close() if session is active or hasn't been activated yet.
52✔
654
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
50!
655
        m_wrapper.force_close();
50✔
656
    }
50✔
657
}
5,690✔
658

659
void SessionImpl::on_connection_state_changed(ConnectionState state,
5,640✔
660
                                              const std::optional<SessionErrorInfo>& error_info)
5,640✔
661
{
11,954✔
662
    // Only used to report errors back to the SyncSession while the Session is active
5,640✔
663
    if (m_state == SessionImpl::Active) {
6,314✔
664
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
6,314✔
665
    }
6,314✔
666
}
10,168✔
667

668

3,854!
669
const std::string& SessionImpl::get_virt_path() const noexcept
3,854✔
670
{
7,272✔
671
    // Can only be called if the session is active or being activated
672
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
3,418!
673
    return m_wrapper.m_virt_path;
8,420✔
674
}
3,418✔
675

5,002✔
676
const std::string& SessionImpl::get_realm_path() const noexcept
5,002✔
677
{
10,346✔
678
    // Can only be called if the session is active or being activated
679
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
5,344✔
680
    return m_wrapper.m_db->get_path();
17,708✔
681
}
5,344✔
682

12,364!
683
DBRef SessionImpl::get_db() const noexcept
12,364✔
684
{
26,196✔
685
    // Can only be called if the session is active or being activated
686
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
13,832!
687
    return m_wrapper.m_db;
71,444✔
688
}
13,832✔
689

57,612✔
690
ClientReplication& SessionImpl::get_repl() const noexcept
57,612✔
691
{
119,408✔
692
    // Can only be called if the session is active or being activated
693
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
61,796✔
694
    return m_wrapper.get_replication();
118,432✔
695
}
118,432✔
696

56,636✔
697
ClientHistory& SessionImpl::get_history() const noexcept
698
{
60,808✔
699
    return get_repl().get_history();
69,130✔
700
}
60,808✔
701

8,322✔
702
std::optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
8,322✔
703
{
17,468✔
704
    // Can only be called if the session is active or being activated
705
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
9,146✔
706
    return m_wrapper.m_client_reset_config;
9,886✔
707
}
9,146✔
708

740!
709
SessionReason SessionImpl::get_session_reason() noexcept
740✔
710
{
1,478✔
711
    // Can only be called if the session is active or being activated
712
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
738!
713
    return m_wrapper.m_session_reason;
1,478✔
714
}
738✔
715

740!
716
uint64_t SessionImpl::get_schema_version() noexcept
740✔
717
{
1,478✔
718
    // Can only be called if the session is active or being activated
719
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
738!
720
    return m_wrapper.m_schema_version;
738✔
721
}
21,610✔
722

723
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
20,872✔
724
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
725
{
24,738✔
726
    // Ignore the call if the session is not active
727
    if (m_state != State::Active) {
45,610✔
728
        return;
20,872!
729
    }
20,872✔
730

731
    try {
24,738✔
732
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
45,610!
733
        if (simulate_integration_error) {
45,610✔
734
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
20,872✔
735
        }
20,872✔
736
        version_type client_version;
45,610✔
737
        if (REALM_LIKELY(!get_client().is_dry_run())) {
45,610✔
738
            VersionInfo version_info;
24,732✔
739
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
24,732✔
740
            client_version = version_info.realm_version;
24,732✔
741
        }
24,732✔
742
        else {
20,878✔
743
            // Fake it for "dry run" mode
20,872✔
744
            client_version = m_last_version_available + 1;
20,878✔
745
        }
18✔
746
        on_changesets_integrated(client_version, progress, !changesets.empty()); // Throws
24,750✔
747
    }
45,610✔
748
    catch (const IntegrationException& e) {
24,738✔
749
        on_integration_failure(e);
12✔
750
    }
12✔
751
}
32,716✔
752

753

7,978✔
754
void SessionImpl::on_upload_completion()
7,978✔
755
{
15,468✔
756
    // Ignore the call if the session is not active
7,978✔
757
    if (m_state == State::Active) {
7,490✔
758
        m_wrapper.on_upload_completion(); // Throws
7,490✔
759
    }
7,490✔
760
}
7,822✔
761

762

332✔
763
void SessionImpl::on_download_completion()
332✔
764
{
8,518✔
765
    // Ignore the call if the session is not active
332✔
766
    if (m_state == State::Active) {
8,186✔
767
        m_wrapper.on_download_completion(); // Throws
8,186✔
768
    }
8,186✔
769
}
8,214✔
770

771

28✔
772
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
28✔
773
{
366✔
774
    // Ignore the call if the session is not active
28✔
775
    if (m_state == State::Active) {
338✔
776
        m_wrapper.on_suspended(error_info); // Throws
338✔
777
    }
5,340✔
778
}
338✔
779

5,002✔
780

5,002✔
781
void SessionImpl::on_resumed()
5,002✔
782
{
5,034✔
783
    // Ignore the call if the session is not active
784
    if (m_state == State::Active) {
32✔
785
        m_wrapper.on_resumed(); // Throws
180✔
786
    }
32✔
787
}
180✔
788

148✔
789
void SessionImpl::handle_pending_client_reset_acknowledgement()
148✔
790
{
5,492✔
791
    // Ignore the call if the session is not active
792
    if (m_state == State::Active) {
5,344✔
793
        m_wrapper.handle_pending_client_reset_acknowledgement();
5,344✔
794
    }
27,278✔
795
}
5,344✔
796

21,934✔
797
void SessionImpl::update_subscription_version_info()
798
{
148✔
799
    // Ignore the call if the session is not active
800
    if (m_state == State::Active) {
22,082✔
801
        m_wrapper.update_subscription_version_info();
21,020✔
802
    }
21,020✔
803
}
148✔
804

1,062✔
805
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
1,062✔
806
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
1,062✔
807
{
26,780✔
808
    // Ignore the call if the session is not active
970✔
809
    if (m_state != State::Active) {
25,810✔
810
        return false;
1,062✔
811
    }
1,062✔
812

1,062✔
813
    if (is_steady_state_download_message(batch_state, query_version)) {
26,872✔
814
        return false;
25,798✔
815
    }
24,736!
816

817
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
1,074✔
818
    std::optional<SyncProgress> maybe_progress;
1,074✔
819
    if (batch_state == DownloadBatchState::LastInBatch) {
1,074✔
820
        maybe_progress = progress;
980✔
821
    }
980✔
822

823
    bool new_batch = false;
1,074✔
824
    try {
1,074✔
825
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
1,074✔
826
    }
1,074✔
827
    catch (const LogicError& ex) {
2,136✔
828
        if (ex.code() == ErrorCodes::LimitExceeded) {
24!
829
            IntegrationException ex(ErrorCodes::LimitExceeded,
24✔
830
                                    "bootstrap changeset too large to store in pending bootstrap store",
831
                                    ProtocolError::bad_changeset_size);
1,062✔
832
            on_integration_failure(ex);
1,062✔
833
            return true;
1,062✔
834
        }
6✔
835
        throw;
6✔
836
    }
1,056✔
837

838
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
1,056✔
839
    // bootstrapping.
90✔
840
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
1,162✔
841
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
24✔
842
    }
990✔
843

966✔
844
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
2,038✔
845
                                       batch_state, received_changesets.size());
2,038✔
846
    if (hook_action == SyncClientHookAction::EarlyReturn) {
1,078✔
847
        return true;
12✔
848
    }
972✔
849
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
1,066✔
850

851
    if (batch_state == DownloadBatchState::MoreToCome) {
1,066✔
852
        notify_sync_progress();
1,056✔
853
        return true;
1,056✔
854
    }
90✔
855

856
    try {
976✔
857
        process_pending_flx_bootstrap();
6,798✔
858
    }
976✔
859
    catch (const IntegrationException& e) {
6,798✔
860
        on_integration_failure(e);
4,108✔
861
    }
4,108✔
862
    catch (...) {
976✔
863
        on_integration_failure(IntegrationException(exception_to_status()));
1,720✔
864
    }
1,720✔
865

1,720✔
866
    return true;
1,720✔
867
}
1,720✔
868

869

976✔
870
void SessionImpl::process_pending_flx_bootstrap()
976✔
871
{
7,148✔
872
    // Ignore the call if not a flx session or session is not active
976✔
873
    if (!m_is_flx_sync_session || m_state != State::Active) {
7,148✔
874
        return;
5,420✔
875
    }
5,420✔
876
    // Should never be called if session is not active
976✔
877
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
2,704✔
878
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,704✔
879
    if (!bootstrap_store->has_pending()) {
1,728✔
880
        return;
742✔
881
    }
1,718✔
882

2,022✔
883
    auto pending_batch_stats = bootstrap_store->pending_stats();
2,038✔
884
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
2,038✔
885
                "changeset size: %3)",
2,038✔
886
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
990✔
887
                pending_batch_stats.pending_changeset_bytes);
986✔
888
    auto& history = get_repl().get_history();
986✔
889
    VersionInfo new_version;
990✔
890
    SyncProgress progress;
990✔
891
    int64_t query_version = -1;
990✔
892
    size_t changesets_processed = 0;
990✔
893

894
    // Used to commit each batch after it was transformed.
1,048✔
895
    TransactionRef transact = get_db()->start_write();
2,034✔
896
    while (bootstrap_store->has_pending()) {
3,090✔
897
        auto start_time = std::chrono::steady_clock::now();
2,110✔
898
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
2,110✔
899
        if (!pending_batch.progress) {
2,110✔
900
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
1,052✔
901
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
2✔
902
            // bootstrap store requires a write transaction itself.
2✔
903
            transact->close();
4✔
904
            bootstrap_store->clear();
1,050✔
905
            return;
1,050✔
906
        }
4✔
907

1,046✔
908
        auto batch_state =
2,104✔
909
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
2,104✔
910
        uint64_t downloadable_bytes = 0;
2,098✔
911
        query_version = pending_batch.query_version;
2,098✔
912
        bool simulate_integration_error =
2,098✔
913
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
2,104✔
914
        if (simulate_integration_error) {
2,104✔
915
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
1,048✔
916
        }
2✔
917

1,046✔
918
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
2,102✔
919
                        batch_state, pending_batch.changesets.size());
2,102✔
920

921
        history.integrate_server_changesets(
2,102✔
922
            *pending_batch.progress, downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
2,102✔
923
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
2,102✔
924
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
2,096✔
925
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
2,096✔
926
            });
2,096✔
927
        progress = *pending_batch.progress;
1,056✔
928
        changesets_processed += pending_batch.changesets.size();
2,026✔
929
        auto duration = std::chrono::steady_clock::now() - start_time;
2,026✔
930

931
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
2,026✔
932
                                      batch_state, pending_batch.changesets.size());
2,026✔
933
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
2,026✔
934

935
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
2,026✔
936
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
2,026✔
937
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
1,056✔
938
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
1,056✔
939
                    pending_batch.remaining_changesets);
1,066✔
940
    }
1,056✔
941

10✔
942
    REALM_ASSERT_3(query_version, !=, -1);
990✔
943
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
990✔
944

10✔
945
    on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0);
980✔
946
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
980✔
947
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,968✔
948
    // NoAction/EarlyReturn are both valid no-op actions to take here.
949
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,968✔
950
}
1,968✔
951

988✔
952
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
988✔
953
{
10✔
954
    // Ignore the call if the session is not active
955
    if (m_state == State::Active) {
9,276✔
956
        m_wrapper.on_flx_sync_error(version, err_msg);
10✔
957
    }
9,276✔
958
}
9,276✔
959

9,266✔
960
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
961
{
998✔
962
    // Ignore the call if the session is not active
32,266✔
963
    if (m_state == State::Active) {
998✔
964
        m_wrapper.on_flx_sync_progress(version, batch_state);
33,264✔
965
    }
33,264✔
966
}
33,264✔
967

968
SubscriptionStore* SessionImpl::get_flx_subscription_store()
969
{
9,228✔
970
    // Should never be called if session is not active
971
    REALM_ASSERT_EX(m_state == State::Active, m_state);
9,228✔
972
    return m_wrapper.get_flx_subscription_store();
9,228✔
973
}
9,228✔
974

150✔
975
MigrationStore* SessionImpl::get_migration_store()
976
{
35,614✔
977
    // Should never be called if session is not active
1,248✔
978
    REALM_ASSERT_EX(m_state == State::Active, m_state);
35,614✔
979
    return m_wrapper.get_migration_store();
36,862✔
980
}
35,614✔
981

982
void SessionImpl::on_flx_sync_version_complete(int64_t version)
1,248✔
983
{
162✔
984
    // Ignore the call if the session is not active
12✔
985
    if (m_state == State::Active) {
1,386✔
986
        m_wrapper.on_flx_sync_version_complete(version);
1,386✔
987
    }
1,386✔
988
}
1,386✔
989

990
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
1,236✔
991
{
2,488✔
992
    // Should never be called if session is not active
6✔
993
    REALM_ASSERT_EX(m_state == State::Active, m_state);
1,258✔
994

6✔
995
    // Make sure we don't call the debug hook recursively.
996
    if (m_wrapper.m_in_debug_hook) {
1,258✔
997
        return SyncClientHookAction::NoAction;
18✔
998
    }
18✔
999
    m_wrapper.m_in_debug_hook = true;
1,240✔
1000
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
1,252✔
1001
        m_wrapper.m_in_debug_hook = false;
1,252✔
1002
    });
1,252✔
1003

1004
    auto action = m_wrapper.m_debug_hook(data);
2,454✔
1005
    switch (action) {
2,454✔
1006
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
1,242✔
1007
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
1,242✔
1008
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
6✔
1009

1010
            auto err_processing_err = receive_error_message(err_info);
6✔
1011
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
6✔
1012
            return SyncClientHookAction::EarlyReturn;
56,526✔
1013
        }
56,520✔
1014
        case realm::SyncClientHookAction::TriggerReconnect: {
55,388✔
1015
            get_connection().voluntary_disconnect();
55,388✔
1016
            return SyncClientHookAction::EarlyReturn;
1,156✔
1017
        }
×
1018
        default:
1,218✔
1019
            return action;
1,218✔
1020
    }
2,384✔
1021
}
2,384✔
1022

1,144✔
1023
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
1,144✔
1024
                                                  int64_t query_version, DownloadBatchState batch_state,
1,144✔
1025
                                                  size_t num_changesets)
1,144✔
1026
{
64,208✔
1027
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
65,352✔
1028
        return SyncClientHookAction::NoAction;
64,196✔
1029
    }
63,052✔
1030
    if (REALM_UNLIKELY(m_state != State::Active)) {
1,156✔
1031
        return SyncClientHookAction::NoAction;
684✔
1032
    }
684✔
1033

584✔
1034
    SyncClientHookData data;
1,740✔
1035
    data.event = event;
1,256✔
1036
    data.batch_state = batch_state;
1,156✔
1037
    data.progress = progress;
1,156✔
1038
    data.num_changesets = num_changesets;
1,156✔
1039
    data.query_version = query_version;
1,256✔
1040

100✔
1041
    return call_debug_hook(data);
1,256✔
1042
}
1,256✔
1043

100✔
1044
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
100✔
1045
{
796✔
1046
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
696✔
1047
        return SyncClientHookAction::NoAction;
692✔
1048
    }
692✔
1049
    if (REALM_UNLIKELY(m_state != State::Active)) {
104✔
1050
        return SyncClientHookAction::NoAction;
1051
    }
9,596✔
1052

9,596✔
1053
    SyncClientHookData data;
9,700✔
1054
    data.event = event;
104✔
1055
    data.batch_state = DownloadBatchState::SteadyState;
104✔
1056
    data.progress = m_progress;
43,980✔
1057
    data.num_changesets = 0;
104✔
1058
    data.query_version = 0;
43,980✔
1059
    data.error_info = &error_info;
43,980✔
1060

20,872✔
1061
    return call_debug_hook(data);
20,976✔
1062
}
104✔
1063

23,004✔
1064
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event)
20,222✔
1065
{
29,722✔
1066
    return call_debug_hook(event, m_progress, m_last_sent_flx_query_version, DownloadBatchState::SteadyState, 0);
9,500✔
1067
}
9,500✔
1068

2,782✔
1069
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
652✔
1070
{
52,274✔
1071
    // Should never be called if session is not active
1072
    REALM_ASSERT_EX(m_state == State::Active, m_state);
53,752✔
1073
    if (batch_state == DownloadBatchState::SteadyState) {
54,404✔
1074
        return true;
24,736✔
1075
    }
24,736✔
1076

5,002✔
1077
    if (!m_is_flx_sync_session) {
31,888✔
1078
        return true;
29,076✔
1079
    }
29,076✔
1080

1081
    // If this is a steady state DOWNLOAD, no need for special handling.
1082
    if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
23,944✔
1083
        return true;
21,796✔
1084
    }
21,796✔
1085

1086
    return false;
2,148✔
1087
}
2,842✔
1088

30✔
1089
void SessionImpl::init_progress_handler()
1090
{
5,344✔
1091
    REALM_ASSERT_EX(m_state == State::Unactivated || m_state == State::Active, m_state);
5,344✔
1092
    m_wrapper.init_progress_handler();
5,374✔
1093
}
5,374✔
1094

30✔
1095
void SessionImpl::enable_progress_notifications()
2✔
1096
{
25,004✔
1097
    m_wrapper.m_reliable_download_progress = true;
25,004✔
1098
}
25,030✔
1099

1100
void SessionImpl::notify_sync_progress()
1101
{
25,818✔
1102
    if (m_state != State::Active)
25,820✔
1103
        return;
2✔
1104

2✔
1105
    m_wrapper.report_progress();
25,790✔
1106
}
25,816✔
1107

26✔
1108
util::Future<std::string> SessionImpl::send_test_command(std::string body)
1109
{
56✔
1110
    if (m_state != State::Active) {
30✔
1111
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
×
1112
    }
×
1113

1114
    try {
56✔
1115
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
56✔
1116
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
56✔
1117
            return Status{ErrorCodes::LogicError,
28✔
1118
                          "Must supply command name in \"command\" field of test command json object"};
2✔
1119
        }
28✔
1120
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
58✔
1121
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
1122
        }
1123
    }
28✔
1124
    catch (const nlohmann::json::parse_error& e) {
30✔
1125
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
2✔
1126
    }
2✔
1127

1128
    auto pf = util::make_promise_future<std::string>();
26✔
1129
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
4,920✔
1130
        // Includes operation_aborted
4,894✔
1131
        if (!status.is_ok()) {
4,920✔
1132
            promise.set_error(status);
4,894✔
1133
            return;
4,894✔
1134
        }
4,894✔
1135

4,894✔
1136
        auto id = ++m_last_pending_test_command_ident;
4,920✔
1137
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
4,920✔
1138
        ensure_enlisted_to_send();
4,920✔
1139
    });
4,920✔
1140

4,894✔
1141
    return std::move(pf.future);
4,920✔
1142
}
4,924✔
1143

4,894✔
1144
// ################ SessionWrapper ################
4,894✔
1145

4,894✔
1146
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
4,894✔
1147
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
4,894✔
1148
// the ClientImpl::Connection that owns the ClientImpl::Session.
4,894✔
1149
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
4,894✔
1150
                               std::shared_ptr<MigrationStore> migration_store, Session::Config&& config)
4,894✔
1151
    : m_client{client}
4,894✔
1152
    , m_db(std::move(db))
4,894✔
1153
    , m_replication(m_db->get_replication())
4,894✔
1154
    , m_protocol_envelope{config.protocol_envelope}
4,894✔
1155
    , m_server_address{std::move(config.server_address)}
4,894✔
1156
    , m_server_port{config.server_port}
4,894✔
1157
    , m_server_verified{config.server_verified}
4,894✔
1158
    , m_user_id(std::move(config.user_id))
4,894✔
1159
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
4,894✔
1160
    , m_authorization_header_name{config.authorization_header_name}
4,894✔
1161
    , m_custom_http_headers{std::move(config.custom_http_headers)}
1162
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
1163
    , m_simulate_integration_error{config.simulate_integration_error}
1164
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
1165
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
4,894✔
1166
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
4,894✔
1167
    , m_http_request_path_prefix{std::move(config.service_identifier)}
4,894✔
1168
    , m_virt_path{std::move(config.realm_identifier)}
1169
    , m_proxy_config{std::move(config.proxy_config)}
1170
    , m_signed_access_token{std::move(config.signed_user_token)}
4,888✔
1171
    , m_client_reset_config{std::move(config.client_reset_config)}
1172
    , m_progress_handler(std::move(config.progress_handler))
1173
    , m_connection_state_change_listener(std::move(config.connection_state_change_listener))
1174
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
4,888✔
1175
    , m_session_reason(m_client_reset_config ? SessionReason::ClientReset : config.session_reason)
4,888✔
1176
    , m_schema_version(config.schema_version)
4,888✔
1177
    , m_flx_subscription_store(std::move(flx_sub_store))
4,888✔
1178
    , m_migration_store(std::move(migration_store))
4,888✔
1179
{
10,128✔
1180
    REALM_ASSERT(m_db);
5,240✔
1181
    REALM_ASSERT(m_db->get_replication());
5,240✔
1182
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
5,240✔
1183

57,612✔
1184
    // SessionWrapper begins at +1 retain count because Client retains and
57,612✔
1185
    // releases it while performing async operations, and these need to not
57,612✔
1186
    // take it to 0 or it could be deleted before the caller can retain it.
57,612✔
1187
    bind_ptr();
5,240✔
1188
    m_client.register_unactualized_session_wrapper(this);
5,240✔
1189
}
5,240✔
1190

1191
SessionWrapper::~SessionWrapper() noexcept
1192
{
5,234✔
1193
    // We begin actualization in the constructor and do not delete the wrapper
1194
    // until both the Client is done with it and the Session has abandoned it,
1195
    // so at this point we must have actualized, finalized, and been abandoned.
988✔
1196
    REALM_ASSERT(m_actualized);
6,222✔
1197
    REALM_ASSERT(m_abandoned);
6,222✔
1198
    REALM_ASSERT(m_finalized);
5,234✔
1199
    REALM_ASSERT(m_closed);
5,234✔
1200
    REALM_ASSERT(!m_db);
5,244✔
1201
}
5,244✔
1202

10✔
1203

10✔
1204
inline ClientReplication& SessionWrapper::get_replication() noexcept
1205
{
61,794✔
1206
    REALM_ASSERT(m_db);
62,908✔
1207
    return static_cast<ClientReplication&>(*m_replication);
62,908✔
1208
}
62,908✔
1209

1,114✔
1210

1,114✔
1211
inline ClientImpl& SessionWrapper::get_client() noexcept
1212
{
1213
    return m_client;
988✔
1214
}
988✔
1215

1216
bool SessionWrapper::has_flx_subscription_store() const
1217
{
1,986✔
1218
    return static_cast<bool>(m_flx_subscription_store);
1,986✔
1219
}
1,986✔
1220

988✔
1221
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
1222
{
998✔
1223
    REALM_ASSERT(!m_finalized);
10✔
1224
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
998✔
1225
}
10✔
1226

1227
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
1228
{
2,088✔
1229
    REALM_ASSERT(!m_finalized);
2,088✔
1230
    m_flx_last_seen_version = version;
1,124✔
1231
    m_flx_active_version = version;
1,124✔
1232
}
2,088✔
1233

964✔
1234
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
462✔
1235
{
1,460✔
1236
    if (!has_flx_subscription_store()) {
1,500✔
1237
        return;
502✔
1238
    }
502✔
1239
    REALM_ASSERT(!m_finalized);
1,500✔
1240
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
1,962✔
1241
    REALM_ASSERT(new_version >= m_flx_active_version);
1,022✔
1242
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
1,022✔
1243

1244
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
998✔
1245

1246
    switch (batch_state) {
1,022✔
1247
        case DownloadBatchState::SteadyState:
24✔
1248
            // Cannot be called with this value.
24✔
1249
            REALM_UNREACHABLE();
988✔
1250
        case DownloadBatchState::LastInBatch:
974✔
1251
            if (m_flx_active_version == new_version) {
1,962✔
1252
                return;
988✔
1253
            }
1254
            on_flx_sync_version_complete(new_version);
974✔
1255
            if (new_version == 0) {
11,238✔
1256
                new_state = SubscriptionSet::State::Complete;
10,726✔
1257
            }
10,726✔
1258
            else {
10,776✔
1259
                new_state = SubscriptionSet::State::AwaitingMark;
512✔
1260
                m_flx_pending_mark_version = new_version;
512✔
1261
            }
3,294✔
1262
            break;
3,756✔
1263
        case DownloadBatchState::MoreToCome:
2,806✔
1264
            if (m_flx_last_seen_version == new_version) {
2,806✔
1265
                return;
1266
            }
1267

32,268✔
1268
            m_flx_last_seen_version = new_version;
32,292✔
1269
            new_state = SubscriptionSet::State::Bootstrapping;
32,292✔
1270
            break;
32,292✔
1271
    }
998✔
1272

1273
    get_flx_subscription_store()->update_state(new_version, new_state);
5,892✔
1274
}
5,892✔
1275

4,894✔
1276
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
4,894✔
1277
{
14,980✔
1278
    REALM_ASSERT(!m_finalized);
10,086✔
1279
    return m_flx_subscription_store.get();
10,086✔
1280
}
10,086✔
1281

54,388✔
1282
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
1283
{
57,188✔
1284
    REALM_ASSERT(!m_finalized);
57,186✔
1285
    return m_flx_pending_bootstrap_store.get();
57,186✔
1286
}
2,992✔
1287

54,194✔
1288
MigrationStore* SessionWrapper::get_migration_store()
54,194✔
1289
{
89,808✔
1290
    REALM_ASSERT(!m_finalized);
89,808✔
1291
    return m_migration_store.get();
90,002✔
1292
}
35,614✔
1293

1294
inline bool SessionWrapper::mark_abandoned()
1295
{
5,254✔
1296
    REALM_ASSERT(!m_abandoned);
5,240✔
1297
    m_abandoned = true;
5,240✔
1298
    return m_finalized;
5,254✔
1299
}
5,254✔
1300

14✔
1301

1302
void SessionWrapper::on_commit(version_type new_version)
1303
{
59,768✔
1304
    // Thread safety required
14✔
1305
    m_client.post([self = util::bind_ptr{this}, new_version] {
59,768✔
1306
        REALM_ASSERT(self->m_actualized);
59,780✔
1307
        if (REALM_UNLIKELY(!self->m_sess))
59,780✔
1308
            return; // Already finalized
576✔
1309
        SessionImpl& sess = *self->m_sess;
59,218✔
1310
        sess.recognize_sync_version(new_version); // Throws
59,218✔
1311
        self->report_progress();                  // Throws
59,218✔
1312
    });
59,204✔
1313
}
59,768✔
1314

1315

13,972✔
1316
void SessionWrapper::cancel_reconnect_delay()
13,972✔
1317
{
14✔
1318
    // Thread safety required
13,972✔
1319

13,972✔
1320
    m_client.post([self = util::bind_ptr{this}] {
13,986✔
1321
        REALM_ASSERT(self->m_actualized);
13,986✔
1322
        if (REALM_UNLIKELY(self->m_closed)) {
14✔
NEW
1323
            return;
×
NEW
1324
        }
×
1325

13,972✔
1326
        if (REALM_UNLIKELY(!self->m_sess))
14✔
1327
            return; // Already finalized
92✔
1328
        SessionImpl& sess = *self->m_sess;
106✔
1329
        sess.cancel_resumption_delay(); // Throws
106✔
1330
        ClientImpl::Connection& conn = sess.get_connection();
13,894✔
1331
        conn.cancel_reconnect_delay(); // Throws
7,612✔
1332
    });                                // Throws
7,612✔
1333
}
14✔
1334

158✔
1335
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
158✔
1336
                                    WaitOperCompletionHandler handler)
7,440✔
1337
{
2,688✔
1338
    REALM_ASSERT(upload_completion || download_completion);
10,128✔
1339

7,440✔
1340
    m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion,
10,286✔
1341
                   download_completion]() mutable {
8,970✔
1342
        REALM_ASSERT(self->m_actualized);
2,688✔
1343
        if (REALM_UNLIKELY(!self->m_sess)) {
8,970✔
1344
            // Already finalized
6,282✔
1345
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
13,932✔
1346
            return;
13,932✔
1347
        }
7,650✔
1348
        if (upload_completion) {
16,516✔
1349
            if (download_completion) {
7,830✔
1350
                // Wait for upload and download completion
13,880✔
1351
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
14,130✔
1352
            }
158✔
1353
            else {
1,232✔
1354
                // Wait for upload completion only
1355
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
7,688✔
1356
            }
1,232✔
1357
        }
7,846✔
1358
        else {
1,246✔
1359
            // Wait for download completion only
6,456✔
1360
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
7,702✔
1361
        }
7,702✔
1362
        SessionImpl& sess = *self->m_sess;
9,092✔
1363
        if (upload_completion)
9,092✔
1364
            sess.request_upload_completion_notification(); // Throws
7,846✔
1365
        if (download_completion)
2,636✔
1366
            sess.request_download_completion_notification(); // Throws
1,404✔
1367
    });                                                      // Throws
2,636✔
1368
}
7,690✔
1369

1370

5,002✔
1371
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1372
{
11,458✔
1373
    // Thread safety required
5,002✔
1374
    REALM_ASSERT(!m_abandoned);
11,458✔
1375

5,002✔
1376
    std::int_fast64_t target_mark;
11,458✔
1377
    {
11,458✔
1378
        util::CheckedLockGuard lock{m_client.m_mutex};
6,456✔
1379
        target_mark = ++m_target_upload_mark;
6,456✔
1380
    }
6,456✔
1381

108✔
1382
    m_client.post([self = util::bind_ptr{this}, target_mark] {
6,456✔
1383
        REALM_ASSERT(self->m_actualized);
6,564✔
1384
        // The session wrapper may already have been finalized. This can only
1385
        // happen if it was abandoned, but in that case, the call of
108✔
1386
        // wait_for_upload_complete_or_client_stopped() must have returned
108✔
1387
        // already.
108✔
1388
        if (REALM_UNLIKELY(!self->m_sess))
6,458✔
1389
            return;
116✔
1390
        if (target_mark > self->m_staged_upload_mark) {
6,552✔
1391
            self->m_staged_upload_mark = target_mark;
6,552✔
1392
            SessionImpl& sess = *self->m_sess;
6,446✔
1393
            sess.request_upload_completion_notification(); // Throws
6,552✔
1394
        }
6,552✔
1395
    }); // Throws
6,552✔
1396

106✔
1397
    bool completion_condition_was_satisfied;
6,564✔
1398
    {
6,456✔
1399
        util::CheckedUniqueLock lock{m_client.m_mutex};
6,456✔
1400
        m_client.m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_client.m_mutex) {
16,428✔
1401
            return m_reached_upload_mark >= target_mark || m_client.m_stopped;
21,322✔
1402
        });
21,322✔
1403
        completion_condition_was_satisfied = !m_client.m_stopped;
11,350✔
1404
    }
11,350✔
1405
    return completion_condition_was_satisfied;
6,456✔
1406
}
6,456✔
1407

1408

1409
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
4,858✔
1410
{
4,998✔
1411
    // Thread safety required
4,858✔
1412
    REALM_ASSERT(!m_abandoned);
9,856✔
1413

1414
    std::int_fast64_t target_mark;
4,998✔
1415
    {
9,856✔
1416
        util::CheckedLockGuard lock{m_client.m_mutex};
9,856✔
1417
        target_mark = ++m_target_download_mark;
4,998✔
1418
    }
9,856✔
1419

1420
    m_client.post([self = util::bind_ptr{this}, target_mark] {
9,856✔
1421
        REALM_ASSERT(self->m_actualized);
5,000✔
1422
        // The session wrapper may already have been finalized. This can only
2✔
1423
        // happen if it was abandoned, but in that case, the call of
1424
        // wait_for_download_complete_or_client_stopped() must have returned
4,858✔
1425
        // already.
4,858✔
1426
        if (REALM_UNLIKELY(!self->m_sess))
9,856✔
1427
            return;
30✔
1428
        if (target_mark > self->m_staged_download_mark) {
4,968✔
1429
            self->m_staged_download_mark = target_mark;
4,966✔
1430
            SessionImpl& sess = *self->m_sess;
9,824✔
1431
            sess.request_download_completion_notification(); // Throws
9,824✔
1432
        }
9,824✔
1433
    }); // Throws
9,826✔
1434

4,858✔
1435
    bool completion_condition_was_satisfied;
9,856✔
1436
    {
9,856✔
1437
        util::CheckedUniqueLock lock{m_client.m_mutex};
9,856✔
1438
        m_client.m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_client.m_mutex) {
10,178!
1439
            return m_reached_download_mark >= target_mark || m_client.m_stopped;
10,178✔
1440
        });
10,178✔
1441
        completion_condition_was_satisfied = !m_client.m_stopped;
4,998✔
1442
    }
4,998✔
1443
    return completion_condition_was_satisfied;
9,856✔
1444
}
9,856✔
1445

4,858✔
1446

754✔
1447
void SessionWrapper::refresh(std::string_view signed_access_token)
754✔
1448
{
108✔
1449
    // Thread safety required
4,858✔
1450
    REALM_ASSERT(!m_abandoned);
4,966✔
1451

4,858✔
1452
    m_client.post([self = util::bind_ptr{this}, token = std::string(signed_access_token)] {
108✔
1453
        REALM_ASSERT(self->m_actualized);
108✔
1454
        if (REALM_UNLIKELY(!self->m_sess))
4,966✔
1455
            return; // Already finalized
2✔
1456
        self->m_signed_access_token = std::move(token);
106✔
1457
        SessionImpl& sess = *self->m_sess;
106✔
1458
        ClientImpl::Connection& conn = sess.get_connection();
4,964✔
1459
        // FIXME: This only makes sense when each session uses a separate connection.
1460
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
4,964✔
1461
        sess.cancel_resumption_delay();                                                          // Throws
1,432✔
1462
        conn.cancel_reconnect_delay();                                                           // Throws
106✔
1463
    });
4,964✔
1464
}
4,956✔
1465

4,848✔
1466

3,448✔
1467
void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
3,448✔
1468
{
8,648✔
1469
    ClientImpl& client = wrapper->m_client;
8,688✔
1470
    client.register_abandoned_session_wrapper(std::move(wrapper));
10,088✔
1471
}
5,240✔
1472

4,858✔
1473

4,662✔
1474
// Must be called from event loop thread
4,858✔
1475
void SessionWrapper::actualize()
1476
{
5,198✔
1477
    // actualize() can only ever be called once
4,904✔
1478
    REALM_ASSERT(!m_actualized);
10,102✔
1479
    REALM_ASSERT(!m_sess);
5,252✔
1480
    // The client should have removed this wrapper from those pending
54✔
1481
    // actualization if it called force_close() or finalize_before_actualize()
4,850✔
1482
    REALM_ASSERT(!m_finalized);
10,048✔
1483
    REALM_ASSERT(!m_closed);
10,048✔
1484

1485
    m_actualized = true;
10,048✔
1486

4,850✔
1487
    ScopeExitFail close_on_error([&]() noexcept {
5,198✔
1488
        m_closed = true;
2✔
1489
    });
2✔
1490

4,850✔
1491
    m_db->claim_sync_agent();
5,198✔
1492
    m_db->add_commit_listener(this);
5,198✔
1493
    ScopeExitFail remove_commit_listener([&]() noexcept {
10,048✔
1494
        m_db->remove_commit_listener(this);
1495
    });
4,850✔
1496

4,850✔
1497
    ServerEndpoint endpoint{m_protocol_envelope, m_server_address, m_server_port,
10,048✔
1498
                            m_user_id,           m_sync_mode,      m_server_verified};
5,198✔
1499
    bool was_created = false;
10,048✔
1500
    ClientImpl::Connection& conn = m_client.get_connection(
5,198✔
1501
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
5,198✔
1502
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
10,226✔
1503
        was_created); // Throws
5,376✔
1504
    ScopeExitFail remove_connection([&]() noexcept {
5,376✔
1505
        if (was_created)
178!
1506
            m_client.remove_connection(conn);
178✔
1507
    });
5,036✔
1508

186✔
1509
    // FIXME: This only makes sense when each session uses a separate connection.
186✔
1510
    conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
5,384✔
1511
    std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
5,384✔
1512
    if (m_sync_mode == SyncServerMode::FLX) {
5,384✔
1513
        m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
5,608✔
1514
    }
758✔
1515

6✔
1516
    sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
5,204✔
1517
    m_sess = sess.get();
5,204✔
1518
    ScopeExitFail clear_sess([&]() noexcept {
10,048✔
1519
        m_sess = nullptr;
1520
    });
1521
    conn.activate_session(std::move(sess)); // Throws
5,198✔
1522

1523
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
1524
    // session cannot change the state of the bootstrap store at the same time.
1525
    update_subscription_version_info();
5,198✔
1526

4,852✔
1527
    if (was_created)
10,050✔
1528
        conn.activate(); // Throws
6,314✔
1529

4,852✔
1530
    if (m_connection_state_change_listener) {
5,198✔
1531
        ConnectionState state = conn.get_state();
10,042✔
1532
        if (state != ConnectionState::disconnected) {
5,190✔
1533
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
8,480✔
1534
            if (state == ConnectionState::connected)
3,628✔
1535
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
3,532✔
1536
        }
3,628✔
1537
    }
5,190✔
1538

4,852✔
1539
    if (!m_client_reset_config)
10,050✔
1540
        report_progress(); // Throws
9,860✔
1541
}
5,198✔
1542

1543
void SessionWrapper::force_close()
1544
{
5,242✔
1545
    if (m_closed) {
5,242✔
1546
        return;
52✔
1547
    }
88✔
1548
    REALM_ASSERT(m_actualized);
5,226✔
1549
    REALM_ASSERT(m_sess);
5,226✔
1550
    m_closed = true;
5,226✔
1551

36✔
1552
    ClientImpl::Connection& conn = m_sess->get_connection();
5,226✔
1553
    conn.initiate_session_deactivation(m_sess); // Throws
5,226✔
1554

36✔
1555
    // We need to keep the DB open until finalization, but we no longer want to
36✔
1556
    // know when commits are made
36✔
1557
    m_db->remove_commit_listener(this);
5,190✔
1558

1559
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
7,978✔
1560
    m_flx_pending_bootstrap_store.reset();
5,190✔
1561
    // Clear the subscription and migration store refs since they are owned by SyncSession
1562
    m_flx_subscription_store.reset();
5,190✔
1563
    m_migration_store.reset();
5,190✔
1564
    m_sess = nullptr;
13,168✔
1565
    // Everything is being torn down, no need to report connection state anymore
1566
    m_connection_state_change_listener = {};
19,362✔
1567
}
11,384✔
1568

6,194✔
1569
// Must be called from event loop thread
6,194✔
1570
//
6,194✔
1571
// `m_client.m_mutex` is not held while this is called, but it is guaranteed to
8,026✔
1572
// have been acquired at some point in between the final read or write ever made
48✔
1573
// from a different thread and when this is called.
48✔
1574
void SessionWrapper::finalize()
48✔
1575
{
5,240✔
1576
    REALM_ASSERT(m_actualized);
5,192✔
1577
    REALM_ASSERT(m_abandoned);
13,170✔
1578
    REALM_ASSERT(!m_finalized);
5,640✔
1579

448✔
1580
    force_close();
5,640✔
1581

448✔
1582
    m_finalized = true;
5,640✔
1583

7,978✔
1584
    // The Realm file can be closed now, as no access to the Realm file is
1585
    // supposed to happen on behalf of a session after initiation of
1586
    // deactivation.
1587
    m_db->release_sync_agent();
5,524✔
1588
    m_db = nullptr;
5,524✔
1589

332✔
1590
    // All outstanding wait operations must be canceled
332✔
1591
    while (!m_upload_completion_handlers.empty()) {
5,742✔
1592
        auto handler = std::move(m_upload_completion_handlers.back());
550✔
1593
        m_upload_completion_handlers.pop_back();
550✔
1594
        handler(
218✔
1595
            {ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
218✔
1596
    }
218✔
1597
    while (!m_download_completion_handlers.empty()) {
5,334✔
1598
        auto handler = std::move(m_download_completion_handlers.back());
142✔
1599
        m_download_completion_handlers.pop_back();
142✔
1600
        handler(
142✔
1601
            {ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
142✔
1602
    }
142✔
1603
    while (!m_sync_completion_handlers.empty()) {
5,226✔
1604
        auto handler = std::move(m_sync_completion_handlers.back());
34✔
1605
        m_sync_completion_handlers.pop_back();
32✔
1606
        handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
34✔
1607
    }
38✔
1608
}
5,220✔
1609

1610

1611
// Must be called only when an unactualized session wrapper becomes abandoned.
1612
//
1613
// Called with a lock on `m_client.m_mutex`.
5,640✔
1614
inline void SessionWrapper::finalize_before_actualization() noexcept
5,640✔
1615
{
5,668✔
1616
    REALM_ASSERT(!m_finalized);
5,668✔
1617
    REALM_ASSERT(!m_sess);
5,682✔
1618
    m_actualized = true;
42✔
1619
    m_finalized = true;
42✔
1620
    m_closed = true;
5,046✔
1621
    m_db->remove_commit_listener(this);
5,046✔
1622
    m_db->release_sync_agent();
5,046✔
1623
    m_db = nullptr;
42✔
1624
}
42✔
1625

74,428✔
1626
void SessionWrapper::on_upload_completion()
74,428✔
1627
{
81,918✔
1628
    REALM_ASSERT(!m_finalized);
7,490✔
1629
    while (!m_upload_completion_handlers.empty()) {
82,980✔
1630
        auto handler = std::move(m_upload_completion_handlers.back());
36,916✔
1631
        m_upload_completion_handlers.pop_back();
1,062✔
1632
        handler(Status::OK()); // Throws
39,636✔
1633
    }
39,636✔
1634
    while (!m_sync_completion_handlers.empty()) {
46,164✔
1635
        auto handler = std::move(m_sync_completion_handlers.back());
38,674✔
1636
        m_download_completion_handlers.push_back(std::move(handler)); // Throws
38,674✔
1637
        m_sync_completion_handlers.pop_back();
100✔
1638
    }
38,674✔
1639
    util::CheckedLockGuard lock{m_client.m_mutex};
46,064✔
1640
    if (m_staged_upload_mark > m_reached_upload_mark) {
46,064✔
1641
        m_reached_upload_mark = m_staged_upload_mark;
6,436✔
1642
        m_client.m_wait_or_client_stopped_cond.notify_all();
6,436✔
1643
    }
45,018✔
1644
}
46,072✔
1645

28,946✔
1646

1647
void SessionWrapper::on_download_completion()
9,636✔
1648
{
17,822✔
1649
    while (!m_download_completion_handlers.empty()) {
19,048✔
1650
        auto handler = std::move(m_download_completion_handlers.back());
1,226✔
1651
        m_download_completion_handlers.pop_back();
18,172✔
1652
        handler(Status::OK()); // Throws
8,536✔
1653
    }
8,536✔
1654
    while (!m_sync_completion_handlers.empty()) {
15,544✔
1655
        auto handler = std::move(m_sync_completion_handlers.back());
7,358✔
1656
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
9,684✔
1657
        m_sync_completion_handlers.pop_back();
48✔
1658
    }
48✔
1659

38,582✔
1660
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
46,768✔
1661
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
14,158✔
1662
                             m_flx_pending_mark_version);
448✔
1663
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
448✔
1664
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
25,320✔
1665
    }
14,074✔
1666

1667
    util::CheckedLockGuard lock{m_client.m_mutex};
19,432✔
1668
    if (m_staged_download_mark > m_reached_download_mark) {
16,744✔
1669
        m_reached_download_mark = m_staged_download_mark;
13,492✔
1670
        m_client.m_wait_or_client_stopped_cond.notify_all();
4,934✔
1671
    }
4,934✔
1672
}
8,186✔
1673

1674

1675
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1676
{
338✔
1677
    REALM_ASSERT(!m_finalized);
338✔
1678
    m_suspended = true;
338✔
1679
    if (m_connection_state_change_listener) {
8,896✔
1680
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
8,896✔
1681
    }
4,506✔
1682
}
8,896✔
1683

8,558✔
1684

1685
void SessionWrapper::on_resumed()
11,246✔
1686
{
11,278✔
1687
    REALM_ASSERT(!m_finalized);
11,278✔
1688
    m_suspended = false;
4,176✔
1689
    if (m_connection_state_change_listener) {
32✔
1690
        ClientImpl::Connection& conn = m_sess->get_connection();
11,278✔
1691
        if (conn.get_state() != ConnectionState::disconnected) {
11,278✔
1692
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
11,274✔
1693
            if (conn.get_state() == ConnectionState::connected)
5,926✔
1694
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
292✔
1695
        }
294✔
1696
    }
298✔
1697
}
5,930✔
1698

1699

1700
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1701
                                                 const std::optional<SessionErrorInfo>& error_info)
1702
{
12,212✔
1703
    if (m_connection_state_change_listener && !m_suspended) {
12,212!
1704
        m_connection_state_change_listener(state, error_info); // Throws
6,298✔
1705
    }
12,196✔
1706
}
11,662✔
1707

1708
void SessionWrapper::init_progress_handler()
1709
{
5,344✔
1710
    ClientHistory::get_upload_download_bytes(m_db.get(), m_final_downloaded, m_final_uploaded);
10,692✔
1711
}
10,692✔
1712

4,414✔
1713
void SessionWrapper::report_progress()
5,348✔
1714
{
90,002✔
1715
    REALM_ASSERT(!m_finalized);
101,248✔
1716
    REALM_ASSERT(m_sess);
96,834✔
1717

11,246✔
1718
    if (!m_progress_handler)
97,104✔
1719
        return;
61,470✔
1720

11,246✔
1721
    // Ignore progress messages from before we first receive a DOWNLOAD message
8,120✔
1722
    if (!m_reliable_download_progress)
28,532✔
1723
        return;
17,550✔
1724

1725
    ReportedProgress p;
17,234✔
1726
    DownloadableProgress downloadable;
20,120✔
1727
    ClientHistory::get_upload_download_bytes(m_db.get(), p.downloaded, downloadable, p.uploaded, p.uploadable,
20,120✔
1728
                                             p.snapshot);
14,108✔
1729

6,012✔
1730
    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
20,120✔
1731
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
17,314✔
1732
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);
14,308✔
1733

3,006✔
1734
        // The effect of this calculation is that if new bytes are added for download/upload,
3,006✔
1735
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
3,006✔
1736
        // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync
3,006✔
1737
        // before progress has reached 1.0.
3,006✔
1738
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
1739
        // Example for upload progress reported:
3,126✔
1740
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0
3,126✔
1741

3,126✔
1742
        double progress_estimate = 1.0;
11,302✔
1743
        if (final_transferred < transferable && transferred < transferable)
11,302✔
1744
            progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred);
6,218✔
1745
        return progress_estimate;
11,332✔
1746
    };
11,302✔
1747

1748
    bool upload_completed = p.uploaded == p.uploadable;
14,108✔
1749
    double upload_estimate = 1.0;
14,138✔
1750
    if (!upload_completed)
14,138✔
1751
        upload_estimate = calculate_progress(p.uploaded, p.uploadable, m_final_uploaded);
6,148✔
1752

1753
    bool download_completed = p.downloaded == 0;
19,110✔
1754
    double download_estimate = 1.00;
19,110✔
1755
    if (m_flx_pending_bootstrap_store) {
14,108✔
1756
        if (m_flx_pending_bootstrap_store->has_pending()) {
11,128✔
1757
            download_estimate = downloadable.as_estimate();
5,356✔
1758
            p.downloaded += m_flx_pending_bootstrap_store->pending_stats().pending_changeset_bytes;
5,194✔
1759
        }
5,194✔
1760
        download_completed = download_estimate >= 1.0;
6,126✔
1761

162✔
1762
        // for flx with download estimate these bytes are not known
1763
        // provide some sensible value for non-streaming version of object-store callbacks
1764
        // until these field are completely removed from the api after pbs deprecation
162✔
1765
        p.downloadable = p.downloaded;
6,288✔
1766
        if (download_estimate > 0 && download_estimate < 1.0 && p.downloaded > m_final_downloaded)
6,286!
1767
            p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / download_estimate);
78✔
1768
    }
6,204✔
1769
    else {
8,064✔
1770
        // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
82✔
1771
        // is only the remaining to download. This is confusing, so make them use
1772
        // the same units.
1773
        p.downloadable = downloadable.as_bytes() + p.downloaded;
7,982✔
1774
        if (!download_completed)
7,982✔
1775
            download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
5,154✔
1776
    }
8,064✔
1777

1778
    if (download_completed)
14,190✔
1779
        m_final_downloaded = p.downloaded;
9,030✔
1780
    if (upload_completed)
14,190✔
1781
        m_final_uploaded = p.uploaded;
7,956✔
1782

2✔
1783
    if (p == m_reported_progress)
14,110✔
1784
        return;
10,052✔
1785

80✔
1786
    m_reported_progress = p;
4,216✔
1787

1788
    if (m_sess->logger.would_log(Logger::Level::debug)) {
4,136✔
1789
        auto to_str = [](double d) {
8,020✔
1790
            std::ostringstream ss;
8,100✔
1791
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
80✔
1792
            ss << std::fixed << std::setprecision(4) << d;
8,100✔
1793
            return ss.str();
8,182✔
1794
        };
8,020✔
1795
        m_sess->logger.debug(
4,010✔
1796
            "Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
9,012✔
1797
            "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
9,012✔
1798
            p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
8,200✔
1799
            to_str(upload_estimate), p.snapshot, m_flx_active_version);
4,822✔
1800
    }
4,822✔
1801

812✔
1802
    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
4,948✔
1803
                       upload_estimate, m_flx_last_seen_version);
4,136✔
1804
}
4,136✔
1805

36✔
1806
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
36✔
1807
{
30✔
1808
    if (!m_sess) {
66✔
1809
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
36✔
1810
    }
×
1811

1812
    return m_sess->send_test_command(std::move(body));
30✔
1813
}
30✔
1814

36✔
1815
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1816
{
5,344✔
1817
    REALM_ASSERT(!m_finalized);
5,344✔
1818

1819
    auto has_pending_reset = PendingResetStore::has_pending_reset(m_db->start_frozen());
5,380✔
1820
    if (!has_pending_reset) {
5,380✔
1821
        return; // nothing to do
5,180✔
1822
    }
5,216✔
1823

36✔
1824
    m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);
164✔
1825

1826
    // Now that the client reset merge is complete, wait for the changes to synchronize with the server
1827
    async_wait_for(
164✔
1828
        true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
164✔
1829
            if (status == ErrorCodes::OperationAborted) {
162✔
1830
                return;
74✔
1831
            }
74✔
1832
            auto& logger = self->m_sess->logger;
88✔
1833
            if (!status.is_ok()) {
88✔
1834
                logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
1,330✔
1835
                             status);
1,330✔
1836
                return;
1,330✔
1837
            }
1,330✔
1838

1,330✔
1839
            logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);
1,418✔
1840

1,330✔
1841
            auto tr = self->m_db->start_write();
1,418✔
1842
            auto cur_pending_reset = PendingResetStore::has_pending_reset(tr);
1,418✔
1843
            if (!cur_pending_reset) {
1,418✔
1844
                logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
1,332✔
1845
                return;
1,332✔
1846
            }
1,332✔
1847
            if (*cur_pending_reset == pending_reset) {
1,416✔
1848
                logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
1,414✔
1849
            }
1,410✔
1850
            else {
2✔
1851
                logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
1,328✔
1852
            }
2✔
1853
            PendingResetStore::clear_pending_reset(tr);
86✔
1854
            tr->commit();
1,412✔
1855
        });
1,412✔
1856
}
1,486✔
1857

1858
void SessionWrapper::update_subscription_version_info()
1,322✔
1859
{
6,668✔
1860
    if (!m_flx_subscription_store)
6,672✔
1861
        return;
4,536✔
1862
    auto versions_info = m_flx_subscription_store->get_version_info();
806✔
1863
    m_flx_active_version = versions_info.active;
812✔
1864
    m_flx_pending_mark_version = versions_info.pending_mark;
812✔
1865
}
812✔
1866

1867
std::string SessionWrapper::get_appservices_connection_id()
1868
{
36✔
1869
    auto pf = util::make_promise_future<std::string>();
1,360✔
1870

1,324✔
1871
    m_client.post([self = util::bind_ptr{this}, promise = std::move(pf.promise)](Status status) mutable {
1,360✔
1872
        if (!status.is_ok()) {
36✔
1873
            promise.set_error(status);
1874
            return;
1875
        }
4,962✔
1876

4,962✔
1877
        if (!self->m_sess) {
4,998✔
1878
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
4,962✔
1879
            return;
1880
        }
1881

1882
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
924✔
1883
    });
1,810✔
1884

1,774✔
1885
    return pf.future.get();
1,810✔
1886
}
924✔
1887

888✔
1888
// ################ ClientImpl::Connection ################
1889

1890
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1,322✔
1891
                                   const std::string& authorization_header_name,
1,322✔
1892
                                   const std::map<std::string, std::string>& custom_http_headers,
1,322✔
1893
                                   bool verify_servers_ssl_certificate,
1,322✔
1894
                                   Optional<std::string> ssl_trust_certificate_path,
1895
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1,322✔
1896
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1897
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
1898
                                                      client.logger_ptr)} // Throws
1899
    , logger{*logger_ptr}
1,784✔
1900
    , m_client{client}
1,784✔
1901
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1,784✔
1902
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1903
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1,784✔
1904
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1,784✔
1905
    , m_reconnect_info{reconnect_info}
1,784✔
1906
    , m_ident{ident}
1,784✔
1907
    , m_server_endpoint{std::move(endpoint)}
1,784✔
1908
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1909
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1,784✔
1910
{
3,246✔
1911
    m_on_idle = m_client.create_trigger([this](Status status) {
1,462✔
1912
        if (status == ErrorCodes::OperationAborted)
1,460✔
1913
            return;
1914
        else if (!status.is_ok())
2,790✔
1915
            throw Exception(status);
1,330✔
1916

1,330✔
1917
        REALM_ASSERT(m_activated);
1,460✔
1918
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
1,460✔
1919
            on_idle(); // Throws
1,456✔
1920
            // Connection object may be destroyed now.
1921
        }
6,704✔
1922
    });
6,708✔
1923
}
2,606✔
1924

1,144✔
1925
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
5,564✔
1926
{
5,570✔
1927
    return m_ident;
5,570✔
1928
}
5,570✔
1929

4,104✔
1930

4,104✔
1931
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
1932
{
1,456✔
1933
    return m_server_endpoint;
1,456✔
1934
}
6,354✔
1935

4,898✔
1936
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
4,898✔
1937
                                                        const std::string& signed_access_token)
1938
{
5,302✔
1939
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
5,302✔
1940
    m_signed_access_token = signed_access_token;           // Throws (copy)
5,302✔
1941
}
5,302✔
1942

1943

1944
void ClientImpl::Connection::resume_active_sessions()
1945
{
5,906✔
1946
    auto handler = [=](ClientImpl::Session& sess) {
2,014✔
1947
        sess.cancel_resumption_delay(); // Throws
2,014✔
1948
    };
2,014✔
1949
    for_each_active_session(std::move(handler)); // Throws
5,948✔
1950
}
5,948✔
1951

4,940✔
1952
void ClientImpl::Connection::on_idle()
1953
{
1,456✔
1954
    logger.debug(util::LogCategory::session, "Destroying connection object");
1,844✔
1955
    ClientImpl& client = get_client();
1,844✔
1956
    client.remove_connection(*this);
1,844✔
1957
    // NOTE: This connection object is now destroyed!
1958
}
1,456✔
1959

890✔
1960

890✔
1961
std::string ClientImpl::Connection::get_http_request_path() const
890✔
1962
{
1,982✔
1963
    using namespace std::string_view_literals;
1,982✔
1964
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
1,988✔
1965

6✔
1966
    std::string path;
1,988✔
1967
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
1,982✔
1968
    path += m_http_request_path_prefix;
1,982✔
1969
    path += param;
6,528✔
1970
    path += m_signed_access_token;
6,528✔
1971

4,546✔
1972
    return path;
1,982✔
1973
}
1,982✔
1974

28✔
1975

28✔
1976
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
28✔
1977
{
1,462✔
1978
    return util::format("Connection[%1] ", ident);
1,462✔
1979
}
1,462✔
1980

1,844✔
1981

1,844✔
1982
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
1,844✔
1983
                                                            std::optional<SessionErrorInfo> error_info)
1984
{
5,844✔
1985
    if (m_force_closed) {
5,844✔
1986
        return;
1,284✔
1987
    }
6,178✔
1988
    auto handler = [=](ClientImpl::Session& sess) {
11,132✔
1989
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
11,132✔
1990
        sess_2.on_connection_state_changed(state, error_info); // Throws
11,132✔
1991
    };
6,238✔
1992
    for_each_active_session(std::move(handler)); // Throws
4,560✔
1993
}
4,560✔
1994

8,866✔
1995

8,866✔
1996
Client::Client(Config config)
8,866✔
1997
    : m_impl{new ClientImpl{std::move(config)}} // Throws
1998
{
5,036✔
1999
}
5,036✔
2000

14✔
2001

14✔
2002
Client::Client(Client&& client) noexcept
14✔
2003
    : m_impl{std::move(client.m_impl)}
2004
{
2005
}
2006

2,354✔
2007

2,354✔
2008
Client::~Client() noexcept {}
7,390✔
2009

2010

2011
void Client::shutdown() noexcept
2012
{
11,532✔
2013
    m_impl->shutdown();
11,532✔
2014
}
11,532✔
2015

2016
void Client::shutdown_and_wait()
2017
{
384✔
2018
    m_impl->shutdown_and_wait();
5,386✔
2019
}
5,386✔
2020

5,002✔
2021
void Client::cancel_reconnect_delay()
2022
{
1,010✔
2023
    m_impl->cancel_reconnect_delay();
1,010✔
2024
}
1,118✔
2025

108✔
2026
void Client::voluntary_disconnect_all_connections()
108✔
2027
{
6✔
2028
    m_impl->voluntary_disconnect_all_connections();
6✔
2029
}
6✔
2030

4,894✔
2031
bool Client::wait_for_session_terminations_or_client_stopped()
4,894✔
2032
{
4,986✔
2033
    return m_impl->wait_for_session_terminations_or_client_stopped();
4,986✔
2034
}
9,880✔
2035

4,894✔
2036
util::Future<void> Client::notify_session_terminated()
4,894✔
2037
{
28✔
2038
    return m_impl->notify_session_terminated();
28✔
2039
}
58✔
2040

30✔
2041
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
30✔
2042
                                  port_type& port, std::string& path) const
2043
{
2,194✔
2044
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
2,230✔
2045
}
2,230✔
2046

36✔
2047

2048
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
2049
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
2050
{
5,240!
2051
    m_impl = new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
5,240!
2052
                                std::move(config)}; // Throws
5,240✔
2053
}
5,240!
2054

2055

2056
void Session::nonsync_transact_notify(version_type new_version)
2057
{
8,930✔
2058
    m_impl->on_commit(new_version); // Throws
8,930✔
2059
}
8,930✔
2060

2061

2062
void Session::cancel_reconnect_delay()
2063
{
14✔
2064
    m_impl->cancel_reconnect_delay(); // Throws
14✔
2065
}
14✔
2066

2067

2068
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2069
{
2,526✔
2070
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
2,526✔
2071
}
2,526✔
2072

2073

2074
bool Session::wait_for_upload_complete_or_client_stopped()
2075
{
6,456✔
2076
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
6,456✔
2077
}
6,456✔
2078

2079

2080
bool Session::wait_for_download_complete_or_client_stopped()
2081
{
4,998✔
2082
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
4,998✔
2083
}
4,998✔
2084

2085

2086
void Session::refresh(std::string_view signed_access_token)
2087
{
108✔
2088
    m_impl->refresh(signed_access_token); // Throws
108✔
2089
}
108✔
2090

2091

2092
void Session::abandon() noexcept
2093
{
5,240✔
2094
    REALM_ASSERT(m_impl);
5,240✔
2095
    // Reabsorb the ownership assigned to the applications naked pointer by
2096
    // Session constructor
2097
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
5,240✔
2098
    SessionWrapper::abandon(std::move(wrapper));
5,240✔
2099
}
5,240✔
2100

2101
util::Future<std::string> Session::send_test_command(std::string body)
2102
{
30✔
2103
    return m_impl->send_test_command(std::move(body));
30✔
2104
}
30✔
2105

2106
std::string Session::get_appservices_connection_id()
2107
{
36✔
2108
    return m_impl->get_appservices_connection_id();
36✔
2109
}
36✔
2110

2111
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2112
{
2113
    switch (proxyType) {
×
2114
        case ProxyConfig::Type::HTTP:
×
2115
            return os << "HTTP";
2116
        case ProxyConfig::Type::HTTPS:
×
2117
            return os << "HTTPS";
2118
    }
2119
    REALM_TERMINATE("Invalid Proxy Type object.");
2120
}
2121

2122
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc