• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jonathan.reams_2947

01 Dec 2023 08:08PM UTC coverage: 91.739% (+0.04%) from 91.695%
jonathan.reams_2947

Pull #7160

Evergreen

jbreams
allow handle_error to decide resumability
Pull Request #7160: Prevent resuming a session that has not been fully shut down

92428 of 169414 branches covered (0.0%)

315 of 349 new or added lines in 14 files covered. (90.26%)

80 existing lines in 14 files now uncovered.

232137 of 253041 relevant lines covered (91.74%)

6882826.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.62
/src/realm/sync/noinst/client_impl_base.hpp
1

2
#ifndef REALM_NOINST_CLIENT_IMPL_BASE_HPP
3
#define REALM_NOINST_CLIENT_IMPL_BASE_HPP
4

5
#include <realm/sync/client_base.hpp>
6

7
#include <realm/binary_data.hpp>
8
#include <realm/sync/history.hpp>
9
#include <realm/sync/network/default_socket.hpp>
10
#include <realm/sync/network/network_ssl.hpp>
11
#include <realm/sync/noinst/client_history_impl.hpp>
12
#include <realm/sync/noinst/migration_store.hpp>
13
#include <realm/sync/noinst/migration_store.hpp>
14
#include <realm/sync/noinst/protocol_codec.hpp>
15
#include <realm/sync/protocol.hpp>
16
#include <realm/sync/subscriptions.hpp>
17
#include <realm/sync/trigger.hpp>
18
#include <realm/util/buffer_stream.hpp>
19
#include <realm/util/logger.hpp>
20
#include <realm/util/optional.hpp>
21
#include <realm/util/span.hpp>
22

23
#include <cstdint>
24
#include <deque>
25
#include <functional>
26
#include <list>
27
#include <map>
28
#include <random>
29
#include <string>
30
#include <utility>
31

32
namespace realm::sync {
33

34
// (protocol, address, port, session_multiplex_ident)
35
//
36
// `protocol` is included for convenience, even though it is not strictly part
37
// of an endpoint.
38

39
struct ServerEndpoint {
40
    ProtocolEnvelope envelope;
41
    std::string address;
42
    network::Endpoint::port_type port;
43
    std::string user_id;
44
    SyncServerMode server_mode = SyncServerMode::PBS;
45

46
private:
47
    auto to_tuple() const
48
    {
46,314✔
49
        return std::make_tuple(server_mode, envelope, std::ref(address), port, std::ref(user_id));
46,314✔
50
    }
46,314✔
51

52
public:
53
    friend inline bool operator==(const ServerEndpoint& lhs, const ServerEndpoint& rhs)
54
    {
×
55
        return lhs.to_tuple() == rhs.to_tuple();
×
56
    }
×
57

58

59
    friend inline bool operator<(const ServerEndpoint& lhs, const ServerEndpoint& rhs)
60
    {
23,156✔
61
        return lhs.to_tuple() < rhs.to_tuple();
23,156✔
62
    }
23,156✔
63
};
64

65
class SessionWrapper;
66

67
class SessionWrapperStack {
68
public:
69
    bool empty() const noexcept;
70
    void push(util::bind_ptr<SessionWrapper>) noexcept;
71
    util::bind_ptr<SessionWrapper> pop() noexcept;
72
    void clear() noexcept;
73
    SessionWrapperStack() noexcept = default;
24,066✔
74
    SessionWrapperStack(SessionWrapperStack&&) noexcept;
75
    ~SessionWrapperStack();
76
    friend void swap(SessionWrapperStack& q_1, SessionWrapperStack& q_2) noexcept
77
    {
14,592✔
78
        std::swap(q_1.m_back, q_2.m_back);
14,592✔
79
    }
14,592✔
80

81
private:
82
    SessionWrapper* m_back = nullptr;
83
};
84

85
template <typename ErrorType, typename RandomEngine>
86
struct ErrorBackoffState {
87
    ErrorBackoffState() = default;
88
    explicit ErrorBackoffState(ResumptionDelayInfo default_delay_info, RandomEngine& random_engine)
89
        : default_delay_info(std::move(default_delay_info))
90
        , m_random_engine(random_engine)
91
    {
19,388✔
92
    }
19,388✔
93

94
    void update(ErrorType new_error, std::optional<ResumptionDelayInfo> new_delay_info)
95
    {
3,754✔
96
        if (triggering_error && *triggering_error == new_error) {
3,754✔
97
            return;
180✔
98
        }
180✔
99

1,734✔
100
        delay_info = new_delay_info.value_or(default_delay_info);
3,574✔
101
        cur_delay_interval = util::none;
3,574✔
102
        triggering_error = new_error;
3,574✔
103
    }
3,574✔
104

105
    void reset()
106
    {
1,746✔
107
        triggering_error = util::none;
1,746✔
108
        cur_delay_interval = util::none;
1,746✔
109
        delay_info = default_delay_info;
1,746✔
110
    }
1,746✔
111

112
    std::chrono::milliseconds delay_interval()
113
    {
634✔
114
        if (!cur_delay_interval) {
634✔
115
            cur_delay_interval = delay_info.resumption_delay_interval;
408✔
116
            return jitter_value(*cur_delay_interval);
408✔
117
        }
408✔
118
        if (*cur_delay_interval == delay_info.max_resumption_delay_interval) {
226✔
119
            return jitter_value(delay_info.max_resumption_delay_interval);
6✔
120
        }
6✔
121
        auto safe_delay_interval = cur_delay_interval->count();
220✔
122
        if (util::int_multiply_with_overflow_detect(safe_delay_interval,
220✔
123
                                                    delay_info.resumption_delay_backoff_multiplier)) {
137✔
124
            *cur_delay_interval = delay_info.max_resumption_delay_interval;
×
125
        }
×
126
        else {
220✔
127
            *cur_delay_interval =
220✔
128
                std::min(delay_info.max_resumption_delay_interval, std::chrono::milliseconds(safe_delay_interval));
220✔
129
        }
220✔
130
        return jitter_value(*cur_delay_interval);
220✔
131
    }
220✔
132

133
    ResumptionDelayInfo default_delay_info;
134
    ResumptionDelayInfo delay_info;
135
    util::Optional<std::chrono::milliseconds> cur_delay_interval;
136
    util::Optional<ErrorType> triggering_error;
137

138
private:
139
    std::chrono::milliseconds jitter_value(std::chrono::milliseconds value)
140
    {
634✔
141
        if (delay_info.delay_jitter_divisor == 0) {
634✔
142
            return value;
20✔
143
        }
20✔
144
        const auto max_jitter = value.count() / delay_info.delay_jitter_divisor;
614✔
145
        auto distr = std::uniform_int_distribution<std::chrono::milliseconds::rep>(0, max_jitter);
614✔
146
        std::chrono::milliseconds randomized_deduction(distr(m_random_engine.get()));
614✔
147
        return value - randomized_deduction;
614✔
148
    }
614✔
149

150
    std::reference_wrapper<RandomEngine> m_random_engine;
151
};
152

153
class ClientImpl {
154
public:
155
    enum class ConnectionTerminationReason;
156
    class Connection;
157
    class Session;
158

159
    using port_type = network::Endpoint::port_type;
160
    using OutputBuffer = util::ResettableExpandableBufferOutputStream;
161
    using ClientProtocol = _impl::ClientProtocol;
162
    using RandomEngine = std::mt19937_64;
163

164
    /// Per-server endpoint information used to determine reconnect delays.
165
    class ReconnectInfo {
166
    public:
167
        explicit ReconnectInfo(ReconnectMode mode, ResumptionDelayInfo default_delay_info, RandomEngine& random)
168
            : m_reconnect_mode(mode)
169
            , m_backoff_state(default_delay_info, random)
170
        {
9,694✔
171
        }
9,694✔
172

173
        void reset() noexcept;
174
        void update(ConnectionTerminationReason reason, std::optional<ResumptionDelayInfo> new_delay_info);
175
        std::chrono::milliseconds delay_interval();
176

177
        const std::optional<ConnectionTerminationReason> reason() const noexcept
178
        {
×
179
            return m_backoff_state.triggering_error;
×
180
        }
×
181

182
        // Set this flag to true to schedule a postponed invocation of reset(). See
183
        // Connection::cancel_reconnect_delay() for details and rationale.
184
        //
185
        // Will be set back to false when a PONG message arrives, and the
186
        // corresponding PING message was sent while `m_scheduled_reset` was
187
        // true. See receive_pong().
188
        bool scheduled_reset = false;
189

190
    private:
191
        ReconnectMode m_reconnect_mode;
192
        ErrorBackoffState<ConnectionTerminationReason, RandomEngine> m_backoff_state;
193
    };
194

195
    static constexpr milliseconds_type default_connect_timeout = 120000;        // 2 minutes
196
    static constexpr milliseconds_type default_connection_linger_time = 30000;  // 30 seconds
197
    static constexpr milliseconds_type default_ping_keepalive_period = 60000;   // 1 minute
198
    static constexpr milliseconds_type default_pong_keepalive_timeout = 120000; // 2 minutes
199
    static constexpr milliseconds_type default_fast_reconnect_limit = 60000;    // 1 minute
200

201
    const std::shared_ptr<util::Logger> logger_ptr;
202
    util::Logger& logger;
203

204
    ClientImpl(ClientConfig);
205
    ~ClientImpl();
206

207
    static constexpr int get_oldest_supported_protocol_version() noexcept;
208

209
    void shutdown() noexcept;
210

211
    void shutdown_and_wait();
212

213
    const std::string& get_user_agent_string() const noexcept;
214
    ReconnectMode get_reconnect_mode() const noexcept;
215
    bool is_dry_run() const noexcept;
216

217
    // Functions to post onto the event loop and create an event loop timer using the
218
    // SyncSocketProvider
219
    void post(SyncSocketProvider::FunctionHandler&& handler);
220
    SyncSocketProvider::SyncTimer create_timer(std::chrono::milliseconds delay,
221
                                               SyncSocketProvider::FunctionHandler&& handler);
222
    using SyncTrigger = std::unique_ptr<Trigger<ClientImpl>>;
223
    SyncTrigger create_trigger(SyncSocketProvider::FunctionHandler&& handler);
224

225
    RandomEngine& get_random() noexcept;
226

227
    /// Returns false if the specified URL is invalid.
228
    bool decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
229
                              port_type& port, std::string& path) const;
230

231
    void cancel_reconnect_delay();
232
    bool wait_for_session_terminations_or_client_stopped();
233
    void voluntary_disconnect_all_connections();
234

235
private:
236
    using connection_ident_type = std::int_fast64_t;
237

238
    const ReconnectMode m_reconnect_mode; // For testing purposes only
239
    const milliseconds_type m_connect_timeout;
240
    const milliseconds_type m_connection_linger_time;
241
    const milliseconds_type m_ping_keepalive_period;
242
    const milliseconds_type m_pong_keepalive_timeout;
243
    const milliseconds_type m_fast_reconnect_limit;
244
    const ResumptionDelayInfo m_reconnect_backoff_info;
245
    const bool m_disable_upload_activation_delay;
246
    const bool m_dry_run; // For testing purposes only
247
    const bool m_enable_default_port_hack;
248
    const bool m_disable_upload_compaction;
249
    const bool m_fix_up_object_ids;
250
    const std::function<RoundtripTimeHandler> m_roundtrip_time_handler;
251
    const std::string m_user_agent_string;
252
    std::shared_ptr<SyncSocketProvider> m_socket_provider;
253
    ClientProtocol m_client_protocol;
254
    session_ident_type m_prev_session_ident = 0;
255
    const bool m_one_connection_per_session;
256

257
    RandomEngine m_random;
258
    SyncTrigger m_actualize_and_finalize;
259

260
    // Note: There is one server slot per server endpoint (hostname, port,
261
    // session_multiplex_ident), and it survives from one connection object to
262
    // the next, which is important because it carries information about a
263
    // possible reconnect delay applying to the new connection object (server
264
    // hammering protection).
265
    //
266
    // Note: Due to a particular load balancing scheme that is currently in use,
267
    // every session is forced to open a seperate connection (via abuse of
268
    // `m_one_connection_per_session`, which is only intended for testing
269
    // purposes). This disables part of the hammering protection scheme built in
270
    // to the client.
271
    struct ServerSlot {
272
        explicit ServerSlot(ReconnectInfo reconnect_info)
273
            : reconnect_info(std::move(reconnect_info))
274
        {
2,402✔
275
        }
2,402✔
276

277
        ReconnectInfo reconnect_info; // Applies exclusively to `connection`.
278
        std::unique_ptr<ClientImpl::Connection> connection;
279

280
        // Used instead of `connection` when `m_one_connection_per_session` is
281
        // true.
282
        std::map<connection_ident_type, std::unique_ptr<ClientImpl::Connection>> alt_connections;
283
    };
284

285
    // Must be accessed only by event loop thread
286
    std::map<ServerEndpoint, ServerSlot> m_server_slots;
287

288
    // Must be accessed only by event loop thread
289
    connection_ident_type m_prev_connection_ident = 0;
290

291
    std::mutex m_drain_mutex;
292
    std::condition_variable m_drain_cv;
293
    bool m_drained = false;
294
    uint64_t m_outstanding_posts = 0;
295
    uint64_t m_num_connections = 0;
296

297
    std::mutex m_mutex;
298

299
    bool m_stopped = false;                       // Protected by `m_mutex`
300
    bool m_sessions_terminated = false;           // Protected by `m_mutex`
301
    bool m_actualize_and_finalize_needed = false; // Protected by `m_mutex`
302

303
    // The set of session wrappers that are not yet wrapping a session object,
304
    // and are not yet abandoned (still referenced by the application).
305
    //
306
    // Protected by `m_mutex`.
307
    std::map<SessionWrapper*, ServerEndpoint> m_unactualized_session_wrappers;
308

309
    // The set of session wrappers that were successfully actualized, but are
310
    // now abandoned (no longer referenced by the application), and have not yet
311
    // been finalized. Order in queue is immaterial.
312
    //
313
    // Protected by `m_mutex`.
314
    SessionWrapperStack m_abandoned_session_wrappers;
315

316
    // Protected by `m_mutex`.
317
    std::condition_variable m_wait_or_client_stopped_cond;
318

319
    void register_unactualized_session_wrapper(SessionWrapper*, ServerEndpoint);
320
    void register_abandoned_session_wrapper(util::bind_ptr<SessionWrapper>) noexcept;
321
    void actualize_and_finalize_session_wrappers();
322

323
    // Get or create a connection. If a connection exists for the specified
324
    // endpoint, it will be returned, otherwise a new connection will be
325
    // created. If `m_one_connection_per_session` is true (testing only), a new
326
    // connection will be created every time.
327
    //
328
    // Must only be accessed from event loop thread.
329
    //
330
    // FIXME: Passing these SSL parameters here is confusing at best, since they
331
    // are ignored if a connection is already available for the specified
332
    // endpoint. Also, there is no way to check that all the specified SSL
333
    // parameters are in agreement with a preexisting connection. A better
334
    // approach would be to allow for per-endpoint SSL parameters to be
335
    // specifiable through public member functions of ClientImpl from where they
336
    // could then be picked up as new connections are created on demand.
337
    //
338
    // FIXME: `session_multiplex_ident` should be eliminated from ServerEndpoint
339
    // as it effectively disables part of the hammering protection scheme if it
340
    // is used to ensure that each session gets a separate connection. With the
341
    // alternative approach outlined in the previous FIXME (specify per endpoint
342
    // SSL parameters at the client object level), there seems to be no more use
343
    // for `session_multiplex_ident`.
344
    ClientImpl::Connection& get_connection(ServerEndpoint, const std::string& authorization_header_name,
345
                                           const std::map<std::string, std::string>& custom_http_headers,
346
                                           bool verify_servers_ssl_certificate,
347
                                           util::Optional<std::string> ssl_trust_certificate_path,
348
                                           std::function<SyncConfig::SSLVerifyCallback>,
349
                                           util::Optional<SyncConfig::ProxyConfig>, bool& was_created);
350

351
    // Destroys the specified connection.
352
    void remove_connection(ClientImpl::Connection&) noexcept;
353

354
    void drain_connections();
355
    void drain_connections_on_loop();
356

357
    session_ident_type get_next_session_ident() noexcept;
358

359
    friend class ClientImpl::Connection;
360
    friend class SessionWrapper;
361
};
362

363
constexpr int ClientImpl::get_oldest_supported_protocol_version() noexcept
364
{
16,112✔
365
    // See get_current_protocol_version() for information about the
7,888✔
366
    // individual protocol versions.
7,888✔
367
    return 2;
16,112✔
368
}
16,112✔
369

370
static_assert(ClientImpl::get_oldest_supported_protocol_version() >= 1, "");
371
static_assert(ClientImpl::get_oldest_supported_protocol_version() <= get_current_protocol_version(), "");
372

373

374
/// Information about why a connection (or connection initiation attempt) was
375
/// terminated. This is used to determinte the delay until the next connection
376
/// initiation attempt.
377
enum class ClientImpl::ConnectionTerminationReason {
378
    connect_operation_failed,          ///< Failure during connect operation
379
    closed_voluntarily,                ///< Voluntarily closed or connection operation canceled
380
    read_or_write_error,               ///< Read/write error after successful TCP connect operation
381
    ssl_certificate_rejected,          ///< Client rejected the SSL certificate of the server
382
    ssl_protocol_violation,            ///< A violation of the SSL protocol
383
    websocket_protocol_violation,      ///< A violation of the WebSocket protocol
384
    http_response_says_fatal_error,    ///< Status code in HTTP response says "fatal error"
385
    http_response_says_nonfatal_error, ///< Status code in HTTP response says "nonfatal error"
386
    bad_headers_in_http_response,      ///< Missing or bad headers in HTTP response
387
    sync_protocol_violation,           ///< Client received a bad message from the server
388
    sync_connect_timeout,              ///< Sync connection was not fully established in time
389
    server_said_try_again_later,       ///< Client received ERROR message with try_again=yes
390
    server_said_do_not_reconnect,      ///< Client received ERROR message with try_again=no
391
    pong_timeout,                      ///< Client did not receive PONG after PING
392

393
    /// The application requested a feature that is unavailable in the
394
    /// negotiated protocol version.
395
    missing_protocol_feature,
396
};
397

398

399
/// All use of connection objects, including construction and destruction, must
400
/// occur on behalf of the event loop thread of the associated client object.
401

402
// TODO: The parent will be updated to WebSocketObserver once the WebSocket integration is complete
403
class ClientImpl::Connection {
404
public:
405
    using connection_ident_type = std::int_fast64_t;
406
    using SSLVerifyCallback = SyncConfig::SSLVerifyCallback;
407
    using ProxyConfig = SyncConfig::ProxyConfig;
408
    using ReconnectInfo = ClientImpl::ReconnectInfo;
409

410
    std::shared_ptr<util::Logger> logger_ptr;
411
    util::Logger& logger;
412

413
    ClientImpl& get_client() noexcept;
414
    ReconnectInfo get_reconnect_info() const noexcept;
415
    ClientProtocol& get_client_protocol() noexcept;
416

417
    /// Activate this connection object. No attempt is made to establish a
418
    /// connection before the connection object is activated.
419
    void activate();
420

421
    /// Activate the specified session.
422
    ///
423
    /// Prior to being activated, no messages will be sent or received on behalf
424
    /// of this session, and the associated Realm file will not be accessed,
425
    /// i.e., `Session::get_db()` will not be called.
426
    ///
427
    /// If activation is successful, the connection keeps the session alive
428
    /// until the application calls initiated_session_deactivation() or until
429
    /// the application destroys the connection object, whichever comes first.
430
    void activate_session(std::unique_ptr<Session>);
431

432
    /// Initiate the deactivation process which eventually (or immediately)
433
    /// leads to destruction of this session object.
434
    ///
435
    /// IMPORTANT: The session object may get destroyed before this function
436
    /// returns.
437
    ///
438
    /// The deactivation process must be considered initiated even if this
439
    /// function throws.
440
    ///
441
    /// The deactivation process is guaranteed to not be initiated until the
442
    /// application calls this function. So from the point of view of the
443
    /// application, after successful activation, a pointer to a session object
444
    /// remains valid until the application calls
445
    /// initiate_session_deactivation().
446
    ///
447
    /// After the initiation of the deactivation process, the associated Realm
448
    /// file will no longer be accessed, i.e., `get_db()` will not be called
449
    /// again, and a previously returned reference will also not be accessed
450
    /// again.
451
    ///
452
    /// The initiation of the deactivation process must be preceded by a
453
    /// successful invocation of activate_session(). It is an error to call
454
    /// initiate_session_deactivation() twice.
455
    void initiate_session_deactivation(Session*);
456

457
    /// Cancel the reconnect delay for this connection, if one is currently in
458
    /// effect. If a reconnect delay is not currently in effect, ensure that the
459
    /// delay before the next reconnection attempt will be canceled. This is
460
    /// necessary as an apparently established connection, or ongoing connection
461
    /// attempt can be about to fail for a reason that precedes the invocation
462
    /// of this function.
463
    ///
464
    /// It is an error to call this function before the connection has been
465
    /// activated.
466
    void cancel_reconnect_delay();
467

468
    void force_close();
469

470
    /// Returns zero until the HTTP response is received. After that point in
471
    /// time, it returns the negotiated protocol version, which is based on the
472
    /// contents of the `Sec-WebSocket-Protocol` header in the HTTP
473
    /// response. The negotiated protocol version is guaranteed to be greater
474
    /// than or equal to get_oldest_supported_protocol_version(), and be less
475
    /// than or equal to get_current_protocol_version().
476
    int get_negotiated_protocol_version() noexcept;
477

478
    // Methods from WebSocketObserver interface for websockets from the Socket Provider
479
    void websocket_connected_handler(const std::string& protocol);
480
    bool websocket_binary_message_received(util::Span<const char> data);
481
    void websocket_error_handler();
482
    bool websocket_closed_handler(bool, websocket::WebSocketError, std::string_view msg);
483

484
    connection_ident_type get_ident() const noexcept;
485
    const ServerEndpoint& get_server_endpoint() const noexcept;
486
    ConnectionState get_state() const noexcept;
487
    SyncServerMode get_sync_server_mode() const noexcept;
488
    bool is_flx_sync_connection() const noexcept;
489

490
    void update_connect_info(const std::string& http_request_path_prefix, const std::string& signed_access_token);
491

492
    void resume_active_sessions();
493

494
    void voluntary_disconnect();
495

496
    std::string get_active_appservices_connection_id();
497

498
    Connection(ClientImpl&, connection_ident_type, ServerEndpoint, const std::string& authorization_header_name,
499
               const std::map<std::string, std::string>& custom_http_headers, bool verify_servers_ssl_certificate,
500
               util::Optional<std::string> ssl_trust_certificate_path, std::function<SSLVerifyCallback>,
501
               util::Optional<ProxyConfig>, ReconnectInfo);
502

503
    ~Connection();
504

505
private:
506
    struct LifecycleSentinel : public util::AtomicRefCountBase {
507
        bool destroyed = false;
508
    };
509
    struct WebSocketObserverShim;
510

511
    using ReceivedChangesets = ClientProtocol::ReceivedChangesets;
512

513
    template <class H>
514
    void for_each_active_session(H handler);
515

516
    /// \brief Called when the connection becomes idle.
517
    ///
518
    /// The connection is considered idle when all of the following conditions
519
    /// are true:
520
    ///
521
    /// - The connection is activated.
522
    ///
523
    /// - The connection has no sessions in the Active state.
524
    ///
525
    /// - The connection is closed (in the disconnected state).
526
    ///
527
    /// From the point of view of this class, an overriding function is allowed
528
    /// to commit suicide (`delete this`).
529
    ///
530
    /// The default implementation of this function does nothing.
531
    ///
532
    /// This function is always called by the event loop thread of the
533
    /// associated client object.
534
    void on_idle();
535

536
    std::string get_http_request_path() const;
537

538
    void initiate_reconnect_wait();
539
    void handle_reconnect_wait(Status status);
540
    void initiate_reconnect();
541
    void initiate_connect_wait();
542
    void handle_connect_wait(Status status);
543

544
    void handle_connection_established();
545
    void schedule_urgent_ping();
546
    void initiate_ping_delay(milliseconds_type now);
547
    void handle_ping_delay();
548
    void initiate_pong_timeout();
549
    void handle_pong_timeout();
550
    void initiate_write_message(const OutputBuffer&, Session*);
551
    void handle_write_message();
552
    void send_next_message();
553
    void send_ping();
554
    void initiate_write_ping(const OutputBuffer&);
555
    void handle_write_ping();
556
    void handle_message_received(util::Span<const char> data);
557
    void initiate_disconnect_wait();
558
    void handle_disconnect_wait(Status status);
559
    void close_due_to_protocol_error(Status status);
560
    void close_due_to_client_side_error(Status, IsFatal is_fatal, ProtocolErrorInfo::Action error_action,
561
                                        ConnectionTerminationReason reason);
562
    void close_due_to_transient_error(Status status, ConnectionTerminationReason reason);
563
    void close_due_to_server_side_error(ProtocolError, const ProtocolErrorInfo& info);
564
    void involuntary_disconnect(const SessionErrorInfo& info, ConnectionTerminationReason reason);
565
    void disconnect(const SessionErrorInfo& info);
566
    void change_state_to_disconnected() noexcept;
567
    // These are only called from ClientProtocol class.
568
    void receive_pong(milliseconds_type timestamp);
569
    void receive_error_message(const ProtocolErrorInfo& info, session_ident_type);
570
    void receive_query_error_message(int error_code, std::string_view message, int64_t query_version,
571
                                     session_ident_type);
572
    void receive_ident_message(session_ident_type, SaltedFileIdent);
573
    void receive_download_message(session_ident_type, const SyncProgress&, std::uint_fast64_t downloadable_bytes,
574
                                  int64_t query_version, DownloadBatchState batch_state, const ReceivedChangesets&);
575
    void receive_mark_message(session_ident_type, request_ident_type);
576
    void receive_unbound_message(session_ident_type);
577
    void receive_test_command_response(session_ident_type, request_ident_type, std::string_view body);
578
    void receive_server_log_message(session_ident_type, util::Logger::Level, std::string_view body);
579
    void receive_appservices_request_id(std::string_view coid);
580
    void handle_protocol_error(Status status);
581

582
    // These are only called from Session class.
583
    void enlist_to_send(Session*);
584
    void one_more_active_unsuspended_session();
585
    void one_less_active_unsuspended_session();
586
    void finish_session_deactivation(Session* sess);
587

588
    OutputBuffer& get_output_buffer() noexcept;
589
    Session* get_session(session_ident_type) const noexcept;
590
    Session* find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept;
591
    static bool was_voluntary(ConnectionTerminationReason) noexcept;
592

593
    static std::string make_logger_prefix(connection_ident_type);
594

595
    void report_connection_state_change(ConnectionState, util::Optional<SessionErrorInfo> error_info = util::none);
596

597
    friend ClientProtocol;
598
    friend class Session;
599

600
    ClientImpl& m_client;
601
    util::bind_ptr<LifecycleSentinel> m_websocket_sentinel;
602
    std::unique_ptr<WebSocketInterface> m_websocket;
603

604
    /// DEPRECATED - These will be removed in a future release
605
    const bool m_verify_servers_ssl_certificate;
606
    const util::Optional<std::string> m_ssl_trust_certificate_path;
607
    const std::function<SSLVerifyCallback> m_ssl_verify_callback;
608
    const util::Optional<ProxyConfig> m_proxy_config;
609

610
    ReconnectInfo m_reconnect_info;
611
    int m_negotiated_protocol_version = 0;
612

613
    ConnectionState m_state = ConnectionState::disconnected;
614

615
    std::size_t m_num_active_unsuspended_sessions = 0;
616
    std::size_t m_num_active_sessions = 0;
617
    ClientImpl::SyncTrigger m_on_idle;
618

619
    // activate() has been called
620
    bool m_activated = false;
621

622
    // A reconnect delay is in progress
623
    bool m_reconnect_delay_in_progress = false;
624

625
    // Has no meaning when m_reconnect_delay_in_progress is false.
626
    bool m_nonzero_reconnect_delay = false;
627

628
    // A disconnect (linger) delay is in progress. This is for keeping the
629
    // connection open for a while after there are no more active unsuspended
630
    // sessions.
631
    bool m_disconnect_delay_in_progress = false;
632

633
    bool m_disconnect_has_occurred = false;
634

635
    // A message is currently being sent, i.e., the sending of a message has
636
    // been initiated, but not yet completed.
637
    bool m_sending = false;
638

639
    bool m_ping_delay_in_progress = false;
640
    bool m_waiting_for_pong = false;
641
    bool m_send_ping = false;
642
    bool m_minimize_next_ping_delay = false;
643
    bool m_ping_after_scheduled_reset_of_reconnect_info = false;
644

645
    // At least one PING message was sent since connection was established
646
    bool m_ping_sent = false;
647

648
    bool m_websocket_error_received = false;
649

650
    bool m_force_closed = false;
651

652
    // The timer will be constructed on demand, and will only be destroyed when
653
    // canceling a reconnect or disconnect delay.
654
    //
655
    // It is necessary to destroy and recreate the timer when canceling a wait
656
    // operation, because the next wait operation might need to be initiated
657
    // before the completion handler of the previous canceled wait operation
658
    // starts executing. Such an overlap is not allowed for wait operations on
659
    // the same timer instance.
660
    SyncSocketProvider::SyncTimer m_reconnect_disconnect_timer;
661

662
    // Timer for connect operation watchdog. For why this timer is optional, see
663
    // `m_reconnect_disconnect_timer`.
664
    SyncSocketProvider::SyncTimer m_connect_timer;
665

666
    // This timer is used to schedule the sending of PING messages, and as a
667
    // watchdog for timely reception of PONG messages. For why this timer is
668
    // optional, see `m_reconnect_disconnect_timer`.
669
    SyncSocketProvider::SyncTimer m_heartbeat_timer;
670

671
    milliseconds_type m_pong_wait_started_at = 0;
672
    milliseconds_type m_last_ping_sent_at = 0;
673

674
    // Round-trip time, in milliseconds, for last PING message for which a PONG
675
    // message has been received, or zero if no PONG message has been received.
676
    milliseconds_type m_previous_ping_rtt = 0;
677

678
    // Only valid when `m_disconnect_has_occurred` is true.
679
    milliseconds_type m_disconnect_time = 0;
680

681
    // The set of sessions associated with this connection. A session becomes
682
    // associated with a connection when it is activated.
683
    std::map<session_ident_type, std::unique_ptr<Session>> m_sessions;
684
    // Keep track of previously used sessions idents to see if a stale message was
685
    // received for a closed session
686
    std::unordered_set<session_ident_type> m_session_history;
687

688
    // A queue of sessions that have enlisted for an opportunity to send a
689
    // message to the server. Sessions will be served in the order that they
690
    // enlist. A session is only allowed to occur once in this queue. If the
691
    // connection is open, and the queue is not empty, and no message is
692
    // currently being written, the first session is taken out of the queue, and
693
    // then granted an opportunity to send a message.
694
    std::deque<Session*> m_sessions_enlisted_to_send;
695

696
    Session* m_sending_session = nullptr;
697

698
    std::unique_ptr<char[]> m_input_body_buffer;
699
    OutputBuffer m_output_buffer;
700

701
    const connection_ident_type m_ident;
702
    const ServerEndpoint m_server_endpoint;
703
    std::string m_appservices_coid;
704

705
    /// DEPRECATED - These will be removed in a future release
706
    const std::string m_authorization_header_name;
707
    const std::map<std::string, std::string> m_custom_http_headers;
708

709
    std::string m_http_request_path_prefix;
710
    std::string m_signed_access_token;
711
};
712

713

714
/// A synchronization session between a local and a remote Realm file.
715
///
716
/// All use of session objects, including construction and destruction, must
717
/// occur on the event loop thread of the associated client object.
718
class ClientImpl::Session {
719
public:
720
    using ReceivedChangesets = ClientProtocol::ReceivedChangesets;
721

722
    std::shared_ptr<util::Logger> logger_ptr;
723
    util::Logger& logger;
724

725
    ClientImpl& get_client() noexcept;
726
    Connection& get_connection() noexcept;
727
    session_ident_type get_ident() const noexcept;
728

729
    /// Inform this client about new changesets in the history.
730
    ///
731
    /// The type of the version specified here is the one that identifies an
732
    /// entry in the sync history. Whether this is the same as the snapshot
733
    /// version of the Realm depends on the history implementation.
734
    ///
735
    /// The application is supposed to call this function to inform the client
736
    /// about a new version produced by a transaction that was not performed on
737
    /// behalf of this client. If the application does not call this function,
738
    /// the client will not discover and upload new changesets in a timely
739
    /// manner.
740
    ///
741
    /// It is an error to call this function before activation of the session,
742
    /// or after initiation of deactivation.
743
    void recognize_sync_version(version_type);
744

745
    /// \brief Request notification when all changesets in the local history
746
    /// have been uploaded to the server.
747
    ///
748
    /// When uploading completes, on_upload_completion() will be called by the
749
    /// thread that processes the event loop (as long as such a thread exists).
750
    ///
751
    /// IMPORTANT: on_upload_completion() may get called before
752
    /// request_upload_completion_notification() returns (reentrant callback).
753
    ///
754
    /// If request_upload_completion_notification() is called while a previously
755
    /// requested completion notification has not yet occurred, the previous
756
    /// request is canceled and the corresponding notification will never
757
    /// occur. This ensure that there is no ambiguity about the meaning of each
758
    /// completion notification.
759
    ///
760
    /// The application must be prepared for "spurious" invocations of
761
    /// on_upload_completion() before the client's first invocation of
762
    /// request_upload_completion_notification(), or after a previous invocation
763
    /// of on_upload_completion(), as long as it is before the subsequent
764
    /// invocation by the client of
765
    /// request_upload_completion_notification(). This is possible because the
766
    /// client reserves the right to request upload completion notifications
767
    /// internally.
768
    ///
769
    /// Upload is considered complete when all changesets in the history, that
770
    /// are supposed to be uploaded, and that precede `current_client_version`,
771
    /// have been uploaded and acknowledged by the
772
    /// server. `current_client_version` is generally the version that refers to
773
    /// the last changeset in the history, but more precisely, it may be any
774
    /// version between the last version reported by the application through
775
    /// recognize_sync_version() and the version referring to the last history
776
    /// entry (both ends inclusive).
777
    ///
778
    /// If new changesets are added to the history while a previously requested
779
    /// completion notification has not yet occurred, it is unspecified whether
780
    /// the addition of those changesets will cause `current_client_version` to
781
    /// be bumped or stay fixed, regardless of whether they are advertised via
782
    /// recognize_sync_version().
783
    ///
784
    /// It is an error to call this function before activation of the session,
785
    /// or after initiation of deactivation.
786
    void request_upload_completion_notification();
787

788
    /// \brief Request notification when all changesets currently avaialble on
789
    /// the server have been downloaded.
790
    ///
791
    /// When downloading completes, on_download_completion() will be called by
792
    /// the thread that processes the event loop (as long as such a thread
793
    /// exists).
794
    ///
795
    /// If request_download_completion_notification() is called while a
796
    /// previously requested completion notification has not yet occurred, the
797
    /// previous request is canceled and the corresponding notification will
798
    /// never occur. This ensure that there is no ambiguity about the meaning of
799
    /// each completion notification.
800
    ///
801
    /// The application must be prepared for "spurious" invocations of
802
    /// on_download_completion() before the client's first invocation of
803
    /// request_download_completion_notification(), or after a previous
804
    /// invocation of on_download_completion(), as long as it is before the
805
    /// subsequent invocation by the client of
806
    /// request_download_completion_notification(). This is possible because the
807
    /// client reserves the right to request download completion notifications
808
    /// internally.
809
    ///
810
    /// Download is considered complete when all changesets in the server-side
811
    /// history, that are supposed to be downloaded, and that precede
812
    /// `current_server_version`, have been downloaded and integrated into the
813
    /// local history. `current_server_version` is the version that refers to
814
    /// the last changeset in the server-side history at the time the server
815
    /// receives the first MARK message that is sent by the client after the
816
    /// invocation of request_download_completion_notification().
817
    ///
818
    /// Every invocation of request_download_completion_notification() will
819
    /// cause a new MARK message to be sent to the server, to redetermine
820
    /// `current_server_version`.
821
    ///
822
    /// It is an error to call this function before activation of the session,
823
    /// or after initiation of deactivation.
824
    void request_download_completion_notification();
825

826
    /// \brief Gets the subscription store associated with this Session.
827
    SubscriptionStore* get_flx_subscription_store();
828

829
    /// \brief Gets the migration store associated with this Session.
830
    MigrationStore* get_migration_store();
831

832
    /// Update internal client state when a flx subscription becomes complete outside
833
    /// of the normal sync process. This can happen during client reset.
834
    void on_flx_sync_version_complete(int64_t version);
835

836
    /// If this session is currently suspended, resume it immediately.
837
    ///
838
    /// It is an error to call this function before activation of the session,
839
    /// or after initiation of deactivation.
840
    void cancel_resumption_delay();
841

842
    /// To be used in connection with implementations of
843
    /// initiate_integrate_changesets().
844
    void integrate_changesets(const SyncProgress&, std::uint_fast64_t downloadable_bytes, const ReceivedChangesets&,
845
                              VersionInfo&, DownloadBatchState last_in_batch);
846

847
    /// To be used in connection with implementations of
848
    /// initiate_integrate_changesets().
849
    ///
850
    /// If \a success is true, the value of \a error does not matter. If \a
851
    /// success is false, the values of \a client_version and \a
852
    /// download_progress do not matter.
853
    ///
854
    /// It is an error to call this function before activation of the session
855
    /// (Connection::activate_session()), or after initiation of deactivation
856
    /// (Connection::initiate_session_deactivation()).
857
    void on_changesets_integrated(version_type client_version, const SyncProgress& progress);
858

859
    void on_integration_failure(const IntegrationException& e);
860

861
    void on_connection_state_changed(ConnectionState, const util::Optional<SessionErrorInfo>&);
862

863
    /// The application must ensure that the new session object is either
864
    /// activated (Connection::activate_session()) or destroyed before the
865
    /// specified connection object is destroyed.
866
    ///
867
    /// The specified transaction reporter (via the config object) is guaranteed
868
    /// to not be called before activation, and also not after initiation of
869
    /// deactivation.
870
    Session(SessionWrapper&, ClientImpl::Connection&);
871
    ~Session();
872

873
    void force_close();
874

875
    util::Future<std::string> send_test_command(std::string body);
876

877
private:
878
    struct PendingTestCommand {
879
        request_ident_type id;
880
        std::string body;
881
        util::Promise<std::string> promise;
882
        bool pending = true;
883
    };
884

885
    /// Fetch a reference to the remote virtual path of the Realm associated
886
    /// with this session.
887
    ///
888
    /// This function is always called by the event loop thread of the
889
    /// associated client object.
890
    ///
891
    /// This function is guaranteed to not be called before activation, and also
892
    /// not after initiation of deactivation.
893
    const std::string& get_virt_path() const noexcept;
894

895
    const std::string& get_realm_path() const noexcept;
896

897
    // Can only be called if the session is active or being activated
898
    DBRef get_db() const noexcept;
899
    ClientReplication& get_repl() const noexcept;
900
    ClientHistory& get_history() const noexcept;
901

902
    // client_reset_config() returns the config for client
903
    // reset. If it returns none, ordinary sync is used. If it returns a
904
    // Config::ClientReset, the session will be initiated with a state Realm
905
    // transfer from the server.
906
    util::Optional<ClientReset>& get_client_reset_config() noexcept;
907

908
    // Get the reason a synchronization session is used for (regular sync or client reset)
909
    // - Client reset state means the session is going to be used to download a fresh realm.
910
    SessionReason get_session_reason() noexcept;
911

912
    /// \brief Initiate the integration of downloaded changesets.
913
    ///
914
    /// This function must provide for the passed changesets (if any) to
915
    /// eventually be integrated, and without unnecessary delay. If no
916
    /// changesets are passed, the purpose of this function reduces to causing
917
    /// the current synchronization progress (SyncProgress) to be persisted.
918
    ///
919
    /// When all changesets have been integrated, and the synchronization
920
    /// progress has been persisted, this function must provide for
921
    /// on_changesets_integrated() to be called without unnecessary delay,
922
    /// although never after initiation of session deactivation.
923
    ///
924
    /// The implementation is allowed, but not obliged to aggregate changesets
925
    /// from multiple invocations of initiate_integrate_changesets() and pass
926
    /// them to ClientReplication::integrate_server_changesets() at once.
927
    ///
928
    /// The synchronization progress passed to
929
    /// ClientReplication::integrate_server_changesets() must be obtained
930
    /// by calling get_status(), and that call must occur after the last
931
    /// invocation of initiate_integrate_changesets() whose changesets are
932
    /// included in what is passed to
933
    /// ClientReplication::integrate_server_changesets().
934
    ///
935
    /// The download cursor passed to on_changesets_integrated() must be
936
    /// SyncProgress::download of the synchronization progress passed to the
937
    /// last invocation of
938
    /// ClientReplication::integrate_server_changesets().
939
    ///
940
    /// The default implementation integrates the specified changesets and calls
941
    /// on_changesets_integrated() immediately (i.e., from the event loop thread
942
    /// of the associated client object, and before
943
    /// initiate_integrate_changesets() returns), and via the history accessor
944
    /// made available by access_realm().
945
    ///
946
    /// This function is always called by the event loop thread of the
947
    /// associated client object, and on_changesets_integrated() must always be
948
    /// called by that thread too.
949
    ///
950
    /// This function is guaranteed to not be called before activation, and also
951
    /// not after initiation of deactivation.
952
    void initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
953
                                       const SyncProgress& progress, const ReceivedChangesets&);
954

955
    /// See request_upload_completion_notification().
956
    ///
957
    /// The default implementation does nothing.
958
    void on_upload_completion();
959

960
    /// See request_download_completion_notification().
961
    ///
962
    /// The default implementation does nothing.
963
    void on_download_completion();
964

965
    //@{
966
    /// These are called as the state of the session changes between
967
    /// "suspended" and "resumed". The initial state is
968
    /// always "resumed".
969
    ///
970
    /// A switch to the suspended state only happens when an error occurs,
971
    /// and information about that error is passed to on_suspended().
972
    ///
973
    /// The default implementations of these functions do nothing.
974
    ///
975
    /// These functions are always called by the event loop thread of the
976
    /// associated client object.
977
    ///
978
    /// These functions are guaranteed to not be called before activation, and also
979
    /// not after initiation of deactivation.
980
    void on_suspended(const SessionErrorInfo& error_info);
981
    void on_resumed();
982
    //@}
983

984
    void on_flx_sync_error(int64_t version, std::string_view err_msg);
985
    void on_flx_sync_progress(int64_t version, DownloadBatchState batch_state);
986

987
    // Processes an FLX download message, if it's a bootstrap message. If it's not a bootstrap
988
    // message then this is a noop and will return false. Otherwise this will return true
989
    // and no further processing of the download message should take place.
990
    bool process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
991
                                       int64_t query_version, const ReceivedChangesets& received_changesets);
992

993
    // Processes any pending FLX bootstraps, if one exists. Otherwise this is a noop.
994
    void process_pending_flx_bootstrap();
995

996
    bool client_reset_if_needed();
997
    void handle_pending_client_reset_acknowledgement();
998

999
    void update_subscription_version_info();
1000

1001
    void gather_pending_compensating_writes(util::Span<Changeset> changesets, std::vector<ProtocolErrorInfo>* out);
1002

1003
    void begin_resumption_delay(const ProtocolErrorInfo& error_info);
1004
    void clear_resumption_delay_state();
1005

1006
private:
1007
    Connection& m_conn;
1008
    const session_ident_type m_ident;
1009

1010
    // The states only transition in one direction, from left to right.
1011
    // The transition to Active happens very soon after construction, as soon as
1012
    // it is registered with the Connection.
1013
    // The transition from Deactivating to Deactivated state happens when the
1014
    // unbinding process completes (unbind_process_complete()).
1015
    enum State { Unactivated, Active, Deactivating, Deactivated };
1016
    State m_state = Unactivated;
1017

1018
    bool m_suspended = false;
1019

1020
    SyncSocketProvider::SyncTimer m_try_again_activation_timer;
1021
    ErrorBackoffState<sync::ProtocolError, RandomEngine> m_try_again_delay_info;
1022

1023
    // Set to true when download completion is reached. Set to false after a
1024
    // slow reconnect, such that the upload process will become suspended until
1025
    // download completion is reached again.
1026
    bool m_allow_upload = false;
1027

1028
    bool m_upload_completion_notification_requested = false;
1029

1030
    bool m_is_flx_sync_session = false;
1031

1032
    bool m_fix_up_object_ids = false;
1033

1034
    // These are reset when the session is activated, and again whenever the
1035
    // connection is lost or the rebinding process is initiated.
1036
    bool m_enlisted_to_send;
1037
    bool m_bind_message_sent;            // Sending of BIND message has been initiated
1038
    bool m_ident_message_sent;           // Sending of IDENT message has been initiated
1039
    bool m_unbind_message_sent;          // Sending of UNBIND message has been initiated
1040
    bool m_unbind_message_send_complete; // Sending of UNBIND message has been completed
1041
    bool m_error_message_received;       // Session specific ERROR message received
1042
    bool m_unbound_message_received;     // UNBOUND message received
1043
    bool m_error_to_send;
1044

1045
    // True when there is a new FLX sync query we need to send to the server.
1046
    util::Optional<SubscriptionStore::PendingSubscription> m_pending_flx_sub_set;
1047
    int64_t m_last_sent_flx_query_version = 0;
1048

1049
    std::deque<ProtocolErrorInfo> m_pending_compensating_write_errors;
1050

1051
    util::Optional<IntegrationException> m_client_error;
1052

1053
    // `ident == 0` means unassigned.
1054
    SaltedFileIdent m_client_file_ident = {0, 0};
1055

1056
    // True while this session is in the process of performing a client reset.
1057
    bool m_performing_client_reset = false;
1058

1059
    // The latest sync progress reported by the server via a DOWNLOAD
1060
    // message. See struct SyncProgress for a description. The values stored in
1061
    // `m_progress` either are persisted, or are about to be.
1062
    //
1063
    // Initialized by way of ClientHistory::get_status() at session
1064
    // activation time.
1065
    //
1066
    // `m_progress.upload.client_version` is the client-side sync version
1067
    // produced by the latest local changeset that has been acknowledged as
1068
    // integrated by the server.
1069
    SyncProgress m_progress;
1070

1071
    // In general, the local version produced by the last changeset in the local
1072
    // history. The changeset that produced this version may, or may not
1073
    // contain changes of local origin.
1074
    //
1075
    // It is set to the current version of the local Realm at session activation
1076
    // time (although always zero for the initial empty Realm
1077
    // state). Thereafter, it is updated when the application calls
1078
    // recognize_sync_version(), when changesets are received from the server
1079
    // and integrated locally, and when the uploading process discovers newer
1080
    // versions.
1081
    //
1082
    // INVARIANT: m_progress.upload.client_version <= m_last_version_available
1083
    version_type m_last_version_available = 0;
1084

1085
    // In general, this is the position in the history reached while scanning
1086
    // for changesets to be uploaded.
1087
    //
1088
    // Set to `m_progress.upload` at session activation time and whenever the
1089
    // connection to the server is lost. When the connection is established, the
1090
    // scanning for changesets to be uploaded then progresses from there towards
1091
    // `m_last_version_available`.
1092
    //
1093
    // INVARIANT: m_progress.upload.client_version <= m_upload_progress.client_version
1094
    UploadCursor m_upload_progress = {0, 0};
1095

1096
    // Set to `m_progress.upload.client_version` at session activation time and
1097
    // whenever the connection to the server is lost. Otherwise it is the
1098
    // version of the latest changeset that has been selected for upload while
1099
    // scanning the history.
1100
    //
1101
    // INVARIANT: m_progress.upload.client_version <= m_last_version_selected_for_upload
1102
    // INVARIANT: m_last_version_selected_for_upload <= m_upload_progress.client_version
1103
    version_type m_last_version_selected_for_upload = 0;
1104

1105
    // Same as `m_progress.download` but is updated only as the progress gets
1106
    // persisted.
1107
    DownloadCursor m_download_progress = {0, 0};
1108

1109
    // Used to implement download completion notifications. Set equal to
1110
    // `m_progress.download.server_version` when a MARK message is received. Set
1111
    // back to zero when `m_download_progress.server_version` becomes greater
1112
    // than, or equal to `m_server_version_at_last_download_mark`. For further
1113
    // details, see check_for_download_completion().
1114
    version_type m_server_version_at_last_download_mark = 0;
1115

1116
    // The serial number to attach to the next download MARK message. A new MARK
1117
    // message will be sent when `m_target_download_mark >
1118
    // m_last_download_mark_sent`. To cause a new MARK message to be sent,
1119
    // simply increment `m_target_download_mark`.
1120
    request_ident_type m_target_download_mark = 0;
1121

1122
    // Set equal to `m_target_download_mark` as the sending of each MARK message
1123
    // is initiated. Must be set equal to `m_last_download_mark_received` when
1124
    // the connection to the server is lost.
1125
    request_ident_type m_last_download_mark_sent = 0;
1126

1127
    // Updated when a MARK message is received. See see
1128
    // check_for_download_completion() for how details on how it participates in
1129
    // the detection of download completion.
1130
    request_ident_type m_last_download_mark_received = 0;
1131

1132
    // Updated when a download completion is detected, to avoid multiple
1133
    // triggerings after reception of a single MARK message. See see
1134
    // check_for_download_completion() for how details on how it participates in
1135
    // the detection of download completion.
1136
    request_ident_type m_last_triggering_download_mark = 0;
1137

1138
    SessionWrapper& m_wrapper;
1139

1140
    request_ident_type m_last_pending_test_command_ident = 0;
1141
    std::list<PendingTestCommand> m_pending_test_commands;
1142

1143
    static std::string make_logger_prefix(session_ident_type);
1144

1145
    Session(SessionWrapper& wrapper, Connection&, session_ident_type);
1146

1147
    bool do_recognize_sync_version(version_type) noexcept;
1148

1149
    bool have_client_file_ident() const noexcept;
1150

1151
    // The unbinding process completes when both of the following become true:
1152
    //
1153
    //  - The sending of the UNBIND message has been completed
1154
    //    (m_unbind_message_sent_2).
1155
    //
1156
    //  - A session specific ERROR, or the UNBOUND message has been received
1157
    //    (m_error_message_received || m_unbond_message_received).
1158
    //
1159
    // Rebinding (sending of a new BIND message) can only be initiated while the
1160
    // session is in the Active state, and the unbinding process has completed
1161
    // (unbind_process_complete()).
1162
    bool unbind_process_complete() const noexcept;
1163

1164
    void activate();
1165
    void initiate_deactivation();
1166
    void complete_deactivation();
1167
    void connection_established(bool fast_reconnect);
1168
    void suspend(const SessionErrorInfo& session_error);
1169
    void connection_lost();
1170
    void send_message();
1171
    void message_sent();
1172
    void send_bind_message();
1173
    void send_ident_message();
1174
    void send_upload_message();
1175
    void send_mark_message();
1176
    void send_alloc_message();
1177
    void send_unbind_message();
1178
    void send_query_change_message();
1179
    void send_json_error_message();
1180
    void send_test_command_message();
1181
    Status receive_ident_message(SaltedFileIdent);
1182
    Status receive_download_message(const SyncProgress&, std::uint_fast64_t downloadable_bytes,
1183
                                    DownloadBatchState last_in_batch, int64_t query_version,
1184
                                    const ReceivedChangesets&);
1185
    Status receive_mark_message(request_ident_type);
1186
    Status receive_unbound_message();
1187
    Status receive_error_message(const ProtocolErrorInfo& info);
1188
    Status receive_query_error_message(int error_code, std::string_view message, int64_t query_version);
1189
    Status receive_test_command_response(request_ident_type, std::string_view body);
1190

1191
    void initiate_rebind();
1192
    void reset_protocol_state() noexcept;
1193
    void ensure_enlisted_to_send();
1194
    void enlist_to_send();
1195
    Status check_received_sync_progress(const SyncProgress&) noexcept;
1196
    void check_for_upload_completion();
1197
    void check_for_download_completion();
1198

1199
    SyncClientHookAction call_debug_hook(SyncClientHookEvent event, const SyncProgress&, int64_t, DownloadBatchState,
1200
                                         size_t);
1201
    SyncClientHookAction call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo&);
1202
    SyncClientHookAction call_debug_hook(const SyncClientHookData& data);
1203

1204
    bool is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version);
1205

1206
    friend class Connection;
1207
};
1208

1209

1210
// Implementation
1211

1212
inline const std::string& ClientImpl::get_user_agent_string() const noexcept
1213
{
×
1214
    return m_user_agent_string;
×
1215
}
×
1216

1217
inline auto ClientImpl::get_reconnect_mode() const noexcept -> ReconnectMode
1218
{
×
1219
    return m_reconnect_mode;
×
1220
}
×
1221

1222
inline bool ClientImpl::is_dry_run() const noexcept
1223
{
111,520✔
1224
    return m_dry_run;
111,520✔
1225
}
111,520✔
1226

1227
inline ClientImpl::RandomEngine& ClientImpl::get_random() noexcept
1228
{
22,830✔
1229
    return m_random;
22,830✔
1230
}
22,830✔
1231

1232
inline auto ClientImpl::get_next_session_ident() noexcept -> session_ident_type
1233
{
9,694✔
1234
    return ++m_prev_session_ident;
9,694✔
1235
}
9,694✔
1236

1237

1238
inline ClientImpl& ClientImpl::Connection::get_client() noexcept
1239
{
172,128✔
1240
    return m_client;
172,128✔
1241
}
172,128✔
1242

1243
inline ConnectionState ClientImpl::Connection::get_state() const noexcept
1244
{
10,518✔
1245
    return m_state;
10,518✔
1246
}
10,518✔
1247

1248
inline SyncServerMode ClientImpl::Connection::get_sync_server_mode() const noexcept
1249
{
×
1250
    return m_server_endpoint.server_mode;
×
1251
}
×
1252

1253
inline auto ClientImpl::Connection::get_reconnect_info() const noexcept -> ReconnectInfo
1254
{
2,508✔
1255
    return m_reconnect_info;
2,508✔
1256
}
2,508✔
1257

1258
inline auto ClientImpl::Connection::get_client_protocol() noexcept -> ClientProtocol&
1259
{
168,072✔
1260
    return m_client.m_client_protocol;
168,072✔
1261
}
168,072✔
1262

1263
inline int ClientImpl::Connection::get_negotiated_protocol_version() noexcept
1264
{
63,202✔
1265
    return m_negotiated_protocol_version;
63,202✔
1266
}
63,202✔
1267

1268
template <class H>
1269
void ClientImpl::Connection::for_each_active_session(H handler)
1270
{
9,506✔
1271
    for (auto& p : m_sessions) {
14,034✔
1272
        Session& sess = *p.second;
14,034✔
1273
        if (sess.m_state == Session::Active)
14,034✔
1274
            handler(sess); // Throws
14,034✔
1275
    }
14,034✔
1276
}
9,506✔
1277

1278
inline void ClientImpl::Connection::voluntary_disconnect()
1279
{
2,268✔
1280
    m_reconnect_info.update(ConnectionTerminationReason::closed_voluntarily, std::nullopt);
2,268✔
1281
    SessionErrorInfo error_info{Status{ErrorCodes::ConnectionClosed, "Connection closed"}, IsFatal{false},
2,268✔
1282
                                ProtocolErrorInfo::Action::Transient};
2,268✔
1283

1,068✔
1284
    disconnect(std::move(error_info)); // Throws
2,268✔
1285
}
2,268✔
1286

1287
inline void ClientImpl::Connection::involuntary_disconnect(const SessionErrorInfo& info,
1288
                                                           ConnectionTerminationReason reason)
1289
{
1,094✔
1290
    REALM_ASSERT(!was_voluntary(reason));
1,094✔
1291
    m_reconnect_info.update(reason, info.resumption_delay_interval);
1,094✔
1292
    disconnect(info); // Throws
1,094✔
1293
}
1,094✔
1294

1295
inline void ClientImpl::Connection::change_state_to_disconnected() noexcept
1296
{
3,362✔
1297
    REALM_ASSERT(m_on_idle);
3,362✔
1298
    REALM_ASSERT(m_state != ConnectionState::disconnected);
3,362✔
1299
    m_state = ConnectionState::disconnected;
3,362✔
1300

1,630✔
1301
    if (m_num_active_sessions == 0)
3,362✔
1302
        m_on_idle->trigger();
2,196✔
1303

1,630✔
1304
    REALM_ASSERT(!m_reconnect_delay_in_progress);
3,362✔
1305
    if (m_disconnect_delay_in_progress) {
3,362✔
1306
        m_reconnect_disconnect_timer.reset();
2,276✔
1307
        m_disconnect_delay_in_progress = false;
2,276✔
1308
    }
2,276✔
1309
}
3,362✔
1310

1311
inline void ClientImpl::Connection::one_more_active_unsuspended_session()
1312
{
10,072✔
1313
    if (m_num_active_unsuspended_sessions++ != 0)
10,072✔
1314
        return;
5,396✔
1315
    // Rose from zero to one
2,232✔
1316
    if (m_state == ConnectionState::disconnected && !m_reconnect_delay_in_progress && m_activated)
4,676✔
UNCOV
1317
        initiate_reconnect(); // Throws
×
1318
}
4,676✔
1319

1320
inline void ClientImpl::Connection::one_less_active_unsuspended_session()
1321
{
10,072✔
1322
    REALM_ASSERT(m_num_active_unsuspended_sessions);
10,072✔
1323
    if (--m_num_active_unsuspended_sessions != 0)
10,072✔
1324
        return;
5,396✔
1325

2,232✔
1326
    // Dropped from one to zero
2,232✔
1327
    if (m_state != ConnectionState::disconnected)
4,676✔
1328
        initiate_disconnect_wait(); // Throws
4,382✔
1329
}
4,676✔
1330

1331
// Sessions, and the connection, should get the output_buffer and insert a message,
1332
// after which they call initiate_write_output_buffer(Session* sess).
1333
inline auto ClientImpl::Connection::get_output_buffer() noexcept -> OutputBuffer&
1334
{
93,380✔
1335
    m_output_buffer.reset();
93,380✔
1336
    return m_output_buffer;
93,380✔
1337
}
93,380✔
1338

1339
inline auto ClientImpl::Connection::get_session(session_ident_type ident) const noexcept -> Session*
1340
{
72,592✔
1341
    auto i = m_sessions.find(ident);
72,592✔
1342
    bool found = (i != m_sessions.end());
72,592✔
1343
    return found ? i->second.get() : nullptr;
2,147,518,747✔
1344
}
72,592✔
1345

1346
inline bool ClientImpl::Connection::was_voluntary(ConnectionTerminationReason reason) noexcept
1347
{
1,094✔
1348
    switch (reason) {
1,094✔
1349
        case ConnectionTerminationReason::closed_voluntarily:
✔
1350
            return true;
×
1351
        case ConnectionTerminationReason::connect_operation_failed:
564✔
1352
        case ConnectionTerminationReason::read_or_write_error:
1,018✔
1353
        case ConnectionTerminationReason::ssl_certificate_rejected:
1,022✔
1354
        case ConnectionTerminationReason::ssl_protocol_violation:
1,022✔
1355
        case ConnectionTerminationReason::websocket_protocol_violation:
1,024✔
1356
        case ConnectionTerminationReason::http_response_says_fatal_error:
1,024✔
1357
        case ConnectionTerminationReason::http_response_says_nonfatal_error:
1,048✔
1358
        case ConnectionTerminationReason::bad_headers_in_http_response:
1,048✔
1359
        case ConnectionTerminationReason::sync_protocol_violation:
1,052✔
1360
        case ConnectionTerminationReason::sync_connect_timeout:
1,052✔
1361
        case ConnectionTerminationReason::server_said_try_again_later:
1,078✔
1362
        case ConnectionTerminationReason::server_said_do_not_reconnect:
1,088✔
1363
        case ConnectionTerminationReason::pong_timeout:
1,094✔
1364
        case ConnectionTerminationReason::missing_protocol_feature:
1,094✔
1365
            break;
1,094✔
1366
    }
1,094✔
1367
    return false;
1,094✔
1368
}
1,094✔
1369

1370
inline ClientImpl& ClientImpl::Session::get_client() noexcept
1371
{
140,532✔
1372
    return m_conn.get_client();
140,532✔
1373
}
140,532✔
1374

1375
inline auto ClientImpl::Session::get_connection() noexcept -> Connection&
1376
{
10,382✔
1377
    return m_conn;
10,382✔
1378
}
10,382✔
1379

1380
inline auto ClientImpl::Session::get_ident() const noexcept -> session_ident_type
1381
{
78,194✔
1382
    return m_ident;
78,194✔
1383
}
78,194✔
1384

1385
inline void ClientImpl::Session::recognize_sync_version(version_type version)
1386
{
105,556✔
1387
    REALM_ASSERT(m_state == Active);
105,556✔
1388

53,380✔
1389
    bool resume_upload = do_recognize_sync_version(version);
105,556✔
1390
    if (REALM_LIKELY(resume_upload)) {
105,556✔
1391
        // Since the deactivation process has not been initiated, the UNBIND
13,146✔
1392
        // message cannot have been sent unless the session was suspended due to
13,146✔
1393
        // an error.
13,146✔
1394
        REALM_ASSERT(m_suspended || !m_unbind_message_sent);
29,554✔
1395
        if (m_ident_message_sent && !m_suspended)
29,554✔
1396
            ensure_enlisted_to_send(); // Throws
24,392✔
1397
    }
29,554✔
1398
}
105,556✔
1399

1400
inline void ClientImpl::Session::request_upload_completion_notification()
1401
{
15,274✔
1402
    REALM_ASSERT(m_state == Active);
15,274✔
1403

7,550✔
1404
    m_upload_completion_notification_requested = true;
15,274✔
1405
    check_for_upload_completion(); // Throws
15,274✔
1406
}
15,274✔
1407

1408
inline void ClientImpl::Session::request_download_completion_notification()
1409
{
13,220✔
1410
    REALM_ASSERT(m_state == Active);
13,220✔
1411

6,612✔
1412
    ++m_target_download_mark;
13,220✔
1413

6,612✔
1414
    // Since the deactivation process has not been initiated, the UNBIND message
6,612✔
1415
    // cannot have been sent unless an ERROR message was received.
6,612✔
1416
    REALM_ASSERT(m_suspended || !m_unbind_message_sent);
13,220✔
1417
    if (m_ident_message_sent && !m_suspended)
13,220✔
1418
        ensure_enlisted_to_send(); // Throws
11,696✔
1419
}
13,220✔
1420

1421
inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn)
1422
    : Session{wrapper, conn, conn.get_client().get_next_session_ident()} // Throws
1423
{
9,694✔
1424
}
9,694✔
1425

1426
inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn, session_ident_type ident)
1427
    : logger_ptr{std::make_shared<util::PrefixLogger>(make_logger_prefix(ident), conn.logger_ptr)} // Throws
1428
    , logger{*logger_ptr}
1429
    , m_conn{conn}
1430
    , m_ident{ident}
1431
    , m_try_again_delay_info(conn.get_client().m_reconnect_backoff_info, conn.get_client().get_random())
1432
    , m_is_flx_sync_session(conn.is_flx_sync_connection())
1433
    , m_fix_up_object_ids(get_client().m_fix_up_object_ids)
1434
    , m_wrapper{wrapper}
1435
{
9,694✔
1436
    if (get_client().m_disable_upload_activation_delay)
9,694✔
1437
        m_allow_upload = true;
×
1438
}
9,694✔
1439

1440
inline bool ClientImpl::Session::do_recognize_sync_version(version_type version) noexcept
1441
{
149,644✔
1442
    if (REALM_LIKELY(version > m_last_version_available)) {
149,644✔
1443
        m_last_version_available = version;
73,642✔
1444
        return true;
73,642✔
1445
    }
73,642✔
1446
    return false;
76,002✔
1447
}
76,002✔
1448

1449
inline bool ClientImpl::Session::have_client_file_ident() const noexcept
1450
{
24,942✔
1451
    return (m_client_file_ident.ident != 0);
24,942✔
1452
}
24,942✔
1453

1454
inline bool ClientImpl::Session::unbind_process_complete() const noexcept
1455
{
9,358✔
1456
    return (m_unbind_message_send_complete && (m_error_message_received || m_unbound_message_received));
9,358!
1457
}
9,358✔
1458

1459
inline void ClientImpl::Session::connection_established(bool fast_reconnect)
1460
{
10,988✔
1461
    REALM_ASSERT(m_state == Active);
10,988✔
1462

5,396✔
1463
    if (!fast_reconnect && !get_client().m_disable_upload_activation_delay) {
10,988✔
1464
        // Disallow immediate activation of the upload process, even if download
4,462✔
1465
        // completion was reached during an earlier period of connectivity.
4,462✔
1466
        m_allow_upload = false;
9,182✔
1467
    }
9,182✔
1468

5,396✔
1469
    if (!m_allow_upload) {
10,988✔
1470
        // Request download completion notification
4,482✔
1471
        ++m_target_download_mark;
9,224✔
1472
    }
9,224✔
1473

5,396✔
1474
    if (!m_suspended) {
10,988✔
1475
        // Ready to send BIND message
5,394✔
1476
        enlist_to_send(); // Throws
10,982✔
1477
    }
10,982✔
1478
}
10,988✔
1479

1480
// The caller (Connection) must discard the session if the session has become
1481
// deactivated upon return.
1482
inline void ClientImpl::Session::connection_lost()
1483
{
3,640✔
1484
    REALM_ASSERT(m_state == Active || m_state == Deactivating);
3,640✔
1485
    // If the deactivation process has been initiated, it can now be immediately
1,864✔
1486
    // completed.
1,864✔
1487
    if (m_state == Deactivating) {
3,640✔
1488
        complete_deactivation(); // Throws
1,636✔
1489
        REALM_ASSERT(m_state == Deactivated);
1,636✔
1490
        return;
1,636✔
1491
    }
1,636✔
1492
    reset_protocol_state();
2,004✔
1493
}
2,004✔
1494

1495
// The caller (Connection) must discard the session if the session has become
1496
// deactivated upon return.
1497
inline void ClientImpl::Session::message_sent()
1498
{
91,584✔
1499
    // Note that it is possible for this function to get called after the client
45,470✔
1500
    // has received a message sent by the server in reposnse to the message that
45,470✔
1501
    // the client has just finished sending.
45,470✔
1502

45,470✔
1503
    REALM_ASSERT(m_state == Active || m_state == Deactivating);
91,584✔
1504

45,470✔
1505
    // No message will be sent after the UNBIND message
45,470✔
1506
    REALM_ASSERT(!m_unbind_message_send_complete);
91,584✔
1507

45,470✔
1508
    if (m_unbind_message_sent) {
91,584✔
1509
        REALM_ASSERT(!m_enlisted_to_send);
5,356✔
1510

2,528✔
1511
        // If the sending of the UNBIND message has been initiated, this must be
2,528✔
1512
        // the time when the sending of that message completes.
2,528✔
1513
        m_unbind_message_send_complete = true;
5,356✔
1514

2,528✔
1515
        // Detect the completion of the unbinding process
2,528✔
1516
        if (m_error_message_received || m_unbound_message_received) {
5,356✔
1517
            // If the deactivation process has been initiated, it can now be
432✔
1518
            // immediately completed.
432✔
1519
            if (m_state == Deactivating) {
818✔
1520
                // Life cycle state is Deactivating
54✔
1521
                complete_deactivation(); // Throws
108✔
1522
                // Life cycle state is now Deactivated
54✔
1523
                return;
108✔
1524
            }
108✔
1525

378✔
1526
            // The session is still in the Active state, so initiate the
378✔
1527
            // rebinding process if the session is no longer suspended.
378✔
1528
            if (!m_suspended)
710✔
UNCOV
1529
                initiate_rebind(); // Throws
×
1530
        }
710✔
1531
    }
5,356✔
1532
}
91,584✔
1533

1534
inline void ClientImpl::Session::initiate_rebind()
1535
{
378✔
1536
    // Life cycle state must be Active
214✔
1537
    REALM_ASSERT(m_state == Active);
378✔
1538

214✔
1539
    REALM_ASSERT(!m_suspended);
378✔
1540
    REALM_ASSERT(!m_enlisted_to_send);
378✔
1541

214✔
1542
    reset_protocol_state();
378✔
1543

214✔
1544
    // Ready to send BIND message
214✔
1545
    enlist_to_send(); // Throws
378✔
1546
}
378✔
1547

1548
inline void ClientImpl::Session::reset_protocol_state() noexcept
1549
{
12,076✔
1550
    // clang-format off
5,916✔
1551
    m_enlisted_to_send                    = false;
12,076✔
1552
    m_bind_message_sent                   = false;
12,076✔
1553
    m_error_to_send                       = false;
12,076✔
1554
    m_ident_message_sent = false;
12,076✔
1555
    m_unbind_message_sent = false;
12,076✔
1556
    m_unbind_message_send_complete = false;
12,076✔
1557
    m_error_message_received = false;
12,076✔
1558
    m_unbound_message_received = false;
12,076✔
1559
    m_client_error = util::none;
12,076✔
1560

5,916✔
1561
    m_upload_progress = m_progress.upload;
12,076✔
1562
    m_last_version_selected_for_upload = m_upload_progress.client_version;
12,076✔
1563
    m_last_download_mark_sent          = m_last_download_mark_received;
12,076✔
1564
    // clang-format on
5,916✔
1565
}
12,076✔
1566

1567
inline void ClientImpl::Session::ensure_enlisted_to_send()
1568
{
88,614✔
1569
    if (!m_enlisted_to_send)
88,614✔
1570
        enlist_to_send(); // Throws
58,518✔
1571
}
88,614✔
1572

1573
// This function will never "commit suicide" despite the fact that it may
1574
// involve an invocation of send_message(), which in certain cases can lead to
1575
// the completion of the deactivation process, and if that did happen, it would
1576
// cause Connection::send_next_message() to destroy this session, but it does
1577
// not happen.
1578
//
1579
// If the session is already in the Deactivating state, send_message() will
1580
// complete the deactivation process immediately when, and only when the BIND
1581
// message has not already been sent.
1582
//
1583
// Note however, that this function gets called when the establishment of the
1584
// connection completes, but at that time, the session cannot be in the
1585
// Deactivating state, because until the BIND message is sent, the deactivation
1586
// process will complete immediately. So the first invocation of this function
1587
// after establishemnt of the connection will not commit suicide.
1588
//
1589
// Note then, that the session will stay enlisted to send, until it gets to send
1590
// the BIND message, and since the and enlist_to_send() must not be called while
1591
// the session is enlisted, the next invocation of this function will be after
1592
// the BIND message has been sent, but then the deactivation process will no
1593
// longer be completed by send_message().
1594
inline void ClientImpl::Session::enlist_to_send()
1595
{
154,834✔
1596
    REALM_ASSERT(m_state == Active || m_state == Deactivating);
154,834✔
1597
    REALM_ASSERT(!m_unbind_message_sent);
154,834✔
1598
    REALM_ASSERT(!m_enlisted_to_send);
154,834✔
1599
    m_enlisted_to_send = true;
154,834✔
1600
    m_conn.enlist_to_send(this); // Throws
154,834✔
1601
}
154,834✔
1602

1603
} // namespace realm::sync
1604

1605
#endif // REALM_NOINST_CLIENT_IMPL_BASE_HPP
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc