• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2469

03 Jul 2024 10:12PM UTC coverage: 90.958% (-0.03%) from 90.984%
2469

push

Evergreen

web-flow
RCORE-2185 Sync client should steal file ident of fresh realm when performing client reset (#7850)

* Initial changes to use the file ident from the fresh realm during client reset

* Fixed failing realm_sync_test tests

* Don't send UPLOAD Messages while downloading fresh realm

* Allow sending QUERY bootstrap for fresh download sessions

* Added SHARED_GROUP_FRESH_PATH to generate path for fresh realm

* Removed SHARED_GROUP_FRESH_PATH and used session_reason setting instead

* Some cleanup after tests passing

* Added test to verify no UPLOAD messages are sent during fresh realm download

* Use is_fresh_path to determine if hook event called by client reset fresh realm download session

* Fixed tsan failure around REQUIRE() within hook event callback in flx_migration test

* Updates from review and streamlined changes based on recommendations

* Reverted some test changes that are no longer needed

* Updated logic for when to perform a client reset diff

* Updated fresh realm download to update upload progress but not send upload messages

* Removed has_client_reset_config flag in favor of get_cliet_reset_config()

* Updats from the review - renamed m_allow_uploads to m_delay_uploads

* Updated assert

* Updated test to start with file ident, added comment about client reset and no file ident

* Updated comment for m_delay_uploads

102284 of 180462 branches covered (56.68%)

140 of 147 new or added lines in 10 files covered. (95.24%)

90 existing lines in 15 files now uncovered.

215145 of 236531 relevant lines covered (90.96%)

6144068.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.79
/src/realm/sync/noinst/client_impl_base.hpp
1

2
#ifndef REALM_NOINST_CLIENT_IMPL_BASE_HPP
3
#define REALM_NOINST_CLIENT_IMPL_BASE_HPP
4

5
#include <realm/sync/client_base.hpp>
6

7
#include <realm/binary_data.hpp>
8
#include <realm/sync/history.hpp>
9
#include <realm/sync/network/default_socket.hpp>
10
#include <realm/sync/network/network_ssl.hpp>
11
#include <realm/sync/noinst/client_history_impl.hpp>
12
#include <realm/sync/noinst/migration_store.hpp>
13
#include <realm/sync/noinst/protocol_codec.hpp>
14
#include <realm/sync/protocol.hpp>
15
#include <realm/sync/subscriptions.hpp>
16
#include <realm/sync/trigger.hpp>
17
#include <realm/util/buffer_stream.hpp>
18
#include <realm/util/checked_mutex.hpp>
19
#include <realm/util/logger.hpp>
20
#include <realm/util/span.hpp>
21

22
#include <cstdint>
23
#include <deque>
24
#include <functional>
25
#include <list>
26
#include <map>
27
#include <random>
28
#include <string>
29
#include <unordered_set>
30
#include <utility>
31

32
namespace realm::sync {
33

34
// (protocol, address, port, user_id)
35
//
36
// `protocol` is included for convenience, even though it is not strictly part
37
// of an endpoint.
38
struct ServerEndpoint {
39
    ProtocolEnvelope envelope;
40
    std::string address;
41
    network::Endpoint::port_type port;
42
    std::string user_id;
43
    SyncServerMode server_mode = SyncServerMode::PBS;
44
    bool is_verified = false;
45

46
private:
47
    auto to_tuple() const
48
    {
47,788✔
49
        // Does not include server_mode because all endpoints for a single Client
50
        // must have the same mode. is_verified is not part of an endpoint's identity.
51
        return std::make_tuple(server_mode, envelope, std::ref(address), port, std::ref(user_id));
47,788✔
52
    }
47,788✔
53

54
public:
55
    friend inline bool operator==(const ServerEndpoint& lhs, const ServerEndpoint& rhs)
56
    {
×
57
        return lhs.to_tuple() == rhs.to_tuple();
×
58
    }
×
59

60
    friend inline bool operator<(const ServerEndpoint& lhs, const ServerEndpoint& rhs)
61
    {
23,896✔
62
        return lhs.to_tuple() < rhs.to_tuple();
23,896✔
63
    }
23,896✔
64
};
65

66
class SessionWrapper;
67

68
class SessionWrapperStack {
69
public:
70
    bool empty() const noexcept;
71
    void push(util::bind_ptr<SessionWrapper>) noexcept;
72
    util::bind_ptr<SessionWrapper> pop() noexcept;
73
    void clear() noexcept;
74
    bool erase(SessionWrapper*) noexcept;
75
    SessionWrapperStack() noexcept = default;
19,868✔
76
    ~SessionWrapperStack();
77

78
private:
79
    SessionWrapper* m_back = nullptr;
80
};
81

82
template <typename ErrorType, typename RandomEngine>
83
struct ErrorBackoffState {
84
    ErrorBackoffState() = default;
85
    explicit ErrorBackoffState(ResumptionDelayInfo default_delay_info, RandomEngine& random_engine)
86
        : default_delay_info(std::move(default_delay_info))
9,717✔
87
        , m_random_engine(random_engine)
9,717✔
88
    {
20,132✔
89
    }
20,132✔
90

91
    void update(ErrorType new_error, std::optional<ResumptionDelayInfo> new_delay_info)
92
    {
3,854✔
93
        if (triggering_error && *triggering_error == new_error) {
3,854✔
94
            return;
86✔
95
        }
86✔
96

97
        delay_info = new_delay_info.value_or(default_delay_info);
3,768✔
98
        cur_delay_interval = util::none;
3,768✔
99
        triggering_error = new_error;
3,768✔
100
    }
3,768✔
101

102
    void reset()
103
    {
1,978✔
104
        triggering_error = util::none;
1,978✔
105
        cur_delay_interval = util::none;
1,978✔
106
        delay_info = default_delay_info;
1,978✔
107
    }
1,978✔
108

109
    std::chrono::milliseconds delay_interval()
110
    {
420✔
111
        if (!cur_delay_interval) {
420✔
112
            cur_delay_interval = delay_info.resumption_delay_interval;
286✔
113
            return jitter_value(*cur_delay_interval);
286✔
114
        }
286✔
115
        if (*cur_delay_interval == delay_info.max_resumption_delay_interval) {
134✔
116
            return jitter_value(delay_info.max_resumption_delay_interval);
10✔
117
        }
10✔
118
        auto safe_delay_interval = cur_delay_interval->count();
124✔
119
        if (util::int_multiply_with_overflow_detect(safe_delay_interval,
124✔
120
                                                    delay_info.resumption_delay_backoff_multiplier)) {
124✔
121
            *cur_delay_interval = delay_info.max_resumption_delay_interval;
×
122
        }
×
123
        else {
124✔
124
            *cur_delay_interval =
124✔
125
                std::min(delay_info.max_resumption_delay_interval, std::chrono::milliseconds(safe_delay_interval));
124✔
126
        }
124✔
127
        return jitter_value(*cur_delay_interval);
124✔
128
    }
134✔
129

130
    ResumptionDelayInfo default_delay_info;
131
    ResumptionDelayInfo delay_info;
132
    util::Optional<std::chrono::milliseconds> cur_delay_interval;
133
    util::Optional<ErrorType> triggering_error;
134

135
private:
136
    std::chrono::milliseconds jitter_value(std::chrono::milliseconds value)
137
    {
420✔
138
        if (delay_info.delay_jitter_divisor == 0) {
420✔
139
            return value;
20✔
140
        }
20✔
141
        const auto max_jitter = value.count() / delay_info.delay_jitter_divisor;
400✔
142
        auto distr = std::uniform_int_distribution<std::chrono::milliseconds::rep>(0, max_jitter);
400✔
143
        std::chrono::milliseconds randomized_deduction(distr(m_random_engine.get()));
400✔
144
        return value - randomized_deduction;
400✔
145
    }
420✔
146

147
    std::reference_wrapper<RandomEngine> m_random_engine;
148
};
149

150
class ClientImpl {
151
public:
152
    enum class ConnectionTerminationReason;
153
    class Connection;
154
    class Session;
155

156
    using port_type = network::Endpoint::port_type;
157
    using OutputBuffer = util::ResettableExpandableBufferOutputStream;
158
    using ClientProtocol = _impl::ClientProtocol;
159
    using RandomEngine = std::mt19937_64;
160

161
    /// Per-server endpoint information used to determine reconnect delays.
162
    class ReconnectInfo {
163
    public:
164
        explicit ReconnectInfo(ReconnectMode mode, ResumptionDelayInfo default_delay_info, RandomEngine& random)
165
            : m_reconnect_mode(mode)
4,858✔
166
            , m_backoff_state(default_delay_info, random)
4,858✔
167
        {
10,064✔
168
        }
10,064✔
169

170
        void reset() noexcept;
171
        void update(ConnectionTerminationReason reason, std::optional<ResumptionDelayInfo> new_delay_info);
172
        std::chrono::milliseconds delay_interval();
173

174
        const std::optional<ConnectionTerminationReason> reason() const noexcept
175
        {
×
176
            return m_backoff_state.triggering_error;
×
177
        }
×
178

179
        // Set this flag to true to schedule a postponed invocation of reset(). See
180
        // Connection::cancel_reconnect_delay() for details and rationale.
181
        //
182
        // Will be set back to false when a PONG message arrives, and the
183
        // corresponding PING message was sent while `m_scheduled_reset` was
184
        // true. See receive_pong().
185
        bool scheduled_reset = false;
186

187
    private:
188
        ReconnectMode m_reconnect_mode;
189
        ErrorBackoffState<ConnectionTerminationReason, RandomEngine> m_backoff_state;
190
    };
191

192
    static constexpr milliseconds_type default_connect_timeout = 120000;        // 2 minutes
193
    static constexpr milliseconds_type default_connection_linger_time = 30000;  // 30 seconds
194
    static constexpr milliseconds_type default_ping_keepalive_period = 60000;   // 1 minute
195
    static constexpr milliseconds_type default_pong_keepalive_timeout = 120000; // 2 minutes
196
    static constexpr milliseconds_type default_fast_reconnect_limit = 60000;    // 1 minute
197

198
    const std::shared_ptr<util::Logger> logger_ptr;
199
    util::Logger& logger;
200

201
    ClientImpl(ClientConfig);
202
    ~ClientImpl();
203

204
    static constexpr int get_oldest_supported_protocol_version() noexcept;
205

206
    void shutdown() noexcept REQUIRES(!m_mutex, !m_drain_mutex);
207

208
    void shutdown_and_wait() REQUIRES(!m_mutex, !m_drain_mutex);
209

210
    const std::string& get_user_agent_string() const noexcept;
211
    ReconnectMode get_reconnect_mode() const noexcept;
212
    bool is_dry_run() const noexcept;
213

214
    // Functions to post onto the event loop and create an event loop timer using the
215
    // SyncSocketProvider
216
    void post(SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex);
217
    void post(util::UniqueFunction<void()>&& handler) REQUIRES(!m_drain_mutex);
218
    SyncSocketProvider::SyncTimer create_timer(std::chrono::milliseconds delay,
219
                                               SyncSocketProvider::FunctionHandler&& handler)
220
        REQUIRES(!m_drain_mutex);
221
    using SyncTrigger = std::unique_ptr<Trigger<ClientImpl>>;
222
    SyncTrigger create_trigger(SyncSocketProvider::FunctionHandler&& handler);
223

224
    RandomEngine& get_random() noexcept;
225

226
    /// Returns false if the specified URL is invalid.
227
    bool decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
228
                              port_type& port, std::string& path) const;
229

230
    void cancel_reconnect_delay() REQUIRES(!m_drain_mutex);
231
    bool wait_for_session_terminations_or_client_stopped() REQUIRES(!m_mutex, !m_drain_mutex);
232
    // Async version of wait_for_session_terminations_or_client_stopped().
233
    util::Future<void> notify_session_terminated() REQUIRES(!m_drain_mutex);
234
    void voluntary_disconnect_all_connections() REQUIRES(!m_drain_mutex);
235

236
private:
237
    using connection_ident_type = std::int_fast64_t;
238

239
    const ReconnectMode m_reconnect_mode; // For testing purposes only
240
    const milliseconds_type m_connect_timeout;
241
    const milliseconds_type m_connection_linger_time;
242
    const milliseconds_type m_ping_keepalive_period;
243
    const milliseconds_type m_pong_keepalive_timeout;
244
    const milliseconds_type m_fast_reconnect_limit;
245
    const ResumptionDelayInfo m_reconnect_backoff_info;
246
    const bool m_disable_upload_activation_delay;
247
    const bool m_dry_run; // For testing purposes only
248
    const bool m_enable_default_port_hack;
249
    const bool m_disable_upload_compaction;
250
    const bool m_fix_up_object_ids;
251
    const std::function<RoundtripTimeHandler> m_roundtrip_time_handler;
252
    const std::string m_user_agent_string;
253
    std::shared_ptr<SyncSocketProvider> m_socket_provider;
254
    ClientProtocol m_client_protocol;
255
    session_ident_type m_prev_session_ident = 0;
256
    const bool m_one_connection_per_session;
257

258
    RandomEngine m_random;
259
    SyncTrigger m_actualize_and_finalize;
260

261
    // Note: There is one server slot per server endpoint (hostname, port,
262
    // user_id), and it survives from one connection object to the next, which
263
    // is important because it carries information about a possible reconnect
264
    // delay applying to the new connection object (server hammering
265
    // protection).
266
    struct ServerSlot {
267
        explicit ServerSlot(ReconnectInfo reconnect_info);
268
        ~ServerSlot();
269

270
        ReconnectInfo reconnect_info; // Applies exclusively to `connection`.
271
        std::unique_ptr<ClientImpl::Connection> connection;
272

273
        // Used instead of `connection` when `m_one_connection_per_session` is
274
        // true.
275
        std::map<connection_ident_type, std::unique_ptr<ClientImpl::Connection>> alt_connections;
276
    };
277

278
    // Must be accessed only by event loop thread
279
    std::map<ServerEndpoint, ServerSlot> m_server_slots;
280

281
    // Must be accessed only by event loop thread
282
    connection_ident_type m_prev_connection_ident = 0;
283

284
    util::CheckedMutex m_drain_mutex;
285
    std::condition_variable m_drain_cv;
286
    bool m_drained GUARDED_BY(m_drain_mutex) = false;
287
    uint64_t m_outstanding_posts GUARDED_BY(m_drain_mutex) = 0;
288
    uint64_t m_num_connections GUARDED_BY(m_drain_mutex) = 0;
289

290
    util::CheckedMutex m_mutex;
291

292
    bool m_stopped GUARDED_BY(m_mutex) = false;
293
    bool m_sessions_terminated GUARDED_BY(m_mutex) = false;
294

295
    // The set of session wrappers that are not yet wrapping a session object,
296
    // and are not yet abandoned (still referenced by the application).
297
    SessionWrapperStack m_unactualized_session_wrappers GUARDED_BY(m_mutex);
298

299
    // The set of session wrappers that were successfully actualized, but are
300
    // now abandoned (no longer referenced by the application), and have not yet
301
    // been finalized. Order in queue is immaterial.
302
    SessionWrapperStack m_abandoned_session_wrappers GUARDED_BY(m_mutex);
303

304
    // Used with m_mutex
305
    std::condition_variable m_wait_or_client_stopped_cond;
306

307
    void register_unactualized_session_wrapper(SessionWrapper*) REQUIRES(!m_mutex);
308
    void register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper>) noexcept REQUIRES(!m_mutex);
309
    void actualize_and_finalize_session_wrappers() REQUIRES(!m_mutex);
310

311
    // Get or create a connection. If a connection exists for the specified
312
    // endpoint, it will be returned, otherwise a new connection will be
313
    // created. If `m_one_connection_per_session` is true (testing only), a new
314
    // connection will be created every time.
315
    //
316
    // Must only be accessed from event loop thread.
317
    //
318
    // FIXME: Passing these SSL parameters here is confusing at best, since they
319
    // are ignored if a connection is already available for the specified
320
    // endpoint. Also, there is no way to check that all the specified SSL
321
    // parameters are in agreement with a preexisting connection. A better
322
    // approach would be to allow for per-endpoint SSL parameters to be
323
    // specifiable through public member functions of ClientImpl from where they
324
    // could then be picked up as new connections are created on demand.
325
    ClientImpl::Connection& get_connection(ServerEndpoint, const std::string& authorization_header_name,
326
                                           const std::map<std::string, std::string>& custom_http_headers,
327
                                           bool verify_servers_ssl_certificate,
328
                                           util::Optional<std::string> ssl_trust_certificate_path,
329
                                           std::function<SyncConfig::SSLVerifyCallback>,
330
                                           util::Optional<SyncConfig::ProxyConfig>, bool& was_created)
331
        REQUIRES(!m_drain_mutex);
332

333
    // Destroys the specified connection.
334
    void remove_connection(ClientImpl::Connection&) noexcept REQUIRES(!m_drain_mutex);
335

336
    void drain_connections();
337
    void drain_connections_on_loop() REQUIRES(!m_drain_mutex);
338

339
    void incr_outstanding_posts() REQUIRES(!m_drain_mutex);
340
    void decr_outstanding_posts() REQUIRES(!m_drain_mutex);
341

342
    session_ident_type get_next_session_ident() noexcept;
343

344
    friend class ClientImpl::Connection;
345
    friend class SessionWrapper;
346
};
347

348
constexpr int ClientImpl::get_oldest_supported_protocol_version() noexcept
349
{
17,318✔
350
    // See get_current_protocol_version() for information about the
351
    // individual protocol versions.
352
    return 2;
17,318✔
353
}
17,318✔
354

355
static_assert(ClientImpl::get_oldest_supported_protocol_version() >= 1, "");
356
static_assert(ClientImpl::get_oldest_supported_protocol_version() <= get_current_protocol_version(), "");
357

358

359
/// Information about why a connection (or connection initiation attempt) was
360
/// terminated. This is used to determinte the delay until the next connection
361
/// initiation attempt.
362
enum class ClientImpl::ConnectionTerminationReason {
363
    connect_operation_failed,          ///< Failure during connect operation
364
    closed_voluntarily,                ///< Voluntarily closed or connection operation canceled
365
    read_or_write_error,               ///< Read/write error after successful TCP connect operation
366
    ssl_certificate_rejected,          ///< Client rejected the SSL certificate of the server
367
    ssl_protocol_violation,            ///< A violation of the SSL protocol
368
    websocket_protocol_violation,      ///< A violation of the WebSocket protocol
369
    http_response_says_fatal_error,    ///< Status code in HTTP response says "fatal error"
370
    http_response_says_nonfatal_error, ///< Status code in HTTP response says "nonfatal error"
371
    bad_headers_in_http_response,      ///< Missing or bad headers in HTTP response
372
    sync_protocol_violation,           ///< Client received a bad message from the server
373
    sync_connect_timeout,              ///< Sync connection was not fully established in time
374
    server_said_try_again_later,       ///< Client received ERROR message with try_again=yes
375
    server_said_do_not_reconnect,      ///< Client received ERROR message with try_again=no
376
    pong_timeout,                      ///< Client did not receive PONG after PING
377

378
    /// The application requested a feature that is unavailable in the
379
    /// negotiated protocol version.
380
    missing_protocol_feature,
381
};
382

383
/// All use of connection objects, including construction and destruction, must
384
/// occur on behalf of the event loop thread of the associated client object.
385

386
// TODO: The parent will be updated to WebSocketObserver once the WebSocket integration is complete
387
class ClientImpl::Connection {
388
public:
389
    using connection_ident_type = std::int_fast64_t;
390
    using SSLVerifyCallback = SyncConfig::SSLVerifyCallback;
391
    using ProxyConfig = SyncConfig::ProxyConfig;
392
    using ReconnectInfo = ClientImpl::ReconnectInfo;
393
    using DownloadMessage = ClientProtocol::DownloadMessage;
394

395
    std::shared_ptr<util::Logger> logger_ptr;
396
    util::Logger& logger;
397

398
    ClientImpl& get_client() noexcept;
399
    ReconnectInfo get_reconnect_info() const noexcept;
400
    ClientProtocol& get_client_protocol() noexcept;
401

402
    /// Activate this connection object. No attempt is made to establish a
403
    /// connection before the connection object is activated.
404
    void activate();
405

406
    /// Activate the specified session.
407
    ///
408
    /// Prior to being activated, no messages will be sent or received on behalf
409
    /// of this session, and the associated Realm file will not be accessed,
410
    /// i.e., `Session::get_db()` will not be called.
411
    ///
412
    /// If activation is successful, the connection keeps the session alive
413
    /// until the application calls initiated_session_deactivation() or until
414
    /// the application destroys the connection object, whichever comes first.
415
    void activate_session(std::unique_ptr<Session>);
416

417
    /// Initiate the deactivation process which eventually (or immediately)
418
    /// leads to destruction of this session object.
419
    ///
420
    /// IMPORTANT: The session object may get destroyed before this function
421
    /// returns.
422
    ///
423
    /// The deactivation process must be considered initiated even if this
424
    /// function throws.
425
    ///
426
    /// The deactivation process is guaranteed to not be initiated until the
427
    /// application calls this function. So from the point of view of the
428
    /// application, after successful activation, a pointer to a session object
429
    /// remains valid until the application calls
430
    /// initiate_session_deactivation().
431
    ///
432
    /// After the initiation of the deactivation process, the associated Realm
433
    /// file will no longer be accessed, i.e., `get_db()` will not be called
434
    /// again, and a previously returned reference will also not be accessed
435
    /// again.
436
    ///
437
    /// The initiation of the deactivation process must be preceded by a
438
    /// successful invocation of activate_session(). It is an error to call
439
    /// initiate_session_deactivation() twice.
440
    void initiate_session_deactivation(Session*);
441

442
    /// Cancel the reconnect delay for this connection, if one is currently in
443
    /// effect. If a reconnect delay is not currently in effect, ensure that the
444
    /// delay before the next reconnection attempt will be canceled. This is
445
    /// necessary as an apparently established connection, or ongoing connection
446
    /// attempt can be about to fail for a reason that precedes the invocation
447
    /// of this function.
448
    ///
449
    /// It is an error to call this function before the connection has been
450
    /// activated.
451
    void cancel_reconnect_delay();
452

453
    void force_close();
454

455
    /// Returns zero until the HTTP response is received. After that point in
456
    /// time, it returns the negotiated protocol version, which is based on the
457
    /// contents of the `Sec-WebSocket-Protocol` header in the HTTP
458
    /// response. The negotiated protocol version is guaranteed to be greater
459
    /// than or equal to get_oldest_supported_protocol_version(), and be less
460
    /// than or equal to get_current_protocol_version().
461
    int get_negotiated_protocol_version() noexcept;
462

463
    // Methods from WebSocketObserver interface for websockets from the Socket Provider
464
    void websocket_connected_handler(const std::string& protocol);
465
    bool websocket_binary_message_received(util::Span<const char> data);
466
    void websocket_error_handler();
467
    bool websocket_closed_handler(bool, websocket::WebSocketError, std::string_view msg);
468

469
    connection_ident_type get_ident() const noexcept;
470
    const ServerEndpoint& get_server_endpoint() const noexcept;
471
    ConnectionState get_state() const noexcept;
472
    SyncServerMode get_sync_server_mode() const noexcept;
473
    bool is_flx_sync_connection() const noexcept;
474

475
    void update_connect_info(const std::string& http_request_path_prefix, const std::string& signed_access_token);
476

477
    void resume_active_sessions();
478

479
    void voluntary_disconnect();
480

481
    std::string get_active_appservices_connection_id();
482

483
    Connection(ClientImpl&, connection_ident_type, ServerEndpoint, const std::string& authorization_header_name,
484
               const std::map<std::string, std::string>& custom_http_headers, bool verify_servers_ssl_certificate,
485
               util::Optional<std::string> ssl_trust_certificate_path, std::function<SSLVerifyCallback>,
486
               util::Optional<ProxyConfig>, ReconnectInfo);
487

488
    ~Connection();
489

490
private:
491
    struct LifecycleSentinel : public util::AtomicRefCountBase {
492
        bool destroyed = false;
493
    };
494
    struct WebSocketObserverShim;
495

496
    using ReceivedChangesets = ClientProtocol::ReceivedChangesets;
497

498
    template <class H>
499
    void for_each_active_session(H handler);
500

501
    /// \brief Called when the connection becomes idle.
502
    ///
503
    /// The connection is considered idle when all of the following conditions
504
    /// are true:
505
    ///
506
    /// - The connection is activated.
507
    ///
508
    /// - The connection has no sessions in the Active state.
509
    ///
510
    /// - The connection is closed (in the disconnected state).
511
    ///
512
    /// From the point of view of this class, an overriding function is allowed
513
    /// to commit suicide (`delete this`).
514
    ///
515
    /// The default implementation of this function does nothing.
516
    ///
517
    /// This function is always called by the event loop thread of the
518
    /// associated client object.
519
    void on_idle();
520

521
    std::string get_http_request_path() const;
522

523
    void initiate_reconnect_wait();
524
    void handle_reconnect_wait(Status status);
525
    void initiate_reconnect();
526
    void initiate_connect_wait();
527
    void handle_connect_wait(Status status);
528

529
    void handle_connection_established();
530
    void schedule_urgent_ping();
531
    void initiate_ping_delay(milliseconds_type now);
532
    void handle_ping_delay();
533
    void initiate_pong_timeout();
534
    void handle_pong_timeout();
535
    void initiate_write_message(const OutputBuffer&, Session*);
536
    void handle_write_message();
537
    void send_next_message();
538
    void send_ping();
539
    void initiate_write_ping(const OutputBuffer&);
540
    void handle_write_ping();
541
    void handle_message_received(util::Span<const char> data);
542
    void initiate_disconnect_wait();
543
    void handle_disconnect_wait(Status status);
544
    void close_due_to_protocol_error(Status status);
545
    void close_due_to_client_side_error(Status, IsFatal is_fatal, ConnectionTerminationReason reason);
546
    void close_due_to_transient_error(Status status, ConnectionTerminationReason reason);
547
    void close_due_to_server_side_error(ProtocolError, const ProtocolErrorInfo& info);
548
    void involuntary_disconnect(const SessionErrorInfo& info, ConnectionTerminationReason reason);
549
    void disconnect(const SessionErrorInfo& info);
550
    void change_state_to_disconnected() noexcept;
551
    // These are only called from ClientProtocol class.
552
    void receive_pong(milliseconds_type timestamp);
553
    void receive_error_message(const ProtocolErrorInfo& info, session_ident_type);
554
    void receive_query_error_message(int error_code, std::string_view message, int64_t query_version,
555
                                     session_ident_type);
556
    void receive_ident_message(session_ident_type, SaltedFileIdent);
557
    void receive_download_message(session_ident_type, const DownloadMessage& message);
558

559
    void receive_mark_message(session_ident_type, request_ident_type);
560
    void receive_unbound_message(session_ident_type);
561
    void receive_test_command_response(session_ident_type, request_ident_type, std::string_view body);
562
    void receive_server_log_message(session_ident_type, util::Logger::Level, std::string_view body);
563
    void receive_appservices_request_id(std::string_view coid);
564
    void handle_protocol_error(Status status);
565

566
    // These are only called from Session class.
567
    void enlist_to_send(Session*);
568
    void one_more_active_unsuspended_session();
569
    void one_less_active_unsuspended_session();
570
    void finish_session_deactivation(Session* sess);
571

572
    OutputBuffer& get_output_buffer() noexcept;
573
    Session* get_session(session_ident_type) const noexcept;
574
    Session* find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept;
575
    static bool was_voluntary(ConnectionTerminationReason) noexcept;
576

577
    static std::string make_logger_prefix(connection_ident_type);
578

579
    void report_connection_state_change(ConnectionState, util::Optional<SessionErrorInfo> error_info = util::none);
580

581
    friend ClientProtocol;
582
    friend class Session;
583

584
    ClientImpl& m_client;
585
    util::bind_ptr<LifecycleSentinel> m_websocket_sentinel;
586
    std::unique_ptr<WebSocketInterface> m_websocket;
587

588
    /// DEPRECATED - These will be removed in a future release
589
    const bool m_verify_servers_ssl_certificate;
590
    const util::Optional<std::string> m_ssl_trust_certificate_path;
591
    const std::function<SSLVerifyCallback> m_ssl_verify_callback;
592
    const util::Optional<ProxyConfig> m_proxy_config;
593

594
    ReconnectInfo m_reconnect_info;
595
    int m_negotiated_protocol_version = 0;
596

597
    ConnectionState m_state = ConnectionState::disconnected;
598

599
    std::size_t m_num_active_unsuspended_sessions = 0;
600
    std::size_t m_num_active_sessions = 0;
601
    ClientImpl::SyncTrigger m_on_idle;
602

603
    // activate() has been called
604
    bool m_activated = false;
605

606
    // A reconnect delay is in progress
607
    bool m_reconnect_delay_in_progress = false;
608

609
    // Has no meaning when m_reconnect_delay_in_progress is false.
610
    bool m_nonzero_reconnect_delay = false;
611

612
    // A disconnect (linger) delay is in progress. This is for keeping the
613
    // connection open for a while after there are no more active unsuspended
614
    // sessions.
615
    bool m_disconnect_delay_in_progress = false;
616

617
    bool m_disconnect_has_occurred = false;
618

619
    // A message is currently being sent, i.e., the sending of a message has
620
    // been initiated, but not yet completed.
621
    bool m_sending = false;
622

623
    bool m_ping_delay_in_progress = false;
624
    bool m_waiting_for_pong = false;
625
    bool m_send_ping = false;
626
    bool m_minimize_next_ping_delay = false;
627
    bool m_ping_after_scheduled_reset_of_reconnect_info = false;
628

629
    // At least one PING message was sent since connection was established
630
    bool m_ping_sent = false;
631

632
    bool m_websocket_error_received = false;
633

634
    bool m_force_closed = false;
635

636
    // The timer will be constructed on demand, and will only be destroyed when
637
    // canceling a reconnect or disconnect delay.
638
    //
639
    // It is necessary to destroy and recreate the timer when canceling a wait
640
    // operation, because the next wait operation might need to be initiated
641
    // before the completion handler of the previous canceled wait operation
642
    // starts executing. Such an overlap is not allowed for wait operations on
643
    // the same timer instance.
644
    SyncSocketProvider::SyncTimer m_reconnect_disconnect_timer;
645

646
    // Timer for connect operation watchdog. For why this timer is optional, see
647
    // `m_reconnect_disconnect_timer`.
648
    SyncSocketProvider::SyncTimer m_connect_timer;
649

650
    // This timer is used to schedule the sending of PING messages, and as a
651
    // watchdog for timely reception of PONG messages. For why this timer is
652
    // optional, see `m_reconnect_disconnect_timer`.
653
    SyncSocketProvider::SyncTimer m_heartbeat_timer;
654

655
    milliseconds_type m_pong_wait_started_at = 0;
656
    milliseconds_type m_last_ping_sent_at = 0;
657

658
    // Round-trip time, in milliseconds, for last PING message for which a PONG
659
    // message has been received, or zero if no PONG message has been received.
660
    milliseconds_type m_previous_ping_rtt = 0;
661

662
    // Only valid when `m_disconnect_has_occurred` is true.
663
    milliseconds_type m_disconnect_time = 0;
664

665
    // The set of sessions associated with this connection. A session becomes
666
    // associated with a connection when it is activated.
667
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
668
    // Keep track of previously used sessions idents to see if a stale message was
669
    // received for a closed session
670
    std::unordered_set<session_ident_type> m_session_history;
671

672
    // A queue of sessions that have enlisted for an opportunity to send a
673
    // message to the server. Sessions will be served in the order that they
674
    // enlist. A session is only allowed to occur once in this queue. If the
675
    // connection is open, and the queue is not empty, and no message is
676
    // currently being written, the first session is taken out of the queue, and
677
    // then granted an opportunity to send a message.
678
    std::deque<Session*> m_sessions_enlisted_to_send;
679

680
    Session* m_sending_session = nullptr;
681

682
    std::unique_ptr<char[]> m_input_body_buffer;
683
    OutputBuffer m_output_buffer;
684

685
    const connection_ident_type m_ident;
686
    ServerEndpoint m_server_endpoint;
687
    std::string m_appservices_coid;
688

689
    /// DEPRECATED - These will be removed in a future release
690
    const std::string m_authorization_header_name;
691
    const std::map<std::string, std::string> m_custom_http_headers;
692

693
    std::string m_http_request_path_prefix;
694
    std::string m_signed_access_token;
695
};
696

697

698
/// A synchronization session between a local and a remote Realm file.
699
///
700
/// All use of session objects, including construction and destruction, must
701
/// occur on the event loop thread of the associated client object.
702
class ClientImpl::Session {
703
public:
704
    using ReceivedChangesets = ClientProtocol::ReceivedChangesets;
705
    using DownloadMessage = ClientProtocol::DownloadMessage;
706

707
    std::shared_ptr<util::Logger> logger_ptr;
708
    util::Logger& logger;
709

710
    ClientImpl& get_client() noexcept;
711
    Connection& get_connection() noexcept;
712
    session_ident_type get_ident() const noexcept;
713

714
    /// Inform this client about new changesets in the history.
715
    ///
716
    /// The type of the version specified here is the one that identifies an
717
    /// entry in the sync history. Whether this is the same as the snapshot
718
    /// version of the Realm depends on the history implementation.
719
    ///
720
    /// The application is supposed to call this function to inform the client
721
    /// about a new version produced by a transaction that was not performed on
722
    /// behalf of this client. If the application does not call this function,
723
    /// the client will not discover and upload new changesets in a timely
724
    /// manner.
725
    ///
726
    /// It is an error to call this function before activation of the session,
727
    /// or after initiation of deactivation.
728
    void recognize_sync_version(version_type);
729

730
    /// \brief Request notification when all changesets currently avaialble on
731
    /// the server have been downloaded.
732
    ///
733
    /// When downloading completes, on_download_completion() will be called by
734
    /// the thread that processes the event loop (as long as such a thread
735
    /// exists).
736
    ///
737
    /// If request_download_completion_notification() is called while a
738
    /// previously requested completion notification has not yet occurred, the
739
    /// previous request is canceled and the corresponding notification will
740
    /// never occur. This ensure that there is no ambiguity about the meaning of
741
    /// each completion notification.
742
    ///
743
    /// The application must be prepared for "spurious" invocations of
744
    /// on_download_completion() before the client's first invocation of
745
    /// request_download_completion_notification(), or after a previous
746
    /// invocation of on_download_completion(), as long as it is before the
747
    /// subsequent invocation by the client of
748
    /// request_download_completion_notification(). This is possible because the
749
    /// client reserves the right to request download completion notifications
750
    /// internally.
751
    ///
752
    /// Download is considered complete when all changesets in the server-side
753
    /// history, that are supposed to be downloaded, and that precede
754
    /// `current_server_version`, have been downloaded and integrated into the
755
    /// local history. `current_server_version` is the version that refers to
756
    /// the last changeset in the server-side history at the time the server
757
    /// receives the first MARK message that is sent by the client after the
758
    /// invocation of request_download_completion_notification().
759
    ///
760
    /// Every invocation of request_download_completion_notification() will
761
    /// cause a new MARK message to be sent to the server, to redetermine
762
    /// `current_server_version`.
763
    ///
764
    /// It is an error to call this function before activation of the session,
765
    /// or after initiation of deactivation.
766
    void request_download_completion_notification();
767

768
    /// \brief Gets the subscription store associated with this Session.
769
    SubscriptionStore* get_flx_subscription_store();
770

771
    /// \brief Gets the migration store associated with this Session.
772
    MigrationStore* get_migration_store();
773

774
    /// Update internal client state when a flx subscription becomes complete outside
775
    /// of the normal sync process. This can happen during client reset.
776
    void on_flx_sync_version_complete(int64_t version);
777

778
    /// If this session is currently suspended, resume it immediately.
779
    ///
780
    /// It is an error to call this function before activation of the session,
781
    /// or after initiation of deactivation.
782
    void cancel_resumption_delay();
783

784
    /// To be used in connection with implementations of
785
    /// initiate_integrate_changesets().
786
    void integrate_changesets(const SyncProgress&, std::uint_fast64_t downloadable_bytes, const ReceivedChangesets&,
787
                              VersionInfo&, DownloadBatchState last_in_batch);
788

789
    /// It is an error to call this function before activation of the session
790
    /// (Connection::activate_session()), or after initiation of deactivation
791
    /// (Connection::initiate_session_deactivation()).
792
    void on_changesets_integrated(version_type client_version, const SyncProgress& progress);
793

794
    void on_integration_failure(const IntegrationException& e);
795

796
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
797

798
    /// The application must ensure that the new session object is either
799
    /// activated (Connection::activate_session()) or destroyed before the
800
    /// specified connection object is destroyed.
801
    ///
802
    /// The specified transaction reporter (via the config object) is guaranteed
803
    /// to not be called before activation, and also not after initiation of
804
    /// deactivation.
805
    Session(SessionWrapper&, ClientImpl::Connection&);
806
    ~Session();
807

808
    void force_close();
809

810
    util::Future<std::string> send_test_command(std::string body);
811

812
private:
813
    struct PendingTestCommand {
814
        request_ident_type id;
815
        std::string body;
816
        util::Promise<std::string> promise;
817
        bool pending = true;
818
    };
819

820
    /// Fetch a reference to the remote virtual path of the Realm associated
821
    /// with this session.
822
    ///
823
    /// This function is always called by the event loop thread of the
824
    /// associated client object.
825
    ///
826
    /// This function is guaranteed to not be called before activation, and also
827
    /// not after initiation of deactivation.
828
    const std::string& get_virt_path() const noexcept;
829

830
    const std::string& get_realm_path() const noexcept;
831

832
    // Can only be called if the session is active or being activated
833
    DBRef get_db() const noexcept;
834
    ClientReplication& get_repl() const noexcept;
835
    ClientHistory& get_history() const noexcept;
836

837
    // client_reset_config() returns the config for client
838
    // reset. If it returns none, ordinary sync is used. If it returns a
839
    // Config::ClientReset, the session will be initiated with a state Realm
840
    // transfer from the server.
841
    util::Optional<ClientReset>& get_client_reset_config() noexcept;
842

843
    // Get the reason a synchronization session is used for (regular sync or client reset)
844
    // - Client reset state means the session is going to be used to download a fresh realm.
845
    SessionReason get_session_reason() noexcept;
846

847
    /// Returns the schema version the synchronization session connects with to the server.
848
    uint64_t get_schema_version() noexcept;
849

850
    // Returns false if this session is not allowed to send UPLOAD messages to the server to
851
    // update the cursor info, such as during a client reset fresh realm download
852
    bool upload_messages_allowed() noexcept;
853

854
    /// \brief Initiate the integration of downloaded changesets.
855
    ///
856
    /// This function must provide for the passed changesets (if any) to
857
    /// eventually be integrated, and without unnecessary delay. If no
858
    /// changesets are passed, the purpose of this function reduces to causing
859
    /// the current synchronization progress (SyncProgress) to be persisted.
860
    ///
861
    /// When all changesets have been integrated, and the synchronization
862
    /// progress has been persisted, this function must provide for
863
    /// on_changesets_integrated() to be called without unnecessary delay,
864
    /// although never after initiation of session deactivation.
865
    ///
866
    /// The implementation is allowed, but not obliged to aggregate changesets
867
    /// from multiple invocations of initiate_integrate_changesets() and pass
868
    /// them to ClientReplication::integrate_server_changesets() at once.
869
    ///
870
    /// The synchronization progress passed to
871
    /// ClientReplication::integrate_server_changesets() must be obtained
872
    /// by calling get_status(), and that call must occur after the last
873
    /// invocation of initiate_integrate_changesets() whose changesets are
874
    /// included in what is passed to
875
    /// ClientReplication::integrate_server_changesets().
876
    ///
877
    /// The download cursor passed to on_changesets_integrated() must be
878
    /// SyncProgress::download of the synchronization progress passed to the
879
    /// last invocation of
880
    /// ClientReplication::integrate_server_changesets().
881
    ///
882
    /// The default implementation integrates the specified changesets and calls
883
    /// on_changesets_integrated() immediately (i.e., from the event loop thread
884
    /// of the associated client object, and before
885
    /// initiate_integrate_changesets() returns), and via the history accessor
886
    /// made available by access_realm().
887
    ///
888
    /// This function is always called by the event loop thread of the
889
    /// associated client object, and on_changesets_integrated() must always be
890
    /// called by that thread too.
891
    ///
892
    /// This function is guaranteed to not be called before activation, and also
893
    /// not after initiation of deactivation.
894
    void initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
895
                                       const SyncProgress& progress, const ReceivedChangesets&);
896

897
    /// See request_download_completion_notification().
898
    void on_download_completion();
899

900
    //@{
901
    /// These are called as the state of the session changes between
902
    /// "suspended" and "resumed". The initial state is
903
    /// always "resumed".
904
    ///
905
    /// A switch to the suspended state only happens when an error occurs,
906
    /// and information about that error is passed to on_suspended().
907
    ///
908
    /// These functions are always called by the event loop thread of the
909
    /// associated client object.
910
    ///
911
    /// These functions are guaranteed to not be called before activation, and also
912
    /// not after initiation of deactivation.
913
    void on_suspended(const SessionErrorInfo& error_info);
914
    void on_resumed();
915
    //@}
916

917
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
918
    void on_flx_sync_progress(int64_t version, DownloadBatchState batch_state);
919

920
    // Processes an FLX download message, if it's a bootstrap message. If it's not a bootstrap
921
    // message then this is a noop and will return false. Otherwise this will return true
922
    // and no further processing of the download message should take place.
923
    bool process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
924
                                       int64_t query_version, const ReceivedChangesets& received_changesets);
925

926
    // Processes any pending FLX bootstraps, if one exists. Otherwise this is a noop.
927
    void process_pending_flx_bootstrap();
928

929
    bool client_reset_if_needed();
930
    void handle_pending_client_reset_acknowledgement();
931

932
    void update_subscription_version_info();
933

934
    void gather_pending_compensating_writes(util::Span<Changeset> changesets, std::vector<ProtocolErrorInfo>* out);
935

936
    void begin_resumption_delay(const ProtocolErrorInfo& error_info);
937
    void clear_resumption_delay_state();
938

939
private:
940
    Connection& m_conn;
941
    const session_ident_type m_ident;
942

943
    // The states only transition in one direction, from left to right.
944
    // The transition to Active happens very soon after construction, as soon as
945
    // it is registered with the Connection.
946
    // The transition from Deactivating to Deactivated state happens when the
947
    // unbinding process completes (unbind_process_complete()).
948
    enum State { Unactivated, Active, Deactivating, Deactivated };
949
    State m_state = Unactivated;
950

951
    bool m_suspended = false;
952

953
    SyncSocketProvider::SyncTimer m_try_again_activation_timer;
954
    ErrorBackoffState<sync::ProtocolError, RandomEngine> m_try_again_delay_info;
955

956
    // Set to false when download completion is reached. Set to true after a
957
    // slow reconnect, such that UPLOAD and QUERY messages will not be sent until
958
    // download completion is reached again. This feature can be disabled (always
959
    // false) if ClientConfig::disable_upload_activation_delay is true.
960
    bool m_delay_uploads = true;
961

962
    bool m_is_flx_sync_session = false;
963

964
    bool m_fix_up_object_ids = false;
965

966
    // These are reset when the session is activated, and again whenever the
967
    // connection is lost or the rebinding process is initiated.
968
    bool m_enlisted_to_send;
969
    bool m_bind_message_sent;            // Sending of BIND message has been initiated
970
    bool m_ident_message_sent;           // Sending of IDENT message has been initiated
971
    bool m_unbind_message_sent;          // Sending of UNBIND message has been initiated
972
    bool m_unbind_message_send_complete; // Sending of UNBIND message has been completed
973
    bool m_error_message_received;       // Session specific ERROR message received
974
    bool m_unbound_message_received;     // UNBOUND message received
975
    bool m_error_to_send;
976

977
    // True when there is a new FLX sync query we need to send to the server.
978
    util::Optional<SubscriptionStore::PendingSubscription> m_pending_flx_sub_set;
979
    int64_t m_last_sent_flx_query_version = 0;
980

981
    std::deque<ProtocolErrorInfo> m_pending_compensating_write_errors;
982

983
    util::Optional<IntegrationException> m_client_error;
984

985
    // `ident == 0` means unassigned.
986
    SaltedFileIdent m_client_file_ident = {0, 0};
987

988
    // The latest sync progress reported by the server via a DOWNLOAD
989
    // message. See struct SyncProgress for a description. The values stored in
990
    // `m_progress` either are persisted, or are about to be.
991
    //
992
    // Initialized by way of ClientHistory::get_status() at session
993
    // activation time.
994
    //
995
    // `m_progress.upload.client_version` is the client-side sync version
996
    // produced by the latest local changeset that has been acknowledged as
997
    // integrated by the server.
998
    SyncProgress m_progress;
999

1000
    // In general, the local version produced by the last changeset in the local
1001
    // history. The changeset that produced this version may, or may not
1002
    // contain changes of local origin.
1003
    //
1004
    // It is set to the current version of the local Realm at session activation
1005
    // time (although always zero for the initial empty Realm
1006
    // state). Thereafter, it is updated when the application calls
1007
    // recognize_sync_version(), when changesets are received from the server
1008
    // and integrated locally, and when the uploading process discovers newer
1009
    // versions.
1010
    //
1011
    // INVARIANT: m_progress.upload.client_version <= m_last_version_available
1012
    version_type m_last_version_available = 0;
1013

1014
    // In general, this is the position in the history reached while scanning
1015
    // for changesets to be uploaded.
1016
    //
1017
    // Set to `m_progress.upload` at session activation time and whenever the
1018
    // connection to the server is lost. When the connection is established, the
1019
    // scanning for changesets to be uploaded then progresses from there towards
1020
    // `m_last_version_available`.
1021
    //
1022
    // INVARIANT: m_progress.upload.client_version <= m_upload_progress.client_version
1023
    UploadCursor m_upload_progress = {0, 0};
1024

1025
    // Same as `m_progress.download` but is updated only as the progress gets
1026
    // persisted.
1027
    DownloadCursor m_download_progress = {0, 0};
1028

1029
    // Used to implement download completion notifications. Set equal to
1030
    // `m_progress.download.server_version` when a MARK message is received. Set
1031
    // back to zero when `m_download_progress.server_version` becomes greater
1032
    // than, or equal to `m_server_version_at_last_download_mark`. For further
1033
    // details, see check_for_download_completion().
1034
    version_type m_server_version_at_last_download_mark = 0;
1035

1036
    // The serial number to attach to the next download MARK message. A new MARK
1037
    // message will be sent when `m_target_download_mark >
1038
    // m_last_download_mark_sent`. To cause a new MARK message to be sent,
1039
    // simply increment `m_target_download_mark`.
1040
    request_ident_type m_target_download_mark = 0;
1041

1042
    // Set equal to `m_target_download_mark` as the sending of each MARK message
1043
    // is initiated. Must be set equal to `m_last_download_mark_received` when
1044
    // the connection to the server is lost.
1045
    request_ident_type m_last_download_mark_sent = 0;
1046

1047
    // Updated when a MARK message is received. See see
1048
    // check_for_download_completion() for how details on how it participates in
1049
    // the detection of download completion.
1050
    request_ident_type m_last_download_mark_received = 0;
1051

1052
    // Updated when a download completion is detected, to avoid multiple
1053
    // triggerings after reception of a single MARK message. See see
1054
    // check_for_download_completion() for how details on how it participates in
1055
    // the detection of download completion.
1056
    request_ident_type m_last_triggering_download_mark = 0;
1057

1058
    SessionWrapper& m_wrapper;
1059

1060
    request_ident_type m_last_pending_test_command_ident = 0;
1061
    std::list<PendingTestCommand> m_pending_test_commands;
1062

1063
    static std::string make_logger_prefix(session_ident_type);
1064

1065
    Session(SessionWrapper& wrapper, Connection&, session_ident_type);
1066

1067
    bool do_recognize_sync_version(version_type) noexcept;
1068

1069
    bool have_client_file_ident() const noexcept;
1070

1071
    // The unbinding process completes when both of the following become true:
1072
    //
1073
    //  - The sending of the UNBIND message has been completed
1074
    //    (m_unbind_message_sent_2).
1075
    //
1076
    //  - A session specific ERROR, or the UNBOUND message has been received
1077
    //    (m_error_message_received || m_unbond_message_received).
1078
    //
1079
    // Rebinding (sending of a new BIND message) can only be initiated while the
1080
    // session is in the Active state, and the unbinding process has completed
1081
    // (unbind_process_complete()).
1082
    bool unbind_process_complete() const noexcept;
1083

1084
    void activate();
1085
    void initiate_deactivation();
1086
    void complete_deactivation();
1087
    void connection_established(bool fast_reconnect);
1088
    void suspend(const SessionErrorInfo& session_error);
1089
    void connection_lost();
1090
    void send_message();
1091
    void message_sent();
1092
    void send_bind_message();
1093
    void send_ident_message();
1094
    void send_upload_message();
1095
    void send_mark_message();
1096
    void send_alloc_message();
1097
    void send_unbind_message();
1098
    void send_query_change_message();
1099
    void send_json_error_message();
1100
    void send_test_command_message();
1101
    Status receive_ident_message(SaltedFileIdent);
1102
    Status receive_download_message(const DownloadMessage& message);
1103
    Status receive_mark_message(request_ident_type);
1104
    Status receive_unbound_message();
1105
    Status receive_error_message(const ProtocolErrorInfo& info);
1106
    Status receive_query_error_message(int error_code, std::string_view message, int64_t query_version);
1107
    Status receive_test_command_response(request_ident_type, std::string_view body);
1108

1109
    void initiate_rebind();
1110
    void reset_protocol_state() noexcept;
1111
    void ensure_enlisted_to_send();
1112
    void enlist_to_send();
1113
    Status check_received_sync_progress(const SyncProgress&) noexcept;
1114
    void check_for_download_completion();
1115

1116
    SyncClientHookAction call_debug_hook(SyncClientHookEvent event, const SyncProgress&, int64_t, DownloadBatchState,
1117
                                         size_t);
1118
    SyncClientHookAction call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo&);
1119
    SyncClientHookAction call_debug_hook(const SyncClientHookData& data);
1120
    SyncClientHookAction call_debug_hook(SyncClientHookEvent event);
1121

1122
    bool is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version);
1123

1124
    void init_progress_handler();
1125
    void enable_progress_notifications();
1126

1127
    friend class Connection;
1128
};
1129

1130

1131
// Implementation
1132

1133
inline const std::string& ClientImpl::get_user_agent_string() const noexcept
1134
{
×
1135
    return m_user_agent_string;
×
1136
}
×
1137

1138
inline auto ClientImpl::get_reconnect_mode() const noexcept -> ReconnectMode
1139
{
×
1140
    return m_reconnect_mode;
×
1141
}
×
1142

1143
inline bool ClientImpl::is_dry_run() const noexcept
1144
{
119,502✔
1145
    return m_dry_run;
119,502✔
1146
}
119,502✔
1147

1148
inline ClientImpl::RandomEngine& ClientImpl::get_random() noexcept
1149
{
23,862✔
1150
    return m_random;
23,862✔
1151
}
23,862✔
1152

1153
inline auto ClientImpl::get_next_session_ident() noexcept -> session_ident_type
1154
{
10,062✔
1155
    return ++m_prev_session_ident;
10,062✔
1156
}
10,062✔
1157

1158

1159
inline ClientImpl& ClientImpl::Connection::get_client() noexcept
1160
{
182,438✔
1161
    return m_client;
182,438✔
1162
}
182,438✔
1163

1164
inline ConnectionState ClientImpl::Connection::get_state() const noexcept
1165
{
10,308✔
1166
    return m_state;
10,308✔
1167
}
10,308✔
1168

1169
inline ClientImpl::ServerSlot::ServerSlot(ReconnectInfo reconnect_info)
1170
    : reconnect_info(std::move(reconnect_info))
1,276✔
1171
{
2,686✔
1172
}
2,686✔
1173

1174
inline ClientImpl::ServerSlot::~ServerSlot() = default;
2,686✔
1175

1176
inline SyncServerMode ClientImpl::Connection::get_sync_server_mode() const noexcept
1177
{
×
1178
    return m_server_endpoint.server_mode;
×
1179
}
×
1180

1181
inline auto ClientImpl::Connection::get_reconnect_info() const noexcept -> ReconnectInfo
1182
{
2,776✔
1183
    return m_reconnect_info;
2,776✔
1184
}
2,776✔
1185

1186
inline auto ClientImpl::Connection::get_client_protocol() noexcept -> ClientProtocol&
1187
{
179,104✔
1188
    return m_client.m_client_protocol;
179,104✔
1189
}
179,104✔
1190

1191
inline int ClientImpl::Connection::get_negotiated_protocol_version() noexcept
1192
{
67,080✔
1193
    return m_negotiated_protocol_version;
67,080✔
1194
}
67,080✔
1195

1196
template <class H>
1197
void ClientImpl::Connection::for_each_active_session(H handler)
1198
{
10,704✔
1199
    for (auto& p : m_sessions) {
15,828✔
1200
        Session& sess = *p.second;
15,828✔
1201
        if (sess.m_state == Session::Active)
15,828✔
1202
            handler(sess); // Throws
15,826✔
1203
    }
15,828✔
1204
}
10,704✔
1205

1206
inline void ClientImpl::Connection::voluntary_disconnect()
1207
{
2,474✔
1208
    if (m_state == ConnectionState::disconnected) {
2,474✔
1209
        return;
×
1210
    }
×
1211
    m_reconnect_info.update(ConnectionTerminationReason::closed_voluntarily, std::nullopt);
2,474✔
1212
    SessionErrorInfo error_info{Status{ErrorCodes::ConnectionClosed, "Connection closed"}, IsFatal{false}};
2,474✔
1213
    error_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
2,474✔
1214

1215
    disconnect(std::move(error_info)); // Throws
2,474✔
1216
}
2,474✔
1217

1218
inline void ClientImpl::Connection::involuntary_disconnect(const SessionErrorInfo& info,
1219
                                                           ConnectionTerminationReason reason)
1220
{
1,318✔
1221
    REALM_ASSERT(!was_voluntary(reason));
1,318✔
1222
    m_reconnect_info.update(reason, info.resumption_delay_interval);
1,318✔
1223
    disconnect(info); // Throws
1,318✔
1224
}
1,318✔
1225

1226
inline void ClientImpl::Connection::change_state_to_disconnected() noexcept
1227
{
3,794✔
1228
    REALM_ASSERT(m_on_idle);
3,794✔
1229
    REALM_ASSERT(m_state != ConnectionState::disconnected);
3,794✔
1230
    m_state = ConnectionState::disconnected;
3,794✔
1231

1232
    if (m_num_active_sessions == 0)
3,794✔
1233
        m_on_idle->trigger();
2,424✔
1234

1235
    REALM_ASSERT(!m_reconnect_delay_in_progress);
3,794✔
1236
    if (m_disconnect_delay_in_progress) {
3,794✔
1237
        m_reconnect_disconnect_timer.reset();
2,502✔
1238
        m_disconnect_delay_in_progress = false;
2,502✔
1239
    }
2,502✔
1240
}
3,794✔
1241

1242
inline void ClientImpl::Connection::one_more_active_unsuspended_session()
1243
{
10,120✔
1244
    if (m_num_active_unsuspended_sessions++ != 0)
10,120✔
1245
        return;
5,424✔
1246
    // Rose from zero to one
1247
    if (m_state == ConnectionState::disconnected && !m_reconnect_delay_in_progress && m_activated)
4,696✔
1248
        initiate_reconnect(); // Throws
6✔
1249
}
4,696✔
1250

1251
inline void ClientImpl::Connection::one_less_active_unsuspended_session()
1252
{
10,120✔
1253
    REALM_ASSERT(m_num_active_unsuspended_sessions);
10,120✔
1254
    if (--m_num_active_unsuspended_sessions != 0)
10,120✔
1255
        return;
5,424✔
1256

1257
    // Dropped from one to zero
1258
    if (m_state != ConnectionState::disconnected)
4,696✔
1259
        initiate_disconnect_wait(); // Throws
4,350✔
1260
}
4,696✔
1261

1262
// Sessions, and the connection, should get the output_buffer and insert a message,
1263
// after which they call initiate_write_output_buffer(Session* sess).
1264
inline auto ClientImpl::Connection::get_output_buffer() noexcept -> OutputBuffer&
1265
{
99,442✔
1266
    m_output_buffer.reset();
99,442✔
1267
    return m_output_buffer;
99,442✔
1268
}
99,442✔
1269

1270
inline auto ClientImpl::Connection::get_session(session_ident_type ident) const noexcept -> Session*
1271
{
77,420✔
1272
    auto i = m_sessions.find(ident);
77,420✔
1273
    bool found = (i != m_sessions.end());
77,420✔
1274
    return found ? i->second.get() : nullptr;
77,420✔
1275
}
77,420✔
1276

1277
inline bool ClientImpl::Connection::was_voluntary(ConnectionTerminationReason reason) noexcept
1278
{
1,318✔
1279
    switch (reason) {
1,318✔
1280
        case ConnectionTerminationReason::closed_voluntarily:
✔
1281
            return true;
×
1282
        case ConnectionTerminationReason::connect_operation_failed:
112✔
1283
        case ConnectionTerminationReason::read_or_write_error:
1,154✔
1284
        case ConnectionTerminationReason::ssl_certificate_rejected:
1,164✔
1285
        case ConnectionTerminationReason::ssl_protocol_violation:
1,164✔
1286
        case ConnectionTerminationReason::websocket_protocol_violation:
1,168✔
1287
        case ConnectionTerminationReason::http_response_says_fatal_error:
1,168✔
1288
        case ConnectionTerminationReason::http_response_says_nonfatal_error:
1,224✔
1289
        case ConnectionTerminationReason::bad_headers_in_http_response:
1,224✔
1290
        case ConnectionTerminationReason::sync_protocol_violation:
1,240✔
1291
        case ConnectionTerminationReason::sync_connect_timeout:
1,240✔
1292
        case ConnectionTerminationReason::server_said_try_again_later:
1,288✔
1293
        case ConnectionTerminationReason::server_said_do_not_reconnect:
1,306✔
1294
        case ConnectionTerminationReason::pong_timeout:
1,318✔
1295
        case ConnectionTerminationReason::missing_protocol_feature:
1,318✔
1296
            break;
1,318✔
1297
    }
1,318✔
1298
    return false;
1,318✔
1299
}
1,318✔
1300

1301
inline ClientImpl& ClientImpl::Session::get_client() noexcept
1302
{
149,452✔
1303
    return m_conn.get_client();
149,452✔
1304
}
149,452✔
1305

1306
inline auto ClientImpl::Session::get_connection() noexcept -> Connection&
1307
{
10,460✔
1308
    return m_conn;
10,460✔
1309
}
10,460✔
1310

1311
inline auto ClientImpl::Session::get_ident() const noexcept -> session_ident_type
1312
{
82,844✔
1313
    return m_ident;
82,844✔
1314
}
82,844✔
1315

1316
inline void ClientImpl::Session::recognize_sync_version(version_type version)
1317
{
113,890✔
1318
    REALM_ASSERT(m_state == Active);
113,890✔
1319

1320
    bool resume_upload = do_recognize_sync_version(version);
113,890✔
1321
    if (REALM_LIKELY(resume_upload)) {
113,890✔
1322
        // Since the deactivation process has not been initiated, the UNBIND
1323
        // message cannot have been sent unless the session was suspended due to
1324
        // an error.
1325
        REALM_ASSERT_3(m_suspended, ||, !m_unbind_message_sent);
29,350✔
1326
        if (m_ident_message_sent && !m_suspended)
29,350✔
1327
            ensure_enlisted_to_send(); // Throws
25,180✔
1328
    }
29,350✔
1329
}
113,890✔
1330

1331
inline void ClientImpl::Session::request_download_completion_notification()
1332
{
14,024✔
1333
    REALM_ASSERT(m_state == Active);
14,024✔
1334

1335
    ++m_target_download_mark;
14,024✔
1336

1337
    // Since the deactivation process has not been initiated, the UNBIND message
1338
    // cannot have been sent unless an ERROR message was received.
1339
    REALM_ASSERT(m_error_message_received || !m_unbind_message_sent);
14,024✔
1340
    if (m_ident_message_sent && !m_error_message_received)
14,024✔
1341
        ensure_enlisted_to_send(); // Throws
12,080✔
1342
}
14,024✔
1343

1344
inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn)
1345
    : Session{wrapper, conn, conn.get_client().get_next_session_ident()} // Throws
4,858✔
1346
{
10,064✔
1347
}
10,064✔
1348

1349
inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn, session_ident_type ident)
1350
    : logger_ptr{std::make_shared<util::PrefixLogger>(util::LogCategory::session, make_logger_prefix(ident),
4,858✔
1351
                                                      conn.logger_ptr)} // Throws
4,858✔
1352
    , logger{*logger_ptr}
4,858✔
1353
    , m_conn{conn}
4,858✔
1354
    , m_ident{ident}
4,858✔
1355
    , m_try_again_delay_info(conn.get_client().m_reconnect_backoff_info, conn.get_client().get_random())
4,858✔
1356
    , m_is_flx_sync_session(conn.is_flx_sync_connection())
4,858✔
1357
    , m_fix_up_object_ids(get_client().m_fix_up_object_ids)
4,858✔
1358
    , m_wrapper{wrapper}
4,858✔
1359
{
10,060✔
1360
    if (get_client().m_disable_upload_activation_delay)
10,060✔
NEW
1361
        m_delay_uploads = false;
×
1362
}
10,060✔
1363

1364
inline bool ClientImpl::Session::do_recognize_sync_version(version_type version) noexcept
1365
{
161,616✔
1366
    if (REALM_LIKELY(version > m_last_version_available)) {
161,616✔
1367
        m_last_version_available = version;
77,080✔
1368
        return true;
77,080✔
1369
    }
77,080✔
1370
    return false;
84,536✔
1371
}
161,616✔
1372

1373
inline bool ClientImpl::Session::have_client_file_ident() const noexcept
1374
{
27,214✔
1375
    return (m_client_file_ident.ident != 0);
27,214✔
1376
}
27,214✔
1377

1378
inline bool ClientImpl::Session::unbind_process_complete() const noexcept
1379
{
9,584✔
1380
    return (m_unbind_message_send_complete && (m_error_message_received || m_unbound_message_received));
9,584!
1381
}
9,584✔
1382

1383
inline void ClientImpl::Session::connection_established(bool fast_reconnect)
1384
{
11,738✔
1385
    REALM_ASSERT(m_state == Active);
11,738✔
1386

1387
    if (!fast_reconnect && !get_client().m_disable_upload_activation_delay) {
11,738✔
1388
        // Disallow immediate activation of the upload process, even if download
1389
        // completion was reached during an earlier period of connectivity.
1390
        m_delay_uploads = true;
9,712✔
1391
    }
9,712✔
1392

1393
    if (m_delay_uploads) {
11,738✔
1394
        // Request download completion notification
1395
        ++m_target_download_mark;
9,764✔
1396
    }
9,764✔
1397

1398
    if (!m_suspended) {
11,738✔
1399
        // Ready to send BIND message
1400
        enlist_to_send(); // Throws
11,738✔
1401
    }
11,738✔
1402
}
11,738✔
1403

1404
// The caller (Connection) must discard the session if the session has become
1405
// deactivated upon return.
1406
inline void ClientImpl::Session::connection_lost()
1407
{
4,460✔
1408
    REALM_ASSERT(m_state == Active || m_state == Deactivating);
4,460✔
1409
    // If the deactivation process has been initiated, it can now be immediately
1410
    // completed.
1411
    if (m_state == Deactivating) {
4,460✔
1412
        complete_deactivation(); // Throws
2,256✔
1413
        REALM_ASSERT(m_state == Deactivated);
2,256✔
1414
        return;
2,256✔
1415
    }
2,256✔
1416
    reset_protocol_state();
2,204✔
1417
}
2,204✔
1418

1419
// The caller (Connection) must discard the session if the session has become
1420
// deactivated upon return.
1421
inline void ClientImpl::Session::message_sent()
1422
{
97,736✔
1423
    // Note that it is possible for this function to get called after the client
1424
    // has received a message sent by the server in reposnse to the message that
1425
    // the client has just finished sending.
1426

1427
    REALM_ASSERT(m_state == Active || m_state == Deactivating);
97,736✔
1428

1429
    // No message will be sent after the UNBIND message
1430
    REALM_ASSERT(!m_unbind_message_send_complete);
97,736✔
1431

1432
    // If the client reset config structure is populated, then try to perform
1433
    // the client reset diff once the BIND message has been sent successfully
1434
    if (m_bind_message_sent && m_state == Active && get_client_reset_config()) {
97,742✔
1435
        client_reset_if_needed();
376✔
1436
        // Ready to send the IDENT message
1437
        ensure_enlisted_to_send(); // Throws
376✔
1438
    }
376✔
1439

1440
    if (m_unbind_message_sent) {
97,736✔
1441
        REALM_ASSERT(!m_enlisted_to_send);
5,888✔
1442

1443
        // If the sending of the UNBIND message has been initiated, this must be
1444
        // the time when the sending of that message completes.
1445
        m_unbind_message_send_complete = true;
5,888✔
1446

1447
        // Detect the completion of the unbinding process
1448
        if (m_error_message_received || m_unbound_message_received) {
5,888✔
1449
            // If the deactivation process has been initiated, it can now be
1450
            // immediately completed.
1451
            if (m_state == Deactivating) {
552✔
1452
                // Life cycle state is Deactivating
1453
                complete_deactivation(); // Throws
128✔
1454
                // Life cycle state is now Deactivated
1455
                return;
128✔
1456
            }
128✔
1457

1458
            // The session is still in the Active state, so initiate the
1459
            // rebinding process if the session is no longer suspended.
1460
            if (!m_suspended)
424✔
1461
                initiate_rebind(); // Throws
14✔
1462
        }
424✔
1463
    }
5,888✔
1464
}
97,736✔
1465

1466
inline void ClientImpl::Session::initiate_rebind()
1467
{
40✔
1468
    // Life cycle state must be Active
1469
    REALM_ASSERT(m_state == Active);
40✔
1470

1471
    REALM_ASSERT(!m_suspended);
40✔
1472
    REALM_ASSERT(!m_enlisted_to_send);
40✔
1473

1474
    reset_protocol_state();
40✔
1475

1476
    // Ready to send BIND message
1477
    enlist_to_send(); // Throws
40✔
1478
}
40✔
1479

1480
inline void ClientImpl::Session::reset_protocol_state() noexcept
1481
{
12,310✔
1482
    m_enlisted_to_send = false;
12,310✔
1483
    m_bind_message_sent = false;
12,310✔
1484
    m_error_to_send = false;
12,310✔
1485
    m_ident_message_sent = false;
12,310✔
1486
    m_unbind_message_sent = false;
12,310✔
1487
    m_unbind_message_send_complete = false;
12,310✔
1488
    m_error_message_received = false;
12,310✔
1489
    m_unbound_message_received = false;
12,310✔
1490
    m_client_error = util::none;
12,310✔
1491

1492
    m_upload_progress = m_progress.upload;
12,310✔
1493
    m_last_download_mark_sent = m_last_download_mark_received;
12,310✔
1494
}
12,310✔
1495

1496
inline void ClientImpl::Session::ensure_enlisted_to_send()
1497
{
93,996✔
1498
    if (!m_enlisted_to_send)
93,996✔
1499
        enlist_to_send(); // Throws
64,946✔
1500
}
93,996✔
1501

1502
// This function will never "commit suicide" despite the fact that it may
1503
// involve an invocation of send_message(), which in certain cases can lead to
1504
// the completion of the deactivation process, and if that did happen, it would
1505
// cause Connection::send_next_message() to destroy this session, but it does
1506
// not happen.
1507
//
1508
// If the session is already in the Deactivating state, send_message() will
1509
// complete the deactivation process immediately when, and only when the BIND
1510
// message has not already been sent.
1511
//
1512
// Note however, that this function gets called when the establishment of the
1513
// connection completes, but at that time, the session cannot be in the
1514
// Deactivating state, because until the BIND message is sent, the deactivation
1515
// process will complete immediately. So the first invocation of this function
1516
// after establishemnt of the connection will not commit suicide.
1517
//
1518
// Note then, that the session will stay enlisted to send, until it gets to send
1519
// the BIND message, and since the and enlist_to_send() must not be called while
1520
// the session is enlisted, the next invocation of this function will be after
1521
// the BIND message has been sent, but then the deactivation process will no
1522
// longer be completed by send_message().
1523
inline void ClientImpl::Session::enlist_to_send()
1524
{
168,354✔
1525
    REALM_ASSERT(m_state == Active || m_state == Deactivating);
168,354✔
1526
    REALM_ASSERT(!m_unbind_message_sent);
168,354✔
1527
    REALM_ASSERT(!m_enlisted_to_send);
168,354✔
1528
    m_enlisted_to_send = true;
168,354✔
1529
    m_conn.enlist_to_send(this); // Throws
168,354✔
1530
}
168,354✔
1531

1532
} // namespace realm::sync
1533

1534
#endif // REALM_NOINST_CLIENT_IMPL_BASE_HPP
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc