realm / realm-core / 1817

04 Nov 2023 12:29AM UTC coverage: 91.695% (+0.04%) from 91.66%
1817

push

Evergreen

web-flow
Use a single write transaction for DiscardLocal client resets on FLX realms (#7110)

Updating the subscription store in a separate write transaction from the
recovery means that we temporarily commit an invalid state. If the application
crashes between committing the client reset diff and updating the subscription
store, the next launch of the application would try to use the now-invalid
pending subscriptions that should have been discarded.

92122 of 168844 branches covered (0.0%)

141 of 146 new or added lines in 7 files covered. (96.58%)

59 existing lines in 12 files now uncovered.

230819 of 251726 relevant lines covered (91.69%)

6481779.32 hits per line

Source File
85.56
/src/realm/sync/network/network.hpp
#pragma once

#include <cstddef>
#include <memory>
#include <chrono>
#include <string>
#include <system_error>
#include <ostream>

#include <sys/types.h>

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#include <stdio.h>
#include <Ws2def.h>
#pragma comment(lib, "Ws2_32.lib")
#else
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netdb.h>
#endif

#include <realm/status.hpp>
#include <realm/util/features.h>
#include <realm/util/assert.hpp>
#include <realm/util/backtrace.hpp>
#include <realm/util/basic_system_errors.hpp>
#include <realm/util/bind_ptr.hpp>
#include <realm/util/buffer.hpp>
#include <realm/util/misc_ext_errors.hpp>
#include <realm/util/scope_exit.hpp>

// Linux epoll
#if defined(REALM_USE_EPOLL) && !REALM_ANDROID
#define REALM_NETWORK_USE_EPOLL 1
#else
#define REALM_NETWORK_USE_EPOLL 0
#endif

// FreeBSD Kqueue.
//
// Available on Mac OS X, FreeBSD, NetBSD, OpenBSD
#if (defined(__MACH__) && defined(__APPLE__)) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
#if !defined(REALM_HAVE_KQUEUE)
#if !defined(REALM_DISABLE_UTIL_NETWORK_KQUEUE)
#define REALM_HAVE_KQUEUE 1
#endif
#endif
#endif
#if !defined(REALM_HAVE_KQUEUE)
#define REALM_HAVE_KQUEUE 0
#endif


// FIXME: Unfinished business around `Address::m_ip_v6_scope_id`.

namespace realm::sync::network {

/// \brief TCP/IP networking API.
///
/// The design of this networking API is heavily inspired by the ASIO C++
/// library (http://think-async.com).
///
///
/// ### Thread safety
///
/// A *service context* is a set of objects consisting of an instance of
/// Service, and all the objects that are associated with that instance (\ref
/// Resolver, \ref Socket, \ref Acceptor, \ref DeadlineTimer, and
/// \ref ssl::Stream).
///
/// In general, it is unsafe for two threads to call functions on the same
/// object, or on different objects in the same service context. This also
/// applies to destructors. Notable exceptions are the fully thread-safe
/// functions, such as Service::post(), Service::stop(), and Service::reset().
///
/// On the other hand, it is always safe for two threads to call functions on
/// objects belonging to different service contexts.
///
/// One implication of these rules is that at most one thread may execute run()
/// at any given time, and if one thread is executing run(), then no other
/// thread is allowed to access objects in the same service context (with the
/// mentioned exceptions).
///
/// Unless otherwise specified, free-standing objects, such as \ref
/// StreamProtocol, \ref Address, \ref Endpoint, and \ref Endpoint::List are
/// fully thread-safe as long as they are not mutated. If one thread is mutating
/// such an object, no other thread may access it. Note that these free-standing
/// objects are not associated with an instance of Service, and are therefore
/// not part of a service context.
///
///
/// ### Comparison with ASIO
///
/// There is a crucial difference between the two libraries with regard to the
/// guarantees that are provided about the cancelability of asynchronous
/// operations. The Realm networking library (this library) considers an
/// asynchronous operation to be complete precisely when the completion handler
/// starts to execute, and it guarantees that such an operation is cancelable up
/// until that point in time. In particular, if `cancel()` is called on a socket
/// or a deadline timer object before the completion handler starts to execute,
/// then that operation will be canceled, and will receive
/// `error::operation_aborted`. This guarantee is possible to provide (and free
/// of ambiguities) precisely because this library prohibits multiple threads
/// from executing the event loop concurrently, and because `cancel()` is
/// allowed to be called only from a completion handler (executed by the event
/// loop thread) or while no thread is executing the event loop. This guarantee
/// allows for safe destruction of sockets and deadline timers as long as the
/// completion handlers react appropriately to `error::operation_aborted`, in
/// particular, that they do not attempt to access the socket or deadline timer
/// object in such cases.
///
/// ASIO, on the other hand, allows for an asynchronous operation to complete
/// and become **uncancelable** before the completion handler starts to
/// execute. For this reason, it is possible with ASIO to get the completion
/// handler of an asynchronous wait operation to start executing and receive an
/// error code other than "operation aborted" at a point in time where
/// `cancel()` has already been called on the deadline timer object, or even at
/// a point in time where the deadline timer has been destroyed. This seems
/// like an inevitable consequence of the fact that ASIO allows for multiple
/// threads to execute the event loop concurrently. This generally forces ASIO
/// applications to invent ways of extending the lifetime of deadline timer and
/// socket objects until the completion handler starts executing.
///
/// IMPORTANT: Even if ASIO is used in a way where at most one thread executes
/// the event loop, there is still no guarantee that an asynchronous operation
/// remains cancelable up until the point in time where the completion handler
/// starts to execute.
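
The cancelation guarantee described in the comment above can be illustrated with a small stand-alone model. This is only a sketch, not the Realm implementation: `Oper`, `MiniLoop`, `MiniTimer`, and `expire()` are hypothetical names, and `std::errc::operation_canceled` stands in for `error::operation_aborted`. It assumes that a single thread drives the loop, which is what makes the "cancelable until the handler starts" rule unambiguous.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <system_error>
#include <utility>

// Hypothetical model types; none of these names exist in the Realm API.
struct Oper {
    std::function<void(std::error_code)> handler;
    std::error_code ec;    // outcome that will be passed to the handler
    bool queued = false;   // true once the handler is ready to execute
    bool complete = false; // true once the handler has started executing
};

class MiniLoop {
public:
    // Put the operation on the ready queue (at most once).
    void make_ready(const std::shared_ptr<Oper>& op)
    {
        if (!op->queued) {
            op->queued = true;
            m_ready.push_back(op);
        }
    }

    // Execute ready handlers on the calling thread until none remain.
    void run()
    {
        while (!m_ready.empty()) {
            std::shared_ptr<Oper> op = std::move(m_ready.front());
            m_ready.pop_front();
            op->complete = true; // complete precisely when the handler starts
            op->handler(op->ec);
        }
    }

private:
    std::deque<std::shared_ptr<Oper>> m_ready;
};

class MiniTimer {
public:
    explicit MiniTimer(MiniLoop& loop)
        : m_loop(loop)
    {
    }

    void async_wait(std::function<void(std::error_code)> handler)
    {
        assert(!m_oper || m_oper->complete); // one wait at a time
        m_oper = std::make_shared<Oper>();
        m_oper->handler = std::move(handler);
    }

    // Simulates the deadline passing: the wait has succeeded, and the
    // handler becomes ready, but it has not started executing yet.
    void expire()
    {
        if (m_oper && !m_oper->complete)
            m_loop.make_ready(m_oper);
    }

    // Still effective after expire(), as long as the handler has not started.
    void cancel()
    {
        if (m_oper && !m_oper->complete) {
            m_oper->ec = std::make_error_code(std::errc::operation_canceled);
            m_loop.make_ready(m_oper);
        }
    }

private:
    MiniLoop& m_loop;
    std::shared_ptr<Oper> m_oper;
};
```

In this model, calling `expire()` and then `cancel()` before `run()` still delivers the "canceled" error code to the handler, because the operation only counts as complete once `run()` begins invoking the handler.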

std::string host_name();


class StreamProtocol;
class Address;
class Endpoint;
class Service;
class Resolver;
class SocketBase;
class Socket;
class Acceptor;
class DeadlineTimer;
class ReadAheadBuffer;
namespace ssl {
class Stream;
} // namespace ssl


/// \brief An IP protocol descriptor.
class StreamProtocol {
public:
    static StreamProtocol ip_v4();
    static StreamProtocol ip_v6();

    bool is_ip_v4() const;
    bool is_ip_v6() const;

    int protocol() const;
    int family() const;

    StreamProtocol();
    ~StreamProtocol() noexcept {}

private:
    int m_family;
    int m_socktype;
    int m_protocol;

    friend class Service;
    friend class SocketBase;
};


/// \brief An IP address (IPv4 or IPv6).
class Address {
public:
    bool is_ip_v4() const;
    bool is_ip_v6() const;

    template <class C, class T>
    friend std::basic_ostream<C, T>& operator<<(std::basic_ostream<C, T>&, const Address&);

    Address();
    ~Address() noexcept {}

private:
    using ip_v4_type = in_addr;
    using ip_v6_type = in6_addr;
    union union_type {
        ip_v4_type m_ip_v4;
        ip_v6_type m_ip_v6;
    };
    union_type m_union;
    std::uint_least32_t m_ip_v6_scope_id = 0;
    bool m_is_ip_v6 = false;

    friend Address make_address(const char*, std::error_code&) noexcept;
    friend class Endpoint;
};

Address make_address(const char* c_str);
Address make_address(const char* c_str, std::error_code& ec) noexcept;
Address make_address(const std::string&);
Address make_address(const std::string&, std::error_code& ec) noexcept;
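
The `make_address()` overloads above come in throwing and `std::error_code` pairs. The following sketch shows one way such a pair can be built on the POSIX `inet_pton()` function. It is an illustration under assumptions, not the Realm implementation: `ParsedAddress` and `parse_address()` are hypothetical names, the choice of `std::errc::invalid_argument` as the failure code is an assumption, and the code is POSIX-only.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <netinet/in.h>
#include <system_error>

// Hypothetical stand-in for an address type that holds either an IPv4 or an
// IPv6 address in a union, plus a flag telling which member is active.
struct ParsedAddress {
    union {
        in_addr v4;
        in6_addr v6;
    } ip;
    bool is_ip_v6 = false;
};

// Non-throwing variant: reports failure through `ec`.
ParsedAddress parse_address(const char* c_str, std::error_code& ec) noexcept
{
    assert(c_str != nullptr);
    ParsedAddress addr{};
    // inet_pton() returns 1 on success and 0 if the text is not a valid
    // address of the requested family.
    if (::inet_pton(AF_INET, c_str, &addr.ip.v4) == 1) {
        ec.clear();
        return addr;
    }
    if (::inet_pton(AF_INET6, c_str, &addr.ip.v6) == 1) {
        addr.is_ip_v6 = true;
        ec.clear();
        return addr;
    }
    ec = std::make_error_code(std::errc::invalid_argument); // assumed error code
    return ParsedAddress{};
}

// Throwing variant: a thin wrapper around the non-throwing one.
ParsedAddress parse_address(const char* c_str)
{
    std::error_code ec;
    ParsedAddress addr = parse_address(c_str, ec);
    if (ec)
        throw std::system_error(ec);
    return addr;
}
```

Implementing the throwing overload in terms of the non-throwing one keeps the parsing logic in a single place, so the two variants cannot drift apart.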


/// \brief An IP endpoint.
///
/// An IP endpoint is a triplet (`protocol`, `address`, `port`).
class Endpoint {
public:
    using port_type = std::uint_fast16_t;
    class List;

    StreamProtocol protocol() const;
    Address address() const;
    port_type port() const;

    Endpoint();
    Endpoint(const StreamProtocol&, port_type);
    Endpoint(const Address&, port_type);
    ~Endpoint() noexcept {}

    using data_type = sockaddr;
    data_type* data();
    const data_type* data() const;

private:
    StreamProtocol m_protocol;

    using sockaddr_base_type = sockaddr;
    using sockaddr_ip_v4_type = sockaddr_in;
    using sockaddr_ip_v6_type = sockaddr_in6;
    union sockaddr_union_type {
        sockaddr_base_type m_base;
        sockaddr_ip_v4_type m_ip_v4;
        sockaddr_ip_v6_type m_ip_v6;
    };
    sockaddr_union_type m_sockaddr_union;

    friend class Service;
    friend class Resolver;
    friend class SocketBase;
    friend class Socket;
};


/// \brief A list of IP endpoints.
class Endpoint::List {
public:
    using iterator = const Endpoint*;

    iterator begin() const noexcept;
    iterator end() const noexcept;
    std::size_t size() const noexcept;
    bool empty() const noexcept;

    List() noexcept = default;
    List(List&&) noexcept = default;
    ~List() noexcept = default;

    List& operator=(List&&) noexcept = default;

private:
    util::Buffer<Endpoint> m_endpoints;

    friend class Service;
};


/// \brief TCP/IP networking service.
class Service {
public:
    Service();
    ~Service() noexcept;

    /// \brief Execute the event loop.
    ///
    /// Execute completion handlers of completed asynchronous operations, or
    /// wait for more completion handlers to become ready for
    /// execution. Handlers submitted via post() are considered immediately
    /// ready. If there are no completion handlers ready for execution, and
    /// there are no asynchronous operations in progress, run() returns.
    ///
    /// run_until_stopped() will continue running even if there are no completion
    /// handlers ready for execution, and no asynchronous operations in progress,
    /// until stop() is called.
    ///
    /// All completion handlers, including handlers submitted via post(), will be
    /// executed from run(), that is, by the thread that executes run(). If no
    /// thread executes run(), then the completion handlers will not be
    /// executed.
    ///
    /// Exceptions thrown by completion handlers will always propagate back
    /// through run().
    ///
    /// Synchronous operations (e.g., Socket::connect()) execute independently of
    /// the event loop, and do not require that any thread calls run().
    void run();
    void run_until_stopped();

    /// @{ \brief Stop event loop execution.
    ///
    /// stop() puts the event loop into the stopped mode. If a thread is
    /// currently executing run(), it will be made to return in a timely
    /// fashion, that is, without further blocking. If a thread is currently
    /// blocked in run(), it will be unblocked. Handlers that can be executed
    /// immediately may or may not be executed before run() returns, but new
    /// handlers submitted by these will not be executed before run()
    /// returns. Also, if a handler is submitted by a call to post(), and that
    /// call happens after stop() returns, then that handler is guaranteed to
    /// not be executed before run() returns (assuming that reset() is not called
    /// before run() returns).
    ///
    /// The event loop will remain in the stopped mode until reset() is
    /// called. If reset() is called before run() returns, it may or may not
    /// cause run() to resume normal operation without returning.
    ///
    /// Both stop() and reset() are thread-safe, that is, they may be called by
    /// any thread. Also, both of these functions may be called from completion
    /// handlers (including posted handlers).
    void stop() noexcept;
    void reset() noexcept;
    /// @}

    /// \brief Submit a handler to be executed by the event loop thread.
    ///
    /// Register the specified completion handler for immediate asynchronous
    /// execution. The specified handler will be executed by an expression of
    /// the form `handler(status)` where status is a Status object whose value
    /// is currently always OK, although this may change in the future. If the
    /// handler object is movable, it will never be copied. Otherwise, it will
    /// be copied as necessary.
    ///
    /// This function is thread-safe, that is, it may be called by any
    /// thread. It may also be called from other completion handlers.
    ///
    /// The handler will never be called as part of the execution of post(). It
    /// will always be called by a thread that is executing run(). If no thread
    /// is currently executing run(), the handler will not be executed until a
    /// thread starts executing run(). If post() is called while another thread
    /// is executing run(), the handler may be called before post() returns. If
    /// post() is called from another completion handler, the submitted handler
    /// is guaranteed to not be called during the execution of post().
    ///
    /// Completion handlers added through post() will be executed in the order
    /// that they are added. More precisely, if post() is called twice to add
    /// two handlers, A and B, and the execution of post(A) ends before the
    /// beginning of the execution of post(B), then A is guaranteed to execute
    /// before B.
    template <class H>
    void post(H handler);

    /// Argument `saturation` is the fraction of time that is not spent
    /// sleeping. Argument `inefficiency` is the fraction of time not spent
    /// sleeping, and not spent executing completion handlers. Both values are
    /// guaranteed to always be in the range 0 to 1 (both inclusive). The value
    /// passed as `inefficiency` is guaranteed to always be less than, or equal
    /// to the value passed as `saturation`.
    using EventLoopMetricsHandler = void(double saturation, double inefficiency);

    /// \brief Report event loop metrics via the specified handler.
    ///
    /// The handler will be called approximately every 30 seconds.
    ///
    /// report_event_loop_metrics() must be called prior to any invocation of
    /// run(). report_event_loop_metrics() is not thread-safe.
    ///
    /// This feature is only available if
    /// `REALM_UTIL_NETWORK_EVENT_LOOP_METRICS` was defined during
    /// compilation. When the feature is not available, the specified handler
    /// will never be called.
    void report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler>);

private:
    enum class Want { nothing = 0, read, write };

    template <class Oper>
    class OperQueue;
    class Descriptor;
    class AsyncOper;
    class ResolveOperBase;
    class WaitOperBase;
    class TriggerExecOperBase;
    class PostOperBase;
    template <class H>
    class PostOper;
    class IoOper;
    class UnusedOper; // Allocated, but currently unused memory

    template <class S>
    class BasicStreamOps;

    struct OwnersOperDeleter {
        void operator()(AsyncOper*) const noexcept;
    };
    struct LendersOperDeleter {
        void operator()(AsyncOper*) const noexcept;
    };
    using OwnersOperPtr = std::unique_ptr<AsyncOper, OwnersOperDeleter>;
    using LendersOperPtr = std::unique_ptr<AsyncOper, LendersOperDeleter>;
    using LendersResolveOperPtr = std::unique_ptr<ResolveOperBase, LendersOperDeleter>;
    using LendersWaitOperPtr = std::unique_ptr<WaitOperBase, LendersOperDeleter>;
    using LendersIoOperPtr = std::unique_ptr<IoOper, LendersOperDeleter>;

    class IoReactor;
    class Impl;
    const std::unique_ptr<Impl> m_impl;

    template <class Oper, class... Args>
    static std::unique_ptr<Oper, LendersOperDeleter> alloc(OwnersOperPtr&, Args&&...);

    using PostOperConstr = PostOperBase*(void* addr, std::size_t size, Impl&, void* cookie);
    void do_post(PostOperConstr, std::size_t size, void* cookie);
    template <class H>
    static PostOperBase* post_oper_constr(void* addr, std::size_t size, Impl&, void* cookie);
    static void recycle_post_oper(Impl&, PostOperBase*) noexcept;
    static void trigger_exec(Impl&, TriggerExecOperBase&) noexcept;
    static void reset_trigger_exec(Impl&, TriggerExecOperBase&) noexcept;

    using clock = std::chrono::steady_clock;

    friend class Resolver;
    friend class SocketBase;
    friend class Socket;
    friend class Acceptor;
    friend class DeadlineTimer;
    friend class ReadAheadBuffer;
    friend class ssl::Stream;
};
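
The documented behavior of post(), run(), stop(), and reset() can be modeled in a few lines. The sketch below is a deliberately simplified, hypothetical `MiniService`, not the Realm implementation: handlers take no `Status` argument, there is no I/O or blocking wait, and run() simply returns when the queue is empty. It shows three of the documented properties: handlers run in FIFO order, a handler is never invoked from inside post(), and nothing runs while the loop is in the stopped mode.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical, simplified model of an event loop with a post queue.
class MiniService {
public:
    // Thread-safe. Only enqueues the handler; it is never invoked here.
    void post(std::function<void()> handler)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push_back(std::move(handler));
    }

    // Executes queued handlers in FIFO order on the calling thread. Returns
    // when the queue is empty or the loop is in the stopped mode.
    void run()
    {
        for (;;) {
            std::function<void()> handler;
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                if (m_stopped || m_queue.empty())
                    return;
                handler = std::move(m_queue.front());
                m_queue.pop_front();
            }
            // Invoked without holding the lock, so the handler itself may
            // call post(), stop(), or reset().
            handler();
        }
    }

    // Enter the stopped mode; stays in effect until reset() is called.
    void stop()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_stopped = true;
    }

    // Leave the stopped mode.
    void reset()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_stopped = false;
    }

private:
    std::mutex m_mutex;
    std::deque<std::function<void()>> m_queue;
    bool m_stopped = false;
};
```

Releasing the mutex before invoking a handler is the design choice that lets handlers re-enter post() without deadlocking. It also means that a handler posted from within another handler is only picked up on a later loop iteration.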


template <class Oper>
class Service::OperQueue {
public:
    using LendersOperPtr = std::unique_ptr<Oper, LendersOperDeleter>;
    bool empty() const noexcept;
    void push_back(LendersOperPtr) noexcept;
    template <class Oper2>
    void push_back(OperQueue<Oper2>&) noexcept;
    LendersOperPtr pop_front() noexcept;
    void clear() noexcept;
    OperQueue() noexcept = default;
    OperQueue(OperQueue&&) noexcept;
    ~OperQueue() noexcept
    {
        clear();
    }

private:
    Oper* m_back = nullptr;
    template <class>
    friend class OperQueue;
};
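
`OperQueue` stores only a single `m_back` pointer yet offers both push_back() and pop_front(). One common way to get that from a single pointer is an intrusive circular singly-linked list, where the back node's `next` pointer refers to the front. The sketch below illustrates that technique under the assumption that this is the intended layout; the declaration above does not confirm it. `Node` and `BackPointerQueue` are hypothetical names, and the queue here is non-owning for brevity.

```cpp
#include <cassert>

// Hypothetical intrusive node; the link lives inside the element itself.
struct Node {
    int value = 0;
    Node* next = nullptr;
};

// Circular singly-linked queue that keeps only a pointer to the back node.
// The front node is always m_back->next.
class BackPointerQueue {
public:
    bool empty() const noexcept
    {
        return m_back == nullptr;
    }

    void push_back(Node* node) noexcept
    {
        assert(node && !node->next); // must not already be linked
        if (m_back) {
            node->next = m_back->next; // new back points at the front
            m_back->next = node;
        }
        else {
            node->next = node; // a single node points at itself
        }
        m_back = node;
    }

    Node* pop_front() noexcept
    {
        if (!m_back)
            return nullptr;
        Node* front = m_back->next;
        if (front == m_back)
            m_back = nullptr; // the queue held exactly one node
        else
            m_back->next = front->next;
        front->next = nullptr;
        return front;
    }

private:
    Node* m_back = nullptr;
};
```

With this layout both operations are O(1) and the queue object itself is the size of one pointer.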


class Service::Descriptor {
public:
#ifdef _WIN32
    using native_handle_type = SOCKET;
#else
    using native_handle_type = int;
#endif

    Impl& service_impl;

    Descriptor(Impl& service) noexcept;
    ~Descriptor() noexcept;

    /// \param in_blocking_mode Must be true if, and only if, the passed file
    /// descriptor refers to a file description in which the file status flag
    /// O_NONBLOCK is not set.
    ///
    /// The passed file descriptor must have the file descriptor flag FD_CLOEXEC
    /// set.
    void assign(native_handle_type fd, bool in_blocking_mode) noexcept;
    void close() noexcept;
    native_handle_type release() noexcept;

    bool is_open() const noexcept;

    native_handle_type native_handle() const noexcept;
    bool in_blocking_mode() const noexcept;

    void accept(Descriptor&, StreamProtocol, Endpoint*, std::error_code&) noexcept;
    std::size_t read_some(char* buffer, std::size_t size, std::error_code&) noexcept;
    std::size_t write_some(const char* data, std::size_t size, std::error_code&) noexcept;

    /// \tparam Oper An operation type inherited from IoOper with an initiate()
    /// function that initiates the operation and figures out whether it needs
    /// to read from, or write to the underlying descriptor to
    /// proceed. `initiate()` must return Want::read if the operation needs to
    /// read, or Want::write if the operation needs to write. If the operation
    /// completes immediately (e.g. due to a failure during initialization),
    /// `initiate()` must return Want::nothing.
    template <class Oper, class... Args>
    void initiate_oper(std::unique_ptr<Oper, LendersOperDeleter>, Args&&...);

    void ensure_blocking_mode();
    void ensure_nonblocking_mode();

private:
    native_handle_type m_fd = -1;
    bool m_in_blocking_mode; // Not in nonblocking mode

#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    bool m_read_ready;
    bool m_write_ready;
    bool m_imminent_end_of_input; // Kernel has seen the end of input
    bool m_is_registered;
    OperQueue<IoOper> m_suspended_read_ops, m_suspended_write_ops;

    void deregister_for_async() noexcept;
#endif

    bool assume_read_would_block() const noexcept;
    bool assume_write_would_block() const noexcept;

    void set_read_ready(bool) noexcept;
    void set_write_ready(bool) noexcept;

    void set_nonblock_flag(bool value);
    void add_initiated_oper(LendersIoOperPtr, Want);

    void do_close() noexcept;
    native_handle_type do_release() noexcept;

    friend class IoReactor;
};
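
`Descriptor` tracks whether its file descriptor is in blocking mode, which on POSIX systems corresponds to the `O_NONBLOCK` file status flag mentioned in the documentation of assign(). The sketch below shows how that flag is typically toggled with `fcntl()`. It is an assumption about what a function like set_nonblock_flag() might do, not the Realm implementation; the free functions `set_nonblock_flag(int, bool)` and `is_nonblocking(int)` are hypothetical, and the code is POSIX-only.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <system_error>
#include <unistd.h>

// Hypothetical helper: set or clear O_NONBLOCK on a file descriptor.
void set_nonblock_flag(int fd, bool value)
{
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        throw std::system_error(errno, std::generic_category(), "fcntl(F_GETFL)");
    // Preserve all other file status flags; change only O_NONBLOCK.
    flags = value ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
    if (::fcntl(fd, F_SETFL, flags) == -1)
        throw std::system_error(errno, std::generic_category(), "fcntl(F_SETFL)");
}

// Hypothetical helper: report whether O_NONBLOCK is currently set.
bool is_nonblocking(int fd)
{
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        throw std::system_error(errno, std::generic_category(), "fcntl(F_GETFL)");
    return (flags & O_NONBLOCK) != 0;
}
```

Caching the mode in a member such as `m_in_blocking_mode` would let ensure_blocking_mode() and ensure_nonblocking_mode() skip these system calls when the descriptor is already in the requested mode.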


class Resolver {
public:
    class Query;

    Resolver(Service&);
    ~Resolver() noexcept;

    /// Thread-safe.
    Service& get_service() noexcept;

    /// @{ \brief Resolve the specified query to one or more endpoints.
    Endpoint::List resolve(const Query&);
    Endpoint::List resolve(const Query&, std::error_code&);
    /// @}

    /// \brief Perform an asynchronous resolve operation.
    ///
    /// Initiate an asynchronous resolve operation. The completion handler will
    /// be called when the operation completes. The operation completes when it
    /// succeeds, or an error occurs.
    ///
    /// The completion handler is always executed by the event loop thread,
    /// i.e., by a thread that is executing Service::run(). Conversely, the
    /// completion handler is guaranteed to not be called while no thread is
    /// executing Service::run(). The execution of the completion handler is
    /// always deferred to the event loop, meaning that it never happens as a
    /// synchronous side effect of the execution of async_resolve(), even when
    /// async_resolve() is executed by the event loop thread. The completion
    /// handler is guaranteed to be called eventually, as long as there is
    /// enough time for the operation to complete or fail, and a thread is
    /// executing Service::run() for long enough.
    ///
    /// The operation can be canceled by calling cancel(), and will be
    /// automatically canceled if the resolver object is destroyed. If the
    /// operation is canceled, it will fail with `error::operation_aborted`. The
    /// operation remains cancelable up until the point in time where the
    /// completion handler starts to execute. This means that if cancel() is
    /// called before the completion handler starts to execute, then the
    /// completion handler is guaranteed to have `error::operation_aborted`
    /// passed to it. This is true regardless of whether cancel() is called
    /// explicitly or implicitly, such as when the resolver is destroyed.
    ///
    /// The specified handler will be executed by an expression of the form
    /// `handler(ec, endpoints)` where `ec` is the error code and `endpoints` is
    /// an object of type `Endpoint::List`. If the handler object is
    /// movable, it will never be copied. Otherwise, it will be copied as
    /// necessary.
    ///
    /// It is an error to start a new resolve operation (synchronous or
    /// asynchronous) while an asynchronous resolve operation is in progress via
    /// the same resolver object. An asynchronous resolve operation is
    /// considered complete as soon as the completion handler starts to
    /// execute. This means that a new resolve operation can be started from the
    /// completion handler.
    template <class H>
    void async_resolve(Query, H&& handler);

    /// \brief Cancel all asynchronous operations.
    ///
    /// Cause all incomplete asynchronous operations that are associated with
    /// this resolver (at most one) to fail with `error::operation_aborted`. An
    /// asynchronous operation is complete precisely when its completion handler
    /// starts executing.
    ///
    /// Completion handlers of canceled operations will become immediately ready
    /// to execute, but will never be executed directly as part of the execution
    /// of cancel().
    ///
    /// Cancellation happens automatically when the resolver object is destroyed.
    void cancel() noexcept;

private:
    template <class H>
    class ResolveOper;

    Service::Impl& m_service_impl;

    Service::OwnersOperPtr m_resolve_oper;

    void initiate_oper(Service::LendersResolveOperPtr);
};


class Resolver::Query {
public:
    enum {
        /// Locally bound socket endpoint (server side)
        passive = AI_PASSIVE,

        /// Ignore families without a configured non-loopback address
        address_configured = AI_ADDRCONFIG
    };

    Query(std::string service_port, int init_flags = passive | address_configured);
    Query(const StreamProtocol&, std::string service_port, int init_flags = passive | address_configured);
    Query(std::string host_name, std::string service_port, int init_flags = address_configured);
    Query(const StreamProtocol&, std::string host_name, std::string service_port,
          int init_flags = address_configured);

    ~Query() noexcept;

    int flags() const;
    StreamProtocol protocol() const;
    std::string host() const;
    std::string service() const;

private:
    int m_flags;
    StreamProtocol m_protocol;
    std::string m_host;    // hostname
    std::string m_service; // port
};
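
The `passive` and `address_configured` flags in `Query` are defined in terms of `AI_PASSIVE` and `AI_ADDRCONFIG`, which are hint flags for the standard `getaddrinfo()` function. The sketch below shows how a host, a service, and a flags value map onto a `getaddrinfo()` call for TCP. Whether the resolver is implemented this way is an assumption, since the implementation is not part of this header; `ResolvedEndpoint` and `resolve_tcp()` are hypothetical names, and the code is POSIX-only.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <vector>

// Hypothetical result type for this sketch.
struct ResolvedEndpoint {
    int family;          // AF_INET or AF_INET6
    unsigned short port; // in host byte order
};

// Resolves (host, service) to TCP endpoints and appends them to `out`.
// `host` may be null, which is meaningful together with AI_PASSIVE.
// Returns 0 on success, otherwise the EAI_* code reported by getaddrinfo().
int resolve_tcp(const char* host, const char* service, int flags, std::vector<ResolvedEndpoint>& out)
{
    addrinfo hints = {};
    hints.ai_flags = flags;
    hints.ai_family = AF_UNSPEC; // accept both IPv4 and IPv6 results
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    addrinfo* first = nullptr;
    int ret = ::getaddrinfo(host, service, &hints, &first);
    if (ret != 0)
        return ret;

    for (const addrinfo* ai = first; ai; ai = ai->ai_next) {
        if (ai->ai_family == AF_INET) {
            const auto* sa = reinterpret_cast<const sockaddr_in*>(ai->ai_addr);
            out.push_back({AF_INET, static_cast<unsigned short>(ntohs(sa->sin_port))});
        }
        else if (ai->ai_family == AF_INET6) {
            const auto* sa = reinterpret_cast<const sockaddr_in6*>(ai->ai_addr);
            out.push_back({AF_INET6, static_cast<unsigned short>(ntohs(sa->sin6_port))});
        }
    }
    ::freeaddrinfo(first);
    return 0;
}
```

A call such as `resolve_tcp(nullptr, "8080", AI_PASSIVE | AI_ADDRCONFIG, out)` would correspond to the server-side default of the one-argument `Query` constructor, while passing a host name with `AI_ADDRCONFIG` alone matches the client-side default.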


class SocketBase {
public:
    using native_handle_type = Service::Descriptor::native_handle_type;

    ~SocketBase() noexcept;

    /// Thread-safe.
    Service& get_service() noexcept;

    bool is_open() const noexcept;
    native_handle_type native_handle() const noexcept;

    /// @{ \brief Open the socket for use with the specified protocol.
    ///
    /// It is an error to call open() on a socket that is already open.
    void open(const StreamProtocol&);
    std::error_code open(const StreamProtocol&, std::error_code&);
    /// @}

    /// \brief Close this socket.
    ///
    /// If the socket is open, it will be closed. If it is already closed (or
    /// never opened), this function does nothing (idempotency).
    ///
    /// A socket is automatically closed when destroyed.
    ///
    /// When the socket is closed, any incomplete asynchronous operation will be
    /// canceled (as if cancel() was called).
    void close() noexcept;

    /// \brief Cancel all asynchronous operations.
    ///
    /// Cause all incomplete asynchronous operations that are associated with
    /// this socket to fail with `error::operation_aborted`. An asynchronous
    /// operation is complete precisely when its completion handler starts
    /// executing.
    ///
    /// Completion handlers of canceled operations will become immediately ready
    /// to execute, but will never be executed directly as part of the execution
    /// of cancel().
    void cancel() noexcept;

    template <class O>
    void get_option(O& opt) const;

    template <class O>
    std::error_code get_option(O& opt, std::error_code&) const;

    template <class O>
    void set_option(const O& opt);

    template <class O>
    std::error_code set_option(const O& opt, std::error_code&);

    void bind(const Endpoint&);
    std::error_code bind(const Endpoint&, std::error_code&);

    Endpoint local_endpoint() const;
    Endpoint local_endpoint(std::error_code&) const;

    /// Release the ownership of this socket object over the native handle and
    /// return the native handle to the caller. The caller assumes ownership
    /// over the returned handle. The socket is left in a closed
    /// state. Incomplete asynchronous operations will be canceled as if close()
    /// had been called.
    ///
    /// If called on a closed socket, this function is a no-op, and returns the
    /// same value as would be returned by native_handle().
    native_handle_type release_native_handle() noexcept;

private:
    enum opt_enum {
        opt_ReuseAddr, ///< `SOL_SOCKET`, `SO_REUSEADDR`
        opt_Linger,    ///< `SOL_SOCKET`, `SO_LINGER`
        opt_NoDelay,   ///< `IPPROTO_TCP`, `TCP_NODELAY` (disable the Nagle algorithm)
    };

    template <class, int, class>
    class Option;

public:
    using reuse_address = Option<bool, opt_ReuseAddr, int>;
    using no_delay = Option<bool, opt_NoDelay, int>;

    // linger struct defined by POSIX sys/socket.h.
    struct linger_opt;
    using linger = Option<linger_opt, opt_Linger, struct linger>;

protected:
    Service::Descriptor m_desc;

private:
    StreamProtocol m_protocol;

protected:
    Service::OwnersOperPtr m_read_oper;  // Read or accept
    Service::OwnersOperPtr m_write_oper; // Write or connect

    SocketBase(Service&);

    const StreamProtocol& get_protocol() const noexcept;
    std::error_code do_assign(const StreamProtocol&, native_handle_type, std::error_code&);
    void do_close() noexcept;

    void get_option(opt_enum, void* value_data, std::size_t& value_size, std::error_code&) const;
    void set_option(opt_enum, const void* value_data, std::size_t value_size, std::error_code&);
    void map_option(opt_enum, int& level, int& option_name) const;

    friend class Acceptor;
};


template <class T, int opt, class U>
class SocketBase::Option {
public:
    Option(T value = T());
    T value() const;

private:
    T m_value;

    void get(const SocketBase&, std::error_code&);
    void set(SocketBase&, std::error_code&) const;

    friend class SocketBase;
};

struct SocketBase::linger_opt {
    linger_opt(bool enable, int timeout_seconds = 0)
    {
        m_linger.l_onoff = enable ? 1 : 0;
        m_linger.l_linger = timeout_seconds;
    }

    ::linger m_linger;

    operator ::linger() const
    {
        return m_linger;
    }

    bool enabled() const
    {
        return m_linger.l_onoff != 0;
    }
    int timeout() const
    {
        return m_linger.l_linger;
    }
};


/// Switching between synchronous and asynchronous operations is allowed, but
/// only in a nonoverlapping fashion. That is, a synchronous operation is not
/// allowed to run concurrently with an asynchronous one on the same
/// socket. Note that an asynchronous operation is considered to be running
/// until its completion handler starts executing.
class Socket : public SocketBase {
public:
    Socket(Service&);

    /// \brief Create a socket with an already-connected native socket handle.
    ///
    /// This constructor is shorthand for creating the socket with the
    /// one-argument constructor, and then calling the two-argument assign()
    /// with the specified protocol and native handle.
    Socket(Service&, const StreamProtocol&, native_handle_type);

    ~Socket() noexcept;

    void connect(const Endpoint&);
    std::error_code connect(const Endpoint&, std::error_code&);

    /// @{ \brief Perform a synchronous read operation.
    ///
    /// read() will not return until the specified buffer is full, or an error
    /// occurs. Reaching the end of input before the buffer is filled is
    /// considered an error, and will cause the operation to fail with
    /// MiscExtErrors::end_of_input.
    ///
    /// read_until() will not return until the specified buffer contains the
    /// specified delimiter, or an error occurs. If the buffer is filled before
    /// the delimiter is found, the operation fails with
    /// MiscExtErrors::delim_not_found. Otherwise, if the end of input is
    /// reached before the delimiter is found, the operation fails with
    /// MiscExtErrors::end_of_input. If the operation succeeds, the last byte
    /// placed in the buffer is the delimiter.
    ///
    /// The versions that take a ReadAheadBuffer argument will read through that
    /// buffer. This allows for fewer, larger reads on the underlying
    /// socket. Since unconsumed data may be left in the read-ahead buffer after
    /// a read operation returns, it is important that the same read-ahead
    /// buffer is passed to the next read operation.
    ///
    /// The versions of read() and read_until() that do not take an
    /// `std::error_code&` argument will throw std::system_error on failure.
    ///
    /// The versions that do take an `std::error_code&` argument will set \a ec
    /// to `std::error_code()` on success, and to something else on failure. On
    /// failure they will return the number of bytes placed in the specified
    /// buffer before the error occurred.
    ///
    /// \return The number of bytes placed in the specified buffer upon return.
    std::size_t read(char* buffer, std::size_t size);
    std::size_t read(char* buffer, std::size_t size, std::error_code& ec);
    std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&);
    std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&, std::error_code& ec);
    std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&);
    std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&, std::error_code& ec);
    /// @}

    /// @{ \brief Perform a synchronous write operation.
    ///
    /// write() will not return until all the specified bytes have been written
    /// to the socket, or an error occurs.
    ///
    /// The version of write() that does not take an `std::error_code&`
    /// argument will throw std::system_error on failure. When it succeeds, it
    /// always returns \a size.
    ///
    /// The version that does take an `std::error_code&` argument will set \a
    /// ec to `std::error_code()` on success, and to something else on
    /// failure. On success it returns \a size. On failure it returns the number
    /// of bytes written before the failure occurred.
    std::size_t write(const char* data, std::size_t size);
    std::size_t write(const char* data, std::size_t size, std::error_code& ec);
    /// @}

    /// @{ \brief Read at least one byte from this socket.
    ///
    /// If \a size is zero, both versions of read_some() will return zero
    /// without blocking. Read errors may or may not be detected in this case.
    ///
    /// Otherwise, if \a size is greater than zero, and at least one byte is
    /// immediately available, that is, without blocking, then both versions
    /// will read at least one byte (but generally as many immediately available
    /// bytes as will fit into the specified buffer), and return without
    /// blocking.
    ///
    /// Otherwise, both versions will block the calling thread until at least one
    /// byte becomes available, or an error occurs.
    ///
    /// In this context, it counts as an error if the end of input is reached
    /// before at least one byte becomes available (see
    /// MiscExtErrors::end_of_input).
    ///
    /// If no error occurs, both versions will return the number of bytes placed
    /// in the specified buffer, which is generally as many as are immediately
    /// available at the time when the first byte becomes available, although
    /// never more than \a size.
    ///
    /// If no error occurs, the three-argument version will set \a ec to
    /// indicate success.
    ///
    /// If an error occurs, the two-argument version will throw
    /// `std::system_error`, while the three-argument version will set \a ec to
    /// indicate the error, and return zero.
    ///
    /// As long as \a size is greater than zero, the two-argument version will
    /// always return a value that is greater than zero, while the
    /// three-argument version will return a value greater than zero when, and
    /// only when, \a ec is set to indicate success (no error, and no end of
    /// input).
    std::size_t read_some(char* buffer, std::size_t size);
    std::size_t read_some(char* buffer, std::size_t size, std::error_code& ec);
    /// @}

    /// @{ \brief Write at least one byte to this socket.
    ///
    /// If \a size is zero, both versions of write_some() will return zero
    /// without blocking. Write errors may or may not be detected in this case.
    ///
    /// Otherwise, if \a size is greater than zero, and at least one byte can be
    /// written immediately, that is, without blocking, then both versions will
    /// write at least one byte (but generally as many as can be written
    /// immediately), and return without blocking.
    ///
    /// Otherwise, both versions will block the calling thread until at least one
    /// byte can be written, or an error occurs.
    ///
    /// If no error occurs, both versions will return the number of bytes
    /// written, which is generally as many as can be written immediately at the
    /// time when the first byte can be written.
    ///
    /// If no error occurs, the three-argument version will set \a ec to
    /// indicate success.
    ///
    /// If an error occurs, the two-argument version will throw
    /// `std::system_error`, while the three-argument version will set \a ec to
    /// indicate the error, and return zero.
    ///
    /// As long as \a size is greater than zero, the two-argument version will
    /// always return a value that is greater than zero, while the
    /// three-argument version will return a value greater than zero when, and
    /// only when, \a ec is set to indicate success.
    std::size_t write_some(const char* data, std::size_t size);
    std::size_t write_some(const char* data, std::size_t size, std::error_code&);
    /// @}

    /// \brief Perform an asynchronous connect operation.
    ///
    /// Initiate an asynchronous connect operation. The completion handler is
    /// called when the operation completes. The operation completes when the
    /// connection is established, or an error occurs.
    ///
    /// The completion handler is always executed by the event loop thread,
    /// i.e., by a thread that is executing Service::run(). Conversely, the
    /// completion handler is guaranteed to not be called while no thread is
    /// executing Service::run(). The execution of the completion handler is
    /// always deferred to the event loop, meaning that it never happens as a
    /// synchronous side effect of the execution of async_connect(), even when
    /// async_connect() is executed by the event loop thread. The completion
    /// handler is guaranteed to be called eventually, as long as there is time
    /// enough for the operation to complete or fail, and a thread is executing
    /// Service::run() for long enough.
    ///
    /// The operation can be canceled by calling cancel(), and will be
    /// automatically canceled if the socket is closed. If the operation is
    /// canceled, it will fail with `error::operation_aborted`. The operation
    /// remains cancelable up until the point in time where the completion
    /// handler starts to execute. This means that if cancel() is called before
    /// the completion handler starts to execute, then the completion handler is
    /// guaranteed to have `error::operation_aborted` passed to it. This is true
    /// regardless of whether cancel() is called explicitly or implicitly, such
    /// as when the socket is destroyed.
    ///
    /// If the socket is not already open, it will be opened as part of the
    /// connect operation as if by calling `open(ep.protocol())`. If the opening
    /// operation succeeds, but the connect operation fails, the socket will be
    /// left in the opened state.
    ///
    /// The specified handler will be executed by an expression of the form
    /// `handler(ec)` where `ec` is the error code. If the handler object is
    /// movable, it will never be copied. Otherwise, it will be copied as
    /// necessary.
    ///
    /// It is an error to start a new connect operation (synchronous or
    /// asynchronous) while an asynchronous connect operation is in progress. An
    /// asynchronous connect operation is considered complete as soon as the
    /// completion handler starts to execute.
    ///
    /// \param ep The remote endpoint of the connection to be established.
    template <class H>
    void async_connect(const Endpoint& ep, H&& handler);

    /// @{ \brief Perform an asynchronous read operation.
    ///
    /// Initiate an asynchronous buffered read operation on the associated
    /// socket. The completion handler will be called when the operation
    /// completes, or an error occurs.
    ///
    /// async_read() will continue reading until the specified buffer is full,
    /// or an error occurs. If the end of input is reached before the buffer is
    /// filled, the operation fails with MiscExtErrors::end_of_input.
    ///
    /// async_read_until() will continue reading until the specified buffer
    /// contains the specified delimiter, or an error occurs. If the buffer is
    /// filled before a delimiter is found, the operation fails with
    /// MiscExtErrors::delim_not_found. Otherwise, if the end of input is
    /// reached before a delimiter is found, the operation fails with
    /// MiscExtErrors::end_of_input. Otherwise, if the operation succeeds, the
    /// last byte placed in the buffer is the delimiter.
    ///
    /// The versions that take a ReadAheadBuffer argument will read through that
    /// buffer. This allows for fewer, larger reads on the underlying
    /// socket. Since unconsumed data may be left in the read-ahead buffer after
    /// a read operation completes, it is important that the same read-ahead
    /// buffer is passed to the next read operation.
    ///
    /// The completion handler is always executed by the event loop thread,
    /// i.e., by a thread that is executing Service::run(). Conversely, the
    /// completion handler is guaranteed to not be called while no thread is
    /// executing Service::run(). The execution of the completion handler is
    /// always deferred to the event loop, meaning that it never happens as a
    /// synchronous side effect of the execution of async_read() or
    /// async_read_until(), even when async_read() or async_read_until() is
    /// executed by the event loop thread. The completion handler is guaranteed
    /// to be called eventually, as long as there is time enough for the
    /// operation to complete or fail, and a thread is executing Service::run()
    /// for long enough.
    ///
    /// The operation can be canceled by calling cancel() on the associated
    /// socket, and will be automatically canceled if the associated socket is
    /// closed. If the operation is canceled, it will fail with
    /// `error::operation_aborted`. The operation remains cancelable up until
    /// the point in time where the completion handler starts to execute. This
    /// means that if cancel() is called before the completion handler starts to
    /// execute, then the completion handler is guaranteed to have
    /// `error::operation_aborted` passed to it. This is true regardless of
    /// whether cancel() is called explicitly or implicitly, such as when the
    /// socket is destroyed.
    ///
    /// The specified handler will be executed by an expression of the form
    /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of
    /// bytes placed in the buffer (of type `std::size_t`). `n` is guaranteed to
    /// be less than, or equal to \a size. If the handler object is movable,
    /// it will never be copied. Otherwise, it will be copied as necessary.
    ///
    /// It is an error to start a read operation before the associated socket is
    /// connected.
    ///
    /// It is an error to start a new read operation (synchronous or
    /// asynchronous) while an asynchronous read operation is in progress. An
    /// asynchronous read operation is considered complete as soon as the
    /// completion handler starts executing. This means that a new read
    /// operation can be started from the completion handler of another
    /// asynchronous buffered read operation.
    template <class H>
    void async_read(char* buffer, std::size_t size, H&& handler);
    template <class H>
    void async_read(char* buffer, std::size_t size, ReadAheadBuffer&, H&& handler);
    template <class H>
    void async_read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&, H&& handler);
    /// @}

    /// \brief Perform an asynchronous write operation.
    ///
    /// Initiate an asynchronous write operation. The completion handler is
    /// called when the operation completes. The operation completes when all
    /// the specified bytes have been written to the socket, or an error occurs.
    ///
    /// The completion handler is always executed by the event loop thread,
    /// i.e., by a thread that is executing Service::run(). Conversely, the
    /// completion handler is guaranteed to not be called while no thread is
    /// executing Service::run(). The execution of the completion handler is
    /// always deferred to the event loop, meaning that it never happens as a
    /// synchronous side effect of the execution of async_write(), even when
    /// async_write() is executed by the event loop thread. The completion
    /// handler is guaranteed to be called eventually, as long as there is time
    /// enough for the operation to complete or fail, and a thread is executing
    /// Service::run() for long enough.
    ///
    /// The operation can be canceled by calling cancel(), and will be
    /// automatically canceled if the socket is closed. If the operation is
    /// canceled, it will fail with `error::operation_aborted`. The operation
    /// remains cancelable up until the point in time where the completion
    /// handler starts to execute. This means that if cancel() is called before
    /// the completion handler starts to execute, then the completion handler is
    /// guaranteed to have `error::operation_aborted` passed to it. This is true
    /// regardless of whether cancel() is called explicitly or implicitly, such
    /// as when the socket is destroyed.
    ///
    /// The specified handler will be executed by an expression of the form
    /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of
    /// bytes written (of type `std::size_t`). If the handler object is
    /// movable, it will never be copied. Otherwise, it will be copied as
    /// necessary.
    ///
    /// It is an error to start an asynchronous write operation before the
    /// socket is connected.
    ///
    /// It is an error to start a new write operation (synchronous or
    /// asynchronous) while an asynchronous write operation is in progress. An
    /// asynchronous write operation is considered complete as soon as the
    /// completion handler starts to execute. This means that a new write
    /// operation can be started from the completion handler of another
    /// asynchronous write operation.
    template <class H>
    void async_write(const char* data, std::size_t size, H&& handler);

    template <class H>
    void async_read_some(char* buffer, std::size_t size, H&& handler);
    template <class H>
    void async_write_some(const char* data, std::size_t size, H&& handler);

    enum shutdown_type {
#ifdef _WIN32
        /// Shut down the receiving side of the socket.
        shutdown_receive = SD_RECEIVE,

        /// Shut down the sending side of the socket.
        shutdown_send = SD_SEND,

        /// Shut down both the sending and receiving sides of the socket.
        shutdown_both = SD_BOTH
#else
        shutdown_receive = SHUT_RD,
        shutdown_send = SHUT_WR,
        shutdown_both = SHUT_RDWR
#endif
    };

    /// @{ \brief Shut down the connected socket's sending and/or receiving
    /// side.
    ///
    /// It is an error to call this function when the socket is not both open
    /// and connected.
    void shutdown(shutdown_type);
    std::error_code shutdown(shutdown_type, std::error_code&);
    /// @}

    /// @{ \brief Initialize socket with an already-connected native socket
    /// handle.
    ///
    /// The specified native handle must refer to a socket that is already fully
    /// open and connected.
    ///
    /// If the assignment operation succeeds, this socket object has taken
    /// ownership of the specified native handle, and the handle will be closed
    /// when the socket object is destroyed (or when close() is called). If the
    /// operation fails, the caller still owns the specified native handle.
    ///
    /// It is an error to call connect() or async_connect() on a socket object
    /// that is initialized this way (unless it is first closed).
    ///
    /// It is an error to call this function on a socket object that is already
    /// open.
    void assign(const StreamProtocol&, native_handle_type);
    std::error_code assign(const StreamProtocol&, native_handle_type, std::error_code&);
    /// @}

    /// Returns a reference to this socket, as this socket is the lowest layer
    /// of a stream.
    Socket& lowest_layer() noexcept;

private:
    using Want = Service::Want;
    using StreamOps = Service::BasicStreamOps<Socket>;

    class ConnectOperBase;
    template <class H>
    class ConnectOper;

    using LendersConnectOperPtr = std::unique_ptr<ConnectOperBase, Service::LendersOperDeleter>;

    // `ec` untouched on success, but no immediate completion
    bool initiate_async_connect(const Endpoint&, std::error_code& ec);
    // `ec` untouched on success
    std::error_code finalize_async_connect(std::error_code& ec) noexcept;

    // See Service::BasicStreamOps for details on these 6 functions.
    void do_init_read_async(std::error_code&, Want&) noexcept;
    void do_init_write_async(std::error_code&, Want&) noexcept;
    std::size_t do_read_some_sync(char* buffer, std::size_t size, std::error_code&) noexcept;
    std::size_t do_write_some_sync(const char* data, std::size_t size, std::error_code&) noexcept;
    std::size_t do_read_some_async(char* buffer, std::size_t size, std::error_code&, Want&) noexcept;
    std::size_t do_write_some_async(const char* data, std::size_t size, std::error_code&, Want&) noexcept;

    friend class Service::BasicStreamOps<Socket>;
    friend class Service::BasicStreamOps<ssl::Stream>;
    friend class ReadAheadBuffer;
    friend class ssl::Stream;
};
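
// Illustrative sketch (not part of this header): the documented difference
// between read_some() and read() can be pictured as a loop. read_some()
// returns as soon as some bytes arrive, while read() keeps going until the
// buffer is full or an error occurs, and reports the bytes placed in the
// buffer either way. FakeStream, SketchError and read_full() are hypothetical
// stand-ins, not the library's implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Stand-in for MiscExtErrors::end_of_input (hypothetical, for illustration).
enum class SketchError { none, end_of_input };

// Hypothetical in-memory stream that hands out at most `chunk` bytes per
// call, mimicking a socket that delivers data piecemeal.
struct FakeStream {
    std::string data;
    std::size_t pos = 0;
    std::size_t chunk = 3;

    // Like read_some(): returns zero and sets `err` at end of input.
    std::size_t read_some(char* buffer, std::size_t size, SketchError& err)
    {
        err = SketchError::none;
        if (size == 0)
            return 0;
        std::size_t avail = data.size() - pos;
        if (avail == 0) {
            err = SketchError::end_of_input;
            return 0;
        }
        std::size_t n = std::min({size, avail, chunk});
        std::memcpy(buffer, data.data() + pos, n);
        pos += n;
        return n;
    }
};

// Like read(): loop until the buffer is full or an error occurs. The return
// value is the number of bytes placed in the buffer, also on failure.
inline std::size_t read_full(FakeStream& s, char* buffer, std::size_t size, SketchError& err)
{
    std::size_t total = 0;
    err = SketchError::none;
    while (total < size) {
        std::size_t n = s.read_some(buffer + total, size - total, err);
        if (err != SketchError::none)
            break;
        total += n;
    }
    return total;
}
```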


/// Switching between synchronous and asynchronous operations is allowed, but
/// only in a nonoverlapping fashion. That is, a synchronous operation is not
/// allowed to run concurrently with an asynchronous one on the same
/// acceptor. Note that an asynchronous operation is considered to be running
/// until its completion handler starts executing.
class Acceptor : public SocketBase {
public:
    Acceptor(Service&);
    ~Acceptor() noexcept;

    static constexpr int max_connections = SOMAXCONN;

    void listen(int backlog = max_connections);
    std::error_code listen(int backlog, std::error_code&);

    void accept(Socket&);
    void accept(Socket&, Endpoint&);
    std::error_code accept(Socket&, std::error_code&);
    std::error_code accept(Socket&, Endpoint&, std::error_code&);

    /// @{ \brief Perform an asynchronous accept operation.
    ///
    /// Initiate an asynchronous accept operation. The completion handler will
    /// be called when the operation completes. The operation completes when the
    /// connection is accepted, or an error occurs. If the operation succeeds,
    /// the specified local socket will have become connected to a remote
    /// socket.
    ///
    /// The completion handler is always executed by the event loop thread,
    /// i.e., by a thread that is executing Service::run(). Conversely, the
    /// completion handler is guaranteed to not be called while no thread is
    /// executing Service::run(). The execution of the completion handler is
    /// always deferred to the event loop, meaning that it never happens as a
    /// synchronous side effect of the execution of async_accept(), even when
    /// async_accept() is executed by the event loop thread. The completion
    /// handler is guaranteed to be called eventually, as long as there is time
    /// enough for the operation to complete or fail, and a thread is executing
    /// Service::run() for long enough.
    ///
    /// The operation can be canceled by calling cancel(), and will be
    /// automatically canceled if the acceptor is closed. If the operation is
    /// canceled, it will fail with `error::operation_aborted`. The operation
    /// remains cancelable up until the point in time where the completion
    /// handler starts to execute. This means that if cancel() is called before
    /// the completion handler starts to execute, then the completion handler is
    /// guaranteed to have `error::operation_aborted` passed to it. This is true
    /// regardless of whether cancel() is called explicitly or implicitly, such
    /// as when the acceptor is destroyed.
    ///
    /// The specified handler will be executed by an expression of the form
    /// `handler(ec)` where `ec` is the error code. If the handler object is
    /// movable, it will never be copied. Otherwise, it will be copied as
    /// necessary.
    ///
    /// It is an error to start a new accept operation (synchronous or
    /// asynchronous) while an asynchronous accept operation is in progress. An
    /// asynchronous accept operation is considered complete as soon as the
    /// completion handler starts executing. This means that a new accept
    /// operation can be started from the completion handler.
    ///
    /// \param sock This is the local socket, that upon successful completion
    /// will have become connected to the remote socket. It must be in the
    /// closed state (see Socket::is_open()) when async_accept() is called.
    ///
    template <class H>
    void async_accept(Socket& sock, H&& handler);
    /// \param ep Upon completion, the remote peer endpoint will have been
    /// assigned to this variable.
    template <class H>
    void async_accept(Socket& sock, Endpoint& ep, H&& handler);
    /// @}

private:
    using Want = Service::Want;

    class AcceptOperBase;
    template <class H>
    class AcceptOper;

    using LendersAcceptOperPtr = std::unique_ptr<AcceptOperBase, Service::LendersOperDeleter>;

    std::error_code accept(Socket&, Endpoint*, std::error_code&);
    Want do_accept_async(Socket&, Endpoint*, std::error_code&) noexcept;

    template <class H>
    void async_accept(Socket&, Endpoint*, H&&);
};


/// \brief A timer object supporting asynchronous wait operations.
class DeadlineTimer {
public:
    DeadlineTimer(Service&);
    ~DeadlineTimer() noexcept;

    /// Thread-safe.
    Service& get_service() noexcept;

    /// \brief Perform an asynchronous wait operation.
    ///
    /// Initiate an asynchronous wait operation. The completion handler becomes
    /// ready to execute when the expiration time is reached, or an error occurs
    /// (cancellation counts as an error here). The expiration time is the time
    /// of initiation plus the specified delay. The error code passed to the
    /// completion handler will **never** indicate success, unless the
    /// expiration time was reached.
    ///
    /// The completion handler is always executed by the event loop thread,
    /// i.e., by a thread that is executing Service::run(). Conversely, the
    /// completion handler is guaranteed to not be called while no thread is
    /// executing Service::run(). The execution of the completion handler is
    /// always deferred to the event loop, meaning that it never happens as a
    /// synchronous side effect of the execution of async_wait(), even when
    /// async_wait() is executed by the event loop thread. The completion
    /// handler is guaranteed to be called eventually, as long as there is time
    /// enough for the operation to complete or fail, and a thread is executing
    /// Service::run() for long enough.
    ///
    /// The operation can be canceled by calling cancel(), and will be
    /// automatically canceled if the timer is destroyed. If the operation is
    /// canceled, it will fail with `ErrorCodes::OperationAborted`. The operation
    /// remains cancelable up until the point in time where the completion
    /// handler starts to execute. This means that if cancel() is called before
    /// the completion handler starts to execute, then the completion handler is
    /// guaranteed to have `ErrorCodes::OperationAborted` passed to it. This is true
    /// regardless of whether cancel() is called explicitly or implicitly, such
    /// as when the timer is destroyed.
    ///
    /// The specified handler will be executed by an expression of the form
    /// `handler(status)` where `status` is a Status object. If the handler object
    /// is movable, it will never be copied. Otherwise, it will be copied as
    /// necessary.
    ///
    /// It is an error to start a new asynchronous wait operation while
    /// another one is in progress. An asynchronous wait operation is in
    /// progress until its completion handler starts executing.
    template <class R, class P, class H>
    void async_wait(std::chrono::duration<R, P> delay, H&& handler);

    /// \brief Cancel an asynchronous wait operation.
    ///
    /// If an asynchronous wait operation, that is associated with this deadline
    /// timer, is in progress, cause it to fail with
    /// `ErrorCodes::OperationAborted`. An asynchronous wait operation is in
    /// progress until its completion handler starts executing.
    ///
    /// Completion handlers of canceled operations will become immediately ready
    /// to execute, but will never be executed directly as part of the execution
    /// of cancel().
    ///
    /// Cancellation happens automatically when the timer object is destroyed.
    void cancel() noexcept;

private:
    template <class H>
    class WaitOper;

    using clock = Service::clock;

    Service::Impl& m_service_impl;
    Service::OwnersOperPtr m_wait_oper;

    void initiate_oper(Service::LendersWaitOperPtr);
};
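
// Illustrative sketch (not part of this header): a toy model of the two
// guarantees documented for async_wait() and cancel(). Handlers run only from
// the event loop, never as a side effect of the initiating or canceling call,
// and a cancel() issued before the handler starts executing still decides the
// outcome. SketchLoop and SketchTimer are hypothetical, single-threaded
// stand-ins; expire() simulates the expiration time being reached, and the
// `aborted` flag stands in for ErrorCodes::OperationAborted.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical event loop: posted handlers execute only inside run().
struct SketchLoop {
    std::deque<std::function<void()>> ready;

    void post(std::function<void()> f)
    {
        ready.push_back(std::move(f));
    }

    void run()
    {
        while (!ready.empty()) {
            auto f = std::move(ready.front());
            ready.pop_front();
            f();
        }
    }
};

// Hypothetical timer. It must outlive any handler it has made ready.
class SketchTimer {
public:
    explicit SketchTimer(SketchLoop& loop)
        : m_loop(loop)
    {
    }

    void async_wait(std::function<void(bool aborted)> handler)
    {
        m_oper = std::make_shared<Oper>();
        m_oper->handler = std::move(handler);
    }

    // Stand-in for the expiration time being reached.
    void expire()
    {
        make_ready();
    }

    // Marks the pending operation as aborted; never runs the handler directly.
    void cancel()
    {
        if (!m_oper)
            return; // no wait operation in progress
        m_oper->aborted = true;
        make_ready();
    }

private:
    struct Oper {
        std::function<void(bool)> handler;
        bool aborted = false;
        bool ready = false;
    };

    void make_ready()
    {
        if (!m_oper || m_oper->ready)
            return;
        m_oper->ready = true;
        m_loop.post([this] {
            // The operation counts as complete once its handler starts, so a
            // new wait may be initiated from inside the handler.
            std::shared_ptr<Oper> oper = std::move(m_oper);
            oper->handler(oper->aborted);
        });
    }

    SketchLoop& m_loop;
    std::shared_ptr<Oper> m_oper;
};
```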


class ReadAheadBuffer {
public:
    ReadAheadBuffer();

    /// Discard any buffered data.
    void clear() noexcept;

private:
    using Want = Service::Want;

    char* m_begin = nullptr;
    char* m_end = nullptr;
    static constexpr std::size_t s_size = 1024;
    const std::unique_ptr<char[]> m_buffer;

    bool empty() const noexcept;
    bool read(char*& begin, char* end, int delim, std::error_code&) noexcept;
    template <class S>
    void refill_sync(S& stream, std::error_code&) noexcept;
    template <class S>
    bool refill_async(S& stream, std::error_code&, Want&) noexcept;

    template <class>
    friend class Service::BasicStreamOps;
};
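
// Illustrative sketch (not part of this header): why the same read-ahead
// buffer has to be passed to consecutive reads. One large refill may pull in
// more than the current read_until() consumes, and the remainder is served
// from the buffer on the next call. SketchReadAhead and UntilError are
// hypothetical stand-ins operating on an in-memory source instead of a
// socket; the 1024-byte capacity mirrors s_size above.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Stand-ins for MiscExtErrors::end_of_input and MiscExtErrors::delim_not_found.
enum class UntilError { none, end_of_input, delim_not_found };

class SketchReadAhead {
public:
    explicit SketchReadAhead(std::string source)
        : m_source(std::move(source))
    {
    }

    // Copy bytes into the caller's buffer up to and including `delim`.
    // Returns the number of bytes placed in the buffer, also on failure.
    std::size_t read_until(char* buffer, std::size_t size, char delim, UntilError& err)
    {
        std::size_t n = 0;
        err = UntilError::none;
        for (;;) {
            if (n == size) {
                // Caller's buffer is full and no delimiter was seen.
                err = UntilError::delim_not_found;
                return n;
            }
            if (m_begin == m_end && !refill()) {
                err = UntilError::end_of_input;
                return n;
            }
            char c = m_chunk[m_begin++];
            buffer[n++] = c;
            if (c == delim)
                return n;
        }
    }

private:
    // Stands in for one large read on the underlying socket.
    bool refill()
    {
        if (m_pos == m_source.size())
            return false;
        m_chunk = m_source.substr(m_pos, s_size);
        m_pos += m_chunk.size();
        m_begin = 0;
        m_end = m_chunk.size();
        return true;
    }

    static constexpr std::size_t s_size = 1024;
    std::string m_source;
    std::string m_chunk;
    std::size_t m_pos = 0;
    std::size_t m_begin = 0;
    std::size_t m_end = 0;
};
```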
1383

1384

1385
enum class ResolveErrors {
1386
    /// Host not found (authoritative).
1387
    host_not_found = 1,
1388

1389
    /// Host not found (non-authoritative).
1390
    host_not_found_try_again = 2,
1391

1392
    /// The query is valid but does not have associated address data.
1393
    no_data = 3,
1394

1395
    /// A non-recoverable error occurred.
1396
    no_recovery = 4,
1397

1398
    /// The service is not supported for the given socket type.
1399
    service_not_found = 5,
1400

1401
    /// The socket type is not supported.
1402
    socket_type_not_supported = 6,
1403
};
1404

1405
/// The error category associated with ResolveErrors. The name of this category is
1406
/// `realm.sync.network.resolve`.
1407
const std::error_category& resolve_error_category() noexcept;
1408

1409
std::error_code make_error_code(ResolveErrors err);
1410

1411
} // namespace realm::sync::network
1412

1413
namespace std {
1414

1415
template <>
1416
class is_error_code_enum<realm::sync::network::ResolveErrors> {
1417
public:
1418
    static const bool value = true;
1419
};
1420

1421
} // namespace std
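
// Sketch (not part of the original header): how an enum such as ResolveErrors
// is typically wired into std::error_code. The names `demo::LookupErrors`,
// `LookupCategory`, `lookup_error_category()` and `fail_lookup()` are
// hypothetical, and the messages are illustrative only. The real category
// implementation lives elsewhere and may differ.

```cpp
#include <cassert>
#include <string>
#include <system_error>
#include <type_traits>

namespace demo {

// Hypothetical error enum, standing in for ResolveErrors.
enum class LookupErrors {
    host_not_found = 1,
    no_data = 3,
};

class LookupCategory : public std::error_category {
public:
    const char* name() const noexcept override
    {
        return "demo.lookup";
    }
    std::string message(int value) const override
    {
        switch (static_cast<LookupErrors>(value)) {
            case LookupErrors::host_not_found:
                return "Host not found (authoritative)";
            case LookupErrors::no_data:
                return "The query is valid but does not have associated address data";
        }
        return "Unknown lookup error";
    }
};

inline const std::error_category& lookup_error_category() noexcept
{
    static const LookupCategory category;
    return category;
}

// Found by argument-dependent lookup when a std::error_code is constructed
// from a LookupErrors value.
inline std::error_code make_error_code(LookupErrors err) noexcept
{
    return std::error_code(static_cast<int>(err), lookup_error_category());
}

} // namespace demo

namespace std {
// Opt the enum in to implicit conversion to std::error_code.
template <>
struct is_error_code_enum<demo::LookupErrors> : true_type {};
} // namespace std

// The enum value converts implicitly to std::error_code on return.
inline std::error_code fail_lookup()
{
    return demo::LookupErrors::host_not_found;
}
```

// With the specialization in place, callers can compare an error code
// directly against an enumerator, e.g. `ec == demo::LookupErrors::no_data`.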
1422

1423
namespace realm::sync::network {
1424

1425
// Implementation
1426

1427
// ---------------- StreamProtocol ----------------
1428

1429
inline StreamProtocol StreamProtocol::ip_v4()
1430
{
40,680✔
1431
    StreamProtocol prot;
40,680✔
1432
    prot.m_family = AF_INET;
40,680✔
1433
    return prot;
40,680✔
1434
}
40,680✔
1435

1436
inline StreamProtocol StreamProtocol::ip_v6()
1437
{
×
1438
    StreamProtocol prot;
×
1439
    prot.m_family = AF_INET6;
×
1440
    return prot;
×
1441
}
×
1442

1443
inline bool StreamProtocol::is_ip_v4() const
1444
{
80,774✔
1445
    return m_family == AF_INET;
80,774✔
1446
}
80,774✔
1447

1448
inline bool StreamProtocol::is_ip_v6() const
1449
{
×
1450
    return m_family == AF_INET6;
×
1451
}
×
1452

1453
inline int StreamProtocol::family() const
1454
{
40,672✔
1455
    return m_family;
40,672✔
1456
}
40,672✔
1457

1458
inline int StreamProtocol::protocol() const
1459
{
2✔
1460
    return m_protocol;
2✔
1461
}
2✔
1462

1463
inline StreamProtocol::StreamProtocol()
1464
    : m_family{AF_UNSPEC}     // Allow both IPv4 and IPv6
    , m_socktype{SOCK_STREAM} // Or SOCK_DGRAM for UDP
    , m_protocol{0}           // Any protocol
1469
{
73,892✔
1470
}
73,892✔
1471

1472
// ---------------- Address ----------------
1473

1474
inline bool Address::is_ip_v4() const
1475
{
×
1476
    return !m_is_ip_v6;
×
1477
}
×
1478

1479
inline bool Address::is_ip_v6() const
1480
{
×
1481
    return m_is_ip_v6;
×
1482
}
×
1483

1484
template <class C, class T>
1485
inline std::basic_ostream<C, T>& operator<<(std::basic_ostream<C, T>& out, const Address& addr)
1486
{
19,716✔
1487
    // FIXME: Not taking `addr.m_ip_v6_scope_id` into account. What does ASIO
10,004✔
1488
    // do?
10,004✔
1489
    union buffer_union {
19,716✔
1490
        char ip_v4[INET_ADDRSTRLEN];
19,716✔
1491
        char ip_v6[INET6_ADDRSTRLEN];
19,716✔
1492
    };
19,716✔
1493
    char buffer[sizeof(buffer_union)];
19,716✔
1494
    int af = addr.m_is_ip_v6 ? AF_INET6 : AF_INET;
19,716✔
1495
#ifdef _WIN32
1496
    void* src = const_cast<void*>(reinterpret_cast<const void*>(&addr.m_union));
1497
#else
1498
    const void* src = &addr.m_union;
19,716✔
1499
#endif
19,716✔
1500
    const char* ret = ::inet_ntop(af, src, buffer, sizeof buffer);
19,716✔
1501
    if (ret == 0) {
19,716✔
1502
        std::error_code ec = util::make_basic_system_error_code(errno);
×
1503
        throw std::system_error(ec);
×
1504
    }
×
1505
    out << ret;
19,716✔
1506
    return out;
19,716✔
1507
}
19,716✔
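
// Sketch (not part of the original header): the inet_ntop() formatting step
// used by operator<< above, isolated for POSIX systems. The helper names
// `address_to_string()`, `loopback_v4_string()` and `loopback_v6_string()`
// are hypothetical, and std::generic_category() is used here as a stand-in
// for util::make_basic_system_error_code().

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

#include <cassert>
#include <cerrno>
#include <string>
#include <system_error>

// Format a raw IPv4 or IPv6 address as text. `src` must point to an in_addr
// when `is_ip_v6` is false, and to an in6_addr otherwise.
inline std::string address_to_string(const void* src, bool is_ip_v6)
{
    assert(src);
    // INET6_ADDRSTRLEN is the larger of the two limits, so a single buffer
    // of that size fits either address family.
    char buffer[INET6_ADDRSTRLEN];
    int af = is_ip_v6 ? AF_INET6 : AF_INET;
    const char* ret = ::inet_ntop(af, src, buffer, sizeof buffer);
    if (ret == nullptr)
        throw std::system_error(errno, std::generic_category());
    return ret;
}

inline std::string loopback_v4_string()
{
    in_addr addr{};
    addr.s_addr = htonl(INADDR_LOOPBACK); // Network byte order
    return address_to_string(&addr, false);
}

inline std::string loopback_v6_string()
{
    in6_addr addr = in6addr_loopback;
    return address_to_string(&addr, true);
}
```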
1508

1509
inline Address::Address()
1510
{
19,720✔
1511
    m_union.m_ip_v4 = ip_v4_type();
19,720✔
1512
}
19,720✔
1513

1514
inline Address make_address(const char* c_str)
1515
{
×
1516
    std::error_code ec;
×
1517
    Address addr = make_address(c_str, ec);
×
1518
    if (ec)
×
1519
        throw std::system_error(ec);
×
1520
    return addr;
×
1521
}
×
1522

1523
inline Address make_address(const std::string& str)
1524
{
×
1525
    std::error_code ec;
×
1526
    Address addr = make_address(str, ec);
×
1527
    if (ec)
×
1528
        throw std::system_error(ec);
×
1529
    return addr;
×
1530
}
×
1531

1532
inline Address make_address(const std::string& str, std::error_code& ec) noexcept
1533
{
×
1534
    return make_address(str.c_str(), ec);
×
1535
}
×
1536

1537
// ---------------- Endpoint ----------------
1538

1539
inline StreamProtocol Endpoint::protocol() const
1540
{
11,780✔
1541
    return m_protocol;
11,780✔
1542
}
11,780✔
1543

1544
inline Address Endpoint::address() const
1545
{
19,718✔
1546
    Address addr;
19,718✔
1547
    if (m_protocol.is_ip_v4()) {
19,718✔
1548
        addr.m_union.m_ip_v4 = m_sockaddr_union.m_ip_v4.sin_addr;
19,718✔
1549
    }
19,718✔
UNCOV
1550
    else {
×
UNCOV
1551
        addr.m_union.m_ip_v6 = m_sockaddr_union.m_ip_v6.sin6_addr;
×
UNCOV
1552
        addr.m_ip_v6_scope_id = m_sockaddr_union.m_ip_v6.sin6_scope_id;
×
UNCOV
1553
        addr.m_is_ip_v6 = true;
×
UNCOV
1554
    }
×
1555
    return addr;
19,718✔
1556
}
19,718✔
1557

1558
inline Endpoint::port_type Endpoint::port() const
1559
{
27,664✔
1560
    return ntohs(m_protocol.is_ip_v4() ? m_sockaddr_union.m_ip_v4.sin_port : m_sockaddr_union.m_ip_v6.sin6_port);
27,664✔
1561
}
27,664✔
1562

1563
inline Endpoint::data_type* Endpoint::data()
1564
{
2✔
1565
    return &m_sockaddr_union.m_base;
2✔
1566
}
2✔
1567

1568
inline const Endpoint::data_type* Endpoint::data() const
1569
{
×
1570
    return &m_sockaddr_union.m_base;
×
1571
}
×
1572

1573
inline Endpoint::Endpoint()
1574
    : Endpoint{StreamProtocol::ip_v4(), 0}
1575
{
40,670✔
1576
}
40,670✔
1577

1578
inline Endpoint::Endpoint(const StreamProtocol& protocol, port_type port)
1579
    : m_protocol{protocol}
1580
{
40,666✔
1581
    int family = m_protocol.family();
40,666✔
1582
    if (family == AF_INET) {
40,668✔
1583
        m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear
40,668✔
1584
        m_sockaddr_union.m_ip_v4.sin_family = AF_INET;
40,668✔
1585
        m_sockaddr_union.m_ip_v4.sin_port = htons(port);
40,668✔
1586
        m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY;
40,668✔
1587
    }
40,668✔
1588
    else if (family == AF_INET6) {
2,147,483,647!
1589
        m_sockaddr_union.m_ip_v6 = sockaddr_ip_v6_type(); // Clear
×
1590
        m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6;
×
1591
        m_sockaddr_union.m_ip_v6.sin6_port = htons(port);
×
1592
    }
×
1593
    else {
2,147,483,647✔
1594
        m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear
2,147,483,647✔
1595
        m_sockaddr_union.m_ip_v4.sin_family = AF_UNSPEC;
2,147,483,647✔
1596
        m_sockaddr_union.m_ip_v4.sin_port = htons(port);
2,147,483,647✔
1597
        m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY;
2,147,483,647✔
1598
    }
2,147,483,647✔
1599
}
40,666✔
1600

1601
inline Endpoint::Endpoint(const Address& addr, port_type port)
1602
{
1603
    if (addr.m_is_ip_v6) {
1604
        m_protocol = StreamProtocol::ip_v6();
1605
        m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6;
1606
        m_sockaddr_union.m_ip_v6.sin6_port = htons(port);
1607
        m_sockaddr_union.m_ip_v6.sin6_flowinfo = 0;
1608
        m_sockaddr_union.m_ip_v6.sin6_addr = addr.m_union.m_ip_v6;
1609
        m_sockaddr_union.m_ip_v6.sin6_scope_id = addr.m_ip_v6_scope_id;
1610
    }
1611
    else {
1612
        m_protocol = StreamProtocol::ip_v4();
1613
        m_sockaddr_union.m_ip_v4.sin_family = AF_INET;
1614
        m_sockaddr_union.m_ip_v4.sin_port = htons(port);
1615
        m_sockaddr_union.m_ip_v4.sin_addr = addr.m_union.m_ip_v4;
1616
    }
1617
}
1618

1619
inline Endpoint::List::iterator Endpoint::List::begin() const noexcept
1620
{
14,528✔
1621
    return m_endpoints.data();
14,528✔
1622
}
14,528✔
1623

1624
inline Endpoint::List::iterator Endpoint::List::end() const noexcept
1625
{
7,980✔
1626
    return m_endpoints.data() + m_endpoints.size();
7,980✔
1627
}
7,980✔
1628

1629
inline std::size_t Endpoint::List::size() const noexcept
1630
{
9,828✔
1631
    return m_endpoints.size();
9,828✔
1632
}
9,828✔
1633

1634
inline bool Endpoint::List::empty() const noexcept
1635
{
3,280✔
1636
    return m_endpoints.size() == 0;
3,280✔
1637
}
3,280✔
1638

1639
// ---------------- Service::OperQueue ----------------
1640

1641
template <class Oper>
1642
inline bool Service::OperQueue<Oper>::empty() const noexcept
1643
{
11,349,462✔
1644
    return !m_back;
11,349,462✔
1645
}
11,349,462✔
1646

1647
template <class Oper>
1648
inline void Service::OperQueue<Oper>::push_back(LendersOperPtr op) noexcept
1649
{
7,767,614✔
1650
    REALM_ASSERT(!op->m_next);
7,767,614✔
1651
    if (m_back) {
7,767,614✔
1652
        op->m_next = m_back->m_next;
2,190,530✔
1653
        m_back->m_next = op.get();
2,190,530✔
1654
    }
2,190,530✔
1655
    else {
5,577,084✔
1656
        op->m_next = op.get();
5,577,084✔
1657
    }
5,577,084✔
1658
    m_back = op.release();
7,767,614✔
1659
}
7,767,614✔
1660

1661
template <class Oper>
1662
template <class Oper2>
1663
inline void Service::OperQueue<Oper>::push_back(OperQueue<Oper2>& q) noexcept
1664
{
3,490,338✔
1665
    if (!q.m_back)
3,490,338✔
1666
        return;
2,811,732✔
1667
    if (m_back)
678,606✔
1668
        std::swap(m_back->m_next, q.m_back->m_next);
17,964✔
1669
    m_back = q.m_back;
678,606✔
1670
    q.m_back = nullptr;
678,606✔
1671
}
678,606✔
1672

1673
template <class Oper>
1674
inline auto Service::OperQueue<Oper>::pop_front() noexcept -> LendersOperPtr
1675
{
12,912,354✔
1676
    Oper* op = nullptr;
12,912,354✔
1677
    if (m_back) {
12,912,354✔
1678
        op = static_cast<Oper*>(m_back->m_next);
7,776,582✔
1679
        if (op != m_back) {
7,776,582✔
1680
            m_back->m_next = op->m_next;
2,200,502✔
1681
        }
2,200,502✔
1682
        else {
5,576,080✔
1683
            m_back = nullptr;
5,576,080✔
1684
        }
5,576,080✔
1685
        op->m_next = nullptr;
7,776,582✔
1686
    }
7,776,582✔
1687
    return LendersOperPtr(op);
12,912,354✔
1688
}
12,912,354✔
1689

1690
template <class Oper>
1691
inline void Service::OperQueue<Oper>::clear() noexcept
1692
{
2,902,834✔
1693
    if (m_back) {
2,902,834✔
1694
        LendersOperPtr op(m_back);
8,482✔
1695
        while (op->m_next != m_back)
17,196✔
1696
            op.reset(static_cast<Oper*>(op->m_next));
8,714✔
1697
        m_back = nullptr;
8,482✔
1698
    }
8,482✔
1699
}
2,902,834✔
1700

1701
template <class Oper>
1702
inline Service::OperQueue<Oper>::OperQueue(OperQueue&& q) noexcept
1703
    : m_back{q.m_back}
1704
{
99,456✔
1705
    q.m_back = nullptr;
99,456✔
1706
}
99,456✔
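
// Sketch (not part of the original header): the circular, singly linked
// layout that OperQueue relies on, reduced to plain nodes without ownership
// handling. Only a pointer to the back node is stored; the front is always
// `back->next`. `Node`, `BackPtrQueue` and `drain()` are hypothetical names.

```cpp
#include <cassert>
#include <utility>
#include <vector>

struct Node {
    int value = 0;
    Node* next = nullptr; // Null while the node is not in a queue
};

class BackPtrQueue {
public:
    bool empty() const noexcept
    {
        return !m_back;
    }

    void push_back(Node& node) noexcept
    {
        assert(!node.next);
        if (m_back) {
            node.next = m_back->next; // New back points at the front
            m_back->next = &node;
        }
        else {
            node.next = &node; // A single node points at itself
        }
        m_back = &node;
    }

    // Move all nodes of `q` to the end of this queue in constant time.
    void push_back(BackPtrQueue& q) noexcept
    {
        if (!q.m_back)
            return;
        if (m_back)
            std::swap(m_back->next, q.m_back->next); // Splice the two rings
        m_back = q.m_back;
        q.m_back = nullptr;
    }

    Node* pop_front() noexcept
    {
        Node* node = nullptr;
        if (m_back) {
            node = m_back->next;
            if (node != m_back)
                m_back->next = node->next;
            else
                m_back = nullptr; // Removed the last node
            node->next = nullptr;
        }
        return node;
    }

private:
    Node* m_back = nullptr;
};

// Pop every node and collect the values in front-to-back order.
inline std::vector<int> drain(BackPtrQueue& q)
{
    std::vector<int> values;
    while (Node* node = q.pop_front())
        values.push_back(node->value);
    return values;
}
```

// Swapping the two `next` pointers joins the rings because each back node
// pointed at its own front: afterwards the old back points at the other
// queue's front, and the new back points at the original front.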
1707

1708
// ---------------- Service::Descriptor ----------------
1709

1710
inline Service::Descriptor::Descriptor(Impl& s) noexcept
1711
    : service_impl{s}
1712
{
21,938✔
1713
}
21,938✔
1714

1715
inline Service::Descriptor::~Descriptor() noexcept
1716
{
21,934✔
1717
    if (is_open())
21,934✔
1718
        close();
×
1719
}
21,934✔
1720

1721
inline void Service::Descriptor::assign(native_handle_type fd, bool in_blocking_mode) noexcept
1722
{
13,952✔
1723
    REALM_ASSERT(!is_open());
13,952✔
1724
    m_fd = fd;
13,952✔
1725
    m_in_blocking_mode = in_blocking_mode;
13,952✔
1726
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
6,922✔
1727
    m_read_ready = false;
6,922✔
1728
    m_write_ready = false;
6,922✔
1729
    m_imminent_end_of_input = false;
6,922✔
1730
    m_is_registered = false;
6,922✔
1731
#endif
6,922✔
1732
}
13,952✔
1733

1734
inline void Service::Descriptor::close() noexcept
1735
{
13,950✔
1736
    REALM_ASSERT(is_open());
13,950✔
1737
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
6,922✔
1738
    if (m_is_registered)
6,922✔
1739
        deregister_for_async();
6,836✔
1740
    m_is_registered = false;
6,922✔
1741
#endif
6,922✔
1742
    do_close();
13,950✔
1743
}
13,950✔
1744

1745
inline auto Service::Descriptor::release() noexcept -> native_handle_type
1746
{
×
1747
    REALM_ASSERT(is_open());
×
1748
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
×
1749
    if (m_is_registered)
×
1750
        deregister_for_async();
×
1751
    m_is_registered = false;
×
1752
#endif
×
1753
    return do_release();
×
1754
}
×
1755

1756
inline bool Service::Descriptor::is_open() const noexcept
1757
{
113,748✔
1758
    return (m_fd != -1);
113,748✔
1759
}
113,748✔
1760

1761
inline auto Service::Descriptor::native_handle() const noexcept -> native_handle_type
1762
{
52,830✔
1763
    return m_fd;
52,830✔
1764
}
52,830✔
1765

1766
inline bool Service::Descriptor::in_blocking_mode() const noexcept
1767
{
1,146✔
1768
    return m_in_blocking_mode;
1,146✔
1769
}
1,146✔
1770

1771
template <class Oper, class... Args>
1772
inline void Service::Descriptor::initiate_oper(std::unique_ptr<Oper, LendersOperDeleter> op, Args&&... args)
1773
{
2,884,748✔
1774
    Service::Want want = op->initiate(std::forward<Args>(args)...); // Throws
2,884,748✔
1775
    add_initiated_oper(std::move(op), want);                        // Throws
2,884,748✔
1776
}
2,884,748✔
1777

1778
inline void Service::Descriptor::ensure_blocking_mode()
1779
{
556,654✔
1780
    // Assuming that descriptors are either used mostly in blocking mode, or
277,606✔
1781
    // mostly in nonblocking mode.
277,606✔
1782
    if (REALM_UNLIKELY(!m_in_blocking_mode)) {
556,654✔
1783
        bool value = false;
144✔
1784
        set_nonblock_flag(value); // Throws
144✔
1785
        m_in_blocking_mode = true;
144✔
1786
    }
144✔
1787
}
556,654✔
1788

1789
inline void Service::Descriptor::ensure_nonblocking_mode()
1790
{
3,009,198✔
1791
    // Assuming that descriptors are either used mostly in blocking mode, or
1,539,406✔
1792
    // mostly in nonblocking mode.
1,539,406✔
1793
    if (REALM_UNLIKELY(m_in_blocking_mode)) {
3,009,198✔
1794
        bool value = true;
11,716✔
1795
        set_nonblock_flag(value); // Throws
11,716✔
1796
        m_in_blocking_mode = false;
11,716✔
1797
    }
11,716✔
1798
}
3,009,198✔
1799

1800
inline bool Service::Descriptor::assume_read_would_block() const noexcept
1801
{
2,154,612✔
1802
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
1,140,594✔
1803
    return !m_in_blocking_mode && !m_read_ready;
1,140,594✔
1804
#else
1805
    return false;
1,014,018✔
1806
#endif
1,014,018✔
1807
}
2,154,612✔
1808

1809
inline bool Service::Descriptor::assume_write_would_block() const noexcept
1810
{
1,499,030✔
1811
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
783,968✔
1812
    return !m_in_blocking_mode && !m_write_ready;
783,968✔
1813
#else
1814
    return false;
715,062✔
1815
#endif
715,062✔
1816
}
1,499,030✔
1817

1818
inline void Service::Descriptor::set_read_ready(bool value) noexcept
1819
{
2,161,072✔
1820
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
1,144,056✔
1821
    m_read_ready = value;
1,144,056✔
1822
#else
1823
    // No-op
1,017,016✔
1824
    static_cast<void>(value);
1,017,016✔
1825
#endif
1,017,016✔
1826
}
2,161,072✔
1827

1828
inline void Service::Descriptor::set_write_ready(bool value) noexcept
1829
{
1,410,624✔
1830
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
692,886✔
1831
    m_write_ready = value;
692,886✔
1832
#else
1833
    // No-op
717,738✔
1834
    static_cast<void>(value);
717,738✔
1835
#endif
717,738✔
1836
}
1,410,624✔
1837

1838
// ---------------- Service ----------------
1839

1840
class Service::AsyncOper {
1841
public:
1842
    bool in_use() const noexcept;
1843
    bool is_complete() const noexcept;
1844
    bool is_canceled() const noexcept;
1845
    void cancel() noexcept;
1846
    /// Every object of type \ref AsyncOper must be destroyed either by a call
1847
    /// to this function or to recycle(). This function recycles the operation
1848
    /// object (commits suicide), even if it throws.
1849
    virtual void recycle_and_execute() = 0;
1850
    /// Every object of type \ref AsyncOper must be destroyed either by a call
1851
    /// to recycle_and_execute() or to this function. This function destroys the
1852
    /// object (commits suicide).
1853
    virtual void recycle() noexcept = 0;
1854
    /// Must be called when the owner dies, and the object is in use (not an
1855
    /// instance of UnusedOper).
1856
    virtual void orphan() noexcept = 0;
1857

1858
protected:
1859
    AsyncOper(std::size_t size, bool in_use) noexcept;
1860
    virtual ~AsyncOper() noexcept {}
10,253,692✔
1861
    void set_is_complete(bool value) noexcept;
1862
    template <class H, class... Args>
1863
    void do_recycle_and_execute(bool orphaned, H& handler, Args&&...);
1864
    void do_recycle(bool orphaned) noexcept;
1865

1866
private:
1867
    std::size_t m_size; // Allocated number of bytes
1868
    bool m_in_use = false;
1869
    // Set to true when the operation completes successfully or fails. If the
1870
    // operation is canceled before this happens, it will never be set to
1871
    // true. Always false when not in use
1872
    bool m_complete = false;
1873
    // Set to true when the operation is canceled. Always false when not in use.
1874
    bool m_canceled = false;
1875
    AsyncOper* m_next = nullptr; // Always null when not in use
1876
    template <class H, class... Args>
1877
    void do_recycle_and_execute_helper(bool orphaned, bool& was_recycled, H handler, Args...);
1878
    friend class Service;
1879
};
1880

1881
class Service::ResolveOperBase : public AsyncOper {
1882
public:
1883
    ResolveOperBase(std::size_t size, Resolver& resolver, Resolver::Query query) noexcept
1884
        : AsyncOper{size, true}
1885
        , m_resolver{&resolver}
1886
        , m_query{std::move(query)}
1887
    {
3,296✔
1888
    }
3,296✔
1889
    void complete() noexcept
1890
    {
3,292✔
1891
        set_is_complete(true);
3,292✔
1892
    }
3,292✔
1893
    void recycle() noexcept override final
1894
    {
8✔
1895
        bool orphaned = !m_resolver;
8✔
1896
        REALM_ASSERT(orphaned);
8✔
1897
        // Note: do_recycle() commits suicide.
4✔
1898
        do_recycle(orphaned);
8✔
1899
    }
8✔
1900
    void orphan() noexcept override final
1901
    {
8✔
1902
        m_resolver = nullptr;
8✔
1903
    }
8✔
1904

1905
protected:
1906
    Resolver* m_resolver;
1907
    Resolver::Query m_query;
1908
    Endpoint::List m_endpoints;
1909
    std::error_code m_error_code;
1910
    friend class Service;
1911
};
1912

1913
class Service::WaitOperBase : public AsyncOper {
1914
public:
1915
    WaitOperBase(std::size_t size, DeadlineTimer& timer, clock::time_point expiration_time) noexcept
1916
        : AsyncOper{size, true} // Second argument is `in_use`
        , m_timer{&timer}
        , m_expiration_time{expiration_time}
1920
    {
549,696✔
1921
    }
549,696✔
1922
    void complete() noexcept
1923
    {
529,458✔
1924
        set_is_complete(true);
529,458✔
1925
    }
529,458✔
1926
    void recycle() noexcept override final
1927
    {
7,972✔
1928
        bool orphaned = !m_timer;
7,972✔
1929
        REALM_ASSERT(orphaned);
7,972✔
1930
        // Note: do_recycle() commits suicide.
3,932✔
1931
        do_recycle(orphaned);
7,972✔
1932
    }
7,972✔
1933
    void orphan() noexcept override final
1934
    {
20,204✔
1935
        m_timer = nullptr;
20,204✔
1936
    }
20,204✔
1937

1938
protected:
1939
    DeadlineTimer* m_timer;
1940
    clock::time_point m_expiration_time;
1941
    friend class Service;
1942
};
1943

1944
class Service::TriggerExecOperBase : public AsyncOper, public util::AtomicRefCountBase {
1945
public:
1946
    TriggerExecOperBase(Impl& service) noexcept
1947
        : AsyncOper{0, false} // First argument is `size` (unused), second is `in_use`
        , m_service{&service}
1950
    {
×
1951
    }
×
1952
    void recycle() noexcept override final
1953
    {
×
1954
        REALM_ASSERT(in_use());
×
1955
        REALM_ASSERT(!m_service);
×
1956
        // Note: Potential suicide when `self` goes out of scope
×
1957
        util::bind_ptr<TriggerExecOperBase> self{this, util::bind_ptr_base::adopt_tag{}};
×
1958
    }
×
1959
    void orphan() noexcept override final
1960
    {
×
1961
        REALM_ASSERT(m_service);
×
1962
        m_service = nullptr;
×
1963
    }
×
1964
    void trigger() noexcept
1965
    {
×
1966
        REALM_ASSERT(m_service);
×
1967
        Service::trigger_exec(*m_service, *this);
×
1968
    }
×
1969

1970
protected:
1971
    Impl* m_service;
1972
};
1973

1974
class Service::PostOperBase : public AsyncOper {
1975
public:
1976
    PostOperBase(std::size_t size, Impl& service) noexcept
1977
        : AsyncOper{size, true} // Second argument is `in_use`
        , m_service{service}
1980
    {
1,191,594✔
1981
    }
1,191,594✔
1982
    void recycle() noexcept override final
1983
    {
362✔
1984
        // Service::recycle_post_oper() destroys this operation object
228✔
1985
        Service::recycle_post_oper(m_service, this);
362✔
1986
    }
362✔
1987
    void orphan() noexcept override final
1988
    {
×
1989
        REALM_ASSERT(false); // Never called
×
1990
    }
×
1991

1992
protected:
1993
    Impl& m_service;
1994
};
1995

1996
template <class H>
1997
class Service::PostOper : public PostOperBase {
1998
public:
1999
    PostOper(std::size_t size, Impl& service, H&& handler)
2000
        : PostOperBase{size, service}
2001
        , m_handler{std::move(handler)}
2002
    {
770,126✔
2003
    }
770,126✔
2004
    void recycle_and_execute() override final
2005
    {
769,792✔
2006
        // Recycle the operation object before the handler is executed, such
378,751✔
2007
        // that the memory is available for a new post operation that might be
378,751✔
2008
        // initiated during the execution of the handler.
378,751✔
2009
        bool was_recycled = false;
769,792✔
2010
        try {
769,792✔
2011
            H handler = std::move(m_handler); // Throws
769,792✔
2012
            // Service::recycle_post_oper() destroys this operation object
378,751✔
2013
            Service::recycle_post_oper(m_service, this);
769,792✔
2014
            was_recycled = true;
769,792✔
2015
            handler(Status::OK()); // Throws
769,792✔
2016
        }
769,792✔
2017
        catch (...) {
378,755✔
2018
            if (!was_recycled) {
8!
2019
                // Service::recycle_post_oper() destroys this operation object
2020
                Service::recycle_post_oper(m_service, this);
×
2021
            }
×
2022
            throw;
8✔
2023
        }
8✔
2024
    }
769,792✔
2025

2026
private:
2027
    H m_handler;
2028
};
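
// Sketch (not part of the original header): the "recycle before execute"
// ordering used by PostOper::recycle_and_execute(), shown with a one-slot
// stand-in for the operation storage. `Slot`, `post()` and `run_chain()` are
// hypothetical, and std::function replaces the templated handler type.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical storage for at most one pending operation.
struct Slot {
    bool in_use = false;
    std::function<void()> handler;
};

// Store a handler in the slot. Fails if the slot is still occupied.
inline bool post(Slot& slot, std::function<void()> handler)
{
    if (slot.in_use)
        return false;
    slot.handler = std::move(handler);
    slot.in_use = true;
    return true;
}

// Release the slot first, then run the handler, so that the handler may post
// a follow-up operation into the same storage.
inline void recycle_and_execute(Slot& slot)
{
    assert(slot.in_use);
    std::function<void()> handler = std::move(slot.handler);
    slot.handler = nullptr;
    slot.in_use = false; // Recycled before execution
    handler();           // May throw; the slot is already released
}

// Run a handler that re-posts into the slot it was stored in, and return the
// number of handlers that were executed.
inline int run_chain()
{
    Slot slot;
    int executed = 0;
    bool reposted = false;
    post(slot, [&] {
        ++executed;
        // Succeeds only because the slot was recycled before this call.
        reposted = post(slot, [&] {
            ++executed;
        });
    });
    recycle_and_execute(slot);
    if (reposted)
        recycle_and_execute(slot);
    return executed;
}
```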
2029

2030
class Service::IoOper : public AsyncOper {
2031
public:
2032
    IoOper(std::size_t size) noexcept
2033
        : AsyncOper{size, true} // Second argument is `in_use`
2034
    {
3,445,314✔
2035
    }
3,445,314✔
2036
    virtual Descriptor& descriptor() noexcept = 0;
2037
    /// Advance this operation and figure out whether it needs to read from,
2038
    /// or write to the underlying descriptor to advance further. This function
2039
    /// must return Want::read if the operation needs to read, or Want::write if
2040
    /// the operation needs to write to advance further. If the operation
2041
    /// completes (due to success or failure), this function must return
2042
    /// Want::nothing.
2043
    virtual Want advance() noexcept = 0;
2044
};
2045

2046
class Service::UnusedOper : public AsyncOper {
2047
public:
2048
    UnusedOper(std::size_t size) noexcept
2049
        : AsyncOper{size, false} // Second argument is `in_use`
2050
    {
5,133,846✔
2051
    }
5,133,846✔
2052
    void recycle_and_execute() override final
2053
    {
×
2054
        // Must never be called
2055
        REALM_ASSERT(false);
×
2056
    }
×
2057
    void recycle() noexcept override final
2058
    {
×
2059
        // Must never be called
2060
        REALM_ASSERT(false);
×
2061
    }
×
2062
    void orphan() noexcept override final
2063
    {
×
2064
        // Must never be called
2065
        REALM_ASSERT(false);
×
2066
    }
×
2067
};
2068

2069
// `S` must be a stream class with the following member functions:
//
//    Socket& lowest_layer() noexcept;
//
//    void do_init_read_async(std::error_code& ec, Want& want) noexcept;
//    void do_init_write_async(std::error_code& ec, Want& want) noexcept;
//
//    std::size_t do_read_some_sync(char* buffer, std::size_t size,
//                                  std::error_code& ec) noexcept;
//    std::size_t do_write_some_sync(const char* data, std::size_t size,
//                                   std::error_code& ec) noexcept;
//    std::size_t do_read_some_async(char* buffer, std::size_t size,
//                                   std::error_code& ec, Want& want) noexcept;
//    std::size_t do_write_some_async(const char* data, std::size_t size,
//                                    std::error_code& ec, Want& want) noexcept;
//
// If an error occurs during any of the six functions that take an `ec`
// argument, `ec` must be set accordingly. Otherwise `ec` must be set to
// `std::error_code()`.
//
// The do_init_*_async() functions must update the `want` argument to indicate
// how the operation must be initiated:
//
//    Want::read      Wait for read readiness, then call do_*_some_async().
//    Want::write     Wait for write readiness, then call do_*_some_async().
//    Want::nothing   Call do_*_some_async() immediately without waiting for
//                    read or write readiness.
//
// If end-of-input occurs while reading, do_read_some_*() must fail, set `ec` to
// MiscExtErrors::end_of_input, and return zero.
//
// If an error occurs during reading or writing, do_*_some_sync() must set `ec`
// accordingly (to something other than `std::error_code()`) and return
// zero. Otherwise they must set `ec` to `std::error_code()` and return the
// number of bytes read or written, which **must** be at least 1. If the
// underlying socket is in nonblocking mode, and no bytes could be immediately
// read or written, these functions must fail with
// `error::resource_unavailable_try_again`.
//
// If an error occurs during reading or writing, do_*_some_async() must set `ec`
// accordingly (to something other than `std::error_code()`), `want` to
// `Want::nothing`, and return zero. Otherwise they must set `ec` to
// `std::error_code()` and return the number of bytes read or written, which
// must be zero if no bytes could be immediately read or written. Note, in this
// case it is not an error if the underlying socket is in nonblocking mode, and
// no bytes could be immediately read or written. When these functions succeed,
// but return zero because no bytes could be immediately read or written, they
// must set `want` to something other than `Want::nothing`.
//
// If no error occurs, do_*_some_async() must set `want` to indicate how the
// operation should proceed if additional data needs to be read or written, or
// if no bytes were transferred:
//
//    Want::read      Wait for read readiness, then call do_*_some_async() again.
//    Want::write     Wait for write readiness, then call do_*_some_async() again.
//    Want::nothing   Call do_*_some_async() again without waiting for read or
//                    write readiness.
//
// NOTE: If, for example, do_read_some_async() sets `want` to `Want::write`, it
// means that the stream needs to write data to the underlying TCP socket before
// it is able to deliver any additional data to the caller. While such a
// situation will never occur on a raw TCP socket, it can occur on an SSL stream
// (Secure Sockets Layer).
//
// When do_*_some_async() returns `n`, at least one of the following conditions
// must be true:
//
//    n > 0                     Bytes were transferred.
//    ec != std::error_code()   An error occurred.
//    want != Want::nothing     Wait for read/write readiness.
//
// This is of critical importance, as it is the only way we can avoid falling
// into a busy loop of repeated invocations of do_*_some_async().
//
// NOTE: do_*_some_async() are allowed to set `want` to `Want::read` or
// `Want::write`, even when they successfully transfer a nonzero number of bytes.
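
// Sketch (not part of the original header): a toy stream and caller that
// follow the do_read_some_async() contract described above. `Want` here is a
// local stand-in for Service::Want, `MockStream` and `read_all()` are
// hypothetical, and std::errc::broken_pipe stands in for
// MiscExtErrors::end_of_input. A real caller would wait for readiness in an
// event loop instead of just counting.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <system_error>
#include <utility>

enum class Want { nothing, read, write };

// In-memory stream that hands out at most two bytes per call and reports
// "not ready" on every second call, like a nonblocking socket would.
class MockStream {
public:
    explicit MockStream(std::string data)
        : m_data{std::move(data)}
    {
    }

    std::size_t do_read_some_async(char* buffer, std::size_t size, std::error_code& ec, Want& want) noexcept
    {
        if (m_pos == m_data.size()) {
            ec = std::make_error_code(std::errc::broken_pipe); // End of input
            want = Want::nothing;
            return 0;
        }
        ec = std::error_code();
        m_ready = !m_ready;
        if (!m_ready) {
            want = Want::read; // Nothing available: caller must wait
            return 0;
        }
        std::size_t avail = m_data.size() - m_pos;
        std::size_t n = std::min(size, std::min<std::size_t>(avail, 2));
        std::memcpy(buffer, m_data.data() + m_pos, n);
        m_pos += n;
        want = Want::nothing;
        return n;
    }

private:
    std::string m_data;
    std::size_t m_pos = 0;
    bool m_ready = false;
};

// Read exactly `size` bytes unless an error occurs. `waits` counts how often
// the stream asked the caller to wait for readiness.
inline std::size_t read_all(MockStream& stream, char* buffer, std::size_t size, std::error_code& ec, int& waits)
{
    ec = std::error_code();
    std::size_t total = 0;
    while (total < size) {
        Want want = Want::nothing;
        std::size_t n = stream.do_read_some_async(buffer + total, size - total, ec, want);
        // At least one of these must hold, or this loop could spin forever.
        assert(n > 0 || ec || want != Want::nothing);
        if (ec)
            break;
        total += n;
        if (want != Want::nothing)
            ++waits; // A real event loop would wait for readiness here
    }
    return total;
}
```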
2145
template <class S>
2146
class Service::BasicStreamOps {
2147
public:
2148
    class StreamOper;
2149
    class ReadOperBase;
2150
    class WriteOperBase;
2151
    class BufferedReadOperBase;
2152
    template <class H>
2153
    class ReadOper;
2154
    template <class H>
2155
    class WriteOper;
2156
    template <class H>
2157
    class BufferedReadOper;
2158

2159
    using LendersReadOperPtr = std::unique_ptr<ReadOperBase, LendersOperDeleter>;
2160
    using LendersWriteOperPtr = std::unique_ptr<WriteOperBase, LendersOperDeleter>;
2161
    using LendersBufferedReadOperPtr = std::unique_ptr<BufferedReadOperBase, LendersOperDeleter>;
2162

2163
    // Synchronous read
2164
    static std::size_t read(S& stream, char* buffer, std::size_t size, std::error_code& ec)
2165
    {
4✔
2166
        REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use());
4✔
2167
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
4✔
2168
        char* begin = buffer;
4✔
2169
        char* end = buffer + size;
4✔
2170
        char* curr = begin;
4✔
2171
        for (;;) {
8✔
2172
            if (curr == end) {
8✔
2173
                ec = std::error_code(); // Success
×
2174
                break;
×
2175
            }
×
2176
            char* buffer_2 = curr;
8✔
2177
            std::size_t size_2 = std::size_t(end - curr);
8✔
2178
            std::size_t n = stream.do_read_some_sync(buffer_2, size_2, ec);
8✔
2179
            if (REALM_UNLIKELY(ec))
8✔
2180
                break;
6✔
2181
            REALM_ASSERT(n > 0);
4✔
2182
            REALM_ASSERT(n <= size_2);
4✔
2183
            curr += n;
4✔
2184
        }
4✔
2185
        std::size_t n = std::size_t(curr - begin);
4✔
2186
        return n;
4✔
2187
    }
4✔
2188

2189
    // Synchronous write
2190
    static std::size_t write(S& stream, const char* data, std::size_t size, std::error_code& ec)
2191
    {
261,812✔
2192
        REALM_ASSERT(!stream.lowest_layer().m_write_oper || !stream.lowest_layer().m_write_oper->in_use());
261,812✔
2193
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
261,812✔
2194
        const char* begin = data;
261,812✔
2195
        const char* end = data + size;
261,812✔
2196
        const char* curr = begin;
261,812✔
2197
        for (;;) {
523,619✔
2198
            if (curr == end) {
523,619✔
2199
                ec = std::error_code(); // Success
261,808✔
2200
                break;
261,808✔
2201
            }
261,808✔
2202
            const char* data_2 = curr;
261,811✔
2203
            std::size_t size_2 = std::size_t(end - curr);
261,811✔
2204
            std::size_t n = stream.do_write_some_sync(data_2, size_2, ec);
261,811✔
2205
            if (REALM_UNLIKELY(ec))
261,811✔
2206
                break;
130,645✔
2207
            REALM_ASSERT(n > 0);
261,807✔
2208
            REALM_ASSERT(n <= size_2);
261,807✔
2209
            curr += n;
261,807✔
2210
        }
261,807✔
2211
        std::size_t n = std::size_t(curr - begin);
261,812✔
2212
        return n;
261,812✔
2213
    }
261,812✔
2214

2215
    // Synchronous read
2216
    static std::size_t buffered_read(S& stream, char* buffer, std::size_t size, int delim, ReadAheadBuffer& rab,
2217
                                     std::error_code& ec)
2218
    {
16,410✔
2219
        REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use());
16,410!
2220
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
16,410✔
2221
        char* begin = buffer;
16,410✔
2222
        char* end = buffer + size;
16,410✔
2223
        char* curr = begin;
16,410✔
2224
        for (;;) {
147,501✔
2225
            bool complete = rab.read(curr, end, delim, ec);
147,501✔
2226
            if (complete)
147,501✔
2227
                break;
16,402✔
2228

65,552✔
2229
            rab.refill_sync(stream, ec);
131,099✔
2230
            if (REALM_UNLIKELY(ec))
131,099✔
2231
                break;
65,556✔
2232
        }
131,099✔
2233
        std::size_t n = (curr - begin);
16,410✔
2234
        return n;
16,410✔
2235
    }
16,410✔
2236

2237
    // Synchronous read
2238
    static std::size_t read_some(S& stream, char* buffer, std::size_t size, std::error_code& ec)
2239
    {
20✔
2240
        REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use());
20✔
2241
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
20✔
2242
        return stream.do_read_some_sync(buffer, size, ec);
20✔
2243
    }
20✔
2244

2245
    // Synchronous write
2246
    static std::size_t write_some(S& stream, const char* data, std::size_t size, std::error_code& ec)
2247
    {
112✔
2248
        REALM_ASSERT(!stream.lowest_layer().m_write_oper || !stream.lowest_layer().m_write_oper->in_use());
112✔
2249
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
112✔
2250
        return stream.do_write_some_sync(data, size, ec);
112✔
2251
    }
112✔
2252

2253
    template <class H>
2254
    static void async_read(S& stream, char* buffer, std::size_t size, bool is_read_some, H&& handler)
2255
    {
587,619✔
2256
        char* begin = buffer;
587,619✔
2257
        char* end = buffer + size;
587,619✔
2258
        LendersReadOperPtr op = Service::alloc<ReadOper<H>>(stream.lowest_layer().m_read_oper, stream, is_read_some,
587,619✔
2259
                                                            begin, end, std::move(handler)); // Throws
587,619✔
2260
        stream.lowest_layer().m_desc.initiate_oper(std::move(op));                           // Throws
587,619✔
2261
    }
587,619✔
2262

2263
    template <class H>
2264
    static void async_write(S& stream, const char* data, std::size_t size, bool is_write_some, H&& handler)
2265
    {
763,043✔
2266
        const char* begin = data;
763,043✔
2267
        const char* end = data + size;
763,043✔
2268
        LendersWriteOperPtr op = Service::alloc<WriteOper<H>>(
763,043✔
2269
            stream.lowest_layer().m_write_oper, stream, is_write_some, begin, end, std::move(handler)); // Throws
763,043✔
2270
        stream.lowest_layer().m_desc.initiate_oper(std::move(op));                                      // Throws
763,043✔
2271
    }
763,043✔
2272

2273
    template <class H>
2274
    static void async_buffered_read(S& stream, char* buffer, std::size_t size, int delim, ReadAheadBuffer& rab,
2275
                                    H&& handler)
2276
    {
695,368✔
2277
        char* begin = buffer;
695,368✔
2278
        char* end = buffer + size;
695,368✔
2279
        LendersBufferedReadOperPtr op =
695,368✔
2280
            Service::alloc<BufferedReadOper<H>>(stream.lowest_layer().m_read_oper, stream, begin, end, delim, rab,
695,368✔
2281
                                                std::move(handler)); // Throws
695,368✔
2282
        stream.lowest_layer().m_desc.initiate_oper(std::move(op));   // Throws
695,368✔
2283
    }
695,368✔
2284
};
2285

2286
template <class S>
2287
class Service::BasicStreamOps<S>::StreamOper : public IoOper {
2288
public:
2289
    StreamOper(std::size_t size, S& stream) noexcept
2290
        : IoOper{size}
2291
        , m_stream{&stream}
2292
    {
3,450,924✔
2293
    }
3,450,924✔
2294
    void recycle() noexcept override final
2295
    {
876✔
2296
        bool orphaned = !m_stream;
876✔
2297
        REALM_ASSERT(orphaned);
876✔
2298
        // Note: do_recycle() commits suicide.
352✔
2299
        do_recycle(orphaned);
876✔
2300
    }
876✔
2301
    void orphan() noexcept override final
2302
    {
4,674✔
2303
        m_stream = nullptr;
4,674✔
2304
    }
4,674✔
2305
    Descriptor& descriptor() noexcept override final
2306
    {
214,316✔
2307
        return m_stream->lowest_layer().m_desc;
214,316✔
2308
    }
214,316✔
2309

2310
protected:
2311
    S* m_stream;
2312
    std::error_code m_error_code;
2313
};
2314

2315
template <class S>
2316
class Service::BasicStreamOps<S>::ReadOperBase : public StreamOper {
2317
public:
2318
    ReadOperBase(std::size_t size, S& stream, bool is_read_some, char* begin, char* end) noexcept
2319
        : StreamOper{size, stream}
2320
        , m_is_read_some{is_read_some}
2321
        , m_begin{begin}
2322
        , m_end{end}
2323
    {
587,386✔
2324
    }
587,386✔
2325
    Want initiate()
2326
    {
587,587✔
2327
        auto& s = *this;
587,587✔
2328
        REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get());
587,587✔
2329
        REALM_ASSERT(!s.is_complete());
587,587✔
2330
        REALM_ASSERT(s.m_curr <= s.m_end);
587,587✔
2331
        Want want = Want::nothing;
587,587✔
2332
        if (REALM_UNLIKELY(s.m_curr == s.m_end)) {
587,587✔
2333
            s.set_is_complete(true); // Success
×
2334
        }
×
2335
        else {
587,587✔
2336
            s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
587,587✔
2337
            s.m_stream->do_init_read_async(s.m_error_code, want);
587,587✔
2338
            if (want == Want::nothing) {
587,587✔
2339
                if (REALM_UNLIKELY(s.m_error_code)) {
236,451!
2340
                    s.set_is_complete(true); // Failure
×
2341
                }
×
2342
                else {
236,451✔
2343
                    want = advance();
236,451✔
2344
                }
236,451✔
2345
            }
236,451✔
2346
        }
587,587✔
2347
        return want;
587,587✔
2348
    }
587,587✔
2349
    Want advance() noexcept override final
2350
    {
372,706✔
2351
        auto& s = *this;
372,706✔
2352
        REALM_ASSERT(!s.is_complete());
372,706✔
2353
        REALM_ASSERT(!s.is_canceled());
372,706✔
2354
        REALM_ASSERT(!s.m_error_code);
372,706✔
2355
        REALM_ASSERT(s.m_curr < s.m_end);
372,706✔
2356
        REALM_ASSERT(!s.m_is_read_some || s.m_curr == m_begin);
372,706✔
2357
        for (;;) {
372,706✔
2358
            // Read into caller's buffer
195,686✔
2359
            char* buffer = s.m_curr;
372,549✔
2360
            std::size_t size = std::size_t(s.m_end - s.m_curr);
372,549✔
2361
            Want want = Want::nothing;
372,549✔
2362
            std::size_t n = s.m_stream->do_read_some_async(buffer, size, s.m_error_code, want);
372,549✔
2363
            REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please
372,549✔
2364
            // When do_read_some_async() reports an error (other than end_of_input), it should always return 0
195,686✔
2365
            bool got_nothing = (n == 0);
372,549✔
2366
            if (got_nothing) {
372,549✔
2367
                if (REALM_UNLIKELY(s.m_error_code)) {
1,683✔
2368
                    s.set_is_complete(true); // Failure
2✔
2369
                    return Want::nothing;
2✔
2370
                }
2✔
2371
                // Got nothing, but want something
4✔
2372
                return want;
1,681✔
2373
            }
1,681✔
2374
            REALM_ASSERT(!s.m_error_code);
370,866✔
2375
            // Check for completion
195,681✔
2376
            REALM_ASSERT(n <= size);
370,866✔
2377
            s.m_curr += n;
370,866✔
2378
            if (s.m_is_read_some || s.m_curr == s.m_end) {
371,611!
2379
                s.set_is_complete(true); // Success
371,602✔
2380
                return Want::nothing;
371,602✔
2381
            }
371,602✔
2382
            if (want != Want::nothing)
2,147,483,656✔
2383
                return want;
×
2384
            REALM_ASSERT(n < size);
2,147,483,656✔
2385
        }
2,147,483,656✔
2386
    }
372,706✔
2387

2388
protected:
2389
    const bool m_is_read_some;
2390
    char* const m_begin;    // May be dangling after cancellation
2391
    char* const m_end;      // May be dangling after cancellation
2392
    char* m_curr = m_begin; // May be dangling after cancellation
2393
};
2394

2395
template <class S>
2396
class Service::BasicStreamOps<S>::WriteOperBase : public StreamOper {
2397
public:
2398
    WriteOperBase(std::size_t size, S& stream, bool is_write_some, const char* begin, const char* end) noexcept
2399
        : StreamOper{size, stream}
2400
        , m_is_write_some{is_write_some}
2401
        , m_begin{begin}
2402
        , m_end{end}
2403
    {
1,351,174✔
2404
    }
1,351,174✔
2405
    Want initiate()
2406
    {
1,349,986✔
2407
        auto& s = *this;
1,349,986✔
2408
        REALM_ASSERT(this == s.m_stream->lowest_layer().m_write_oper.get());
1,349,986✔
2409
        REALM_ASSERT(!s.is_complete());
1,349,986✔
2410
        REALM_ASSERT(s.m_curr <= s.m_end);
1,349,986✔
2411
        Want want = Want::nothing;
1,349,986✔
2412
        if (REALM_UNLIKELY(s.m_curr == s.m_end)) {
1,349,986✔
2413
            s.set_is_complete(true); // Success
43,532✔
2414
        }
43,532✔
2415
        else {
1,306,454✔
2416
            s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
1,306,454✔
2417
            s.m_stream->do_init_write_async(s.m_error_code, want);
1,306,454✔
2418
            if (want == Want::nothing) {
1,306,454✔
2419
                if (REALM_UNLIKELY(s.m_error_code)) {
307,898!
2420
                    s.set_is_complete(true); // Failure
×
2421
                }
×
2422
                else {
307,898✔
2423
                    want = advance();
307,898✔
2424
                }
307,898✔
2425
            }
307,898✔
2426
        }
1,306,454✔
2427
        return want;
1,349,986✔
2428
    }
1,349,986✔
2429
    Want advance() noexcept override final
2430
    {
882,670✔
2431
        auto& s = *this;
882,670✔
2432
        REALM_ASSERT(!s.is_complete());
882,670✔
2433
        REALM_ASSERT(!s.is_canceled());
882,670✔
2434
        REALM_ASSERT(!s.m_error_code);
882,670✔
2435
        REALM_ASSERT(s.m_curr < s.m_end);
882,670✔
2436
        REALM_ASSERT(!s.m_is_write_some || s.m_curr == s.m_begin);
882,670✔
2437
        for (;;) {
882,676✔
2438
            // Write from caller's buffer
453,750✔
2439
            const char* data = s.m_curr;
882,446✔
2440
            std::size_t size = std::size_t(s.m_end - s.m_curr);
882,446✔
2441
            Want want = Want::nothing;
882,446✔
2442
            std::size_t n = s.m_stream->do_write_some_async(data, size, s.m_error_code, want);
882,446✔
2443
            REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please
882,446✔
2444
            bool wrote_nothing = (n == 0);
882,446✔
2445
            if (wrote_nothing) {
882,446✔
2446
                if (REALM_UNLIKELY(s.m_error_code)) {
5,078✔
2447
                    s.set_is_complete(true); // Failure
152✔
2448
                    return Want::nothing;
152✔
2449
                }
152✔
2450
                // Wrote nothing, but want something written
376✔
2451
                return want;
4,926✔
2452
            }
4,926✔
2453
            REALM_ASSERT(!s.m_error_code);
877,368✔
2454
            // Check for completion
453,306✔
2455
            REALM_ASSERT(n <= size);
877,368✔
2456
            s.m_curr += n;
877,368✔
2457
            if (s.m_is_write_some || s.m_curr == s.m_end) {
878,620✔
2458
                s.set_is_complete(true); // Success
877,998✔
2459
                return Want::nothing;
877,998✔
2460
            }
877,998✔
2461
            if (want != Want::nothing)
2,147,484,269✔
2462
                return want;
558✔
2463
            REALM_ASSERT(n < size);
2,147,483,731✔
2464
        }
2,147,483,731✔
2465
    }
882,670✔
2466

2467
protected:
2468
    const bool m_is_write_some;
2469
    const char* const m_begin;    // May be dangling after cancellation
2470
    const char* const m_end;      // May be dangling after cancellation
2471
    const char* m_curr = m_begin; // May be dangling after cancellation
2472
};
2473

2474
template <class S>
2475
class Service::BasicStreamOps<S>::BufferedReadOperBase : public StreamOper {
2476
public:
2477
    BufferedReadOperBase(std::size_t size, S& stream, char* begin, char* end, int delim,
2478
                         ReadAheadBuffer& rab) noexcept
2479
        : StreamOper{size, stream}
2480
        , m_read_ahead_buffer{rab}
2481
        , m_begin{begin}
2482
        , m_end{end}
2483
        , m_delim{delim}
2484
    {
934,146✔
2485
    }
934,146✔
2486
    Want initiate()
2487
    {
934,230✔
2488
        auto& s = *this;
934,230✔
2489
        REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get());
934,230✔
2490
        REALM_ASSERT(!s.is_complete());
934,230✔
2491
        Want want = Want::nothing;
934,230✔
2492
        bool complete = s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code);
934,230✔
2493
        if (complete) {
934,230✔
2494
            s.set_is_complete(true); // Success or failure
400,454✔
2495
        }
400,454✔
2496
        else {
533,776✔
2497
            s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
533,776✔
2498
            s.m_stream->do_init_read_async(s.m_error_code, want);
533,776✔
2499
            if (want == Want::nothing) {
533,776✔
2500
                if (REALM_UNLIKELY(s.m_error_code)) {
186!
2501
                    s.set_is_complete(true); // Failure
×
2502
                }
×
2503
                else {
186✔
2504
                    want = advance();
186✔
2505
                }
186✔
2506
            }
186✔
2507
        }
533,776✔
2508
        return want;
934,230✔
2509
    }
934,230✔
2510
    Want advance() noexcept override final
2511
    {
875,928✔
2512
        auto& s = *this;
875,928✔
2513
        REALM_ASSERT(!s.is_complete());
875,928✔
2514
        REALM_ASSERT(!s.is_canceled());
875,928✔
2515
        REALM_ASSERT(!s.m_error_code);
875,928✔
2516
        REALM_ASSERT(s.m_read_ahead_buffer.empty());
875,928✔
2517
        REALM_ASSERT(s.m_curr < s.m_end);
875,928✔
2518
        for (;;) {
875,856✔
2519
            // Fill read-ahead buffer from stream (is empty now)
399,210✔
2520
            Want want = Want::nothing;
875,824✔
2521
            bool nonempty = s.m_read_ahead_buffer.refill_async(*s.m_stream, s.m_error_code, want);
875,824✔
2522
            REALM_ASSERT(nonempty || s.m_error_code || want != Want::nothing); // No busy loop, please
875,824✔
2523
            bool got_nothing = !nonempty;
875,824✔
2524
            if (got_nothing) {
875,824✔
2525
                if (REALM_UNLIKELY(s.m_error_code)) {
60,832✔
2526
                    s.set_is_complete(true); // Failure
1,198✔
2527
                    return Want::nothing;
1,198✔
2528
                }
1,198✔
2529
                // Got nothing, but want something
64✔
2530
                return want;
59,634✔
2531
            }
59,634✔
2532
            // Transfer buffered data to caller's buffer
398,438✔
2533
            bool complete = s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code);
814,992✔
2534
            if (complete || s.m_error_code == util::MiscExtErrors::end_of_input) {
814,992✔
2535
                s.set_is_complete(true); // Success or failure (delim_not_found or end_of_input)
521,076✔
2536
                return Want::nothing;
521,076✔
2537
            }
521,076✔
2538
            if (want != Want::nothing)
293,916✔
2539
                return want;
294,388✔
2540
        }
293,916✔
2541
    }
875,928✔
2542

2543
protected:
2544
    ReadAheadBuffer& m_read_ahead_buffer; // May be dangling after cancellation
2545
    char* const m_begin;                  // May be dangling after cancellation
2546
    char* const m_end;                    // May be dangling after cancellation
2547
    char* m_curr = m_begin;               // May be dangling after cancellation
2548
    const int m_delim;
2549
};
2550

2551
template <class S>
2552
template <class H>
2553
class Service::BasicStreamOps<S>::ReadOper : public ReadOperBase {
2554
public:
2555
    ReadOper(std::size_t size, S& stream, bool is_read_some, char* begin, char* end, H&& handler)
2556
        : ReadOperBase{size, stream, is_read_some, begin, end}
2557
        , m_handler{std::move(handler)}
2558
    {
587,402✔
2559
    }
587,402✔
2560
    void recycle_and_execute() override final
2561
    {
587,223✔
2562
        auto& s = *this;
587,223✔
2563
        REALM_ASSERT(s.is_complete() || s.is_canceled());
587,223!
2564
        REALM_ASSERT(s.is_complete() ==
587,223!
2565
                     (s.m_error_code || s.m_curr == s.m_end || (s.m_is_read_some && s.m_curr != s.m_begin)));
587,223✔
2566
        REALM_ASSERT(s.m_curr >= s.m_begin);
587,223✔
2567
        bool orphaned = !s.m_stream;
587,223✔
2568
        std::error_code ec = s.m_error_code;
587,223✔
2569
        if (s.is_canceled())
587,223✔
2570
            ec = util::error::operation_aborted;
216,602✔
2571
        std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
587,223✔
2572
        // Note: do_recycle_and_execute() commits suicide.
307,822✔
2573
        s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
587,223✔
2574
                                             num_bytes_transferred); // Throws
587,223✔
2575
    }
587,223✔
2576

2577
private:
2578
    H m_handler;
2579
};
2580

2581
template <class S>
2582
template <class H>
2583
class Service::BasicStreamOps<S>::WriteOper : public WriteOperBase {
2584
public:
2585
    WriteOper(std::size_t size, S& stream, bool is_write_some, const char* begin, const char* end, H&& handler)
2586
        : WriteOperBase{size, stream, is_write_some, begin, end}
2587
        , m_handler{std::move(handler)}
2588
    {
763,021✔
2589
    }
763,021✔
2590
    void recycle_and_execute() override final
2591
    {
762,705✔
2592
        auto& s = *this;
762,705✔
2593
        REALM_ASSERT(s.is_complete() || s.is_canceled());
762,705!
2594
        REALM_ASSERT(s.is_complete() ==
762,705!
2595
                     (s.m_error_code || s.m_curr == s.m_end || (s.m_is_write_some && s.m_curr != s.m_begin)));
762,705✔
2596
        REALM_ASSERT(s.m_curr >= s.m_begin);
762,705!
2597
        bool orphaned = !s.m_stream;
762,705✔
2598
        std::error_code ec = s.m_error_code;
762,705✔
2599
        if (s.is_canceled())
762,705!
2600
            ec = util::error::operation_aborted;
216,643✔
2601
        std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
762,705✔
2602
        // Note: do_recycle_and_execute() commits suicide.
389,415✔
2603
        s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
762,705✔
2604
                                             num_bytes_transferred); // Throws
762,705✔
2605
    }
762,705✔
2606

2607
private:
2608
    H m_handler;
2609
};
2610

2611
template <class S>
2612
template <class H>
2613
class Service::BasicStreamOps<S>::BufferedReadOper : public BufferedReadOperBase {
2614
public:
2615
    BufferedReadOper(std::size_t size, S& stream, char* begin, char* end, int delim, ReadAheadBuffer& rab,
2616
                     H&& handler)
2617
        : BufferedReadOperBase{size, stream, begin, end, delim, rab}
2618
        , m_handler{std::move(handler)}
2619
    {
695,334✔
2620
    }
695,334✔
2621
    void recycle_and_execute() override final
2622
    {
694,356✔
2623
        auto& s = *this;
694,356✔
2624
        REALM_ASSERT(s.is_complete() || (s.is_canceled() && !s.m_error_code));
694,356!
2625
        REALM_ASSERT(s.is_canceled() || s.m_error_code ||
694,356!
2626
                     (s.m_delim != std::char_traits<char>::eof()
694,356✔
2627
                          ? s.m_curr > s.m_begin && s.m_curr[-1] == std::char_traits<char>::to_char_type(s.m_delim)
694,356✔
2628
                          : s.m_curr == s.m_end));
694,356✔
2629
        REALM_ASSERT(s.m_curr >= s.m_begin);
694,356!
2630
        bool orphaned = !s.m_stream;
694,356✔
2631
        std::error_code ec = s.m_error_code;
694,356✔
2632
        if (s.is_canceled())
694,356!
2633
            ec = util::error::operation_aborted;
6,545✔
2634
        std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
694,356✔
2635
        // Note: do_recycle_and_execute() commits suicide.
334,455✔
2636
        s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
694,356✔
2637
                                             num_bytes_transferred); // Throws
694,356✔
2638
    }
694,356✔
2639

2640
private:
2641
    H m_handler;
2642
};
2643

2644
template <class H>
2645
inline void Service::post(H handler)
2646
{
770,125✔
2647
    do_post(&Service::post_oper_constr<H>, sizeof(PostOper<H>), &handler);
770,125✔
2648
}
770,125✔
2649

2650
inline void Service::OwnersOperDeleter::operator()(AsyncOper* op) const noexcept
2651
{
828,300✔
2652
    if (op->in_use()) {
828,300✔
2653
        op->orphan();
32,876✔
2654
    }
32,876✔
2655
    else {
795,424✔
2656
        void* addr = op;
795,424✔
2657
        op->~AsyncOper();
795,424✔
2658
        delete[] static_cast<char*>(addr);
795,424✔
2659
    }
795,424✔
2660
}
828,300✔
2661

2662
inline void Service::LendersOperDeleter::operator()(AsyncOper* op) const noexcept
2663
{
17,196✔
2664
    op->recycle(); // Suicide
17,196✔
2665
}
17,196✔
2666

2667
template <class Oper, class... Args>
2668
std::unique_ptr<Oper, Service::LendersOperDeleter> Service::alloc(OwnersOperPtr& owners_ptr, Args&&... args)
2669
{
2,349,701✔
2670
    void* addr = owners_ptr.get();
2,349,701✔
2671
    std::size_t size;
2,349,701✔
2672
    if (REALM_LIKELY(addr)) {
2,349,701!
2673
        REALM_ASSERT(!owners_ptr->in_use());
2,303,246!
2674
        size = owners_ptr->m_size;
2,303,246✔
2675
        // We can use static dispatch in the destructor call here, since an
1,166,321✔
2676
        // object that is not in use is always an instance of UnusedOper.
1,166,321✔
2677
        REALM_ASSERT(dynamic_cast<UnusedOper*>(owners_ptr.get()));
2,303,246!
2678
        static_cast<UnusedOper*>(owners_ptr.get())->UnusedOper::~UnusedOper();
2,303,246✔
2679
        if (REALM_UNLIKELY(size < sizeof(Oper))) {
2,303,246!
2680
            owners_ptr.release();
3,323✔
2681
            delete[] static_cast<char*>(addr);
3,323✔
2682
            goto no_object;
3,323✔
2683
        }
3,323✔
2684
    }
46,455✔
2685
    else {
46,455✔
2686
    no_object:
49,664✔
2687
        addr = new char[sizeof(Oper)]; // Throws
49,664✔
2688
        size = sizeof(Oper);
49,664✔
2689
        owners_ptr.reset(static_cast<AsyncOper*>(addr));
49,664✔
2690
    }
49,664✔
2691
    std::unique_ptr<Oper, LendersOperDeleter> lenders_ptr;
2,349,732✔
2692
    try {
2,349,587✔
2693
        lenders_ptr.reset(new (addr) Oper(size, std::forward<Args>(args)...)); // Throws
2,349,587✔
2694
    }
2,349,587✔
2695
    catch (...) {
1,189,838✔
2696
        new (addr) UnusedOper(size); // Does not throw
×
2697
        throw;
×
2698
    }
×
2699
    return lenders_ptr;
2,349,883✔
2700
}
2,349,883✔
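`Service::alloc()` above reuses a raw `char[]` block for successive operation objects, constructing each with placement new and replacing the block only when it is too small. The sketch below shows that storage-reuse idea in isolation, under simplifying assumptions: `Slot`, `SmallOp`, `LargeOp`, `construct_in_slot`, and `destroy_in_slot` are hypothetical names, and the sketch omits the placeholder `UnusedOper` object that the real code keeps in the block between uses.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>
#include <utility>

// Hypothetical owner of a reusable raw memory block.
struct Slot {
    std::unique_ptr<char[]> memory;
    std::size_t size = 0;
};

struct SmallOp {
    int value;
};

struct LargeOp {
    double values[8];
};

// Constructs a T inside the slot's memory, replacing the block only if it is
// too small. The caller must call destroy_in_slot() before reusing the slot.
template <class T, class... Args>
T* construct_in_slot(Slot& slot, Args&&... args)
{
    if (slot.size < sizeof(T)) {
        slot.memory.reset(new char[sizeof(T)]); // Throws
        slot.size = sizeof(T);
    }
    // Placement new: builds the object in existing storage, no allocation.
    return new (slot.memory.get()) T{std::forward<Args>(args)...};
}

// Ends the object's lifetime but keeps the memory for the next operation.
template <class T>
void destroy_in_slot(T* obj) noexcept
{
    obj->~T();
}
```

Constructing a second `SmallOp` after destroying the first lands at the same address, while requesting a `LargeOp` forces a one-time reallocation; that is the trade the allocator in the listing makes to avoid a heap allocation per asynchronous operation.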
2701

2702
template <class H>
2703
inline Service::PostOperBase* Service::post_oper_constr(void* addr, std::size_t size, Impl& service, void* cookie)
2704
{
770,122✔
2705
    H& handler = *static_cast<H*>(cookie);
770,122✔
2706
    return new (addr) PostOper<H>(size, service, std::move(handler)); // Throws
770,122✔
2707
}
770,122✔
2708

2709
inline bool Service::AsyncOper::in_use() const noexcept
2710
{
10,707,928✔
2711
    return m_in_use;
10,707,928✔
2712
}
10,707,928✔
2713

2714
inline bool Service::AsyncOper::is_complete() const noexcept
2715
{
17,134,892✔
2716
    return m_complete;
17,134,892✔
2717
}
17,134,892✔
2718

2719
inline void Service::AsyncOper::cancel() noexcept
2720
{
903,740✔
2721
    REALM_ASSERT(m_in_use);
903,740✔
2722
    REALM_ASSERT(!m_canceled);
903,740✔
2723
    m_canceled = true;
903,740✔
2724
}
903,740✔
2725

2726
inline Service::AsyncOper::AsyncOper(std::size_t size, bool is_in_use) noexcept
2727
    : m_size{size}
2728
    , m_in_use{is_in_use}
2729
{
10,199,442✔
2730
}
10,199,442✔
2731

2732
inline bool Service::AsyncOper::is_canceled() const noexcept
2733
{
10,923,604✔
2734
    return m_canceled;
10,923,604✔
2735
}
10,923,604✔
2736

2737
inline void Service::AsyncOper::set_is_complete(bool value) noexcept
2738
{
3,111,458✔
2739
    REALM_ASSERT(!m_complete);
3,111,458✔
2740
    REALM_ASSERT(!value || m_in_use);
3,111,458✔
2741
    m_complete = value;
3,111,458✔
2742
}
3,111,458✔
2743

2744
template <class H, class... Args>
2745
inline void Service::AsyncOper::do_recycle_and_execute(bool orphaned, H& handler, Args&&... args)
2746
{
2,332,898✔
2747
    // Recycle the operation object before the handler is executed, such that
1,181,824✔
2748
    // the memory is available for a new post operation that might be initiated
1,181,824✔
2749
    // during the execution of the handler.
1,181,824✔
2750
    bool was_recycled = false;
2,332,898✔
2751

1,181,824✔
2752
    // ScopeExit to ensure the AsyncOper object was reclaimed/deleted
1,181,824✔
2753
    auto at_exit = util::ScopeExit([this, &was_recycled, &orphaned]() noexcept {
2,333,205✔
2754
        if (!was_recycled) {
2,332,917!
2755
            do_recycle(orphaned);
×
2756
        }
×
2757
    });
2,332,917✔
2758

1,181,824✔
2759
    // We need to copy or move all arguments to be passed to the handler,
1,181,824✔
2760
    // such that there is no risk of references to the recycled operation
1,181,824✔
2761
    // object being passed to the handler (the passed arguments may be
1,181,824✔
2762
    // references to members of the recycled operation object). The easiest
1,181,824✔
2763
    // way to achieve this is by forwarding the reference arguments (passed
1,181,824✔
2764
    // to this function) to a helper function whose arguments have
1,181,824✔
2765
    // nonreference type (`Args...` rather than `Args&&...`).
1,181,824✔
2766
    //
1,181,824✔
2767
    // Note that the copying and moving of arguments may throw, and it is
1,181,824✔
2768
    // important that the operation is still recycled even if that
1,181,824✔
2769
    // happens. For that reason, copying and moving of arguments must not
1,181,824✔
2770
    // happen until we are in a scope (this scope) that catches and deals
1,181,824✔
2771
    // correctly with such exceptions.
1,181,824✔
2772
    do_recycle_and_execute_helper(orphaned, was_recycled, std::move(handler),
2,332,898✔
2773
                                  std::forward<Args>(args)...); // Throws
2,332,898✔
2774

1,181,824✔
2775
    // Removed catch to prevent truncating the stack trace on exception
1,181,824✔
2776
}
2,332,898✔
2777

2778
template <class H, class... Args>
2779
inline void Service::AsyncOper::do_recycle_and_execute_helper(bool orphaned, bool& was_recycled, H handler,
2780
                                                              Args... args)
2781
{
2,333,744✔
2782
    do_recycle(orphaned);
2,333,744✔
2783
    was_recycled = true;
2,333,744✔
2784
    handler(std::move(args)...); // Throws
2,333,744✔
2785
}
2,333,744✔
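The comments in `do_recycle_and_execute()` describe why handler arguments are routed through a helper that takes them by value: the arguments may refer to members of the operation object, which is destroyed before the handler runs. A minimal sketch of that technique follows. The `Op` class and its `finish` functions are hypothetical and use `delete this` in place of the real recycling step.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical heap-allocated operation that destroys itself before
// invoking its completion handler.
class Op {
public:
    explicit Op(std::string message)
        : m_message{std::move(message)}
    {
    }

    template <class H>
    void finish(H handler)
    {
        // m_message is passed as a reference here, but the helper's
        // parameter has nonreference type, so it is copied on entry.
        finish_helper(std::move(handler), m_message);
    }

private:
    template <class H>
    void finish_helper(H handler, std::string message)
    {
        delete this;                 // The object and m_message are gone now
        handler(std::move(message)); // Safe: `message` is an independent copy
    }

    std::string m_message;
};
```

Had `finish_helper` taken `const std::string&`, the handler would receive a reference to a destroyed member. Copying at the helper boundary is what makes it safe to release the object first.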
2786

2787
inline void Service::AsyncOper::do_recycle(bool orphaned) noexcept
2788
{
4,004,244✔
2789
    REALM_ASSERT(in_use());
4,004,244✔
2790
    void* addr = this;
4,004,244✔
2791
    std::size_t size = m_size;
4,004,244✔
2792
    this->~AsyncOper(); // Suicide
4,004,244✔
2793
    if (orphaned) {
4,004,244✔
2794
        delete[] static_cast<char*>(addr);
32,876✔
2795
    }
32,876✔
2796
    else {
3,971,368✔
2797
        new (addr) UnusedOper(size);
3,971,368✔
2798
    }
3,971,368✔
2799
}
4,004,244✔
2800

2801
// ---------------- Resolver ----------------
2802

2803
template <class H>
2804
class Resolver::ResolveOper : public Service::ResolveOperBase {
2805
public:
2806
    ResolveOper(std::size_t size, Resolver& r, Query q, H&& handler)
2807
        : ResolveOperBase{size, r, std::move(q)}
2808
        , m_handler{std::move(handler)}
2809
    {
3,292✔
2810
    }
3,292✔
2811
    void recycle_and_execute() override final
2812
    {
3,284✔
2813
        REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
3,284!
2814
        REALM_ASSERT(is_canceled() || m_error_code || !m_endpoints.empty());
3,284!
2815
        bool orphaned = !m_resolver;
3,284✔
2816
        std::error_code ec = m_error_code;
3,284✔
2817
        if (is_canceled())
3,284✔
2818
            ec = util::error::operation_aborted;
2✔
2819
        // Note: do_recycle_and_execute() commits suicide.
1,690✔
2820
        do_recycle_and_execute<H>(orphaned, m_handler, ec, std::move(m_endpoints)); // Throws
3,284✔
2821
    }
3,284✔
2822

2823
private:
2824
    H m_handler;
2825
};
2826

2827
inline Resolver::Resolver(Service& service)
2828
    : m_service_impl{*service.m_impl}
2829
{
11,276✔
2830
}
11,276✔
2831

2832
inline Resolver::~Resolver() noexcept
2833
{
11,274✔
2834
    cancel();
11,274✔
2835
}
11,274✔
2836

2837
inline Endpoint::List Resolver::resolve(const Query& q)
2838
{
7,980✔
2839
    std::error_code ec;
7,980✔
2840
    Endpoint::List list = resolve(q, ec);
7,980✔
2841
    if (REALM_UNLIKELY(ec))
7,980✔
2842
        throw std::system_error(ec);
3,938✔
2843
    return list;
7,980✔
2844
}
7,980✔
2845

2846
template <class H>
2847
void Resolver::async_resolve(Query query, H&& handler)
2848
{
3,292✔
2849
    Service::LendersResolveOperPtr op = Service::alloc<ResolveOper<H>>(m_resolve_oper, *this, std::move(query),
3,292✔
2850
                                                                       std::move(handler)); // Throws
3,292✔
2851
    initiate_oper(std::move(op));                                                           // Throws
3,292✔
2852
}
3,292✔
2853

2854
inline Resolver::Query::Query(std::string service_port, int init_flags)
2855
    : m_flags{init_flags}
2856
    , m_service{service_port}
2857
{
2858
}
2859

2860
inline Resolver::Query::Query(const StreamProtocol& prot, std::string service_port, int init_flags)
2861
    : m_flags{init_flags}
2862
    , m_protocol{prot}
2863
    , m_service{service_port}
2864
{
2865
}
2866

2867
inline Resolver::Query::Query(std::string host_name, std::string service_port, int init_flags)
2868
    : m_flags{init_flags}
2869
    , m_host{host_name}
2870
    , m_service{service_port}
2871
{
11,276✔
2872
}
11,276✔
2873

2874
inline Resolver::Query::Query(const StreamProtocol& prot, std::string host_name, std::string service_port,
2875
                              int init_flags)
2876
    : m_flags{init_flags}
2877
    , m_protocol{prot}
2878
    , m_host{host_name}
2879
    , m_service{service_port}
2880
{
2881
}
2882

2883
inline Resolver::Query::~Query() noexcept {}
24,454✔
2884

2885
inline int Resolver::Query::flags() const
2886
{
×
2887
    return m_flags;
×
2888
}
×
2889

2890
inline StreamProtocol Resolver::Query::protocol() const
2891
{
×
2892
    return m_protocol;
×
2893
}
×
2894

2895
inline std::string Resolver::Query::host() const
2896
{
×
2897
    return m_host;
×
2898
}
×
2899

2900
inline std::string Resolver::Query::service() const
2901
{
×
2902
    return m_service;
×
2903
}
×
2904

2905
// ---------------- SocketBase ----------------
2906

2907
inline SocketBase::SocketBase(Service& service)
2908
    : m_desc{*service.m_impl}
2909
{
21,938✔
2910
}
21,938✔
2911

2912
inline SocketBase::~SocketBase() noexcept
2913
{
21,936✔
2914
    close();
21,936✔
2915
}
21,936✔
2916

2917
inline bool SocketBase::is_open() const noexcept
2918
{
60,860✔
2919
    return m_desc.is_open();
60,860✔
2920
}
60,860✔
2921

2922
inline auto SocketBase::native_handle() const noexcept -> native_handle_type
2923
{
×
2924
    return m_desc.native_handle();
×
2925
}
×
2926

2927
inline void SocketBase::open(const StreamProtocol& prot)
2928
{
131✔
2929
    std::error_code ec;
131✔
2930
    if (open(prot, ec))
131✔
2931
        throw std::system_error(ec);
×
2932
}
131✔
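Several functions in this part of the listing (`Resolver::resolve`, `SocketBase::open`, `get_option`, `set_option`, `bind`) pair a non-throwing overload that reports through `std::error_code` with a thin throwing overload that converts a failure into `std::system_error`. The sketch below illustrates that pairing on a hypothetical `Door` class; the class and the error it reports are chosen only for illustration.

```cpp
#include <cassert>
#include <system_error>

// Hypothetical resource with a non-throwing and a throwing open().
class Door {
public:
    // Non-throwing overload: reports the outcome through `ec` and returns it.
    std::error_code open(std::error_code& ec) noexcept
    {
        if (m_open) {
            ec = std::make_error_code(std::errc::already_connected);
        }
        else {
            m_open = true;
            ec = std::error_code(); // Success
        }
        return ec;
    }

    // Throwing overload: delegates, then converts a failure into an exception.
    void open()
    {
        std::error_code ec;
        if (open(ec))
            throw std::system_error(ec);
    }

    bool is_open() const noexcept
    {
        return m_open;
    }

private:
    bool m_open = false;
};
```

Keeping all the logic in the `error_code` overload means the two entry points cannot drift apart, and callers can choose between exceptions and explicit error checks.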
2933

2934
inline void SocketBase::close() noexcept
2935
{
22,008✔
2936
    if (!is_open())
22,008✔
2937
        return;
8,058✔
2938
    cancel();
13,950✔
2939
    m_desc.close();
13,950✔
2940
}
13,950✔
2941

2942
template <class O>
2943
inline void SocketBase::get_option(O& opt) const
2944
{
4✔
2945
    std::error_code ec;
4✔
2946
    if (get_option(opt, ec))
4✔
2947
        throw std::system_error(ec);
×
2948
}
4✔
2949

2950
template <class O>
2951
inline std::error_code SocketBase::get_option(O& opt, std::error_code& ec) const
2952
{
4✔
2953
    opt.get(*this, ec);
4✔
2954
    return ec;
4✔
2955
}
4✔
2956

2957
template <class O>
2958
inline void SocketBase::set_option(const O& opt)
2959
{
1,876✔
2960
    std::error_code ec;
1,876✔
2961
    if (set_option(opt, ec))
1,876✔
2962
        throw std::system_error(ec);
×
2963
}
1,876✔
2964

2965
template <class O>
2966
inline std::error_code SocketBase::set_option(const O& opt, std::error_code& ec)
2967
{
9,846✔
2968
    opt.set(*this, ec);
9,846✔
2969
    return ec;
9,846✔
2970
}
9,846✔
2971

2972
inline void SocketBase::bind(const Endpoint& ep)
2973
{
123✔
2974
    std::error_code ec;
123✔
2975
    if (bind(ep, ec))
123✔
2976
        throw std::system_error(ec);
×
2977
}
123✔
2978

2979
inline Endpoint SocketBase::local_endpoint() const
2980
{
19,476✔
2981
    std::error_code ec;
19,476✔
2982
    Endpoint ep = local_endpoint(ec);
19,476✔
2983
    if (ec)
19,476✔
2984
        throw std::system_error(ec);
×
2985
    return ep;
19,476✔
2986
}
19,476✔
2987

2988
inline auto SocketBase::release_native_handle() noexcept -> native_handle_type
2989
{
×
2990
    if (is_open()) {
×
2991
        cancel();
×
2992
        return m_desc.release();
×
2993
    }
×
2994
    return m_desc.native_handle();
×
2995
}
×
2996

2997
inline const StreamProtocol& SocketBase::get_protocol() const noexcept
2998
{
×
2999
    return m_protocol;
×
3000
}
×
3001

3002
template <class T, int opt, class U>
3003
inline SocketBase::Option<T, opt, U>::Option(T init_value)
3004
    : m_value{init_value}
3005
{
9,850✔
3006
}
9,850✔
3007

3008
template <class T, int opt, class U>
3009
inline T SocketBase::Option<T, opt, U>::value() const
3010
{
4✔
3011
    return m_value;
4✔
3012
}
4✔
3013

3014
template <class T, int opt, class U>
3015
inline void SocketBase::Option<T, opt, U>::get(const SocketBase& sock, std::error_code& ec)
3016
{
4✔
3017
    union {
4✔
3018
        U value;
4✔
3019
        char strut[sizeof(U) + 1];
4✔
3020
    };
4✔
3021
    std::size_t value_size = sizeof strut;
4✔
3022
    sock.get_option(opt_enum(opt), &value, value_size, ec);
4✔
3023
    if (!ec) {
4✔
3024
        REALM_ASSERT(value_size == sizeof value);
4✔
3025
        m_value = T(value);
4✔
3026
    }
4✔
3027
}
4✔
3028

3029
template <class T, int opt, class U>
3030
inline void SocketBase::Option<T, opt, U>::set(SocketBase& sock, std::error_code& ec) const
3031
{
9,846✔
3032
    U value_to_set = U(m_value);
9,846✔
3033
    sock.set_option(opt_enum(opt), &value_to_set, sizeof value_to_set, ec);
9,846✔
3034
}
9,846✔
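
Note on the pattern above: many SocketBase members (open, bind, get_option, set_option, local_endpoint) come in pairs, a form that reports failure through a `std::error_code&` and a thin wrapper that throws `std::system_error`. The following is a minimal sketch of that pairing only. `Device` and its `should_fail` parameter are hypothetical stand-ins invented for illustration and are not part of realm-core.

```cpp
#include <system_error>

// Hypothetical stand-in for a socket-like class (assumption, not realm-core code).
class Device {
public:
    // Non-throwing form: stores the outcome in `ec` and also returns it,
    // so callers can write `if (dev.open(..., ec)) { /* failed */ }`.
    std::error_code open(bool should_fail, std::error_code& ec) noexcept
    {
        ec = should_fail ? std::make_error_code(std::errc::address_in_use) : std::error_code{};
        return ec;
    }

    // Throwing form: delegates to the overload above and converts a
    // non-zero error code into a std::system_error exception.
    void open(bool should_fail)
    {
        std::error_code ec;
        if (open(should_fail, ec))
            throw std::system_error(ec);
    }
};
```

A caller that prefers exceptions uses the one-argument form; a caller on a hot path, or in a `noexcept` context, can use the `error_code` form and branch on the result.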
3035

3036
// ---------------- Socket ----------------
3037

3038
class Socket::ConnectOperBase : public Service::IoOper {
3039
public:
3040
    ConnectOperBase(std::size_t size, Socket& sock) noexcept
3041
        : IoOper{size}
3042
        , m_socket{&sock}
3043
    {
3,486✔
3044
    }
3,486✔
3045
    Want initiate(const Endpoint& ep)
3046
    {
3,486✔
3047
        REALM_ASSERT(this == m_socket->m_write_oper.get());
3,486✔
3048
        if (m_socket->initiate_async_connect(ep, m_error_code)) { // Throws
3,486✔
3049
            set_is_complete(true);                                // Failure, or immediate completion
2✔
3050
            return Want::nothing;
2✔
3051
        }
2✔
3052
        return Want::write;
3,484✔
3053
    }
3,484✔
3054
    Want advance() noexcept override final
3055
    {
3,474✔
3056
        REALM_ASSERT(!is_complete());
3,474✔
3057
        REALM_ASSERT(!is_canceled());
3,474✔
3058
        REALM_ASSERT(!m_error_code);
3,474✔
3059
        m_socket->finalize_async_connect(m_error_code);
3,474✔
3060
        set_is_complete(true);
3,474✔
3061
        return Want::nothing;
3,474✔
3062
    }
3,474✔
3063
    void recycle() noexcept override final
3064
    {
×
3065
        bool orphaned = !m_socket;
×
3066
        REALM_ASSERT(orphaned);
×
3067
        // Note: do_recycle() commits suicide.
3068
        do_recycle(orphaned);
×
3069
    }
×
3070
    void orphan() noexcept override final
3071
    {
6✔
3072
        m_socket = nullptr;
6✔
3073
    }
6✔
3074
    Service::Descriptor& descriptor() noexcept override final
3075
    {
×
3076
        return m_socket->m_desc;
×
3077
    }
×
3078

3079
protected:
3080
    Socket* m_socket;
3081
    std::error_code m_error_code;
3082
};
3083

3084
template <class H>
3085
class Socket::ConnectOper : public ConnectOperBase {
3086
public:
3087
    ConnectOper(std::size_t size, Socket& sock, H&& handler)
3088
        : ConnectOperBase{size, sock}
3089
        , m_handler{std::move(handler)}
3090
    {
3,381✔
3091
    }
3,381✔
3092
    void recycle_and_execute() override final
3093
    {
3,381✔
3094
        REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
3,381!
3095
        bool orphaned = !m_socket;
3,381✔
3096
        std::error_code ec = m_error_code;
3,381✔
3097
        if (is_canceled())
3,381✔
3098
            ec = util::error::operation_aborted;
9✔
3099
        // Note: do_recycle_and_execute() commits suicide.
1,738✔
3100
        do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
3,381✔
3101
    }
3,381✔
3102

3103
private:
3104
    H m_handler;
3105
};
3106

3107
inline Socket::Socket(Service& service)
3108
    : SocketBase{service}
3109
{
13,708✔
3110
}
13,708✔
3111

3112
inline Socket::Socket(Service& service, const StreamProtocol& prot, native_handle_type native_socket)
3113
    : SocketBase{service}
3114
{
2✔
3115
    assign(prot, native_socket); // Throws
2✔
3116
}
2✔
3117

3118
inline Socket::~Socket() noexcept {}
13,710✔
3119

3120
inline void Socket::connect(const Endpoint& ep)
3121
{
20✔
3122
    std::error_code ec;
20✔
3123
    if (connect(ep, ec)) // Throws
20✔
3124
        throw std::system_error(ec);
×
3125
}
20✔
3126

3127
inline std::size_t Socket::read(char* buffer, std::size_t size)
3128
{
×
3129
    std::error_code ec;
×
3130
    read(buffer, size, ec); // Throws
×
3131
    if (ec)
×
3132
        throw std::system_error(ec);
×
3133
    return size;
×
3134
}
×
3135

3136
inline std::size_t Socket::read(char* buffer, std::size_t size, std::error_code& ec)
3137
{
×
3138
    return StreamOps::read(*this, buffer, size, ec); // Throws
×
3139
}
×
3140

3141
inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab)
3142
{
8✔
3143
    std::error_code ec;
8✔
3144
    read(buffer, size, rab, ec); // Throws
8✔
3145
    if (ec)
8✔
3146
        throw std::system_error(ec);
×
3147
    return size;
8✔
3148
}
8✔
3149

3150
inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab, std::error_code& ec)
3151
{
16,402✔
3152
    int delim = std::char_traits<char>::eof();
16,402✔
3153
    return StreamOps::buffered_read(*this, buffer, size, delim, rab, ec); // Throws
16,402✔
3154
}
16,402✔
3155

3156
inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab)
3157
{
8✔
3158
    std::error_code ec;
8✔
3159
    std::size_t n = read_until(buffer, size, delim, rab, ec); // Throws
8✔
3160
    if (ec)
8✔
3161
        throw std::system_error(ec);
×
3162
    return n;
8✔
3163
}
8✔
3164

3165
inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab,
3166
                                      std::error_code& ec)
3167
{
8✔
3168
    int delim_2 = std::char_traits<char>::to_int_type(delim);
8✔
3169
    return StreamOps::buffered_read(*this, buffer, size, delim_2, rab, ec); // Throws
8✔
3170
}
8✔
3171

3172
inline std::size_t Socket::write(const char* data, std::size_t size)
3173
{
261,804✔
3174
    std::error_code ec;
261,804✔
3175
    write(data, size, ec); // Throws
261,804✔
3176
    if (ec)
261,804✔
3177
        throw std::system_error(ec);
×
3178
    return size;
261,804✔
3179
}
261,804✔
3180

3181
inline std::size_t Socket::write(const char* data, std::size_t size, std::error_code& ec)
3182
{
261,804✔
3183
    return StreamOps::write(*this, data, size, ec); // Throws
261,804✔
3184
}
261,804✔
3185

3186
inline std::size_t Socket::read_some(char* buffer, std::size_t size)
3187
{
4✔
3188
    std::error_code ec;
4✔
3189
    std::size_t n = read_some(buffer, size, ec); // Throws
4✔
3190
    if (ec)
4✔
3191
        throw std::system_error(ec);
4✔
3192
    return n;
×
3193
}
×
3194

3195
inline std::size_t Socket::read_some(char* buffer, std::size_t size, std::error_code& ec)
3196
{
8✔
3197
    return StreamOps::read_some(*this, buffer, size, ec); // Throws
8✔
3198
}
8✔
3199

3200
inline std::size_t Socket::write_some(const char* data, std::size_t size)
3201
{
×
3202
    std::error_code ec;
×
3203
    std::size_t n = write_some(data, size, ec); // Throws
×
3204
    if (ec)
×
3205
        throw std::system_error(ec);
×
3206
    return n;
×
3207
}
×
3208

3209
inline std::size_t Socket::write_some(const char* data, std::size_t size, std::error_code& ec)
3210
{
112✔
3211
    return StreamOps::write_some(*this, data, size, ec); // Throws
112✔
3212
}
112✔
3213

3214
template <class H>
3215
inline void Socket::async_connect(const Endpoint& ep, H&& handler)
3216
{
3,381✔
3217
    LendersConnectOperPtr op = Service::alloc<ConnectOper<H>>(m_write_oper, *this, std::move(handler)); // Throws
3,381✔
3218
    m_desc.initiate_oper(std::move(op), ep);                                                            // Throws
3,381✔
3219
}
3,381✔
3220

3221
template <class H>
3222
inline void Socket::async_read(char* buffer, std::size_t size, H&& handler)
3223
{
×
3224
    bool is_read_some = false;
×
3225
    StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws
×
3226
}
×
3227

3228
template <class H>
3229
inline void Socket::async_read(char* buffer, std::size_t size, ReadAheadBuffer& rab, H&& handler)
3230
{
650,138✔
3231
    int delim = std::char_traits<char>::eof();
650,138✔
3232
    StreamOps::async_buffered_read(*this, buffer, size, delim, rab, std::move(handler)); // Throws
650,138✔
3233
}
650,138✔
3234

3235
template <class H>
3236
inline void Socket::async_read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab, H&& handler)
3237
{
44,808✔
3238
    int delim_2 = std::char_traits<char>::to_int_type(delim);
44,808✔
3239
    StreamOps::async_buffered_read(*this, buffer, size, delim_2, rab, std::move(handler)); // Throws
44,808✔
3240
}
44,808✔
3241

3242
template <class H>
3243
inline void Socket::async_write(const char* data, std::size_t size, H&& handler)
3244
{
261,291✔
3245
    bool is_write_some = false;
261,291✔
3246
    StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws
261,291✔
3247
}
261,291✔
3248

3249
template <class H>
3250
inline void Socket::async_read_some(char* buffer, std::size_t size, H&& handler)
3251
{
350,954✔
3252
    bool is_read_some = true;
350,954✔
3253
    StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws
350,954✔
3254
}
350,954✔
3255

3256
template <class H>
3257
inline void Socket::async_write_some(const char* data, std::size_t size, H&& handler)
3258
{
347,680✔
3259
    bool is_write_some = true;
347,680✔
3260
    StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws
347,680✔
3261
}
347,680✔
3262

3263
inline void Socket::shutdown(shutdown_type what)
3264
{
18✔
3265
    std::error_code ec;
18✔
3266
    if (shutdown(what, ec)) // Throws
18✔
3267
        throw std::system_error(ec);
×
3268
}
18✔
3269

3270
inline void Socket::assign(const StreamProtocol& prot, native_handle_type native_socket)
3271
{
2✔
3272
    std::error_code ec;
2✔
3273
    if (assign(prot, native_socket, ec)) // Throws
2✔
3274
        throw std::system_error(ec);
×
3275
}
2✔
3276

3277
inline std::error_code Socket::assign(const StreamProtocol& prot, native_handle_type native_socket,
3278
                                      std::error_code& ec)
3279
{
2✔
3280
    return do_assign(prot, native_socket, ec); // Throws
2✔
3281
}
2✔
3282

3283
inline Socket& Socket::lowest_layer() noexcept
3284
{
11,496,692✔
3285
    return *this;
11,496,692✔
3286
}
11,496,692✔
3287

3288
inline void Socket::do_init_read_async(std::error_code&, Want& want) noexcept
3289
{
1,234,438✔
3290
    want = Want::read; // Wait for read readiness before proceeding
1,234,438✔
3291
}
1,234,438✔
3292

3293
inline void Socket::do_init_write_async(std::error_code&, Want& want) noexcept
3294
{
998,016✔
3295
    want = Want::write; // Wait for write readiness before proceeding
998,016✔
3296
}
998,016✔
3297

3298
inline std::size_t Socket::do_read_some_sync(char* buffer, std::size_t size, std::error_code& ec) noexcept
3299
{
131,107✔
3300
    return m_desc.read_some(buffer, size, ec);
131,107✔
3301
}
131,107✔
3302

3303
inline std::size_t Socket::do_write_some_sync(const char* data, std::size_t size, std::error_code& ec) noexcept
3304
{
261,916✔
3305
    return m_desc.write_some(data, size, ec);
261,916✔
3306
}
261,916✔
3307

3308
inline std::size_t Socket::do_read_some_async(char* buffer, std::size_t size, std::error_code& ec,
3309
                                              Want& want) noexcept
3310
{
1,146,992✔
3311
    std::error_code ec_2;
1,146,992✔
3312
    std::size_t n = m_desc.read_some(buffer, size, ec_2);
1,146,992✔
3313
    bool success = (!ec_2 || ec_2 == util::error::resource_unavailable_try_again);
1,146,992✔
3314
    if (REALM_UNLIKELY(!success)) {
1,146,992✔
3315
        ec = ec_2;
1,196✔
3316
        want = Want::nothing; // Failure
1,196✔
3317
        return 0;
1,196✔
3318
    }
1,196✔
3319
    ec = std::error_code();
1,145,796✔
3320
    want = Want::read; // Success
1,145,796✔
3321
    return n;
1,145,796✔
3322
}
1,145,796✔
3323

3324
inline std::size_t Socket::do_write_some_async(const char* data, std::size_t size, std::error_code& ec,
3325
                                               Want& want) noexcept
3326
{
572,120✔
3327
    std::error_code ec_2;
572,120✔
3328
    std::size_t n = m_desc.write_some(data, size, ec_2);
572,120✔
3329
    bool success = (!ec_2 || ec_2 == util::error::resource_unavailable_try_again);
572,120✔
3330
    if (REALM_UNLIKELY(!success)) {
572,120✔
3331
        ec = ec_2;
152✔
3332
        want = Want::nothing; // Failure
152✔
3333
        return 0;
152✔
3334
    }
152✔
3335
    ec = std::error_code();
571,968✔
3336
    want = Want::write; // Success
571,968✔
3337
    return n;
571,968✔
3338
}
571,968✔
3339

3340
// ---------------- Acceptor ----------------
3341

3342
class Acceptor::AcceptOperBase : public Service::IoOper {
3343
public:
3344
    AcceptOperBase(std::size_t size, Acceptor& a, Socket& s, Endpoint* e)
3345
        : IoOper{size}
3346
        , m_acceptor{&a}
3347
        , m_socket{s}
3348
        , m_endpoint{e}
3349
    {
10,118✔
3350
    }
10,118✔
3351
    Want initiate()
3352
    {
10,118✔
3353
        REALM_ASSERT(this == m_acceptor->m_read_oper.get());
10,118✔
3354
        REALM_ASSERT(!is_complete());
10,118✔
3355
        m_acceptor->m_desc.ensure_nonblocking_mode(); // Throws
10,118✔
3356
        return Want::read;
10,118✔
3357
    }
10,118✔
3358
    Want advance() noexcept override final
3359
    {
3,014✔
3360
        REALM_ASSERT(!is_complete());
3,014✔
3361
        REALM_ASSERT(!is_canceled());
3,014✔
3362
        REALM_ASSERT(!m_error_code);
3,014✔
3363
        REALM_ASSERT(!m_socket.is_open());
3,014✔
3364
        Want want = m_acceptor->do_accept_async(m_socket, m_endpoint, m_error_code);
3,014✔
3365
        if (want == Want::nothing)
3,014✔
3366
            set_is_complete(true); // Success or failure
2,128✔
3367
        return want;
3,014✔
3368
    }
3,014✔
3369
    void recycle() noexcept override final
3370
    {
7,976✔
3371
        bool orphaned = !m_acceptor;
7,976✔
3372
        REALM_ASSERT(orphaned);
7,976✔
3373
        // Note: do_recycle() commits suicide.
3,936✔
3374
        do_recycle(orphaned);
7,976✔
3375
    }
7,976✔
3376
    void orphan() noexcept override final
3377
    {
7,978✔
3378
        m_acceptor = nullptr;
7,978✔
3379
    }
7,978✔
3380
    Service::Descriptor& descriptor() noexcept override final
3381
    {
886✔
3382
        return m_acceptor->m_desc;
886✔
3383
    }
886✔
3384

3385
protected:
3386
    Acceptor* m_acceptor;
3387
    Socket& m_socket;           // May be dangling after cancellation
3388
    Endpoint* const m_endpoint; // May be dangling after cancellation
3389
    std::error_code m_error_code;
3390
};
3391

3392
template <class H>
3393
class Acceptor::AcceptOper : public AcceptOperBase {
3394
public:
3395
    AcceptOper(std::size_t size, Acceptor& a, Socket& s, Endpoint* e, H&& handler)
3396
        : AcceptOperBase{size, a, s, e}
3397
        , m_handler{std::move(handler)}
3398
    {
10,005✔
3399
    }
10,005✔
3400
    void recycle_and_execute() override final
3401
    {
2,033✔
3402
        REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
2,033!
3403
        REALM_ASSERT(is_canceled() || m_error_code || m_socket.is_open());
2,033!
3404
        bool orphaned = !m_acceptor;
2,033✔
3405
        std::error_code ec = m_error_code;
2,033✔
3406
        if (is_canceled())
2,033!
3407
            ec = util::error::operation_aborted;
7✔
3408
        // Note: do_recycle_and_execute() commits suicide.
1,076✔
3409
        do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
2,033✔
3410
    }
2,033✔
3411

3412
private:
3413
    H m_handler;
3414
};
3415

3416
inline Acceptor::Acceptor(Service& service)
3417
    : SocketBase{service}
3418
{
8,226✔
3419
}
8,226✔
3420

3421
inline Acceptor::~Acceptor() noexcept {}
8,226✔
3422

3423
inline void Acceptor::listen(int backlog)
3424
{
8,218✔
3425
    std::error_code ec;
8,218✔
3426
    if (listen(backlog, ec)) // Throws
8,218✔
3427
        throw std::system_error(ec);
×
3428
}
8,218✔
3429

3430
inline void Acceptor::accept(Socket& sock)
3431
{
20✔
3432
    std::error_code ec;
20✔
3433
    if (accept(sock, ec)) // Throws
20✔
3434
        throw std::system_error(ec);
×
3435
}
20✔
3436

3437
inline void Acceptor::accept(Socket& sock, Endpoint& ep)
3438
{
2✔
3439
    std::error_code ec;
2✔
3440
    if (accept(sock, ep, ec)) // Throws
2✔
3441
        throw std::system_error(ec);
×
3442
}
2✔
3443

3444
inline std::error_code Acceptor::accept(Socket& sock, std::error_code& ec)
3445
{
20✔
3446
    Endpoint* ep = nullptr;
20✔
3447
    return accept(sock, ep, ec); // Throws
20✔
3448
}
20✔
3449

3450
inline std::error_code Acceptor::accept(Socket& sock, Endpoint& ep, std::error_code& ec)
3451
{
2✔
3452
    return accept(sock, &ep, ec); // Throws
2✔
3453
}
2✔
3454

3455
template <class H>
3456
inline void Acceptor::async_accept(Socket& sock, H&& handler)
3457
{
111✔
3458
    Endpoint* ep = nullptr;
111✔
3459
    async_accept(sock, ep, std::move(handler)); // Throws
111✔
3460
}
111✔
3461

3462
template <class H>
3463
inline void Acceptor::async_accept(Socket& sock, Endpoint& ep, H&& handler)
3464
{
9,894✔
3465
    async_accept(sock, &ep, std::move(handler)); // Throws
9,894✔
3466
}
9,894✔
3467

3468
inline std::error_code Acceptor::accept(Socket& socket, Endpoint* ep, std::error_code& ec)
3469
{
22✔
3470
    REALM_ASSERT(!m_read_oper || !m_read_oper->in_use());
22✔
3471
    if (REALM_UNLIKELY(socket.is_open()))
22✔
3472
        throw util::runtime_error("Socket is already open");
11✔
3473
    m_desc.ensure_blocking_mode(); // Throws
22✔
3474
    m_desc.accept(socket.m_desc, m_protocol, ep, ec);
22✔
3475
    return ec;
22✔
3476
}
22✔
3477

3478
inline Acceptor::Want Acceptor::do_accept_async(Socket& socket, Endpoint* ep, std::error_code& ec) noexcept
3479
{
3,014✔
3480
    std::error_code ec_2;
3,014✔
3481
    m_desc.accept(socket.m_desc, m_protocol, ep, ec_2);
3,014✔
3482
    if (ec_2 == util::error::resource_unavailable_try_again)
3,014✔
3483
        return Want::read;
886✔
3484
    ec = ec_2;
2,128✔
3485
    return Want::nothing;
2,128✔
3486
}
2,128✔
3487

3488
template <class H>
3489
inline void Acceptor::async_accept(Socket& sock, Endpoint* ep, H&& handler)
3490
{
10,005✔
3491
    if (REALM_UNLIKELY(sock.is_open()))
10,005✔
3492
        throw util::runtime_error("Socket is already open");
5,010✔
3493
    LendersAcceptOperPtr op = Service::alloc<AcceptOper<H>>(m_read_oper, *this, sock, ep,
10,005✔
3494
                                                            std::move(handler)); // Throws
10,005✔
3495
    m_desc.initiate_oper(std::move(op));                                         // Throws
10,005✔
3496
}
10,005✔
3497

3498
// ---------------- DeadlineTimer ----------------
3499

3500
template <class H>
3501
class DeadlineTimer::WaitOper : public Service::WaitOperBase {
3502
public:
3503
    WaitOper(std::size_t size, DeadlineTimer& timer, clock::time_point expiration_time, H&& handler)
3504
        : Service::WaitOperBase{size, timer, expiration_time}
3505
        , m_handler{std::move(handler)}
3506
    {
287,028✔
3507
    }
287,028✔
3508
    void recycle_and_execute() override final
3509
    {
278,998✔
3510
        bool orphaned = !m_timer;
278,998✔
3511
        Status status = Status::OK();
278,998✔
3512
        if (is_canceled())
278,998!
3513
            status = Status{ErrorCodes::OperationAborted, "Timer canceled"};
12,246✔
3514
        // Note: do_recycle_and_execute() commits suicide.
144,526✔
3515
        do_recycle_and_execute<H>(orphaned, m_handler, status); // Throws
278,998✔
3516
    }
278,998✔
3517

3518
private:
3519
    H m_handler;
3520
};
3521

3522
inline DeadlineTimer::DeadlineTimer(Service& service)
3523
    : m_service_impl{*service.m_impl}
3524
{
24,332✔
3525
}
24,332✔
3526

3527
inline DeadlineTimer::~DeadlineTimer() noexcept
3528
{
24,330✔
3529
    cancel();
24,330✔
3530
}
24,330✔
3531

3532
template <class R, class P, class H>
3533
inline void DeadlineTimer::async_wait(std::chrono::duration<R, P> delay, H&& handler)
3534
{
287,123✔
3535
    clock::time_point now = clock::now();
287,123✔
3536
    // FIXME: This method of detecting overflow does not work. Comparison
148,639✔
3537
    // between distinct duration types is not overflow safe. Overflow easily
148,639✔
3538
    // happens in the implied conversion of arguments to the common duration
148,639✔
3539
    // type (std::common_type<>).
148,639✔
3540
    auto max_add = clock::time_point::max() - now;
287,123✔
3541
    if (delay > max_add)
287,123✔
3542
        throw util::overflow_error("Expiration time overflow");
×
3543
    clock::time_point expiration_time = now + delay;
287,123✔
3544
    initiate_oper(Service::alloc<WaitOper<H>>(m_wait_oper, *this, expiration_time,
287,123✔
3545
                                              std::move(handler))); // Throws
287,123✔
3546
}
287,123✔
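
Note on the FIXME in `DeadlineTimer::async_wait` above: comparing `delay > max_add` first converts both operands to their common duration type, and that conversion can itself overflow (for example, a very large `std::chrono::hours` value converted to nanoseconds). One possible workaround, sketched below under stated assumptions, is to compare in a floating-point duration, where the conversion cannot overflow in this way. `delay_exceeds_clock_range` is a hypothetical helper name, not something realm-core provides, and the comparison is only approximate near the boundary because of floating-point rounding, so the sketch uses `>=` to err on the side of rejecting.

```cpp
#include <chrono>

// Hypothetical helper (assumption, not part of realm-core): returns true if
// `now + delay` might not be representable as a Clock::time_point.
template <class Clock, class R, class P>
bool delay_exceeds_clock_range(typename Clock::time_point now, std::chrono::duration<R, P> delay)
{
    // Seconds with a floating-point representation. Converting an integer
    // duration to this type is implicit and does not hit integer overflow.
    using fp_seconds = std::chrono::duration<long double>;

    fp_seconds max_add = Clock::time_point::max() - now;
    fp_seconds requested = delay;

    // Approximate near the limit due to rounding; `>=` keeps it conservative.
    return requested >= max_add;
}
```

With `std::chrono::steady_clock` and a default-constructed time point, a delay of a few milliseconds is accepted while `std::chrono::hours::max()` is reported as out of range, without ever forming the overflowing nanosecond value.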
3547

3548
// ---------------- ReadAheadBuffer ----------------
3549

3550
inline ReadAheadBuffer::ReadAheadBuffer()
3551
    : m_buffer{new char[s_size]} // Throws
3552
{
13,360✔
3553
}
13,360✔
3554

3555
inline void ReadAheadBuffer::clear() noexcept
3556
{
×
3557
    m_begin = nullptr;
×
3558
    m_end = nullptr;
×
3559
}
×
3560

3561
inline bool ReadAheadBuffer::empty() const noexcept
3562
{
875,906✔
3563
    return (m_begin == m_end);
875,906✔
3564
}
875,906✔
3565

3566
template <class S>
3567
inline void ReadAheadBuffer::refill_sync(S& stream, std::error_code& ec) noexcept
3568
{
131,099✔
3569
    char* buffer = m_buffer.get();
131,099✔
3570
    std::size_t size = s_size;
131,099✔
3571
    static_assert(noexcept(stream.do_read_some_sync(buffer, size, ec)), "");
131,099✔
3572
    std::size_t n = stream.do_read_some_sync(buffer, size, ec);
131,099✔
3573
    if (REALM_UNLIKELY(n == 0))
131,099✔
3574
        return;
65,556✔
3575
    REALM_ASSERT(!ec);
131,091✔
3576
    REALM_ASSERT(n <= size);
131,091✔
3577
    m_begin = m_buffer.get();
131,091✔
3578
    m_end = m_begin + n;
131,091✔
3579
}
131,091✔
3580

3581
template <class S>
3582
inline bool ReadAheadBuffer::refill_async(S& stream, std::error_code& ec, Want& want) noexcept
3583
{
875,832✔
3584
    char* buffer = m_buffer.get();
875,832✔
3585
    std::size_t size = s_size;
875,832✔
3586
    static_assert(noexcept(stream.do_read_some_async(buffer, size, ec, want)), "");
875,832✔
3587
    std::size_t n = stream.do_read_some_async(buffer, size, ec, want);
875,832✔
3588
    // Any errors reported by do_read_some_async() (other than end_of_input) should always return 0
399,208✔
3589
    if (n == 0)
875,832✔
3590
        return false;
60,834✔
3591
    REALM_ASSERT(!ec || ec == util::MiscExtErrors::end_of_input);
814,998✔
3592
    REALM_ASSERT(n <= size);
814,998✔
3593
    m_begin = m_buffer.get();
814,998✔
3594
    m_end = m_begin + n;
814,998✔
3595
    return true;
814,998✔
3596
}
814,998✔
3597

3598
} // namespace realm::sync::network