• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_440

02 Jul 2024 07:51PM UTC coverage: 91.007% (+0.03%) from 90.974%
thomas.goyne_440

push

Evergreen

web-flow
[RCORE-2146] CAPI Remove `is_fatal` flag flip (#7751)

102408 of 180620 branches covered (56.7%)

0 of 1 new or added line in 1 file covered. (0.0%)

619 existing lines in 26 files now uncovered.

215623 of 236930 relevant lines covered (91.01%)

5563737.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.79
/src/realm/sync/client.cpp
1
#include <realm/sync/client.hpp>
2

3
#include <realm/sync/config.hpp>
4
#include <realm/sync/noinst/client_impl_base.hpp>
5
#include <realm/sync/noinst/client_reset.hpp>
6
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
7
#include <realm/sync/noinst/pending_reset_store.hpp>
8
#include <realm/sync/protocol.hpp>
9
#include <realm/sync/subscriptions.hpp>
10
#include <realm/util/bind_ptr.hpp>
11

12
namespace realm::sync {
13
namespace {
14
using namespace realm::util;
15

16

17
// clang-format off
18
using SessionImpl                     = ClientImpl::Session;
19
using SyncTransactCallback            = Session::SyncTransactCallback;
20
using ProgressHandler                 = Session::ProgressHandler;
21
using WaitOperCompletionHandler       = Session::WaitOperCompletionHandler;
22
using ConnectionStateChangeListener   = Session::ConnectionStateChangeListener;
23
using port_type                       = Session::port_type;
24
using connection_ident_type           = std::int_fast64_t;
25
using ProxyConfig                     = SyncConfig::ProxyConfig;
26
// clang-format on
27

28
} // unnamed namespace
29

30

31
// Life cycle states of a session wrapper:
32
//
33
// The session wrapper begins life with an associated Client, but no underlying
34
// SessionImpl. On construction, it begins the actualization process by posting
35
// a job to the client's event loop. That job will set `m_sess` to a session impl
36
// and then set `m_actualized = true`. Once this happens `m_actualized` will
37
// never change again.
38
//
39
// When the external reference to the session (`sync::Session`, which in
40
// non-test code is always owned by a `SyncSession`) is destroyed, the wrapper
41
// begins finalization. If the wrapper has not yet been actualized this takes
42
// place immediately and `m_finalized = true` is set directly on the calling
43
// thread. If it has been actualized, a job is posted to the client's event loop
44
// which will tear down the session and then set `m_finalized = true`. Regardless
45
// of whether or not the session has been actualized, `m_abandoned = true` is
46
// immediately set when the external reference is released.
47
//
48
// When the associated Client is destroyed it calls force_close() on all
49
// actualized wrappers from its event loop. This causes the wrapper to tear down
50
// the session, but not not make it proceed to the finalized state. In normal
51
// usage the client will outlive all sessions, but in tests getting the teardown
52
// correct and race-free can be tricky so we permit either order.
53
//
54
// The wrapper will exist with `m_abandoned = true` and `m_finalized = false`
55
// only while waiting for finalization to happen. It will exist with
56
// `m_finalized = true` only while there are pending post handlers yet to be
57
// executed.
58
class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener {
59
public:
60
    SessionWrapper(ClientImpl&, DBRef db, std::shared_ptr<SubscriptionStore>, std::shared_ptr<MigrationStore>,
61
                   Session::Config&&);
62
    ~SessionWrapper() noexcept;
63

64
    ClientReplication& get_replication() noexcept;
65
    ClientImpl& get_client() noexcept;
66

67
    bool has_flx_subscription_store() const;
68
    SubscriptionStore* get_flx_subscription_store();
69
    PendingBootstrapStore* get_flx_pending_bootstrap_store();
70

71
    MigrationStore* get_migration_store();
72

73
    // Immediately initiate deactivation of the wrapped session. Sets m_closed
74
    // but *not* m_finalized.
75
    // Must be called from event loop thread.
76
    void force_close();
77

78
    // Can be called from any thread.
79
    void on_commit(version_type new_version) override;
80
    // Can be called from any thread.
81
    void cancel_reconnect_delay();
82

83
    // Can be called from any thread.
84
    void async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler);
85
    // Can be called from any thread.
86
    bool wait_for_upload_complete_or_client_stopped();
87
    // Can be called from any thread.
88
    bool wait_for_download_complete_or_client_stopped();
89

90
    // Can be called from any thread.
91
    void refresh(std::string_view signed_access_token);
92

93
    // Can be called from any thread.
94
    static void abandon(util::bind_ptr<SessionWrapper>) noexcept;
95

96
    // These are called from ClientImpl
97
    // Must be called from event loop thread.
98
    void actualize();
99
    void finalize();
100
    void finalize_before_actualization() noexcept;
101

102
    // Can be called from any thread.
103
    util::Future<std::string> send_test_command(std::string body);
104

105
    void handle_pending_client_reset_acknowledgement();
106

107
    void update_subscription_version_info();
108

109
    // Can be called from any thread.
110
    std::string get_appservices_connection_id();
111

112
    // Can be called from any thread, but inherently cannot be called
113
    // concurrently with calls to any of the other non-confined functions.
114
    bool mark_abandoned();
115

116
private:
117
    ClientImpl& m_client;
118
    DBRef m_db;
119
    Replication* m_replication;
120

121
    const ProtocolEnvelope m_protocol_envelope;
122
    const std::string m_server_address;
123
    const port_type m_server_port;
124
    const bool m_server_verified;
125
    const std::string m_user_id;
126
    const SyncServerMode m_sync_mode;
127
    const std::string m_authorization_header_name;
128
    const std::map<std::string, std::string> m_custom_http_headers;
129
    const bool m_verify_servers_ssl_certificate;
130
    const bool m_simulate_integration_error;
131
    const std::optional<std::string> m_ssl_trust_certificate_path;
132
    const std::function<SyncConfig::SSLVerifyCallback> m_ssl_verify_callback;
133
    const size_t m_flx_bootstrap_batch_size_bytes;
134
    const std::string m_http_request_path_prefix;
135
    const std::string m_virt_path;
136
    const std::optional<ProxyConfig> m_proxy_config;
137

138
    // This one is different from null when, and only when the session wrapper
139
    // is in ClientImpl::m_abandoned_session_wrappers.
140
    SessionWrapper* m_next = nullptr;
141

142
    // These may only be accessed by the event loop thread.
143
    std::string m_signed_access_token;
144
    std::optional<ClientReset> m_client_reset_config;
145

146
    struct ReportedProgress {
147
        uint64_t snapshot;
148
        uint64_t uploaded;
149
        uint64_t uploadable;
150
        uint64_t downloaded;
151
        uint64_t downloadable;
152

153
        // Does not check snapshot
20,318✔
154
        bool operator==(const ReportedProgress& p) const noexcept
20,318✔
155
        {
20,318✔
156
            return uploaded == p.uploaded && uploadable == p.uploadable && downloaded == p.downloaded &&
20,318✔
157
                   downloadable == p.downloadable;
158
        }
159
    };
160
    std::optional<ReportedProgress> m_reported_progress;
161
    uint64_t m_final_uploaded = 0;
162
    uint64_t m_final_downloaded = 0;
163

164
    const util::UniqueFunction<ProgressHandler> m_progress_handler;
165
    util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;
166

167
    const util::UniqueFunction<SyncClientHookAction(SyncClientHookData const&)> m_debug_hook;
168
    bool m_in_debug_hook = false;
169

170
    const SessionReason m_session_reason;
171

172
    const uint64_t m_schema_version;
173

174
    std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
175
    int64_t m_flx_active_version = 0;
176
    int64_t m_flx_last_seen_version = 0;
177
    int64_t m_flx_pending_mark_version = 0;
178
    std::unique_ptr<PendingBootstrapStore> m_flx_pending_bootstrap_store;
179

180
    std::shared_ptr<MigrationStore> m_migration_store;
181

182
    // Set to true when this session wrapper is actualized (i.e. the wrapped
183
    // session is created), or when the wrapper is finalized before actualization.
184
    // It is then never modified again.
185
    //
186
    // Actualization is scheduled during the construction of SessionWrapper, and
187
    // so a session specific post handler will always find that `m_actualized`
188
    // is true as the handler will always be run after the actualization job.
189
    // This holds even if the wrapper is finalized or closed before actualization.
190
    bool m_actualized = false;
191

192
    // Set to true when session deactivation is begun, either via force_close()
193
    // or finalize().
194
    bool m_closed = false;
195

196
    // Set to true in on_suspended() and then false in on_resumed(). Used to
197
    // suppress spurious connection state and error reporting while the session
198
    // is already in an error state.
199
    bool m_suspended = false;
200

201
    // Set when the session has been abandoned. After this point none of the
202
    // public API functions should be called again.
203
    bool m_abandoned = false;
204
    // Has the SessionWrapper been finalized?
205
    bool m_finalized = false;
206

207
    // Set to true when the first DOWNLOAD message is received to indicate that
208
    // the byte-level download progress parameters can be considered reasonable
209
    // reliable. Before that, a lot of time may have passed, so our record of
210
    // the download progress is likely completely out of date.
211
    bool m_reliable_download_progress = false;
212

213
    // Set to point to an activated session object during actualization of the
214
    // session wrapper. Set to null during finalization of the session
215
    // wrapper. Both modifications are guaranteed to be performed by the event
216
    // loop thread.
217
    //
218
    // If a session specific post handler, that is submitted after the
219
    // initiation of the session wrapper, sees that `m_sess` is null, it can
220
    // conclude that the session wrapper has either been force closed or has
221
    // been both abandoned and finalized.
222
    //
223
    // Must only be accessed from the event loop thread.
224
    SessionImpl* m_sess = nullptr;
225

226
    // These must only be accessed from the event loop thread.
227
    std::vector<WaitOperCompletionHandler> m_upload_completion_handlers;
228
    std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
229
    std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
230

231
    version_type m_upload_completion_requested_version = -1;
232

233
    void on_download_completion();
234
    void on_suspended(const SessionErrorInfo& error_info);
235
    void on_resumed();
236
    void on_connection_state_changed(ConnectionState, const std::optional<SessionErrorInfo>&);
237
    void on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state);
238
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
239
    void on_flx_sync_version_complete(int64_t version);
240

241
    void init_progress_handler();
242
    void check_progress();
243
    void report_progress(ReportedProgress& p, DownloadableProgress downloadable);
244
    void report_upload_completion(version_type);
245

246
    friend class SessionWrapperStack;
19,884✔
247
    friend class ClientImpl::Session;
19,884✔
248
};
19,884✔
249

250

251
// ################ SessionWrapperStack ################
252

20,248✔
253
inline bool SessionWrapperStack::empty() const noexcept
20,248✔
254
{
20,248✔
255
    return !m_back;
20,248✔
256
}
20,248✔
257

258

259
inline void SessionWrapperStack::push(util::bind_ptr<SessionWrapper> w) noexcept
260
{
59,734✔
261
    REALM_ASSERT(!w->m_next);
59,734✔
262
    w->m_next = m_back;
59,734✔
263
    m_back = w.release();
20,186✔
264
}
20,186✔
265

20,186✔
266

59,734✔
267
inline util::bind_ptr<SessionWrapper> SessionWrapperStack::pop() noexcept
59,734✔
268
{
269
    util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
270
    if (m_back) {
271
        m_back = m_back->m_next;
19,884✔
272
        w->m_next = nullptr;
19,884✔
UNCOV
273
    }
×
UNCOV
274
    return w;
×
UNCOV
275
}
×
276

19,884✔
277

278
inline void SessionWrapperStack::clear() noexcept
279
{
280
    while (m_back) {
10,152✔
281
        util::bind_ptr<SessionWrapper> w{m_back, util::bind_ptr_base::adopt_tag{}};
10,152✔
282
        m_back = w->m_next;
10,312✔
283
    }
160✔
284
}
160✔
285

10,152✔
286

10,092✔
287
inline bool SessionWrapperStack::erase(SessionWrapper* w) noexcept
10,092✔
288
{
60✔
289
    SessionWrapper** p = &m_back;
60✔
290
    while (*p && *p != w) {
60✔
291
        p = &(*p)->m_next;
10,152✔
292
    }
293
    if (!*p) {
294
        return false;
295
    }
19,884✔
296
    *p = w->m_next;
19,884✔
297
    util::bind_ptr<SessionWrapper>{w, util::bind_ptr_base::adopt_tag{}};
19,884✔
298
    return true;
299
}
300

301

302
SessionWrapperStack::~SessionWrapperStack()
303
{
9,942✔
304
    clear();
305
}
306

307

9,942✔
308
// ################ ClientImpl ################
309

310
ClientImpl::~ClientImpl()
9,942✔
311
{
9,942✔
312
    // Since no other thread is allowed to be accessing this client or any of
9,942✔
313
    // its subobjects at this time, no mutex locking is necessary.
9,942✔
314

315
    shutdown_and_wait();
316
    // Session wrappers are removed from m_unactualized_session_wrappers as they
317
    // are abandoned.
1,860✔
318
    REALM_ASSERT(m_stopped);
319
    REALM_ASSERT(m_unactualized_session_wrappers.empty());
1,860✔
320
    REALM_ASSERT(m_abandoned_session_wrappers.empty());
1,860✔
321
}
1,860✔
322

1,860✔
UNCOV
323

×
UNCOV
324
void ClientImpl::cancel_reconnect_delay()
×
UNCOV
325
{
×
UNCOV
326
    // Thread safety required
×
UNCOV
327
    post([this] {
×
UNCOV
328
        for (auto& p : m_server_slots) {
×
UNCOV
329
            ServerSlot& slot = p.second;
×
330
            if (m_one_connection_per_session) {
1,860✔
331
                REALM_ASSERT(!slot.connection);
1,860✔
332
                for (const auto& p : slot.alt_connections) {
1,860✔
333
                    ClientImpl::Connection& conn = *p.second;
1,856✔
334
                    conn.resume_active_sessions(); // Throws
1,856✔
335
                    conn.cancel_reconnect_delay(); // Throws
1,856✔
336
                }
1,856✔
337
            }
4✔
338
            else {
4✔
339
                REALM_ASSERT(slot.alt_connections.empty());
4✔
340
                if (slot.connection) {
1,860✔
341
                    ClientImpl::Connection& conn = *slot.connection;
1,860✔
342
                    conn.resume_active_sessions(); // Throws
1,860✔
343
                    conn.cancel_reconnect_delay(); // Throws
1,860✔
344
                }
345
                else {
346
                    slot.reconnect_info.reset();
347
                }
12✔
348
            }
12✔
349
        }
12✔
350
    }); // Throws
12✔
351
}
12✔
352

12✔
353

12✔
UNCOV
354
void ClientImpl::voluntary_disconnect_all_connections()
×
UNCOV
355
{
×
UNCOV
356
    auto done_pf = util::make_promise_future<void>();
×
UNCOV
357
    post([this, promise = std::move(done_pf.promise)]() mutable {
×
UNCOV
358
        try {
×
359
            for (auto& p : m_server_slots) {
12✔
360
                ServerSlot& slot = p.second;
12✔
361
                if (m_one_connection_per_session) {
12✔
362
                    REALM_ASSERT(!slot.connection);
12✔
363
                    for (const auto& [_, conn] : slot.alt_connections) {
12✔
364
                        conn->voluntary_disconnect();
12✔
365
                    }
12✔
366
                }
12✔
367
                else {
12✔
UNCOV
368
                    REALM_ASSERT(slot.alt_connections.empty());
×
UNCOV
369
                    if (slot.connection) {
×
UNCOV
370
                        slot.connection->voluntary_disconnect();
×
371
                    }
12✔
372
                }
12✔
373
            }
12✔
374
        }
12✔
375
        catch (...) {
376
            promise.set_error(exception_to_status());
377
            return;
378
        }
9,576✔
379
        promise.emplace_value();
380
    });
381
    done_pf.future.get();
9,576✔
382
}
9,576✔
383

9,576✔
384

9,576✔
385
bool ClientImpl::wait_for_session_terminations_or_client_stopped()
386
{
387
    // Thread safety required
388

389
    {
390
        util::CheckedLockGuard lock{m_mutex};
391
        m_sessions_terminated = false;
392
    }
393

394
    // The technique employed here relies on the fact that
395
    // actualize_and_finalize_session_wrappers() must get to execute at least
396
    // once before the post handler submitted below gets to execute, but still
397
    // at a time where all session wrappers, that are abandoned prior to the
398
    // execution of wait_for_session_terminations_or_client_stopped(), have been
399
    // added to `m_abandoned_session_wrappers`.
400
    //
401
    // To see that this is the case, consider a session wrapper that was
402
    // abandoned before wait_for_session_terminations_or_client_stopped() was
403
    // invoked. Then the session wrapper will have been added to
9,576✔
404
    // `m_abandoned_session_wrappers`, and an invocation of
9,576✔
405
    // actualize_and_finalize_session_wrappers() will have been scheduled. The
9,576✔
406
    // guarantees mentioned in the documentation of Trigger then ensure
9,576✔
407
    // that at least one execution of actualize_and_finalize_session_wrappers()
9,576✔
408
    // will happen after the session wrapper has been added to
9,576✔
409
    // `m_abandoned_session_wrappers`, but before the post handler submitted
9,576✔
410
    // below gets to execute.
411
    post([this] {
9,576✔
412
        {
9,576✔
413
            util::CheckedLockGuard lock{m_mutex};
9,576✔
414
            m_sessions_terminated = true;
19,148✔
415
        }
19,148✔
416
        m_wait_or_client_stopped_cond.notify_all();
19,148✔
417
    }); // Throws
9,576✔
418

9,576✔
419
    bool completion_condition_was_satisfied;
9,576✔
420
    {
9,576✔
421
        util::CheckedUniqueLock lock{m_mutex};
422
        m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_mutex) {
423
            return m_sessions_terminated || m_stopped;
424
        });
425
        completion_condition_was_satisfied = !m_stopped;
56✔
426
    }
56✔
427
    return completion_condition_was_satisfied;
56✔
428
}
429

56✔
UNCOV
430

×
UNCOV
431
// This relies on the same assumptions and guarantees as wait_for_session_terminations_or_client_stopped().
×
UNCOV
432
util::Future<void> ClientImpl::notify_session_terminated()
×
433
{
434
    auto pf = util::make_promise_future<void>();
56✔
435
    post([promise = std::move(pf.promise)](Status status) mutable {
56✔
436
        // Includes operation_aborted
437
        if (!status.is_ok()) {
56✔
438
            promise.set_error(status);
56✔
439
            return;
440
        }
441

9,942✔
442
        promise.emplace_value();
9,942✔
443
    });
9,942✔
444

9,942✔
445
    return std::move(pf.future);
9,942✔
446
}
9,942✔
447

448
void ClientImpl::drain_connections_on_loop()
449
{
10,714✔
450
    post([this](Status status) {
10,714✔
451
        REALM_ASSERT(status.is_ok());
10,714✔
452
        drain_connections();
10,714✔
453
    });
772✔
454
}
772✔
455

456
void ClientImpl::shutdown_and_wait()
9,942✔
457
{
15,848✔
458
    shutdown();
15,848✔
459
    util::CheckedUniqueLock lock{m_drain_mutex};
15,848✔
460
    if (m_drained) {
461
        return;
9,942✔
462
    }
9,942✔
463

464
    logger.debug("Waiting for %1 connections to drain", m_num_connections);
465
    m_drain_cv.wait(lock.native_handle(), [&]() REQUIRES(m_drain_mutex) {
20,738✔
466
        return m_num_connections == 0 && m_outstanding_posts == 0;
20,738✔
467
    });
20,738✔
468

20,738✔
469
    m_drained = true;
10,796✔
470
}
9,942✔
471

9,942✔
UNCOV
472
void ClientImpl::shutdown() noexcept
×
473
{
474
    {
9,942✔
475
        util::CheckedLockGuard lock{m_mutex};
9,942✔
476
        if (m_stopped)
477
            return;
478
        m_stopped = true;
479
    }
10,152✔
480
    m_wait_or_client_stopped_cond.notify_all();
481

10,152✔
482
    drain_connections_on_loop();
10,152✔
483
}
484

485

10,152✔
UNCOV
486
void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
×
UNCOV
487
{
×
UNCOV
488
    // Thread safety required.
×
489
    {
490
        util::CheckedLockGuard lock{m_mutex};
10,152✔
491
        // We can't actualize the session if we've already been stopped, so
10,152✔
492
        // just finalize it immediately.
10,152✔
UNCOV
493
        if (m_stopped) {
×
494
            wrapper->finalize_before_actualization();
10,152✔
495
            return;
496
        }
497

498
        REALM_ASSERT(m_actualize_and_finalize);
10,154✔
499
        m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
500
    }
10,154✔
501
    m_actualize_and_finalize->trigger();
10,154✔
502
}
10,154✔
503

504

505
void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper> wrapper) noexcept
10,154✔
506
{
4✔
507
    // Thread safety required.
508
    {
509
        util::CheckedLockGuard lock{m_mutex};
510
        REALM_ASSERT(m_actualize_and_finalize);
511
        // The wrapper may have already been finalized before being abandoned
512
        // if we were stopped when it was created.
10,150✔
513
        if (wrapper->mark_abandoned())
60✔
514
            return;
60✔
515

60✔
516
        // If the session wrapper has not yet been actualized (on the event loop
10,090✔
517
        // thread), it can be immediately finalized. This ensures that we will
10,090✔
UNCOV
518
        // generally not actualize a session wrapper that has already been
×
519
        // abandoned.
10,090✔
520
        if (m_unactualized_session_wrappers.erase(wrapper.get())) {
521
            wrapper->finalize_before_actualization();
522
            return;
523
        }
524
        m_abandoned_session_wrappers.push(std::move(wrapper));
14,734✔
525
    }
526
    m_actualize_and_finalize->trigger();
527
}
528

529

530
// Must be called from the event loop thread.
531
void ClientImpl::actualize_and_finalize_session_wrappers()
532
{
533
    // We need to pop from the wrapper stacks while holding the lock to ensure
534
    // that all updates to `SessionWrapper:m_next` are thread-safe, but then
535
    // release the lock before finalizing or actualizing because those functions
536
    // invoke user callbacks which may try to access the client and reacquire
34,918✔
537
    // the lock.
34,912✔
538
    //
34,912✔
539
    // Finalization must always happen before actualization because we may be
34,912✔
540
    // finalizing and actualizing sessions for the same Realm file, and
34,912✔
541
    // actualizing first would result in overlapping sessions. Because we're
34,912✔
542
    // releasing the lock new sessions may come in as we're looping, so we need
34,912✔
543
    // a single loop that checks both fields.
34,912✔
544
    while (true) {
24,822✔
545
        bool finalize = true;
24,822✔
546
        bool stopped;
24,822✔
547
        util::bind_ptr<SessionWrapper> wrapper;
34,912✔
548
        {
34,912✔
549
            util::CheckedLockGuard lock{m_mutex};
34,912✔
550
            wrapper = m_abandoned_session_wrappers.pop();
14,728✔
551
            if (!wrapper) {
20,184✔
552
                wrapper = m_unactualized_session_wrappers.pop();
10,092✔
553
                finalize = false;
10,092✔
554
            }
4✔
555
            stopped = m_stopped;
10,088✔
556
        }
10,088✔
557
        if (!wrapper)
20,184✔
558
            break;
14,734✔
559
        if (finalize)
560
            wrapper->finalize(); // Throws
561
        else if (stopped)
562
            wrapper->finalize_before_actualization();
563
        else
564
            wrapper->actualize(); // Throws
565
    }
566
}
567

568

10,084✔
569
ClientImpl::Connection& ClientImpl::get_connection(ServerEndpoint endpoint,
10,084✔
570
                                                   const std::string& authorization_header_name,
10,084✔
571
                                                   const std::map<std::string, std::string>& custom_http_headers,
10,084✔
572
                                                   bool verify_servers_ssl_certificate,
573
                                                   Optional<std::string> ssl_trust_certificate_path,
574
                                                   std::function<SyncConfig::SSLVerifyCallback> ssl_verify_callback,
10,084✔
575
                                                   Optional<ProxyConfig> proxy_config, bool& was_created)
576
{
7,282✔
577
    auto&& [server_slot_it, inserted] =
7,282✔
578
        m_server_slots.try_emplace(endpoint, ReconnectInfo(m_reconnect_mode, m_reconnect_backoff_info, get_random()));
7,282✔
579
    ServerSlot& server_slot = server_slot_it->second; // Throws
580

581
    // TODO: enable multiplexing with proxies
2,802✔
582
    if (server_slot.connection && !m_one_connection_per_session && !proxy_config) {
2,802✔
583
        // Use preexisting connection
2,802✔
584
        REALM_ASSERT(server_slot.alt_connections.empty());
2,802✔
585
        return *server_slot.connection;
2,802✔
586
    }
2,802✔
587

2,802✔
588
    // Create a new connection
2,802✔
589
    REALM_ASSERT(!server_slot.connection);
2,790✔
590
    connection_ident_type ident = m_prev_connection_ident + 1;
2,790✔
591
    std::unique_ptr<ClientImpl::Connection> conn_2 = std::make_unique<ClientImpl::Connection>(
12✔
592
        *this, ident, std::move(endpoint), authorization_header_name, custom_http_headers,
12✔
593
        verify_servers_ssl_certificate, std::move(ssl_trust_certificate_path), std::move(ssl_verify_callback),
12✔
594
        std::move(proxy_config), server_slot.reconnect_info); // Throws
2,802✔
595
    ClientImpl::Connection& conn = *conn_2;
2,802✔
596
    if (!m_one_connection_per_session) {
2,802✔
597
        server_slot.connection = std::move(conn_2);
2,802✔
598
    }
2,802✔
599
    else {
2,802✔
600
        server_slot.alt_connections[ident] = std::move(conn_2); // Throws
2,802✔
601
    }
10,084✔
602
    m_prev_connection_ident = ident;
603
    was_created = true;
604
    {
605
        util::CheckedLockGuard lk(m_drain_mutex);
2,806✔
606
        ++m_num_connections;
2,806✔
607
    }
2,806✔
608
    return conn;
2,806✔
609
}
2,806✔
610

2,806✔
611

2,792✔
612
void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept
2,792✔
613
{
2,792✔
614
    const ServerEndpoint& endpoint = conn.get_server_endpoint();
2,792✔
615
    auto i = m_server_slots.find(endpoint);
2,792✔
616
    REALM_ASSERT(i != m_server_slots.end()); // Must be found
14✔
617
    ServerSlot& server_slot = i->second;
14✔
618
    if (!m_one_connection_per_session) {
14✔
619
        REALM_ASSERT(server_slot.alt_connections.empty());
14✔
620
        REALM_ASSERT(&*server_slot.connection == &conn);
14✔
621
        server_slot.reconnect_info = conn.get_reconnect_info();
14✔
622
        server_slot.connection.reset();
14✔
623
    }
14✔
624
    else {
625
        REALM_ASSERT(!server_slot.connection);
2,806✔
626
        connection_ident_type ident = conn.get_ident();
2,806✔
627
        auto j = server_slot.alt_connections.find(ident);
2,806✔
628
        REALM_ASSERT(j != server_slot.alt_connections.end()); // Must be found
2,806✔
629
        REALM_ASSERT(&*j->second == &conn);
2,806✔
630
        server_slot.alt_connections.erase(j);
2,806✔
631
    }
2,806✔
632

2,192✔
633
    bool notify;
2,192✔
634
    {
2,806✔
635
        util::CheckedLockGuard lk(m_drain_mutex);
636
        REALM_ASSERT(m_num_connections);
637
        notify = --m_num_connections <= 0;
638
    }
639
    if (notify) {
640
        m_drain_cv.notify_all();
102✔
641
    }
642
}
102!
643

102✔
644

102✔
645
// ################ SessionImpl ################
102✔
646

647
void SessionImpl::force_close()
648
{
649
    // Allow force_close() if session is active or hasn't been activated yet.
11,894✔
650
    if (m_state == SessionImpl::Active || m_state == SessionImpl::Unactivated) {
651
        m_wrapper.force_close();
11,894✔
652
    }
11,894✔
653
}
11,894✔
654

11,894✔
655
void SessionImpl::on_connection_state_changed(ConnectionState state,
656
                                              const std::optional<SessionErrorInfo>& error_info)
657
{
658
    // Only used to report errors back to the SyncSession while the Session is active
7,292✔
659
    if (m_state == SessionImpl::Active) {
660
        m_wrapper.on_connection_state_changed(state, error_info); // Throws
7,292!
661
    }
7,292✔
662
}
7,292✔
663

664

665
const std::string& SessionImpl::get_virt_path() const noexcept
10,384✔
666
{
667
    // Can only be called if the session is active or being activated
10,384✔
668
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
10,384✔
669
    return m_wrapper.m_virt_path;
10,384✔
670
}
671

672
const std::string& SessionImpl::get_realm_path() const noexcept
26,248✔
673
{
674
    // Can only be called if the session is active or being activated
26,248✔
675
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
26,248✔
676
    return m_wrapper.m_db->get_path();
26,248✔
677
}
678

679
DBRef SessionImpl::get_db() const noexcept
119,850✔
680
{
681
    // Can only be called if the session is active or being activated
119,850✔
682
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
119,850✔
683
    return m_wrapper.m_db;
119,850✔
684
}
685

686
ClientReplication& SessionImpl::get_repl() const noexcept
117,834✔
687
{
117,834✔
688
    // Can only be called if the session is active or being activated
117,834✔
689
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
690
    return m_wrapper.get_replication();
691
}
17,544✔
692

693
ClientHistory& SessionImpl::get_history() const noexcept
17,544✔
694
{
17,544✔
695
    return get_repl().get_history();
17,544✔
696
}
697

698
std::optional<ClientReset>& SessionImpl::get_client_reset_config() noexcept
1,540✔
699
{
700
    // Can only be called if the session is active or being activated
1,540!
701
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,540✔
702
    return m_wrapper.m_client_reset_config;
1,540✔
703
}
704

705
SessionReason SessionImpl::get_session_reason() noexcept
1,540✔
706
{
707
    // Can only be called if the session is active or being activated
1,540✔
708
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
1,540✔
709
    return m_wrapper.m_session_reason;
1,540✔
710
}
711

712
uint64_t SessionImpl::get_schema_version() noexcept
713
{
45,618✔
714
    // Can only be called if the session is active or being activated
715
    REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
45,618✔
UNCOV
716
    return m_wrapper.m_schema_version;
×
UNCOV
717
}
×
718

719
void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
45,618✔
720
                                                const SyncProgress& progress, const ReceivedChangesets& changesets)
45,618!
721
{
45,618✔
UNCOV
722
    // Ignore the call if the session is not active
×
UNCOV
723
    if (m_state != State::Active) {
×
724
        return;
45,618✔
725
    }
45,618✔
726

45,618✔
727
    try {
45,618✔
728
        bool simulate_integration_error = (m_wrapper.m_simulate_integration_error && !changesets.empty());
45,618✔
729
        if (simulate_integration_error) {
45,618✔
730
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
×
731
        }
UNCOV
732
        version_type client_version;
×
UNCOV
733
        if (REALM_LIKELY(!get_client().is_dry_run())) {
×
734
            VersionInfo version_info;
45,618✔
735
            integrate_changesets(progress, downloadable_bytes, changesets, version_info, batch_state); // Throws
45,618✔
736
            client_version = version_info.realm_version;
45,618✔
737
        }
24✔
738
        else {
24✔
739
            // Fake it for "dry run" mode
45,618✔
740
            client_version = m_last_version_available + 1;
741
        }
742
        on_changesets_integrated(client_version, progress); // Throws
743
    }
16,246✔
744
    catch (const IntegrationException& e) {
745
        on_integration_failure(e);
16,246✔
746
    }
16,246✔
747
}
16,246✔
748

16,246✔
749

750
void SessionImpl::on_download_completion()
751
{
752
    // Ignore the call if the session is not active
708✔
753
    if (m_state == State::Active) {
754
        m_wrapper.on_download_completion(); // Throws
708✔
755
    }
708✔
756
}
708✔
757

708✔
758

759
void SessionImpl::on_suspended(const SessionErrorInfo& error_info)
760
{
761
    // Ignore the call if the session is not active
94✔
762
    if (m_state == State::Active) {
763
        m_wrapper.on_suspended(error_info); // Throws
94✔
764
    }
94✔
765
}
94✔
766

94✔
767

768
void SessionImpl::on_resumed()
769
{
10,384✔
770
    // Ignore the call if the session is not active
771
    if (m_state == State::Active) {
10,384✔
772
        m_wrapper.on_resumed(); // Throws
10,384✔
773
    }
10,384✔
774
}
10,384✔
775

776
void SessionImpl::handle_pending_client_reset_acknowledgement()
777
{
778
    // Ignore the call if the session is not active
47,874✔
779
    if (m_state == State::Active) {
780
        m_wrapper.handle_pending_client_reset_acknowledgement();
47,874✔
781
    }
45,618✔
782
}
45,618✔
783

784
void SessionImpl::update_subscription_version_info()
2,256✔
785
{
2,256✔
786
    // Ignore the call if the session is not active
2,256✔
787
    if (m_state == State::Active) {
2,004✔
788
        m_wrapper.update_subscription_version_info();
2,004✔
789
    }
790
}
2,256✔
791

2,256✔
792
bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
2,256✔
793
                                                int64_t query_version, const ReceivedChangesets& received_changesets)
2,256✔
UNCOV
794
{
×
UNCOV
795
    // Ignore the call if the session is not active
×
UNCOV
796
    if (m_state != State::Active) {
×
797
        return false;
×
798
    }
×
UNCOV
799

×
UNCOV
800
    if (is_steady_state_download_message(batch_state, query_version)) {
×
UNCOV
801
        return false;
×
UNCOV
802
    }
×
803

804
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
2,256✔
805
    std::optional<SyncProgress> maybe_progress;
2,256✔
806
    if (batch_state == DownloadBatchState::LastInBatch) {
2,256✔
807
        maybe_progress = progress;
12✔
808
    }
12✔
809

2,244✔
810
    bool new_batch = false;
811
    try {
2,244✔
812
        bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
248✔
813
    }
248✔
814
    catch (const LogicError& ex) {
815
        if (ex.code() == ErrorCodes::LimitExceeded) {
1,996✔
816
            IntegrationException ex(ErrorCodes::LimitExceeded,
1,996✔
817
                                    "bootstrap changeset too large to store in pending bootstrap store",
1,996✔
818
                                    ProtocolError::bad_changeset_size);
1,996✔
819
            on_integration_failure(ex);
12✔
820
            return true;
12✔
821
        }
1,996✔
822
        throw;
×
823
    }
×
824

825
    // If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
1,996✔
826
    // bootstrapping.
1,996✔
827
    if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
828
        on_flx_sync_progress(query_version, DownloadBatchState::MoreToCome);
829
    }
830

12,082✔
831
    auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, progress, query_version,
832
                                       batch_state, received_changesets.size());
12,082✔
833
    if (hook_action == SyncClientHookAction::EarlyReturn) {
8,556✔
834
        return true;
8,556✔
835
    }
836
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
3,526✔
837

3,526✔
838
    if (batch_state == DownloadBatchState::MoreToCome) {
3,526✔
839
        return true;
1,510✔
840
    }
1,510✔
841

842
    try {
2,016✔
843
        process_pending_flx_bootstrap();
2,016✔
844
    }
2,016✔
845
    catch (const IntegrationException& e) {
2,016✔
846
        on_integration_failure(e);
2,016✔
847
    }
2,016✔
848
    catch (...) {
2,016✔
849
        on_integration_failure(IntegrationException(exception_to_status()));
2,016✔
850
    }
2,016✔
851

2,016✔
852
    return true;
853
}
854

2,016✔
855

4,172✔
856
void SessionImpl::process_pending_flx_bootstrap()
2,168✔
857
{
2,168✔
858
    // Ignore the call if not a flx session or session is not active
2,168✔
859
    if (!m_is_flx_sync_session || m_state != State::Active) {
8✔
860
        return;
8✔
861
    }
8✔
862
    // Should never be called if session is not active
8✔
863
    REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
8✔
864
    auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
865
    if (!bootstrap_store->has_pending()) {
2,160✔
866
        return;
2,160✔
867
    }
2,160✔
868

2,160✔
869
    auto pending_batch_stats = bootstrap_store->pending_stats();
2,160✔
870
    logger.info("Begin processing pending FLX bootstrap for query version %1. (changesets: %2, original total "
2,160✔
871
                "changeset size: %3)",
2,160✔
872
                pending_batch_stats.query_version, pending_batch_stats.pending_changesets,
4✔
873
                pending_batch_stats.pending_changeset_bytes);
4✔
874
    auto& history = get_repl().get_history();
875
    VersionInfo new_version;
2,156✔
876
    SyncProgress progress;
2,156✔
877
    int64_t query_version = -1;
878
    size_t changesets_processed = 0;
2,156✔
879

2,156✔
880
    // Used to commit each batch after it was transformed.
2,156✔
881
    TransactionRef transact = get_db()->start_write();
2,144✔
882
    while (bootstrap_store->has_pending()) {
2,144✔
883
        auto start_time = std::chrono::steady_clock::now();
2,144✔
884
        auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
2,156✔
885
        if (!pending_batch.progress) {
2,156✔
886
            logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
2,156✔
887
            // Close the write transation before clearing the bootstrap store to avoid a deadlock because the
888
            // bootstrap store requires a write transaction itself.
2,156✔
889
            transact->close();
2,156✔
890
            bootstrap_store->clear();
2,156✔
891
            return;
892
        }
2,156✔
893

2,156✔
894
        auto batch_state =
2,156✔
895
            pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
2,156✔
896
        uint64_t downloadable_bytes = 0;
2,156✔
897
        query_version = pending_batch.query_version;
2,156✔
898
        bool simulate_integration_error =
899
            (m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
2,004✔
900
        if (simulate_integration_error) {
901
            throw IntegrationException(ErrorCodes::BadChangeset, "simulated failure", ProtocolError::bad_changeset);
2,004✔
902
        }
2,004✔
903

2,004✔
904
        call_debug_hook(SyncClientHookEvent::BootstrapBatchAboutToProcess, *pending_batch.progress, query_version,
905
                        batch_state, pending_batch.changesets.size());
2,004✔
906

2,004✔
907
        history.integrate_server_changesets(
908
            *pending_batch.progress, downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
909
            transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
20✔
910
                REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
911
                bootstrap_store->pop_front_pending(tr, changesets_applied.size());
20✔
912
            });
20✔
913
        progress = *pending_batch.progress;
20✔
914
        changesets_processed += pending_batch.changesets.size();
20✔
915
        auto duration = std::chrono::steady_clock::now() - start_time;
916

917
        auto action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
19,182✔
918
                                      batch_state, pending_batch.changesets.size());
919
        REALM_ASSERT_EX(action == SyncClientHookAction::NoAction, action);
19,182✔
920

19,182✔
921
        logger.info("Integrated %1 changesets from pending bootstrap for query version %2, producing client version "
19,182✔
922
                    "%3 in %4 ms. %5 changesets remaining in bootstrap",
923
                    pending_batch.changesets.size(), pending_batch.query_version, new_version.realm_version,
924
                    std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
68,108✔
925
                    pending_batch.remaining_changesets);
926
    }
68,108✔
927

68,108✔
928
    REALM_ASSERT_3(query_version, !=, -1);
68,108✔
929
    on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
930

931
    on_changesets_integrated(new_version.realm_version, progress);
3,688✔
932
    auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
933
                                  DownloadBatchState::LastInBatch, changesets_processed);
3,688✔
934
    // NoAction/EarlyReturn are both valid no-op actions to take here.
935
    REALM_ASSERT_EX(action == SyncClientHookAction::NoAction || action == SyncClientHookAction::EarlyReturn, action);
936
}
3,688✔
937

24✔
938
void SessionImpl::on_flx_sync_error(int64_t version, std::string_view err_msg)
24✔
939
{
3,664✔
940
    // Ignore the call if the session is not active
3,664✔
941
    if (m_state == State::Active) {
3,664✔
942
        m_wrapper.on_flx_sync_error(version, err_msg);
3,664✔
943
    }
944
}
3,664✔
945

3,664✔
946
void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch_state)
12✔
947
{
12✔
948
    // Ignore the call if the session is not active
12✔
949
    if (m_state == State::Active) {
950
        m_wrapper.on_flx_sync_progress(version, batch_state);
12✔
951
    }
12✔
952
}
12✔
UNCOV
953

×
954
SubscriptionStore* SessionImpl::get_flx_subscription_store()
24✔
955
{
24✔
956
    // Should never be called if session is not active
24✔
UNCOV
957
    REALM_ASSERT_EX(m_state == State::Active, m_state);
×
958
    return m_wrapper.get_flx_subscription_store();
3,620✔
959
}
3,620✔
960

3,664✔
961
MigrationStore* SessionImpl::get_migration_store()
3,664✔
962
{
963
    // Should never be called if session is not active
964
    REALM_ASSERT_EX(m_state == State::Active, m_state);
965
    return m_wrapper.get_migration_store();
966
}
102,052✔
967

102,052✔
968
void SessionImpl::on_flx_sync_version_complete(int64_t version)
100,034✔
969
{
100,034✔
970
    // Ignore the call if the session is not active
2,018✔
UNCOV
971
    if (m_state == State::Active) {
×
UNCOV
972
        m_wrapper.on_flx_sync_version_complete(version);
×
973
    }
974
}
2,018✔
975

2,018✔
976
SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
2,018✔
977
{
2,018✔
978
    // Should never be called if session is not active
2,018✔
979
    REALM_ASSERT_EX(m_state == State::Active, m_state);
2,018✔
980

981
    // Make sure we don't call the debug hook recursively.
2,018✔
982
    if (m_wrapper.m_in_debug_hook) {
2,018✔
983
        return SyncClientHookAction::NoAction;
984
    }
985
    m_wrapper.m_in_debug_hook = true;
39,796✔
986
    auto in_hook_guard = util::make_scope_exit([&]() noexcept {
39,796✔
987
        m_wrapper.m_in_debug_hook = false;
38,126✔
988
    });
38,126✔
989

1,670✔
UNCOV
990
    auto action = m_wrapper.m_debug_hook(data);
×
UNCOV
991
    switch (action) {
×
992
        case realm::SyncClientHookAction::SuspendWithRetryableError: {
993
            SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
1,670✔
994
            err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
1,670✔
995

1,670✔
996
            auto err_processing_err = receive_error_message(err_info);
1,670✔
997
            REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
1,670✔
998
            return SyncClientHookAction::EarlyReturn;
1,670✔
999
        }
1,670✔
1000
        case realm::SyncClientHookAction::TriggerReconnect: {
1001
            get_connection().voluntary_disconnect();
1,670✔
1002
            return SyncClientHookAction::EarlyReturn;
1,670✔
1003
        }
1004
        default:
1005
            return action;
10,384✔
1006
    }
10,384✔
1007
}
10,384✔
1008

10,384✔
1009
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const SyncProgress& progress,
1010
                                                  int64_t query_version, DownloadBatchState batch_state,
1011
                                                  size_t num_changesets)
46,220✔
1012
{
46,220✔
1013
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
46,220✔
1014
        return SyncClientHookAction::NoAction;
1015
    }
1016
    if (REALM_UNLIKELY(m_state != State::Active)) {
68✔
1017
        return SyncClientHookAction::NoAction;
68✔
1018
    }
×
UNCOV
1019

×
1020
    SyncClientHookData data;
1021
    data.event = event;
68✔
1022
    data.batch_state = batch_state;
68✔
1023
    data.progress = progress;
68✔
1024
    data.num_changesets = num_changesets;
4✔
1025
    data.query_version = query_version;
4✔
1026

4✔
1027
    return call_debug_hook(data);
64✔
UNCOV
1028
}
×
UNCOV
1029

×
1030
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
64✔
1031
{
68✔
1032
    if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
4✔
1033
        return SyncClientHookAction::NoAction;
4✔
1034
    }
1035
    if (REALM_UNLIKELY(m_state != State::Active)) {
60✔
1036
        return SyncClientHookAction::NoAction;
60✔
1037
    }
1038

60✔
UNCOV
1039
    SyncClientHookData data;
×
UNCOV
1040
    data.event = event;
×
UNCOV
1041
    data.batch_state = DownloadBatchState::SteadyState;
×
1042
    data.progress = m_progress;
1043
    data.num_changesets = 0;
60✔
1044
    data.query_version = 0;
60✔
1045
    data.error_info = &error_info;
60✔
1046

60✔
1047
    return call_debug_hook(data);
1048
}
60✔
1049

68✔
1050
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event)
1051
{
1052
    return call_debug_hook(event, m_progress, m_last_sent_flx_query_version, DownloadBatchState::SteadyState, 0);
1053
}
1054

1055
bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
1056
{
1057
    // Should never be called if session is not active
1058
    REALM_ASSERT_EX(m_state == State::Active, m_state);
4,906✔
1059
    if (batch_state == DownloadBatchState::SteadyState) {
4,906✔
1060
        return true;
4,906✔
1061
    }
4,906✔
1062

4,906✔
1063
    if (!m_is_flx_sync_session) {
4,906✔
1064
        return true;
4,906✔
1065
    }
4,906✔
1066

4,906✔
1067
    // If this is a steady state DOWNLOAD, no need for special handling.
4,906✔
1068
    if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
4,906✔
1069
        return true;
4,906✔
1070
    }
4,906✔
1071

4,906✔
1072
    return false;
4,906✔
1073
}
4,906✔
1074

4,906✔
1075
void SessionImpl::init_progress_handler()
4,906✔
1076
{
4,906✔
1077
    REALM_ASSERT_EX(m_state == State::Unactivated || m_state == State::Active, m_state);
4,906✔
1078
    m_wrapper.init_progress_handler();
4,906✔
1079
}
4,906✔
1080

4,906✔
1081
void SessionImpl::enable_progress_notifications()
4,906✔
1082
{
4,906✔
1083
    m_wrapper.m_reliable_download_progress = true;
4,906✔
1084
}
4,906✔
1085

4,906✔
1086
util::Future<std::string> SessionImpl::send_test_command(std::string body)
10,156✔
1087
{
10,156✔
1088
    if (m_state != State::Active) {
10,156✔
1089
        return Status{ErrorCodes::RuntimeError, "Cannot send a test command for a session that is not active"};
10,156✔
1090
    }
1091

1092
    try {
1093
        auto json_body = nlohmann::json::parse(body.begin(), body.end());
1094
        if (auto it = json_body.find("command"); it == json_body.end() || !it->is_string()) {
10,156✔
1095
            return Status{ErrorCodes::LogicError,
10,156✔
1096
                          "Must supply command name in \"command\" field of test command json object"};
10,156✔
1097
        }
1098
        if (json_body.size() > 1 && json_body.find("args") == json_body.end()) {
1099
            return Status{ErrorCodes::LogicError, "Only valid fields in a test command are \"command\" and \"args\""};
10,154✔
1100
        }
1101
    }
1102
    catch (const nlohmann::json::parse_error& e) {
1103
        return Status{ErrorCodes::LogicError, util::format("Invalid json input to send_test_command: %1", e.what())};
10,154✔
1104
    }
10,154✔
1105

10,154✔
1106
    auto pf = util::make_promise_future<std::string>();
10,154✔
1107
    get_client().post([this, promise = std::move(pf.promise), body = std::move(body)](Status status) mutable {
10,154✔
1108
        // Includes operation_aborted
10,154✔
1109
        if (!status.is_ok()) {
1110
            promise.set_error(status);
1111
            return;
1112
        }
119,848✔
1113

119,848✔
1114
        auto id = ++m_last_pending_test_command_ident;
119,848✔
1115
        m_pending_test_commands.push_back(PendingTestCommand{id, std::move(body), std::move(promise)});
119,848✔
1116
        ensure_enlisted_to_send();
1117
    });
1118

UNCOV
1119
    return std::move(pf.future);
×
UNCOV
1120
}
×
UNCOV
1121

×
1122
// ################ SessionWrapper ################
1123

UNCOV
1124
// The SessionWrapper class is held by a sync::Session (which is owned by the SyncSession instance) and
×
UNCOV
1125
// provides a link to the ClientImpl::Session that creates and receives messages with the server with
×
UNCOV
1126
// the ClientImpl::Connection that owns the ClientImpl::Session.
×
1127
SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1128
                               std::shared_ptr<MigrationStore> migration_store, Session::Config&& config)
1129
    : m_client{client}
19,182✔
1130
    , m_db(std::move(db))
19,182✔
1131
    , m_replication(m_db->get_replication())
19,182✔
1132
    , m_protocol_envelope{config.protocol_envelope}
19,182✔
1133
    , m_server_address{std::move(config.server_address)}
1134
    , m_server_port{config.server_port}
1135
    , m_server_verified{config.server_verified}
5,782✔
1136
    , m_user_id(std::move(config.user_id))
5,782✔
1137
    , m_sync_mode(flx_sub_store ? SyncServerMode::FLX : SyncServerMode::PBS)
5,782✔
1138
    , m_authorization_header_name{config.authorization_header_name}
5,782✔
1139
    , m_custom_http_headers{std::move(config.custom_http_headers)}
1140
    , m_verify_servers_ssl_certificate{config.verify_servers_ssl_certificate}
1141
    , m_simulate_integration_error{config.simulate_integration_error}
68,110✔
1142
    , m_ssl_trust_certificate_path{std::move(config.ssl_trust_certificate_path)}
68,110✔
1143
    , m_ssl_verify_callback{std::move(config.ssl_verify_callback)}
68,110✔
1144
    , m_flx_bootstrap_batch_size_bytes(config.flx_bootstrap_batch_size_bytes)
68,110✔
1145
    , m_http_request_path_prefix{std::move(config.service_identifier)}
1146
    , m_virt_path{std::move(config.realm_identifier)}
1147
    , m_proxy_config{std::move(config.proxy_config)}
10,156✔
1148
    , m_signed_access_token{std::move(config.signed_user_token)}
10,156✔
1149
    , m_client_reset_config{std::move(config.client_reset_config)}
10,156✔
1150
    , m_progress_handler(std::move(config.progress_handler))
10,156✔
1151
    , m_connection_state_change_listener(std::move(config.connection_state_change_listener))
10,156✔
1152
    , m_debug_hook(std::move(config.on_sync_client_event_hook))
1153
    , m_session_reason(m_client_reset_config ? SessionReason::ClientReset : config.session_reason)
1154
    , m_schema_version(config.schema_version)
1155
    , m_flx_subscription_store(std::move(flx_sub_store))
112,850✔
1156
    , m_migration_store(std::move(migration_store))
1157
{
112,850✔
1158
    REALM_ASSERT(m_db);
112,844✔
1159
    REALM_ASSERT(m_db->get_replication());
112,844✔
1160
    REALM_ASSERT(dynamic_cast<ClientReplication*>(m_db->get_replication()));
424✔
1161

112,420✔
1162
    // SessionWrapper begins at +1 retain count because Client retains and
112,420✔
1163
    // releases it while performing async operations, and these need to not
112,420✔
1164
    // take it to 0 or it could be deleted before the caller can retain it.
112,420✔
1165
    bind_ptr();
112,850✔
1166
    m_client.register_unactualized_session_wrapper(this);
1167
}
1168

1169
SessionWrapper::~SessionWrapper() noexcept
28✔
1170
{
1171
    // We begin actualization in the constructor and do not delete the wrapper
1172
    // until both the Client is done with it and the Session has abandoned it,
28✔
1173
    // so at this point we must have actualized, finalized, and been abandoned.
28✔
1174
    REALM_ASSERT(m_actualized);
28✔
UNCOV
1175
    REALM_ASSERT(m_abandoned);
×
UNCOV
1176
    REALM_ASSERT(m_finalized);
×
1177
    REALM_ASSERT(m_closed);
1178
    REALM_ASSERT(!m_db);
28✔
UNCOV
1179
}
×
1180

28✔
1181

28✔
1182
inline ClientReplication& SessionWrapper::get_replication() noexcept
28✔
1183
{
28✔
1184
    REALM_ASSERT(m_db);
28✔
1185
    return static_cast<ClientReplication&>(*m_replication);
28✔
1186
}
1187

1188

1189
inline ClientImpl& SessionWrapper::get_client() noexcept
28,234✔
1190
{
28,234✔
1191
    return m_client;
1192
}
28,234✔
1193

28,234✔
1194
bool SessionWrapper::has_flx_subscription_store() const
28,234✔
1195
{
28,234✔
UNCOV
1196
    return static_cast<bool>(m_flx_subscription_store);
×
UNCOV
1197
}
×
UNCOV
1198

×
1199
void SessionWrapper::on_flx_sync_error(int64_t version, std::string_view err_msg)
28,234✔
1200
{
1201
    REALM_ASSERT(!m_finalized);
188✔
1202
    get_flx_subscription_store()->update_state(version, SubscriptionSet::State::Error, err_msg);
188✔
1203
}
188✔
1204

28,046✔
1205
void SessionWrapper::on_flx_sync_version_complete(int64_t version)
15,424✔
1206
{
15,424✔
1207
    REALM_ASSERT(!m_finalized);
1208
    m_flx_last_seen_version = version;
314✔
1209
    m_flx_active_version = version;
314✔
1210
}
15,110✔
1211

1212
void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state)
15,110✔
1213
{
15,110✔
1214
    if (!has_flx_subscription_store()) {
15,424✔
1215
        return;
12,622✔
1216
    }
1217
    REALM_ASSERT(!m_finalized);
12,622✔
1218
    REALM_ASSERT(new_version >= m_flx_last_seen_version);
12,622✔
1219
    REALM_ASSERT(new_version >= m_flx_active_version);
28,046✔
1220
    REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
28,046✔
1221

15,424✔
1222
    SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy
28,046✔
1223

12,936✔
1224
    switch (batch_state) {
28,046✔
1225
        case DownloadBatchState::SteadyState:
28,234✔
1226
            // Cannot be called with this value.
1227
            REALM_UNREACHABLE();
1228
        case DownloadBatchState::LastInBatch:
1229
            if (m_flx_active_version == new_version) {
12,912✔
1230
                return;
1231
            }
12,912✔
1232
            on_flx_sync_version_complete(new_version);
1233
            if (new_version == 0) {
12,912✔
1234
                new_state = SubscriptionSet::State::Complete;
12,912✔
1235
            }
12,910✔
1236
            else {
12,910✔
1237
                new_state = SubscriptionSet::State::AwaitingMark;
12,912✔
1238
                m_flx_pending_mark_version = new_version;
12,912✔
1239
            }
1240
            break;
1241
        case DownloadBatchState::MoreToCome:
1242
            if (m_flx_last_seen_version == new_version) {
10,000✔
1243
                return;
1244
            }
10,000✔
1245

1246
            m_flx_last_seen_version = new_version;
10,000✔
1247
            new_state = SubscriptionSet::State::Bootstrapping;
10,000✔
1248
            break;
10,000✔
1249
    }
10,000✔
1250

10,000✔
1251
    get_flx_subscription_store()->update_state(new_version, new_state);
10,000✔
1252
}
1253

1254
SubscriptionStore* SessionWrapper::get_flx_subscription_store()
1255
{
216✔
1256
    REALM_ASSERT(!m_finalized);
1257
    return m_flx_subscription_store.get();
216✔
1258
}
1259

216✔
1260
PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store()
216✔
1261
{
216✔
UNCOV
1262
    REALM_ASSERT(!m_finalized);
×
1263
    return m_flx_pending_bootstrap_store.get();
216✔
1264
}
216✔
1265

216✔
1266
MigrationStore* SessionWrapper::get_migration_store()
1267
{
216✔
1268
    REALM_ASSERT(!m_finalized);
216✔
1269
    return m_migration_store.get();
216✔
1270
}
216✔
1271

216✔
1272
inline bool SessionWrapper::mark_abandoned()
1273
{
1274
    REALM_ASSERT(!m_abandoned);
1275
    m_abandoned = true;
10,156✔
1276
    return m_finalized;
10,156✔
1277
}
10,156✔
1278

10,156✔
1279

1280
void SessionWrapper::on_commit(version_type new_version)
1281
{
1282
    // Thread safety required
1283
    m_client.post([self = util::bind_ptr{this}, new_version] {
10,090✔
1284
        REALM_ASSERT(self->m_actualized);
1285
        if (REALM_UNLIKELY(!self->m_sess))
10,090✔
1286
            return; // Already finalized
10,090✔
1287
        SessionImpl& sess = *self->m_sess;
1288
        sess.recognize_sync_version(new_version); // Throws
1289
        self->check_progress();                   // Throws
10,090✔
1290
    });
10,090✔
1291
}
1292

10,090✔
1293

1294
void SessionWrapper::cancel_reconnect_delay()
10,090✔
1295
{
4✔
1296
    // Thread safety required
4✔
1297

1298
    m_client.post([self = util::bind_ptr{this}] {
10,090✔
1299
        REALM_ASSERT(self->m_actualized);
10,090✔
1300
        if (REALM_UNLIKELY(self->m_closed)) {
10,090✔
1301
            return;
×
1302
        }
×
1303

1304
        if (REALM_UNLIKELY(!self->m_sess))
10,090✔
1305
            return; // Already finalized
10,090✔
1306
        SessionImpl& sess = *self->m_sess;
10,090✔
1307
        sess.cancel_resumption_delay(); // Throws
10,090✔
1308
        ClientImpl::Connection& conn = sess.get_connection();
10,090✔
1309
        conn.cancel_reconnect_delay(); // Throws
10,090✔
1310
    });                                // Throws
10,090✔
1311
}
10,090✔
UNCOV
1312

×
UNCOV
1313
void SessionWrapper::async_wait_for(bool upload_completion, bool download_completion,
×
UNCOV
1314
                                    WaitOperCompletionHandler handler)
×
1315
{
1316
    REALM_ASSERT(upload_completion || download_completion);
1317

10,090✔
1318
    m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion,
10,090✔
1319
                   download_completion](Status status) mutable {
10,090✔
1320
        REALM_ASSERT(self->m_actualized);
1,530✔
1321
        if (!status.is_ok()) {
1,530✔
1322
            handler(status); // Throws
1,530✔
1323
            return;
1324
        }
10,090✔
1325
        if (REALM_UNLIKELY(!self->m_sess)) {
10,090✔
1326
            // Already finalized
10,090✔
UNCOV
1327
            handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
×
UNCOV
1328
            return;
×
1329
        }
10,090✔
1330
        if (upload_completion) {
1331
            self->m_upload_completion_requested_version = self->m_db->get_version_of_latest_snapshot();
10,090✔
1332
            if (download_completion) {
2,798✔
1333
                // Wait for upload and download completion
1334
                self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
10,090✔
1335
            }
10,076✔
1336
            else {
10,076✔
1337
                // Wait for upload completion only
7,098✔
1338
                self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws
7,098✔
1339
            }
6,964✔
1340
        }
7,098✔
1341
        else {
10,076✔
1342
            // Wait for download completion only
1343
            self->m_download_completion_handlers.push_back(std::move(handler)); // Throws
10,090✔
1344
        }
9,710✔
1345
        SessionImpl& sess = *self->m_sess;
10,090✔
1346
        if (upload_completion)
1347
            self->check_progress();
1348
        if (download_completion)
10,194✔
1349
            sess.request_download_completion_notification(); // Throws
10,194✔
1350
    });                                                      // Throws
106✔
1351
}
106✔
1352

10,088✔
1353

10,088✔
1354
bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
10,088✔
1355
{
1356
    // Thread safety required
10,088✔
1357
    REALM_ASSERT(!m_abandoned);
10,088✔
1358

1359
    auto pf = util::make_promise_future<bool>();
1360
    async_wait_for(true, false, [promise = std::move(pf.promise)](Status status) mutable {
1361
        promise.emplace_value(status.is_ok());
10,088✔
1362
    });
1363
    return pf.future.get();
1364
}
10,088✔
1365

1366

10,088✔
1367
bool SessionWrapper::wait_for_download_complete_or_client_stopped()
10,088✔
1368
{
10,088✔
1369
    // Thread safety required
1370
    REALM_ASSERT(!m_abandoned);
10,088✔
1371

1372
    auto pf = util::make_promise_future<bool>();
1373
    async_wait_for(false, true, [promise = std::move(pf.promise)](Status status) mutable {
10,446✔
1374
        promise.emplace_value(status.is_ok());
358✔
1375
    });
358✔
1376
    return pf.future.get();
358✔
1377
}
358✔
1378

10,470✔
1379

382✔
1380
void SessionWrapper::refresh(std::string_view signed_access_token)
382✔
1381
{
382✔
1382
    // Thread safety required
382✔
1383
    REALM_ASSERT(!m_abandoned);
382✔
1384

10,100✔
1385
    m_client.post([self = util::bind_ptr{this}, token = std::string(signed_access_token)] {
12✔
1386
        REALM_ASSERT(self->m_actualized);
12✔
1387
        if (REALM_UNLIKELY(!self->m_sess))
12✔
1388
            return; // Already finalized
12✔
1389
        self->m_signed_access_token = std::move(token);
10,088✔
1390
        SessionImpl& sess = *self->m_sess;
1391
        ClientImpl::Connection& conn = sess.get_connection();
1392
        // FIXME: This only makes sense when each session uses a separate connection.
1393
        conn.update_connect_info(self->m_http_request_path_prefix, self->m_signed_access_token); // Throws
1394
        sess.cancel_resumption_delay();                                                          // Throws
1395
        conn.cancel_reconnect_delay();                                                           // Throws
1396
    });
1397
}
10,090✔
1398

10,090✔
1399

10,090✔
1400
void SessionWrapper::abandon(util::bind_ptr<SessionWrapper> wrapper) noexcept
10,090✔
1401
{
1402
    ClientImpl& client = wrapper->m_client;
10,090✔
1403
    client.register_abandoned_session_wrapper(std::move(wrapper));
1404
}
10,090✔
1405

1406

1407
// Must be called from event loop thread
1408
void SessionWrapper::actualize()
1409
{
10,090✔
1410
    // actualize() can only ever be called once
10,090✔
1411
    REALM_ASSERT(!m_actualized);
10,090✔
1412
    REALM_ASSERT(!m_sess);
1413
    // The client should have removed this wrapper from those pending
1414
    // actualization if it called force_close() or finalize_before_actualize()
1415
    REALM_ASSERT(!m_finalized);
1416
    REALM_ASSERT(!m_closed);
1417

1418
    m_actualized = true;
64✔
1419

64✔
1420
    ScopeExitFail close_on_error([&]() noexcept {
64✔
1421
        m_closed = true;
64✔
1422
    });
64✔
1423

64✔
1424
    m_db->claim_sync_agent();
64✔
1425
    m_db->add_commit_listener(this);
64✔
1426
    ScopeExitFail remove_commit_listener([&]() noexcept {
64✔
1427
        m_db->remove_commit_listener(this);
64✔
1428
    });
1429

1430
    ServerEndpoint endpoint{m_protocol_envelope, m_server_address, m_server_port,
16,248✔
1431
                            m_user_id,           m_sync_mode,      m_server_verified};
16,248✔
1432
    bool was_created = false;
2,430✔
1433
    ClientImpl::Connection& conn = m_client.get_connection(
2,430✔
1434
        std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate,
1435
        m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config,
1436
        was_created); // Throws
1437
    ScopeExitFail remove_connection([&]() noexcept {
1438
        if (was_created)
1439
            m_client.remove_connection(conn);
16,248✔
1440
    });
1441

28,694✔
1442
    // FIXME: This only makes sense when each session uses a separate connection.
12,446✔
1443
    conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token);    // Throws
12,446✔
1444
    std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
12,446✔
1445
    if (m_sync_mode == SyncServerMode::FLX) {
12,446✔
1446
        m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
16,344✔
1447
    }
96✔
1448

96✔
1449
    sess->logger.info("Binding '%1' to '%2'", m_db->get_path(), m_virt_path); // Throws
96✔
1450
    m_sess = sess.get();
96✔
1451
    ScopeExitFail clear_sess([&]() noexcept {
16,248✔
1452
        m_sess = nullptr;
1453
    });
1454
    conn.activate_session(std::move(sess)); // Throws
1455

708✔
1456
    // Initialize the variables relying on the bootstrap store from the event loop to guarantee that a previous
708✔
1457
    // session cannot change the state of the bootstrap store at the same time.
708✔
1458
    update_subscription_version_info();
708✔
1459

708✔
1460
    if (was_created)
708✔
1461
        conn.activate(); // Throws
708✔
1462

1463
    if (m_connection_state_change_listener) {
1464
        ConnectionState state = conn.get_state();
1465
        if (state != ConnectionState::disconnected) {
94✔
1466
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
94✔
1467
            if (state == ConnectionState::connected)
94✔
1468
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
94✔
1469
        }
94✔
1470
    }
94✔
1471

92✔
1472
    if (!m_client_reset_config)
92✔
1473
        check_progress(); // Throws
82✔
1474
}
92✔
1475

94✔
1476
void SessionWrapper::force_close()
94✔
1477
{
1478
    if (m_closed) {
1479
        return;
1480
    }
1481
    REALM_ASSERT(m_actualized);
11,894✔
1482
    REALM_ASSERT(m_sess);
11,894✔
1483
    m_closed = true;
11,858✔
1484

11,858✔
1485
    ClientImpl::Connection& conn = m_sess->get_connection();
11,894✔
1486
    conn.initiate_session_deactivation(m_sess); // Throws
1487

1488
    // We need to keep the DB open until finalization, but we no longer want to
10,384✔
1489
    // know when commits are made
10,384✔
1490
    m_db->remove_commit_listener(this);
10,384✔
1491

1492
    // Delete the pending bootstrap store since it uses a reference to the logger in m_sess
1493
    m_flx_pending_bootstrap_store.reset();
153,802✔
1494
    // Clear the subscription and migration store refs since they are owned by SyncSession
153,802✔
1495
    m_flx_subscription_store.reset();
153,802✔
1496
    m_migration_store.reset();
1497
    m_sess = nullptr;
153,802✔
1498
    // Everything is being torn down, no need to report connection state anymore
1499
    m_connection_state_change_listener = {};
29,434✔
1500

29,434✔
1501
    // All outstanding wait operations must be canceled
1502
    while (!m_upload_completion_handlers.empty()) {
153,802✔
1503
        auto handler = std::move(m_upload_completion_handlers.back());
74,294✔
1504
        m_upload_completion_handlers.pop_back();
1505
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before upload was complete"}); // Throws
79,508✔
1506
    }
79,508✔
1507
    while (!m_download_completion_handlers.empty()) {
79,508✔
1508
        auto handler = std::move(m_download_completion_handlers.back());
79,508✔
1509
        m_download_completion_handlers.pop_back();
79,508✔
1510
        handler(
1511
            {ErrorCodes::OperationAborted, "Sync session is being closed before download was complete"}); // Throws
79,508✔
1512
    }
79,508✔
1513
    while (!m_sync_completion_handlers.empty()) {
79,508✔
1514
        auto handler = std::move(m_sync_completion_handlers.back());
1515
        m_sync_completion_handlers.pop_back();
1516
        handler({ErrorCodes::OperationAborted, "Sync session is being closed before sync was complete"}); // Throws
79,510✔
1517
    }
79,510✔
1518
}
59,930✔
1519

1520
// Must be called from event loop thread
19,580✔
1521
//
19,580✔
1522
// `m_client.m_mutex` is not held while this is called, but it is guaranteed to
19,580✔
1523
// have been acquired at some point in between the final read or write ever made
1524
// from a different thread and when this is called.
34,426✔
1525
void SessionWrapper::finalize()
14,846✔
1526
{
14,846✔
1527
    REALM_ASSERT(m_actualized);
14,846✔
1528
    REALM_ASSERT(m_abandoned);
14,846✔
1529
    REALM_ASSERT(!m_finalized);
19,580✔
1530

1531
    force_close();
1532

79,510✔
1533
    m_finalized = true;
79,510✔
1534

27,720✔
1535
    // The Realm file can be closed now, as no access to the Realm file is
1536
    // supposed to happen on behalf of a session after initiation of
1537
    // deactivation.
51,790✔
1538
    m_db->release_sync_agent();
28,916✔
1539
    m_db = nullptr;
1540
}
22,874✔
1541

18,780✔
1542

18,780✔
1543
// Must be called only when an unactualized session wrapper becomes abandoned.
1544
//
1545
// Called with a lock on `m_client.m_mutex`.
1546
inline void SessionWrapper::finalize_before_actualization() noexcept
1547
{
1548
    REALM_ASSERT(!m_finalized);
1549
    REALM_ASSERT(!m_sess);
1550
    m_actualized = true;
1551
    m_finalized = true;
1552
    m_closed = true;
18,780✔
1553
    m_db->remove_commit_listener(this);
18,780✔
1554
    m_db->release_sync_agent();
9,582✔
1555
    m_db = nullptr;
18,780✔
1556
}
18,780✔
1557

1558
void SessionWrapper::on_download_completion()
22,874✔
1559
{
22,874✔
1560
    // Ensure that progress handlers get called before completion handlers. The
22,874✔
1561
    // download completing performed a commit and will trigger progress
9,534✔
1562
    // notifications asynchronously, but they would arrive after the download
1563
    // completion without this.
22,874✔
1564
    check_progress();
22,874✔
1565

22,874✔
1566
    while (!m_download_completion_handlers.empty()) {
10,348✔
1567
        auto handler = std::move(m_download_completion_handlers.back());
572✔
1568
        m_download_completion_handlers.pop_back();
572✔
1569
        handler(Status::OK()); // Throws
572✔
1570
    }
10,348✔
1571
    while (!m_sync_completion_handlers.empty()) {
1572
        auto handler = std::move(m_sync_completion_handlers.back());
1573
        m_upload_completion_handlers.push_back(std::move(handler)); // Throws
1574
        m_sync_completion_handlers.pop_back();
1575
    }
10,348✔
1576

10,348!
UNCOV
1577
    if (m_flx_subscription_store && m_flx_pending_mark_version != SubscriptionSet::EmptyVersion) {
×
1578
        m_sess->logger.debug("Marking query version %1 as complete after receiving MARK message",
10,348✔
1579
                             m_flx_pending_mark_version);
12,526✔
1580
        m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
1581
        m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
1582
    }
1583
}
12,526✔
1584

12,526✔
1585

9,246✔
1586
void SessionWrapper::on_suspended(const SessionErrorInfo& error_info)
12,526✔
1587
{
1588
    REALM_ASSERT(!m_finalized);
22,874✔
1589
    m_suspended = true;
13,592✔
1590
    if (m_connection_state_change_listener) {
22,874✔
1591
        m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws
13,340✔
1592
    }
1593
}
22,874✔
1594

15,488✔
1595

1596
void SessionWrapper::on_resumed()
7,386✔
1597
{
1598
    REALM_ASSERT(!m_finalized);
7,386✔
1599
    m_suspended = false;
7,386✔
1600
    if (m_connection_state_change_listener) {
2,106✔
1601
        ClientImpl::Connection& conn = m_sess->get_connection();
2,106✔
1602
        if (conn.get_state() != ConnectionState::disconnected) {
1603
            m_connection_state_change_listener(ConnectionState::connecting, util::none); // Throws
7,386✔
1604
            if (conn.get_state() == ConnectionState::connected)
14,304✔
1605
                m_connection_state_change_listener(ConnectionState::connected, util::none); // Throws
14,304✔
1606
        }
1607
    }
14,304✔
1608
}
14,304✔
1609

14,304✔
1610

7,152✔
1611
void SessionWrapper::on_connection_state_changed(ConnectionState state,
7,152✔
1612
                                                 const std::optional<SessionErrorInfo>& error_info)
7,152✔
1613
{
7,152✔
1614
    if (m_connection_state_change_listener && !m_suspended) {
7,152✔
1615
        m_connection_state_change_listener(state, error_info); // Throws
7,152✔
1616
    }
1617
}
7,386✔
1618

7,386✔
1619
void SessionWrapper::init_progress_handler()
7,386✔
1620
{
1621
    ClientHistory::get_upload_download_state(m_db.get(), m_final_downloaded, m_final_uploaded);
1622
}
68✔
1623

68✔
UNCOV
1624
void SessionWrapper::check_progress()
×
UNCOV
1625
{
×
1626
    REALM_ASSERT(!m_finalized);
1627
    REALM_ASSERT(m_sess);
68✔
1628

68✔
1629
    if (!m_progress_handler && m_upload_completion_handlers.empty() && m_sync_completion_handlers.empty())
1630
        return;
1631

10,384✔
1632
    version_type uploaded_version;
10,384✔
1633
    ReportedProgress p;
1634
    DownloadableProgress downloadable;
10,384✔
1635
    ClientHistory::get_upload_download_state(*m_db, p.downloaded, downloadable, p.uploaded, p.uploadable, p.snapshot,
10,384✔
1636
                                             uploaded_version);
10,056✔
1637

10,056✔
1638
    report_progress(p, downloadable);
1639
    report_upload_completion(uploaded_version);
328✔
1640
}
1641

1642
void SessionWrapper::report_upload_completion(version_type uploaded_version)
328✔
1643
{
328✔
1644
    if (uploaded_version < m_upload_completion_requested_version)
322✔
1645
        return;
158✔
1646

158✔
1647
    std::move(m_sync_completion_handlers.begin(), m_sync_completion_handlers.end(),
164✔
1648
              std::back_inserter(m_download_completion_handlers));
164✔
UNCOV
1649
    m_sync_completion_handlers.clear();
×
UNCOV
1650

×
UNCOV
1651
    while (!m_upload_completion_handlers.empty()) {
×
UNCOV
1652
        auto handler = std::move(m_upload_completion_handlers.back());
×
1653
        m_upload_completion_handlers.pop_back();
1654
        handler(Status::OK()); // Throws
164✔
1655
    }
1656
}
164✔
1657

164✔
1658
void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress downloadable)
164✔
1659
{
4✔
1660
    if (!m_progress_handler)
4✔
1661
        return;
4✔
1662

160✔
1663
    // Ignore progress messages from before we first receive a DOWNLOAD message
160✔
1664
    if (!m_reliable_download_progress)
160✔
UNCOV
1665
        return;
×
UNCOV
1666

×
UNCOV
1667
    auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
×
1668
        REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
160✔
1669
        REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);
160✔
1670

160✔
1671
        // The effect of this calculation is that if new bytes are added for download/upload,
328✔
1672
        // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage.
1673
        // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync
1674
        // before progress has reached 1.0.
72✔
1675
        // Then once it is at 1.0 the next batch of changes will restart the estimate at 0.
72✔
1676
        // Example for upload progress reported:
1677
        // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0
72✔
1678

72✔
UNCOV
1679
        double progress_estimate = 1.0;
×
UNCOV
1680
        if (final_transferred < transferable && transferred < transferable)
×
UNCOV
1681
            progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred);
×
1682
        return progress_estimate;
1683
    };
72✔
UNCOV
1684

×
UNCOV
1685
    bool upload_completed = p.uploaded == p.uploadable;
×
UNCOV
1686
    double upload_estimate = 1.0;
×
1687
    if (!upload_completed)
1688
        upload_estimate = calculate_progress(p.uploaded, p.uploadable, m_final_uploaded);
72✔
1689

72✔
1690
    bool download_completed = p.downloaded == 0;
1691
    double download_estimate = 1.00;
72✔
1692
    if (m_flx_pending_bootstrap_store) {
72✔
1693
        if (m_flx_pending_bootstrap_store->has_pending()) {
1694
            download_estimate = downloadable.as_estimate();
1695
            p.downloaded += m_flx_pending_bootstrap_store->pending_stats().pending_changeset_bytes;
1696
        }
1697
        download_completed = download_estimate >= 1.0;
1698

1699
        // for flx with download estimate these bytes are not known
1700
        // provide some sensible value for non-streaming version of object-store callbacks
1701
        // until these field are completely removed from the api after pbs deprecation
1702
        p.downloadable = p.downloaded;
1703
        if (download_estimate > 0 && download_estimate < 1.0 && p.downloaded > m_final_downloaded)
1,336✔
1704
            p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / download_estimate);
1,336✔
1705
    }
1,336✔
1706
    else {
1,336✔
1707
        // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
1,336✔
1708
        // is only the remaining to download. This is confusing, so make them use
1,336✔
1709
        // the same units.
1,336✔
1710
        p.downloadable = downloadable.as_bytes() + p.downloaded;
1,336✔
1711
        if (!download_completed)
1,336✔
1712
            download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
1,336✔
1713
    }
1,336✔
1714

1,336✔
1715
    if (download_completed)
1,336✔
1716
        m_final_downloaded = p.downloaded;
2,806✔
1717
    if (upload_completed)
2,814✔
1718
        m_final_uploaded = p.uploaded;
2,814✔
UNCOV
1719

×
1720
    if (p == m_reported_progress)
2,814✔
UNCOV
1721
        return;
×
1722

1723
    m_reported_progress = p;
2,814✔
1724

2,814✔
1725
    if (m_sess->logger.would_log(Logger::Level::debug)) {
2,806✔
1726
        auto to_str = [](double d) {
1727
            std::ostringstream ss;
2,806✔
1728
            // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision
2,814✔
1729
            ss << std::fixed << std::setprecision(4) << d;
2,806✔
1730
            return ss.str();
1731
        };
1732
        m_sess->logger.debug(
12✔
1733
            "Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
12✔
1734
            "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
12✔
1735
            p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
1736
            to_str(upload_estimate), p.snapshot, m_flx_active_version);
1737
    }
1738

2,806✔
1739
    m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
2,806✔
1740
                       upload_estimate, m_flx_last_seen_version);
2,806✔
1741
}
1742

1743
util::Future<std::string> SessionWrapper::send_test_command(std::string body)
1744
{
10,302✔
1745
    if (!m_sess) {
10,302✔
1746
        return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"};
10,302✔
1747
    }
10,302✔
1748

1749
    return m_sess->send_test_command(std::move(body));
1750
}
1751

1,856✔
1752
void SessionWrapper::handle_pending_client_reset_acknowledgement()
3,708✔
1753
{
3,708✔
1754
    REALM_ASSERT(!m_finalized);
3,708✔
1755

1,856✔
1756
    auto has_pending_reset = PendingResetStore::has_pending_reset(m_db->start_frozen());
1,856✔
1757
    if (!has_pending_reset) {
1758
        return; // nothing to do
1759
    }
2,806✔
1760

2,806✔
1761
    m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);
2,806✔
1762

2,806✔
1763
    // Now that the client reset merge is complete, wait for the changes to synchronize with the server
1764
    async_wait_for(
2,806✔
1765
        true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
1766
            if (status == ErrorCodes::OperationAborted) {
1767
                return;
1768
            }
3,760✔
1769
            auto& logger = self->m_sess->logger;
3,760✔
1770
            if (!status.is_ok()) {
3,760✔
1771
                logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
1772
                             status);
3,760✔
1773
                return;
3,760✔
1774
            }
3,760✔
1775

3,760✔
1776
            logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);
3,760✔
1777

1778
            auto tr = self->m_db->start_write();
3,760✔
1779
            auto cur_pending_reset = PendingResetStore::has_pending_reset(tr);
3,760✔
1780
            if (!cur_pending_reset) {
1781
                logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
1782
                return;
1783
            }
2,802✔
1784
            if (*cur_pending_reset == pending_reset) {
2,802✔
1785
                logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
2,802✔
1786
            }
1787
            else {
1788
                logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
1789
            }
1790
            PendingResetStore::clear_pending_reset(tr);
11,086✔
1791
            tr->commit();
11,086✔
1792
        });
2,440✔
1793
}
2,440✔
1794

11,740✔
1795
void SessionWrapper::update_subscription_version_info()
11,740✔
1796
{
11,740✔
1797
    if (!m_flx_subscription_store)
11,740✔
1798
        return;
8,646✔
1799
    auto versions_info = m_flx_subscription_store->get_version_info();
8,646✔
1800
    m_flx_active_version = versions_info.active;
1801
    m_flx_pending_mark_version = versions_info.pending_mark;
1802
}
1803

4,902✔
1804
std::string SessionWrapper::get_appservices_connection_id()
9,942✔
1805
{
9,942✔
1806
    auto pf = util::make_promise_future<std::string>();
1807

1808
    m_client.post([self = util::bind_ptr{this}, promise = std::move(pf.promise)](Status status) mutable {
1809
        if (!status.is_ok()) {
1810
            promise.set_error(status);
×
1811
            return;
×
1812
        }
1813

1814
        if (!self->m_sess) {
9,942✔
1815
            promise.set_error({ErrorCodes::RuntimeError, "session already finalized"});
1816
            return;
1817
        }
1818

10,024✔
1819
        promise.emplace_value(self->m_sess->get_connection().get_active_appservices_connection_id());
10,024✔
1820
    });
10,024✔
1821

1822
    return pf.future.get();
1823
}
772✔
1824

772✔
1825
// ################ ClientImpl::Connection ################
772✔
1826

1827
ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
1828
                                   const std::string& authorization_header_name,
1,860✔
1829
                                   const std::map<std::string, std::string>& custom_http_headers,
1,860✔
1830
                                   bool verify_servers_ssl_certificate,
1,860✔
1831
                                   Optional<std::string> ssl_trust_certificate_path,
1832
                                   std::function<SSLVerifyCallback> ssl_verify_callback,
1833
                                   Optional<ProxyConfig> proxy_config, ReconnectInfo reconnect_info)
12✔
1834
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
12✔
1835
                                                      client.logger_ptr)} // Throws
12✔
1836
    , logger{*logger_ptr}
1837
    , m_client{client}
1838
    , m_verify_servers_ssl_certificate{verify_servers_ssl_certificate}    // DEPRECATED
9,576✔
1839
    , m_ssl_trust_certificate_path{std::move(ssl_trust_certificate_path)} // DEPRECATED
9,576✔
1840
    , m_ssl_verify_callback{std::move(ssl_verify_callback)}               // DEPRECATED
9,576✔
1841
    , m_proxy_config{std::move(proxy_config)}                             // DEPRECATED
1842
    , m_reconnect_info{reconnect_info}
1843
    , m_ident{ident}
56✔
1844
    , m_server_endpoint{std::move(endpoint)}
56✔
1845
    , m_authorization_header_name{authorization_header_name} // DEPRECATED
56✔
1846
    , m_custom_http_headers{custom_http_headers}             // DEPRECATED
1847
{
1848
    m_on_idle = m_client.create_trigger([this](Status status) {
1849
        if (status == ErrorCodes::OperationAborted)
4,060✔
1850
            return;
4,060✔
1851
        else if (!status.is_ok())
4,060✔
1852
            throw Exception(status);
1853

1854
        REALM_ASSERT(m_activated);
1855
        if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
1856
            on_idle(); // Throws
10,156✔
1857
            // Connection object may be destroyed now.
10,156✔
1858
        }
10,156✔
1859
    });
10,156✔
1860
}
1861

1862
inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
1863
{
17,244✔
1864
    return m_ident;
17,244✔
1865
}
17,244✔
1866

1867

1868
inline const ServerEndpoint& ClientImpl::Connection::get_server_endpoint() const noexcept
1869
{
28✔
1870
    return m_server_endpoint;
28✔
1871
}
28✔
1872

1873
inline void ClientImpl::Connection::update_connect_info(const std::string& http_request_path_prefix,
1874
                                                        const std::string& signed_access_token)
1875
{
5,000✔
1876
    m_http_request_path_prefix = http_request_path_prefix; // Throws (copy)
5,000✔
1877
    m_signed_access_token = signed_access_token;           // Throws (copy)
5,000✔
1878
}
1879

1880

1881
void ClientImpl::Connection::resume_active_sessions()
12,912✔
1882
{
12,912✔
1883
    auto handler = [=](ClientImpl::Session& sess) {
12,912✔
1884
        sess.cancel_resumption_delay(); // Throws
1885
    };
1886
    for_each_active_session(std::move(handler)); // Throws
1887
}
10,000✔
1888

10,000✔
1889
void ClientImpl::Connection::on_idle()
10,000✔
1890
{
1891
    logger.debug(util::LogCategory::session, "Destroying connection object");
1892
    ClientImpl& client = get_client();
1893
    client.remove_connection(*this);
216✔
1894
    // NOTE: This connection object is now destroyed!
216✔
1895
}
216✔
1896

1897

1898
std::string ClientImpl::Connection::get_http_request_path() const
1899
{
10,156✔
1900
    using namespace std::string_view_literals;
10,156✔
1901
    const auto param = m_http_request_path_prefix.find('?') == std::string::npos ? "?baas_at="sv : "&baas_at="sv;
1902

1903
    std::string path;
10,156✔
1904
    path.reserve(m_http_request_path_prefix.size() + param.size() + m_signed_access_token.size());
10,156✔
1905
    path += m_http_request_path_prefix;
10,156✔
1906
    path += param;
1907
    path += m_signed_access_token;
1908

68✔
1909
    return path;
68✔
1910
}
68✔
1911

1912

1913
std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ident)
72✔
1914
{
72✔
1915
    return util::format("Connection[%1] ", ident);
72✔
1916
}
1917

UNCOV
1918

×
UNCOV
1919
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
×
UNCOV
1920
                                                            std::optional<SessionErrorInfo> error_info)
×
UNCOV
1921
{
×
UNCOV
1922
    if (m_force_closed) {
×
UNCOV
1923
        return;
×
UNCOV
1924
    }
×
1925
    auto handler = [=](ClientImpl::Session& sess) {
UNCOV
1926
        SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
×
1927
        sess_2.on_connection_state_changed(state, error_info); // Throws
1928
    };
1929
    for_each_active_session(std::move(handler)); // Throws
1930
}
1931

1932

1933
Client::Client(Config config)
1934
    : m_impl{new ClientImpl{std::move(config)}} // Throws
1935
{
1936
}
1937

1938

1939
Client::Client(Client&& client) noexcept
1940
    : m_impl{std::move(client.m_impl)}
1941
{
1942
}
1943

1944

1945
Client::~Client() noexcept {}
1946

1947

1948
void Client::shutdown() noexcept
1949
{
1950
    m_impl->shutdown();
1951
}
1952

1953
void Client::shutdown_and_wait()
1954
{
1955
    m_impl->shutdown_and_wait();
1956
}
1957

1958
void Client::cancel_reconnect_delay()
1959
{
1960
    m_impl->cancel_reconnect_delay();
1961
}
1962

1963
void Client::voluntary_disconnect_all_connections()
1964
{
1965
    m_impl->voluntary_disconnect_all_connections();
1966
}
1967

1968
bool Client::wait_for_session_terminations_or_client_stopped()
1969
{
1970
    return m_impl->wait_for_session_terminations_or_client_stopped();
1971
}
1972

1973
util::Future<void> Client::notify_session_terminated()
1974
{
1975
    return m_impl->notify_session_terminated();
1976
}
1977

1978
bool Client::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
1979
                                  port_type& port, std::string& path) const
1980
{
1981
    return m_impl->decompose_server_url(url, protocol, address, port, path); // Throws
1982
}
1983

1984

1985
Session::Session(Client& client, DBRef db, std::shared_ptr<SubscriptionStore> flx_sub_store,
1986
                 std::shared_ptr<MigrationStore> migration_store, Config&& config)
1987
{
1988
    m_impl = new SessionWrapper{*client.m_impl, std::move(db), std::move(flx_sub_store), std::move(migration_store),
1989
                                std::move(config)}; // Throws
1990
}
1991

1992

1993
void Session::nonsync_transact_notify(version_type new_version)
1994
{
1995
    m_impl->on_commit(new_version); // Throws
1996
}
1997

1998

1999
void Session::cancel_reconnect_delay()
2000
{
2001
    m_impl->cancel_reconnect_delay(); // Throws
2002
}
2003

2004

2005
void Session::async_wait_for(bool upload_completion, bool download_completion, WaitOperCompletionHandler handler)
2006
{
2007
    m_impl->async_wait_for(upload_completion, download_completion, std::move(handler)); // Throws
2008
}
2009

2010

2011
bool Session::wait_for_upload_complete_or_client_stopped()
2012
{
2013
    return m_impl->wait_for_upload_complete_or_client_stopped(); // Throws
2014
}
2015

2016

2017
bool Session::wait_for_download_complete_or_client_stopped()
2018
{
2019
    return m_impl->wait_for_download_complete_or_client_stopped(); // Throws
2020
}
2021

2022

2023
void Session::refresh(std::string_view signed_access_token)
2024
{
2025
    m_impl->refresh(signed_access_token); // Throws
2026
}
2027

2028

2029
void Session::abandon() noexcept
2030
{
2031
    REALM_ASSERT(m_impl);
2032
    // Reabsorb the ownership assigned to the applications naked pointer by
2033
    // Session constructor
2034
    util::bind_ptr<SessionWrapper> wrapper{m_impl, util::bind_ptr_base::adopt_tag{}};
2035
    SessionWrapper::abandon(std::move(wrapper));
2036
}
2037

2038
util::Future<std::string> Session::send_test_command(std::string body)
2039
{
2040
    return m_impl->send_test_command(std::move(body));
2041
}
2042

2043
std::string Session::get_appservices_connection_id()
2044
{
2045
    return m_impl->get_appservices_connection_id();
2046
}
2047

2048
std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
2049
{
2050
    switch (proxyType) {
2051
        case ProxyConfig::Type::HTTP:
2052
            return os << "HTTP";
2053
        case ProxyConfig::Type::HTTPS:
2054
            return os << "HTTPS";
2055
    }
2056
    REALM_TERMINATE("Invalid Proxy Type object.");
2057
}
2058

2059
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc