• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_312964

19 Feb 2025 07:31PM UTC coverage: 90.814% (-0.3%) from 91.119%
github_pull_request_312964

Pull #8071

Evergreen

web-flow
Bump serialize-javascript and mocha

Bumps [serialize-javascript](https://github.com/yahoo/serialize-javascript) to 6.0.2 and updates ancestor dependency [mocha](https://github.com/mochajs/mocha). These dependencies need to be updated together.


Updates `serialize-javascript` from 6.0.0 to 6.0.2
- [Release notes](https://github.com/yahoo/serialize-javascript/releases)
- [Commits](https://github.com/yahoo/serialize-javascript/compare/v6.0.0...v6.0.2)

Updates `mocha` from 10.2.0 to 10.8.2
- [Release notes](https://github.com/mochajs/mocha/releases)
- [Changelog](https://github.com/mochajs/mocha/blob/main/CHANGELOG.md)
- [Commits](https://github.com/mochajs/mocha/compare/v10.2.0...v10.8.2)

---
updated-dependencies:
- dependency-name: serialize-javascript
  dependency-type: indirect
- dependency-name: mocha
  dependency-type: direct:development
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #8071: Bump serialize-javascript and mocha

96552 of 179126 branches covered (53.9%)

212672 of 234185 relevant lines covered (90.81%)

3115802.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.89
/src/realm/sync/client.cpp
1
#include <realm/sync/client.hpp>
2

3
#include <realm/sync/config.hpp>
4
#include <realm/sync/noinst/client_impl_base.hpp>
5
#include <realm/sync/noinst/client_reset.hpp>
6
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
7
#include <realm/sync/noinst/pending_reset_store.hpp>
8
#include <realm/sync/protocol.hpp>
9
#include <realm/sync/subscriptions.hpp>
10
#include <realm/util/bind_ptr.hpp>
11

12
namespace realm::sync {
13
namespace {
14
using namespace realm::util;
15

16

17
// clang-format off
18
using SessionImpl                     = ClientImpl::Session;
19
using SyncTransactCallback            = Session::SyncTransactCallback;
20
using ProgressHandler                 = Session::ProgressHandler;
21
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
22
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
23
using port_type                       = Session::port_type;
24
using connection_ident_type           = std::int_fast64_t;
25
using ProxyConfig                     = SyncConfig::ProxyConfig;
26
// clang-format on
27

28
} // unnamed namespace
29

30

31
// Life cycle states of a session wrapper:
32
//
33
// The session wrapper begins life with an associated Client, but no underlying
34
// SessionImpl. On construction, it begins the actualization process by posting
35
// a job to the client's event loop. That job will set `m_sess` to a session impl
36
// and then set `m_actualized = true`. Once this happens `m_actualized` will
37
// never change again.
38
//
39
// When the external reference to the session (`sync::Session`, which in
40
// non-test code is always owned by a `SyncSession`) is destroyed, the wrapper
41
// begins finalization. If the wrapper has not yet been actualized this takes
42
// place immediately and `m_finalized = true` is set directly on the calling
43
// thread. If it has been actualized, a job is posted to the client's event loop
44
// which will tear down the session and then set `m_finalized = true`. Regardless
45
// of whether or not the session has been actualized, `m_abandoned = true` is
46
// immediately set when the external reference is released.
47
//
48
// When the associated Client is destroyed it calls force_close() on all
49
// actualized wrappers from its event loop. This causes the wrapper to tear down
50
// the session, but not not make it proceed to the finalized state. In normal
51
// usage the client will outlive all sessions, but in tests getting the teardown
52
// correct and race-free can be tricky so we permit either order.
53
//
54
// The wrapper will exist with `m_abandoned = true` and `m_finalized = false`
55
// only while waiting for finalization to happen. It will exist with
56
// `m_finalized = true` only while there are pending post handlers yet to be
57
// executed.
58
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
59
public:
60
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
61
                   Session::Config&&);
62
    ~SessionWrapper() noexcept;
63

64
    ClientReplication& get_replication() noexcept;
65
    ClientImpl& get_client() noexcept;
66

67
    bool has_flx_subscription_store() const;
68
    SubscriptionStore* get_flx_subscription_store();
69
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
70

71
    MigrationStore* get_migration_store();
72

73
    // Immediately initiate deactivation of the wrapped session. Sets m_closed
74
    // but *not* m_finalized.
75
    // Must be called from event loop thread.
76
    void force_close();
77

78
    // Can be called from any thread.
79
    void on_commit(version_type new_version) override;
80
    // Can be called from any thread.
81
    void cancel_reconnect_delay();
82

83
    // Can be called from any thread.
84
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
85
    // Can be called from any thread.
86
    bool wait_for_upload_complete_or_client_stopped();
87
    // Can be called from any thread.
88
    bool wait_for_download_complete_or_client_stopped();
89

90
    // Can be called from any thread.
91
    void refresh(std::string_view signed_access_token);
92

93
    // Can be called from any thread.
94
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
95

96
    // These are called from ClientImpl
97
    // Must be called from event loop thread.
98
    void actualize();
99
    void finalize();
100
    void finalize_before_actualization() noexcept;
101

102
    // Can be called from any thread.
103
    util::Future<std::string> send_test_command(std::string body);
104

105
    void handle_pending_client_reset_acknowledgement();
106

107
    // Can be called from any thread.
108
    std::string get_appservices_connection_id();
109

110
    // Can be called from any thread, but inherently cannot be called
111
    // concurrently with calls to any of the other non-confined functions.
112
    bool mark_abandoned();
113

114
private:
115
    ClientImpl& m_client;
116
    DBRef m_db;
117
    Replication* m_replication;
118

119
    const ProtocolEnvelope m_protocol_envelope;
120
    const std::string m_server_address;
121
    const port_type m_server_port;
122
    const bool m_server_verified;
123
    const std::string m_user_id;
124
    const SyncServerMode m_sync_mode;
125
    const std::string m_authorization_header_name;
126
    const std::map<std::string, std::string> m_custom_http_headers;
127
    const bool m_verify_servers_ssl_certificate;
128
    const bool m_simulate_integration_error;
129
    const std::optional<std::string> m_ssl_trust_certificate_path;
130
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
131
    const size_t m_flx_bootstrap_batch_size_bytes;
132
    const std::string m_http_request_path_prefix;
133
    const std::string m_virt_path;
134
    const std::optional<ProxyConfig> m_proxy_config;
135

136
    // This one is different from null when, and only when the session wrapper
137
    // is in ClientImpl::m_abandoned_session_wrappers.
138
    SessionWrapper* m_next = nullptr;
139

140
    // These may only be accessed by the event loop thread.
141
    std::string m_signed_access_token;
142
    std::optional<ClientReset> m_client_reset_config;
143

144
    struct ReportedProgress {
145
        uint64_t snapshot;
146
        uint64_t uploaded;
147
        uint64_t uploadable;
148
        uint64_t downloaded;
149
        uint64_t downloadable;
150
        int64_t query_version = 0;
151
        double download_estimate;
152

153
        // Does not check snapshot
154
        bool operator==(const ReportedProgress& p) const noexcept
155
        {
11,552✔
156
            return uploaded == p.uploaded && uploadable == p.uploadable && downloaded == p.downloaded &&
11,552✔
157
                   downloadable == p.downloadable && query_version == p.query_version &&
11,552✔
158
                   download_estimate == p.download_estimate;
11,552✔
159
        }
11,552✔
160
    };
161
    std::optional<ReportedProgress> m_reported_progress;
162
    uint64_t m_final_uploaded = 0;
163
    uint64_t m_final_downloaded = 0;
164

165
    const util::UniqueFunction<ProgressHandler> m_progress_handler;
166
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
167

168
    const util::UniqueFunction<SyncClientHookAction(SyncClientHookData const&)> m_debug_hook;
169
    bool m_in_debug_hook = false;
170

171
    const SessionReason m_session_reason;
172

173
    // If false, QUERY and MARK messages are allowed but UPLOAD messages will not
174
    // be sent to the server.
175
    const bool m_allow_upload_messages;
176

177
    const uint64_t m_schema_version;
178

179
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
180
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
181
    std::shared_ptr<MigrationStore> m_migration_store;
182

183
    // Set to true when this session wrapper is actualized (i.e. the wrapped
184
    // session is created), or when the wrapper is finalized before actualization.
185
    // It is then never modified again.
186
    //
187
    // Actualization is scheduled during the construction of SessionWrapper, and
188
    // so a session specific post handler will always find that `m_actualized`
189
    // is true as the handler will always be run after the actualization job.
190
    // This holds even if the wrapper is finalized or closed before actualization.
191
    bool m_actualized = false;
192

193
    // Set to true when session deactivation is begun, either via force_close()
194
    // or finalize().
195
    bool m_closed = false;
196

197
    // Set to true in on_suspended() and then false in on_resumed(). Used to
198
    // suppress spurious connection state and error reporting while the session
199
    // is already in an error state.
200
    bool m_suspended = false;
201

202
    // Set when the session has been abandoned. After this point none of the
203
    // public API functions should be called again.
204
    bool m_abandoned = false;
205
    // Has the SessionWrapper been finalized?
206
    bool m_finalized = false;
207

208
    // Set to true when the first DOWNLOAD message is received to indicate that
209
    // the byte-level download progress parameters can be considered reasonable
210
    // reliable. Before that, a lot of time may have passed, so our record of
211
    // the download progress is likely completely out of date.
212
    bool m_reliable_download_progress = false;
213

214
    // Set to point to an activated session object during actualization of the
215
    // session wrapper. Set to null during finalization of the session
216
    // wrapper. Both modifications are guaranteed to be performed by the event
217
    // loop thread.
218
    //
219
    // If a session specific post handler, that is submitted after the
220
    // initiation of the session wrapper, sees that `m_sess` is null, it can
221
    // conclude that the session wrapper has either been force closed or has
222
    // been both abandoned and finalized.
223
    //
224
    // Must only be accessed from the event loop thread.
225
    SessionImpl* m_sess = nullptr;
226

227
    // These must only be accessed from the event loop thread.
228
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
229
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
230
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
231

232
    version_type m_upload_completion_requested_version = -1;
233

234
    void on_download_completion();
235
    void on_suspended(const SessionErrorInfo& error_info);
236
    void on_resumed();
237
    void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
238
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
239

240
    void init_progress_handler();
241
    void check_progress();
242
    void report_progress(ReportedProgress& p, DownloadableProgress downloadable);
243
    void report_upload_completion(version_type);
244

245
    friend class SessionWrapperStack;
246
    friend class ClientImpl::Session;
247
};
248

249

250
// ################ SessionWrapperStack ################
251

252
inline bool SessionWrapperStack::empty() const noexcept
253
{
9,824✔
254
    return !m_back;
9,824✔
255
}
9,824✔
256

257

258
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
259
{
10,118✔
260
    REALM_ASSERT(!w->m_next);
10,118✔
261
    w->m_next = m_back;
10,118✔
262
    m_back = w.release();
10,118✔
263
}
10,118✔
264

265

266
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
267
{
30,316✔
268
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
30,316✔
269
    if (m_back) {
30,316✔
270
        m_back = m_back->m_next;
10,086✔
271
        w->m_next = nullptr;
10,086✔
272
    }
10,086✔
273
    return w;
30,316✔
274
}
30,316✔
275

276

277
inline void SessionWrapperStack::clear() noexcept
278
{
9,824✔
279
    while (m_back) {
9,824✔
280
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
×
281
        m_back = w->m_next;
×
282
    }
×
283
}
9,824✔
284

285

286
inline bool SessionWrapperStack::erase(SessionWrapper* w) noexcept
287
{
5,074✔
288
    SessionWrapper** p = &m_back;
5,074✔
289
    while (*p && *p != w) {
5,164✔
290
        p = &(*p)->m_next;
90✔
291
    }
90✔
292
    if (!*p) {
5,074✔
293
        return false;
5,042✔
294
    }
5,042✔
295
    *p = w->m_next;
32✔
296
    util::bind_ptr<SessionWrapper>{w, util::bind_ptr_base::adopt_tag{}};
32✔
297
    return true;
32✔
298
}
5,074✔
299

300

301
SessionWrapperStack::~SessionWrapperStack()
302
{
9,824✔
303
    clear();
9,824✔
304
}
9,824✔
305

306

307
// ################ ClientImpl ################
308

309
ClientImpl::~ClientImpl()
310
{
4,912✔
311
    // Since no other thread is allowed to be accessing this client or any of
312
    // its subobjects at this time, no mutex locking is necessary.
313

314
    shutdown_and_wait();
4,912✔
315
    // Session wrappers are removed from m_unactualized_session_wrappers as they
316
    // are abandoned.
317
    REALM_ASSERT(m_stopped);
4,912✔
318
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
4,912✔
319
    REALM_ASSERT(m_abandoned_session_wrappers.empty());
4,912✔
320
}
4,912✔
321

322

323
void ClientImpl::cancel_reconnect_delay()
324
{
830✔
325
    // Thread safety required
326
    post([this] {
830✔
327
        for (auto& p : m_server_slots) {
830✔
328
            ServerSlot& slot = p.second;
830✔
329
            if (m_one_connection_per_session) {
830✔
330
                REALM_ASSERT(!slot.connection);
×
331
                for (const auto& p : slot.alt_connections) {
×
332
                    ClientImpl::Connection& conn = *p.second;
×
333
                    conn.resume_active_sessions(); // Throws
×
334
                    conn.cancel_reconnect_delay(); // Throws
×
335
                }
×
336
            }
×
337
            else {
830✔
338
                REALM_ASSERT(slot.alt_connections.empty());
830✔
339
                if (slot.connection) {
830✔
340
                    ClientImpl::Connection& conn = *slot.connection;
828✔
341
                    conn.resume_active_sessions(); // Throws
828✔
342
                    conn.cancel_reconnect_delay(); // Throws
828✔
343
                }
828✔
344
                else {
2✔
345
                    slot.reconnect_info.reset();
2✔
346
                }
2✔
347
            }
830✔
348
        }
830✔
349
    }); // Throws
830✔
350
}
830✔
351

352

353
void ClientImpl::voluntary_disconnect_all_connections()
354
{
4✔
355
    auto done_pf = util::make_promise_future<void>();
4✔
356
    post([this, promise = std::move(done_pf.promise)]() mutable {
4✔
357
        try {
4✔
358
            for (auto& p : m_server_slots) {
4✔
359
                ServerSlot& slot = p.second;
4✔
360
                if (m_one_connection_per_session) {
4✔
361
                    REALM_ASSERT(!slot.connection);
×
362
                    for (const auto& [_, conn] : slot.alt_connections) {
×
363
                        conn->voluntary_disconnect();
×
364
                    }
×
365
                }
×
366
                else {
4✔
367
                    REALM_ASSERT(slot.alt_connections.empty());
4✔
368
                    if (slot.connection) {
4✔
369
                        slot.connection->voluntary_disconnect();
4✔
370
                    }
4✔
371
                }
4✔
372
            }
4✔
373
        }
4✔
374
        catch (...) {
4✔
375
            promise.set_error(exception_to_status());
×
376
            return;
×
377
        }
×
378
        promise.emplace_value();
4✔
379
    });
4✔
380
    done_pf.future.get();
4✔
381
}
4✔
382

383

384
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
385
{
4,600✔
386
    // Thread safety required
387

388
    {
4,600✔
389
        util::CheckedLockGuard lock{m_mutex};
4,600✔
390
        m_sessions_terminated = false;
4,600✔
391
    }
4,600✔
392

393
    // The technique employed here relies on the fact that
394
    // actualize_and_finalize_session_wrappers() must get to execute at least
395
    // once before the post handler submitted below gets to execute, but still
396
    // at a time where all session wrappers, that are abandoned prior to the
397
    // execution of wait_for_session_terminations_or_client_stopped(), have been
398
    // added to `m_abandoned_session_wrappers`.
399
    //
400
    // To see that this is the case, consider a session wrapper that was
401
    // abandoned before wait_for_session_terminations_or_client_stopped() was
402
    // invoked. Then the session wrapper will have been added to
403
    // `m_abandoned_session_wrappers`, and an invocation of
404
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
405
    // guarantees mentioned in the documentation of Trigger then ensure
406
    // that at least one execution of actualize_and_finalize_session_wrappers()
407
    // will happen after the session wrapper has been added to
408
    // `m_abandoned_session_wrappers`, but before the post handler submitted
409
    // below gets to execute.
410
    post([this] {
4,600✔
411
        {
4,600✔
412
            util::CheckedLockGuard lock{m_mutex};
4,600✔
413
            m_sessions_terminated = true;
4,600✔
414
        }
4,600✔
415
        m_wait_or_client_stopped_cond.notify_all();
4,600✔
416
    }); // Throws
4,600✔
417

418
    bool completion_condition_was_satisfied;
4,600✔
419
    {
4,600✔
420
        util::CheckedUniqueLock lock{m_mutex};
4,600✔
421
        m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_mutex) {
9,200✔
422
            return m_sessions_terminated || m_stopped;
9,200✔
423
        });
9,200✔
424
        completion_condition_was_satisfied = !m_stopped;
4,600✔
425
    }
4,600✔
426
    return completion_condition_was_satisfied;
4,600✔
427
}
4,600✔
428

429

430
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
431
util::Future<void> ClientImpl::notify_session_terminated()
432
{
28✔
433
    auto pf = util::make_promise_future<void>();
28✔
434
    post([promise = std::move(pf.promise)](Status status) mutable {
28✔
435
        // Includes operation_aborted
436
        if (!status.is_ok()) {
28✔
437
            promise.set_error(status);
×
438
            return;
×
439
        }
×
440

441
        promise.emplace_value();
28✔
442
    });
28✔
443

444
    return std::move(pf.future);
28✔
445
}
28✔
446

447
void ClientImpl::drain_connections_on_loop()
448
{
4,912✔
449
    post([this](Status status) {
4,912✔
450
        REALM_ASSERT(status.is_ok());
4,912✔
451
        drain_connections();
4,912✔
452
    });
4,912✔
453
}
4,912✔
454

455
void ClientImpl::shutdown_and_wait()
456
{
5,300✔
457
    shutdown();
5,300✔
458
    util::CheckedUniqueLock lock{m_drain_mutex};
5,300✔
459
    if (m_drained) {
5,300✔
460
        return;
388✔
461
    }
388✔
462

463
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
4,912✔
464
    m_drain_cv.wait(lock.native_handle(), [&]() REQUIRES(m_drain_mutex) {
9,816✔
465
        return m_num_connections == 0 && m_outstanding_posts == 0;
9,816✔
466
    });
9,816✔
467

468
    m_drained = true;
4,912✔
469
}
4,912✔
470

471
void ClientImpl::shutdown() noexcept
472
{
10,254✔
473
    {
10,254✔
474
        util::CheckedLockGuard lock{m_mutex};
10,254✔
475
        if (m_stopped)
10,254✔
476
            return;
5,342✔
477
        m_stopped = true;
4,912✔
478
    }
4,912✔
479
    m_wait_or_client_stopped_cond.notify_all();
×
480

481
    drain_connections_on_loop();
4,912✔
482
}
4,912✔
483

484

485
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
486
{
5,076✔
487
    // Thread safety required.
488
    {
5,076✔
489
        util::CheckedLockGuard lock{m_mutex};
5,076✔
490
        // We can't actualize the session if we've already been stopped, so
491
        // just finalize it immediately.
492
        if (m_stopped) {
5,076✔
493
            wrapper->finalize_before_actualization();
×
494
            return;
×
495
        }
×
496

497
        REALM_ASSERT(m_actualize_and_finalize);
5,076✔
498
        m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
5,076✔
499
    }
5,076✔
500
    m_actualize_and_finalize->trigger();
×
501
}
5,076✔
502

503

504
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
505
{
5,076✔
506
    // Thread safety required.
507
    {
5,076✔
508
        util::CheckedLockGuard lock{m_mutex};
5,076✔
509
        REALM_ASSERT(m_actualize_and_finalize);
5,076✔
510
        // The wrapper may have already been finalized before being abandoned
511
        // if we were stopped when it was created.
512
        if (wrapper->mark_abandoned())
5,076✔
513
            return;
2✔
514

515
        // If the session wrapper has not yet been actualized (on the event loop
516
        // thread), it can be immediately finalized. This ensures that we will
517
        // generally not actualize a session wrapper that has already been
518
        // abandoned.
519
        if (m_unactualized_session_wrappers.erase(wrapper.get())) {
5,074✔
520
            wrapper->finalize_before_actualization();
32✔
521
            return;
32✔
522
        }
32✔
523
        m_abandoned_session_wrappers.push(std::move(wrapper));
5,042✔
524
    }
5,042✔
525
    m_actualize_and_finalize->trigger();
×
526
}
5,042✔
527

528

529
// Must be called from the event loop thread.
530
void ClientImpl::actualize_and_finalize_session_wrappers()
531
{
7,596✔
532
    // We need to pop from the wrapper stacks while holding the lock to ensure
533
    // that all updates to `SessionWrapper:m_next` are thread-safe, but then
534
    // release the lock before finalizing or actualizing because those functions
535
    // invoke user callbacks which may try to access the client and reacquire
536
    // the lock.
537
    //
538
    // Finalization must always happen before actualization because we may be
539
    // finalizing and actualizing sessions for the same Realm file, and
540
    // actualizing first would result in overlapping sessions. Because we're
541
    // releasing the lock new sessions may come in as we're looping, so we need
542
    // a single loop that checks both fields.
543
    while (true) {
17,682✔
544
        bool finalize = true;
17,680✔
545
        bool stopped;
17,680✔
546
        util::bind_ptr<SessionWrapper> wrapper;
17,680✔
547
        {
17,680✔
548
            util::CheckedLockGuard lock{m_mutex};
17,680✔
549
            wrapper = m_abandoned_session_wrappers.pop();
17,680✔
550
            if (!wrapper) {
17,680✔
551
                wrapper = m_unactualized_session_wrappers.pop();
12,638✔
552
                finalize = false;
12,638✔
553
            }
12,638✔
554
            stopped = m_stopped;
17,680✔
555
        }
17,680✔
556
        if (!wrapper)
17,680✔
557
            break;
7,594✔
558
        if (finalize)
10,086✔
559
            wrapper->finalize(); // Throws
5,042✔
560
        else if (stopped)
5,044✔
561
            wrapper->finalize_before_actualization();
2✔
562
        else
5,042✔
563
            wrapper->actualize(); // Throws
5,042✔
564
    }
10,086✔
565
}
7,596✔
566

567

568
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
569
                                                   const std::string& authorization_header_name,
570
                                                   const std::map<std::string, std::string>& custom_http_headers,
571
                                                   bool verify_servers_ssl_certificate,
572
                                                   Optional<std::string> ssl_trust_certificate_path,
573
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
574
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
575
{
5,040✔
576
    auto&& [server_slot_it, inserted] =
5,040✔
577
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
5,040✔
578
    ServerSlot& server_slot = server_slot_it->second; // Throws
5,040✔
579

580
    // TODO: enable multiplexing with proxies
581
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
5,040✔
582
        // Use preexisting connection
583
        REALM_ASSERT(server_slot.alt_connections.empty());
3,686✔
584
        return *server_slot.connection;
3,686✔
585
    }
3,686✔
586

587
    // Create a new connection
588
    REALM_ASSERT(!server_slot.connection);
1,354✔
589
    connection_ident_type ident = m_prev_connection_ident + 1;
1,354✔
590
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
1,354✔
591
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
1,354✔
592
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
1,354✔
593
        std::move(proxy_config), server_slot.reconnect_info); // Throws
1,354✔
594
    ClientImpl::Connection& conn = *conn_2;
1,354✔
595
    if (!m_one_connection_per_session) {
1,354✔
596
        server_slot.connection = std::move(conn_2);
1,348✔
597
    }
1,348✔
598
    else {
6✔
599
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
6✔
600
    }
6✔
601
    m_prev_connection_ident = ident;
1,354✔
602
    was_created = true;
1,354✔
603
    {
1,354✔
604
        util::CheckedLockGuard lk(m_drain_mutex);
1,354✔
605
        ++m_num_connections;
1,354✔
606
    }
1,354✔
607
    return conn;
1,354✔
608
}
5,040✔
609

610

611
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
612
{
1,354✔
613
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
1,354✔
614
    auto i = m_server_slots.find(endpoint);
1,354✔
615
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
1,354✔
616
    ServerSlot& server_slot = i->second;
1,354✔
617
    if (!m_one_connection_per_session) {
1,354✔
618
        REALM_ASSERT(server_slot.alt_connections.empty());
1,348✔
619
        REALM_ASSERT(&*server_slot.connection == &conn);
1,348✔
620
        server_slot.reconnect_info = conn.get_reconnect_info();
1,348✔
621
        server_slot.connection.reset();
1,348✔
622
    }
1,348✔
623
    else {
6✔
624
        REALM_ASSERT(!server_slot.connection);
6✔
625
        connection_ident_type ident = conn.get_ident();
6✔
626
        auto j = server_slot.alt_connections.find(ident);
6✔
627
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
6✔
628
        REALM_ASSERT(&*j->second == &conn);
6✔
629
        server_slot.alt_connections.erase(j);
6✔
630
    }
6✔
631

632
    bool notify;
1,354✔
633
    {
1,354✔
634
        util::CheckedLockGuard lk(m_drain_mutex);
1,354✔
635
        REALM_ASSERT(m_num_connections);
1,354✔
636
        notify = --m_num_connections <= 0;
1,354✔
637
    }
1,354✔
638
    if (notify) {
1,354✔
639
        m_drain_cv.notify_all();
1,042✔
640
    }
1,042✔
641
}
1,354✔
642

643

644
// ################ SessionImpl ################
645

646
void SessionImpl::force_close()
647
{
52✔
648
    // Allow force_close() if session is active or hasn't been activated yet.
649
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
52!
650
        m_wrapper.force_close();
52✔
651
    }
52✔
652
}
52✔
653

654
void SessionImpl::on_connection_state_changed(ConnectionState state,
655
                                              const std::optional<SessionErrorInfo>& error_info)
656
{
5,528✔
657
    // Only used to report errors back to the SyncSession while the Session is active
658
    if (m_state == SessionImpl::Active) {
5,528✔
659
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
5,528✔
660
    }
5,528✔
661
}
5,528✔
662

663

664
const std::string& SessionImpl::get_virt_path() const noexcept
665
{
3,486✔
666
    // Can only be called if the session is active or being activated
667
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
3,486!
668
    return m_wrapper.m_virt_path;
3,486✔
669
}
3,486✔
670

671
const std::string& SessionImpl::get_realm_path() const noexcept
672
{
5,212✔
673
    // Can only be called if the session is active or being activated
674
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
5,212✔
675
    return m_wrapper.m_db->get_path();
5,212✔
676
}
5,212✔
677

678
DBRef SessionImpl::get_db() const noexcept
679
{
12,720✔
680
    // Can only be called if the session is active or being activated
681
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
12,720!
682
    return m_wrapper.m_db;
12,720✔
683
}
12,720✔
684

685
ClientReplication& SessionImpl::get_repl() const noexcept
686
{
59,636✔
687
    // Can only be called if the session is active or being activated
688
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
59,636✔
689
    return m_wrapper.get_replication();
59,636✔
690
}
59,636✔
691

692
ClientHistory& SessionImpl::get_history() const noexcept
693
{
58,422✔
694
    return get_repl().get_history();
58,422✔
695
}
58,422✔
696

697
std::optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
698
{
51,364✔
699
    // Can only be called if the session is active or being activated
700
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
51,364✔
701
    return m_wrapper.m_client_reset_config;
51,364✔
702
}
51,364✔
703

704
SessionReason SessionImpl::get_session_reason() noexcept
705
{
946✔
706
    // Can only be called if the session is active or being activated
707
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
946!
708
    return m_wrapper.m_session_reason;
946✔
709
}
946✔
710

711
uint64_t SessionImpl::get_schema_version() noexcept
712
{
946✔
713
    // Can only be called if the session is active or being activated
714
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
946!
715
    return m_wrapper.m_schema_version;
946✔
716
}
946✔
717

718
bool SessionImpl::upload_messages_allowed() noexcept
719
{
32,832✔
720
    // Can only be called if the session is active or being activated
721
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
32,832✔
722
    return m_wrapper.m_allow_upload_messages;
32,832✔
723
}
32,832✔
724

725
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
726
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
727
{
21,070✔
728
    // Ignore the call if the session is not active
729
    if (m_state != State::Active) {
21,070✔
730
        return;
×
731
    }
×
732

733
    try {
21,070✔
734
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
21,070!
735
        if (simulate_integration_error) {
21,070✔
736
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
737
        }
×
738
        version_type client_version;
21,070✔
739
        if (REALM_LIKELY(!get_client().is_dry_run())) {
21,070✔
740
            VersionInfo version_info;
21,070✔
741
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
21,070✔
742
            client_version = version_info.realm_version;
21,070✔
743
        }
21,070✔
744
        else {
×
745
            // Fake it for "dry run" mode
746
            client_version = m_last_version_available + 1;
×
747
        }
×
748
        on_changesets_integrated(client_version, progress); // Throws
21,070✔
749
    }
21,070✔
750
    catch (const IntegrationException& e) {
21,070✔
751
        on_integration_failure(e);
12✔
752
    }
12✔
753
}
21,070✔
754

755

756
void SessionImpl::on_download_completion()
757
{
8,332✔
758
    // Ignore the call if the session is not active
759
    if (m_state == State::Active) {
8,332✔
760
        m_wrapper.on_download_completion(); // Throws
8,332✔
761
    }
8,332✔
762
}
8,332✔
763

764

765
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
766
{
412✔
767
    // Ignore the call if the session is not active
768
    if (m_state == State::Active) {
412✔
769
        m_wrapper.on_suspended(error_info); // Throws
412✔
770
    }
412✔
771
}
412✔
772

773

774
void SessionImpl::on_resumed()
775
{
86✔
776
    // Ignore the call if the session is not active
777
    if (m_state == State::Active) {
86✔
778
        m_wrapper.on_resumed(); // Throws
86✔
779
    }
86✔
780
}
86✔
781

782
void SessionImpl::handle_pending_client_reset_acknowledgement()
783
{
5,212✔
784
    // Ignore the call if the session is not active
785
    if (m_state == State::Active) {
5,212✔
786
        m_wrapper.handle_pending_client_reset_acknowledgement();
5,212✔
787
    }
5,212✔
788
}
5,212✔
789

790
bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
791
{
23,048✔
792
    // Ignore the message if the session is not active or a steady state message
793
    if (m_state != State::Active || message.batch_state == DownloadBatchState::SteadyState) {
23,048✔
794
        return false;
21,070✔
795
    }
21,070✔
796

797
    REALM_ASSERT(m_is_flx_sync_session);
1,978✔
798

799
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
1,978✔
800
    std::optional<SyncProgress> maybe_progress;
1,978✔
801
    if (message.batch_state == DownloadBatchState::LastInBatch) {
1,978✔
802
        maybe_progress = message.progress;
1,202✔
803
    }
1,202✔
804

805
    try {
1,978✔
806
        bootstrap_store->add_batch(*message.query_version, maybe_progress, message.downloadable, message.changesets);
1,978✔
807
    }
1,978✔
808
    catch (const LogicError& ex) {
1,978✔
809
        if (ex.code() == ErrorCodes::LimitExceeded) {
×
810
            IntegrationException ex(ErrorCodes::LimitExceeded,
×
811
                                    "bootstrap changeset too large to store in pending bootstrap store",
×
812
                                    ProtocolError::bad_changeset_size);
×
813
            on_integration_failure(ex);
×
814
            return true;
×
815
        }
×
816
        throw;
×
817
    }
×
818

819
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, message.progress,
1,978✔
820
                                       *message.query_version, message.batch_state, message.changesets.size());
1,978✔
821
    if (hook_action == SyncClientHookAction::EarlyReturn) {
1,978✔
822
        return true;
6✔
823
    }
6✔
824
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
1,972✔
825

826
    if (message.batch_state == DownloadBatchState::MoreToCome) {
1,972✔
827
        return true;
774✔
828
    }
774✔
829

830
    try {
1,198✔
831
        process_pending_flx_bootstrap(); // throws
1,198✔
832
    }
1,198✔
833
    catch (const IntegrationException& e) {
1,198✔
834
        on_integration_failure(e);
6✔
835
    }
6✔
836
    catch (...) {
1,198✔
837
        on_integration_failure(IntegrationException(exception_to_status()));
×
838
    }
×
839

840
    return true;
1,198✔
841
}
1,198✔
842

843

844
void SessionImpl::process_pending_flx_bootstrap()
845
{
6,324✔
846
    // Ignore the call if not a flx session or session is not active
847
    if (!m_is_flx_sync_session || m_state != State::Active) {
6,324✔
848
        return;
4,144✔
849
    }
4,144✔
850
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,180✔
851
    if (!bootstrap_store->has_pending()) {
2,180✔
852
        return;
966✔
853
    }
966✔
854

855
    auto pending_batch_stats = bootstrap_store->pending_stats();
1,214✔
856
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
1,214✔
857
                "changeset size: %3)",
1,214✔
858
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
1,214✔
859
                pending_batch_stats.pending_changeset_bytes);
1,214✔
860
    auto& history = get_repl().get_history();
1,214✔
861
    VersionInfo new_version;
1,214✔
862
    SyncProgress progress;
1,214✔
863
    int64_t query_version = -1;
1,214✔
864
    size_t changesets_processed = 0;
1,214✔
865

866
    // Used to commit each batch after it was transformed.
867
    TransactionRef transact = get_db()->start_write();
1,214✔
868
    while (bootstrap_store->has_pending()) {
2,570✔
869
        auto start_time = std::chrono::steady_clock::now();
1,368✔
870
        auto pending_batch = bootstrap_store->peek_pending(*transact, m_wrapper.m_flx_bootstrap_batch_size_bytes);
1,368✔
871
        if (!pending_batch.progress) {
1,368✔
872
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
10✔
873
            bootstrap_store->clear(*transact, pending_batch.query_version);
10✔
874
            transact->commit();
10✔
875
            return;
10✔
876
        }
10✔
877

878
        auto batch_state =
1,358✔
879
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
1,358✔
880
        query_version = pending_batch.query_version;
1,358✔
881
        bool simulate_integration_error =
1,358✔
882
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
1,358✔
883
        if (simulate_integration_error) {
1,358✔
884
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
2✔
885
        }
2✔
886

887
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
1,356✔
888
                        batch_state, pending_batch.changesets.size());
1,356✔
889

890
        history.integrate_server_changesets(
1,356✔
891
            *pending_batch.progress, 1.0, pending_batch.changesets, new_version, batch_state, logger, transact,
1,356✔
892
            [&](const Transaction& tr, util::Span<Changeset> changesets_applied) {
1,356✔
893
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
1,350✔
894
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
1,350✔
895
            });
1,350✔
896
        progress = *pending_batch.progress;
1,356✔
897
        changesets_processed += pending_batch.changesets.size();
1,356✔
898
        auto duration = std::chrono::steady_clock::now() - start_time;
1,356✔
899

900
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
1,356✔
901
                                      batch_state, pending_batch.changesets.size());
1,356✔
902
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
1,356✔
903

904
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
1,356✔
905
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
1,356✔
906
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
1,356✔
907
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
1,356✔
908
                    pending_batch.remaining_changesets);
1,356✔
909
    }
1,356✔
910

911
    REALM_ASSERT_3(query_version, !=, -1);
1,202✔
912

913
    on_changesets_integrated(new_version.realm_version, progress);
1,202✔
914
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
1,202✔
915
                                  DownloadBatchState::LastInBatch, changesets_processed);
1,202✔
916
    // NoAction/EarlyReturn are both valid no-op actions to take here.
917
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
1,202✔
918
}
1,202✔
919

920
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
921
{
10✔
922
    // Ignore the call if the session is not active
923
    if (m_state == State::Active) {
10✔
924
        get_flx_subscription_store()->set_error(version, err_msg);
10✔
925
    }
10✔
926
}
10✔
927

928
SubscriptionStore* SessionImpl::get_flx_subscription_store()
929
{
12,906✔
930
    // Should never be called if session is not active
931
    REALM_ASSERT_EX(m_state == State::Active, m_state);
12,906✔
932
    return m_wrapper.get_flx_subscription_store();
12,906✔
933
}
12,906✔
934

935
MigrationStore* SessionImpl::get_migration_store()
936
{
35,866✔
937
    // Should never be called if session is not active
938
    REALM_ASSERT_EX(m_state == State::Active, m_state);
35,866✔
939
    return m_wrapper.get_migration_store();
35,866✔
940
}
35,866✔
941

942
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
943
{
7,800✔
944
    // Should never be called if session is not active
945
    REALM_ASSERT_EX(m_state == State::Active, m_state);
7,800✔
946

947
    // Make sure we don't call the debug hook recursively.
948
    if (m_wrapper.m_in_debug_hook) {
7,800✔
949
        return SyncClientHookAction::NoAction;
12✔
950
    }
12✔
951
    m_wrapper.m_in_debug_hook = true;
7,788✔
952
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
7,788✔
953
        m_wrapper.m_in_debug_hook = false;
7,788✔
954
    });
7,788✔
955

956
    auto action = m_wrapper.m_debug_hook(data);
7,788✔
957
    switch (action) {
7,788✔
958
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
6✔
959
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
6✔
960
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
6✔
961

962
            auto err_processing_err = receive_error_message(err_info);
6✔
963
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
6✔
964
            return SyncClientHookAction::EarlyReturn;
6✔
965
        }
×
966
        case realm::SyncClientHookAction::TriggerReconnect: {
12✔
967
            get_connection().voluntary_disconnect();
12✔
968
            return SyncClientHookAction::EarlyReturn;
12✔
969
        }
×
970
        default:
7,766✔
971
            return action;
7,766✔
972
    }
7,788✔
973
}
7,788✔
974

975
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
976
                                                  int64_t query_version, DownloadBatchState batch_state,
977
                                                  size_t num_changesets)
978
{
50,012✔
979
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
50,012✔
980
        return SyncClientHookAction::NoAction;
45,566✔
981
    }
45,566✔
982
    if (REALM_UNLIKELY(m_state != State::Active)) {
4,446✔
983
        return SyncClientHookAction::NoAction;
×
984
    }
×
985

986
    SyncClientHookData data;
4,446✔
987
    data.event = event;
4,446✔
988
    data.batch_state = batch_state;
4,446✔
989
    data.progress = progress;
4,446✔
990
    data.num_changesets = num_changesets;
4,446✔
991
    data.query_version = query_version;
4,446✔
992

993
    return call_debug_hook(data);
4,446✔
994
}
4,446✔
995

996
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo* error_info)
997
{
47,500✔
998
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
47,500✔
999
        return SyncClientHookAction::NoAction;
44,148✔
1000
    }
44,148✔
1001
    if (REALM_UNLIKELY(m_state != State::Active)) {
3,352✔
1002
        return SyncClientHookAction::NoAction;
×
1003
    }
×
1004

1005
    SyncClientHookData data;
3,352✔
1006
    data.event = event;
3,352✔
1007
    data.batch_state = DownloadBatchState::SteadyState;
3,352✔
1008
    data.progress = m_progress;
3,352✔
1009
    data.num_changesets = 0;
3,352✔
1010
    data.query_version = m_last_sent_flx_query_version;
3,352✔
1011
    data.error_info = error_info;
3,352✔
1012

1013
    return call_debug_hook(data);
3,352✔
1014
}
3,352✔
1015

1016
void SessionImpl::init_progress_handler()
1017
{
5,212✔
1018
    REALM_ASSERT_EX(m_state == State::Unactivated || m_state == State::Active, m_state);
5,212✔
1019
    m_wrapper.init_progress_handler();
5,212✔
1020
}
5,212✔
1021

1022
void SessionImpl::enable_progress_notifications()
1023
{
22,098✔
1024
    m_wrapper.m_reliable_download_progress = true;
22,098✔
1025
}
22,098✔
1026

1027
util::Future<std::string> SessionImpl::send_test_command(std::string body)
1028
{
36✔
1029
    if (m_state != State::Active) {
36✔
1030
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
×
1031
    }
×
1032

1033
    try {
36✔
1034
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
36✔
1035
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
36✔
1036
            return Status{ErrorCodes::LogicError,
2✔
1037
                          "Must supply command name in \"command\" field of test command json object"};
2✔
1038
        }
2✔
1039
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
34✔
1040
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
×
1041
        }
×
1042
    }
34✔
1043
    catch (const nlohmann::json::parse_error& e) {
36✔
1044
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
2✔
1045
    }
2✔
1046

1047
    auto pf = util::make_promise_future<std::string>();
32✔
1048
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
32✔
1049
        // Includes operation_aborted
1050
        if (!status.is_ok()) {
32✔
1051
            promise.set_error(status);
×
1052
            return;
×
1053
        }
×
1054

1055
        auto id = ++m_last_pending_test_command_ident;
32✔
1056
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
32✔
1057
        ensure_enlisted_to_send();
32✔
1058
    });
32✔
1059

1060
    return std::move(pf.future);
32✔
1061
}
36✔
1062

1063
// ################ SessionWrapper ################
1064

1065
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
1066
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
1067
// the ClientImpl::Connection that owns the ClientImpl::Session.
1068
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1069
                               std::shared_ptr<MigrationStore> migration_store, Session::Config&& config)
1070
    : m_client{client}
5,076✔
1071
    , m_db(std::move(db))
5,076✔
1072
    , m_replication(m_db->get_replication())
5,076✔
1073
    , m_protocol_envelope{config.protocol_envelope}
5,076✔
1074
    , m_server_address{std::move(config.server_address)}
5,076✔
1075
    , m_server_port{config.server_port}
5,076✔
1076
    , m_server_verified{config.server_verified}
5,076✔
1077
    , m_user_id(std::move(config.user_id))
5,076✔
1078
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
5,076✔
1079
    , m_authorization_header_name{config.authorization_header_name}
5,076✔
1080
    , m_custom_http_headers{std::move(config.custom_http_headers)}
5,076✔
1081
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
5,076✔
1082
    , m_simulate_integration_error{config.simulate_integration_error}
5,076✔
1083
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
5,076✔
1084
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
5,076✔
1085
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
5,076✔
1086
    , m_http_request_path_prefix{std::move(config.service_identifier)}
5,076✔
1087
    , m_virt_path{std::move(config.realm_identifier)}
5,076✔
1088
    , m_proxy_config{std::move(config.proxy_config)}
5,076✔
1089
    , m_signed_access_token{std::move(config.signed_user_token)}
5,076✔
1090
    , m_client_reset_config{std::move(config.client_reset_config)}
5,076✔
1091
    , m_progress_handler(std::move(config.progress_handler))
5,076✔
1092
    , m_connection_state_change_listener(std::move(config.connection_state_change_listener))
5,076✔
1093
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
5,076✔
1094
    , m_session_reason(m_client_reset_config || config.fresh_realm_download ? SessionReason::ClientReset
5,076✔
1095
                                                                            : SessionReason::Sync)
5,076✔
1096
    , m_allow_upload_messages(!config.fresh_realm_download)
5,076✔
1097
    , m_schema_version(config.schema_version)
5,076✔
1098
    , m_flx_subscription_store(std::move(flx_sub_store))
5,076✔
1099
    , m_migration_store(std::move(migration_store))
5,076✔
1100
{
5,076✔
1101
    REALM_ASSERT(m_db);
5,076✔
1102
    REALM_ASSERT(m_db->get_replication());
5,076✔
1103
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
5,076✔
1104

1105
    // SessionWrapper begins at +1 retain count because Client retains and
1106
    // releases it while performing async operations, and these need to not
1107
    // take it to 0 or it could be deleted before the caller can retain it.
1108
    bind_ptr();
5,076✔
1109
    m_client.register_unactualized_session_wrapper(this);
5,076✔
1110
}
5,076✔
1111

1112
SessionWrapper::~SessionWrapper() noexcept
1113
{
5,076✔
1114
    // We begin actualization in the constructor and do not delete the wrapper
1115
    // until both the Client is done with it and the Session has abandoned it,
1116
    // so at this point we must have actualized, finalized, and been abandoned.
1117
    REALM_ASSERT(m_actualized);
5,076✔
1118
    REALM_ASSERT(m_abandoned);
5,076✔
1119
    REALM_ASSERT(m_finalized);
5,076✔
1120
    REALM_ASSERT(m_closed);
5,076✔
1121
    REALM_ASSERT(!m_db);
5,076✔
1122
}
5,076✔
1123

1124

1125
inline ClientReplication& SessionWrapper::get_replication() noexcept
1126
{
59,636✔
1127
    REALM_ASSERT(m_db);
59,636✔
1128
    return static_cast<ClientReplication&>(*m_replication);
59,636✔
1129
}
59,636✔
1130

1131

1132
inline ClientImpl& SessionWrapper::get_client() noexcept
1133
{
×
1134
    return m_client;
×
1135
}
×
1136

1137
bool SessionWrapper::has_flx_subscription_store() const
1138
{
×
1139
    return static_cast<bool>(m_flx_subscription_store);
×
1140
}
×
1141

1142
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
1143
{
12,906✔
1144
    REALM_ASSERT(!m_finalized);
12,906✔
1145
    return m_flx_subscription_store.get();
12,906✔
1146
}
12,906✔
1147

1148
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
1149
{
4,158✔
1150
    REALM_ASSERT(!m_finalized);
4,158✔
1151
    return m_flx_pending_bootstrap_store.get();
4,158✔
1152
}
4,158✔
1153

1154
MigrationStore* SessionWrapper::get_migration_store()
1155
{
35,866✔
1156
    REALM_ASSERT(!m_finalized);
35,866✔
1157
    return m_migration_store.get();
35,866✔
1158
}
35,866✔
1159

1160
inline bool SessionWrapper::mark_abandoned()
1161
{
5,076✔
1162
    REALM_ASSERT(!m_abandoned);
5,076✔
1163
    m_abandoned = true;
5,076✔
1164
    return m_finalized;
5,076✔
1165
}
5,076✔
1166

1167

1168
void SessionWrapper::on_commit(version_type new_version)
1169
{
57,142✔
1170
    // Thread safety required
1171
    m_client.post([self = util::bind_ptr{this}, new_version] {
57,142✔
1172
        REALM_ASSERT(self->m_actualized);
57,142✔
1173
        if (REALM_UNLIKELY(!self->m_sess))
57,142✔
1174
            return; // Already finalized
222✔
1175
        SessionImpl& sess = *self->m_sess;
56,920✔
1176
        sess.recognize_sync_version(new_version); // Throws
56,920✔
1177
        self->check_progress();                   // Throws
56,920✔
1178
    });
56,920✔
1179
}
57,142✔
1180

1181

1182
void SessionWrapper::cancel_reconnect_delay()
1183
{
16✔
1184
    // Thread safety required
1185

1186
    m_client.post([self = util::bind_ptr{this}] {
16✔
1187
        REALM_ASSERT(self->m_actualized);
16✔
1188
        if (REALM_UNLIKELY(self->m_closed)) {
16✔
1189
            return;
×
1190
        }
×
1191

1192
        if (REALM_UNLIKELY(!self->m_sess))
16✔
1193
            return; // Already finalized
×
1194
        SessionImpl& sess = *self->m_sess;
16✔
1195
        sess.cancel_resumption_delay(); // Throws
16✔
1196
        ClientImpl::Connection& conn = sess.get_connection();
16✔
1197
        conn.cancel_reconnect_delay(); // Throws
16✔
1198
    });                                // Throws
16✔
1199
}
16✔
1200

1201
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
1202
                                    WaitOperCompletionHandler handler)
1203
{
14,282✔
1204
    REALM_ASSERT(upload_completion || download_completion);
14,282✔
1205

1206
    m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion,
14,282✔
1207
                   download_completion](Status status) mutable {
14,282✔
1208
        REALM_ASSERT(self->m_actualized);
14,280✔
1209
        if (!status.is_ok()) {
14,280✔
1210
            handler(status); // Throws
×
1211
            return;
×
1212
        }
×
1213
        if (REALM_UNLIKELY(!self->m_sess)) {
14,280✔
1214
            // Already finalized
1215
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
92✔
1216
            return;
92✔
1217
        }
92✔
1218
        if (upload_completion) {
14,188✔
1219
            self->m_upload_completion_requested_version = self->m_db->get_version_of_latest_snapshot();
7,744✔
1220
            if (download_completion) {
7,744✔
1221
                // Wait for upload and download completion
1222
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
182✔
1223
            }
182✔
1224
            else {
7,562✔
1225
                // Wait for upload completion only
1226
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
7,562✔
1227
            }
7,562✔
1228
        }
7,744✔
1229
        else {
6,444✔
1230
            // Wait for download completion only
1231
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
6,444✔
1232
        }
6,444✔
1233
        SessionImpl& sess = *self->m_sess;
14,188✔
1234
        if (upload_completion)
14,188✔
1235
            self->check_progress();
7,746✔
1236
        if (download_completion)
14,188✔
1237
            sess.request_download_completion_notification(); // Throws
6,626✔
1238
    });                                                      // Throws
14,188✔
1239
}
14,282✔
1240

1241

1242
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
1243
{
6,456✔
1244
    // Thread safety required
1245
    REALM_ASSERT(!m_abandoned);
6,456✔
1246

1247
    auto pf = util::make_promise_future<bool>();
6,456✔
1248
    async_wait_for(true, false, [promise = std::move(pf.promise)](Status status) mutable {
6,456✔
1249
        promise.emplace_value(status.is_ok());
6,456✔
1250
    });
6,456✔
1251
    return pf.future.get();
6,456✔
1252
}
6,456✔
1253

1254

1255
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
1256
{
5,004✔
1257
    // Thread safety required
1258
    REALM_ASSERT(!m_abandoned);
5,004✔
1259

1260
    auto pf = util::make_promise_future<bool>();
5,004✔
1261
    async_wait_for(false, true, [promise = std::move(pf.promise)](Status status) mutable {
5,004✔
1262
        promise.emplace_value(status.is_ok());
5,004✔
1263
    });
5,004✔
1264
    return pf.future.get();
5,004✔
1265
}
5,004✔
1266

1267

1268
void SessionWrapper::refresh(std::string_view signed_access_token)
1269
{
114✔
1270
    // Thread safety required
1271
    REALM_ASSERT(!m_abandoned);
114✔
1272

1273
    m_client.post([self = util::bind_ptr{this}, token = std::string(signed_access_token)] {
114✔
1274
        REALM_ASSERT(self->m_actualized);
114✔
1275
        if (REALM_UNLIKELY(!self->m_sess))
114✔
1276
            return; // Already finalized
×
1277
        self->m_signed_access_token = std::move(token);
114✔
1278
        SessionImpl& sess = *self->m_sess;
114✔
1279
        ClientImpl::Connection& conn = sess.get_connection();
114✔
1280
        // FIXME: This only makes sense when each session uses a separate connection.
1281
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
114✔
1282
        sess.cancel_resumption_delay();                                                          // Throws
114✔
1283
        conn.cancel_reconnect_delay();                                                           // Throws
114✔
1284
    });
114✔
1285
}
114✔
1286

1287

1288
void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
1289
{
5,076✔
1290
    ClientImpl& client = wrapper->m_client;
5,076✔
1291
    client.register_abandoned_session_wrapper(std::move(wrapper));
5,076✔
1292
}
5,076✔
1293

1294

1295
// Must be called from event loop thread
1296
void SessionWrapper::actualize()
1297
{
5,042✔
1298
    // actualize() can only ever be called once
1299
    REALM_ASSERT(!m_actualized);
5,042✔
1300
    REALM_ASSERT(!m_sess);
5,042✔
1301
    // The client should have removed this wrapper from those pending
1302
    // actualization if it called force_close() or finalize_before_actualize()
1303
    REALM_ASSERT(!m_finalized);
5,042✔
1304
    REALM_ASSERT(!m_closed);
5,042✔
1305

1306
    m_actualized = true;
5,042✔
1307

1308
    ScopeExitFail close_on_error([&]() noexcept {
5,042✔
1309
        m_closed = true;
2✔
1310
    });
2✔
1311

1312
    m_db->claim_sync_agent();
5,042✔
1313
    m_db->add_commit_listener(this);
5,042✔
1314
    ScopeExitFail remove_commit_listener([&]() noexcept {
5,042✔
1315
        m_db->remove_commit_listener(this);
×
1316
    });
×
1317

1318
    ServerEndpoint endpoint{m_protocol_envelope, m_server_address, m_server_port,
5,042✔
1319
                            m_user_id,           m_sync_mode,      m_server_verified};
5,042✔
1320
    bool was_created = false;
5,042✔
1321
    ClientImpl::Connection& conn = m_client.get_connection(
5,042✔
1322
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
5,042✔
1323
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
5,042✔
1324
        was_created); // Throws
5,042✔
1325
    ScopeExitFail remove_connection([&]() noexcept {
5,042✔
1326
        if (was_created)
×
1327
            m_client.remove_connection(conn);
×
1328
    });
×
1329

1330
    // FIXME: This only makes sense when each session uses a separate connection.
1331
    conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
5,042✔
1332
    std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
5,042✔
1333
    if (m_sync_mode == SyncServerMode::FLX) {
5,042✔
1334
        m_flx_pending_bootstrap_store =
904✔
1335
            std::make_unique<PendingBootstrapStore>(m_db, sess->logger, m_flx_subscription_store);
904✔
1336
    }
904✔
1337

1338
    sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
5,042✔
1339
    m_sess = sess.get();
5,042✔
1340
    ScopeExitFail clear_sess([&]() noexcept {
5,042✔
1341
        m_sess = nullptr;
×
1342
    });
×
1343
    conn.activate_session(std::move(sess)); // Throws
5,042✔
1344

1345
    if (was_created)
5,042✔
1346
        conn.activate(); // Throws
1,354✔
1347

1348
    if (m_connection_state_change_listener) {
5,042✔
1349
        ConnectionState state = conn.get_state();
5,036✔
1350
        if (state != ConnectionState::disconnected) {
5,036✔
1351
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
3,606✔
1352
            if (state == ConnectionState::connected)
3,606✔
1353
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
3,578✔
1354
        }
3,606✔
1355
    }
5,036✔
1356

1357
    if (!m_client_reset_config)
5,042✔
1358
        check_progress(); // Throws
4,828✔
1359
}
5,042✔
1360

1361
void SessionWrapper::force_close()
1362
{
5,094✔
1363
    if (m_closed) {
5,094✔
1364
        return;
54✔
1365
    }
54✔
1366
    REALM_ASSERT(m_actualized);
5,040✔
1367
    REALM_ASSERT(m_sess);
5,040✔
1368
    m_closed = true;
5,040✔
1369

1370
    ClientImpl::Connection& conn = m_sess->get_connection();
5,040✔
1371
    conn.initiate_session_deactivation(m_sess); // Throws
5,040✔
1372

1373
    // We need to keep the DB open until finalization, but we no longer want to
1374
    // know when commits are made
1375
    m_db->remove_commit_listener(this);
5,040✔
1376

1377
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1378
    m_flx_pending_bootstrap_store.reset();
5,040✔
1379
    // Clear the subscription and migration store refs since they are owned by SyncSession
1380
    m_flx_subscription_store.reset();
5,040✔
1381
    m_migration_store.reset();
5,040✔
1382
    m_sess = nullptr;
5,040✔
1383
    // Everything is being torn down, no need to report connection state anymore
1384
    m_connection_state_change_listener = {};
5,040✔
1385

1386
    // All outstanding wait operations must be canceled
1387
    while (!m_upload_completion_handlers.empty()) {
5,220✔
1388
        auto handler = std::move(m_upload_completion_handlers.back());
180✔
1389
        m_upload_completion_handlers.pop_back();
180✔
1390
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before upload was complete"}); // Throws
180✔
1391
    }
180✔
1392
    while (!m_download_completion_handlers.empty()) {
5,240✔
1393
        auto handler = std::move(m_download_completion_handlers.back());
200✔
1394
        m_download_completion_handlers.pop_back();
200✔
1395
        handler(
200✔
1396
            {ErrorCodes::OperationAborted, "Sync session is being closed before download was complete"}); // Throws
200✔
1397
    }
200✔
1398
    while (!m_sync_completion_handlers.empty()) {
5,046✔
1399
        auto handler = std::move(m_sync_completion_handlers.back());
6✔
1400
        m_sync_completion_handlers.pop_back();
6✔
1401
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before sync was complete"}); // Throws
6✔
1402
    }
6✔
1403
}
5,040✔
1404

1405
// Must be called from event loop thread
1406
//
1407
// `m_client.m_mutex` is not held while this is called, but it is guaranteed to
1408
// have been acquired at some point in between the final read or write ever made
1409
// from a different thread and when this is called.
1410
void SessionWrapper::finalize()
1411
{
5,042✔
1412
    REALM_ASSERT(m_actualized);
5,042✔
1413
    REALM_ASSERT(m_abandoned);
5,042✔
1414
    REALM_ASSERT(!m_finalized);
5,042✔
1415

1416
    force_close();
5,042✔
1417

1418
    m_finalized = true;
5,042✔
1419

1420
    // The Realm file can be closed now, as no access to the Realm file is
1421
    // supposed to happen on behalf of a session after initiation of
1422
    // deactivation.
1423
    m_db->release_sync_agent();
5,042✔
1424
    m_db = nullptr;
5,042✔
1425
}
5,042✔
1426

1427

1428
// Must be called only when an unactualized session wrapper becomes abandoned.
1429
//
1430
// Called with a lock on `m_client.m_mutex`.
1431
inline void SessionWrapper::finalize_before_actualization() noexcept
1432
{
34✔
1433
    REALM_ASSERT(!m_finalized);
34✔
1434
    REALM_ASSERT(!m_sess);
34✔
1435
    m_actualized = true;
34✔
1436
    m_finalized = true;
34✔
1437
    m_closed = true;
34✔
1438
    m_db->remove_commit_listener(this);
34✔
1439
    m_db->release_sync_agent();
34✔
1440
    m_db = nullptr;
34✔
1441
}
34✔
1442

1443
void SessionWrapper::on_download_completion()
1444
{
8,332✔
1445
    // Ensure that progress handlers get called before completion handlers. The
1446
    // download completing performed a commit and will trigger progress
1447
    // notifications asynchronously, but they would arrive after the download
1448
    // completion without this.
1449
    check_progress();
8,332✔
1450

1451
    if (m_flx_subscription_store) {
8,332✔
1452
        m_flx_subscription_store->download_complete();
1,488✔
1453
    }
1,488✔
1454

1455
    while (!m_download_completion_handlers.empty()) {
14,702✔
1456
        auto handler = std::move(m_download_completion_handlers.back());
6,370✔
1457
        m_download_completion_handlers.pop_back();
6,370✔
1458
        handler(Status::OK()); // Throws
6,370✔
1459
    }
6,370✔
1460
    while (!m_sync_completion_handlers.empty()) {
8,380✔
1461
        auto handler = std::move(m_sync_completion_handlers.back());
48✔
1462
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
48✔
1463
        m_sync_completion_handlers.pop_back();
48✔
1464
    }
48✔
1465
}
8,332✔
1466

1467

1468
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
1469
{
412✔
1470
    REALM_ASSERT(!m_finalized);
412✔
1471
    m_suspended = true;
412✔
1472
    if (m_connection_state_change_listener) {
412✔
1473
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
412✔
1474
    }
412✔
1475
}
412✔
1476

1477

1478
void SessionWrapper::on_resumed()
1479
{
86✔
1480
    REALM_ASSERT(!m_finalized);
86✔
1481
    m_suspended = false;
86✔
1482
    if (m_connection_state_change_listener) {
86✔
1483
        ClientImpl::Connection& conn = m_sess->get_connection();
86✔
1484
        if (conn.get_state() != ConnectionState::disconnected) {
86✔
1485
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
82✔
1486
            if (conn.get_state() == ConnectionState::connected)
82✔
1487
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
76✔
1488
        }
82✔
1489
    }
86✔
1490
}
86✔
1491

1492

1493
void SessionWrapper::on_connection_state_changed(ConnectionState state,
1494
                                                 const std::optional<SessionErrorInfo>& error_info)
1495
{
5,528✔
1496
    if (m_connection_state_change_listener && !m_suspended) {
5,528✔
1497
        m_connection_state_change_listener(state, error_info); // Throws
5,498✔
1498
    }
5,498✔
1499
}
5,528✔
1500

1501
void SessionWrapper::init_progress_handler()
1502
{
5,212✔
1503
    ClientHistory::get_upload_download_state(m_db.get(), m_final_downloaded, m_final_uploaded);
5,212✔
1504
}
5,212✔
1505

1506
void SessionWrapper::check_progress()
1507
{
77,824✔
1508
    REALM_ASSERT(!m_finalized);
77,824✔
1509
    REALM_ASSERT(m_sess);
77,824✔
1510

1511
    // Check if there's anything which even wants progress or completion information
1512
    bool has_progress_handler = m_progress_handler && m_reliable_download_progress;
77,824✔
1513
    bool has_completion_handler = !m_upload_completion_handlers.empty() || !m_sync_completion_handlers.empty();
77,824✔
1514
    if (!m_flx_subscription_store && !has_progress_handler && !has_completion_handler)
77,824✔
1515
        return;
39,464✔
1516

1517
    // The order in which we report each type of completion or progress is important,
1518
    // and changing it needs to be avoided as it'd be a breaking change to the APIs
1519

1520
    TransactionRef tr;
38,360✔
1521
    ReportedProgress p;
38,360✔
1522
    if (m_flx_subscription_store) {
38,360✔
1523
        m_flx_subscription_store->report_progress(tr);
18,382✔
1524
    }
18,382✔
1525

1526
    if (!has_progress_handler && !has_completion_handler)
38,360✔
1527
        return;
10,196✔
1528
    // The subscription store may have started a read transaction that we'll
1529
    // reuse, but it may not have needed to or may not exist
1530
    if (!tr)
28,164✔
1531
        tr = m_db->start_read();
24,172✔
1532

1533
    version_type uploaded_version;
28,164✔
1534
    DownloadableProgress downloadable;
28,164✔
1535
    ClientHistory::get_upload_download_state(*tr, m_db->get_alloc(), p.downloaded, downloadable, p.uploaded,
28,164✔
1536
                                             p.uploadable, p.snapshot, uploaded_version);
28,164✔
1537
    if (m_flx_subscription_store && has_progress_handler)
28,164✔
1538
        p.query_version = m_flx_subscription_store->get_downloading_query_version(*tr);
7,402✔
1539

1540
    report_progress(p, downloadable);
28,164✔
1541
    report_upload_completion(uploaded_version);
28,164✔
1542
}
28,164✔
1543

1544
void SessionWrapper::report_upload_completion(version_type uploaded_version)
1545
{
28,164✔
1546
    if (uploaded_version < m_upload_completion_requested_version)
28,164✔
1547
        return;
18,442✔
1548

1549
    std::move(m_sync_completion_handlers.begin(), m_sync_completion_handlers.end(),
9,722✔
1550
              std::back_inserter(m_download_completion_handlers));
9,722✔
1551
    m_sync_completion_handlers.clear();
9,722✔
1552

1553
    while (!m_upload_completion_handlers.empty()) {
17,154✔
1554
        auto handler = std::move(m_upload_completion_handlers.back());
7,432✔
1555
        m_upload_completion_handlers.pop_back();
7,432✔
1556
        handler(Status::OK()); // Throws
7,432✔
1557
    }
7,432✔
1558
}
9,722✔
1559

1560
void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress downloadable)
1561
{
28,164✔
1562
    if (!m_progress_handler)
28,164✔
1563
        return;
13,686✔
1564

1565
    // Ignore progress messages from before we first receive a DOWNLOAD message
1566
    if (!m_reliable_download_progress)
14,478✔
1567
        return;
1,610✔
1568

1569
    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
12,868✔
1570
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
8,568✔
1571
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);
8,568✔
1572

1573
        // The effect of this calculation is that if new bytes are added for download/upload,
1574
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
1575
        // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync
1576
        // before progress has reached 1.0.
1577
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
1578
        // Example for upload progress reported:
1579
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0
1580

1581
        double progress_estimate = 1.0;
8,568✔
1582
        if (final_transferred < transferable && transferred < transferable)
8,568✔
1583
            progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred);
4,100✔
1584
        return progress_estimate;
8,568✔
1585
    };
8,568✔
1586

1587
    bool upload_completed = p.uploaded == p.uploadable;
12,868✔
1588
    double upload_estimate = 1.0;
12,868✔
1589
    if (!upload_completed)
12,868✔
1590
        upload_estimate = calculate_progress(p.uploaded, p.uploadable, m_final_uploaded);
4,076✔
1591

1592
    bool download_completed = p.downloaded == 0;
12,868✔
1593
    p.download_estimate = 1.00;
12,868✔
1594
    if (m_flx_pending_bootstrap_store) {
12,868✔
1595
        p.download_estimate = downloadable.as_estimate();
7,402✔
1596
        if (m_flx_pending_bootstrap_store->has_pending()) {
7,402✔
1597
            p.downloaded += m_flx_pending_bootstrap_store->pending_stats().pending_changeset_bytes;
1,372✔
1598
        }
1,372✔
1599
        download_completed = p.download_estimate >= 1.0;
7,402✔
1600

1601
        // for flx with download estimate these bytes are not known
1602
        // provide some sensible value for non-streaming version of object-store callbacks
1603
        // until these field are completely removed from the api after pbs deprecation
1604
        p.downloadable = p.downloaded;
7,402✔
1605
        if (p.download_estimate > 0 && p.download_estimate < 1.0 && p.downloaded > m_final_downloaded)
7,402✔
1606
            p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / p.download_estimate);
1,390✔
1607
    }
7,402✔
1608
    else {
5,466✔
1609
        // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
1610
        // is only the remaining to download. This is confusing, so make them use
1611
        // the same units.
1612
        p.downloadable = downloadable.as_bytes() + p.downloaded;
5,466✔
1613
        if (!download_completed)
5,466✔
1614
            p.download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
4,492✔
1615
    }
5,466✔
1616

1617
    if (download_completed)
12,868✔
1618
        m_final_downloaded = p.downloaded;
6,982✔
1619
    if (upload_completed)
12,868✔
1620
        m_final_uploaded = p.uploaded;
8,792✔
1621

1622
    if (p == m_reported_progress)
12,868✔
1623
        return;
8,802✔
1624

1625
    m_reported_progress = p;
4,066✔
1626

1627
    if (m_sess->logger.would_log(Logger::Level::debug)) {
4,066✔
1628
        auto to_str = [](double d) {
7,896✔
1629
            std::ostringstream ss;
7,896✔
1630
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
1631
            ss << std::fixed << std::setprecision(4) << d;
7,896✔
1632
            return ss.str();
7,896✔
1633
        };
7,896✔
1634
        m_sess->logger.debug(
3,948✔
1635
            "Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
3,948✔
1636
            "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
3,948✔
1637
            p.downloaded, p.downloadable, to_str(p.download_estimate), p.uploaded, p.uploadable,
3,948✔
1638
            to_str(upload_estimate), p.snapshot, p.query_version);
3,948✔
1639
    }
3,948✔
1640

1641
    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, p.download_estimate,
4,066✔
1642
                       upload_estimate, p.query_version);
4,066✔
1643
}
4,066✔
1644

1645
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
1646
{
36✔
1647
    if (!m_sess) {
36✔
1648
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
×
1649
    }
×
1650

1651
    return m_sess->send_test_command(std::move(body));
36✔
1652
}
36✔
1653

1654
void SessionWrapper::handle_pending_client_reset_acknowledgement()
1655
{
5,212✔
1656
    REALM_ASSERT(!m_finalized);
5,212✔
1657

1658
    auto has_pending_reset = PendingResetStore::has_pending_reset(*m_db->start_frozen());
5,212✔
1659
    if (!has_pending_reset) {
5,212✔
1660
        return; // nothing to do
5,028✔
1661
    }
5,028✔
1662

1663
    m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);
184✔
1664

1665
    // Now that the client reset merge is complete, wait for the changes to synchronize with the server
1666
    async_wait_for(
184✔
1667
        true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
184✔
1668
            if (status == ErrorCodes::OperationAborted) {
184✔
1669
                return;
82✔
1670
            }
82✔
1671
            auto& logger = self->m_sess->logger;
102✔
1672
            if (!status.is_ok()) {
102✔
1673
                logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
×
1674
                             status);
×
1675
                return;
×
1676
            }
×
1677

1678
            logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);
102✔
1679

1680
            auto tr = self->m_db->start_write();
102✔
1681
            auto cur_pending_reset = PendingResetStore::has_pending_reset(*tr);
102✔
1682
            if (!cur_pending_reset) {
102✔
1683
                logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
2✔
1684
                return;
2✔
1685
            }
2✔
1686
            if (*cur_pending_reset == pending_reset) {
100✔
1687
                logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
100✔
1688
            }
100✔
1689
            else {
×
1690
                logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
×
1691
            }
×
1692
            PendingResetStore::clear_pending_reset(*tr);
100✔
1693
            tr->commit();
100✔
1694
        });
100✔
1695
}
184✔
1696

1697
std::string SessionWrapper::get_appservices_connection_id()
1698
{
38✔
1699
    auto pf = util::make_promise_future<std::string>();
38✔
1700

1701
    m_client.post([self = util::bind_ptr{this}, promise = std::move(pf.promise)](Status status) mutable {
38✔
1702
        if (!status.is_ok()) {
38✔
1703
            promise.set_error(status);
×
1704
            return;
×
1705
        }
×
1706

1707
        if (!self->m_sess) {
38✔
1708
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
×
1709
            return;
×
1710
        }
×
1711

1712
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
38✔
1713
    });
38✔
1714

1715
    return pf.future.get();
38✔
1716
}
38✔
1717

1718
// ################ ClientImpl::Connection ################
1719

1720
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1721
                                   const std::string& authorization_header_name,
1722
                                   const std::map<std::string, std::string>& custom_http_headers,
1723
                                   bool verify_servers_ssl_certificate,
1724
                                   Optional<std::string> ssl_trust_certificate_path,
1725
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1726
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
1727
    : logger{make_logger(ident, std::nullopt, client.logger.base_logger)} // Throws
1,354✔
1728
    , m_client{client}
1,354✔
1729
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
1,354✔
1730
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
1,354✔
1731
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
1,354✔
1732
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1,354✔
1733
    , m_reconnect_info{reconnect_info}
1,354✔
1734
    , m_ident{ident}
1,354✔
1735
    , m_server_endpoint{std::move(endpoint)}
1,354✔
1736
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
1,354✔
1737
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1,354✔
1738
{
1,354✔
1739
    m_on_idle = m_client.create_trigger([this](Status status) {
1,358✔
1740
        if (status == ErrorCodes::OperationAborted)
1,358✔
1741
            return;
×
1742
        else if (!status.is_ok())
1,358✔
1743
            throw Exception(status);
×
1744

1745
        REALM_ASSERT(m_activated);
1,358✔
1746
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
1,358✔
1747
            on_idle(); // Throws
1,354✔
1748
            // Connection object may be destroyed now.
1749
        }
1,354✔
1750
    });
1,358✔
1751
}
1,354✔
1752

1753
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
1754
{
6✔
1755
    return m_ident;
6✔
1756
}
6✔
1757

1758

1759
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
1760
{
1,354✔
1761
    return m_server_endpoint;
1,354✔
1762
}
1,354✔
1763

1764
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1765
                                                        const std::string& signed_access_token)
1766
{
5,154✔
1767
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
5,154✔
1768
    m_signed_access_token = signed_access_token;           // Throws (copy)
5,154✔
1769
}
5,154✔
1770

1771

1772
void ClientImpl::Connection::resume_active_sessions()
1773
{
828✔
1774
    auto handler = [=](ClientImpl::Session& sess) {
1,654✔
1775
        sess.cancel_resumption_delay(); // Throws
1,654✔
1776
    };
1,654✔
1777
    for_each_active_session(std::move(handler)); // Throws
828✔
1778
}
828✔
1779

1780
void ClientImpl::Connection::on_idle()
1781
{
1,354✔
1782
    logger.debug(util::LogCategory::session, "Destroying connection object");
1,354✔
1783
    ClientImpl& client = get_client();
1,354✔
1784
    client.remove_connection(*this);
1,354✔
1785
    // NOTE: This connection object is now destroyed!
1786
}
1,354✔
1787

1788

1789
std::string ClientImpl::Connection::get_http_request_path() const
1790
{
1,782✔
1791
    using namespace std::string_view_literals;
1,782✔
1792
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
1,782✔
1793

1794
    std::string path;
1,782✔
1795
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
1,782✔
1796
    path += m_http_request_path_prefix;
1,782✔
1797
    path += param;
1,782✔
1798
    path += m_signed_access_token;
1,782✔
1799

1800
    return path;
1,782✔
1801
}
1,782✔
1802

1803

1804
std::shared_ptr<util::Logger> ClientImpl::Connection::make_logger(connection_ident_type ident,
1805
                                                                  std::optional<std::string_view> coid,
1806
                                                                  std::shared_ptr<util::Logger> base_logger)
1807
{
4,666✔
1808
    std::string prefix =
4,666✔
1809
        coid ? util::format("Connection[%1:%2] ", ident, *coid) : util::format("Connection[%1] ", ident);
4,666✔
1810
    return std::make_shared<util::PrefixLogger>(util::LogCategory::session, std::move(prefix), base_logger);
4,666✔
1811
}
4,666✔
1812

1813

1814
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
1815
                                                            std::optional<SessionErrorInfo> error_info)
1816
{
5,244✔
1817
    if (m_force_closed) {
5,244✔
1818
        return;
1,172✔
1819
    }
1,172✔
1820
    auto handler = [=](ClientImpl::Session& sess) {
5,446✔
1821
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
5,446✔
1822
        sess_2.on_connection_state_changed(state, error_info); // Throws
5,446✔
1823
    };
5,446✔
1824
    for_each_active_session(std::move(handler)); // Throws
4,072✔
1825
}
4,072✔
1826

1827

1828
Client::Client(Config config)
1829
    : m_impl{new ClientImpl{std::move(config)}} // Throws
4,912✔
1830
{
4,912✔
1831
}
4,912✔
1832

1833

1834
Client::Client(Client&& client) noexcept
1835
    : m_impl{std::move(client.m_impl)}
×
1836
{
×
1837
}
×
1838

1839

1840
Client::~Client() noexcept {}
4,912✔
1841

1842

1843
void Client::shutdown() noexcept
1844
{
4,954✔
1845
    m_impl->shutdown();
4,954✔
1846
}
4,954✔
1847

1848
void Client::shutdown_and_wait()
1849
{
388✔
1850
    m_impl->shutdown_and_wait();
388✔
1851
}
388✔
1852

1853
void Client::cancel_reconnect_delay()
1854
{
830✔
1855
    m_impl->cancel_reconnect_delay();
830✔
1856
}
830✔
1857

1858
void Client::voluntary_disconnect_all_connections()
1859
{
4✔
1860
    m_impl->voluntary_disconnect_all_connections();
4✔
1861
}
4✔
1862

1863
bool Client::wait_for_session_terminations_or_client_stopped()
1864
{
4,600✔
1865
    return m_impl->wait_for_session_terminations_or_client_stopped();
4,600✔
1866
}
4,600✔
1867

1868
util::Future<void> Client::notify_session_terminated()
1869
{
28✔
1870
    return m_impl->notify_session_terminated();
28✔
1871
}
28✔
1872

1873
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
1874
                                  port_type& port, std::string& path) const
1875
{
2,024✔
1876
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
2,024✔
1877
}
2,024✔
1878

1879

1880
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1881
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
1882
{
5,076✔
1883
    m_impl = new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
5,076✔
1884
                                std::move(config)}; // Throws
5,076✔
1885
}
5,076✔
1886

1887

1888
void Session::nonsync_transact_notify(version_type new_version)
1889
{
10,082✔
1890
    m_impl->on_commit(new_version); // Throws
10,082✔
1891
}
10,082✔
1892

1893

1894
void Session::cancel_reconnect_delay()
1895
{
16✔
1896
    m_impl->cancel_reconnect_delay(); // Throws
16✔
1897
}
16✔
1898

1899

1900
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
1901
{
2,638✔
1902
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
2,638✔
1903
}
2,638✔
1904

1905

1906
bool Session::wait_for_upload_complete_or_client_stopped()
1907
{
6,456✔
1908
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
6,456✔
1909
}
6,456✔
1910

1911

1912
bool Session::wait_for_download_complete_or_client_stopped()
1913
{
5,004✔
1914
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
5,004✔
1915
}
5,004✔
1916

1917

1918
void Session::refresh(std::string_view signed_access_token)
1919
{
114✔
1920
    m_impl->refresh(signed_access_token); // Throws
114✔
1921
}
114✔
1922

1923

1924
void Session::abandon() noexcept
1925
{
5,076✔
1926
    REALM_ASSERT(m_impl);
5,076✔
1927
    // Reabsorb the ownership assigned to the applications naked pointer by
1928
    // Session constructor
1929
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
5,076✔
1930
    SessionWrapper::abandon(std::move(wrapper));
5,076✔
1931
}
5,076✔
1932

1933
util::Future<std::string> Session::send_test_command(std::string body)
1934
{
36✔
1935
    return m_impl->send_test_command(std::move(body));
36✔
1936
}
36✔
1937

1938
std::string Session::get_appservices_connection_id()
1939
{
38✔
1940
    return m_impl->get_appservices_connection_id();
38✔
1941
}
38✔
1942

1943
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
1944
{
×
1945
    switch (proxyType) {
×
1946
        case ProxyConfig::Type::HTTP:
×
1947
            return os << "HTTP";
×
1948
        case ProxyConfig::Type::HTTPS:
×
1949
            return os << "HTTPS";
×
1950
    }
×
1951
    REALM_TERMINATE("Invalid Proxy Type object.");
1952
}
×
1953

1954
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc