• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2536

01 Aug 2024 07:02PM UTC coverage: 91.088% (-0.03%) from 91.121%
2536

push

Evergreen

web-flow
Prepare for release 14.11.2 (#7939)

Co-authored-by: nicola-cab <1497069+nicola-cab@users.noreply.github.com>

102758 of 181570 branches covered (56.59%)

216794 of 238004 relevant lines covered (91.09%)

5893031.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.28
/src/realm/sync/network/network.hpp
1
#pragma once
2

3
#include <cstddef>
4
#include <memory>
5
#include <chrono>
6
#include <string>
7
#include <system_error>
8
#include <ostream>
9

10
#include <sys/types.h>
11

12
#ifdef _WIN32
13
#include <winsock2.h>
14
#include <ws2tcpip.h>
15
#include <stdio.h>
16
#include <Ws2def.h>
17
#pragma comment(lib, "Ws2_32.lib")
18
#else
19
#include <sys/socket.h>
20
#include <arpa/inet.h>
21
#include <netdb.h>
22
#endif
23

24
#include <realm/status.hpp>
25
#include <realm/util/features.h>
26
#include <realm/util/assert.hpp>
27
#include <realm/util/backtrace.hpp>
28
#include <realm/util/basic_system_errors.hpp>
29
#include <realm/util/bind_ptr.hpp>
30
#include <realm/util/buffer.hpp>
31
#include <realm/util/misc_ext_errors.hpp>
32
#include <realm/util/scope_exit.hpp>
33

34
// Linux epoll
35
#if defined(REALM_USE_EPOLL) && !REALM_ANDROID
36
#define REALM_NETWORK_USE_EPOLL 1
37
#else
38
#define REALM_NETWORK_USE_EPOLL 0
39
#endif
40

41
// FreeBSD Kqueue.
42
//
43
// Available on Mac OS X, FreeBSD, NetBSD, OpenBSD
44
#if (defined(__MACH__) && defined(__APPLE__)) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
45
#if !defined(REALM_HAVE_KQUEUE)
46
#if !defined(REALM_DISABLE_UTIL_NETWORK_KQUEUE)
47
#define REALM_HAVE_KQUEUE 1
48
#endif
49
#endif
50
#endif
51
#if !defined(REALM_HAVE_KQUEUE)
52
#define REALM_HAVE_KQUEUE 0
53
#endif
54

55

56
// FIXME: Unfinished business around `Address::m_ip_v6_scope_id`.
57

58
namespace realm::sync::network {
59

60
/// \brief TCP/IP networking API.
61
///
62
/// The design of this networking API is heavily inspired by the ASIO C++
63
/// library (http://think-async.com).
64
///
65
///
66
/// ### Thread safety
67
///
68
/// A *service context* is a set of objects consisting of an instance of
69
/// Service, and all the objects that are associated with that instance (\ref
70
/// Resolver, \ref Socket`, \ref Acceptor`, \ref DeadlineTimer, and
71
/// \ref ssl::Stream).
72
///
73
/// In general, it is unsafe for two threads to call functions on the same
74
/// object, or on different objects in the same service context. This also
75
/// applies to destructors. Notable exceptions are the fully thread-safe
76
/// functions, such as Service::post(), Service::stop(), and Service::reset().
77
///
78
/// On the other hand, it is always safe for two threads to call functions on
79
/// objects belonging to different service contexts.
80
///
81
/// One implication of these rules is that at most one thread must execute run()
82
/// at any given time, and if one thread is executing run(), then no other
83
/// thread is allowed to access objects in the same service context (with the
84
/// mentioned exceptions).
85
///
86
/// Unless otherwise specified, free-standing objects, such as \ref
87
/// StreamProtocol, \ref Address, \ref Endpoint, and \ref Endpoint::List are
88
/// fully thread-safe as long as they are not mutated. If one thread is mutating
89
/// such an object, no other thread may access it. Note that these free-standing
90
/// objects are not associcated with an instance of Service, and are therefore
91
/// not part of a service context.
92
///
93
///
94
/// ### Comparison with ASIO
95
///
96
/// There is a crucial difference between the two libraries in regards to the
97
/// guarantees that are provided about the cancelability of asynchronous
98
/// operations. The Realm networking library (this library) considers an
99
/// asynchronous operation to be complete precisely when the completion handler
100
/// starts to execute, and it guarantees that such an operation is cancelable up
101
/// until that point in time. In particular, if `cancel()` is called on a socket
102
/// or a deadline timer object before the completion handler starts to execute,
103
/// then that operation will be canceled, and will receive
104
/// `error::operation_aborted`. This guarantee is possible to provide (and free
105
/// of ambiguities) precisely because this library prohibits multiple threads
106
/// from executing the event loop concurrently, and because `cancel()` is
107
/// allowed to be called only from a completion handler (executed by the event
108
/// loop thread) or while no thread is executing the event loop. This guarantee
109
/// allows for safe destruction of sockets and deadline timers as long as the
110
/// completion handlers react appropriately to `error::operation_aborted`, in
111
/// particular, that they do not attempt to access the socket or deadline timer
112
/// object in such cases.
113
///
114
/// ASIO, on the other hand, allows for an asynchronous operation to complete
115
/// and become **uncancellable** before the completion handler starts to
116
/// execute. For this reason, it is possible with ASIO to get the completion
117
/// handler of an asynchronous wait operation to start executing and receive an
118
/// error code other than "operation aborted" at a point in time where
119
/// `cancel()` has already been called on the deadline timer object, or even at
120
/// a point in timer where the deadline timer has been destroyed. This seems
121
/// like an inevitable consequence of the fact that ASIO allows for multiple
122
/// threads to execute the event loop concurrently. This generally forces ASIO
123
/// applications to invent ways of extending the lifetime of deadline timer and
124
/// socket objects until the completion handler starts executing.
125
///
126
/// IMPORTANT: Even if ASIO is used in a way where at most one thread executes
127
/// the event loop, there is still no guarantee that an asynchronous operation
128
/// remains cancelable up until the point in time where the completion handler
129
/// starts to execute.
130

131
std::string host_name();
132

133

134
class StreamProtocol;
135
class Address;
136
class Endpoint;
137
class Service;
138
class Resolver;
139
class SocketBase;
140
class Socket;
141
class Acceptor;
142
class DeadlineTimer;
143
class ReadAheadBuffer;
144
namespace ssl {
145
class Stream;
146
} // namespace ssl
147

148

149
/// \brief An IP protocol descriptor.
150
class StreamProtocol {
151
public:
152
    static StreamProtocol ip_v4();
153
    static StreamProtocol ip_v6();
154

155
    bool is_ip_v4() const;
156
    bool is_ip_v6() const;
157

158
    int protocol() const;
159
    int family() const;
160

161
    StreamProtocol();
162
    ~StreamProtocol() noexcept {}
69,540✔
163

164
private:
165
    int m_family;
166
    int m_socktype;
167
    int m_protocol;
168

169
    friend class Service;
170
    friend class SocketBase;
171
};
172

173

174
/// \brief An IP address (IPv4 or IPv6).
175
class Address {
176
public:
177
    bool is_ip_v4() const;
178
    bool is_ip_v6() const;
179

180
    template <class C, class T>
181
    friend std::basic_ostream<C, T>& operator<<(std::basic_ostream<C, T>&, const Address&);
182

183
    Address();
184
    ~Address() noexcept {}
14,096✔
185

186
private:
187
    using ip_v4_type = in_addr;
188
    using ip_v6_type = in6_addr;
189
    union union_type {
190
        ip_v4_type m_ip_v4;
191
        ip_v6_type m_ip_v6;
192
    };
193
    union_type m_union;
194
    std::uint_least32_t m_ip_v6_scope_id = 0;
195
    bool m_is_ip_v6 = false;
196

197
    friend Address make_address(const char*, std::error_code&) noexcept;
198
    friend class Endpoint;
199
};
200

201
Address make_address(const char* c_str);
202
Address make_address(const char* c_str, std::error_code& ec) noexcept;
203
Address make_address(const std::string&);
204
Address make_address(const std::string&, std::error_code& ec) noexcept;
205

206

207
/// \brief An IP endpoint.
208
///
209
/// An IP endpoint is a triplet (`protocol`, `address`, `port`).
210
class Endpoint {
211
public:
212
    using port_type = std::uint_fast16_t;
213
    class List;
214

215
    StreamProtocol protocol() const;
216
    Address address() const;
217
    port_type port() const;
218

219
    Endpoint();
220
    Endpoint(const StreamProtocol&, port_type);
221
    Endpoint(const Address&, port_type);
222
    ~Endpoint() noexcept {}
17,554✔
223

224
    using data_type = sockaddr;
225
    data_type* data();
226
    const data_type* data() const;
227

228
private:
229
    StreamProtocol m_protocol;
230

231
    using sockaddr_base_type = sockaddr;
232
    using sockaddr_ip_v4_type = sockaddr_in;
233
    using sockaddr_ip_v6_type = sockaddr_in6;
234
    union sockaddr_union_type {
235
        sockaddr_base_type m_base;
236
        sockaddr_ip_v4_type m_ip_v4;
237
        sockaddr_ip_v6_type m_ip_v6;
238
    };
239
    sockaddr_union_type m_sockaddr_union;
240

241
    friend class Service;
242
    friend class Resolver;
243
    friend class SocketBase;
244
    friend class Socket;
245
};
246

247

248
/// \brief A list of IP endpoints.
249
class Endpoint::List {
250
public:
251
    using iterator = const Endpoint*;
252

253
    iterator begin() const noexcept;
254
    iterator end() const noexcept;
255
    std::size_t size() const noexcept;
256
    bool empty() const noexcept;
257

258
    List() noexcept = default;
8,494✔
259
    List(List&&) noexcept = default;
29,046✔
260
    ~List() noexcept = default;
37,542✔
261

262
    List& operator=(List&&) noexcept = default;
3,640✔
263

264
private:
265
    util::Buffer<Endpoint> m_endpoints;
266

267
    friend class Service;
268
};
269

270

271
/// \brief TCP/IP networking service.
272
class Service {
273
public:
274
    Service();
275
    ~Service() noexcept;
276

277
    /// \brief Execute the event loop.
278
    ///
279
    /// Execute completion handlers of completed asynchronous operations, or
280
    /// wait for more completion handlers to become ready for
281
    /// execution. Handlers submitted via post() are considered immeditely
282
    /// ready. If there are no completion handlers ready for execution, and
283
    /// there are no asynchronous operations in progress, run() returns.
284
    ///
285
    /// run_until_stopped() will continue running even if there are no completion
286
    /// handlers ready for execution, and no asynchronous operations in progress,
287
    /// until stop() is called.
288
    ///
289
    /// All completion handlers, including handlers submitted via post() will be
290
    /// executed from run(), that is, by the thread that executes run(). If no
291
    /// thread executes run(), then the completion handlers will not be
292
    /// executed.
293
    ///
294
    /// Exceptions thrown by completion handlers will always propagate back
295
    /// through run().
296
    ///
297
    /// Syncronous operations (e.g., Socket::connect()) execute independently of
298
    /// the event loop, and do not require that any thread calls run().
299
    void run();
300
    void run_until_stopped();
301

302
    /// @{ \brief Stop event loop execution.
303
    ///
304
    /// stop() puts the event loop into the stopped mode. If a thread is
305
    /// currently executing run(), it will be made to return in a timely
306
    /// fashion, that is, without further blocking. If a thread is currently
307
    /// blocked in run(), it will be unblocked. Handlers that can be executed
308
    /// immediately, may, or may not be executed before run() returns, but new
309
    /// handlers submitted by these, will not be executed before run()
310
    /// returns. Also, if a handler is submitted by a call to post, and that
311
    /// call happens after stop() returns, then that handler is guaranteed to
312
    /// not be executed before run() returns (assuming that reset() is not called
313
    /// before run() returns).
314
    ///
315
    /// The event loop will remain in the stopped mode until reset() is
316
    /// called. If reset() is called before run() returns, it may, or may not
317
    /// cause run() to resume normal operation without returning.
318
    ///
319
    /// Both stop() and reset() are thread-safe, that is, they may be called by
320
    /// any thread. Also, both of these function may be called from completion
321
    /// handlers (including posted handlers).
322
    void stop() noexcept;
323
    void reset() noexcept;
324
    /// @}
325

326
    /// \brief Submit a handler to be executed by the event loop thread.
327
    ///
328
    /// Register the sepcified completion handler for immediate asynchronous
329
    /// execution. The specified handler will be executed by an expression on
330
    /// the form `handler(status)` where status is a Status object whose value
331
    /// will always be OK, but may change in the future. If the handler object
332
    /// is movable, it will never be copied. Otherwise, it will be copied as
333
    /// necessary.
334
    ///
335
    /// This function is thread-safe, that is, it may be called by any
336
    /// thread. It may also be called from other completion handlers.
337
    ///
338
    /// The handler will never be called as part of the execution of post(). It
339
    /// will always be called by a thread that is executing run(). If no thread
340
    /// is currently executing run(), the handler will not be executed until a
341
    /// thread starts executing run(). If post() is called while another thread
342
    /// is executing run(), the handler may be called before post() returns. If
343
    /// post() is called from another completion handler, the submitted handler
344
    /// is guaranteed to not be called during the execution of post().
345
    ///
346
    /// Completion handlers added through post() will be executed in the order
347
    /// that they are added. More precisely, if post() is called twice to add
348
    /// two handlers, A and B, and the execution of post(A) ends before the
349
    /// beginning of the execution of post(B), then A is guaranteed to execute
350
    /// before B.
351
    template <class H>
352
    void post(H handler);
353

354
    /// Argument `saturation` is the fraction of time that is not spent
355
    /// sleeping. Argument `inefficiency` is the fraction of time not spent
356
    /// sleeping, and not spent executing completion handlers. Both values are
357
    /// guaranteed to always be in the range 0 to 1 (both inclusive). The value
358
    /// passed as `inefficiency` is guaranteed to always be less than, or equal
359
    /// to the value passed as `saturation`.
360
    using EventLoopMetricsHandler = void(double saturation, double inefficiency);
361

362
    /// \brief Report event loop metrics via the specified handler.
363
    ///
364
    /// The handler will be called approximately every 30 seconds.
365
    ///
366
    /// report_event_loop_metrics() must be called prior to any invocation of
367
    /// run(). report_event_loop_metrics() is not thread-safe.
368
    ///
369
    /// This feature is only available if
370
    /// `REALM_UTIL_NETWORK_EVENT_LOOP_METRICS` was defined during
371
    /// compilation. When the feature is not available, the specified handler
372
    /// will never be called.
373
    void report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler>);
374

375
private:
376
    enum class Want { nothing = 0, read, write };
377

378
    template <class Oper>
379
    class OperQueue;
380
    class Descriptor;
381
    class AsyncOper;
382
    class ResolveOperBase;
383
    class WaitOperBase;
384
    class TriggerExecOperBase;
385
    class PostOperBase;
386
    template <class H>
387
    class PostOper;
388
    class IoOper;
389
    class UnusedOper; // Allocated, but currently unused memory
390

391
    template <class S>
392
    class BasicStreamOps;
393

394
    struct OwnersOperDeleter {
395
        void operator()(AsyncOper*) const noexcept;
396
    };
397
    struct LendersOperDeleter {
398
        void operator()(AsyncOper*) const noexcept;
399
    };
400
    using OwnersOperPtr = std::unique_ptr<AsyncOper, OwnersOperDeleter>;
401
    using LendersOperPtr = std::unique_ptr<AsyncOper, LendersOperDeleter>;
402
    using LendersResolveOperPtr = std::unique_ptr<ResolveOperBase, LendersOperDeleter>;
403
    using LendersWaitOperPtr = std::unique_ptr<WaitOperBase, LendersOperDeleter>;
404
    using LendersIoOperPtr = std::unique_ptr<IoOper, LendersOperDeleter>;
405

406
    class IoReactor;
407
    class Impl;
408
    const std::unique_ptr<Impl> m_impl;
409

410
    template <class Oper, class... Args>
411
    static std::unique_ptr<Oper, LendersOperDeleter> alloc(OwnersOperPtr&, Args&&...);
412

413
    using PostOperConstr = PostOperBase*(void* addr, std::size_t size, Impl&, void* cookie);
414
    void do_post(PostOperConstr, std::size_t size, void* cookie);
415
    template <class H>
416
    static PostOperBase* post_oper_constr(void* addr, std::size_t size, Impl&, void* cookie);
417
    static void recycle_post_oper(Impl&, PostOperBase*) noexcept;
418
    static void trigger_exec(Impl&, TriggerExecOperBase&) noexcept;
419
    static void reset_trigger_exec(Impl&, TriggerExecOperBase&) noexcept;
420

421
    using clock = std::chrono::steady_clock;
422

423
    friend class Resolver;
424
    friend class SocketBase;
425
    friend class Socket;
426
    friend class Acceptor;
427
    friend class DeadlineTimer;
428
    friend class ReadAheadBuffer;
429
    friend class ssl::Stream;
430
};
431

432

433
template <class Oper>
434
class Service::OperQueue {
435
public:
436
    using LendersOperPtr = std::unique_ptr<Oper, LendersOperDeleter>;
437
    bool empty() const noexcept;
438
    void push_back(LendersOperPtr) noexcept;
439
    template <class Oper2>
440
    void push_back(OperQueue<Oper2>&) noexcept;
441
    LendersOperPtr pop_front() noexcept;
442
    void clear() noexcept;
443
    OperQueue() noexcept = default;
2,729,938✔
444
    OperQueue(OperQueue&&) noexcept;
445
    ~OperQueue() noexcept
446
    {
2,804,624✔
447
        clear();
2,804,624✔
448
    }
2,804,624✔
449

450
private:
451
    Oper* m_back = nullptr;
452
    template <class>
453
    friend class OperQueue;
454
};
455

456

457
class Service::Descriptor {
458
public:
459
#ifdef _WIN32
460
    using native_handle_type = SOCKET;
461
#else
462
    using native_handle_type = int;
463
#endif
464

465
    Impl& service_impl;
466

467
    Descriptor(Impl& service) noexcept;
468
    ~Descriptor() noexcept;
469

470
    /// \param in_blocking_mode Must be true if, and only if the passed file
471
    /// descriptor refers to a file description in which the file status flag
472
    /// O_NONBLOCK is not set.
473
    ///
474
    /// The passed file descriptor must have the file descriptor flag FD_CLOEXEC
475
    /// set.
476
    void assign(native_handle_type fd, bool in_blocking_mode) noexcept;
477
    void close() noexcept;
478
    native_handle_type release() noexcept;
479

480
    bool is_open() const noexcept;
481

482
    native_handle_type native_handle() const noexcept;
483
    bool in_blocking_mode() const noexcept;
484

485
    void accept(Descriptor&, StreamProtocol, Endpoint*, std::error_code&) noexcept;
486
    std::size_t read_some(char* buffer, std::size_t size, std::error_code&) noexcept;
487
    std::size_t write_some(const char* data, std::size_t size, std::error_code&) noexcept;
488

489
    /// \tparam Oper An operation type inherited from IoOper with an initate()
490
    /// function that initiates the operation and figures out whether it needs
491
    /// to read from, or write to the underlying descriptor to
492
    /// proceed. `initiate()` must return Want::read if the operation needs to
493
    /// read, or Want::write if the operation needs to write. If the operation
494
    /// completes immediately (e.g. due to a failure during initialization),
495
    /// `initiate()` must return Want::nothing.
496
    template <class Oper, class... Args>
497
    void initiate_oper(std::unique_ptr<Oper, LendersOperDeleter>, Args&&...);
498

499
    void ensure_blocking_mode();
500
    void ensure_nonblocking_mode();
501

502
private:
503
    native_handle_type m_fd = -1;
504
    bool m_in_blocking_mode; // Not in nonblocking mode
505

506
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
507
    bool m_read_ready;
508
    bool m_write_ready;
509
    bool m_imminent_end_of_input; // Kernel has seen the end of input
510
    bool m_is_registered;
511
    OperQueue<IoOper> m_suspended_read_ops, m_suspended_write_ops;
512

513
    void deregister_for_async() noexcept;
514
#endif
515

516
    bool assume_read_would_block() const noexcept;
517
    bool assume_write_would_block() const noexcept;
518

519
    void set_read_ready(bool) noexcept;
520
    void set_write_ready(bool) noexcept;
521

522
    void set_nonblock_flag(bool value);
523
    void add_initiated_oper(LendersIoOperPtr, Want);
524

525
    void do_close() noexcept;
526
    native_handle_type do_release() noexcept;
527

528
    friend class IoReactor;
529
};
530

531

532
class Resolver {
533
public:
534
    class Query;
535

536
    Resolver(Service&);
537
    ~Resolver() noexcept;
538

539
    /// Thread-safe.
540
    Service& get_service() noexcept;
541

542
    /// @{ \brief Resolve the specified query to one or more endpoints.
543
    Endpoint::List resolve(const Query&);
544
    Endpoint::List resolve(const Query&, std::error_code&);
545
    /// @}
546

547
    /// \brief Perform an asynchronous resolve operation.
548
    ///
549
    /// Initiate an asynchronous resolve operation. The completion handler will
550
    /// be called when the operation completes. The operation completes when it
551
    /// succeeds, or an error occurs.
552
    ///
553
    /// The completion handler is always executed by the event loop thread,
554
    /// i.e., by a thread that is executing Service::run(). Conversely, the
555
    /// completion handler is guaranteed to not be called while no thread is
556
    /// executing Service::run(). The execution of the completion handler is
557
    /// always deferred to the event loop, meaning that it never happens as a
558
    /// synchronous side effect of the execution of async_resolve(), even when
559
    /// async_resolve() is executed by the event loop thread. The completion
560
    /// handler is guaranteed to be called eventually, as long as there is time
561
    /// enough for the operation to complete or fail, and a thread is executing
562
    /// Service::run() for long enough.
563
    ///
564
    /// The operation can be canceled by calling cancel(), and will be
565
    /// automatically canceled if the resolver object is destroyed. If the
566
    /// operation is canceled, it will fail with `error::operation_aborted`. The
567
    /// operation remains cancelable up until the point in time where the
568
    /// completion handler starts to execute. This means that if cancel() is
569
    /// called before the completion handler starts to execute, then the
570
    /// completion handler is guaranteed to have `error::operation_aborted`
571
    /// passed to it. This is true regardless of whether cancel() is called
572
    /// explicitly or implicitly, such as when the resolver is destroyed.
573
    ///
574
    /// The specified handler will be executed by an expression on the form
575
    /// `handler(ec, endpoints)` where `ec` is the error code and `endpoints` is
576
    /// an object of type `Endpoint::List`. If the the handler object is
577
    /// movable, it will never be copied. Otherwise, it will be copied as
578
    /// necessary.
579
    ///
580
    /// It is an error to start a new resolve operation (synchronous or
581
    /// asynchronous) while an asynchronous resolve operation is in progress via
582
    /// the same resolver object. An asynchronous resolve operation is
583
    /// considered complete as soon as the completion handler starts to
584
    /// execute. This means that a new resolve operation can be started from the
585
    /// completion handler.
586
    template <class H>
587
    void async_resolve(Query, H&& handler);
588

589
    /// \brief Cancel all asynchronous operations.
590
    ///
591
    /// Cause all incomplete asynchronous operations, that are associated with
592
    /// this resolver (at most one), to fail with `error::operation_aborted`. An
593
    /// asynchronous operation is complete precisely when its completion handler
594
    /// starts executing.
595
    ///
596
    /// Completion handlers of canceled operations will become immediately ready
597
    /// to execute, but will never be executed directly as part of the execution
598
    /// of cancel().
599
    ///
600
    /// Cancellation happens automatically when the resolver object is destroyed.
601
    void cancel() noexcept;
602

603
private:
604
    template <class H>
605
    class ResolveOper;
606

607
    Service::Impl& m_service_impl;
608

609
    Service::OwnersOperPtr m_resolve_oper;
610

611
    void initiate_oper(Service::LendersResolveOperPtr);
612
};
613

614

615
class Resolver::Query {
616
public:
617
    enum {
618
        /// Locally bound socket endpoint (server side)
619
        passive = AI_PASSIVE,
620

621
        /// Ignore families without a configured non-loopback address
622
        address_configured = AI_ADDRCONFIG
623
    };
624

625
    Query(std::string service_port, int init_flags = passive | address_configured);
626
    Query(const StreamProtocol&, std::string service_port, int init_flags = passive | address_configured);
627
    Query(std::string host_name, std::string service_port, int init_flags = address_configured);
628
    Query(const StreamProtocol&, std::string host_name, std::string service_port,
629
          int init_flags = address_configured);
630

631
    ~Query() noexcept;
632

633
    int flags() const;
634
    StreamProtocol protocol() const;
635
    std::string host() const;
636
    std::string service() const;
637

638
private:
639
    int m_flags;
640
    StreamProtocol m_protocol;
641
    std::string m_host;    // hostname
642
    std::string m_service; // port
643

644
    friend class Service;
645
};
646

647

648
class SocketBase {
649
public:
650
    using native_handle_type = Service::Descriptor::native_handle_type;
651

652
    ~SocketBase() noexcept;
653

654
    /// Thread-safe.
655
    Service& get_service() noexcept;
656

657
    bool is_open() const noexcept;
658
    native_handle_type native_handle() const noexcept;
659

660
    /// @{ \brief Open the socket for use with the specified protocol.
661
    ///
662
    /// It is an error to call open() on a socket that is already open.
663
    void open(const StreamProtocol&);
664
    std::error_code open(const StreamProtocol&, std::error_code&);
665
    /// @}
666

667
    /// \brief Close this socket.
668
    ///
669
    /// If the socket is open, it will be closed. If it is already closed (or
670
    /// never opened), this function does nothing (idempotency).
671
    ///
672
    /// A socket is automatically closed when destroyed.
673
    ///
674
    /// When the socket is closed, any incomplete asynchronous operation will be
675
    /// canceled (as if cancel() was called).
676
    void close() noexcept;
677

678
    /// \brief Cancel all asynchronous operations.
679
    ///
680
    /// Cause all incomplete asynchronous operations, that are associated with
681
    /// this socket, to fail with `error::operation_aborted`. An asynchronous
682
    /// operation is complete precisely when its completion handler starts
683
    /// executing.
684
    ///
685
    /// Completion handlers of canceled operations will become immediately ready
686
    /// to execute, but will never be executed directly as part of the execution
687
    /// of cancel().
688
    void cancel() noexcept;
689

690
    template <class O>
691
    void get_option(O& opt) const;
692

693
    template <class O>
694
    std::error_code get_option(O& opt, std::error_code&) const;
695

696
    template <class O>
697
    void set_option(const O& opt);
698

699
    template <class O>
700
    std::error_code set_option(const O& opt, std::error_code&);
701

702
    void bind(const Endpoint&);
703
    std::error_code bind(const Endpoint&, std::error_code&);
704

705
    Endpoint local_endpoint() const;
706
    Endpoint local_endpoint(std::error_code&) const;
707

708
    /// Release the ownership of this socket object over the native handle and
709
    /// return the native handle to the caller. The caller assumes ownership
710
    /// over the returned handle. The socket is left in a closed
711
    /// state. Incomplete asynchronous operations will be canceled as if close()
712
    /// had been called.
713
    ///
714
    /// If called on a closed socket, this function is a no-op, and returns the
715
    /// same value as would be returned by native_handle()
716
    native_handle_type release_native_handle() noexcept;
717

718
private:
719
    enum opt_enum {
720
        opt_ReuseAddr, ///< `SOL_SOCKET`, `SO_REUSEADDR`
721
        opt_Linger,    ///< `SOL_SOCKET`, `SO_LINGER`
722
        opt_NoDelay,   ///< `IPPROTO_TCP`, `TCP_NODELAY` (disable the Nagle algorithm)
723
    };
724

725
    template <class, int, class>
726
    class Option;
727

728
public:
729
    using reuse_address = Option<bool, opt_ReuseAddr, int>;
730
    using no_delay = Option<bool, opt_NoDelay, int>;
731

732
    // linger struct defined by POSIX sys/socket.h.
733
    struct linger_opt;
734
    using linger = Option<linger_opt, opt_Linger, struct linger>;
735

736
protected:
737
    Service::Descriptor m_desc;
738

739
private:
740
    StreamProtocol m_protocol;
741

742
protected:
743
    Service::OwnersOperPtr m_read_oper;  // Read or accept
744
    Service::OwnersOperPtr m_write_oper; // Write or connect
745

746
    SocketBase(Service&);
747

748
    const StreamProtocol& get_protocol() const noexcept;
749
    std::error_code do_assign(const StreamProtocol&, native_handle_type, std::error_code&);
750
    void do_close() noexcept;
751

752
    void get_option(opt_enum, void* value_data, std::size_t& value_size, std::error_code&) const;
753
    void set_option(opt_enum, const void* value_data, std::size_t value_size, std::error_code&);
754
    void map_option(opt_enum, int& level, int& option_name) const;
755

756
    friend class Acceptor;
757
};
758

759

760
template <class T, int opt, class U>
761
class SocketBase::Option {
762
public:
763
    Option(T value = T());
764
    T value() const;
765

766
private:
767
    T m_value;
768

769
    void get(const SocketBase&, std::error_code&);
770
    void set(SocketBase&, std::error_code&) const;
771

772
    friend class SocketBase;
773
};
774

775
struct SocketBase::linger_opt {
776
    linger_opt(bool enable, int timeout_seconds = 0)
777
    {
×
778
        m_linger.l_onoff = enable ? 1 : 0;
×
779
        m_linger.l_linger = timeout_seconds;
×
780
    }
×
781

782
    ::linger m_linger;
783

784
    operator ::linger() const
785
    {
×
786
        return m_linger;
×
787
    }
×
788

789
    bool enabled() const
790
    {
×
791
        return m_linger.l_onoff != 0;
×
792
    }
×
793
    int timeout() const
794
    {
×
795
        return m_linger.l_linger;
×
796
    }
×
797
};
798

799

800
/// Switching between synchronous and asynchronous operations is allowed, but
801
/// only in a nonoverlapping fashion. That is, a synchronous operation is not
802
/// allowed to run concurrently with an asynchronous one on the same
803
/// socket. Note that an asynchronous operation is considered to be running
804
/// until its completion handler starts executing.
805
class Socket : public SocketBase {
806
public:
807
    Socket(Service&);
808

809
    /// \brief Create a socket with an already-connected native socket handle.
810
    ///
811
    /// This constructor is shorthand for creating the socket with the
812
    /// one-argument constructor, and then calling the two-argument assign()
813
    /// with the specified protocol and native handle.
814
    Socket(Service&, const StreamProtocol&, native_handle_type);
815

816
    ~Socket() noexcept;
817

818
    void connect(const Endpoint&);
819
    std::error_code connect(const Endpoint&, std::error_code&);
820

821
    /// @{ \brief Perform a synchronous read operation.
822
    ///
823
    /// read() will not return until the specified buffer is full, or an error
824
    /// occurs. Reaching the end of input before the buffer is filled, is
825
    /// considered an error, and will cause the operation to fail with
826
    /// MiscExtErrors::end_of_input.
827
    ///
828
    /// read_until() will not return until the specified buffer contains the
829
    /// specified delimiter, or an error occurs. If the buffer is filled before
830
    /// the delimiter is found, the operation fails with
831
    /// MiscExtErrors::delim_not_found. Otherwise, if the end of input is
832
    /// reached before the delimiter is found, the operation fails with
833
    /// MiscExtErrors::end_of_input. If the operation succeeds, the last byte
834
    /// placed in the buffer is the delimiter.
835
    ///
836
    /// The versions that take a ReadAheadBuffer argument will read through that
837
    /// buffer. This allows for fewer larger reads on the underlying
838
    /// socket. Since unconsumed data may be left in the read-ahead buffer after
839
    /// a read operation returns, it is important that the same read-ahead
840
    /// buffer is passed to the next read operation.
841
    ///
842
    /// The versions of read() and read_until() that do not take an
843
    /// `std::error_code&` argument will throw std::system_error on failure.
844
    ///
845
    /// The versions that do take an `std::error_code&` argument will set \a ec
846
    /// to `std::error_code()` on success, and to something else on failure. On
847
    /// failure they will return the number of bytes placed in the specified
848
    /// buffer before the error occured.
849
    ///
850
    /// \return The number of bytes places in the specified buffer upon return.
851
    std::size_t read(char* buffer, std::size_t size);
852
    std::size_t read(char* buffer, std::size_t size, std::error_code& ec);
853
    std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&);
854
    std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&, std::error_code& ec);
855
    std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&);
856
    std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&, std::error_code& ec);
857
    /// @}
858

859
    /// @{ \brief Perform a synchronous write operation.
860
    ///
861
    /// write() will not return until all the specified bytes have been written
862
    /// to the socket, or an error occurs.
863
    ///
864
    /// The versions of write() that does not take an `std::error_code&`
865
    /// argument will throw std::system_error on failure. When it succeeds, it
866
    /// always returns \a size.
867
    ///
868
    /// The versions that does take an `std::error_code&` argument will set \a
869
    /// ec to `std::error_code()` on success, and to something else on
870
    /// failure. On success it returns \a size. On faulure it returns the number
871
    /// of bytes written before the failure occured.
872
    std::size_t write(const char* data, std::size_t size);
873
    std::size_t write(const char* data, std::size_t size, std::error_code& ec);
874
    /// @}
875

876
    /// @{ \brief Read at least one byte from this socket.
877
    ///
878
    /// If \a size is zero, both versions of read_some() will return zero
879
    /// without blocking. Read errors may or may not be detected in this case.
880
    ///
881
    /// Otherwise, if \a size is greater than zero, and at least one byte is
882
    /// immediately available, that is, without blocking, then both versions
883
    /// will read at least one byte (but generally as many immediately available
884
    /// bytes as will fit into the specified buffer), and return without
885
    /// blocking.
886
    ///
887
    /// Otherwise, both versions will block the calling thread until at least one
888
    /// byte becomes available, or an error occurs.
889
    ///
890
    /// In this context, it counts as an error, if the end of input is reached
891
    /// before at least one byte becomes available (see
892
    /// MiscExtErrors::end_of_input).
893
    ///
894
    /// If no error occurs, both versions will return the number of bytes placed
895
    /// in the specified buffer, which is generally as many as are immediately
896
    /// available at the time when the first byte becomes available, although
897
    /// never more than \a size.
898
    ///
899
    /// If no error occurs, the three-argument version will set \a ec to
900
    /// indicate success.
901
    ///
902
    /// If an error occurs, the two-argument version will throw
903
    /// `std::system_error`, while the three-argument version will set \a ec to
904
    /// indicate the error, and return zero.
905
    ///
906
    /// As long as \a size is greater than zero, the two argument version will
907
    /// always return a value that is greater than zero, while the three
908
    /// argument version will return a value greater than zero when, and only
909
    /// when \a ec is set to indicate success (no error, and no end of input).
910
    std::size_t read_some(char* buffer, std::size_t size);
911
    std::size_t read_some(char* buffer, std::size_t size, std::error_code& ec);
912
    /// @}
913

914
    /// @{ \brief Write at least one byte to this socket.
915
    ///
916
    /// If \a size is zero, both versions of write_some() will return zero
917
    /// without blocking. Write errors may or may not be detected in this case.
918
    ///
919
    /// Otherwise, if \a size is greater than zero, and at least one byte can be
920
    /// written immediately, that is, without blocking, then both versions will
921
    /// write at least one byte (but generally as many as can be written
922
    /// immediately), and return without blocking.
923
    ///
924
    /// Otherwise, both versions will block the calling thread until at least one
925
    /// byte can be written, or an error occurs.
926
    ///
927
    /// If no error occurs, both versions will return the number of bytes
928
    /// written, which is generally as many as can be written immediately at the
929
    /// time when the first byte can be written.
930
    ///
931
    /// If no error occurs, the three-argument version will set \a ec to
932
    /// indicate success.
933
    ///
934
    /// If an error occurs, the two-argument version will throw
935
    /// `std::system_error`, while the three-argument version will set \a ec to
936
    /// indicate the error, and return zero.
937
    ///
938
    /// As long as \a size is greater than zero, the two argument version will
939
    /// always return a value that is greater than zero, while the three
940
    /// argument version will return a value greater than zero when, and only
941
    /// when \a ec is set to indicate success.
942
    std::size_t write_some(const char* data, std::size_t size);
943
    std::size_t write_some(const char* data, std::size_t size, std::error_code&);
944
    /// @}
945

946
    /// \brief Perform an asynchronous connect operation.
947
    ///
948
    /// Initiate an asynchronous connect operation. The completion handler is
949
    /// called when the operation completes. The operation completes when the
950
    /// connection is established, or an error occurs.
951
    ///
952
    /// The completion handler is always executed by the event loop thread,
953
    /// i.e., by a thread that is executing Service::run(). Conversely, the
954
    /// completion handler is guaranteed to not be called while no thread is
955
    /// executing Service::run(). The execution of the completion handler is
956
    /// always deferred to the event loop, meaning that it never happens as a
957
    /// synchronous side effect of the execution of async_connect(), even when
958
    /// async_connect() is executed by the event loop thread. The completion
959
    /// handler is guaranteed to be called eventually, as long as there is time
960
    /// enough for the operation to complete or fail, and a thread is executing
961
    /// Service::run() for long enough.
962
    ///
963
    /// The operation can be canceled by calling cancel(), and will be
964
    /// automatically canceled if the socket is closed. If the operation is
965
    /// canceled, it will fail with `error::operation_aborted`. The operation
966
    /// remains cancelable up until the point in time where the completion
967
    /// handler starts to execute. This means that if cancel() is called before
968
    /// the completion handler starts to execute, then the completion handler is
969
    /// guaranteed to have `error::operation_aborted` passed to it. This is true
970
    /// regardless of whether cancel() is called explicitly or implicitly, such
971
    /// as when the socket is destroyed.
972
    ///
973
    /// If the socket is not already open, it will be opened as part of the
974
    /// connect operation as if by calling `open(ep.protocol())`. If the opening
975
    /// operation succeeds, but the connect operation fails, the socket will be
976
    /// left in the opened state.
977
    ///
978
    /// The specified handler will be executed by an expression on the form
979
    /// `handler(ec)` where `ec` is the error code. If the the handler object is
980
    /// movable, it will never be copied. Otherwise, it will be copied as
981
    /// necessary.
982
    ///
983
    /// It is an error to start a new connect operation (synchronous or
984
    /// asynchronous) while an asynchronous connect operation is in progress. An
985
    /// asynchronous connect operation is considered complete as soon as the
986
    /// completion handler starts to execute.
987
    ///
988
    /// \param ep The remote endpoint of the connection to be established.
989
    template <class H>
990
    void async_connect(const Endpoint& ep, H&& handler);
991

992
    /// @{ \brief Perform an asynchronous read operation.
993
    ///
994
    /// Initiate an asynchronous buffered read operation on the associated
995
    /// socket. The completion handler will be called when the operation
996
    /// completes, or an error occurs.
997
    ///
998
    /// async_read() will continue reading until the specified buffer is full,
999
    /// or an error occurs. If the end of input is reached before the buffer is
1000
    /// filled, the operation fails with MiscExtErrors::end_of_input.
1001
    ///
1002
    /// async_read_until() will continue reading until the specified buffer
1003
    /// contains the specified delimiter, or an error occurs. If the buffer is
1004
    /// filled before a delimiter is found, the operation fails with
1005
    /// MiscExtErrors::delim_not_found. Otherwise, if the end of input is
1006
    /// reached before a delimiter is found, the operation fails with
1007
    /// MiscExtErrors::end_of_input. Otherwise, if the operation succeeds, the
1008
    /// last byte placed in the buffer is the delimiter.
1009
    ///
1010
    /// The versions that take a ReadAheadBuffer argument will read through that
1011
    /// buffer. This allows for fewer larger reads on the underlying
1012
    /// socket. Since unconsumed data may be left in the read-ahead buffer after
1013
    /// a read operation completes, it is important that the same read-ahead
1014
    /// buffer is passed to the next read operation.
1015
    ///
1016
    /// The completion handler is always executed by the event loop thread,
1017
    /// i.e., by a thread that is executing Service::run(). Conversely, the
1018
    /// completion handler is guaranteed to not be called while no thread is
1019
    /// executing Service::run(). The execution of the completion handler is
1020
    /// always deferred to the event loop, meaning that it never happens as a
1021
    /// synchronous side effect of the execution of async_read() or
1022
    /// async_read_until(), even when async_read() or async_read_until() is
1023
    /// executed by the event loop thread. The completion handler is guaranteed
1024
    /// to be called eventually, as long as there is time enough for the
1025
    /// operation to complete or fail, and a thread is executing Service::run()
1026
    /// for long enough.
1027
    ///
1028
    /// The operation can be canceled by calling cancel() on the associated
1029
    /// socket, and will be automatically canceled if the associated socket is
1030
    /// closed. If the operation is canceled, it will fail with
1031
    /// `error::operation_aborted`. The operation remains cancelable up until
1032
    /// the point in time where the completion handler starts to execute. This
1033
    /// means that if cancel() is called before the completion handler starts to
1034
    /// execute, then the completion handler is guaranteed to have
1035
    /// `error::operation_aborted` passed to it. This is true regardless of
1036
    /// whether cancel() is called explicitly or implicitly, such as when the
1037
    /// socket is destroyed.
1038
    ///
1039
    /// The specified handler will be executed by an expression on the form
1040
    /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of
1041
    /// bytes placed in the buffer (of type `std::size_t`). `n` is guaranteed to
1042
    /// be less than, or equal to \a size. If the the handler object is movable,
1043
    /// it will never be copied. Otherwise, it will be copied as necessary.
1044
    ///
1045
    /// It is an error to start a read operation before the associated socket is
1046
    /// connected.
1047
    ///
1048
    /// It is an error to start a new read operation (synchronous or
1049
    /// asynchronous) while an asynchronous read operation is in progress. An
1050
    /// asynchronous read operation is considered complete as soon as the
1051
    /// completion handler starts executing. This means that a new read
1052
    /// operation can be started from the completion handler of another
1053
    /// asynchronous buffered read operation.
1054
    template <class H>
1055
    void async_read(char* buffer, std::size_t size, H&& handler);
1056
    template <class H>
1057
    void async_read(char* buffer, std::size_t size, ReadAheadBuffer&, H&& handler);
1058
    template <class H>
1059
    void async_read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&, H&& handler);
1060
    /// @}
1061

1062
    /// \brief Perform an asynchronous write operation.
1063
    ///
1064
    /// Initiate an asynchronous write operation. The completion handler is
1065
    /// called when the operation completes. The operation completes when all
1066
    /// the specified bytes have been written to the socket, or an error occurs.
1067
    ///
1068
    /// The completion handler is always executed by the event loop thread,
1069
    /// i.e., by a thread that is executing Service::run(). Conversely, the
1070
    /// completion handler is guaranteed to not be called while no thread is
1071
    /// executing Service::run(). The execution of the completion handler is
1072
    /// always deferred to the event loop, meaning that it never happens as a
1073
    /// synchronous side effect of the execution of async_write(), even when
1074
    /// async_write() is executed by the event loop thread. The completion
1075
    /// handler is guaranteed to be called eventually, as long as there is time
1076
    /// enough for the operation to complete or fail, and a thread is executing
1077
    /// Service::run() for long enough.
1078
    ///
1079
    /// The operation can be canceled by calling cancel(), and will be
1080
    /// automatically canceled if the socket is closed. If the operation is
1081
    /// canceled, it will fail with `error::operation_aborted`. The operation
1082
    /// remains cancelable up until the point in time where the completion
1083
    /// handler starts to execute. This means that if cancel() is called before
1084
    /// the completion handler starts to execute, then the completion handler is
1085
    /// guaranteed to have `error::operation_aborted` passed to it. This is true
1086
    /// regardless of whether cancel() is called explicitly or implicitly, such
1087
    /// as when the socket is destroyed.
1088
    ///
1089
    /// The specified handler will be executed by an expression on the form
1090
    /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of
1091
    /// bytes written (of type `std::size_t`). If the the handler object is
1092
    /// movable, it will never be copied. Otherwise, it will be copied as
1093
    /// necessary.
1094
    ///
1095
    /// It is an error to start an asynchronous write operation before the
1096
    /// socket is connected.
1097
    ///
1098
    /// It is an error to start a new write operation (synchronous or
1099
    /// asynchronous) while an asynchronous write operation is in progress. An
1100
    /// asynchronous write operation is considered complete as soon as the
1101
    /// completion handler starts to execute. This means that a new write
1102
    /// operation can be started from the completion handler of another
1103
    /// asynchronous write operation.
1104
    template <class H>
1105
    void async_write(const char* data, std::size_t size, H&& handler);
1106

1107
    template <class H>
1108
    void async_read_some(char* buffer, std::size_t size, H&& handler);
1109
    template <class H>
1110
    void async_write_some(const char* data, std::size_t size, H&& handler);
1111

1112
    enum shutdown_type {
1113
#ifdef _WIN32
1114
        /// Shutdown the receiving side of the socket.
1115
        shutdown_receive = SD_RECEIVE,
1116

1117
        /// Shutdown the sending side of the socket.
1118
        shutdown_send = SD_SEND,
1119

1120
        /// Shutdown both sending and receiving side of the socket.
1121
        shutdown_both = SD_BOTH
1122
#else
1123
        shutdown_receive = SHUT_RD,
1124
        shutdown_send = SHUT_WR,
1125
        shutdown_both = SHUT_RDWR
1126
#endif
1127
    };
1128

1129
    /// @{ \brief Shut down the connected sockets sending and/or receiving
1130
    /// side.
1131
    ///
1132
    /// It is an error to call this function when the socket is not both open
1133
    /// and connected.
1134
    void shutdown(shutdown_type);
1135
    std::error_code shutdown(shutdown_type, std::error_code&);
1136
    /// @}
1137

1138
    /// @{ \brief Initialize socket with an already-connected native socket
1139
    /// handle.
1140
    ///
1141
    /// The specified native handle must refer to a socket that is already fully
1142
    /// open and connected.
1143
    ///
1144
    /// If the assignment operation succeeds, this socket object has taken
1145
    /// ownership of the specified native handle, and the handle will be closed
1146
    /// when the socket object is destroyed, (or when close() is called). If the
1147
    /// operation fails, the caller still owns the specified native handle.
1148
    ///
1149
    /// It is an error to call connect() or async_connect() on a socket object
1150
    /// that is initialized this way (unless it is first closed).
1151
    ///
1152
    /// It is an error to call this function on a socket object that is already
1153
    /// open.
1154
    void assign(const StreamProtocol&, native_handle_type);
1155
    std::error_code assign(const StreamProtocol&, native_handle_type, std::error_code&);
1156
    /// @}
1157

1158
    /// Returns a reference to this socket, as this socket is the lowest layer
1159
    /// of a stream.
1160
    Socket& lowest_layer() noexcept;
1161

1162
private:
1163
    using Want = Service::Want;
1164
    using StreamOps = Service::BasicStreamOps<Socket>;
1165

1166
    class ConnectOperBase;
1167
    template <class H>
1168
    class ConnectOper;
1169

1170
    using LendersConnectOperPtr = std::unique_ptr<ConnectOperBase, Service::LendersOperDeleter>;
1171

1172
    // `ec` untouched on success, but no immediate completion
1173
    bool initiate_async_connect(const Endpoint&, std::error_code& ec);
1174
    // `ec` untouched on success
1175
    std::error_code finalize_async_connect(std::error_code& ec) noexcept;
1176

1177
    // See Service::BasicStreamOps for details on these these 6 functions.
1178
    void do_init_read_async(std::error_code&, Want&) noexcept;
1179
    void do_init_write_async(std::error_code&, Want&) noexcept;
1180
    std::size_t do_read_some_sync(char* buffer, std::size_t size, std::error_code&) noexcept;
1181
    std::size_t do_write_some_sync(const char* data, std::size_t size, std::error_code&) noexcept;
1182
    std::size_t do_read_some_async(char* buffer, std::size_t size, std::error_code&, Want&) noexcept;
1183
    std::size_t do_write_some_async(const char* data, std::size_t size, std::error_code&, Want&) noexcept;
1184

1185
    friend class Service::BasicStreamOps<Socket>;
1186
    friend class Service::BasicStreamOps<ssl::Stream>;
1187
    friend class ReadAheadBuffer;
1188
    friend class ssl::Stream;
1189
};
1190

1191

1192
/// Switching between synchronous and asynchronous operations is allowed, but
1193
/// only in a nonoverlapping fashion. That is, a synchronous operation is not
1194
/// allowed to run concurrently with an asynchronous one on the same
1195
/// acceptor. Note that an asynchronous operation is considered to be running
1196
/// until its completion handler starts executing.
1197
class Acceptor : public SocketBase {
1198
public:
1199
    Acceptor(Service&);
1200
    ~Acceptor() noexcept;
1201

1202
    static constexpr int max_connections = SOMAXCONN;
1203

1204
    void listen(int backlog = max_connections);
1205
    std::error_code listen(int backlog, std::error_code&);
1206

1207
    void accept(Socket&);
1208
    void accept(Socket&, Endpoint&);
1209
    std::error_code accept(Socket&, std::error_code&);
1210
    std::error_code accept(Socket&, Endpoint&, std::error_code&);
1211

1212
    /// @{ \brief Perform an asynchronous accept operation.
1213
    ///
1214
    /// Initiate an asynchronous accept operation. The completion handler will
1215
    /// be called when the operation completes. The operation completes when the
1216
    /// connection is accepted, or an error occurs. If the operation succeeds,
1217
    /// the specified local socket will have become connected to a remote
1218
    /// socket.
1219
    ///
1220
    /// The completion handler is always executed by the event loop thread,
1221
    /// i.e., by a thread that is executing Service::run(). Conversely, the
1222
    /// completion handler is guaranteed to not be called while no thread is
1223
    /// executing Service::run(). The execution of the completion handler is
1224
    /// always deferred to the event loop, meaning that it never happens as a
1225
    /// synchronous side effect of the execution of async_accept(), even when
1226
    /// async_accept() is executed by the event loop thread. The completion
1227
    /// handler is guaranteed to be called eventually, as long as there is time
1228
    /// enough for the operation to complete or fail, and a thread is executing
1229
    /// Service::run() for long enough.
1230
    ///
1231
    /// The operation can be canceled by calling cancel(), and will be
1232
    /// automatically canceled if the acceptor is closed. If the operation is
1233
    /// canceled, it will fail with `error::operation_aborted`. The operation
1234
    /// remains cancelable up until the point in time where the completion
1235
    /// handler starts to execute. This means that if cancel() is called before
1236
    /// the completion handler starts to execute, then the completion handler is
1237
    /// guaranteed to have `error::operation_aborted` passed to it. This is true
1238
    /// regardless of whether cancel() is called explicitly or implicitly, such
1239
    /// as when the acceptor is destroyed.
1240
    ///
1241
    /// The specified handler will be executed by an expression on the form
1242
    /// `handler(ec)` where `ec` is the error code. If the the handler object is
1243
    /// movable, it will never be copied. Otherwise, it will be copied as
1244
    /// necessary.
1245
    ///
1246
    /// It is an error to start a new accept operation (synchronous or
1247
    /// asynchronous) while an asynchronous accept operation is in progress. An
1248
    /// asynchronous accept operation is considered complete as soon as the
1249
    /// completion handler starts executing. This means that a new accept
1250
    /// operation can be started from the completion handler.
1251
    ///
1252
    /// \param sock This is the local socket, that upon successful completion
1253
    /// will have become connected to the remote socket. It must be in the
1254
    /// closed state (Socket::is_open()) when async_accept() is called.
1255
    ///
1256
    template <class H>
1257
    void async_accept(Socket& sock, H&& handler);
1258
    /// \param ep Upon completion, the remote peer endpoint will have been
1259
    /// assigned to this variable.
1260
    template <class H>
1261
    void async_accept(Socket& sock, Endpoint& ep, H&& handler);
1262
    /// @}
1263

1264
private:
1265
    using Want = Service::Want;
1266

1267
    class AcceptOperBase;
1268
    template <class H>
1269
    class AcceptOper;
1270

1271
    using LendersAcceptOperPtr = std::unique_ptr<AcceptOperBase, Service::LendersOperDeleter>;
1272

1273
    std::error_code accept(Socket&, Endpoint*, std::error_code&);
1274
    Want do_accept_async(Socket&, Endpoint*, std::error_code&) noexcept;
1275

1276
    template <class H>
1277
    void async_accept(Socket&, Endpoint*, H&&);
1278
};
1279

1280

1281
/// \brief A timer object supporting asynchronous wait operations.
1282
///
1283
/// NOTE: The DeadlineTimer is not thread safe and async_wait() or cancel()
1284
/// must be called prior to calling run() or directly from the event loop.
1285
class DeadlineTimer {
1286
public:
1287
    DeadlineTimer(Service&);
1288
    ~DeadlineTimer() noexcept;
1289

1290
    /// Thread-safe.
1291
    Service& get_service() noexcept;
1292

1293
    /// \brief Perform an asynchronous wait operation.
1294
    ///
1295
    /// Initiate an asynchronous wait operation. The completion handler becomes
1296
    /// ready to execute when the expiration time is reached, or an error occurs
1297
    /// (cancellation counts as an error here). The expiration time is the time
1298
    /// of initiation plus the specified delay. The error code passed to the
1299
    /// completion handler will **never** indicate success, unless the
1300
    /// expiration time was reached.
1301
    ///
1302
    /// The completion handler is always executed by the event loop thread,
1303
    /// i.e., by a thread that is executing Service::run(). Conversely, the
1304
    /// completion handler is guaranteed to not be called while no thread is
1305
    /// executing Service::run(). The execution of the completion handler is
1306
    /// always deferred to the event loop, meaning that it never happens as a
1307
    /// synchronous side effect of the execution of async_wait(), even when
1308
    /// async_wait() is executed by the event loop thread. The completion
1309
    /// handler is guaranteed to be called eventually, as long as there is time
1310
    /// enough for the operation to complete or fail, and a thread is executing
1311
    /// Service::run() for long enough.
1312
    ///
1313
    /// The operation can be canceled by calling cancel(), and will be
1314
    /// automatically canceled if the timer is destroyed. If the operation is
1315
    /// canceled, it will fail with `ErrorCodes::OperationAborted`. The operation
1316
    /// remains cancelable up until the point in time where the completion
1317
    /// handler starts to execute. This means that if cancel() is called before
1318
    /// the completion handler starts to execute, then the completion handler is
1319
    /// guaranteed to have `ErrorCodes::OperationAborted` passed to it. This is true
1320
    /// regardless of whether cancel() is called explicitly or implicitly, such
1321
    /// as when the timer is destroyed.
1322
    ///
1323
    /// The specified handler will be executed by an expression on the form
1324
    /// `handler(status)` where `status` is a Status object. If the handler object
1325
    /// is movable, it will never be copied. Otherwise, it will be copied as
1326
    /// necessary.
1327
    ///
1328
    /// It is an error to start a new asynchronous wait operation while an
1329
    /// another one is in progress. An asynchronous wait operation is in
1330
    /// progress until its completion handler starts executing.
1331
    template <class R, class P, class H>
1332
    void async_wait(std::chrono::duration<R, P> delay, H&& handler);
1333

1334
    /// \brief Cancel an asynchronous wait operation.
1335
    ///
1336
    /// If an asynchronous wait operation, that is associated with this deadline
1337
    /// timer, is in progress, cause it to fail with
1338
    /// `error::operation_aborted`. An asynchronous wait operation is in
1339
    /// progress until its completion handler starts executing.
1340
    ///
1341
    /// Completion handlers of canceled operations will become immediately ready
1342
    /// to execute, but will never be executed directly as part of the execution
1343
    /// of cancel().
1344
    ///
1345
    /// Cancellation happens automatically when the timer object is destroyed.
1346
    void cancel() noexcept;
1347

1348
private:
1349
    template <class H>
1350
    class WaitOper;
1351

1352
    using clock = Service::clock;
1353

1354
    Service::Impl& m_service_impl;
1355
    Service::OwnersOperPtr m_wait_oper;
1356

1357
    void initiate_oper(Service::LendersWaitOperPtr);
1358
};
1359

1360

1361
class ReadAheadBuffer {
1362
public:
1363
    ReadAheadBuffer();
1364

1365
    /// Discard any buffered data.
1366
    void clear() noexcept;
1367

1368
private:
1369
    using Want = Service::Want;
1370

1371
    char* m_begin = nullptr;
1372
    char* m_end = nullptr;
1373
    static constexpr std::size_t s_size = 1024;
1374
    const std::unique_ptr<char[]> m_buffer;
1375

1376
    bool empty() const noexcept;
1377
    bool read(char*& begin, char* end, int delim, std::error_code&) noexcept;
1378
    template <class S>
1379
    void refill_sync(S& stream, std::error_code&) noexcept;
1380
    template <class S>
1381
    bool refill_async(S& stream, std::error_code&, Want&) noexcept;
1382

1383
    template <class>
1384
    friend class Service::BasicStreamOps;
1385
};
1386

1387

1388
enum class ResolveErrors {
1389
    /// Host not found (authoritative).
1390
    host_not_found = 1,
1391

1392
    /// Host not found (non-authoritative).
1393
    host_not_found_try_again = 2,
1394

1395
    /// The query is valid but does not have associated address data.
1396
    no_data = 3,
1397

1398
    /// A non-recoverable error occurred.
1399
    no_recovery = 4,
1400

1401
    /// The service is not supported for the given socket type.
1402
    service_not_found = 5,
1403

1404
    /// The socket type is not supported.
1405
    socket_type_not_supported = 6,
1406
};
1407

1408
/// The error category associated with ResolveErrors. The name of this category is
1409
/// `realm.sync.network.resolve`.
1410
const std::error_category& resolve_error_category() noexcept;
1411

1412
std::error_code make_error_code(ResolveErrors err);
1413

1414
} // namespace realm::sync::network
1415

1416
namespace std {
1417

1418
template <>
1419
class is_error_code_enum<realm::sync::network::ResolveErrors> {
1420
public:
1421
    static const bool value = true;
1422
};
1423

1424
} // namespace std
1425

1426
namespace realm::sync::network {
1427

1428
// Implementation
1429

1430
// ---------------- StreamProtocol ----------------
1431

1432
inline StreamProtocol StreamProtocol::ip_v4()
1433
{
13,906✔
1434
    StreamProtocol prot;
13,906✔
1435
    prot.m_family = AF_INET;
13,906✔
1436
    return prot;
13,906✔
1437
}
13,906✔
1438

1439
inline StreamProtocol StreamProtocol::ip_v6()
1440
{
×
1441
    StreamProtocol prot;
×
1442
    prot.m_family = AF_INET6;
×
1443
    return prot;
×
1444
}
×
1445

1446
inline bool StreamProtocol::is_ip_v4() const
1447
{
44,300✔
1448
    return m_family == AF_INET;
44,300✔
1449
}
44,300✔
1450

1451
inline bool StreamProtocol::is_ip_v6() const
1452
{
×
1453
    return m_family == AF_INET6;
×
1454
}
×
1455

1456
inline int StreamProtocol::family() const
1457
{
13,898✔
1458
    return m_family;
13,898✔
1459
}
13,898✔
1460

1461
inline int StreamProtocol::protocol() const
1462
{
2✔
1463
    return m_protocol;
2✔
1464
}
2✔
1465

1466
inline StreamProtocol::StreamProtocol()
1467
    : m_family{AF_UNSPEC}
12,654✔
1468
    , // Allow both IPv4 and IPv6
1469
    m_socktype{SOCK_STREAM}
12,654✔
1470
    ,             // Or SOCK_DGRAM for UDP
1471
    m_protocol{0} // Any protocol
12,654✔
1472
{
28,154✔
1473
}
28,154✔
1474

1475
// ---------------- Address ----------------
1476

1477
inline bool Address::is_ip_v4() const
1478
{
×
1479
    return !m_is_ip_v6;
×
1480
}
×
1481

1482
inline bool Address::is_ip_v6() const
1483
{
×
1484
    return m_is_ip_v6;
×
1485
}
×
1486

1487
template <class C, class T>
1488
inline std::basic_ostream<C, T>& operator<<(std::basic_ostream<C, T>& out, const Address& addr)
1489
{
14,086✔
1490
    // FIXME: Not taking `addr.m_ip_v6_scope_id` into account. What does ASIO
1491
    // do?
1492
    union buffer_union {
14,086✔
1493
        char ip_v4[INET_ADDRSTRLEN];
14,086✔
1494
        char ip_v6[INET6_ADDRSTRLEN];
14,086✔
1495
    };
14,086✔
1496
    char buffer[sizeof(buffer_union)];
14,086✔
1497
    int af = addr.m_is_ip_v6 ? AF_INET6 : AF_INET;
14,086✔
1498
#ifdef _WIN32
1499
    void* src = const_cast<void*>(reinterpret_cast<const void*>(&addr.m_union));
1500
#else
1501
    const void* src = &addr.m_union;
14,086✔
1502
#endif
14,086✔
1503
    const char* ret = ::inet_ntop(af, src, buffer, sizeof buffer);
14,086✔
1504
    if (ret == 0) {
14,086✔
1505
        std::error_code ec = util::make_basic_system_error_code(errno);
×
1506
        throw std::system_error(ec);
×
1507
    }
×
1508
    out << ret;
14,086✔
1509
    return out;
14,086✔
1510
}
14,086✔
1511

1512
inline Address::Address()
1513
{
14,090✔
1514
    m_union.m_ip_v4 = ip_v4_type();
14,090✔
1515
}
14,090✔
1516

1517
inline Address make_address(const char* c_str)
1518
{
×
1519
    std::error_code ec;
×
1520
    Address addr = make_address(c_str, ec);
×
1521
    if (ec)
×
1522
        throw std::system_error(ec);
×
1523
    return addr;
×
1524
}
×
1525

1526
inline Address make_address(const std::string& str)
1527
{
×
1528
    std::error_code ec;
×
1529
    Address addr = make_address(str, ec);
×
1530
    if (ec)
×
1531
        throw std::system_error(ec);
×
1532
    return addr;
×
1533
}
×
1534

1535
inline Address make_address(const std::string& str, std::error_code& ec) noexcept
1536
{
×
1537
    return make_address(str.c_str(), ec);
×
1538
}
×
1539

1540
// ---------------- Endpoint ----------------
1541

1542
inline StreamProtocol Endpoint::protocol() const
1543
{
5,370✔
1544
    return m_protocol;
5,370✔
1545
}
5,370✔
1546

1547
inline Address Endpoint::address() const
1548
{
14,090✔
1549
    Address addr;
14,090✔
1550
    if (m_protocol.is_ip_v4()) {
14,094✔
1551
        addr.m_union.m_ip_v4 = m_sockaddr_union.m_ip_v4.sin_addr;
10,158✔
1552
    }
10,158✔
1553
    else {
2,147,487,583✔
1554
        addr.m_union.m_ip_v6 = m_sockaddr_union.m_ip_v6.sin6_addr;
2,147,487,583✔
1555
        addr.m_ip_v6_scope_id = m_sockaddr_union.m_ip_v6.sin6_scope_id;
2,147,487,583✔
1556
        addr.m_is_ip_v6 = true;
2,147,487,583✔
1557
    }
2,147,487,583✔
1558
    return addr;
14,090✔
1559
}
14,090✔
1560

1561
inline Endpoint::port_type Endpoint::port() const
1562
{
15,726✔
1563
    return ntohs(m_protocol.is_ip_v4() ? m_sockaddr_union.m_ip_v4.sin_port : m_sockaddr_union.m_ip_v6.sin6_port);
15,726✔
1564
}
15,726✔
1565

1566
inline Endpoint::data_type* Endpoint::data()
1567
{
2✔
1568
    return &m_sockaddr_union.m_base;
2✔
1569
}
2✔
1570

1571
inline const Endpoint::data_type* Endpoint::data() const
1572
{
×
1573
    return &m_sockaddr_union.m_base;
×
1574
}
×
1575

1576
inline Endpoint::Endpoint()
1577
    : Endpoint{StreamProtocol::ip_v4(), 0}
5,830✔
1578
{
13,894✔
1579
}
13,894✔
1580

1581
inline Endpoint::Endpoint(const StreamProtocol& protocol, port_type port)
1582
    : m_protocol{protocol}
5,830✔
1583
{
13,894✔
1584
    int family = m_protocol.family();
13,894✔
1585
    if (family == AF_INET) {
13,894✔
1586
        m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear
13,894✔
1587
        m_sockaddr_union.m_ip_v4.sin_family = AF_INET;
13,894✔
1588
        m_sockaddr_union.m_ip_v4.sin_port = htons(port);
13,894✔
1589
        m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY;
13,894✔
1590
    }
13,894✔
1591
    else if (family == AF_INET6) {
×
1592
        m_sockaddr_union.m_ip_v6 = sockaddr_ip_v6_type(); // Clear
×
1593
        m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6;
×
1594
        m_sockaddr_union.m_ip_v6.sin6_port = htons(port);
×
1595
    }
×
1596
    else {
×
1597
        m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear
×
1598
        m_sockaddr_union.m_ip_v4.sin_family = AF_UNSPEC;
×
1599
        m_sockaddr_union.m_ip_v4.sin_port = htons(port);
×
1600
        m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY;
×
1601
    }
×
1602
}
13,894✔
1603

1604
inline Endpoint::Endpoint(const Address& addr, port_type port)
1605
{
1606
    if (addr.m_is_ip_v6) {
1607
        m_protocol = StreamProtocol::ip_v6();
1608
        m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6;
1609
        m_sockaddr_union.m_ip_v6.sin6_port = htons(port);
1610
        m_sockaddr_union.m_ip_v6.sin6_flowinfo = 0;
1611
        m_sockaddr_union.m_ip_v6.sin6_addr = addr.m_union.m_ip_v6;
1612
        m_sockaddr_union.m_ip_v6.sin6_scope_id = addr.m_ip_v6_scope_id;
1613
    }
1614
    else {
1615
        m_protocol = StreamProtocol::ip_v4();
1616
        m_sockaddr_union.m_ip_v4.sin_family = AF_INET;
1617
        m_sockaddr_union.m_ip_v4.sin_port = htons(port);
1618
        m_sockaddr_union.m_ip_v4.sin_addr = addr.m_union.m_ip_v4;
1619
    }
1620
}
1621

1622
inline Endpoint::List::iterator Endpoint::List::begin() const noexcept
1623
{
8,468✔
1624
    return m_endpoints.data();
8,468✔
1625
}
8,468✔
1626

1627
inline Endpoint::List::iterator Endpoint::List::end() const noexcept
1628
{
1,212✔
1629
    return m_endpoints.data() + m_endpoints.size();
1,212✔
1630
}
1,212✔
1631

1632
inline std::size_t Endpoint::List::size() const noexcept
1633
{
10,890✔
1634
    return m_endpoints.size();
10,890✔
1635
}
10,890✔
1636

1637
inline bool Endpoint::List::empty() const noexcept
1638
{
3,630✔
1639
    return m_endpoints.size() == 0;
3,630✔
1640
}
3,630✔
1641

1642
// ---------------- Service::OperQueue ----------------
1643

1644
template <class Oper>
1645
inline bool Service::OperQueue<Oper>::empty() const noexcept
1646
{
10,969,632✔
1647
    return !m_back;
10,969,632✔
1648
}
10,969,632✔
1649

1650
template <class Oper>
1651
inline void Service::OperQueue<Oper>::push_back(LendersOperPtr op) noexcept
1652
{
7,336,874✔
1653
    REALM_ASSERT(!op->m_next);
7,336,874✔
1654
    if (m_back) {
7,336,874✔
1655
        op->m_next = m_back->m_next;
2,061,004✔
1656
        m_back->m_next = op.get();
2,061,004✔
1657
    }
2,061,004✔
1658
    else {
5,275,870✔
1659
        op->m_next = op.get();
5,275,870✔
1660
    }
5,275,870✔
1661
    m_back = op.release();
7,336,874✔
1662
}
7,336,874✔
1663

1664
template <class Oper>
1665
template <class Oper2>
1666
inline void Service::OperQueue<Oper>::push_back(OperQueue<Oper2>& q) noexcept
1667
{
3,524,884✔
1668
    if (!q.m_back)
3,524,884✔
1669
        return;
2,780,702✔
1670
    if (m_back)
744,182✔
1671
        std::swap(m_back->m_next, q.m_back->m_next);
11,676✔
1672
    m_back = q.m_back;
744,182✔
1673
    q.m_back = nullptr;
744,182✔
1674
}
744,182✔
1675

1676
template <class Oper>
1677
inline auto Service::OperQueue<Oper>::pop_front() noexcept -> LendersOperPtr
1678
{
12,202,644✔
1679
    Oper* op = nullptr;
12,202,644✔
1680
    if (m_back) {
12,202,644✔
1681
        op = static_cast<Oper*>(m_back->m_next);
7,342,068✔
1682
        if (op != m_back) {
7,342,068✔
1683
            m_back->m_next = op->m_next;
2,071,590✔
1684
        }
2,071,590✔
1685
        else {
5,270,478✔
1686
            m_back = nullptr;
5,270,478✔
1687
        }
5,270,478✔
1688
        op->m_next = nullptr;
7,342,068✔
1689
    }
7,342,068✔
1690
    return LendersOperPtr(op);
12,202,644✔
1691
}
12,202,644✔
1692

1693
template <class Oper>
1694
inline void Service::OperQueue<Oper>::clear() noexcept
1695
{
2,816,146✔
1696
    if (m_back) {
2,816,146✔
1697
        LendersOperPtr op(m_back);
1,606✔
1698
        while (op->m_next != m_back)
3,626✔
1699
            op.reset(static_cast<Oper*>(op->m_next));
2,020✔
1700
        m_back = nullptr;
1,606✔
1701
    }
1,606✔
1702
}
2,816,146✔
1703

1704
template <class Oper>
1705
inline Service::OperQueue<Oper>::OperQueue(OperQueue&& q) noexcept
1706
    : m_back{q.m_back}
82,788✔
1707
{
82,788✔
1708
    q.m_back = nullptr;
82,788✔
1709
}
82,788✔
1710

1711
// ---------------- Service::Descriptor ----------------
1712

1713
inline Service::Descriptor::Descriptor(Impl& s) noexcept
1714
    : service_impl{s}
4,588✔
1715
{
9,392✔
1716
}
9,392✔
1717

1718
inline Service::Descriptor::~Descriptor() noexcept
1719
{
9,392✔
1720
    if (is_open())
9,392✔
1721
        close();
×
1722
}
9,392✔
1723

1724
inline void Service::Descriptor::assign(native_handle_type fd, bool in_blocking_mode) noexcept
1725
{
8,160✔
1726
    REALM_ASSERT(!is_open());
8,160✔
1727
    m_fd = fd;
8,160✔
1728
    m_in_blocking_mode = in_blocking_mode;
8,160✔
1729
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
4,138✔
1730
    m_read_ready = false;
4,138✔
1731
    m_write_ready = false;
4,138✔
1732
    m_imminent_end_of_input = false;
4,138✔
1733
    m_is_registered = false;
4,138✔
1734
#endif
4,138✔
1735
}
8,160✔
1736

1737
inline void Service::Descriptor::close() noexcept
1738
{
8,156✔
1739
    REALM_ASSERT(is_open());
8,156✔
1740
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
4,138✔
1741
    if (m_is_registered)
4,138✔
1742
        deregister_for_async();
4,048✔
1743
    m_is_registered = false;
4,138✔
1744
#endif
4,138✔
1745
    do_close();
8,156✔
1746
}
8,156✔
1747

1748
inline auto Service::Descriptor::release() noexcept -> native_handle_type
1749
{
×
1750
    REALM_ASSERT(is_open());
×
1751
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
×
1752
    if (m_is_registered)
×
1753
        deregister_for_async();
×
1754
    m_is_registered = false;
×
1755
#endif
×
1756
    return do_release();
×
1757
}
×
1758

1759
inline bool Service::Descriptor::is_open() const noexcept
1760
{
60,402✔
1761
    return (m_fd != -1);
60,402✔
1762
}
60,402✔
1763

1764
inline auto Service::Descriptor::native_handle() const noexcept -> native_handle_type
1765
{
20,170✔
1766
    return m_fd;
20,170✔
1767
}
20,170✔
1768

1769
inline bool Service::Descriptor::in_blocking_mode() const noexcept
1770
{
1,536✔
1771
    return m_in_blocking_mode;
1,536✔
1772
}
1,536✔
1773

1774
template <class Oper, class... Args>
1775
inline void Service::Descriptor::initiate_oper(std::unique_ptr<Oper, LendersOperDeleter> op, Args&&... args)
1776
{
2,719,881✔
1777
    Service::Want want = op->initiate(std::forward<Args>(args)...); // Throws
2,719,881✔
1778
    add_initiated_oper(std::move(op), want);                        // Throws
2,719,881✔
1779
}
2,719,881✔
1780

1781
inline void Service::Descriptor::ensure_blocking_mode()
1782
{
557,524✔
1783
    // Assuming that descriptors are either used mostly in blocking mode, or
1784
    // mostly in nonblocking mode.
1785
    if (REALM_UNLIKELY(!m_in_blocking_mode)) {
557,524✔
1786
        bool value = false;
144✔
1787
        set_nonblock_flag(value); // Throws
144✔
1788
        m_in_blocking_mode = true;
144✔
1789
    }
144✔
1790
}
557,524✔
1791

1792
inline void Service::Descriptor::ensure_nonblocking_mode()
1793
{
2,809,832✔
1794
    // Assuming that descriptors are either used mostly in blocking mode, or
1795
    // mostly in nonblocking mode.
1796
    if (REALM_UNLIKELY(m_in_blocking_mode)) {
2,809,832✔
1797
        bool value = true;
5,302✔
1798
        set_nonblock_flag(value); // Throws
5,302✔
1799
        m_in_blocking_mode = false;
5,302✔
1800
    }
5,302✔
1801
}
2,809,832✔
1802

1803
inline bool Service::Descriptor::assume_read_would_block() const noexcept
1804
{
2,173,458✔
1805
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
1,147,442✔
1806
    return !m_in_blocking_mode && !m_read_ready;
1,147,442✔
1807
#else
1808
    return false;
1,026,016✔
1809
#endif
1,026,016✔
1810
}
2,173,458✔
1811

1812
inline bool Service::Descriptor::assume_write_would_block() const noexcept
1813
{
1,508,500✔
1814
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
791,368✔
1815
    return !m_in_blocking_mode && !m_write_ready;
791,368✔
1816
#else
1817
    return false;
717,132✔
1818
#endif
717,132✔
1819
}
1,508,500✔
1820

1821
inline void Service::Descriptor::set_read_ready(bool value) noexcept
1822
{
2,181,012✔
1823
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
1,150,320✔
1824
    m_read_ready = value;
1,150,320✔
1825
#else
1826
    // No-op
1827
    static_cast<void>(value);
1,030,692✔
1828
#endif
1,030,692✔
1829
}
2,181,012✔
1830

1831
inline void Service::Descriptor::set_write_ready(bool value) noexcept
1832
{
1,412,838✔
1833
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
692,890✔
1834
    m_write_ready = value;
692,890✔
1835
#else
1836
    // No-op
1837
    static_cast<void>(value);
719,948✔
1838
#endif
719,948✔
1839
}
1,412,838✔
1840

1841
// ---------------- Service ----------------
1842

1843
class Service::AsyncOper {
1844
public:
1845
    bool in_use() const noexcept;
1846
    bool is_complete() const noexcept;
1847
    bool is_canceled() const noexcept;
1848
    void cancel() noexcept;
1849
    /// Every object of type \ref AsyncOper must be destroyed either by a call
1850
    /// to this function or to recycle(). This function recycles the operation
1851
    /// object (commits suicide), even if it throws.
1852
    virtual void recycle_and_execute() = 0;
1853
    /// Every object of type \ref AsyncOper must be destroyed either by a call
1854
    /// to recycle_and_execute() or to this function. This function destroys the
1855
    /// object (commits suicide).
1856
    virtual void recycle() noexcept = 0;
1857
    /// Must be called when the owner dies, and the object is in use (not an
1858
    /// instance of UnusedOper).
1859
    virtual void orphan() noexcept = 0;
1860

1861
protected:
1862
    AsyncOper(std::size_t size, bool in_use) noexcept;
1863
    virtual ~AsyncOper() noexcept {}
9,771,372✔
1864
    void set_is_complete(bool value) noexcept;
1865
    template <class H, class... Args>
1866
    void do_recycle_and_execute(bool orphaned, H& handler, Args&&...);
1867
    void do_recycle(bool orphaned) noexcept;
1868

1869
private:
1870
    std::size_t m_size; // Allocated number of bytes
1871
    bool m_in_use = false;
1872
    // Set to true when the operation completes successfully or fails. If the
1873
    // operation is canceled before this happens, it will never be set to
1874
    // true. Always false when not in use
1875
    bool m_complete = false;
1876
    // Set to true when the operation is canceled. Always false when not in use.
1877
    bool m_canceled = false;
1878
    AsyncOper* m_next = nullptr; // Always null when not in use
1879
    template <class H, class... Args>
1880
    void do_recycle_and_execute_helper(bool orphaned, bool& was_recycled, H handler, Args...);
1881
    friend class Service;
1882
};
1883

1884
class Service::ResolveOperBase : public AsyncOper {
1885
public:
1886
    ResolveOperBase(std::size_t size, Resolver& resolver, Resolver::Query query) noexcept
1887
        : AsyncOper{size, true}
1,674✔
1888
        , m_resolver{&resolver}
1,674✔
1889
        , m_query{std::move(query)}
1,674✔
1890
    {
3,642✔
1891
    }
3,642✔
1892
    void complete() noexcept
1893
    {
3,640✔
1894
        set_is_complete(true);
3,640✔
1895
    }
3,640✔
1896
    void recycle() noexcept override final
1897
    {
6✔
1898
        bool orphaned = !m_resolver;
6✔
1899
        REALM_ASSERT(orphaned);
6✔
1900
        // Note: do_recycle() commits suicide.
1901
        do_recycle(orphaned);
6✔
1902
    }
6✔
1903
    void orphan() noexcept override final
1904
    {
6✔
1905
        m_resolver = nullptr;
6✔
1906
    }
6✔
1907

1908
protected:
1909
    Resolver* m_resolver;
1910
    Resolver::Query m_query;
1911
    Endpoint::List m_endpoints;
1912
    std::error_code m_error_code;
1913
    friend class Service;
1914
};
1915

1916
class Service::WaitOperBase : public AsyncOper {
1917
public:
1918
    WaitOperBase(std::size_t size, DeadlineTimer& timer, clock::time_point expiration_time) noexcept
1919
        : AsyncOper{size, true}
178,020✔
1920
        , // Second argument is `in_use`
1921
        m_timer{&timer}
178,020✔
1922
        , m_expiration_time{expiration_time}
178,020✔
1923
    {
452,800✔
1924
    }
452,800✔
1925
    void complete() noexcept
1926
    {
437,518✔
1927
        set_is_complete(true);
437,518✔
1928
    }
437,518✔
1929
    void recycle() noexcept override final
1930
    {
1,204✔
1931
        bool orphaned = !m_timer;
1,204✔
1932
        REALM_ASSERT(orphaned);
1,204✔
1933
        // Note: do_recycle() commits suicide.
1934
        do_recycle(orphaned);
1,204✔
1935
    }
1,204✔
1936
    void orphan() noexcept override final
1937
    {
14,824✔
1938
        m_timer = nullptr;
14,824✔
1939
    }
14,824✔
1940

1941
protected:
1942
    DeadlineTimer* m_timer;
1943
    clock::time_point m_expiration_time;
1944
    friend class Service;
1945
};
1946

1947
class Service::TriggerExecOperBase : public AsyncOper, public util::AtomicRefCountBase {
1948
public:
1949
    TriggerExecOperBase(Impl& service) noexcept
1950
        : AsyncOper{0, false}
1951
        , // First arg is `size` (unused), second arg is `in_use`
1952
        m_service{&service}
1953
    {
×
1954
    }
×
1955
    void recycle() noexcept override final
1956
    {
×
1957
        REALM_ASSERT(in_use());
×
1958
        REALM_ASSERT(!m_service);
×
1959
        // Note: Potential suicide when `self` goes out of scope
×
1960
        util::bind_ptr<TriggerExecOperBase> self{this, util::bind_ptr_base::adopt_tag{}};
×
1961
    }
×
1962
    void orphan() noexcept override final
1963
    {
×
1964
        REALM_ASSERT(m_service);
×
1965
        m_service = nullptr;
×
1966
    }
×
1967
    void trigger() noexcept
1968
    {
×
1969
        REALM_ASSERT(m_service);
×
1970
        Service::trigger_exec(*m_service, *this);
×
1971
    }
×
1972

1973
protected:
1974
    Impl* m_service;
1975
};
1976

1977
class Service::PostOperBase : public AsyncOper {
1978
public:
1979
    PostOperBase(std::size_t size, Impl& service) noexcept
1980
        : AsyncOper{size, true}
601,936✔
1981
        , // Second argument is `in_use`
1982
        m_service{service}
601,936✔
1983
    {
1,208,646✔
1984
    }
1,208,646✔
1985
    void recycle() noexcept override final
1986
    {
326✔
1987
        // Service::recycle_post_oper() destroys this operation object
1988
        Service::recycle_post_oper(m_service, this);
326✔
1989
    }
326✔
1990
    void orphan() noexcept override final
1991
    {
×
1992
        REALM_ASSERT(false); // Never called
×
1993
    }
×
1994

1995
protected:
1996
    Impl& m_service;
1997
};
1998

1999
template <class H>
2000
class Service::PostOper : public PostOperBase {
2001
    static_assert(std::is_nothrow_move_constructible_v<H>);
2002

2003
public:
2004
    PostOper(std::size_t size, Impl& service, H&& handler)
2005
        : PostOperBase{size, service}
390,843✔
2006
        , m_handler{std::move(handler)}
390,843✔
2007
    {
786,744✔
2008
    }
786,744✔
2009
    void recycle_and_execute() override final
2010
    {
786,414✔
2011
        // Recycle the operation object before the handler is exceuted, such
2012
        // that the memory is available for a new post operation that might be
2013
        // initiated during the execution of the handler.
2014
        H handler = std::move(m_handler);
786,414✔
2015
        // Service::recycle_post_oper() destroys this operation object
2016
        Service::recycle_post_oper(m_service, this);
786,414✔
2017
        handler(Status::OK()); // Throws
786,414✔
2018
    }
786,414✔
2019

2020
private:
2021
    H m_handler;
2022
};
2023

2024
class Service::IoOper : public AsyncOper {
2025
public:
2026
    IoOper(std::size_t size) noexcept
2027
        : AsyncOper{size, true} // Second argument is `in_use`
1,561,878✔
2028
    {
3,249,428✔
2029
    }
3,249,428✔
2030
    virtual Descriptor& descriptor() noexcept = 0;
2031
    /// Advance this operation and figure out out whether it needs to read from,
2032
    /// or write to the underlying descriptor to advance further. This function
2033
    /// must return Want::read if the operation needs to read, or Want::write if
2034
    /// the operation needs to write to advance further. If the operation
2035
    /// completes (due to success or failure), this function must return
2036
    /// Want::nothing.
2037
    virtual Want advance() noexcept = 0;
2038
};
2039

2040
class Service::UnusedOper : public AsyncOper {
2041
public:
2042
    UnusedOper(std::size_t size) noexcept
2043
        : AsyncOper{size, false} // Second argument is `in_use`
2,332,324✔
2044
    {
4,891,244✔
2045
    }
4,891,244✔
2046
    void recycle_and_execute() override final
2047
    {
×
2048
        // Must never be called
2049
        REALM_ASSERT(false);
×
2050
    }
×
2051
    void recycle() noexcept override final
2052
    {
×
2053
        // Must never be called
2054
        REALM_ASSERT(false);
×
2055
    }
×
2056
    void orphan() noexcept override final
2057
    {
×
2058
        // Must never be called
2059
        REALM_ASSERT(false);
×
2060
    }
×
2061
};
2062

2063
// `S` must be a stream class with the following member functions:
2064
//
2065
//    Socket& lowest_layer() noexcept;
2066
//
2067
//    void do_init_read_async(std::error_code& ec, Want& want) noexcept;
2068
//    void do_init_write_async(std::error_code& ec, Want& want) noexcept;
2069
//
2070
//    std::size_t do_read_some_sync(char* buffer, std::size_t size,
2071
//                                  std::error_code& ec) noexcept;
2072
//    std::size_t do_write_some_sync(const char* data, std::size_t size,
2073
//                                   std::error_code& ec) noexcept;
2074
//    std::size_t do_read_some_async(char* buffer, std::size_t size,
2075
//                                   std::error_code& ec, Want& want) noexcept;
2076
//    std::size_t do_write_some_async(const char* data, std::size_t size,
2077
//                                    std::error_code& ec, Want& want) noexcept;
2078
//
2079
// If an error occurs during any of these 6 functions, the `ec` argument must be
2080
// set accordingly. Otherwise the `ec` argument must be set to
2081
// `std::error_code()`.
2082
//
2083
// The do_init_*_async() functions must update the `want` argument to indicate
2084
// how the operation must be initiated:
2085
//
2086
//    Want::read      Wait for read readiness, then call do_*_some_async().
2087
//    Want::write     Wait for write readiness, then call do_*_some_async().
2088
//    Want::nothing   Call do_*_some_async() immediately without waiting for
2089
//                    read or write readiness.
2090
//
2091
// If end-of-input occurs while reading, do_read_some_*() must fail, set `ec` to
2092
// MiscExtErrors::end_of_input, and return zero.
2093
//
2094
// If an error occurs during reading or writing, do_*_some_sync() must set `ec`
2095
// accordingly (to something other than `std::system_error()`) and return
2096
// zero. Otherwise they must set `ec` to `std::system_error()` and return the
2097
// number of bytes read or written, which **must** be at least 1. If the
2098
// underlying socket is in nonblocking mode, and no bytes could be immediately
2099
// read or written, these functions must fail with
2100
// `error::resource_unavailable_try_again`.
2101
//
2102
// If an error occurs during reading or writing, do_*_some_async() must set `ec`
2103
// accordingly (to something other than `std::system_error()`), `want` to
2104
// `Want::nothing`, and return zero. Otherwise they must set `ec` to
2105
// `std::system_error()` and return the number of bytes read or written, which
2106
// must be zero if no bytes could be immediately read or written. Note, in this
2107
// case it is not an error if the underlying socket is in nonblocking mode, and
2108
// no bytes could be immediately read or written. When these functions succeed,
2109
// but return zero because no bytes could be immediately read or written, they
2110
// must set `want` to something other than `Want::nothing`.
2111
//
2112
// If no error occurs, do_*_some_async() must set `want` to indicate how the
2113
// operation should proceed if additional data needs to be read or written, or
2114
// if no bytes were transferred:
2115
//
2116
//    Want::read      Wait for read readiness, then call do_*_some_async() again.
2117
//    Want::write     Wait for write readiness, then call do_*_some_async() again.
2118
//    Want::nothing   Call do_*_some_async() again without waiting for read or
2119
//                    write readiness.
2120
//
2121
// NOTE: If, for example, do_read_some_async() sets `want` to `Want::write`, it
2122
// means that the stream needs to write data to the underlying TCP socket before
2123
// it is able to deliver any additional data to the caller. While such a
2124
// situation will never occur on a raw TCP socket, it can occur on an SSL stream
2125
// (Secure Socket Layer).
2126
//
2127
// When do_*_some_async() returns `n`, at least one of the following conditions
2128
// must be true:
2129
//
2130
//    n > 0                     Bytes were transferred.
2131
//    ec != std::error_code()   An error occured.
2132
//    want != Want::nothing     Wait for read/write readiness.
2133
//
2134
// This is of critical importance, as it is the only way we can avoid falling
2135
// into a busy loop of repeated invocations of do_*_some_async().
2136
//
2137
// NOTE: do_*_some_async() are allowed to set `want` to `Want::read` or
2138
// `Want::write`, even when they succesfully transfer a nonzero number of bytes.
2139
template <class S>
2140
class Service::BasicStreamOps {
2141
public:
2142
    class StreamOper;
2143
    class ReadOperBase;
2144
    class WriteOperBase;
2145
    class BufferedReadOperBase;
2146
    template <class H>
2147
    class ReadOper;
2148
    template <class H>
2149
    class WriteOper;
2150
    template <class H>
2151
    class BufferedReadOper;
2152

2153
    using LendersReadOperPtr = std::unique_ptr<ReadOperBase, LendersOperDeleter>;
2154
    using LendersWriteOperPtr = std::unique_ptr<WriteOperBase, LendersOperDeleter>;
2155
    using LendersBufferedReadOperPtr = std::unique_ptr<BufferedReadOperBase, LendersOperDeleter>;
2156

2157
    // Synchronous read
2158
    static std::size_t read(S& stream, char* buffer, std::size_t size, std::error_code& ec)
2159
    {
4✔
2160
        REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use());
4✔
2161
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
4✔
2162
        char* begin = buffer;
4✔
2163
        char* end = buffer + size;
4✔
2164
        char* curr = begin;
4✔
2165
        for (;;) {
8✔
2166
            if (curr == end) {
8✔
2167
                ec = std::error_code(); // Success
×
2168
                break;
×
2169
            }
×
2170
            char* buffer_2 = curr;
8✔
2171
            std::size_t size_2 = std::size_t(end - curr);
8✔
2172
            std::size_t n = stream.do_read_some_sync(buffer_2, size_2, ec);
8✔
2173
            if (REALM_UNLIKELY(ec))
8✔
2174
                break;
4✔
2175
            REALM_ASSERT(n > 0);
4✔
2176
            REALM_ASSERT(n <= size_2);
4✔
2177
            curr += n;
4✔
2178
        }
4✔
2179
        std::size_t n = std::size_t(curr - begin);
4✔
2180
        return n;
4✔
2181
    }
4✔
2182

2183
    // Synchronous write
2184
    static std::size_t write(S& stream, const char* data, std::size_t size, std::error_code& ec)
2185
    {
262,129✔
2186
        REALM_ASSERT(!stream.lowest_layer().m_write_oper || !stream.lowest_layer().m_write_oper->in_use());
262,129✔
2187
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
262,129✔
2188
        const char* begin = data;
262,129✔
2189
        const char* end = data + size;
262,129✔
2190
        const char* curr = begin;
262,129✔
2191
        for (;;) {
524,254✔
2192
            if (curr == end) {
524,254✔
2193
                ec = std::error_code(); // Success
262,125✔
2194
                break;
262,125✔
2195
            }
262,125✔
2196
            const char* data_2 = curr;
262,129✔
2197
            std::size_t size_2 = std::size_t(end - curr);
262,129✔
2198
            std::size_t n = stream.do_write_some_sync(data_2, size_2, ec);
262,129✔
2199
            if (REALM_UNLIKELY(ec))
262,129✔
2200
                break;
4✔
2201
            REALM_ASSERT(n > 0);
262,125✔
2202
            REALM_ASSERT(n <= size_2);
262,125✔
2203
            curr += n;
262,125✔
2204
        }
262,125✔
2205
        std::size_t n = std::size_t(curr - begin);
262,129✔
2206
        return n;
262,129✔
2207
    }
262,129✔
2208

2209
    // Synchronous read
2210
    static std::size_t buffered_read(S& stream, char* buffer, std::size_t size, int delim, ReadAheadBuffer& rab,
2211
                                     std::error_code& ec)
2212
    {
16,410✔
2213
        REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use());
16,410✔
2214
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
16,410✔
2215
        char* begin = buffer;
16,410✔
2216
        char* end = buffer + size;
16,410✔
2217
        char* curr = begin;
16,410✔
2218
        for (;;) {
147,499✔
2219
            bool complete = rab.read(curr, end, delim, ec);
147,499✔
2220
            if (complete)
147,499✔
2221
                break;
16,402✔
2222

×
2223
            rab.refill_sync(stream, ec);
131,097✔
2224
            if (REALM_UNLIKELY(ec))
131,097✔
2225
                break;
8✔
2226
        }
131,097✔
2227
        std::size_t n = (curr - begin);
16,410✔
2228
        return n;
16,410✔
2229
    }
16,410✔
2230

2231
    // Synchronous read
2232
    static std::size_t read_some(S& stream, char* buffer, std::size_t size, std::error_code& ec)
2233
    {
20✔
2234
        REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use());
20✔
2235
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
20✔
2236
        return stream.do_read_some_sync(buffer, size, ec);
20✔
2237
    }
20✔
2238

2239
    // Synchronous write
2240
    static std::size_t write_some(S& stream, const char* data, std::size_t size, std::error_code& ec)
2241
    {
97✔
2242
        REALM_ASSERT(!stream.lowest_layer().m_write_oper || !stream.lowest_layer().m_write_oper->in_use());
97✔
2243
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
97✔
2244
        return stream.do_write_some_sync(data, size, ec);
97✔
2245
    }
97✔
2246

2247
    template <class H>
2248
    static void async_read(S& stream, char* buffer, std::size_t size, bool is_read_some, H&& handler)
2249
    {
539,348✔
2250
        char* begin = buffer;
539,348✔
2251
        char* end = buffer + size;
539,348✔
2252
        LendersReadOperPtr op = Service::alloc<ReadOper<H>>(stream.lowest_layer().m_read_oper, stream, is_read_some,
539,348✔
2253
                                                            begin, end, std::move(handler)); // Throws
539,348✔
2254
        stream.lowest_layer().m_desc.initiate_oper(std::move(op));                           // Throws
539,348✔
2255
    }
539,348✔
2256

2257
    template <class H>
2258
    static void async_write(S& stream, const char* data, std::size_t size, bool is_write_some, H&& handler)
2259
    {
712,014✔
2260
        const char* begin = data;
712,014✔
2261
        const char* end = data + size;
712,014✔
2262
        LendersWriteOperPtr op = Service::alloc<WriteOper<H>>(
712,014✔
2263
            stream.lowest_layer().m_write_oper, stream, is_write_some, begin, end, std::move(handler)); // Throws
712,014✔
2264
        stream.lowest_layer().m_desc.initiate_oper(std::move(op));                                      // Throws
712,014✔
2265
    }
712,014✔
2266

2267
    template <class H>
2268
    static void async_buffered_read(S& stream, char* buffer, std::size_t size, int delim, ReadAheadBuffer& rab,
2269
                                    H&& handler)
2270
    {
679,479✔
2271
        char* begin = buffer;
679,479✔
2272
        char* end = buffer + size;
679,479✔
2273
        LendersBufferedReadOperPtr op =
679,479✔
2274
            Service::alloc<BufferedReadOper<H>>(stream.lowest_layer().m_read_oper, stream, begin, end, delim, rab,
679,479✔
2275
                                                std::move(handler)); // Throws
679,479✔
2276
        stream.lowest_layer().m_desc.initiate_oper(std::move(op));   // Throws
679,479✔
2277
    }
679,479✔
2278
};
2279

2280
template <class S>
2281
class Service::BasicStreamOps<S>::StreamOper : public IoOper {
2282
public:
2283
    StreamOper(std::size_t size, S& stream) noexcept
2284
        : IoOper{size}
1,563,918✔
2285
        , m_stream{&stream}
1,563,918✔
2286
    {
3,247,710✔
2287
    }
3,247,710✔
2288
    void recycle() noexcept override final
2289
    {
880✔
2290
        bool orphaned = !m_stream;
880✔
2291
        REALM_ASSERT(orphaned);
880✔
2292
        // Note: do_recycle() commits suicide.
2293
        do_recycle(orphaned);
880✔
2294
    }
880✔
2295
    void orphan() noexcept override final
2296
    {
4,946✔
2297
        m_stream = nullptr;
4,946✔
2298
    }
4,946✔
2299
    Descriptor& descriptor() noexcept override final
2300
    {
231,634✔
2301
        return m_stream->lowest_layer().m_desc;
231,634✔
2302
    }
231,634✔
2303

2304
protected:
2305
    S* m_stream;
2306
    std::error_code m_error_code;
2307
};
2308

2309
template <class S>
2310
class Service::BasicStreamOps<S>::ReadOperBase : public StreamOper {
2311
public:
2312
    ReadOperBase(std::size_t size, S& stream, bool is_read_some, char* begin, char* end) noexcept
2313
        : StreamOper{size, stream}
257,863✔
2314
        , m_is_read_some{is_read_some}
257,863✔
2315
        , m_begin{begin}
257,863✔
2316
        , m_end{end}
257,863✔
2317
    {
539,298✔
2318
    }
539,298✔
2319
    Want initiate()
2320
    {
538,808✔
2321
        auto& s = *this;
538,808✔
2322
        REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get());
538,808✔
2323
        REALM_ASSERT(!s.is_complete());
538,808✔
2324
        REALM_ASSERT(s.m_curr <= s.m_end);
538,808✔
2325
        Want want = Want::nothing;
538,808✔
2326
        if (REALM_UNLIKELY(s.m_curr == s.m_end)) {
538,808✔
2327
            s.set_is_complete(true); // Success
×
2328
        }
×
2329
        else {
538,808✔
2330
            s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
538,808✔
2331
            s.m_stream->do_init_read_async(s.m_error_code, want);
538,808✔
2332
            if (want == Want::nothing) {
538,808✔
2333
                if (REALM_UNLIKELY(s.m_error_code)) {
235,884!
2334
                    s.set_is_complete(true); // Failure
×
2335
                }
×
2336
                else {
235,884✔
2337
                    want = advance();
235,884✔
2338
                }
235,884✔
2339
            }
235,884✔
2340
        }
538,808✔
2341
        return want;
538,808✔
2342
    }
538,808✔
2343
    Want advance() noexcept override final
2344
    {
369,998✔
2345
        auto& s = *this;
369,998✔
2346
        REALM_ASSERT(!s.is_complete());
369,998✔
2347
        REALM_ASSERT(!s.is_canceled());
369,998✔
2348
        REALM_ASSERT(!s.m_error_code);
369,998✔
2349
        REALM_ASSERT(s.m_curr < s.m_end);
369,998✔
2350
        REALM_ASSERT(!s.m_is_read_some || s.m_curr == m_begin);
369,998✔
2351
        for (;;) {
370,001✔
2352
            // Read into callers buffer
2353
            char* buffer = s.m_curr;
369,933✔
2354
            std::size_t size = std::size_t(s.m_end - s.m_curr);
369,933✔
2355
            Want want = Want::nothing;
369,933✔
2356
            std::size_t n = s.m_stream->do_read_some_async(buffer, size, s.m_error_code, want);
369,933✔
2357
            REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please
369,933✔
2358
            // Any errors reported by do_read_some_async() (other than end_of_input) should always return 0
2359
            bool got_nothing = (n == 0);
369,933✔
2360
            if (got_nothing) {
369,933✔
2361
                if (REALM_UNLIKELY(s.m_error_code)) {
649✔
2362
                    s.set_is_complete(true); // Failure
2✔
2363
                    return Want::nothing;
2✔
2364
                }
2✔
2365
                // Got nothing, but want something
2366
                return want;
647✔
2367
            }
649✔
2368
            REALM_ASSERT(!s.m_error_code);
369,284✔
2369
            // Check for completion
2370
            REALM_ASSERT(n <= size);
369,284✔
2371
            s.m_curr += n;
369,284✔
2372
            if (s.m_is_read_some || s.m_curr == s.m_end) {
370,379✔
2373
                s.set_is_complete(true); // Success
370,378✔
2374
                return Want::nothing;
370,378✔
2375
            }
370,378✔
2376
            if (want != Want::nothing)
2,147,483,648✔
2377
                return want;
×
2378
            REALM_ASSERT(n < size);
2,147,483,648✔
2379
        }
2,147,483,648✔
2380
    }
369,998✔
2381

2382
protected:
2383
    const bool m_is_read_some;
2384
    char* const m_begin;    // May be dangling after cancellation
2385
    char* const m_end;      // May be dangling after cancellation
2386
    char* m_curr = m_begin; // May be dangling after cancellation
2387
};
2388

2389
template <class S>
2390
class Service::BasicStreamOps<S>::WriteOperBase : public StreamOper {
2391
public:
2392
    WriteOperBase(std::size_t size, S& stream, bool is_write_some, const char* begin, const char* end) noexcept
2393
        : StreamOper{size, stream}
602,960✔
2394
        , m_is_write_some{is_write_some}
602,960✔
2395
        , m_begin{begin}
602,960✔
2396
        , m_end{end}
602,960✔
2397
    {
1,256,804✔
2398
    }
1,256,804✔
2399
    Want initiate()
2400
    {
1,255,924✔
2401
        auto& s = *this;
1,255,924✔
2402
        REALM_ASSERT(this == s.m_stream->lowest_layer().m_write_oper.get());
1,255,924✔
2403
        REALM_ASSERT(!s.is_complete());
1,255,924✔
2404
        REALM_ASSERT(s.m_curr <= s.m_end);
1,255,924✔
2405
        Want want = Want::nothing;
1,255,924✔
2406
        if (REALM_UNLIKELY(s.m_curr == s.m_end)) {
1,255,924✔
2407
            s.set_is_complete(true); // Success
44,216✔
2408
        }
44,216✔
2409
        else {
1,211,708✔
2410
            s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
1,211,708✔
2411
            s.m_stream->do_init_write_async(s.m_error_code, want);
1,211,708✔
2412
            if (want == Want::nothing) {
1,211,708✔
2413
                if (REALM_UNLIKELY(s.m_error_code)) {
307,356✔
2414
                    s.set_is_complete(true); // Failure
×
2415
                }
×
2416
                else {
307,356✔
2417
                    want = advance();
307,356✔
2418
                }
307,356✔
2419
            }
307,356✔
2420
        }
1,211,708✔
2421
        return want;
1,255,924✔
2422
    }
1,255,924✔
2423
    Want advance() noexcept override final
2424
    {
882,690✔
2425
        auto& s = *this;
882,690✔
2426
        REALM_ASSERT(!s.is_complete());
882,690✔
2427
        REALM_ASSERT(!s.is_canceled());
882,690✔
2428
        REALM_ASSERT(!s.m_error_code);
882,690✔
2429
        REALM_ASSERT(s.m_curr < s.m_end);
882,690✔
2430
        REALM_ASSERT(!s.m_is_write_some || s.m_curr == s.m_begin);
882,690✔
2431
        for (;;) {
882,690✔
2432
            // Write from callers buffer
2433
            const char* data = s.m_curr;
882,646✔
2434
            std::size_t size = std::size_t(s.m_end - s.m_curr);
882,646✔
2435
            Want want = Want::nothing;
882,646✔
2436
            std::size_t n = s.m_stream->do_write_some_async(data, size, s.m_error_code, want);
882,646✔
2437
            REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please
882,646✔
2438
            bool wrote_nothing = (n == 0);
882,646✔
2439
            if (wrote_nothing) {
882,646✔
2440
                if (REALM_UNLIKELY(s.m_error_code)) {
7,808✔
2441
                    s.set_is_complete(true); // Failure
42✔
2442
                    return Want::nothing;
42✔
2443
                }
42✔
2444
                // Wrote nothing, but want something written
2445
                return want;
7,766✔
2446
            }
7,808✔
2447
            REALM_ASSERT(!s.m_error_code);
874,838✔
2448
            // Check for completion
2449
            REALM_ASSERT(n <= size);
874,838✔
2450
            s.m_curr += n;
874,838✔
2451
            if (s.m_is_write_some || s.m_curr == s.m_end) {
875,814✔
2452
                s.set_is_complete(true); // Success
871,588✔
2453
                return Want::nothing;
871,588✔
2454
            }
871,588✔
2455
            if (want != Want::nothing)
2,147,487,873✔
2456
                return want;
4,528✔
2457
            REALM_ASSERT(n < size);
4,294,967,294✔
2458
        }
4,294,967,294✔
2459
    }
882,690✔
2460

2461
protected:
2462
    const bool m_is_write_some;
2463
    const char* const m_begin;    // May be dangling after cancellation
2464
    const char* const m_end;      // May be dangling after cancellation
2465
    const char* m_curr = m_begin; // May be dangling after cancellation
2466
};
2467

2468
template <class S>
2469
class Service::BasicStreamOps<S>::BufferedReadOperBase : public StreamOper {
2470
public:
2471
    BufferedReadOperBase(std::size_t size, S& stream, char* begin, char* end, int delim,
2472
                         ReadAheadBuffer& rab) noexcept
2473
        : StreamOper{size, stream}
449,888✔
2474
        , m_read_ahead_buffer{rab}
449,888✔
2475
        , m_begin{begin}
449,888✔
2476
        , m_end{end}
449,888✔
2477
        , m_delim{delim}
449,888✔
2478
    {
917,262✔
2479
    }
917,262✔
2480
    Want initiate()
2481
    {
917,006✔
2482
        auto& s = *this;
917,006✔
2483
        REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get());
917,006✔
2484
        REALM_ASSERT(!s.is_complete());
917,006✔
2485
        Want want = Want::nothing;
917,006✔
2486
        bool complete = s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code);
917,006✔
2487
        if (complete) {
917,006✔
2488
            s.set_is_complete(true); // Success or failure
394,242✔
2489
        }
394,242✔
2490
        else {
522,764✔
2491
            s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
522,764✔
2492
            s.m_stream->do_init_read_async(s.m_error_code, want);
522,764✔
2493
            if (want == Want::nothing) {
522,764✔
2494
                if (REALM_UNLIKELY(s.m_error_code)) {
186✔
2495
                    s.set_is_complete(true); // Failure
×
2496
                }
×
2497
                else {
186✔
2498
                    want = advance();
186✔
2499
                }
186✔
2500
            }
186✔
2501
        }
522,764✔
2502
        return want;
917,006✔
2503
    }
917,006✔
2504
    Want advance() noexcept override final
2505
    {
896,444✔
2506
        auto& s = *this;
896,444✔
2507
        REALM_ASSERT(!s.is_complete());
896,444✔
2508
        REALM_ASSERT(!s.is_canceled());
896,444✔
2509
        REALM_ASSERT(!s.m_error_code);
896,444✔
2510
        REALM_ASSERT(s.m_read_ahead_buffer.empty());
896,444✔
2511
        REALM_ASSERT(s.m_curr < s.m_end);
896,444✔
2512
        for (;;) {
896,444✔
2513
            // Fill read-ahead buffer from stream (is empty now)
2514
            Want want = Want::nothing;
896,304✔
2515
            bool nonempty = s.m_read_ahead_buffer.refill_async(*s.m_stream, s.m_error_code, want);
896,304✔
2516
            REALM_ASSERT(nonempty || s.m_error_code || want != Want::nothing); // No busy loop, please
896,304✔
2517
            bool got_nothing = !nonempty;
896,304✔
2518
            if (got_nothing) {
896,304✔
2519
                if (REALM_UNLIKELY(s.m_error_code)) {
58,000✔
2520
                    s.set_is_complete(true); // Failure
1,226✔
2521
                    return Want::nothing;
1,226✔
2522
                }
1,226✔
2523
                // Got nothing, but want something
2524
                return want;
56,774✔
2525
            }
58,000✔
2526
            // Transfer buffered data to callers buffer
2527
            bool complete = s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code);
838,304✔
2528
            if (complete || s.m_error_code == util::MiscExtErrors::end_of_input) {
838,304✔
2529
                s.set_is_complete(true); // Success or failure (delim_not_found or end_of_input)
514,582✔
2530
                return Want::nothing;
514,582✔
2531
            }
514,582✔
2532
            if (want != Want::nothing)
323,722✔
2533
                return want;
324,840✔
2534
        }
323,722✔
2535
    }
896,444✔
2536

2537
protected:
2538
    ReadAheadBuffer& m_read_ahead_buffer; // May be dangling after cancellation
2539
    char* const m_begin;                  // May be dangling after cancellation
2540
    char* const m_end;                    // May be dangling after cancellation
2541
    char* m_curr = m_begin;               // May be dangling after cancellation
2542
    const int m_delim;
2543
};
2544

2545
template <class S>
2546
template <class H>
2547
class Service::BasicStreamOps<S>::ReadOper : public ReadOperBase {
2548
public:
2549
    ReadOper(std::size_t size, S& stream, bool is_read_some, char* begin, char* end, H&& handler)
2550
        : ReadOperBase{size, stream, is_read_some, begin, end}
257,884✔
2551
        , m_handler{std::move(handler)}
257,884✔
2552
    {
539,314✔
2553
    }
539,314✔
2554
    void recycle_and_execute() override final
2555
    {
538,514✔
2556
        auto& s = *this;
538,514✔
2557
        REALM_ASSERT(s.is_complete() || s.is_canceled());
538,514✔
2558
        REALM_ASSERT(s.is_complete() ==
538,514!
2559
                     (s.m_error_code || s.m_curr == s.m_end || (s.m_is_read_some && s.m_curr != s.m_begin)));
538,514✔
2560
        REALM_ASSERT(s.m_curr >= s.m_begin);
538,514✔
2561
        bool orphaned = !s.m_stream;
538,514✔
2562
        std::error_code ec = s.m_error_code;
538,514✔
2563
        if (s.is_canceled())
538,514✔
2564
            ec = util::error::operation_aborted;
169,265✔
2565
        std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
538,514✔
2566
        // Note: do_recycle_and_execute() commits suicide.
2567
        s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
538,514✔
2568
                                             num_bytes_transferred); // Throws
538,514✔
2569
    }
538,514✔
2570

2571
private:
2572
    H m_handler;
2573
};
2574

2575
template <class S>
2576
template <class H>
2577
class Service::BasicStreamOps<S>::WriteOper : public WriteOperBase {
2578
public:
2579
    WriteOper(std::size_t size, S& stream, bool is_write_some, const char* begin, const char* end, H&& handler)
2580
        : WriteOperBase{size, stream, is_write_some, begin, end}
341,710✔
2581
        , m_handler{std::move(handler)}
341,710✔
2582
    {
711,839✔
2583
    }
711,839✔
2584
    void recycle_and_execute() override final
2585
    {
710,990✔
2586
        auto& s = *this;
710,990✔
2587
        REALM_ASSERT(s.is_complete() || s.is_canceled());
710,990!
2588
        REALM_ASSERT(s.is_complete() ==
710,990!
2589
                     (s.m_error_code || s.m_curr == s.m_end || (s.m_is_write_some && s.m_curr != s.m_begin)));
710,990✔
2590
        REALM_ASSERT(s.m_curr >= s.m_begin);
710,990✔
2591
        bool orphaned = !s.m_stream;
710,990✔
2592
        std::error_code ec = s.m_error_code;
710,990✔
2593
        if (s.is_canceled())
710,990✔
2594
            ec = util::error::operation_aborted;
171,562✔
2595
        std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
710,990✔
2596
        // Note: do_recycle_and_execute() commits suicide.
2597
        s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
710,990✔
2598
                                             num_bytes_transferred); // Throws
710,990✔
2599
    }
710,990✔
2600

2601
private:
2602
    H m_handler;
2603
};
2604

2605
template <class S>
2606
template <class H>
2607
class Service::BasicStreamOps<S>::BufferedReadOper : public BufferedReadOperBase {
2608
public:
2609
    BufferedReadOper(std::size_t size, S& stream, char* begin, char* end, int delim, ReadAheadBuffer& rab,
2610
                     H&& handler)
2611
        : BufferedReadOperBase{size, stream, begin, end, delim, rab}
330,658✔
2612
        , m_handler{std::move(handler)}
330,658✔
2613
    {
679,517✔
2614
    }
679,517✔
2615
    void recycle_and_execute() override final
2616
    {
678,241✔
2617
        auto& s = *this;
678,241✔
2618
        REALM_ASSERT(s.is_complete() || (s.is_canceled() && !s.m_error_code));
678,241!
2619
        REALM_ASSERT(s.is_canceled() || s.m_error_code ||
678,241!
2620
                     (s.m_delim != std::char_traits<char>::eof()
678,241✔
2621
                          ? s.m_curr > s.m_begin && s.m_curr[-1] == std::char_traits<char>::to_char_type(s.m_delim)
678,241✔
2622
                          : s.m_curr == s.m_end));
678,241✔
2623
        REALM_ASSERT(s.m_curr >= s.m_begin);
678,241!
2624
        bool orphaned = !s.m_stream;
678,241✔
2625
        std::error_code ec = s.m_error_code;
678,241✔
2626
        if (s.is_canceled())
678,241✔
2627
            ec = util::error::operation_aborted;
4,387✔
2628
        std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
678,241✔
2629
        // Note: do_recycle_and_execute() commits suicide.
2630
        s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
678,241✔
2631
                                             num_bytes_transferred); // Throws
678,241✔
2632
    }
678,241✔
2633

2634
private:
2635
    H m_handler;
2636
};
2637

2638
template <class H>
2639
inline void Service::post(H handler)
2640
{
786,628✔
2641
    do_post(&Service::post_oper_constr<H>, sizeof(PostOper<H>), &handler);
786,628✔
2642
}
786,628✔
2643

2644
inline void Service::OwnersOperDeleter::operator()(AsyncOper* op) const noexcept
2645
{
813,466✔
2646
    if (op->in_use()) {
813,466✔
2647
        op->orphan();
20,988✔
2648
    }
20,988✔
2649
    else {
792,478✔
2650
        void* addr = op;
792,478✔
2651
        op->~AsyncOper();
792,478✔
2652
        delete[] static_cast<char*>(addr);
792,478✔
2653
    }
792,478✔
2654
}
813,466✔
2655

2656
inline void Service::LendersOperDeleter::operator()(AsyncOper* op) const noexcept
2657
{
3,626✔
2658
    op->recycle(); // Suicide
3,626✔
2659
}
3,626✔
2660

2661
template <class Oper, class... Args>
2662
std::unique_ptr<Oper, Service::LendersOperDeleter> Service::alloc(OwnersOperPtr& owners_ptr, Args&&... args)
2663
{
2,177,390✔
2664
    void* addr = owners_ptr.get();
2,177,390✔
2665
    std::size_t size;
2,177,390✔
2666
    if (REALM_LIKELY(addr)) {
2,177,390!
2667
        REALM_ASSERT(!owners_ptr->in_use());
2,140,719!
2668
        size = owners_ptr->m_size;
2,140,719✔
2669
        // We can use static dispatch in the destructor call here, since an
2670
        // object, that is not in use, is always an instance of UnusedOper.
2671
        REALM_ASSERT(dynamic_cast<UnusedOper*>(owners_ptr.get()));
2,140,719!
2672
        static_cast<UnusedOper*>(owners_ptr.get())->UnusedOper::~UnusedOper();
2,140,719✔
2673
        if (REALM_UNLIKELY(size < sizeof(Oper))) {
2,140,719!
2674
            owners_ptr.release();
3,664✔
2675
            delete[] static_cast<char*>(addr);
3,664✔
2676
            goto no_object;
3,664✔
2677
        }
3,664✔
2678
    }
2,140,719✔
2679
    else {
36,671✔
2680
    no_object:
39,958✔
2681
        addr = new char[sizeof(Oper)]; // Throws
39,958✔
2682
        size = sizeof(Oper);
39,958✔
2683
        owners_ptr.reset(static_cast<AsyncOper*>(addr));
39,958✔
2684
    }
39,958✔
2685
    std::unique_ptr<Oper, LendersOperDeleter> lenders_ptr;
2,177,013✔
2686
    try {
2,177,013✔
2687
        lenders_ptr.reset(new (addr) Oper(size, std::forward<Args>(args)...)); // Throws
2,177,013✔
2688
    }
2,177,013✔
2689
    catch (...) {
2,177,013✔
2690
        new (addr) UnusedOper(size); // Does not throw
×
2691
        throw;
×
2692
    }
×
2693
    return lenders_ptr;
2,176,835✔
2694
}
2,177,013✔
2695

2696
template <class H>
2697
inline Service::PostOperBase* Service::post_oper_constr(void* addr, std::size_t size, Impl& service, void* cookie)
2698
{
786,736✔
2699
    H& handler = *static_cast<H*>(cookie);
786,736✔
2700
    return new (addr) PostOper<H>(size, service, std::move(handler)); // Throws
786,736✔
2701
}
786,736✔
2702

2703
inline bool Service::AsyncOper::in_use() const noexcept
2704
{
10,015,278✔
2705
    return m_in_use;
10,015,278✔
2706
}
10,015,278✔
2707

2708
inline bool Service::AsyncOper::is_complete() const noexcept
2709
{
16,470,656✔
2710
    return m_complete;
16,470,656✔
2711
}
16,470,656✔
2712

2713
inline void Service::AsyncOper::cancel() noexcept
2714
{
702,830✔
2715
    REALM_ASSERT(m_in_use);
702,830✔
2716
    REALM_ASSERT(!m_canceled);
702,830✔
2717
    m_canceled = true;
702,830✔
2718
}
702,830✔
2719

2720
inline Service::AsyncOper::AsyncOper(std::size_t size, bool is_in_use) noexcept
2721
    : m_size{size}
4,630,696✔
2722
    , m_in_use{is_in_use}
4,630,696✔
2723
{
9,749,400✔
2724
}
9,749,400✔
2725

2726
inline bool Service::AsyncOper::is_canceled() const noexcept
2727
{
10,335,238✔
2728
    return m_canceled;
10,335,238✔
2729
}
10,335,238✔
2730

2731
inline void Service::AsyncOper::set_is_complete(bool value) noexcept
2732
{
3,009,932✔
2733
    REALM_ASSERT(!m_complete);
3,009,932✔
2734
    REALM_ASSERT(!value || m_in_use);
3,009,932✔
2735
    m_complete = value;
3,009,932✔
2736
}
3,009,932✔
2737

2738
template <class H, class... Args>
2739
inline void Service::AsyncOper::do_recycle_and_execute(bool orphaned, H& handler, Args&&... args)
2740
{
2,174,711✔
2741
    // Recycle the operation object before the handler is exceuted, such that
2742
    // the memory is available for a new post operation that might be initiated
2743
    // during the execution of the handler.
2744
    bool was_recycled = false;
2,174,711✔
2745

2746
    // ScopeExit to ensure the AsyncOper object was reclaimed/deleted
2747
    auto at_exit = util::ScopeExit([this, &was_recycled, &orphaned]() noexcept {
2,175,065✔
2748
        if (!was_recycled) {
2,173,542✔
2749
            do_recycle(orphaned);
×
2750
        }
×
2751
    });
2,173,542✔
2752

2753
    // We need to copy or move all arguments to be passed to the handler,
2754
    // such that there is no risk of references to the recycled operation
2755
    // object being passed to the handler (the passed arguments may be
2756
    // references to members of the recycled operation object). The easiest
2757
    // way to achive this, is by forwarding the reference arguments (passed
2758
    // to this function) to a helper function whose arguments have
2759
    // nonreference type (`Args...` rather than `Args&&...`).
2760
    //
2761
    // Note that the copying and moving of arguments may throw, and it is
2762
    // important that the operation is still recycled even if that
2763
    // happens. For that reason, copying and moving of arguments must not
2764
    // happen until we are in a scope (this scope) that catches and deals
2765
    // correctly with such exceptions.
2766
    do_recycle_and_execute_helper(orphaned, was_recycled, std::move(handler),
2,174,711✔
2767
                                  std::forward<Args>(args)...); // Throws
2,174,711✔
2768

2769
    // Removed catch to prevent truncating the stack trace on exception
2770
}
2,174,711✔
2771

2772
template <class H, class... Args>
2773
inline void Service::AsyncOper::do_recycle_and_execute_helper(bool orphaned, bool& was_recycled, H handler,
2774
                                                              Args... args)
2775
{
2,175,907✔
2776
    do_recycle(orphaned);
2,175,907✔
2777
    was_recycled = true;
2,175,907✔
2778
    handler(std::move(args)...); // Throws
2,175,907✔
2779
}
2,175,907✔
2780

2781
inline void Service::AsyncOper::do_recycle(bool orphaned) noexcept
2782
{
3,715,914✔
2783
    REALM_ASSERT(in_use());
3,715,914✔
2784
    void* addr = this;
3,715,914✔
2785
    std::size_t size = m_size;
3,715,914✔
2786
    this->~AsyncOper(); // Suicide
3,715,914✔
2787
    if (orphaned) {
3,715,914✔
2788
        delete[] static_cast<char*>(addr);
20,988✔
2789
    }
20,988✔
2790
    else {
3,694,926✔
2791
        new (addr) UnusedOper(size);
3,694,926✔
2792
    }
3,694,926✔
2793
}
3,715,914✔
2794

2795
// ---------------- Resolver ----------------
2796

2797
template <class H>
2798
class Resolver::ResolveOper : public Service::ResolveOperBase {
2799
public:
2800
    ResolveOper(std::size_t size, Resolver& r, Query q, H&& handler)
2801
        : ResolveOperBase{size, r, std::move(q)}
1,672✔
2802
        , m_handler{std::move(handler)}
1,672✔
2803
    {
3,638✔
2804
    }
3,638✔
2805
    void recycle_and_execute() override final
2806
    {
3,634✔
2807
        REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
3,634!
2808
        REALM_ASSERT(is_canceled() || m_error_code || !m_endpoints.empty());
3,634!
2809
        bool orphaned = !m_resolver;
3,634✔
2810
        std::error_code ec = m_error_code;
3,634✔
2811
        if (is_canceled())
3,634✔
2812
            ec = util::error::operation_aborted;
2✔
2813
        // Note: do_recycle_and_execute() commits suicide.
2814
        do_recycle_and_execute<H>(orphaned, m_handler, ec, std::move(m_endpoints)); // Throws
3,634✔
2815
    }
3,634✔
2816

2817
private:
2818
    H m_handler;
2819
};
2820

2821
inline Resolver::Resolver(Service& service)
2822
    : m_service_impl{*service.m_impl}
2,230✔
2823
{
4,856✔
2824
}
4,856✔
2825

2826
inline Resolver::~Resolver() noexcept
2827
{
4,856✔
2828
    cancel();
4,856✔
2829
}
4,856✔
2830

2831
inline Endpoint::List Resolver::resolve(const Query& q)
2832
{
1,212✔
2833
    std::error_code ec;
1,212✔
2834
    Endpoint::List list = resolve(q, ec);
1,212✔
2835
    if (REALM_UNLIKELY(ec))
1,212✔
2836
        throw std::system_error(ec);
×
2837
    return list;
1,212✔
2838
}
1,212✔
2839

2840
template <class H>
2841
void Resolver::async_resolve(Query query, H&& handler)
2842
{
3,640✔
2843
    Service::LendersResolveOperPtr op = Service::alloc<ResolveOper<H>>(m_resolve_oper, *this, std::move(query),
3,640✔
2844
                                                                       std::move(handler)); // Throws
3,640✔
2845
    initiate_oper(std::move(op));                                                           // Throws
3,640✔
2846
}
3,640✔
2847

2848
inline Resolver::Query::Query(std::string service_port, int init_flags)
2849
    : m_flags{init_flags}
2850
    , m_service{service_port}
2851
{
2852
}
2853

2854
inline Resolver::Query::Query(const StreamProtocol& prot, std::string service_port, int init_flags)
2855
    : m_flags{init_flags}
2856
    , m_protocol{prot}
2857
    , m_service{service_port}
2858
{
2859
}
2860

2861
inline Resolver::Query::Query(std::string host_name, std::string service_port, int init_flags)
2862
    : m_flags{init_flags}
2,230✔
2863
    , m_host{host_name}
2,230✔
2864
    , m_service{service_port}
2,230✔
2865
{
4,856✔
2866
}
4,856✔
2867

2868
inline Resolver::Query::Query(const StreamProtocol& prot, std::string host_name, std::string service_port,
2869
                              int init_flags)
2870
    : m_flags{init_flags}
2871
    , m_protocol{prot}
2872
    , m_host{host_name}
2873
    , m_service{service_port}
2874
{
2875
}
2876

2877
inline Resolver::Query::~Query() noexcept {}
19,426✔
2878

2879
inline int Resolver::Query::flags() const
2880
{
×
2881
    return m_flags;
×
2882
}
×
2883

2884
inline StreamProtocol Resolver::Query::protocol() const
2885
{
×
2886
    return m_protocol;
×
2887
}
×
2888

2889
inline std::string Resolver::Query::host() const
2890
{
×
2891
    return m_host;
×
2892
}
×
2893

2894
inline std::string Resolver::Query::service() const
2895
{
×
2896
    return m_service;
×
2897
}
×
2898

2899
// ---------------- SocketBase ----------------
2900

2901
inline SocketBase::SocketBase(Service& service)
2902
    : m_desc{*service.m_impl}
4,588✔
2903
{
9,392✔
2904
}
9,392✔
2905

2906
inline SocketBase::~SocketBase() noexcept
2907
{
9,392✔
2908
    close();
9,392✔
2909
}
9,392✔
2910

2911
inline bool SocketBase::is_open() const noexcept
2912
{
30,804✔
2913
    return m_desc.is_open();
30,804✔
2914
}
30,804✔
2915

2916
inline auto SocketBase::native_handle() const noexcept -> native_handle_type
2917
{
×
2918
    return m_desc.native_handle();
×
2919
}
×
2920

2921
inline void SocketBase::open(const StreamProtocol& prot)
2922
{
264✔
2923
    std::error_code ec;
264✔
2924
    if (open(prot, ec))
264✔
2925
        throw std::system_error(ec);
×
2926
}
264✔
2927

2928
inline void SocketBase::close() noexcept
2929
{
9,468✔
2930
    if (!is_open())
9,468✔
2931
        return;
1,308✔
2932
    cancel();
8,160✔
2933
    m_desc.close();
8,160✔
2934
}
8,160✔
2935

2936
template <class O>
2937
inline void SocketBase::get_option(O& opt) const
2938
{
4✔
2939
    std::error_code ec;
4✔
2940
    if (get_option(opt, ec))
4✔
2941
        throw std::system_error(ec);
×
2942
}
4✔
2943

2944
template <class O>
2945
inline std::error_code SocketBase::get_option(O& opt, std::error_code& ec) const
2946
{
4✔
2947
    opt.get(*this, ec);
4✔
2948
    return ec;
4✔
2949
}
4✔
2950

2951
template <class O>
2952
inline void SocketBase::set_option(const O& opt)
2953
{
1,930✔
2954
    std::error_code ec;
1,930✔
2955
    if (set_option(opt, ec))
1,930✔
2956
        throw std::system_error(ec);
×
2957
}
1,930✔
2958

2959
template <class O>
2960
inline std::error_code SocketBase::set_option(const O& opt, std::error_code& ec)
2961
{
3,132✔
2962
    opt.set(*this, ec);
3,132✔
2963
    return ec;
3,132✔
2964
}
3,132✔
2965

2966
inline void SocketBase::bind(const Endpoint& ep)
2967
{
248✔
2968
    std::error_code ec;
248✔
2969
    if (bind(ep, ec))
248✔
2970
        throw std::system_error(ec);
×
2971
}
248✔
2972

2973
inline Endpoint SocketBase::local_endpoint() const
2974
{
6,344✔
2975
    std::error_code ec;
6,344✔
2976
    Endpoint ep = local_endpoint(ec);
6,344✔
2977
    if (ec)
6,344✔
2978
        throw std::system_error(ec);
×
2979
    return ep;
6,344✔
2980
}
6,344✔
2981

2982
inline auto SocketBase::release_native_handle() noexcept -> native_handle_type
2983
{
×
2984
    if (is_open()) {
×
2985
        cancel();
×
2986
        return m_desc.release();
×
2987
    }
×
2988
    return m_desc.native_handle();
×
2989
}
×
2990

2991
inline const StreamProtocol& SocketBase::get_protocol() const noexcept
2992
{
×
2993
    return m_protocol;
×
2994
}
×
2995

2996
template <class T, int opt, class U>
2997
inline SocketBase::Option<T, opt, U>::Option(T init_value)
2998
    : m_value{init_value}
1,444✔
2999
{
3,136✔
3000
}
3,136✔
3001

3002
template <class T, int opt, class U>
3003
inline T SocketBase::Option<T, opt, U>::value() const
3004
{
4✔
3005
    return m_value;
4✔
3006
}
4✔
3007

3008
template <class T, int opt, class U>
3009
inline void SocketBase::Option<T, opt, U>::get(const SocketBase& sock, std::error_code& ec)
3010
{
4✔
3011
    union {
4✔
3012
        U value;
4✔
3013
        char strut[sizeof(U) + 1];
4✔
3014
    };
4✔
3015
    std::size_t value_size = sizeof strut;
4✔
3016
    sock.get_option(opt_enum(opt), &value, value_size, ec);
4✔
3017
    if (!ec) {
4✔
3018
        REALM_ASSERT(value_size == sizeof value);
4✔
3019
        m_value = T(value);
4✔
3020
    }
4✔
3021
}
4✔
3022

3023
template <class T, int opt, class U>
3024
inline void SocketBase::Option<T, opt, U>::set(SocketBase& sock, std::error_code& ec) const
3025
{
3,132✔
3026
    U value_to_set = U(m_value);
3,132✔
3027
    sock.set_option(opt_enum(opt), &value_to_set, sizeof value_to_set, ec);
3,132✔
3028
}
3,132✔
3029

3030
// ---------------- Socket ----------------
3031

3032
class Socket::ConnectOperBase : public Service::IoOper {
3033
public:
3034
    ConnectOperBase(std::size_t size, Socket& sock) noexcept
3035
        : IoOper{size}
1,774✔
3036
        , m_socket{&sock}
1,774✔
3037
    {
3,838✔
3038
    }
3,838✔
3039
    Want initiate(const Endpoint& ep)
3040
    {
3,838✔
3041
        REALM_ASSERT(this == m_socket->m_write_oper.get());
3,838✔
3042
        if (m_socket->initiate_async_connect(ep, m_error_code)) { // Throws
3,838✔
3043
            set_is_complete(true);                                // Failure, or immediate completion
2✔
3044
            return Want::nothing;
2✔
3045
        }
2✔
3046
        return Want::write;
3,836✔
3047
    }
3,838✔
3048
    Want advance() noexcept override final
3049
    {
3,830✔
3050
        REALM_ASSERT(!is_complete());
3,830✔
3051
        REALM_ASSERT(!is_canceled());
3,830✔
3052
        REALM_ASSERT(!m_error_code);
3,830✔
3053
        m_socket->finalize_async_connect(m_error_code);
3,830✔
3054
        set_is_complete(true);
3,830✔
3055
        return Want::nothing;
3,830✔
3056
    }
3,830✔
3057
    void recycle() noexcept override final
3058
    {
×
3059
        bool orphaned = !m_socket;
×
3060
        REALM_ASSERT(orphaned);
×
3061
        // Note: do_recycle() commits suicide.
3062
        do_recycle(orphaned);
×
3063
    }
×
3064
    void orphan() noexcept override final
3065
    {
2✔
3066
        m_socket = nullptr;
2✔
3067
    }
2✔
3068
    Service::Descriptor& descriptor() noexcept override final
3069
    {
×
3070
        return m_socket->m_desc;
×
3071
    }
×
3072

3073
protected:
3074
    Socket* m_socket;
3075
    std::error_code m_error_code;
3076
};
3077

3078
template <class H>
3079
class Socket::ConnectOper : public ConnectOperBase {
3080
public:
3081
    ConnectOper(std::size_t size, Socket& sock, H&& handler)
3082
        : ConnectOperBase{size, sock}
1,722✔
3083
        , m_handler{std::move(handler)}
1,722✔
3084
    {
3,733✔
3085
    }
3,733✔
3086
    void recycle_and_execute() override final
3087
    {
3,733✔
3088
        REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
3,733!
3089
        bool orphaned = !m_socket;
3,733✔
3090
        std::error_code ec = m_error_code;
3,733✔
3091
        if (is_canceled())
3,733✔
3092
            ec = util::error::operation_aborted;
5✔
3093
        // Note: do_recycle_and_execute() commits suicide.
3094
        do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
3,733✔
3095
    }
3,733✔
3096

3097
private:
3098
    H m_handler;
3099
};
3100

3101
inline Socket::Socket(Service& service)
3102
    : SocketBase{service}
3,908✔
3103
{
7,928✔
3104
}
7,928✔
3105

3106
inline Socket::Socket(Service& service, const StreamProtocol& prot, native_handle_type native_socket)
3107
    : SocketBase{service}
1✔
3108
{
2✔
3109
    assign(prot, native_socket); // Throws
2✔
3110
}
2✔
3111

3112
inline Socket::~Socket() noexcept {}
7,932✔
3113

3114
inline void Socket::connect(const Endpoint& ep)
3115
{
20✔
3116
    std::error_code ec;
20✔
3117
    if (connect(ep, ec)) // Throws
20✔
3118
        throw std::system_error(ec);
×
3119
}
20✔
3120

3121
inline std::size_t Socket::read(char* buffer, std::size_t size)
3122
{
×
3123
    std::error_code ec;
×
3124
    read(buffer, size, ec); // Throws
×
3125
    if (ec)
×
3126
        throw std::system_error(ec);
×
3127
    return size;
×
3128
}
×
3129

3130
inline std::size_t Socket::read(char* buffer, std::size_t size, std::error_code& ec)
3131
{
×
3132
    return StreamOps::read(*this, buffer, size, ec); // Throws
×
3133
}
×
3134

3135
inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab)
3136
{
8✔
3137
    std::error_code ec;
8✔
3138
    read(buffer, size, rab, ec); // Throws
8✔
3139
    if (ec)
8✔
3140
        throw std::system_error(ec);
×
3141
    return size;
8✔
3142
}
8✔
3143

3144
inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab, std::error_code& ec)
3145
{
16,402✔
3146
    int delim = std::char_traits<char>::eof();
16,402✔
3147
    return StreamOps::buffered_read(*this, buffer, size, delim, rab, ec); // Throws
16,402✔
3148
}
16,402✔
3149

3150
inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab)
3151
{
8✔
3152
    std::error_code ec;
8✔
3153
    std::size_t n = read_until(buffer, size, delim, rab, ec); // Throws
8✔
3154
    if (ec)
8✔
3155
        throw std::system_error(ec);
×
3156
    return n;
8✔
3157
}
8✔
3158

3159
inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab,
3160
                                      std::error_code& ec)
3161
{
8✔
3162
    int delim_2 = std::char_traits<char>::to_int_type(delim);
8✔
3163
    return StreamOps::buffered_read(*this, buffer, size, delim_2, rab, ec); // Throws
8✔
3164
}
8✔
3165

3166
inline std::size_t Socket::write(const char* data, std::size_t size)
3167
{
262,121✔
3168
    std::error_code ec;
262,121✔
3169
    write(data, size, ec); // Throws
262,121✔
3170
    if (ec)
262,121✔
3171
        throw std::system_error(ec);
×
3172
    return size;
262,121✔
3173
}
262,121✔
3174

3175
inline std::size_t Socket::write(const char* data, std::size_t size, std::error_code& ec)
3176
{
262,121✔
3177
    return StreamOps::write(*this, data, size, ec); // Throws
262,121✔
3178
}
262,121✔
3179

3180
inline std::size_t Socket::read_some(char* buffer, std::size_t size)
3181
{
4✔
3182
    std::error_code ec;
4✔
3183
    std::size_t n = read_some(buffer, size, ec); // Throws
4✔
3184
    if (ec)
4✔
3185
        throw std::system_error(ec);
4✔
3186
    return n;
×
3187
}
4✔
3188

3189
inline std::size_t Socket::read_some(char* buffer, std::size_t size, std::error_code& ec)
3190
{
8✔
3191
    return StreamOps::read_some(*this, buffer, size, ec); // Throws
8✔
3192
}
8✔
3193

3194
inline std::size_t Socket::write_some(const char* data, std::size_t size)
3195
{
×
3196
    std::error_code ec;
×
3197
    std::size_t n = write_some(data, size, ec); // Throws
×
3198
    if (ec)
×
3199
        throw std::system_error(ec);
×
3200
    return n;
×
3201
}
×
3202

3203
inline std::size_t Socket::write_some(const char* data, std::size_t size, std::error_code& ec)
3204
{
97✔
3205
    return StreamOps::write_some(*this, data, size, ec); // Throws
97✔
3206
}
97✔
3207

3208
template <class H>
3209
inline void Socket::async_connect(const Endpoint& ep, H&& handler)
3210
{
3,733✔
3211
    LendersConnectOperPtr op = Service::alloc<ConnectOper<H>>(m_write_oper, *this, std::move(handler)); // Throws
3,733✔
3212
    m_desc.initiate_oper(std::move(op), ep);                                                            // Throws
3,733✔
3213
}
3,733✔
3214

3215
template <class H>
3216
inline void Socket::async_read(char* buffer, std::size_t size, H&& handler)
3217
{
×
3218
    bool is_read_some = false;
×
3219
    StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws
×
3220
}
×
3221

3222
template <class H>
3223
inline void Socket::async_read(char* buffer, std::size_t size, ReadAheadBuffer& rab, H&& handler)
3224
{
627,482✔
3225
    int delim = std::char_traits<char>::eof();
627,482✔
3226
    StreamOps::async_buffered_read(*this, buffer, size, delim, rab, std::move(handler)); // Throws
627,482✔
3227
}
627,482✔
3228

3229
template <class H>
3230
inline void Socket::async_read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab, H&& handler)
3231
{
51,535✔
3232
    int delim_2 = std::char_traits<char>::to_int_type(delim);
51,535✔
3233
    StreamOps::async_buffered_read(*this, buffer, size, delim_2, rab, std::move(handler)); // Throws
51,535✔
3234
}
51,535✔
3235

3236
template <class H>
3237
inline void Socket::async_write(const char* data, std::size_t size, H&& handler)
3238
{
254,751✔
3239
    bool is_write_some = false;
254,751✔
3240
    StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws
254,751✔
3241
}
254,751✔
3242

3243
template <class H>
3244
inline void Socket::async_read_some(char* buffer, std::size_t size, H&& handler)
3245
{
303,180✔
3246
    bool is_read_some = true;
303,180✔
3247
    StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws
303,180✔
3248
}
303,180✔
3249

3250
template <class H>
3251
inline void Socket::async_write_some(const char* data, std::size_t size, H&& handler)
3252
{
303,322✔
3253
    bool is_write_some = true;
303,322✔
3254
    StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws
303,322✔
3255
}
303,322✔
3256

3257
inline void Socket::shutdown(shutdown_type what)
3258
{
18✔
3259
    std::error_code ec;
18✔
3260
    if (shutdown(what, ec)) // Throws
18✔
3261
        throw std::system_error(ec);
×
3262
}
18✔
3263

3264
inline void Socket::assign(const StreamProtocol& prot, native_handle_type native_socket)
3265
{
2✔
3266
    std::error_code ec;
2✔
3267
    if (assign(prot, native_socket, ec)) // Throws
2✔
3268
        throw std::system_error(ec);
×
3269
}
2✔
3270

3271
inline std::error_code Socket::assign(const StreamProtocol& prot, native_handle_type native_socket,
3272
                                      std::error_code& ec)
3273
{
2✔
3274
    return do_assign(prot, native_socket, ec); // Throws
2✔
3275
}
2✔
3276

3277
inline Socket& Socket::lowest_layer() noexcept
3278
{
10,734,426✔
3279
    return *this;
10,734,426✔
3280
}
10,734,426✔
3281

3282
inline void Socket::do_init_read_async(std::error_code&, Want& want) noexcept
3283
{
1,126,764✔
3284
    want = Want::read; // Wait for read readiness before proceeding
1,126,764✔
3285
}
1,126,764✔
3286

3287
inline void Socket::do_init_write_async(std::error_code&, Want& want) noexcept
3288
{
903,660✔
3289
    want = Want::write; // Wait for write readiness before proceeding
903,660✔
3290
}
903,660✔
3291

3292
inline std::size_t Socket::do_read_some_sync(char* buffer, std::size_t size, std::error_code& ec) noexcept
3293
{
131,106✔
3294
    return m_desc.read_some(buffer, size, ec);
131,106✔
3295
}
131,106✔
3296

3297
inline std::size_t Socket::do_write_some_sync(const char* data, std::size_t size, std::error_code& ec) noexcept
3298
{
262,218✔
3299
    return m_desc.write_some(data, size, ec);
262,218✔
3300
}
262,218✔
3301

3302
inline std::size_t Socket::do_read_some_async(char* buffer, std::size_t size, std::error_code& ec,
3303
                                              Want& want) noexcept
3304
{
1,163,426✔
3305
    std::error_code ec_2;
1,163,426✔
3306
    std::size_t n = m_desc.read_some(buffer, size, ec_2);
1,163,426✔
3307
    bool success = (!ec_2 || ec_2 == util::error::resource_unavailable_try_again);
1,163,426✔
3308
    if (REALM_UNLIKELY(!success)) {
1,163,426✔
3309
        ec = ec_2;
1,224✔
3310
        want = Want::nothing; // Failure
1,224✔
3311
        return 0;
1,224✔
3312
    }
1,224✔
3313
    ec = std::error_code();
1,162,202✔
3314
    want = Want::read; // Success
1,162,202✔
3315
    return n;
1,162,202✔
3316
}
1,163,426✔
3317

3318
inline std::size_t Socket::do_write_some_async(const char* data, std::size_t size, std::error_code& ec,
3319
                                               Want& want) noexcept
3320
{
572,582✔
3321
    std::error_code ec_2;
572,582✔
3322
    std::size_t n = m_desc.write_some(data, size, ec_2);
572,582✔
3323
    bool success = (!ec_2 || ec_2 == util::error::resource_unavailable_try_again);
572,582✔
3324
    if (REALM_UNLIKELY(!success)) {
572,582✔
3325
        ec = ec_2;
42✔
3326
        want = Want::nothing; // Failure
42✔
3327
        return 0;
42✔
3328
    }
42✔
3329
    ec = std::error_code();
572,540✔
3330
    want = Want::write; // Success
572,540✔
3331
    return n;
572,540✔
3332
}
572,582✔
3333

3334
// ---------------- Acceptor ----------------
3335

3336
class Acceptor::AcceptOperBase : public Service::IoOper {
3337
public:
3338
    AcceptOperBase(std::size_t size, Acceptor& a, Socket& s, Endpoint* e)
3339
        : IoOper{size}
2,074✔
3340
        , m_acceptor{&a}
2,074✔
3341
        , m_socket{s}
2,074✔
3342
        , m_endpoint{e}
2,074✔
3343
    {
3,970✔
3344
    }
3,970✔
3345
    Want initiate()
3346
    {
3,970✔
3347
        REALM_ASSERT(this == m_acceptor->m_read_oper.get());
3,970✔
3348
        REALM_ASSERT(!is_complete());
3,970✔
3349
        m_acceptor->m_desc.ensure_nonblocking_mode(); // Throws
3,970✔
3350
        return Want::read;
3,970✔
3351
    }
3,970✔
3352
    Want advance() noexcept override final
3353
    {
3,850✔
3354
        REALM_ASSERT(!is_complete());
3,850✔
3355
        REALM_ASSERT(!is_canceled());
3,850✔
3356
        REALM_ASSERT(!m_error_code);
3,850✔
3357
        REALM_ASSERT(!m_socket.is_open());
3,850✔
3358
        Want want = m_acceptor->do_accept_async(m_socket, m_endpoint, m_error_code);
3,850✔
3359
        if (want == Want::nothing)
3,850✔
3360
            set_is_complete(true); // Success or failure
2,746✔
3361
        return want;
3,850✔
3362
    }
3,850✔
3363
    void recycle() noexcept override final
3364
    {
1,210✔
3365
        bool orphaned = !m_acceptor;
1,210✔
3366
        REALM_ASSERT(orphaned);
1,210✔
3367
        // Note: do_recycle() commits suicide.
3368
        do_recycle(orphaned);
1,210✔
3369
    }
1,210✔
3370
    void orphan() noexcept override final
3371
    {
1,212✔
3372
        m_acceptor = nullptr;
1,212✔
3373
    }
1,212✔
3374
    Service::Descriptor& descriptor() noexcept override final
3375
    {
1,104✔
3376
        return m_acceptor->m_desc;
1,104✔
3377
    }
1,104✔
3378

3379
protected:
3380
    Acceptor* m_acceptor;
3381
    Socket& m_socket;           // May be dangling after cancellation
3382
    Endpoint* const m_endpoint; // May be dangling after cancellation
3383
    std::error_code m_error_code;
3384
};
3385

3386
template <class H>
3387
class Acceptor::AcceptOper : public AcceptOperBase {
3388
public:
3389
    AcceptOper(std::size_t size, Acceptor& a, Socket& s, Endpoint* e, H&& handler)
3390
        : AcceptOperBase{size, a, s, e}
1,750✔
3391
        , m_handler{std::move(handler)}
1,750✔
3392
    {
3,589✔
3393
    }
3,589✔
3394
    void recycle_and_execute() override final
3395
    {
2,384✔
3396
        REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
2,384!
3397
        REALM_ASSERT(is_canceled() || m_error_code || m_socket.is_open());
2,384!
3398
        bool orphaned = !m_acceptor;
2,384✔
3399
        std::error_code ec = m_error_code;
2,384✔
3400
        if (is_canceled())
2,384✔
3401
            ec = util::error::operation_aborted;
7✔
3402
        // Note: do_recycle_and_execute() commits suicide.
3403
        do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
2,384✔
3404
    }
2,384✔
3405

3406
private:
3407
    H m_handler;
3408
};
3409

3410
inline Acceptor::Acceptor(Service& service)
3411
    : SocketBase{service}
678✔
3412
{
1,460✔
3413
}
1,460✔
3414

3415
inline Acceptor::~Acceptor() noexcept {}
1,460✔
3416

3417
inline void Acceptor::listen(int backlog)
3418
{
1,452✔
3419
    std::error_code ec;
1,452✔
3420
    if (listen(backlog, ec)) // Throws
1,452✔
3421
        throw std::system_error(ec);
×
3422
}
1,452✔
3423

3424
inline void Acceptor::accept(Socket& sock)
3425
{
20✔
3426
    std::error_code ec;
20✔
3427
    if (accept(sock, ec)) // Throws
20✔
3428
        throw std::system_error(ec);
×
3429
}
20✔
3430

3431
inline void Acceptor::accept(Socket& sock, Endpoint& ep)
3432
{
2✔
3433
    std::error_code ec;
2✔
3434
    if (accept(sock, ep, ec)) // Throws
2✔
3435
        throw std::system_error(ec);
×
3436
}
2✔
3437

3438
inline std::error_code Acceptor::accept(Socket& sock, std::error_code& ec)
3439
{
20✔
3440
    Endpoint* ep = nullptr;
20✔
3441
    return accept(sock, ep, ec); // Throws
20✔
3442
}
20✔
3443

3444
inline std::error_code Acceptor::accept(Socket& sock, Endpoint& ep, std::error_code& ec)
3445
{
2✔
3446
    return accept(sock, &ep, ec); // Throws
2✔
3447
}
2✔
3448

3449
template <class H>
3450
inline void Acceptor::async_accept(Socket& sock, H&& handler)
3451
{
379✔
3452
    Endpoint* ep = nullptr;
379✔
3453
    async_accept(sock, ep, std::move(handler)); // Throws
379✔
3454
}
379✔
3455

3456
template <class H>
3457
inline void Acceptor::async_accept(Socket& sock, Endpoint& ep, H&& handler)
3458
{
3,210✔
3459
    async_accept(sock, &ep, std::move(handler)); // Throws
3,210✔
3460
}
3,210✔
3461

3462
inline std::error_code Acceptor::accept(Socket& socket, Endpoint* ep, std::error_code& ec)
3463
{
22✔
3464
    REALM_ASSERT(!m_read_oper || !m_read_oper->in_use());
22✔
3465
    if (REALM_UNLIKELY(socket.is_open()))
22✔
3466
        throw util::runtime_error("Socket is already open");
×
3467
    m_desc.ensure_blocking_mode(); // Throws
22✔
3468
    m_desc.accept(socket.m_desc, m_protocol, ep, ec);
22✔
3469
    return ec;
22✔
3470
}
22✔
3471

3472
inline Acceptor::Want Acceptor::do_accept_async(Socket& socket, Endpoint* ep, std::error_code& ec) noexcept
3473
{
3,850✔
3474
    std::error_code ec_2;
3,850✔
3475
    m_desc.accept(socket.m_desc, m_protocol, ep, ec_2);
3,850✔
3476
    if (ec_2 == util::error::resource_unavailable_try_again)
3,850✔
3477
        return Want::read;
1,104✔
3478
    ec = ec_2;
2,746✔
3479
    return Want::nothing;
2,746✔
3480
}
3,850✔
3481

3482
template <class H>
3483
inline void Acceptor::async_accept(Socket& sock, Endpoint* ep, H&& handler)
3484
{
3,589✔
3485
    if (REALM_UNLIKELY(sock.is_open()))
3,589✔
3486
        throw util::runtime_error("Socket is already open");
×
3487
    LendersAcceptOperPtr op = Service::alloc<AcceptOper<H>>(m_read_oper, *this, sock, ep,
3,589✔
3488
                                                            std::move(handler)); // Throws
3,589✔
3489
    m_desc.initiate_oper(std::move(op));                                         // Throws
3,589✔
3490
}
3,589✔
3491

3492
// ---------------- DeadlineTimer ----------------
3493

3494
template <class H>
3495
class DeadlineTimer::WaitOper : public Service::WaitOperBase {
3496
public:
3497
    WaitOper(std::size_t size, DeadlineTimer& timer, clock::time_point expiration_time, H&& handler)
3498
        : Service::WaitOperBase{size, timer, expiration_time}
93,400✔
3499
        , m_handler{std::move(handler)}
93,397✔
3500
    {
235,935✔
3501
    }
235,935✔
3502
    void recycle_and_execute() override final
3503
    {
233,928✔
3504
        bool orphaned = !m_timer;
233,928✔
3505
        Status status = Status::OK();
233,928✔
3506
        if (is_canceled())
233,928✔
3507
            status = Status{ErrorCodes::OperationAborted, "Timer canceled"};
13,632✔
3508
        // Note: do_recycle_and_execute() commits suicide.
3509
        do_recycle_and_execute<H>(orphaned, m_handler, status); // Throws
233,928✔
3510
    }
233,928✔
3511

3512
private:
3513
    H m_handler;
3514
};
3515

3516
inline DeadlineTimer::DeadlineTimer(Service& service)
3517
    : m_service_impl{*service.m_impl}
8,742✔
3518
{
19,056✔
3519
}
19,056✔
3520

3521
inline DeadlineTimer::~DeadlineTimer() noexcept
3522
{
19,052✔
3523
    cancel();
19,052✔
3524
}
19,052✔
3525

3526
template <class R, class P, class H>
3527
inline void DeadlineTimer::async_wait(std::chrono::duration<R, P> delay, H&& handler)
3528
{
235,841✔
3529
    clock::time_point now = clock::now();
235,841✔
3530
    // FIXME: This method of detecting overflow does not work. Comparison
3531
    // between distinct duration types is not overflow safe. Overflow easily
3532
    // happens in the implied conversion of arguments to the common duration
3533
    // type (std::common_type<>).
3534
    auto max_add = clock::time_point::max() - now;
235,841✔
3535
    if (delay > max_add)
235,841!
3536
        throw util::overflow_error("Expiration time overflow");
×
3537
    clock::time_point expiration_time = now + delay;
235,841✔
3538
    initiate_oper(Service::alloc<WaitOper<H>>(m_wait_oper, *this, expiration_time,
235,841✔
3539
                                              std::move(handler))); // Throws
235,841✔
3540
}
235,841✔
3541

3542
// ---------------- ReadAheadBuffer ----------------
3543

3544
inline ReadAheadBuffer::ReadAheadBuffer()
3545
    : m_buffer{new char[s_size]} // Throws
3,720✔
3546
{
7,556✔
3547
}
7,556✔
3548

3549
inline void ReadAheadBuffer::clear() noexcept
3550
{
×
3551
    m_begin = nullptr;
×
3552
    m_end = nullptr;
×
3553
}
×
3554

3555
inline bool ReadAheadBuffer::empty() const noexcept
3556
{
896,306✔
3557
    return (m_begin == m_end);
896,306✔
3558
}
896,306✔
3559

3560
template <class S>
3561
inline void ReadAheadBuffer::refill_sync(S& stream, std::error_code& ec) noexcept
3562
{
131,098✔
3563
    char* buffer = m_buffer.get();
131,098✔
3564
    std::size_t size = s_size;
131,098✔
3565
    static_assert(noexcept(stream.do_read_some_sync(buffer, size, ec)), "");
131,098✔
3566
    std::size_t n = stream.do_read_some_sync(buffer, size, ec);
131,098✔
3567
    if (REALM_UNLIKELY(n == 0))
131,098✔
3568
        return;
8✔
3569
    REALM_ASSERT(!ec);
131,090✔
3570
    REALM_ASSERT(n <= size);
131,090✔
3571
    m_begin = m_buffer.get();
131,090✔
3572
    m_end = m_begin + n;
131,090✔
3573
}
131,090✔
3574

3575
template <class S>
3576
inline bool ReadAheadBuffer::refill_async(S& stream, std::error_code& ec, Want& want) noexcept
3577
{
896,280✔
3578
    char* buffer = m_buffer.get();
896,280✔
3579
    std::size_t size = s_size;
896,280✔
3580
    static_assert(noexcept(stream.do_read_some_async(buffer, size, ec, want)), "");
896,280✔
3581
    std::size_t n = stream.do_read_some_async(buffer, size, ec, want);
896,280✔
3582
    // Any errors reported by do_read_some_async() (other than end_of_input) should always return 0
3583
    if (n == 0)
896,280✔
3584
        return false;
58,002✔
3585
    REALM_ASSERT(!ec || ec == util::MiscExtErrors::end_of_input);
838,278✔
3586
    REALM_ASSERT(n <= size);
838,278✔
3587
    m_begin = m_buffer.get();
838,278✔
3588
    m_end = m_begin + n;
838,278✔
3589
    return true;
838,278✔
3590
}
896,280✔
3591

3592
} // namespace realm::sync::network
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc