realm / realm-core / github_pull_request_278228

04 Oct 2023 10:15PM UTC coverage: 91.582% (+0.007%) from 91.575%
Pull #7029

Evergreen

tgoyne
Use UNITTEST_LOG_LEVEL in objectstore tests

For historical reasons core and sync tests use the UNITTEST_LOG_LEVEL
environment variable to determine the test log level, while object store tests
used a build time setting. This brings them into alignment on using the env
variable, and applies it via setting the default log level on startup in a
single place.

94218 of 173442 branches covered (0.0%)

46 of 54 new or added lines in 5 files covered. (85.19%)

51 existing lines in 12 files now uncovered.

230351 of 251523 relevant lines covered (91.58%)

6704577.96 hits per line

Source File

85.41
/src/realm/sync/network/network.hpp
1
#pragma once
2

3
#include <cstddef>
4
#include <memory>
5
#include <chrono>
6
#include <string>
7
#include <system_error>
8
#include <ostream>
9

10
#include <sys/types.h>
11

12
#ifdef _WIN32
13
#include <winsock2.h>
14
#include <ws2tcpip.h>
15
#include <stdio.h>
16
#include <Ws2def.h>
17
#pragma comment(lib, "Ws2_32.lib")
18
#else
19
#include <sys/socket.h>
20
#include <arpa/inet.h>
21
#include <netdb.h>
22
#endif
23

24
#include <realm/status.hpp>
25
#include <realm/util/features.h>
26
#include <realm/util/assert.hpp>
27
#include <realm/util/backtrace.hpp>
28
#include <realm/util/basic_system_errors.hpp>
29
#include <realm/util/bind_ptr.hpp>
30
#include <realm/util/buffer.hpp>
31
#include <realm/util/misc_ext_errors.hpp>
32
#include <realm/util/scope_exit.hpp>
33

34
// Linux epoll
35
#if defined(REALM_USE_EPOLL) && !REALM_ANDROID
36
#define REALM_NETWORK_USE_EPOLL 1
37
#else
38
#define REALM_NETWORK_USE_EPOLL 0
39
#endif
40

41
// FreeBSD Kqueue.
42
//
43
// Available on Mac OS X, FreeBSD, NetBSD, OpenBSD
44
#if (defined(__MACH__) && defined(__APPLE__)) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
45
#if !defined(REALM_HAVE_KQUEUE)
46
#if !defined(REALM_DISABLE_UTIL_NETWORK_KQUEUE)
47
#define REALM_HAVE_KQUEUE 1
48
#endif
49
#endif
50
#endif
51
#if !defined(REALM_HAVE_KQUEUE)
52
#define REALM_HAVE_KQUEUE 0
53
#endif
54

55

56
// FIXME: Unfinished business around `Address::m_ip_v6_scope_id`.
57

58
namespace realm::sync::network {
59

60
/// \brief TCP/IP networking API.
61
///
62
/// The design of this networking API is heavily inspired by the ASIO C++
63
/// library (http://think-async.com).
64
///
65
///
66
/// ### Thread safety
67
///
68
/// A *service context* is a set of objects consisting of an instance of
69
/// Service, and all the objects that are associated with that instance (\ref
70
/// Resolver, \ref Socket, \ref Acceptor, \ref DeadlineTimer, and
71
/// \ref ssl::Stream).
72
///
73
/// In general, it is unsafe for two threads to call functions on the same
74
/// object, or on different objects in the same service context. This also
75
/// applies to destructors. Notable exceptions are the fully thread-safe
76
/// functions, such as Service::post(), Service::stop(), and Service::reset().
77
///
78
/// On the other hand, it is always safe for two threads to call functions on
79
/// objects belonging to different service contexts.
80
///
81
/// One implication of these rules is that at most one thread may execute run()
82
/// at any given time, and if one thread is executing run(), then no other
83
/// thread is allowed to access objects in the same service context (with the
84
/// mentioned exceptions).
85
///
86
/// Unless otherwise specified, free-standing objects, such as \ref
87
/// StreamProtocol, \ref Address, \ref Endpoint, and \ref Endpoint::List are
88
/// fully thread-safe as long as they are not mutated. If one thread is mutating
89
/// such an object, no other thread may access it. Note that these free-standing
90
/// objects are not associated with an instance of Service, and are therefore
91
/// not part of a service context.
92
///
93
///
94
/// ### Comparison with ASIO
95
///
96
/// There is a crucial difference between the two libraries with regard to the
97
/// guarantees that are provided about the cancelability of asynchronous
98
/// operations. The Realm networking library (this library) considers an
99
/// asynchronous operation to be complete precisely when the completion handler
100
/// starts to execute, and it guarantees that such an operation is cancelable up
101
/// until that point in time. In particular, if `cancel()` is called on a socket
102
/// or a deadline timer object before the completion handler starts to execute,
103
/// then that operation will be canceled, and will receive
104
/// `error::operation_aborted`. This guarantee is possible to provide (and free
105
/// of ambiguities) precisely because this library prohibits multiple threads
106
/// from executing the event loop concurrently, and because `cancel()` is
107
/// allowed to be called only from a completion handler (executed by the event
108
/// loop thread) or while no thread is executing the event loop. This guarantee
109
/// allows for safe destruction of sockets and deadline timers as long as the
110
/// completion handlers react appropriately to `error::operation_aborted`, in
111
/// particular, that they do not attempt to access the socket or deadline timer
112
/// object in such cases.
113
///
114
/// ASIO, on the other hand, allows for an asynchronous operation to complete
115
/// and become **uncancelable** before the completion handler starts to
116
/// execute. For this reason, it is possible with ASIO to get the completion
117
/// handler of an asynchronous wait operation to start executing and receive an
118
/// error code other than "operation aborted" at a point in time where
119
/// `cancel()` has already been called on the deadline timer object, or even at
120
/// a point in time where the deadline timer has been destroyed. This seems
121
/// like an inevitable consequence of the fact that ASIO allows for multiple
122
/// threads to execute the event loop concurrently. This generally forces ASIO
123
/// applications to invent ways of extending the lifetime of deadline timer and
124
/// socket objects until the completion handler starts executing.
125
///
126
/// IMPORTANT: Even if ASIO is used in a way where at most one thread executes
127
/// the event loop, there is still no guarantee that an asynchronous operation
128
/// remains cancelable up until the point in time where the completion handler
129
/// starts to execute.
130

131
std::string host_name();
132

133

134
class StreamProtocol;
135
class Address;
136
class Endpoint;
137
class Service;
138
class Resolver;
139
class SocketBase;
140
class Socket;
141
class Acceptor;
142
class DeadlineTimer;
143
class ReadAheadBuffer;
144
namespace ssl {
145
class Stream;
146
} // namespace ssl
147

148

149
/// \brief An IP protocol descriptor.
150
class StreamProtocol {
151
public:
152
    static StreamProtocol ip_v4();
153
    static StreamProtocol ip_v6();
154

155
    bool is_ip_v4() const;
156
    bool is_ip_v6() const;
157

158
    int protocol() const;
159
    int family() const;
160

161
    StreamProtocol();
162
    ~StreamProtocol() noexcept {}
146,780✔
163

164
private:
165
    int m_family;
166
    int m_socktype;
167
    int m_protocol;
168

169
    friend class Service;
170
    friend class SocketBase;
171
};
172

173

174
/// \brief An IP address (IPv4 or IPv6).
175
class Address {
176
public:
177
    bool is_ip_v4() const;
178
    bool is_ip_v6() const;
179

180
    template <class C, class T>
181
    friend std::basic_ostream<C, T>& operator<<(std::basic_ostream<C, T>&, const Address&);
182

183
    Address();
184
    ~Address() noexcept {}
19,938✔
185

186
private:
187
    using ip_v4_type = in_addr;
188
    using ip_v6_type = in6_addr;
189
    union union_type {
190
        ip_v4_type m_ip_v4;
191
        ip_v6_type m_ip_v6;
192
    };
193
    union_type m_union;
194
    std::uint_least32_t m_ip_v6_scope_id = 0;
195
    bool m_is_ip_v6 = false;
196

197
    friend Address make_address(const char*, std::error_code&) noexcept;
198
    friend class Endpoint;
199
};
200

201
Address make_address(const char* c_str);
202
Address make_address(const char* c_str, std::error_code& ec) noexcept;
203
Address make_address(const std::string&);
204
Address make_address(const std::string&, std::error_code& ec) noexcept;
205

206

207
/// \brief An IP endpoint.
208
///
209
/// An IP endpoint is a triplet (`protocol`, `address`, `port`).
210
class Endpoint {
211
public:
212
    using port_type = std::uint_fast16_t;
213
    class List;
214

215
    StreamProtocol protocol() const;
216
    Address address() const;
217
    port_type port() const;
218

219
    Endpoint();
220
    Endpoint(const StreamProtocol&, port_type);
221
    Endpoint(const Address&, port_type);
222
    ~Endpoint() noexcept {}
44,184✔
223

224
    using data_type = sockaddr;
225
    data_type* data();
226
    const data_type* data() const;
227

228
private:
229
    StreamProtocol m_protocol;
230

231
    using sockaddr_base_type = sockaddr;
232
    using sockaddr_ip_v4_type = sockaddr_in;
233
    using sockaddr_ip_v6_type = sockaddr_in6;
234
    union sockaddr_union_type {
235
        sockaddr_base_type m_base;
236
        sockaddr_ip_v4_type m_ip_v4;
237
        sockaddr_ip_v6_type m_ip_v6;
238
    };
239
    sockaddr_union_type m_sockaddr_union;
240

241
    friend class Service;
242
    friend class Resolver;
243
    friend class SocketBase;
244
    friend class Socket;
245
};
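
The private union in `Endpoint` and its `data()` accessors suggest that one storage area serves both address families. The following is a minimal sketch of that idea under POSIX, not Realm's implementation; `EndpointSketch` and `make_v4_endpoint` are hypothetical names introduced only for illustration.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <cassert>
#include <cstdint>
#include <cstring>

// Hypothetical stand-in for Endpoint: one union covers both address families.
struct EndpointSketch {
    union {
        sockaddr base;
        sockaddr_in ip_v4;
        sockaddr_in6 ip_v6;
    } addr;

    EndpointSketch() { std::memset(&addr, 0, sizeof addr); }

    // Generic view suitable for calls such as ::bind() or ::connect().
    const sockaddr* data() const { return &addr.base; }

    bool is_ip_v6() const { return addr.base.sa_family == AF_INET6; }

    std::uint16_t port() const
    {
        // Ports are stored in network byte order.
        return ntohs(is_ip_v6() ? addr.ip_v6.sin6_port : addr.ip_v4.sin_port);
    }
};

// Build an IPv4 endpoint from dotted-quad text; returns false on a parse error.
inline bool make_v4_endpoint(const char* text, std::uint16_t port, EndpointSketch& out)
{
    assert(text != nullptr);
    out = EndpointSketch();
    out.addr.ip_v4.sin_family = AF_INET;
    out.addr.ip_v4.sin_port = htons(port);
    return inet_pton(AF_INET, text, &out.addr.ip_v4.sin_addr) == 1;
}
```

Keeping the generic `sockaddr` as a union member lets the same object be passed to system calls without a cast at each call site.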
246

247

248
/// \brief A list of IP endpoints.
249
class Endpoint::List {
250
public:
251
    using iterator = const Endpoint*;
252

253
    iterator begin() const noexcept;
254
    iterator end() const noexcept;
255
    std::size_t size() const noexcept;
256
    bool empty() const noexcept;
257

258
    List() noexcept = default;
14,660✔
259
    List(List&&) noexcept = default;
26,726✔
260
    ~List() noexcept = default;
41,390✔
261

262
    List& operator=(List&&) noexcept = default;
3,348✔
263

264
private:
265
    util::Buffer<Endpoint> m_endpoints;
266

267
    friend class Service;
268
};
269

270

271
/// \brief TCP/IP networking service.
272
class Service {
273
public:
274
    Service();
275
    ~Service() noexcept;
276

277
    /// \brief Execute the event loop.
278
    ///
279
    /// Execute completion handlers of completed asynchronous operations, or
280
    /// wait for more completion handlers to become ready for
281
    /// execution. Handlers submitted via post() are considered immediately
282
    /// ready. If there are no completion handlers ready for execution, and
283
    /// there are no asynchronous operations in progress, run() returns.
284
    ///
285
    /// run_until_stopped() will continue running even if there are no completion
286
    /// handlers ready for execution, and no asynchronous operations in progress,
287
    /// until stop() is called.
288
    ///
289
    /// All completion handlers, including handlers submitted via post() will be
290
    /// executed from run(), that is, by the thread that executes run(). If no
291
    /// thread executes run(), then the completion handlers will not be
292
    /// executed.
293
    ///
294
    /// Exceptions thrown by completion handlers will always propagate back
295
    /// through run().
296
    ///
297
    /// Synchronous operations (e.g., Socket::connect()) execute independently of
298
    /// the event loop, and do not require that any thread calls run().
299
    void run();
300
    void run_until_stopped();
301

302
    /// @{ \brief Stop event loop execution.
303
    ///
304
    /// stop() puts the event loop into the stopped mode. If a thread is
305
    /// currently executing run(), it will be made to return in a timely
306
    /// fashion, that is, without further blocking. If a thread is currently
307
    /// blocked in run(), it will be unblocked. Handlers that can be executed
308
    /// immediately, may, or may not be executed before run() returns, but new
309
    /// handlers submitted by these, will not be executed before run()
310
    /// returns. Also, if a handler is submitted by a call to post, and that
311
    /// call happens after stop() returns, then that handler is guaranteed to
312
    /// not be executed before run() returns (assuming that reset() is not called
313
    /// before run() returns).
314
    ///
315
    /// The event loop will remain in the stopped mode until reset() is
316
    /// called. If reset() is called before run() returns, it may, or may not
317
    /// cause run() to resume normal operation without returning.
318
    ///
319
    /// Both stop() and reset() are thread-safe, that is, they may be called by
320
    /// any thread. Also, both of these functions may be called from completion
321
    /// handlers (including posted handlers).
322
    void stop() noexcept;
323
    void reset() noexcept;
324
    /// @}
325

326
    /// \brief Submit a handler to be executed by the event loop thread.
327
    ///
328
    /// Register the specified completion handler for immediate asynchronous
329
    /// execution. The specified handler will be executed by an expression of
330
    /// the form `handler(status)` where status is a Status object whose value
331
    /// will always be OK, but may change in the future. If the handler object
332
    /// is movable, it will never be copied. Otherwise, it will be copied as
333
    /// necessary.
334
    ///
335
    /// This function is thread-safe, that is, it may be called by any
336
    /// thread. It may also be called from other completion handlers.
337
    ///
338
    /// The handler will never be called as part of the execution of post(). It
339
    /// will always be called by a thread that is executing run(). If no thread
340
    /// is currently executing run(), the handler will not be executed until a
341
    /// thread starts executing run(). If post() is called while another thread
342
    /// is executing run(), the handler may be called before post() returns. If
343
    /// post() is called from another completion handler, the submitted handler
344
    /// is guaranteed to not be called during the execution of post().
345
    ///
346
    /// Completion handlers added through post() will be executed in the order
347
    /// that they are added. More precisely, if post() is called twice to add
348
    /// two handlers, A and B, and the execution of post(A) ends before the
349
    /// beginning of the execution of post(B), then A is guaranteed to execute
350
    /// before B.
351
    template <class H>
352
    void post(H handler);
353

354
    /// Argument `saturation` is the fraction of time that is not spent
355
    /// sleeping. Argument `inefficiency` is the fraction of time not spent
356
    /// sleeping, and not spent executing completion handlers. Both values are
357
    /// guaranteed to always be in the range 0 to 1 (both inclusive). The value
358
    /// passed as `inefficiency` is guaranteed to always be less than, or equal
359
    /// to the value passed as `saturation`.
360
    using EventLoopMetricsHandler = void(double saturation, double inefficiency);
361

362
    /// \brief Report event loop metrics via the specified handler.
363
    ///
364
    /// The handler will be called approximately every 30 seconds.
365
    ///
366
    /// report_event_loop_metrics() must be called prior to any invocation of
367
    /// run(). report_event_loop_metrics() is not thread-safe.
368
    ///
369
    /// This feature is only available if
370
    /// `REALM_UTIL_NETWORK_EVENT_LOOP_METRICS` was defined during
371
    /// compilation. When the feature is not available, the specified handler
372
    /// will never be called.
373
    void report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler>);
374

375
private:
376
    enum class Want { nothing = 0, read, write };
377

378
    template <class Oper>
379
    class OperQueue;
380
    class Descriptor;
381
    class AsyncOper;
382
    class ResolveOperBase;
383
    class WaitOperBase;
384
    class TriggerExecOperBase;
385
    class PostOperBase;
386
    template <class H>
387
    class PostOper;
388
    class IoOper;
389
    class UnusedOper; // Allocated, but currently unused memory
390

391
    template <class S>
392
    class BasicStreamOps;
393

394
    struct OwnersOperDeleter {
395
        void operator()(AsyncOper*) const noexcept;
396
    };
397
    struct LendersOperDeleter {
398
        void operator()(AsyncOper*) const noexcept;
399
    };
400
    using OwnersOperPtr = std::unique_ptr<AsyncOper, OwnersOperDeleter>;
401
    using LendersOperPtr = std::unique_ptr<AsyncOper, LendersOperDeleter>;
402
    using LendersResolveOperPtr = std::unique_ptr<ResolveOperBase, LendersOperDeleter>;
403
    using LendersWaitOperPtr = std::unique_ptr<WaitOperBase, LendersOperDeleter>;
404
    using LendersIoOperPtr = std::unique_ptr<IoOper, LendersOperDeleter>;
405

406
    class IoReactor;
407
    class Impl;
408
    const std::unique_ptr<Impl> m_impl;
409

410
    template <class Oper, class... Args>
411
    static std::unique_ptr<Oper, LendersOperDeleter> alloc(OwnersOperPtr&, Args&&...);
412

413
    using PostOperConstr = PostOperBase*(void* addr, std::size_t size, Impl&, void* cookie);
414
    void do_post(PostOperConstr, std::size_t size, void* cookie);
415
    template <class H>
416
    static PostOperBase* post_oper_constr(void* addr, std::size_t size, Impl&, void* cookie);
417
    static void recycle_post_oper(Impl&, PostOperBase*) noexcept;
418
    static void trigger_exec(Impl&, TriggerExecOperBase&) noexcept;
419
    static void reset_trigger_exec(Impl&, TriggerExecOperBase&) noexcept;
420

421
    using clock = std::chrono::steady_clock;
422

423
    friend class Resolver;
424
    friend class SocketBase;
425
    friend class Socket;
426
    friend class Acceptor;
427
    friend class DeadlineTimer;
428
    friend class ReadAheadBuffer;
429
    friend class ssl::Stream;
430
};
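
The documented contract of `post()`, `run()`, `stop()`, and `reset()` can be illustrated with a much smaller model. This is a sketch under stated assumptions, not Realm's `Service`: `LoopSketch` and `run_order_demo` are hypothetical names, handlers take no arguments (the real ones receive a `Status`), and there are no asynchronous I/O operations, so `run()` never blocks.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical single-consumer event loop; not Realm's Service.
class LoopSketch {
public:
    // Thread-safe: only enqueues, never runs the handler inline.
    void post(std::function<void()> handler)
    {
        assert(handler);
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push_back(std::move(handler));
    }

    // Thread-safe: asks run() to return without starting further handlers.
    void stop()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_stopped = true;
    }

    // Thread-safe: leaves the stopped mode so that run() works again.
    void reset()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_stopped = false;
    }

    // Executes queued handlers in FIFO order on the calling thread and
    // returns once the queue is empty or stop() has been called.
    void run()
    {
        for (;;) {
            std::function<void()> handler;
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                if (m_stopped || m_queue.empty())
                    return;
                handler = std::move(m_queue.front());
                m_queue.pop_front();
            }
            handler(); // Called without the lock so it may call post() or stop().
        }
    }

private:
    std::mutex m_mutex;
    std::deque<std::function<void()>> m_queue;
    bool m_stopped = false;
};

// Handlers posted as A then B, with C posted from inside A, run as A, B, C.
inline std::vector<char> run_order_demo()
{
    LoopSketch loop;
    std::vector<char> order;
    loop.post([&] {
        order.push_back('A');
        loop.post([&] { order.push_back('C'); });
    });
    loop.post([&] { order.push_back('B'); });
    assert(order.empty()); // Nothing runs until run() is called.
    loop.run();
    return order;
}
```

In this model a handler left in the queue when `stop()` takes effect stays queued until `reset()` and a later `run()`, which is one of the behaviors the documentation above permits.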
431

432

433
template <class Oper>
434
class Service::OperQueue {
435
public:
436
    using LendersOperPtr = std::unique_ptr<Oper, LendersOperDeleter>;
437
    bool empty() const noexcept;
438
    void push_back(LendersOperPtr) noexcept;
439
    template <class Oper2>
440
    void push_back(OperQueue<Oper2>&) noexcept;
441
    LendersOperPtr pop_front() noexcept;
442
    void clear() noexcept;
443
    OperQueue() noexcept = default;
2,737,360✔
444
    OperQueue(OperQueue&&) noexcept;
445
    ~OperQueue() noexcept
446
    {
2,814,414✔
447
        clear();
2,814,414✔
448
    }
2,814,414✔
449

450
private:
451
    Oper* m_back = nullptr;
452
    template <class>
453
    friend class OperQueue;
454
};
455

456

457
class Service::Descriptor {
458
public:
459
#ifdef _WIN32
460
    using native_handle_type = SOCKET;
461
#else
462
    using native_handle_type = int;
463
#endif
464

465
    Impl& service_impl;
466

467
    Descriptor(Impl& service) noexcept;
468
    ~Descriptor() noexcept;
469

470
    /// \param in_blocking_mode Must be true if, and only if the passed file
471
    /// descriptor refers to a file description in which the file status flag
472
    /// O_NONBLOCK is not set.
473
    ///
474
    /// The passed file descriptor must have the file descriptor flag FD_CLOEXEC
475
    /// set.
476
    void assign(native_handle_type fd, bool in_blocking_mode) noexcept;
477
    void close() noexcept;
478
    native_handle_type release() noexcept;
479

480
    bool is_open() const noexcept;
481

482
    native_handle_type native_handle() const noexcept;
483
    bool in_blocking_mode() const noexcept;
484

485
    void accept(Descriptor&, StreamProtocol, Endpoint*, std::error_code&) noexcept;
486
    std::size_t read_some(char* buffer, std::size_t size, std::error_code&) noexcept;
487
    std::size_t write_some(const char* data, std::size_t size, std::error_code&) noexcept;
488

489
    /// \tparam Oper An operation type inherited from IoOper with an initiate()
490
    /// function that initiates the operation and figures out whether it needs
491
    /// to read from, or write to the underlying descriptor to
492
    /// proceed. `initiate()` must return Want::read if the operation needs to
493
    /// read, or Want::write if the operation needs to write. If the operation
494
    /// completes immediately (e.g. due to a failure during initialization),
495
    /// `initiate()` must return Want::nothing.
496
    template <class Oper, class... Args>
497
    void initiate_oper(std::unique_ptr<Oper, LendersOperDeleter>, Args&&...);
498

499
    void ensure_blocking_mode();
500
    void ensure_nonblocking_mode();
501

502
private:
503
    native_handle_type m_fd = -1;
504
    bool m_in_blocking_mode; // Not in nonblocking mode
505

506
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
507
    bool m_read_ready;
508
    bool m_write_ready;
509
    bool m_imminent_end_of_input; // Kernel has seen the end of input
510
    bool m_is_registered;
511
    OperQueue<IoOper> m_suspended_read_ops, m_suspended_write_ops;
512

513
    void deregister_for_async() noexcept;
514
#endif
515

516
    bool assume_read_would_block() const noexcept;
517
    bool assume_write_would_block() const noexcept;
518

519
    void set_read_ready(bool) noexcept;
520
    void set_write_ready(bool) noexcept;
521

522
    void set_nonblock_flag(bool value);
523
    void add_initiated_oper(LendersIoOperPtr, Want);
524

525
    void do_close() noexcept;
526
    native_handle_type do_release() noexcept;
527

528
    friend class IoReactor;
529
};
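
The `assign()` documentation refers to the `O_NONBLOCK` file status flag and the `FD_CLOEXEC` descriptor flag. The sketch below shows how those two flags are typically inspected and changed with POSIX `fcntl()`. The free functions are hypothetical helpers that borrow names from the declarations above; how Realm implements them is not shown in this header.

```cpp
#include <cassert>
#include <fcntl.h>
#include <unistd.h>

// Returns true if O_NONBLOCK is clear, i.e. the descriptor is in blocking mode.
inline bool in_blocking_mode(int fd)
{
    int flags = ::fcntl(fd, F_GETFL, 0);
    assert(flags != -1);
    return (flags & O_NONBLOCK) == 0;
}

// Sets or clears O_NONBLOCK; returns false if fcntl() fails.
inline bool set_nonblock_flag(int fd, bool value)
{
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return false;
    flags = value ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
    return ::fcntl(fd, F_SETFL, flags) != -1;
}

// Sets FD_CLOEXEC so the descriptor is not inherited across exec().
inline bool set_cloexec(int fd)
{
    int flags = ::fcntl(fd, F_GETFD, 0);
    if (flags == -1)
        return false;
    return ::fcntl(fd, F_SETFD, flags | FD_CLOEXEC) != -1;
}
```

Note that `O_NONBLOCK` belongs to the open file description (`F_GETFL`/`F_SETFL`), while `FD_CLOEXEC` belongs to the descriptor itself (`F_GETFD`/`F_SETFD`), which is why the two helpers use different commands.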
530

531

532
class Resolver {
533
public:
534
    class Query;
535

536
    Resolver(Service&);
537
    ~Resolver() noexcept;
538

539
    /// Thread-safe.
540
    Service& get_service() noexcept;
541

542
    /// @{ \brief Resolve the specified query to one or more endpoints.
543
    Endpoint::List resolve(const Query&);
544
    Endpoint::List resolve(const Query&, std::error_code&);
545
    /// @}
546

547
    /// \brief Perform an asynchronous resolve operation.
548
    ///
549
    /// Initiate an asynchronous resolve operation. The completion handler will
550
    /// be called when the operation completes. The operation completes when it
551
    /// succeeds, or an error occurs.
552
    ///
553
    /// The completion handler is always executed by the event loop thread,
554
    /// i.e., by a thread that is executing Service::run(). Conversely, the
555
    /// completion handler is guaranteed to not be called while no thread is
556
    /// executing Service::run(). The execution of the completion handler is
557
    /// always deferred to the event loop, meaning that it never happens as a
558
    /// synchronous side effect of the execution of async_resolve(), even when
559
    /// async_resolve() is executed by the event loop thread. The completion
560
    /// handler is guaranteed to be called eventually, as long as there is time
561
    /// enough for the operation to complete or fail, and a thread is executing
562
    /// Service::run() for long enough.
563
    ///
564
    /// The operation can be canceled by calling cancel(), and will be
565
    /// automatically canceled if the resolver object is destroyed. If the
566
    /// operation is canceled, it will fail with `error::operation_aborted`. The
567
    /// operation remains cancelable up until the point in time where the
568
    /// completion handler starts to execute. This means that if cancel() is
569
    /// called before the completion handler starts to execute, then the
570
    /// completion handler is guaranteed to have `error::operation_aborted`
571
    /// passed to it. This is true regardless of whether cancel() is called
572
    /// explicitly or implicitly, such as when the resolver is destroyed.
573
    ///
574
    /// The specified handler will be executed by an expression of the form
575
    /// `handler(ec, endpoints)` where `ec` is the error code and `endpoints` is
576
    /// an object of type `Endpoint::List`. If the handler object is
577
    /// movable, it will never be copied. Otherwise, it will be copied as
578
    /// necessary.
579
    ///
580
    /// It is an error to start a new resolve operation (synchronous or
581
    /// asynchronous) while an asynchronous resolve operation is in progress via
582
    /// the same resolver object. An asynchronous resolve operation is
583
    /// considered complete as soon as the completion handler starts to
584
    /// execute. This means that a new resolve operation can be started from the
585
    /// completion handler.
586
    template <class H>
587
    void async_resolve(Query, H&& handler);
588

589
    /// \brief Cancel all asynchronous operations.
590
    ///
591
    /// Cause all incomplete asynchronous operations, that are associated with
592
    /// this resolver (at most one), to fail with `error::operation_aborted`. An
593
    /// asynchronous operation is complete precisely when its completion handler
594
    /// starts executing.
595
    ///
596
    /// Completion handlers of canceled operations will become immediately ready
597
    /// to execute, but will never be executed directly as part of the execution
598
    /// of cancel().
599
    ///
600
    /// Cancellation happens automatically when the resolver object is destroyed.
601
    void cancel() noexcept;
602

603
private:
604
    template <class H>
605
    class ResolveOper;
606

607
    Service::Impl& m_service_impl;
608

609
    Service::OwnersOperPtr m_resolve_oper;
610

611
    void initiate_oper(Service::LendersResolveOperPtr);
612
};
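
The cancellation guarantee described for `async_resolve()` and `cancel()` (an operation stays cancelable until its completion handler starts to execute) can be modeled in a few lines. This is an illustrative sketch only: `PendingOpSketch` is a hypothetical class, and `std::errc::operation_canceled` stands in for Realm's `error::operation_aborted`.

```cpp
#include <cassert>
#include <functional>
#include <system_error>
#include <utility>

// Hypothetical pending operation: the result is decided only when the event
// loop finally invokes the handler, so cancel() wins up to that moment.
class PendingOpSketch {
public:
    using Handler = std::function<void(std::error_code)>;

    explicit PendingOpSketch(Handler handler)
        : m_handler(std::move(handler))
    {
        assert(m_handler);
    }

    // Record that the underlying work finished (successfully, in this sketch).
    void mark_finished() noexcept { m_finished = true; }

    // Never calls the handler directly; it only changes what the handler will see.
    void cancel() noexcept { m_canceled = true; }

    // Called by the event loop thread. From here on the operation counts as complete.
    void execute()
    {
        assert(m_finished || m_canceled);
        std::error_code ec;
        if (m_canceled)
            ec = std::make_error_code(std::errc::operation_canceled);
        m_handler(ec);
    }

private:
    Handler m_handler;
    bool m_finished = false;
    bool m_canceled = false;
};
```

Because the error code is chosen inside `execute()`, a `cancel()` that arrives after the work has finished but before the handler runs still produces the canceled code. That is the property the documentation contrasts with ASIO.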
613

614

615
class Resolver::Query {
616
public:
617
    enum {
618
        /// Locally bound socket endpoint (server side)
619
        passive = AI_PASSIVE,
620

621
        /// Ignore families without a configured non-loopback address
622
        address_configured = AI_ADDRCONFIG
623
    };
624

625
    Query(std::string service_port, int init_flags = passive | address_configured);
626
    Query(const StreamProtocol&, std::string service_port, int init_flags = passive | address_configured);
627
    Query(std::string host_name, std::string service_port, int init_flags = address_configured);
628
    Query(const StreamProtocol&, std::string host_name, std::string service_port,
629
          int init_flags = address_configured);
630

631
    ~Query() noexcept;
632

633
    int flags() const;
634
    StreamProtocol protocol() const;
635
    std::string host() const;
636
    std::string service() const;
637

638
private:
639
    int m_flags;
640
    StreamProtocol m_protocol;
641
    std::string m_host;    // hostname
642
    std::string m_service; // port
643

644
    friend class Service;
645
};
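
The `passive` and `address_configured` flags map directly onto `AI_PASSIVE` and `AI_ADDRCONFIG` from `getaddrinfo()`. The sketch below shows what a passive (server-side) lookup looks like at the POSIX level. `resolve_passive_port` is a hypothetical helper, the lookup is restricted to IPv4, and `AI_ADDRCONFIG` is deliberately left out so the result does not depend on which interfaces are configured.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>

// Resolves a passive (server-side) IPv4 TCP endpoint for a numeric port and
// returns the port found in the first result, or -1 on failure.
inline int resolve_passive_port(const char* service_port)
{
    assert(service_port != nullptr);
    addrinfo hints = {};
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;

    addrinfo* results = nullptr;
    // A null host name together with AI_PASSIVE yields the wildcard address.
    if (::getaddrinfo(nullptr, service_port, &hints, &results) != 0)
        return -1;

    int port = -1;
    if (results && results->ai_family == AF_INET) {
        const sockaddr_in* addr = reinterpret_cast<const sockaddr_in*>(results->ai_addr);
        port = ntohs(addr->sin_port);
    }
    ::freeaddrinfo(results);
    return port;
}
```

This corresponds to the `Query` constructors that take only a service port: with no host name, the passive flag asks for an address suitable for `bind()` rather than `connect()`.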
646

647

648
class SocketBase {
649
public:
650
    using native_handle_type = Service::Descriptor::native_handle_type;
651

652
    ~SocketBase() noexcept;
653

654
    /// Thread-safe.
655
    Service& get_service() noexcept;
656

657
    bool is_open() const noexcept;
658
    native_handle_type native_handle() const noexcept;
659

660
    /// @{ \brief Open the socket for use with the specified protocol.
661
    ///
662
    /// It is an error to call open() on a socket that is already open.
663
    void open(const StreamProtocol&);
664
    std::error_code open(const StreamProtocol&, std::error_code&);
665
    /// @}
666

667
    /// \brief Close this socket.
668
    ///
669
    /// If the socket is open, it will be closed. If it is already closed (or
670
    /// never opened), this function does nothing (idempotency).
671
    ///
672
    /// A socket is automatically closed when destroyed.
673
    ///
674
    /// When the socket is closed, any incomplete asynchronous operation will be
675
    /// canceled (as if cancel() was called).
676
    void close() noexcept;
677

678
    /// \brief Cancel all asynchronous operations.
679
    ///
680
    /// Cause all incomplete asynchronous operations, that are associated with
681
    /// this socket, to fail with `error::operation_aborted`. An asynchronous
682
    /// operation is complete precisely when its completion handler starts
683
    /// executing.
684
    ///
685
    /// Completion handlers of canceled operations will become immediately ready
686
    /// to execute, but will never be executed directly as part of the execution
687
    /// of cancel().
688
    void cancel() noexcept;
689

690
    template <class O>
691
    void get_option(O& opt) const;
692

693
    template <class O>
694
    std::error_code get_option(O& opt, std::error_code&) const;
695

696
    template <class O>
697
    void set_option(const O& opt);
698

699
    template <class O>
700
    std::error_code set_option(const O& opt, std::error_code&);
701

702
    void bind(const Endpoint&);
703
    std::error_code bind(const Endpoint&, std::error_code&);
704

705
    Endpoint local_endpoint() const;
706
    Endpoint local_endpoint(std::error_code&) const;
707

708
    /// Release the ownership of this socket object over the native handle and
709
    /// return the native handle to the caller. The caller assumes ownership
710
    /// over the returned handle. The socket is left in a closed
711
    /// state. Incomplete asynchronous operations will be canceled as if close()
712
    /// had been called.
713
    ///
714
    /// If called on a closed socket, this function is a no-op, and returns the
715
    /// same value as would be returned by native_handle().
716
    native_handle_type release_native_handle() noexcept;
717

718
private:
719
    enum opt_enum {
720
        opt_ReuseAddr, ///< `SOL_SOCKET`, `SO_REUSEADDR`
721
        opt_Linger,    ///< `SOL_SOCKET`, `SO_LINGER`
722
        opt_NoDelay,   ///< `IPPROTO_TCP`, `TCP_NODELAY` (disable the Nagle algorithm)
723
    };
724

725
    template <class, int, class>
726
    class Option;
727

728
public:
729
    using reuse_address = Option<bool, opt_ReuseAddr, int>;
730
    using no_delay = Option<bool, opt_NoDelay, int>;
731

732
    // linger struct defined by POSIX sys/socket.h.
733
    struct linger_opt;
734
    using linger = Option<linger_opt, opt_Linger, struct linger>;
735

736
protected:
737
    Service::Descriptor m_desc;
738

739
private:
740
    StreamProtocol m_protocol;
741

742
protected:
743
    Service::OwnersOperPtr m_read_oper;  // Read or accept
744
    Service::OwnersOperPtr m_write_oper; // Write or connect
745

746
    SocketBase(Service&);
747

748
    const StreamProtocol& get_protocol() const noexcept;
749
    std::error_code do_assign(const StreamProtocol&, native_handle_type, std::error_code&);
750
    void do_close() noexcept;
751

752
    void get_option(opt_enum, void* value_data, std::size_t& value_size, std::error_code&) const;
753
    void set_option(opt_enum, const void* value_data, std::size_t value_size, std::error_code&);
754
    void map_option(opt_enum, int& level, int& option_name) const;
755

756
    friend class Acceptor;
757
};
758

759

760
template <class T, int opt, class U>
761
class SocketBase::Option {
762
public:
763
    Option(T value = T());
764
    T value() const;
765

766
private:
767
    T m_value;
768

769
    void get(const SocketBase&, std::error_code&);
770
    void set(SocketBase&, std::error_code&) const;
771

772
    friend class SocketBase;
773
};
774

775
struct SocketBase::linger_opt {
776
    linger_opt(bool enable, int timeout_seconds = 0)
777
    {
×
778
        m_linger.l_onoff = enable ? 1 : 0;
×
779
        m_linger.l_linger = timeout_seconds;
×
780
    }
×
781

782
    ::linger m_linger;
783

784
    operator ::linger() const
785
    {
×
786
        return m_linger;
×
787
    }
×
788

789
    bool enabled() const
790
    {
×
791
        return m_linger.l_onoff != 0;
×
792
    }
×
793
    int timeout() const
794
    {
×
795
        return m_linger.l_linger;
×
796
    }
×
797
};
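
`linger_opt` wraps the POSIX `linger` structure used with `SO_LINGER`. As a rough illustration of what setting that option involves below the `set_option()` layer, here is a sketch using plain `setsockopt()`. `make_linger` and `apply_linger` are hypothetical helpers, not part of this header.

```cpp
#include <cassert>
#include <sys/socket.h>

// Builds the POSIX linger record that SO_LINGER expects.
inline ::linger make_linger(bool enable, int timeout_seconds)
{
    assert(timeout_seconds >= 0);
    ::linger value = {};
    value.l_onoff = enable ? 1 : 0;
    value.l_linger = timeout_seconds;
    return value;
}

// Applies the linger setting to an open socket; returns false on failure.
inline bool apply_linger(int fd, const ::linger& value)
{
    return ::setsockopt(fd, SOL_SOCKET, SO_LINGER, &value, sizeof value) == 0;
}
```

With `l_onoff` set and a zero timeout, closing a TCP socket typically discards unsent data and resets the connection, so callers usually choose the timeout deliberately.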
798

799

800
/// Switching between synchronous and asynchronous operations is allowed, but
801
/// only in a nonoverlapping fashion. That is, a synchronous operation is not
802
/// allowed to run concurrently with an asynchronous one on the same
803
/// socket. Note that an asynchronous operation is considered to be running
804
/// until its completion handler starts executing.
805
class Socket : public SocketBase {
806
public:
807
    Socket(Service&);
808

809
    /// \brief Create a socket with an already-connected native socket handle.
810
    ///
811
    /// This constructor is shorthand for creating the socket with the
812
    /// one-argument constructor, and then calling the two-argument assign()
813
    /// with the specified protocol and native handle.
814
    Socket(Service&, const StreamProtocol&, native_handle_type);
815

816
    ~Socket() noexcept;
817

818
    void connect(const Endpoint&);
819
    std::error_code connect(const Endpoint&, std::error_code&);
820

821
    /// @{ \brief Perform a synchronous read operation.
822
    ///
823
    /// read() will not return until the specified buffer is full, or an error
824
    /// occurs. Reaching the end of input before the buffer is filled, is
825
    /// considered an error, and will cause the operation to fail with
826
    /// MiscExtErrors::end_of_input.
827
    ///
828
    /// read_until() will not return until the specified buffer contains the
829
    /// specified delimiter, or an error occurs. If the buffer is filled before
830
    /// the delimiter is found, the operation fails with
831
    /// MiscExtErrors::delim_not_found. Otherwise, if the end of input is
832
    /// reached before the delimiter is found, the operation fails with
833
    /// MiscExtErrors::end_of_input. If the operation succeeds, the last byte
834
    /// placed in the buffer is the delimiter.
835
    ///
836
    /// The versions that take a ReadAheadBuffer argument will read through that
837
    /// buffer. This allows for fewer larger reads on the underlying
838
    /// socket. Since unconsumed data may be left in the read-ahead buffer after
839
    /// a read operation returns, it is important that the same read-ahead
840
    /// buffer is passed to the next read operation.
841
    ///
842
    /// The versions of read() and read_until() that do not take an
843
    /// `std::error_code&` argument will throw std::system_error on failure.
844
    ///
845
    /// The versions that do take an `std::error_code&` argument will set \a ec
846
    /// to `std::error_code()` on success, and to something else on failure. On
847
    /// failure they will return the number of bytes placed in the specified
848
    /// buffer before the error occurred.
849
    ///
850
    /// \return The number of bytes placed in the specified buffer upon return.
851
    std::size_t read(char* buffer, std::size_t size);
852
    std::size_t read(char* buffer, std::size_t size, std::error_code& ec);
853
    std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&);
854
    std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&, std::error_code& ec);
855
    std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&);
856
    std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&, std::error_code& ec);
857
    /// @}
858

859
    /// @{ \brief Perform a synchronous write operation.
860
    ///
861
    /// write() will not return until all the specified bytes have been written
862
    /// to the socket, or an error occurs.
863
    ///
864
    /// The version of write() that does not take an `std::error_code&`
865
    /// argument will throw std::system_error on failure. When it succeeds, it
866
    /// always returns \a size.
867
    ///
868
    /// The version that does take an `std::error_code&` argument will set \a
869
    /// ec to `std::error_code()` on success, and to something else on
870
    /// failure. On success it returns \a size. On failure it returns the number
871
    /// of bytes written before the failure occurred.
872
    std::size_t write(const char* data, std::size_t size);
873
    std::size_t write(const char* data, std::size_t size, std::error_code& ec);
874
    /// @}
875

876
    /// @{ \brief Read at least one byte from this socket.
877
    ///
878
    /// If \a size is zero, both versions of read_some() will return zero
879
    /// without blocking. Read errors may or may not be detected in this case.
880
    ///
881
    /// Otherwise, if \a size is greater than zero, and at least one byte is
882
    /// immediately available, that is, without blocking, then both versions
883
    /// will read at least one byte (but generally as many immediately available
884
    /// bytes as will fit into the specified buffer), and return without
885
    /// blocking.
886
    ///
887
    /// Otherwise, both versions will block the calling thread until at least one
888
    /// byte becomes available, or an error occurs.
889
    ///
890
    /// In this context, it counts as an error, if the end of input is reached
891
    /// before at least one byte becomes available (see
892
    /// MiscExtErrors::end_of_input).
893
    ///
894
    /// If no error occurs, both versions will return the number of bytes placed
895
    /// in the specified buffer, which is generally as many as are immediately
896
    /// available at the time when the first byte becomes available, although
897
    /// never more than \a size.
898
    ///
899
    /// If no error occurs, the three-argument version will set \a ec to
900
    /// indicate success.
901
    ///
902
    /// If an error occurs, the two-argument version will throw
903
    /// `std::system_error`, while the three-argument version will set \a ec to
904
    /// indicate the error, and return zero.
905
    ///
906
    /// As long as \a size is greater than zero, the two argument version will
907
    /// always return a value that is greater than zero, while the three
908
    /// argument version will return a value greater than zero when, and only
909
    /// when \a ec is set to indicate success (no error, and no end of input).
910
    std::size_t read_some(char* buffer, std::size_t size);
911
    std::size_t read_some(char* buffer, std::size_t size, std::error_code& ec);
912
    /// @}
913

914
    /// @{ \brief Write at least one byte to this socket.
915
    ///
916
    /// If \a size is zero, both versions of write_some() will return zero
917
    /// without blocking. Write errors may or may not be detected in this case.
918
    ///
919
    /// Otherwise, if \a size is greater than zero, and at least one byte can be
920
    /// written immediately, that is, without blocking, then both versions will
921
    /// write at least one byte (but generally as many as can be written
922
    /// immediately), and return without blocking.
923
    ///
924
    /// Otherwise, both versions will block the calling thread until at least one
925
    /// byte can be written, or an error occurs.
926
    ///
927
    /// If no error occurs, both versions will return the number of bytes
928
    /// written, which is generally as many as can be written immediately at the
929
    /// time when the first byte can be written.
930
    ///
931
    /// If no error occurs, the three-argument version will set \a ec to
932
    /// indicate success.
933
    ///
934
    /// If an error occurs, the two-argument version will throw
935
    /// `std::system_error`, while the three-argument version will set \a ec to
936
    /// indicate the error, and return zero.
937
    ///
938
    /// As long as \a size is greater than zero, the two argument version will
939
    /// always return a value that is greater than zero, while the three
940
    /// argument version will return a value greater than zero when, and only
941
    /// when \a ec is set to indicate success.
942
    std::size_t write_some(const char* data, std::size_t size);
943
    std::size_t write_some(const char* data, std::size_t size, std::error_code&);
944
    /// @}
945

946
    /// \brief Perform an asynchronous connect operation.
947
    ///
948
    /// Initiate an asynchronous connect operation. The completion handler is
949
    /// called when the operation completes. The operation completes when the
950
    /// connection is established, or an error occurs.
951
    ///
952
    /// The completion handler is always executed by the event loop thread,
953
    /// i.e., by a thread that is executing Service::run(). Conversely, the
954
    /// completion handler is guaranteed to not be called while no thread is
955
    /// executing Service::run(). The execution of the completion handler is
956
    /// always deferred to the event loop, meaning that it never happens as a
957
    /// synchronous side effect of the execution of async_connect(), even when
958
    /// async_connect() is executed by the event loop thread. The completion
959
    /// handler is guaranteed to be called eventually, as long as there is time
960
    /// enough for the operation to complete or fail, and a thread is executing
961
    /// Service::run() for long enough.
962
    ///
963
    /// The operation can be canceled by calling cancel(), and will be
964
    /// automatically canceled if the socket is closed. If the operation is
965
    /// canceled, it will fail with `error::operation_aborted`. The operation
966
    /// remains cancelable up until the point in time where the completion
967
    /// handler starts to execute. This means that if cancel() is called before
968
    /// the completion handler starts to execute, then the completion handler is
969
    /// guaranteed to have `error::operation_aborted` passed to it. This is true
970
    /// regardless of whether cancel() is called explicitly or implicitly, such
971
    /// as when the socket is destroyed.
972
    ///
973
    /// If the socket is not already open, it will be opened as part of the
974
    /// connect operation as if by calling `open(ep.protocol())`. If the opening
975
    /// operation succeeds, but the connect operation fails, the socket will be
976
    /// left in the opened state.
977
    ///
978
    /// The specified handler will be executed by an expression of the form
979
    /// `handler(ec)` where `ec` is the error code. If the handler object is
980
    /// movable, it will never be copied. Otherwise, it will be copied as
981
    /// necessary.
982
    ///
983
    /// It is an error to start a new connect operation (synchronous or
984
    /// asynchronous) while an asynchronous connect operation is in progress. An
985
    /// asynchronous connect operation is considered complete as soon as the
986
    /// completion handler starts to execute.
987
    ///
988
    /// \param ep The remote endpoint of the connection to be established.
989
    template <class H>
990
    void async_connect(const Endpoint& ep, H&& handler);
991

992
    /// @{ \brief Perform an asynchronous read operation.
993
    ///
994
    /// Initiate an asynchronous buffered read operation on the associated
995
    /// socket. The completion handler will be called when the operation
996
    /// completes, or an error occurs.
997
    ///
998
    /// async_read() will continue reading until the specified buffer is full,
999
    /// or an error occurs. If the end of input is reached before the buffer is
1000
    /// filled, the operation fails with MiscExtErrors::end_of_input.
1001
    ///
1002
    /// async_read_until() will continue reading until the specified buffer
1003
    /// contains the specified delimiter, or an error occurs. If the buffer is
1004
    /// filled before a delimiter is found, the operation fails with
1005
    /// MiscExtErrors::delim_not_found. Otherwise, if the end of input is
1006
    /// reached before a delimiter is found, the operation fails with
1007
    /// MiscExtErrors::end_of_input. Otherwise, if the operation succeeds, the
1008
    /// last byte placed in the buffer is the delimiter.
1009
    ///
1010
    /// The versions that take a ReadAheadBuffer argument will read through that
1011
    /// buffer. This allows for fewer larger reads on the underlying
1012
    /// socket. Since unconsumed data may be left in the read-ahead buffer after
1013
    /// a read operation completes, it is important that the same read-ahead
1014
    /// buffer is passed to the next read operation.
1015
    ///
1016
    /// The completion handler is always executed by the event loop thread,
1017
    /// i.e., by a thread that is executing Service::run(). Conversely, the
1018
    /// completion handler is guaranteed to not be called while no thread is
1019
    /// executing Service::run(). The execution of the completion handler is
1020
    /// always deferred to the event loop, meaning that it never happens as a
1021
    /// synchronous side effect of the execution of async_read() or
1022
    /// async_read_until(), even when async_read() or async_read_until() is
1023
    /// executed by the event loop thread. The completion handler is guaranteed
1024
    /// to be called eventually, as long as there is time enough for the
1025
    /// operation to complete or fail, and a thread is executing Service::run()
1026
    /// for long enough.
1027
    ///
1028
    /// The operation can be canceled by calling cancel() on the associated
1029
    /// socket, and will be automatically canceled if the associated socket is
1030
    /// closed. If the operation is canceled, it will fail with
1031
    /// `error::operation_aborted`. The operation remains cancelable up until
1032
    /// the point in time where the completion handler starts to execute. This
1033
    /// means that if cancel() is called before the completion handler starts to
1034
    /// execute, then the completion handler is guaranteed to have
1035
    /// `error::operation_aborted` passed to it. This is true regardless of
1036
    /// whether cancel() is called explicitly or implicitly, such as when the
1037
    /// socket is destroyed.
1038
    ///
1039
    /// The specified handler will be executed by an expression of the form
1040
    /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of
1041
    /// bytes placed in the buffer (of type `std::size_t`). `n` is guaranteed to
1042
    /// be less than, or equal to \a size. If the handler object is movable,
1043
    /// it will never be copied. Otherwise, it will be copied as necessary.
1044
    ///
1045
    /// It is an error to start a read operation before the associated socket is
1046
    /// connected.
1047
    ///
1048
    /// It is an error to start a new read operation (synchronous or
1049
    /// asynchronous) while an asynchronous read operation is in progress. An
1050
    /// asynchronous read operation is considered complete as soon as the
1051
    /// completion handler starts executing. This means that a new read
1052
    /// operation can be started from the completion handler of another
1053
    /// asynchronous buffered read operation.
1054
    template <class H>
1055
    void async_read(char* buffer, std::size_t size, H&& handler);
1056
    template <class H>
1057
    void async_read(char* buffer, std::size_t size, ReadAheadBuffer&, H&& handler);
1058
    template <class H>
1059
    void async_read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&, H&& handler);
1060
    /// @}
1061

1062
    /// \brief Perform an asynchronous write operation.
1063
    ///
1064
    /// Initiate an asynchronous write operation. The completion handler is
1065
    /// called when the operation completes. The operation completes when all
1066
    /// the specified bytes have been written to the socket, or an error occurs.
1067
    ///
1068
    /// The completion handler is always executed by the event loop thread,
1069
    /// i.e., by a thread that is executing Service::run(). Conversely, the
1070
    /// completion handler is guaranteed to not be called while no thread is
1071
    /// executing Service::run(). The execution of the completion handler is
1072
    /// always deferred to the event loop, meaning that it never happens as a
1073
    /// synchronous side effect of the execution of async_write(), even when
1074
    /// async_write() is executed by the event loop thread. The completion
1075
    /// handler is guaranteed to be called eventually, as long as there is time
1076
    /// enough for the operation to complete or fail, and a thread is executing
1077
    /// Service::run() for long enough.
1078
    ///
1079
    /// The operation can be canceled by calling cancel(), and will be
1080
    /// automatically canceled if the socket is closed. If the operation is
1081
    /// canceled, it will fail with `error::operation_aborted`. The operation
1082
    /// remains cancelable up until the point in time where the completion
1083
    /// handler starts to execute. This means that if cancel() is called before
1084
    /// the completion handler starts to execute, then the completion handler is
1085
    /// guaranteed to have `error::operation_aborted` passed to it. This is true
1086
    /// regardless of whether cancel() is called explicitly or implicitly, such
1087
    /// as when the socket is destroyed.
1088
    ///
1089
    /// The specified handler will be executed by an expression of the form
1090
    /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of
1091
    /// bytes written (of type `std::size_t`). If the handler object is
1092
    /// movable, it will never be copied. Otherwise, it will be copied as
1093
    /// necessary.
1094
    ///
1095
    /// It is an error to start an asynchronous write operation before the
1096
    /// socket is connected.
1097
    ///
1098
    /// It is an error to start a new write operation (synchronous or
1099
    /// asynchronous) while an asynchronous write operation is in progress. An
1100
    /// asynchronous write operation is considered complete as soon as the
1101
    /// completion handler starts to execute. This means that a new write
1102
    /// operation can be started from the completion handler of another
1103
    /// asynchronous write operation.
1104
    template <class H>
1105
    void async_write(const char* data, std::size_t size, H&& handler);
1106

1107
    template <class H>
1108
    void async_read_some(char* buffer, std::size_t size, H&& handler);
1109
    template <class H>
1110
    void async_write_some(const char* data, std::size_t size, H&& handler);
1111

1112
    enum shutdown_type {
1113
#ifdef _WIN32
1114
        /// Shutdown the receiving side of the socket.
1115
        shutdown_receive = SD_RECEIVE,
1116

1117
        /// Shutdown the sending side of the socket.
1118
        shutdown_send = SD_SEND,
1119

1120
        /// Shutdown both sending and receiving side of the socket.
1121
        shutdown_both = SD_BOTH
1122
#else
1123
        shutdown_receive = SHUT_RD,
1124
        shutdown_send = SHUT_WR,
1125
        shutdown_both = SHUT_RDWR
1126
#endif
1127
    };
1128

1129
    /// @{ \brief Shut down the connected socket's sending and/or receiving
1130
    /// side.
1131
    ///
1132
    /// It is an error to call this function when the socket is not both open
1133
    /// and connected.
1134
    void shutdown(shutdown_type);
1135
    std::error_code shutdown(shutdown_type, std::error_code&);
1136
    /// @}
1137

1138
    /// @{ \brief Initialize socket with an already-connected native socket
1139
    /// handle.
1140
    ///
1141
    /// The specified native handle must refer to a socket that is already fully
1142
    /// open and connected.
1143
    ///
1144
    /// If the assignment operation succeeds, this socket object has taken
1145
    /// ownership of the specified native handle, and the handle will be closed
1146
    /// when the socket object is destroyed, (or when close() is called). If the
1147
    /// operation fails, the caller still owns the specified native handle.
1148
    ///
1149
    /// It is an error to call connect() or async_connect() on a socket object
1150
    /// that is initialized this way (unless it is first closed).
1151
    ///
1152
    /// It is an error to call this function on a socket object that is already
1153
    /// open.
1154
    void assign(const StreamProtocol&, native_handle_type);
1155
    std::error_code assign(const StreamProtocol&, native_handle_type, std::error_code&);
1156
    /// @}
1157

1158
    /// Returns a reference to this socket, as this socket is the lowest layer
1159
    /// of a stream.
1160
    Socket& lowest_layer() noexcept;
1161

1162
private:
1163
    using Want = Service::Want;
1164
    using StreamOps = Service::BasicStreamOps<Socket>;
1165

1166
    class ConnectOperBase;
1167
    template <class H>
1168
    class ConnectOper;
1169

1170
    using LendersConnectOperPtr = std::unique_ptr<ConnectOperBase, Service::LendersOperDeleter>;
1171

1172
    // `ec` untouched on success, but no immediate completion
1173
    bool initiate_async_connect(const Endpoint&, std::error_code& ec);
1174
    // `ec` untouched on success
1175
    std::error_code finalize_async_connect(std::error_code& ec) noexcept;
1176

1177
    // See Service::BasicStreamOps for details on these 6 functions.
1178
    void do_init_read_async(std::error_code&, Want&) noexcept;
1179
    void do_init_write_async(std::error_code&, Want&) noexcept;
1180
    std::size_t do_read_some_sync(char* buffer, std::size_t size, std::error_code&) noexcept;
1181
    std::size_t do_write_some_sync(const char* data, std::size_t size, std::error_code&) noexcept;
1182
    std::size_t do_read_some_async(char* buffer, std::size_t size, std::error_code&, Want&) noexcept;
1183
    std::size_t do_write_some_async(const char* data, std::size_t size, std::error_code&, Want&) noexcept;
1184

1185
    friend class Service::BasicStreamOps<Socket>;
1186
    friend class Service::BasicStreamOps<ssl::Stream>;
1187
    friend class ReadAheadBuffer;
1188
    friend class ssl::Stream;
1189
};
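
The `read_until()` contract documented for `Socket` has three outcomes: delimiter found, buffer filled first, or end of input reached first. The sketch below reproduces that decision order against an in-memory byte source instead of a real socket. The names `ReadOutcome`, `ReadResult`, and `read_until_sketch` are hypothetical and only stand in for `MiscExtErrors::delim_not_found` and `MiscExtErrors::end_of_input`; this is not the library's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical outcome codes mirroring the documented error conditions.
enum class ReadOutcome { ok, delim_not_found, end_of_input };

struct ReadResult {
    ReadOutcome outcome;
    std::size_t bytes; // Number of bytes placed in the buffer
};

// Copy bytes from `input` into `buffer` until `delim` has been copied.
// The buffer-full check comes first, matching the documented precedence:
// a full buffer reports delim_not_found even if the input is also exhausted.
inline ReadResult read_until_sketch(const std::string& input, char* buffer, std::size_t size, char delim)
{
    std::size_t n = 0;
    for (;;) {
        if (n == size)
            return {ReadOutcome::delim_not_found, n};
        if (n == input.size())
            return {ReadOutcome::end_of_input, n};
        char ch = input[n];
        buffer[n++] = ch;
        if (ch == delim)
            return {ReadOutcome::ok, n}; // The last byte placed is the delimiter
    }
}

// Usage: read one line out of a larger input.
inline void read_until_usage_example()
{
    char line[16];
    ReadResult r = read_until_sketch("GET /\nrest", line, sizeof line, '\n');
    assert(r.outcome == ReadOutcome::ok);
    assert(line[r.bytes - 1] == '\n');
}
```

As in the documented error-code overloads, the byte count is reported even on failure, so a caller can still inspect the partial data.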
1190

1191

1192
/// Switching between synchronous and asynchronous operations is allowed, but
1193
/// only in a nonoverlapping fashion. That is, a synchronous operation is not
1194
/// allowed to run concurrently with an asynchronous one on the same
1195
/// acceptor. Note that an asynchronous operation is considered to be running
1196
/// until its completion handler starts executing.
1197
class Acceptor : public SocketBase {
1198
public:
1199
    Acceptor(Service&);
1200
    ~Acceptor() noexcept;
1201

1202
    static constexpr int max_connections = SOMAXCONN;
1203

1204
    void listen(int backlog = max_connections);
1205
    std::error_code listen(int backlog, std::error_code&);
1206

1207
    void accept(Socket&);
1208
    void accept(Socket&, Endpoint&);
1209
    std::error_code accept(Socket&, std::error_code&);
1210
    std::error_code accept(Socket&, Endpoint&, std::error_code&);
1211

1212
    /// @{ \brief Perform an asynchronous accept operation.
1213
    ///
1214
    /// Initiate an asynchronous accept operation. The completion handler will
1215
    /// be called when the operation completes. The operation completes when the
1216
    /// connection is accepted, or an error occurs. If the operation succeeds,
1217
    /// the specified local socket will have become connected to a remote
1218
    /// socket.
1219
    ///
1220
    /// The completion handler is always executed by the event loop thread,
1221
    /// i.e., by a thread that is executing Service::run(). Conversely, the
1222
    /// completion handler is guaranteed to not be called while no thread is
1223
    /// executing Service::run(). The execution of the completion handler is
1224
    /// always deferred to the event loop, meaning that it never happens as a
1225
    /// synchronous side effect of the execution of async_accept(), even when
1226
    /// async_accept() is executed by the event loop thread. The completion
1227
    /// handler is guaranteed to be called eventually, as long as there is time
1228
    /// enough for the operation to complete or fail, and a thread is executing
1229
    /// Service::run() for long enough.
1230
    ///
1231
    /// The operation can be canceled by calling cancel(), and will be
1232
    /// automatically canceled if the acceptor is closed. If the operation is
1233
    /// canceled, it will fail with `error::operation_aborted`. The operation
1234
    /// remains cancelable up until the point in time where the completion
1235
    /// handler starts to execute. This means that if cancel() is called before
1236
    /// the completion handler starts to execute, then the completion handler is
1237
    /// guaranteed to have `error::operation_aborted` passed to it. This is true
1238
    /// regardless of whether cancel() is called explicitly or implicitly, such
1239
    /// as when the acceptor is destroyed.
1240
    ///
1241
    /// The specified handler will be executed by an expression of the form
1242
    /// `handler(ec)` where `ec` is the error code. If the handler object is
1243
    /// movable, it will never be copied. Otherwise, it will be copied as
1244
    /// necessary.
1245
    ///
1246
    /// It is an error to start a new accept operation (synchronous or
1247
    /// asynchronous) while an asynchronous accept operation is in progress. An
1248
    /// asynchronous accept operation is considered complete as soon as the
1249
    /// completion handler starts executing. This means that a new accept
1250
    /// operation can be started from the completion handler.
1251
    ///
1252
    /// \param sock This is the local socket, that upon successful completion
1253
    /// will have become connected to the remote socket. It must be in the
1254
    /// closed state (see Socket::is_open()) when async_accept() is called.
1255
    ///
1256
    template <class H>
1257
    void async_accept(Socket& sock, H&& handler);
1258
    /// \param ep Upon completion, the remote peer endpoint will have been
1259
    /// assigned to this variable.
1260
    template <class H>
1261
    void async_accept(Socket& sock, Endpoint& ep, H&& handler);
1262
    /// @}
1263

1264
private:
1265
    using Want = Service::Want;
1266

1267
    class AcceptOperBase;
1268
    template <class H>
1269
    class AcceptOper;
1270

1271
    using LendersAcceptOperPtr = std::unique_ptr<AcceptOperBase, Service::LendersOperDeleter>;
1272

1273
    std::error_code accept(Socket&, Endpoint*, std::error_code&);
1274
    Want do_accept_async(Socket&, Endpoint*, std::error_code&) noexcept;
1275

1276
    template <class H>
1277
    void async_accept(Socket&, Endpoint*, H&&);
1278
};
1279

1280

1281
/// \brief A timer object supporting asynchronous wait operations.
1282
class DeadlineTimer {
1283
public:
1284
    DeadlineTimer(Service&);
1285
    ~DeadlineTimer() noexcept;
1286

1287
    /// Thread-safe.
1288
    Service& get_service() noexcept;
1289

1290
    /// \brief Perform an asynchronous wait operation.
1291
    ///
1292
    /// Initiate an asynchronous wait operation. The completion handler becomes
1293
    /// ready to execute when the expiration time is reached, or an error occurs
1294
    /// (cancellation counts as an error here). The expiration time is the time
1295
    /// of initiation plus the specified delay. The error code passed to the
1296
    /// completion handler will **never** indicate success, unless the
1297
    /// expiration time was reached.
1298
    ///
1299
    /// The completion handler is always executed by the event loop thread,
1300
    /// i.e., by a thread that is executing Service::run(). Conversely, the
1301
    /// completion handler is guaranteed to not be called while no thread is
1302
    /// executing Service::run(). The execution of the completion handler is
1303
    /// always deferred to the event loop, meaning that it never happens as a
1304
    /// synchronous side effect of the execution of async_wait(), even when
1305
    /// async_wait() is executed by the event loop thread. The completion
1306
    /// handler is guaranteed to be called eventually, as long as there is time
1307
    /// enough for the operation to complete or fail, and a thread is executing
1308
    /// Service::run() for long enough.
1309
    ///
1310
    /// The operation can be canceled by calling cancel(), and will be
1311
    /// automatically canceled if the timer is destroyed. If the operation is
1312
    /// canceled, it will fail with `ErrorCodes::OperationAborted`. The operation
1313
    /// remains cancelable up until the point in time where the completion
1314
    /// handler starts to execute. This means that if cancel() is called before
1315
    /// the completion handler starts to execute, then the completion handler is
1316
    /// guaranteed to have `ErrorCodes::OperationAborted` passed to it. This is true
1317
    /// regardless of whether cancel() is called explicitly or implicitly, such
1318
    /// as when the timer is destroyed.
1319
    ///
1320
    /// The specified handler will be executed by an expression of the form
1321
    /// `handler(status)` where `status` is a Status object. If the handler object
1322
    /// is movable, it will never be copied. Otherwise, it will be copied as
1323
    /// necessary.
1324
    ///
1325
    /// It is an error to start a new asynchronous wait operation while
1326
    /// another one is in progress. An asynchronous wait operation is in
1327
    /// progress until its completion handler starts executing.
1328
    template <class R, class P, class H>
1329
    void async_wait(std::chrono::duration<R, P> delay, H&& handler);
1330

1331
    /// \brief Cancel an asynchronous wait operation.
1332
    ///
1333
    /// If an asynchronous wait operation, that is associated with this deadline
1334
    /// timer, is in progress, cause it to fail with
1335
    /// `ErrorCodes::OperationAborted`. An asynchronous wait operation is in
1336
    /// progress until its completion handler starts executing.
1337
    ///
1338
    /// Completion handlers of canceled operations will become immediately ready
1339
    /// to execute, but will never be executed directly as part of the execution
1340
    /// of cancel().
1341
    ///
1342
    /// Cancellation happens automatically when the timer object is destroyed.
1343
    void cancel() noexcept;
1344

1345
private:
1346
    template <class H>
1347
    class WaitOper;
1348

1349
    using clock = Service::clock;
1350

1351
    Service::Impl& m_service_impl;
1352
    Service::OwnersOperPtr m_wait_oper;
1353

1354
    void initiate_oper(Service::LendersWaitOperPtr);
1355
};
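
The wait and cancel comments above make two guarantees: a completion handler never runs inside `async_wait()` or `cancel()`, and a cancel issued before the handler starts always wins. The toy model below sketches both with a single-threaded queue. `MiniLoop`, `MiniTimer`, and `expire()` are hypothetical, the handler takes a plain `bool aborted` rather than a `Status`, and no real clock is involved, so this illustrates the rules rather than how `Service` implements them.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical miniature event loop (not realm's Service).
class MiniLoop {
public:
    void post(std::function<void()> fn) { m_ready.push_back(std::move(fn)); }

    // Execute ready handlers, including ones posted while running.
    void run()
    {
        while (!m_ready.empty()) {
            std::function<void()> fn = std::move(m_ready.front());
            m_ready.pop_front();
            fn();
        }
    }

private:
    std::deque<std::function<void()>> m_ready;
};

// Hypothetical timer; expire() stands in for the deadline being reached.
class MiniTimer {
public:
    using Handler = std::function<void(bool aborted)>;

    explicit MiniTimer(MiniLoop& loop) : m_loop(loop) {}
    ~MiniTimer() { cancel(); } // Destruction cancels implicitly

    void async_wait(Handler handler)
    {
        // Only one wait may be in progress; a wait is over once its
        // handler has started executing.
        assert(!in_progress());
        m_oper = std::make_shared<Oper>();
        m_oper->handler = std::move(handler);
    }

    void expire()
    {
        if (in_progress())
            make_ready();
    }

    void cancel()
    {
        if (in_progress()) {
            m_oper->aborted = true; // Takes effect even if already ready
            make_ready();
        }
    }

private:
    struct Oper {
        Handler handler;
        bool aborted = false;
        bool posted = false;
        bool started = false;
    };

    bool in_progress() const { return m_oper && !m_oper->started; }

    // Queue the handler; it is never invoked directly from here.
    void make_ready()
    {
        if (m_oper->posted)
            return;
        m_oper->posted = true;
        m_loop.post([oper = m_oper] {
            oper->started = true;
            oper->handler(oper->aborted);
        });
    }

    MiniLoop& m_loop;
    std::shared_ptr<Oper> m_oper;
};
```

Because the abort flag is read only when the queued handler starts, a `cancel()` that arrives after expiry but before `run()` still reports the wait as aborted. The loop must outlive the timer in this sketch.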
1356

1357

1358
class ReadAheadBuffer {
1359
public:
1360
    ReadAheadBuffer();
1361

1362
    /// Discard any buffered data.
1363
    void clear() noexcept;
1364

1365
private:
1366
    using Want = Service::Want;
1367

1368
    char* m_begin = nullptr;
1369
    char* m_end = nullptr;
1370
    static constexpr std::size_t s_size = 1024;
1371
    const std::unique_ptr<char[]> m_buffer;
1372

1373
    bool empty() const noexcept;
1374
    bool read(char*& begin, char* end, int delim, std::error_code&) noexcept;
1375
    template <class S>
1376
    void refill_sync(S& stream, std::error_code&) noexcept;
1377
    template <class S>
1378
    bool refill_async(S& stream, std::error_code&, Want&) noexcept;
1379

1380
    template <class>
1381
    friend class Service::BasicStreamOps;
1382
};
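
The read-ahead idea described earlier ("fewer larger reads on the underlying socket") can be sketched without a socket. In the example below, `ReadAheadSketch` and its `Source` callback are hypothetical; the callback plays the role of the underlying stream and returns 0 at end of input. The 1024-byte capacity matches `s_size` above, but the rest is an assumption about how such a buffer could behave, not the actual `ReadAheadBuffer` logic.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <functional>
#include <memory>

class ReadAheadSketch {
public:
    // Reads up to `max` bytes into `dst`; returns 0 at end of input.
    using Source = std::function<std::size_t(char* dst, std::size_t max)>;

    ReadAheadSketch() : m_buffer(new char[s_size]) {}

    // Copy up to `size` bytes into `out`, touching `source` only when the
    // internal buffer is empty. Returns fewer than `size` bytes only when
    // the source reports end of input.
    std::size_t read(char* out, std::size_t size, const Source& source)
    {
        std::size_t copied = 0;
        while (copied < size) {
            if (m_begin == m_end) {
                std::size_t filled = source(m_buffer.get(), s_size);
                if (filled == 0)
                    break;
                m_begin = m_buffer.get();
                m_end = m_begin + filled;
            }
            std::size_t avail = static_cast<std::size_t>(m_end - m_begin);
            std::size_t n = std::min(size - copied, avail);
            std::memcpy(out + copied, m_begin, n);
            m_begin += n; // Unconsumed bytes stay buffered for the next call
            copied += n;
        }
        return copied;
    }

private:
    static constexpr std::size_t s_size = 1024;
    std::unique_ptr<char[]> m_buffer;
    char* m_begin = nullptr;
    char* m_end = nullptr;
};
```

The leftover bytes between `m_begin` and `m_end` are why the same buffer object has to be passed to the next read: a fresh buffer would silently drop them.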


enum class ResolveErrors {
    /// Host not found (authoritative).
    host_not_found = 1,

    /// Host not found (non-authoritative).
    host_not_found_try_again = 2,

    /// The query is valid but does not have associated address data.
    no_data = 3,

    /// A non-recoverable error occurred.
    no_recovery = 4,

    /// The service is not supported for the given socket type.
    service_not_found = 5,

    /// The socket type is not supported.
    socket_type_not_supported = 6,
};

/// The error category associated with ResolveErrors. The name of this category is
/// `realm.sync.network.resolve`.
const std::error_category& resolve_error_category() noexcept;

std::error_code make_error_code(ResolveErrors err);

} // namespace realm::sync::network

namespace std {

template <>
class is_error_code_enum<realm::sync::network::ResolveErrors> {
public:
    static const bool value = true;
};

} // namespace std
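
The `is_error_code_enum` specialization together with `make_error_code()` is what allows a `ResolveErrors` value to convert implicitly to `std::error_code`. The following is a minimal, self-contained sketch of that mechanism. `DemoErrors`, `DemoCategory`, `demo_category()` and the message strings are hypothetical names invented for illustration; they are not part of realm-core, and the real category implementation is not shown in this header.

```cpp
#include <cassert>
#include <string>
#include <system_error>

namespace demo {

// Hypothetical enum standing in for ResolveErrors.
enum class DemoErrors { host_not_found = 1, no_data = 3 };

// Hypothetical category; supplies the name and messages for DemoErrors.
class DemoCategory final : public std::error_category {
public:
    const char* name() const noexcept override
    {
        return "demo.resolve";
    }
    std::string message(int value) const override
    {
        switch (static_cast<DemoErrors>(value)) {
            case DemoErrors::host_not_found:
                return "Host not found";
            case DemoErrors::no_data:
                return "No address data";
        }
        return "Unknown error";
    }
};

inline const std::error_category& demo_category() noexcept
{
    static const DemoCategory category;
    return category;
}

// Found through argument-dependent lookup by the converting constructor
// of std::error_code.
inline std::error_code make_error_code(DemoErrors err) noexcept
{
    return std::error_code(static_cast<int>(err), demo_category());
}

} // namespace demo

namespace std {
// Opt the enum in to implicit conversion to std::error_code.
template <>
struct is_error_code_enum<demo::DemoErrors> : true_type {};
} // namespace std

// The enum value converts implicitly; no explicit make_error_code() call.
inline std::error_code demo_lookup_failure()
{
    std::error_code ec = demo::DemoErrors::no_data;
    return ec;
}
```

With this in place, callers can also compare directly, for example `ec == demo::DemoErrors::no_data`.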

namespace realm::sync::network {

// Implementation

// ---------------- StreamProtocol ----------------

inline StreamProtocol StreamProtocol::ip_v4()
{
    StreamProtocol prot;
    prot.m_family = AF_INET;
    return prot;
}

inline StreamProtocol StreamProtocol::ip_v6()
{
    StreamProtocol prot;
    prot.m_family = AF_INET6;
    return prot;
}

inline bool StreamProtocol::is_ip_v4() const
{
    return m_family == AF_INET;
}

inline bool StreamProtocol::is_ip_v6() const
{
    return m_family == AF_INET6;
}

inline int StreamProtocol::family() const
{
    return m_family;
}

inline int StreamProtocol::protocol() const
{
    return m_protocol;
}

inline StreamProtocol::StreamProtocol()
    : m_family{AF_UNSPEC}     // Allow both IPv4 and IPv6
    , m_socktype{SOCK_STREAM} // Or SOCK_DGRAM for UDP
    , m_protocol{0}           // Any protocol
{
}

// ---------------- Address ----------------

inline bool Address::is_ip_v4() const
{
    return !m_is_ip_v6;
}

inline bool Address::is_ip_v6() const
{
    return m_is_ip_v6;
}

template <class C, class T>
inline std::basic_ostream<C, T>& operator<<(std::basic_ostream<C, T>& out, const Address& addr)
{
    // FIXME: Not taking `addr.m_ip_v6_scope_id` into account. What does ASIO
    // do?
    union buffer_union {
        char ip_v4[INET_ADDRSTRLEN];
        char ip_v6[INET6_ADDRSTRLEN];
    };
    char buffer[sizeof(buffer_union)];
    int af = addr.m_is_ip_v6 ? AF_INET6 : AF_INET;
#ifdef _WIN32
    void* src = const_cast<void*>(reinterpret_cast<const void*>(&addr.m_union));
#else
    const void* src = &addr.m_union;
#endif
    const char* ret = ::inet_ntop(af, src, buffer, sizeof buffer);
    if (ret == 0) {
        std::error_code ec = util::make_basic_system_error_code(errno);
        throw std::system_error(ec);
    }
    out << ret;
    return out;
}
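
The stream operator above formats an address by handing the raw `in_addr` or `in6_addr` bytes to `inet_ntop()`. As a rough standalone illustration of that call (POSIX only, no Windows branch), the sketch below formats the two loopback addresses. The helper names `format_ip_v4`, `format_ip_v6` and the `demo_*` functions are made up for this example, and `std::generic_category()` is used here in place of realm's `util::make_basic_system_error_code()`.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>

#include <cerrno>
#include <string>
#include <system_error>

// Format an IPv4 address that is stored in network byte order.
inline std::string format_ip_v4(const in_addr& addr)
{
    char buffer[INET_ADDRSTRLEN];
    const char* ret = ::inet_ntop(AF_INET, &addr, buffer, sizeof buffer);
    if (!ret)
        throw std::system_error(errno, std::generic_category());
    return ret;
}

// Format an IPv6 address. As in the original, the scope ID is ignored.
inline std::string format_ip_v6(const in6_addr& addr)
{
    char buffer[INET6_ADDRSTRLEN];
    const char* ret = ::inet_ntop(AF_INET6, &addr, buffer, sizeof buffer);
    if (!ret)
        throw std::system_error(errno, std::generic_category());
    return ret;
}

inline std::string demo_loopback_v4()
{
    in_addr addr{};
    addr.s_addr = htonl(INADDR_LOOPBACK); // 127.0.0.1 in network byte order
    return format_ip_v4(addr);
}

inline std::string demo_loopback_v6()
{
    in6_addr addr = in6addr_loopback; // ::1
    return format_ip_v6(addr);
}
```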

inline Address::Address()
{
    m_union.m_ip_v4 = ip_v4_type();
}

inline Address make_address(const char* c_str)
{
    std::error_code ec;
    Address addr = make_address(c_str, ec);
    if (ec)
        throw std::system_error(ec);
    return addr;
}

inline Address make_address(const std::string& str)
{
    std::error_code ec;
    Address addr = make_address(str, ec);
    if (ec)
        throw std::system_error(ec);
    return addr;
}

inline Address make_address(const std::string& str, std::error_code& ec) noexcept
{
    return make_address(str.c_str(), ec);
}

// ---------------- Endpoint ----------------

inline StreamProtocol Endpoint::protocol() const
{
    return m_protocol;
}

inline Address Endpoint::address() const
{
    Address addr;
    if (m_protocol.is_ip_v4()) {
        addr.m_union.m_ip_v4 = m_sockaddr_union.m_ip_v4.sin_addr;
    }
    else {
        addr.m_union.m_ip_v6 = m_sockaddr_union.m_ip_v6.sin6_addr;
        addr.m_ip_v6_scope_id = m_sockaddr_union.m_ip_v6.sin6_scope_id;
        addr.m_is_ip_v6 = true;
    }
    return addr;
}

inline Endpoint::port_type Endpoint::port() const
{
    return ntohs(m_protocol.is_ip_v4() ? m_sockaddr_union.m_ip_v4.sin_port : m_sockaddr_union.m_ip_v6.sin6_port);
}

inline Endpoint::data_type* Endpoint::data()
{
    return &m_sockaddr_union.m_base;
}

inline const Endpoint::data_type* Endpoint::data() const
{
    return &m_sockaddr_union.m_base;
}

inline Endpoint::Endpoint()
    : Endpoint{StreamProtocol::ip_v4(), 0}
{
}

inline Endpoint::Endpoint(const StreamProtocol& protocol, port_type port)
    : m_protocol{protocol}
{
    int family = m_protocol.family();
    if (family == AF_INET) {
        m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear
        m_sockaddr_union.m_ip_v4.sin_family = AF_INET;
        m_sockaddr_union.m_ip_v4.sin_port = htons(port);
        m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY;
    }
    else if (family == AF_INET6) {
        m_sockaddr_union.m_ip_v6 = sockaddr_ip_v6_type(); // Clear
        m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6;
        m_sockaddr_union.m_ip_v6.sin6_port = htons(port);
    }
    else {
        m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear
        m_sockaddr_union.m_ip_v4.sin_family = AF_UNSPEC;
        m_sockaddr_union.m_ip_v4.sin_port = htons(port);
        m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY;
    }
}

inline Endpoint::Endpoint(const Address& addr, port_type port)
{
    if (addr.m_is_ip_v6) {
        m_protocol = StreamProtocol::ip_v6();
        m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6;
        m_sockaddr_union.m_ip_v6.sin6_port = htons(port);
        m_sockaddr_union.m_ip_v6.sin6_flowinfo = 0;
        m_sockaddr_union.m_ip_v6.sin6_addr = addr.m_union.m_ip_v6;
        m_sockaddr_union.m_ip_v6.sin6_scope_id = addr.m_ip_v6_scope_id;
    }
    else {
        m_protocol = StreamProtocol::ip_v4();
        m_sockaddr_union.m_ip_v4.sin_family = AF_INET;
        m_sockaddr_union.m_ip_v4.sin_port = htons(port);
        m_sockaddr_union.m_ip_v4.sin_addr = addr.m_union.m_ip_v4;
    }
}

inline Endpoint::List::iterator Endpoint::List::begin() const noexcept
{
    return m_endpoints.data();
}

inline Endpoint::List::iterator Endpoint::List::end() const noexcept
{
    return m_endpoints.data() + m_endpoints.size();
}

inline std::size_t Endpoint::List::size() const noexcept
{
    return m_endpoints.size();
}

inline bool Endpoint::List::empty() const noexcept
{
    return m_endpoints.size() == 0;
}

// ---------------- Service::OperQueue ----------------

template <class Oper>
inline bool Service::OperQueue<Oper>::empty() const noexcept
{
    return !m_back;
}

template <class Oper>
inline void Service::OperQueue<Oper>::push_back(LendersOperPtr op) noexcept
{
    REALM_ASSERT(!op->m_next);
    if (m_back) {
        op->m_next = m_back->m_next;
        m_back->m_next = op.get();
    }
    else {
        op->m_next = op.get();
    }
    m_back = op.release();
}

template <class Oper>
template <class Oper2>
inline void Service::OperQueue<Oper>::push_back(OperQueue<Oper2>& q) noexcept
{
    if (!q.m_back)
        return;
    if (m_back)
        std::swap(m_back->m_next, q.m_back->m_next);
    m_back = q.m_back;
    q.m_back = nullptr;
}

template <class Oper>
inline auto Service::OperQueue<Oper>::pop_front() noexcept -> LendersOperPtr
{
    Oper* op = nullptr;
    if (m_back) {
        op = static_cast<Oper*>(m_back->m_next);
        if (op != m_back) {
            m_back->m_next = op->m_next;
        }
        else {
            m_back = nullptr;
        }
        op->m_next = nullptr;
    }
    return LendersOperPtr(op);
}

template <class Oper>
inline void Service::OperQueue<Oper>::clear() noexcept
{
    if (m_back) {
        LendersOperPtr op(m_back);
        while (op->m_next != m_back)
            op.reset(static_cast<Oper*>(op->m_next));
        m_back = nullptr;
    }
}

template <class Oper>
inline Service::OperQueue<Oper>::OperQueue(OperQueue&& q) noexcept
    : m_back{q.m_back}
{
    q.m_back = nullptr;
}
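
`OperQueue` stores its elements as a circular singly linked list and keeps only a pointer to the back node, whose `m_next` is the front. That gives O(1) push, pop, and queue-to-queue splice with a single pointer of state. The sketch below reproduces the pointer manipulation on a simplified, non-owning queue; `Node`, `CircularQueue` and `demo_queue_order()` are hypothetical stand-ins, and the ownership handling done by `LendersOperPtr` in the original is deliberately left out.

```cpp
#include <cassert>
#include <utility>
#include <vector>

struct Node {
    int value;
    Node* next; // Null while the node is not in a queue
};

class CircularQueue {
public:
    bool empty() const noexcept
    {
        return !m_back;
    }

    void push_back(Node& node) noexcept
    {
        assert(!node.next);
        if (m_back) {
            node.next = m_back->next; // New back points at the front
            m_back->next = &node;
        }
        else {
            node.next = &node; // A single node points at itself
        }
        m_back = &node;
    }

    // Splice every node of `other` onto the end of this queue in O(1).
    void push_back(CircularQueue& other) noexcept
    {
        if (!other.m_back)
            return;
        if (m_back)
            std::swap(m_back->next, other.m_back->next);
        m_back = other.m_back;
        other.m_back = nullptr;
    }

    Node* pop_front() noexcept
    {
        if (!m_back)
            return nullptr;
        Node* front = m_back->next;
        if (front != m_back)
            m_back->next = front->next;
        else
            m_back = nullptr; // Queue became empty
        front->next = nullptr;
        return front;
    }

private:
    Node* m_back = nullptr;
};

inline std::vector<int> demo_queue_order()
{
    Node a{1, nullptr}, b{2, nullptr}, c{3, nullptr}, d{4, nullptr};
    CircularQueue q1, q2;
    q1.push_back(a);
    q1.push_back(b);
    q2.push_back(c);
    q2.push_back(d);
    q1.push_back(q2); // q1 now holds a, b, c, d; q2 is empty
    std::vector<int> order;
    while (Node* n = q1.pop_front())
        order.push_back(n->value);
    return order;
}
```

Swapping the two back nodes' `next` pointers is what joins the rings: the old back of the first queue then points at the front of the second, and the new back points at the front of the first.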

// ---------------- Service::Descriptor ----------------

inline Service::Descriptor::Descriptor(Impl& s) noexcept
    : service_impl{s}
{
}

inline Service::Descriptor::~Descriptor() noexcept
{
    if (is_open())
        close();
}

inline void Service::Descriptor::assign(native_handle_type fd, bool in_blocking_mode) noexcept
{
    REALM_ASSERT(!is_open());
    m_fd = fd;
    m_in_blocking_mode = in_blocking_mode;
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    m_read_ready = false;
    m_write_ready = false;
    m_imminent_end_of_input = false;
    m_is_registered = false;
#endif
}

inline void Service::Descriptor::close() noexcept
{
    REALM_ASSERT(is_open());
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    if (m_is_registered)
        deregister_for_async();
    m_is_registered = false;
#endif
    do_close();
}

inline auto Service::Descriptor::release() noexcept -> native_handle_type
{
    REALM_ASSERT(is_open());
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    if (m_is_registered)
        deregister_for_async();
    m_is_registered = false;
#endif
    return do_release();
}

inline bool Service::Descriptor::is_open() const noexcept
{
    return (m_fd != -1);
}

inline auto Service::Descriptor::native_handle() const noexcept -> native_handle_type
{
    return m_fd;
}

inline bool Service::Descriptor::in_blocking_mode() const noexcept
{
    return m_in_blocking_mode;
}

template <class Oper, class... Args>
inline void Service::Descriptor::initiate_oper(std::unique_ptr<Oper, LendersOperDeleter> op, Args&&... args)
{
    Service::Want want = op->initiate(std::forward<Args>(args)...); // Throws
    add_initiated_oper(std::move(op), want);                        // Throws
}

inline void Service::Descriptor::ensure_blocking_mode()
{
    // Assuming that descriptors are either used mostly in blocking mode, or
    // mostly in nonblocking mode.
    if (REALM_UNLIKELY(!m_in_blocking_mode)) {
        bool value = false;
        set_nonblock_flag(value); // Throws
        m_in_blocking_mode = true;
    }
}

inline void Service::Descriptor::ensure_nonblocking_mode()
{
    // Assuming that descriptors are either used mostly in blocking mode, or
    // mostly in nonblocking mode.
    if (REALM_UNLIKELY(m_in_blocking_mode)) {
        bool value = true;
        set_nonblock_flag(value); // Throws
        m_in_blocking_mode = false;
    }
}
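
`ensure_blocking_mode()` and `ensure_nonblocking_mode()` cache the current mode in `m_in_blocking_mode`, so the system call is only issued when the mode actually changes. The body of `set_nonblock_flag()` is not part of this section; the sketch below assumes a POSIX `fcntl()`-based version purely for illustration. `FdMode`, `fd_is_nonblocking()`, `ModeDemoResult` and `demo_mode_switching()` are hypothetical names.

```cpp
#include <fcntl.h>
#include <unistd.h>

#include <cerrno>
#include <system_error>

class FdMode {
public:
    explicit FdMode(int fd, bool in_blocking_mode = true) noexcept
        : m_fd{fd}
        , m_in_blocking_mode{in_blocking_mode}
    {
    }

    void ensure_blocking_mode()
    {
        if (!m_in_blocking_mode) {
            set_nonblock_flag(false); // Throws
            m_in_blocking_mode = true;
        }
    }

    void ensure_nonblocking_mode()
    {
        if (m_in_blocking_mode) {
            set_nonblock_flag(true); // Throws
            m_in_blocking_mode = false;
        }
    }

    int flag_changes() const noexcept
    {
        return m_flag_changes;
    }

private:
    int m_fd;
    bool m_in_blocking_mode;
    int m_flag_changes = 0; // Counts how often fcntl() changed the flag

    // Assumed implementation: toggle O_NONBLOCK through fcntl().
    void set_nonblock_flag(bool value)
    {
        int flags = ::fcntl(m_fd, F_GETFL, 0);
        if (flags == -1)
            throw std::system_error(errno, std::generic_category());
        flags = value ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
        if (::fcntl(m_fd, F_SETFL, flags) == -1)
            throw std::system_error(errno, std::generic_category());
        ++m_flag_changes;
    }
};

inline bool fd_is_nonblocking(int fd)
{
    int flags = ::fcntl(fd, F_GETFL, 0);
    return flags != -1 && (flags & O_NONBLOCK) != 0;
}

struct ModeDemoResult {
    bool nonblocking_after_switch;
    bool blocking_after_restore;
    int flag_changes;
};

inline ModeDemoResult demo_mode_switching()
{
    int fds[2];
    if (::pipe(fds) != 0)
        throw std::system_error(errno, std::generic_category());
    FdMode mode{fds[0]};
    mode.ensure_nonblocking_mode();
    mode.ensure_nonblocking_mode(); // Cached: no second fcntl() round trip
    bool nonblocking = fd_is_nonblocking(fds[0]);
    mode.ensure_blocking_mode();
    bool blocking = !fd_is_nonblocking(fds[0]);
    int changes = mode.flag_changes();
    ::close(fds[0]);
    ::close(fds[1]);
    return {nonblocking, blocking, changes};
}
```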

inline bool Service::Descriptor::assume_read_would_block() const noexcept
{
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    return !m_in_blocking_mode && !m_read_ready;
#else
    return false;
#endif
}

inline bool Service::Descriptor::assume_write_would_block() const noexcept
{
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    return !m_in_blocking_mode && !m_write_ready;
#else
    return false;
#endif
}

inline void Service::Descriptor::set_read_ready(bool value) noexcept
{
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    m_read_ready = value;
#else
    // No-op
    static_cast<void>(value);
#endif
}

inline void Service::Descriptor::set_write_ready(bool value) noexcept
{
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    m_write_ready = value;
#else
    // No-op
    static_cast<void>(value);
#endif
}

// ---------------- Service ----------------

class Service::AsyncOper {
public:
    bool in_use() const noexcept;
    bool is_complete() const noexcept;
    bool is_canceled() const noexcept;
    void cancel() noexcept;
    /// Every object of type \ref AsyncOper must be destroyed either by a call
    /// to this function or to recycle(). This function recycles the operation
    /// object (commits suicide), even if it throws.
    virtual void recycle_and_execute() = 0;
    /// Every object of type \ref AsyncOper must be destroyed either by a call
    /// to recycle_and_execute() or to this function. This function destroys the
    /// object (commits suicide).
    virtual void recycle() noexcept = 0;
    /// Must be called when the owner dies, and the object is in use (not an
    /// instance of UnusedOper).
    virtual void orphan() noexcept = 0;

protected:
    AsyncOper(std::size_t size, bool in_use) noexcept;
    virtual ~AsyncOper() noexcept {}
    void set_is_complete(bool value) noexcept;
    template <class H, class... Args>
    void do_recycle_and_execute(bool orphaned, H& handler, Args&&...);
    void do_recycle(bool orphaned) noexcept;

private:
    std::size_t m_size; // Allocated number of bytes
    bool m_in_use = false;
    // Set to true when the operation completes successfully or fails. If the
    // operation is canceled before this happens, it will never be set to
    // true. Always false when not in use
    bool m_complete = false;
    // Set to true when the operation is canceled. Always false when not in use.
    bool m_canceled = false;
    AsyncOper* m_next = nullptr; // Always null when not in use
    template <class H, class... Args>
    void do_recycle_and_execute_helper(bool orphaned, bool& was_recycled, H handler, Args...);
    friend class Service;
};

class Service::ResolveOperBase : public AsyncOper {
public:
    ResolveOperBase(std::size_t size, Resolver& resolver, Resolver::Query query) noexcept
        : AsyncOper{size, true}
        , m_resolver{&resolver}
        , m_query{std::move(query)}
    {
    }
    void complete() noexcept
    {
        set_is_complete(true);
    }
    void recycle() noexcept override final
    {
        bool orphaned = !m_resolver;
        REALM_ASSERT(orphaned);
        // Note: do_recycle() commits suicide.
        do_recycle(orphaned);
    }
    void orphan() noexcept override final
    {
        m_resolver = nullptr;
    }

protected:
    Resolver* m_resolver;
    Resolver::Query m_query;
    Endpoint::List m_endpoints;
    std::error_code m_error_code;
    friend class Service;
};

class Service::WaitOperBase : public AsyncOper {
public:
    WaitOperBase(std::size_t size, DeadlineTimer& timer, clock::time_point expiration_time) noexcept
        : AsyncOper{size, true} // Second argument is `in_use`
        , m_timer{&timer}
        , m_expiration_time{expiration_time}
    {
    }
    void complete() noexcept
    {
        set_is_complete(true);
    }
    void recycle() noexcept override final
    {
        bool orphaned = !m_timer;
        REALM_ASSERT(orphaned);
        // Note: do_recycle() commits suicide.
        do_recycle(orphaned);
    }
    void orphan() noexcept override final
    {
        m_timer = nullptr;
    }

protected:
    DeadlineTimer* m_timer;
    clock::time_point m_expiration_time;
    friend class Service;
};

class Service::TriggerExecOperBase : public AsyncOper, public util::AtomicRefCountBase {
public:
    TriggerExecOperBase(Impl& service) noexcept
        : AsyncOper{0, false} // First arg is `size` (unused), second arg is `in_use`
        , m_service{&service}
    {
    }
    void recycle() noexcept override final
    {
        REALM_ASSERT(in_use());
        REALM_ASSERT(!m_service);
        // Note: Potential suicide when `self` goes out of scope
        util::bind_ptr<TriggerExecOperBase> self{this, util::bind_ptr_base::adopt_tag{}};
    }
    void orphan() noexcept override final
    {
        REALM_ASSERT(m_service);
        m_service = nullptr;
    }
    void trigger() noexcept
    {
        REALM_ASSERT(m_service);
        Service::trigger_exec(*m_service, *this);
    }

protected:
    Impl* m_service;
};

class Service::PostOperBase : public AsyncOper {
public:
    PostOperBase(std::size_t size, Impl& service) noexcept
        : AsyncOper{size, true} // Second argument is `in_use`
        , m_service{service}
    {
    }
    void recycle() noexcept override final
    {
        // Service::recycle_post_oper() destroys this operation object
        Service::recycle_post_oper(m_service, this);
    }
    void orphan() noexcept override final
    {
        REALM_ASSERT(false); // Never called
    }

protected:
    Impl& m_service;
};

template <class H>
class Service::PostOper : public PostOperBase {
public:
    PostOper(std::size_t size, Impl& service, H&& handler)
        : PostOperBase{size, service}
        , m_handler{std::move(handler)}
    {
    }
    void recycle_and_execute() override final
    {
        // Recycle the operation object before the handler is executed, such
        // that the memory is available for a new post operation that might be
        // initiated during the execution of the handler.
        bool was_recycled = false;
        try {
            H handler = std::move(m_handler); // Throws
            // Service::recycle_post_oper() destroys this operation object
            Service::recycle_post_oper(m_service, this);
            was_recycled = true;
            handler(Status::OK()); // Throws
        }
        catch (...) {
            if (!was_recycled) {
                // Service::recycle_post_oper() destroys this operation object
                Service::recycle_post_oper(m_service, this);
            }
            throw;
        }
    }

private:
    H m_handler;
};
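
`PostOper::recycle_and_execute()` moves the handler onto the stack and recycles the operation object before invoking the handler, so a handler that posts again can reuse the freed storage. The sketch below shows the same ordering with a deliberately tiny poster that has room for only one pending handler. `OneSlotPoster` and `demo_repost_from_handler()` are hypothetical; the real service uses its own allocator and passes a `Status` to the handler, both of which are omitted here.

```cpp
#include <functional>
#include <memory>
#include <utility>
#include <vector>

class OneSlotPoster {
public:
    using Handler = std::function<void()>;

    // Returns false if the single slot is still occupied.
    bool post(Handler handler)
    {
        if (m_slot)
            return false;
        m_slot = std::make_unique<Handler>(std::move(handler));
        return true;
    }

    // Runs the pending handler, if any. Returns false when idle.
    bool run_one()
    {
        if (!m_slot)
            return false;
        Handler handler = std::move(*m_slot); // Move the handler out first
        m_slot.reset();                       // Recycle the slot before executing
        handler();                            // The handler may now post again
        return true;
    }

private:
    std::unique_ptr<Handler> m_slot;
};

inline std::vector<int> demo_repost_from_handler()
{
    OneSlotPoster poster;
    std::vector<int> log;
    poster.post([&] {
        log.push_back(1);
        // Succeeds only because the slot was released before this ran.
        bool reposted = poster.post([&] {
            log.push_back(2);
        });
        log.push_back(reposted ? 10 : -10);
    });
    while (poster.run_one()) {
    }
    return log;
}
```

Had `run_one()` invoked the handler before resetting the slot, the nested `post()` would have been rejected and the log would contain `-10` instead of `10`.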

class Service::IoOper : public AsyncOper {
public:
    IoOper(std::size_t size) noexcept
        : AsyncOper{size, true} // Second argument is `in_use`
    {
    }
    virtual Descriptor& descriptor() noexcept = 0;
    /// Advance this operation and figure out whether it needs to read from,
    /// or write to the underlying descriptor to advance further. This function
    /// must return Want::read if the operation needs to read, or Want::write if
    /// the operation needs to write to advance further. If the operation
    /// completes (due to success or failure), this function must return
    /// Want::nothing.
    virtual Want advance() noexcept = 0;
};

class Service::UnusedOper : public AsyncOper {
public:
    UnusedOper(std::size_t size) noexcept
        : AsyncOper{size, false} // Second argument is `in_use`
    {
    }
    void recycle_and_execute() override final
    {
        // Must never be called
        REALM_ASSERT(false);
    }
    void recycle() noexcept override final
    {
        // Must never be called
        REALM_ASSERT(false);
    }
    void orphan() noexcept override final
    {
        // Must never be called
        REALM_ASSERT(false);
    }
};

// `S` must be a stream class with the following member functions:
//
//    Socket& lowest_layer() noexcept;
//
//    void do_init_read_async(std::error_code& ec, Want& want) noexcept;
//    void do_init_write_async(std::error_code& ec, Want& want) noexcept;
//
//    std::size_t do_read_some_sync(char* buffer, std::size_t size,
//                                  std::error_code& ec) noexcept;
//    std::size_t do_write_some_sync(const char* data, std::size_t size,
//                                   std::error_code& ec) noexcept;
//    std::size_t do_read_some_async(char* buffer, std::size_t size,
//                                   std::error_code& ec, Want& want) noexcept;
//    std::size_t do_write_some_async(const char* data, std::size_t size,
//                                    std::error_code& ec, Want& want) noexcept;
//
// If an error occurs during any of these 6 functions, the `ec` argument must be
// set accordingly. Otherwise the `ec` argument must be set to
// `std::error_code()`.
//
// The do_init_*_async() functions must update the `want` argument to indicate
// how the operation must be initiated:
//
//    Want::read      Wait for read readiness, then call do_*_some_async().
//    Want::write     Wait for write readiness, then call do_*_some_async().
//    Want::nothing   Call do_*_some_async() immediately without waiting for
//                    read or write readiness.
//
// If end-of-input occurs while reading, do_read_some_*() must fail, set `ec` to
// MiscExtErrors::end_of_input, and return zero.
//
// If an error occurs during reading or writing, do_*_some_sync() must set `ec`
// accordingly (to something other than `std::error_code()`) and return
// zero. Otherwise they must set `ec` to `std::error_code()` and return the
// number of bytes read or written, which **must** be at least 1. If the
// underlying socket is in nonblocking mode, and no bytes could be immediately
// read or written, these functions must fail with
// `error::resource_unavailable_try_again`.
//
// If an error occurs during reading or writing, do_*_some_async() must set `ec`
// accordingly (to something other than `std::error_code()`), `want` to
// `Want::nothing`, and return zero. Otherwise they must set `ec` to
// `std::error_code()` and return the number of bytes read or written, which
// must be zero if no bytes could be immediately read or written. Note, in this
// case it is not an error if the underlying socket is in nonblocking mode, and
// no bytes could be immediately read or written. When these functions succeed,
// but return zero because no bytes could be immediately read or written, they
// must set `want` to something other than `Want::nothing`.
//
// If no error occurs, do_*_some_async() must set `want` to indicate how the
// operation should proceed if additional data needs to be read or written, or
// if no bytes were transferred:
//
//    Want::read      Wait for read readiness, then call do_*_some_async() again.
//    Want::write     Wait for write readiness, then call do_*_some_async() again.
//    Want::nothing   Call do_*_some_async() again without waiting for read or
//                    write readiness.
//
// NOTE: If, for example, do_read_some_async() sets `want` to `Want::write`, it
// means that the stream needs to write data to the underlying TCP socket before
// it is able to deliver any additional data to the caller. While such a
// situation will never occur on a raw TCP socket, it can occur on an SSL stream
// (Secure Socket Layer).
//
// When do_*_some_async() returns `n`, at least one of the following conditions
// must be true:
//
//    n > 0                     Bytes were transferred.
//    ec != std::error_code()   An error occurred.
//    want != Want::nothing     Wait for read/write readiness.
//
// This is of critical importance, as it is the only way we can avoid falling
// into a busy loop of repeated invocations of do_*_some_async().
//
// NOTE: do_*_some_async() are allowed to set `want` to `Want::read` or
// `Want::write`, even when they successfully transfer a nonzero number of bytes.
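
The synchronous half of the contract above can be exercised without a socket. The sketch below is an illustration only: `MockStream` is a hypothetical in-memory stream that delivers at most a few bytes per call, `read_all()` is a free-function loop in the style of a blocking "read exactly `size` bytes" helper, and `std::errc::io_error` stands in for `MiscExtErrors::end_of_input`, which is not reproduced here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <system_error>
#include <utility>

// Hypothetical stream: hands out at most `chunk` bytes per call (chunk >= 1).
class MockStream {
public:
    MockStream(std::string data, std::size_t chunk)
        : m_data{std::move(data)}
        , m_chunk{chunk}
    {
    }

    // Follows the do_read_some_sync() rules: on success `ec` is cleared and
    // at least one byte is returned; on failure `ec` is set and zero returned.
    std::size_t do_read_some_sync(char* buffer, std::size_t size, std::error_code& ec) noexcept
    {
        std::size_t avail = m_data.size() - m_pos;
        if (avail == 0) {
            ec = std::make_error_code(std::errc::io_error); // Stand-in for end_of_input
            return 0;
        }
        std::size_t n = std::min({size, m_chunk, avail});
        std::memcpy(buffer, m_data.data() + m_pos, n);
        m_pos += n;
        ec = std::error_code();
        return n;
    }

private:
    std::string m_data;
    std::size_t m_chunk;
    std::size_t m_pos = 0;
};

// Keep reading until the buffer is full or the stream reports an error.
template <class S>
std::size_t read_all(S& stream, char* buffer, std::size_t size, std::error_code& ec)
{
    char* begin = buffer;
    char* end = buffer + size;
    char* curr = begin;
    for (;;) {
        if (curr == end) {
            ec = std::error_code(); // Success
            break;
        }
        std::size_t n = stream.do_read_some_sync(curr, std::size_t(end - curr), ec);
        if (ec)
            break;
        assert(n > 0 && n <= std::size_t(end - curr));
        curr += n;
    }
    return std::size_t(curr - begin);
}

struct ReadDemoResult {
    std::size_t n;
    bool failed;
    std::string text;
};

// `want` must be nonzero. The stream holds the 11 bytes "hello world".
inline ReadDemoResult demo_read(std::size_t want)
{
    MockStream stream{"hello world", 4};
    std::string buf(want, '\0');
    std::error_code ec;
    std::size_t n = read_all(stream, &buf[0], buf.size(), ec);
    buf.resize(n);
    return {n, bool(ec), buf};
}
```

Because a successful call always returns at least one byte, the loop is guaranteed to make progress; on failure the caller still learns how many bytes arrived before the error.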
template <class S>
class Service::BasicStreamOps {
public:
    class StreamOper;
    class ReadOperBase;
    class WriteOperBase;
    class BufferedReadOperBase;
    template <class H>
    class ReadOper;
    template <class H>
    class WriteOper;
    template <class H>
    class BufferedReadOper;

    using LendersReadOperPtr = std::unique_ptr<ReadOperBase, LendersOperDeleter>;
    using LendersWriteOperPtr = std::unique_ptr<WriteOperBase, LendersOperDeleter>;
    using LendersBufferedReadOperPtr = std::unique_ptr<BufferedReadOperBase, LendersOperDeleter>;

    // Synchronous read
    static std::size_t read(S& stream, char* buffer, std::size_t size, std::error_code& ec)
    {
        REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use());
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
        char* begin = buffer;
        char* end = buffer + size;
        char* curr = begin;
        for (;;) {
            if (curr == end) {
                ec = std::error_code(); // Success
                break;
            }
            char* buffer_2 = curr;
            std::size_t size_2 = std::size_t(end - curr);
            std::size_t n = stream.do_read_some_sync(buffer_2, size_2, ec);
            if (REALM_UNLIKELY(ec))
                break;
            REALM_ASSERT(n > 0);
            REALM_ASSERT(n <= size_2);
            curr += n;
        }
        std::size_t n = std::size_t(curr - begin);
        return n;
    }

    // Synchronous write
    static std::size_t write(S& stream, const char* data, std::size_t size, std::error_code& ec)
    {
        REALM_ASSERT(!stream.lowest_layer().m_write_oper || !stream.lowest_layer().m_write_oper->in_use());
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
        const char* begin = data;
        const char* end = data + size;
        const char* curr = begin;
        for (;;) {
            if (curr == end) {
                ec = std::error_code(); // Success
                break;
            }
            const char* data_2 = curr;
            std::size_t size_2 = std::size_t(end - curr);
            std::size_t n = stream.do_write_some_sync(data_2, size_2, ec);
            if (REALM_UNLIKELY(ec))
                break;
            REALM_ASSERT(n > 0);
            REALM_ASSERT(n <= size_2);
            curr += n;
        }
        std::size_t n = std::size_t(curr - begin);
        return n;
    }

    // Synchronous read
    static std::size_t buffered_read(S& stream, char* buffer, std::size_t size, int delim, ReadAheadBuffer& rab,
                                     std::error_code& ec)
    {
        REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use());
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
        char* begin = buffer;
        char* end = buffer + size;
        char* curr = begin;
        for (;;) {
            bool complete = rab.read(curr, end, delim, ec);
            if (complete)
                break;

            rab.refill_sync(stream, ec);
            if (REALM_UNLIKELY(ec))
                break;
        }
        std::size_t n = (curr - begin);
        return n;
    }

    // Synchronous read
    static std::size_t read_some(S& stream, char* buffer, std::size_t size, std::error_code& ec)
    {
        REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use());
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
        return stream.do_read_some_sync(buffer, size, ec);
    }

    // Synchronous write
    static std::size_t write_some(S& stream, const char* data, std::size_t size, std::error_code& ec)
    {
        REALM_ASSERT(!stream.lowest_layer().m_write_oper || !stream.lowest_layer().m_write_oper->in_use());
        stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws
        return stream.do_write_some_sync(data, size, ec);
    }

    template <class H>
    static void async_read(S& stream, char* buffer, std::size_t size, bool is_read_some, H&& handler)
    {
        char* begin = buffer;
        char* end = buffer + size;
        LendersReadOperPtr op = Service::alloc<ReadOper<H>>(stream.lowest_layer().m_read_oper, stream, is_read_some,
                                                            begin, end, std::move(handler)); // Throws
        stream.lowest_layer().m_desc.initiate_oper(std::move(op));                           // Throws
    }

    template <class H>
    static void async_write(S& stream, const char* data, std::size_t size, bool is_write_some, H&& handler)
    {
        const char* begin = data;
        const char* end = data + size;
        LendersWriteOperPtr op = Service::alloc<WriteOper<H>>(
            stream.lowest_layer().m_write_oper, stream, is_write_some, begin, end, std::move(handler)); // Throws
        stream.lowest_layer().m_desc.initiate_oper(std::move(op));                                      // Throws
743,952✔
2271
    }
743,952✔
2272

2273
    template <class H>
2274
    static void async_buffered_read(S& stream, char* buffer, std::size_t size, int delim, ReadAheadBuffer& rab,
2275
                                    H&& handler)
2276
    {
674,908✔
2277
        char* begin = buffer;
674,908✔
2278
        char* end = buffer + size;
674,908✔
2279
        LendersBufferedReadOperPtr op =
674,908✔
2280
            Service::alloc<BufferedReadOper<H>>(stream.lowest_layer().m_read_oper, stream, begin, end, delim, rab,
674,908✔
2281
                                                std::move(handler)); // Throws
674,908✔
2282
        stream.lowest_layer().m_desc.initiate_oper(std::move(op));   // Throws
674,908✔
2283
    }
674,908✔
2284
};
2285

2286
template <class S>
2287
class Service::BasicStreamOps<S>::StreamOper : public IoOper {
2288
public:
2289
    StreamOper(std::size_t size, S& stream) noexcept
2290
        : IoOper{size}
2291
        , m_stream{&stream}
2292
    {
3,405,076✔
2293
    }
3,405,076✔
2294
    void recycle() noexcept override final
2295
    {
878✔
2296
        bool orphaned = !m_stream;
878✔
2297
        REALM_ASSERT(orphaned);
878✔
2298
        // Note: do_recycle() commits suicide.
356✔
2299
        do_recycle(orphaned);
878✔
2300
    }
878✔
2301
    void orphan() noexcept override final
2302
    {
4,610✔
2303
        m_stream = nullptr;
4,610✔
2304
    }
4,610✔
2305
    Descriptor& descriptor() noexcept override final
2306
    {
210,312✔
2307
        return m_stream->lowest_layer().m_desc;
210,312✔
2308
    }
210,312✔
2309

2310
protected:
2311
    S* m_stream;
2312
    std::error_code m_error_code;
2313
};
2314

2315
template <class S>
2316
class Service::BasicStreamOps<S>::ReadOperBase : public StreamOper {
2317
public:
2318
    ReadOperBase(std::size_t size, S& stream, bool is_read_some, char* begin, char* end) noexcept
2319
        : StreamOper{size, stream}
2320
        , m_is_read_some{is_read_some}
2321
        , m_begin{begin}
2322
        , m_end{end}
2323
    {
581,936✔
2324
    }
581,936✔
2325
    Want initiate()
2326
    {
581,956✔
2327
        auto& s = *this;
581,956✔
2328
        REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get());
581,956✔
2329
        REALM_ASSERT(!s.is_complete());
581,956✔
2330
        REALM_ASSERT(s.m_curr <= s.m_end);
581,956✔
2331
        Want want = Want::nothing;
581,956✔
2332
        if (REALM_UNLIKELY(s.m_curr == s.m_end)) {
581,956✔
2333
            s.set_is_complete(true); // Success
×
2334
        }
×
2335
        else {
581,956✔
2336
            s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
581,956✔
2337
            s.m_stream->do_init_read_async(s.m_error_code, want);
581,956✔
2338
            if (want == Want::nothing) {
581,956✔
2339
                if (REALM_UNLIKELY(s.m_error_code)) {
236,614!
2340
                    s.set_is_complete(true); // Failure
×
2341
                }
×
2342
                else {
236,614✔
2343
                    want = advance();
236,614✔
2344
                }
236,614✔
2345
            }
236,614✔
2346
        }
581,956✔
2347
        return want;
581,956✔
2348
    }
581,956✔
2349
    Want advance() noexcept override final
2350
    {
375,709✔
2351
        auto& s = *this;
375,709✔
2352
        REALM_ASSERT(!s.is_complete());
375,709✔
2353
        REALM_ASSERT(!s.is_canceled());
375,709✔
2354
        REALM_ASSERT(!s.m_error_code);
375,709✔
2355
        REALM_ASSERT(s.m_curr < s.m_end);
375,709✔
2356
        REALM_ASSERT(!s.m_is_read_some || s.m_curr == m_begin);
375,709✔
2357
        for (;;) {
375,705✔
2358
            // Read into caller's buffer
196,197✔
2359
            char* buffer = s.m_curr;
375,675✔
2360
            std::size_t size = std::size_t(s.m_end - s.m_curr);
375,675✔
2361
            Want want = Want::nothing;
375,675✔
2362
            std::size_t n = s.m_stream->do_read_some_async(buffer, size, s.m_error_code, want);
375,675✔
2363
            REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please
375,675✔
2364
            // Any errors reported by do_read_some_async() (other than end_of_input) should always return 0
196,197✔
2365
            bool got_nothing = (n == 0);
375,675✔
2366
            if (got_nothing) {
375,675✔
2367
                if (REALM_UNLIKELY(s.m_error_code)) {
3,397✔
2368
                    s.set_is_complete(true); // Failure
2✔
2369
                    return Want::nothing;
2✔
2370
                }
2✔
2371
                // Got nothing, but want something
3✔
2372
                return want;
3,395✔
2373
            }
3,395✔
2374
            REALM_ASSERT(!s.m_error_code);
372,278✔
2375
            // Check for completion
196,193✔
2376
            REALM_ASSERT(n <= size);
372,278✔
2377
            s.m_curr += n;
372,278✔
2378
            if (s.m_is_read_some || s.m_curr == s.m_end) {
372,534!
2379
                s.set_is_complete(true); // Success
372,534✔
2380
                return Want::nothing;
372,534✔
2381
            }
372,534✔
2382
            if (want != Want::nothing)
4,294,967,294✔
2383
                return want;
×
2384
            REALM_ASSERT(n < size);
4,294,967,294✔
2385
        }
4,294,967,294✔
2386
    }
375,709✔
2387

2388
protected:
2389
    const bool m_is_read_some;
2390
    char* const m_begin;    // May be dangling after cancellation
2391
    char* const m_end;      // May be dangling after cancellation
2392
    char* m_curr = m_begin; // May be dangling after cancellation
2393
};
2394

2395
template <class S>
2396
class Service::BasicStreamOps<S>::WriteOperBase : public StreamOper {
2397
public:
2398
    WriteOperBase(std::size_t size, S& stream, bool is_write_some, const char* begin, const char* end) noexcept
2399
        : StreamOper{size, stream}
2400
        , m_is_write_some{is_write_some}
2401
        , m_begin{begin}
2402
        , m_end{end}
2403
    {
1,326,450✔
2404
    }
1,326,450✔
2405
    Want initiate()
2406
    {
1,325,758✔
2407
        auto& s = *this;
1,325,758✔
2408
        REALM_ASSERT(this == s.m_stream->lowest_layer().m_write_oper.get());
1,325,758✔
2409
        REALM_ASSERT(!s.is_complete());
1,325,758✔
2410
        REALM_ASSERT(s.m_curr <= s.m_end);
1,325,758✔
2411
        Want want = Want::nothing;
1,325,758✔
2412
        if (REALM_UNLIKELY(s.m_curr == s.m_end)) {
1,325,758✔
2413
            s.set_is_complete(true); // Success
44,066✔
2414
        }
44,066✔
2415
        else {
1,281,692✔
2416
            s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
1,281,692✔
2417
            s.m_stream->do_init_write_async(s.m_error_code, want);
1,281,692✔
2418
            if (want == Want::nothing) {
1,281,692✔
2419
                if (REALM_UNLIKELY(s.m_error_code)) {
307,962!
2420
                    s.set_is_complete(true); // Failure
×
2421
                }
×
2422
                else {
307,962✔
2423
                    want = advance();
307,962✔
2424
                }
307,962✔
2425
            }
307,962✔
2426
        }
1,281,692✔
2427
        return want;
1,325,758✔
2428
    }
1,325,758✔
2429
    Want advance() noexcept override final
2430
    {
873,704✔
2431
        auto& s = *this;
873,704✔
2432
        REALM_ASSERT(!s.is_complete());
873,704✔
2433
        REALM_ASSERT(!s.is_canceled());
873,704✔
2434
        REALM_ASSERT(!s.m_error_code);
873,704✔
2435
        REALM_ASSERT(s.m_curr < s.m_end);
873,704✔
2436
        REALM_ASSERT(!s.m_is_write_some || s.m_curr == s.m_begin);
873,704✔
2437
        for (;;) {
873,708✔
2438
            // Write from caller's buffer
454,510✔
2439
            const char* data = s.m_curr;
873,556✔
2440
            std::size_t size = std::size_t(s.m_end - s.m_curr);
873,556✔
2441
            Want want = Want::nothing;
873,556✔
2442
            std::size_t n = s.m_stream->do_write_some_async(data, size, s.m_error_code, want);
873,556✔
2443
            REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please
873,556✔
2444
            bool wrote_nothing = (n == 0);
873,556✔
2445
            if (wrote_nothing) {
873,556✔
2446
                if (REALM_UNLIKELY(s.m_error_code)) {
7,722✔
2447
                    s.set_is_complete(true); // Failure
138✔
2448
                    return Want::nothing;
138✔
2449
                }
138✔
2450
                // Wrote nothing, but want something written
420✔
2451
                return want;
7,584✔
2452
            }
7,584✔
2453
            REALM_ASSERT(!s.m_error_code);
865,834✔
2454
            // Check for completion
454,048✔
2455
            REALM_ASSERT(n <= size);
865,834✔
2456
            s.m_curr += n;
865,834✔
2457
            if (s.m_is_write_some || s.m_curr == s.m_end) {
866,438✔
2458
                s.set_is_complete(true); // Success
865,748✔
2459
                return Want::nothing;
865,748✔
2460
            }
865,748✔
2461
            if (want != Want::nothing)
2,147,484,337✔
2462
                return want;
678✔
2463
            REALM_ASSERT(n < size);
2,147,483,677✔
2464
        }
2,147,483,677✔
2465
    }
873,704✔
2466

2467
protected:
2468
    const bool m_is_write_some;
2469
    const char* const m_begin;    // May be dangling after cancellation
2470
    const char* const m_end;      // May be dangling after cancellation
2471
    const char* m_curr = m_begin; // May be dangling after cancellation
2472
};
2473

2474
template <class S>
2475
class Service::BasicStreamOps<S>::BufferedReadOperBase : public StreamOper {
2476
public:
2477
    BufferedReadOperBase(std::size_t size, S& stream, char* begin, char* end, int delim,
2478
                         ReadAheadBuffer& rab) noexcept
2479
        : StreamOper{size, stream}
2480
        , m_read_ahead_buffer{rab}
2481
        , m_begin{begin}
2482
        , m_end{end}
2483
        , m_delim{delim}
2484
    {
921,880✔
2485
    }
921,880✔
2486
    Want initiate()
2487
    {
921,862✔
2488
        auto& s = *this;
921,862✔
2489
        REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get());
921,862✔
2490
        REALM_ASSERT(!s.is_complete());
921,862✔
2491
        Want want = Want::nothing;
921,862✔
2492
        bool complete = s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code);
921,862✔
2493
        if (complete) {
921,862✔
2494
            s.set_is_complete(true); // Success or failure
380,540✔
2495
        }
380,540✔
2496
        else {
541,322✔
2497
            s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws
541,322✔
2498
            s.m_stream->do_init_read_async(s.m_error_code, want);
541,322✔
2499
            if (want == Want::nothing) {
541,322✔
2500
                if (REALM_UNLIKELY(s.m_error_code)) {
192!
2501
                    s.set_is_complete(true); // Failure
×
2502
                }
×
2503
                else {
192✔
2504
                    want = advance();
192✔
2505
                }
192✔
2506
            }
192✔
2507
        }
541,322✔
2508
        return want;
921,862✔
2509
    }
921,862✔
2510
    Want advance() noexcept override final
2511
    {
859,610✔
2512
        auto& s = *this;
859,610✔
2513
        REALM_ASSERT(!s.is_complete());
859,610✔
2514
        REALM_ASSERT(!s.is_canceled());
859,610✔
2515
        REALM_ASSERT(!s.m_error_code);
859,610✔
2516
        REALM_ASSERT(s.m_read_ahead_buffer.empty());
859,610✔
2517
        REALM_ASSERT(s.m_curr < s.m_end);
859,610✔
2518
        for (;;) {
859,548✔
2519
            // Fill read-ahead buffer from stream (is empty now)
399,664✔
2520
            Want want = Want::nothing;
859,548✔
2521
            bool nonempty = s.m_read_ahead_buffer.refill_async(*s.m_stream, s.m_error_code, want);
859,548✔
2522
            REALM_ASSERT(nonempty || s.m_error_code || want != Want::nothing); // No busy loop, please
859,548✔
2523
            bool got_nothing = !nonempty;
859,548✔
2524
            if (got_nothing) {
859,548✔
2525
                if (REALM_UNLIKELY(s.m_error_code)) {
52,648✔
2526
                    s.set_is_complete(true); // Failure
1,260✔
2527
                    return Want::nothing;
1,260✔
2528
                }
1,260✔
2529
                // Got nothing, but want something
64✔
2530
                return want;
51,388✔
2531
            }
51,388✔
2532
            // Transfer buffered data to caller's buffer
398,930✔
2533
            bool complete = s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code);
806,900✔
2534
            if (complete || s.m_error_code == util::MiscExtErrors::end_of_input) {
806,900✔
2535
                s.set_is_complete(true); // Success or failure (delim_not_found or end_of_input)
512,256✔
2536
                return Want::nothing;
512,256✔
2537
            }
512,256✔
2538
            if (want != Want::nothing)
294,644✔
2539
                return want;
294,958✔
2540
        }
294,644✔
2541
    }
859,610✔
2542

2543
protected:
2544
    ReadAheadBuffer& m_read_ahead_buffer; // May be dangling after cancellation
2545
    char* const m_begin;                  // May be dangling after cancellation
2546
    char* const m_end;                    // May be dangling after cancellation
2547
    char* m_curr = m_begin;               // May be dangling after cancellation
2548
    const int m_delim;
2549
};
2550

2551
template <class S>
2552
template <class H>
2553
class Service::BasicStreamOps<S>::ReadOper : public ReadOperBase {
2554
public:
2555
    ReadOper(std::size_t size, S& stream, bool is_read_some, char* begin, char* end, H&& handler)
2556
        : ReadOperBase{size, stream, is_read_some, begin, end}
2557
        , m_handler{std::move(handler)}
2558
    {
581,939✔
2559
    }
581,939✔
2560
    void recycle_and_execute() override final
2561
    {
581,473✔
2562
        auto& s = *this;
581,473✔
2563
        REALM_ASSERT(s.is_complete() || s.is_canceled());
581,473!
2564
        REALM_ASSERT(s.is_complete() ==
581,473!
2565
                     (s.m_error_code || s.m_curr == s.m_end || (s.m_is_read_some && s.m_curr != s.m_begin)));
581,473✔
2566
        REALM_ASSERT(s.m_curr >= s.m_begin);
581,473✔
2567
        bool orphaned = !s.m_stream;
581,473✔
2568
        std::error_code ec = s.m_error_code;
581,473✔
2569
        if (s.is_canceled())
581,473✔
2570
            ec = util::error::operation_aborted;
209,672✔
2571
        std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
581,473✔
2572
        // Note: do_recycle_and_execute() commits suicide.
301,239✔
2573
        s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
581,473✔
2574
                                             num_bytes_transferred); // Throws
581,473✔
2575
    }
581,473✔
2576

2577
private:
2578
    H m_handler;
2579
};
2580

2581
template <class S>
2582
template <class H>
2583
class Service::BasicStreamOps<S>::WriteOper : public WriteOperBase {
2584
public:
2585
    WriteOper(std::size_t size, S& stream, bool is_write_some, const char* begin, const char* end, H&& handler)
2586
        : WriteOperBase{size, stream, is_write_some, begin, end}
2587
        , m_handler{std::move(handler)}
2588
    {
743,782✔
2589
    }
743,782✔
2590
    void recycle_and_execute() override final
2591
    {
743,563✔
2592
        auto& s = *this;
743,563✔
2593
        REALM_ASSERT(s.is_complete() || s.is_canceled());
743,563!
2594
        REALM_ASSERT(s.is_complete() ==
743,563!
2595
                     (s.m_error_code || s.m_curr == s.m_end || (s.m_is_write_some && s.m_curr != s.m_begin)));
743,563✔
2596
        REALM_ASSERT(s.m_curr >= s.m_begin);
743,563!
2597
        bool orphaned = !s.m_stream;
743,563✔
2598
        std::error_code ec = s.m_error_code;
743,563✔
2599
        if (s.is_canceled())
743,563!
2600
            ec = util::error::operation_aborted;
209,703✔
2601
        std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
743,563✔
2602
        // Note: do_recycle_and_execute() commits suicide.
383,772✔
2603
        s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
743,563✔
2604
                                             num_bytes_transferred); // Throws
743,563✔
2605
    }
743,563✔
2606

2607
private:
2608
    H m_handler;
2609
};
2610

2611
template <class S>
2612
template <class H>
2613
class Service::BasicStreamOps<S>::BufferedReadOper : public BufferedReadOperBase {
2614
public:
2615
    BufferedReadOper(std::size_t size, S& stream, char* begin, char* end, int delim, ReadAheadBuffer& rab,
2616
                     H&& handler)
2617
        : BufferedReadOperBase{size, stream, begin, end, delim, rab}
2618
        , m_handler{std::move(handler)}
2619
    {
674,792✔
2620
    }
674,792✔
2621
    void recycle_and_execute() override final
2622
    {
673,718✔
2623
        auto& s = *this;
673,718✔
2624
        REALM_ASSERT(s.is_complete() || (s.is_canceled() && !s.m_error_code));
673,718!
2625
        REALM_ASSERT(s.is_canceled() || s.m_error_code ||
673,718!
2626
                     (s.m_delim != std::char_traits<char>::eof()
673,718✔
2627
                          ? s.m_curr > s.m_begin && s.m_curr[-1] == std::char_traits<char>::to_char_type(s.m_delim)
673,718✔
2628
                          : s.m_curr == s.m_end));
673,718✔
2629
        REALM_ASSERT(s.m_curr >= s.m_begin);
673,718!
2630
        bool orphaned = !s.m_stream;
673,718✔
2631
        std::error_code ec = s.m_error_code;
673,718✔
2632
        if (s.is_canceled())
673,718!
2633
            ec = util::error::operation_aborted;
14,691✔
2634
        std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin);
673,718✔
2635
        // Note: do_recycle_and_execute() commits suicide.
341,404✔
2636
        s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec,
673,718✔
2637
                                             num_bytes_transferred); // Throws
673,718✔
2638
    }
673,718✔
2639

2640
private:
2641
    H m_handler;
2642
};
2643

2644
template <class H>
2645
inline void Service::post(H handler)
2646
{
707,734✔
2647
    do_post(&Service::post_oper_constr<H>, sizeof(PostOper<H>), &handler);
707,734✔
2648
}
707,734✔
2649

2650
inline void Service::OwnersOperDeleter::operator()(AsyncOper* op) const noexcept
2651
{
772,066✔
2652
    if (op->in_use()) {
772,066✔
2653
        op->orphan();
32,894✔
2654
    }
32,894✔
2655
    else {
739,172✔
2656
        void* addr = op;
739,172✔
2657
        op->~AsyncOper();
739,172✔
2658
        delete[] static_cast<char*>(addr);
739,172✔
2659
    }
739,172✔
2660
}
772,066✔
2661

2662
inline void Service::LendersOperDeleter::operator()(AsyncOper* op) const noexcept
2663
{
17,174✔
2664
    op->recycle(); // Suicide
17,174✔
2665
}
17,174✔
2666

2667
template <class Oper, class... Args>
2668
std::unique_ptr<Oper, Service::LendersOperDeleter> Service::alloc(OwnersOperPtr& owners_ptr, Args&&... args)
2669
{
2,296,227✔
2670
    void* addr = owners_ptr.get();
2,296,227✔
2671
    std::size_t size;
2,296,227✔
2672
    if (REALM_LIKELY(addr)) {
2,296,227!
2673
        REALM_ASSERT(!owners_ptr->in_use());
2,249,228!
2674
        size = owners_ptr->m_size;
2,249,228✔
2675
        // We can use static dispatch in the destructor call here, since an
1,154,291✔
2676
        // object, that is not in use, is always an instance of UnusedOper.
1,154,291✔
2677
        REALM_ASSERT(dynamic_cast<UnusedOper*>(owners_ptr.get()));
2,249,228!
2678
        static_cast<UnusedOper*>(owners_ptr.get())->UnusedOper::~UnusedOper();
2,249,228✔
2679
        if (REALM_UNLIKELY(size < sizeof(Oper))) {
2,249,228!
2680
            owners_ptr.release();
3,386✔
2681
            delete[] static_cast<char*>(addr);
3,386✔
2682
            goto no_object;
3,386✔
2683
        }
3,386✔
2684
    }
46,999✔
2685
    else {
46,999✔
2686
    no_object:
50,184✔
2687
        addr = new char[sizeof(Oper)]; // Throws
50,184✔
2688
        size = sizeof(Oper);
50,184✔
2689
        owners_ptr.reset(static_cast<AsyncOper*>(addr));
50,184✔
2690
    }
50,184✔
2691
    std::unique_ptr<Oper, LendersOperDeleter> lenders_ptr;
2,296,171✔
2692
    try {
2,296,026✔
2693
        lenders_ptr.reset(new (addr) Oper(size, std::forward<Args>(args)...)); // Throws
2,296,026✔
2694
    }
2,296,026✔
2695
    catch (...) {
1,177,428✔
2696
        new (addr) UnusedOper(size); // Does not throw
×
2697
        throw;
×
2698
    }
×
2699
    return lenders_ptr;
2,296,542✔
2700
}
2,296,542✔
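// Illustrative sketch, not part of realm-core: Service::alloc() above reuses
// one raw block of memory for successive operation objects and reallocates
// only when the block is too small for the requested type. The ReusableSlot
// class below is a hypothetical, much simplified version of that idea. It has
// no UnusedOper placeholder and no in-use tracking, so the caller is assumed
// to destroy the previous object before reusing the slot.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <utility>

// Hypothetical slot that recycles a single raw allocation across objects.
class ReusableSlot {
public:
    ReusableSlot() = default;
    ReusableSlot(const ReusableSlot&) = delete;
    ReusableSlot& operator=(const ReusableSlot&) = delete;
    ~ReusableSlot()
    {
        delete[] m_addr;
    }

    // Constructs a T in the slot. The existing block is reused when it is
    // large enough; otherwise it is replaced by a block of sizeof(T) bytes.
    template <class T, class... Args>
    T* emplace(Args&&... args)
    {
        if (m_size < sizeof(T)) {
            delete[] m_addr;
            m_addr = nullptr;
            m_size = 0;
            m_addr = new char[sizeof(T)]; // Throws
            m_size = sizeof(T);
            ++m_num_allocs;
        }
        return new (m_addr) T(std::forward<Args>(args)...); // Throws
    }

    std::size_t capacity() const noexcept
    {
        return m_size;
    }
    int num_allocs() const noexcept
    {
        return m_num_allocs;
    }

private:
    char* m_addr = nullptr;
    std::size_t m_size = 0;
    int m_num_allocs = 0;
};

struct Small {
    int a;
    explicit Small(int v)
        : a{v}
    {
    }
};

struct Large {
    long long b[8];
    explicit Large(long long v)
        : b{v}
    {
    }
};
```

// Constructing a Large, destroying it, and then constructing a Small in the
// same slot performs one allocation in total; the capacity stays at
// sizeof(Large).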
2701

2702
template <class H>
2703
inline Service::PostOperBase* Service::post_oper_constr(void* addr, std::size_t size, Impl& service, void* cookie)
2704
{
707,922✔
2705
    H& handler = *static_cast<H*>(cookie);
707,922✔
2706
    return new (addr) PostOper<H>(size, service, std::move(handler)); // Throws
707,922✔
2707
}
707,922✔
2708

2709
inline bool Service::AsyncOper::in_use() const noexcept
2710
{
10,562,794✔
2711
    return m_in_use;
10,562,794✔
2712
}
10,562,794✔
2713

2714
inline bool Service::AsyncOper::is_complete() const noexcept
2715
{
17,181,494✔
2716
    return m_complete;
17,181,494✔
2717
}
17,181,494✔
2718

2719
inline void Service::AsyncOper::cancel() noexcept
2720
{
892,928✔
2721
    REALM_ASSERT(m_in_use);
892,928✔
2722
    REALM_ASSERT(!m_canceled);
892,928✔
2723
    m_canceled = true;
892,928✔
2724
}
892,928✔
2725

2726
inline Service::AsyncOper::AsyncOper(std::size_t size, bool is_in_use) noexcept
2727
    : m_size{size}
2728
    , m_in_use{is_in_use}
2729
{
10,023,264✔
2730
}
10,023,264✔
2731

2732
inline bool Service::AsyncOper::is_canceled() const noexcept
2733
{
10,840,044✔
2734
    return m_canceled;
10,840,044✔
2735
}
10,840,044✔
2736

2737
inline void Service::AsyncOper::set_is_complete(bool value) noexcept
2738
{
3,063,288✔
2739
    REALM_ASSERT(!m_complete);
3,063,288✔
2740
    REALM_ASSERT(!value || m_in_use);
3,063,288✔
2741
    m_complete = value;
3,063,288✔
2742
}
3,063,288✔
2743

2744
template <class H, class... Args>
2745
inline void Service::AsyncOper::do_recycle_and_execute(bool orphaned, H& handler, Args&&... args)
2746
{
2,278,807✔
2747
    // Recycle the operation object before the handler is executed, such that
1,168,812✔
2748
    // the memory is available for a new post operation that might be initiated
1,168,812✔
2749
    // during the execution of the handler.
1,168,812✔
2750
    bool was_recycled = false;
2,278,807✔
2751

1,168,812✔
2752
    // ScopeExit to ensure the AsyncOper object was reclaimed/deleted
1,168,812✔
2753
    auto at_exit = util::ScopeExit([this, &was_recycled, &orphaned]() noexcept {
2,279,509✔
2754
        if (!was_recycled) {
2,279,509!
2755
            do_recycle(orphaned);
×
2756
        }
×
2757
    });
2,279,509✔
2758

1,168,812✔
2759
    // We need to copy or move all arguments to be passed to the handler,
1,168,812✔
2760
    // such that there is no risk of references to the recycled operation
1,168,812✔
2761
    // object being passed to the handler (the passed arguments may be
1,168,812✔
2762
    // references to members of the recycled operation object). The easiest
1,168,812✔
2763
    // way to achieve this is by forwarding the reference arguments (passed
1,168,812✔
2764
    // to this function) to a helper function whose arguments have
1,168,812✔
2765
    // nonreference type (`Args...` rather than `Args&&...`).
1,168,812✔
2766
    //
1,168,812✔
2767
    // Note that the copying and moving of arguments may throw, and it is
1,168,812✔
2768
    // important that the operation is still recycled even if that
1,168,812✔
2769
    // happens. For that reason, copying and moving of arguments must not
1,168,812✔
2770
    // happen until we are in a scope (this scope) that catches and deals
1,168,812✔
2771
    // correctly with such exceptions.
1,168,812✔
2772
    do_recycle_and_execute_helper(orphaned, was_recycled, std::move(handler),
2,278,807✔
2773
                                  std::forward<Args>(args)...); // Throws
2,278,807✔
2774

1,168,812✔
2775
    // Removed catch to prevent truncating the stack trace on exception
1,168,812✔
2776
}
2,278,807✔
2777

2778
template <class H, class... Args>
2779
inline void Service::AsyncOper::do_recycle_and_execute_helper(bool orphaned, bool& was_recycled, H handler,
2780
                                                              Args... args)
2781
{
2,279,110✔
2782
    do_recycle(orphaned);
2,279,110✔
2783
    was_recycled = true;
2,279,110✔
2784
    handler(std::move(args)...); // Throws
2,279,110✔
2785
}
2,279,110✔
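// Illustrative sketch, not part of realm-core: the comment in
// do_recycle_and_execute() explains that arguments are forwarded to a helper
// taking them by value, so that nothing passed to the handler still refers to
// the recycled operation object. The Op type and the finish() and
// destroy_then_call() functions below are hypothetical and only demonstrate
// that ordering: copies are made first, the object is destroyed next, and the
// handler runs last.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical operation object that owns both the handler and its argument.
struct Op {
    std::string payload;
    std::function<void(std::string)> handler;
    bool* destroyed; // Set to true by the destructor
    ~Op()
    {
        *destroyed = true;
    }
};

// By-value parameters: `handler` and `args` live in this stack frame and do
// not refer to members of `*op`, so `op` can be destroyed before the call.
template <class H, class... Args>
void destroy_then_call(Op* op, H handler, Args... args)
{
    delete op;
    handler(std::move(args)...); // Throws
}

// Takes references, which may point into `*op`, and forwards them to the
// by-value helper. The moves and copies happen while `*op` is still alive.
template <class H, class... Args>
void finish(Op* op, H& handler, Args&&... args)
{
    destroy_then_call(op, std::move(handler), std::forward<Args>(args)...); // Throws
}
```

// A call such as finish(op, op->handler, std::move(op->payload)) is safe even
// though both arguments refer to members of the object being destroyed.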
2786

2787
inline void Service::AsyncOper::do_recycle(bool orphaned) noexcept
2788
{
3,948,920✔
2789
    REALM_ASSERT(in_use());
3,948,920✔
2790
    void* addr = this;
3,948,920✔
2791
    std::size_t size = m_size;
3,948,920✔
2792
    this->~AsyncOper(); // Suicide
3,948,920✔
2793
    if (orphaned) {
3,948,920✔
2794
        delete[] static_cast<char*>(addr);
32,894✔
2795
    }
32,894✔
2796
    else {
3,916,026✔
2797
        new (addr) UnusedOper(size);
3,916,026✔
2798
    }
3,916,026✔
2799
}
3,948,920✔
2800

2801
// ---------------- Resolver ----------------
2802

2803
template <class H>
2804
class Resolver::ResolveOper : public Service::ResolveOperBase {
2805
public:
2806
    ResolveOper(std::size_t size, Resolver& r, Query q, H&& handler)
2807
        : ResolveOperBase{size, r, std::move(q)}
2808
        , m_handler{std::move(handler)}
2809
    {
3,350✔
2810
    }
3,350✔
2811
    void recycle_and_execute() override final
2812
    {
3,348✔
2813
        REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
3,348!
2814
        REALM_ASSERT(is_canceled() || m_error_code || !m_endpoints.empty());
3,348!
2815
        bool orphaned = !m_resolver;
3,348✔
2816
        std::error_code ec = m_error_code;
3,348✔
2817
        if (is_canceled())
3,348✔
2818
            ec = util::error::operation_aborted;
4✔
2819
        // Note: do_recycle_and_execute() commits suicide.
1,652✔
2820
        do_recycle_and_execute<H>(orphaned, m_handler, ec, std::move(m_endpoints)); // Throws
3,348✔
2821
    }
3,348✔
2822

2823
private:
2824
    H m_handler;
2825
};
2826

2827
inline Resolver::Resolver(Service& service)
2828
    : m_service_impl{*service.m_impl}
2829
{
11,310✔
2830
}
11,310✔
2831

2832
inline Resolver::~Resolver() noexcept
2833
{
11,314✔
2834
    cancel();
11,314✔
2835
}
11,314✔
2836

2837
inline Endpoint::List Resolver::resolve(const Query& q)
2838
{
7,960✔
2839
    std::error_code ec;
7,960✔
2840
    Endpoint::List list = resolve(q, ec);
7,960✔
2841
    if (REALM_UNLIKELY(ec))
7,960✔
2842
        throw std::system_error(ec);
3,928✔
2843
    return list;
7,960✔
2844
}
7,960✔
2845

2846
template <class H>
2847
void Resolver::async_resolve(Query query, H&& handler)
2848
{
3,350✔
2849
    Service::LendersResolveOperPtr op = Service::alloc<ResolveOper<H>>(m_resolve_oper, *this, std::move(query),
3,350✔
2850
                                                                       std::move(handler)); // Throws
3,350✔
2851
    initiate_oper(std::move(op));                                                           // Throws
3,350✔
2852
}
3,350✔
2853

2854
inline Resolver::Query::Query(std::string service_port, int init_flags)
2855
    : m_flags{init_flags}
2856
    , m_service{service_port}
2857
{
2858
}
2859

2860
inline Resolver::Query::Query(const StreamProtocol& prot, std::string service_port, int init_flags)
2861
    : m_flags{init_flags}
2862
    , m_protocol{prot}
2863
    , m_service{service_port}
2864
{
2865
}
2866

2867
inline Resolver::Query::Query(std::string host_name, std::string service_port, int init_flags)
2868
    : m_flags{init_flags}
2869
    , m_host{host_name}
2870
    , m_service{service_port}
2871
{
11,312✔
2872
}
11,312✔
2873

2874
inline Resolver::Query::Query(const StreamProtocol& prot, std::string host_name, std::string service_port,
2875
                              int init_flags)
2876
    : m_flags{init_flags}
2877
    , m_protocol{prot}
2878
    , m_host{host_name}
2879
    , m_service{service_port}
2880
{
2881
}
2882

2883
inline Resolver::Query::~Query() noexcept {}
24,726✔
2884

2885
inline int Resolver::Query::flags() const
2886
{
×
2887
    return m_flags;
×
2888
}
×
2889

2890
inline StreamProtocol Resolver::Query::protocol() const
2891
{
×
2892
    return m_protocol;
×
2893
}
×
2894

2895
inline std::string Resolver::Query::host() const
2896
{
×
2897
    return m_host;
×
2898
}
×
2899

2900
inline std::string Resolver::Query::service() const
2901
{
×
2902
    return m_service;
×
2903
}
×
2904

2905
// ---------------- SocketBase ----------------
2906

2907
inline SocketBase::SocketBase(Service& service)
2908
    : m_desc{*service.m_impl}
2909
{
22,022✔
2910
}
22,022✔
2911

2912
inline SocketBase::~SocketBase() noexcept
2913
{
22,022✔
2914
    close();
22,022✔
2915
}
22,022✔
2916

2917
inline bool SocketBase::is_open() const noexcept
2918
{
61,310✔
2919
    return m_desc.is_open();
61,310✔
2920
}
61,310✔
2921

2922
inline auto SocketBase::native_handle() const noexcept -> native_handle_type
2923
{
×
2924
    return m_desc.native_handle();
×
2925
}
×
2926

2927
inline void SocketBase::open(const StreamProtocol& prot)
2928
{
131✔
2929
    std::error_code ec;
131✔
2930
    if (open(prot, ec))
131✔
2931
        throw std::system_error(ec);
×
2932
}
131✔
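// Illustrative sketch, not part of realm-core: several SocketBase functions
// come in pairs, one overload reporting failure through a std::error_code
// out-parameter and one throwing std::system_error by delegating to the first.
// The Door class below is a hypothetical example of that pairing; the error
// value it reports is chosen arbitrarily for the sketch.

```cpp
#include <cassert>
#include <system_error>

// Hypothetical resource with a non-throwing and a throwing open().
class Door {
public:
    // Non-throwing form: stores the outcome in `ec` and also returns it, so
    // callers can write `if (open(ec)) ...`.
    std::error_code open(std::error_code& ec) noexcept
    {
        if (m_open) {
            ec = std::make_error_code(std::errc::already_connected);
        }
        else {
            m_open = true;
            ec = std::error_code(); // Success
        }
        return ec;
    }

    // Throwing form: a thin wrapper around the non-throwing one.
    void open()
    {
        std::error_code ec;
        if (open(ec))
            throw std::system_error(ec);
    }

    bool is_open() const noexcept
    {
        return m_open;
    }

private:
    bool m_open = false;
};
```

// Keeping the logic in the error_code overload means the throwing wrapper
// never needs to change when new failure modes are added.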
2933

2934
inline void SocketBase::close() noexcept
2935
{
22,098✔
2936
    if (!is_open())
22,098✔
2937
        return;
8,038✔
2938
    cancel();
14,060✔
2939
    m_desc.close();
14,060✔
2940
}
14,060✔
2941

2942
template <class O>
2943
inline void SocketBase::get_option(O& opt) const
2944
{
4✔
2945
    std::error_code ec;
4✔
2946
    if (get_option(opt, ec))
4✔
2947
        throw std::system_error(ec);
×
2948
}
4✔
2949

2950
template <class O>
2951
inline std::error_code SocketBase::get_option(O& opt, std::error_code& ec) const
2952
{
4✔
2953
    opt.get(*this, ec);
4✔
2954
    return ec;
4✔
2955
}
4✔
2956

2957
template <class O>
2958
inline void SocketBase::set_option(const O& opt)
2959
{
1,938✔
2960
    std::error_code ec;
1,938✔
2961
    if (set_option(opt, ec))
1,938✔
2962
        throw std::system_error(ec);
×
2963
}
1,938✔
2964

2965
template <class O>
2966
inline std::error_code SocketBase::set_option(const O& opt, std::error_code& ec)
2967
{
9,888✔
2968
    opt.set(*this, ec);
9,888✔
2969
    return ec;
9,888✔
2970
}
9,888✔
2971

2972
inline void SocketBase::bind(const Endpoint& ep)
2973
{
123✔
2974
    std::error_code ec;
123✔
2975
    if (bind(ep, ec))
123✔
2976
        throw std::system_error(ec);
×
2977
}
123✔
2978

2979
inline Endpoint SocketBase::local_endpoint() const
2980
{
19,496✔
2981
    std::error_code ec;
19,496✔
2982
    Endpoint ep = local_endpoint(ec);
19,496✔
2983
    if (ec)
19,496✔
2984
        throw std::system_error(ec);
×
2985
    return ep;
19,496✔
2986
}
19,496✔
2987

2988
inline auto SocketBase::release_native_handle() noexcept -> native_handle_type
2989
{
×
2990
    if (is_open()) {
×
2991
        cancel();
×
2992
        return m_desc.release();
×
2993
    }
×
2994
    return m_desc.native_handle();
×
2995
}
×
2996

2997
inline const StreamProtocol& SocketBase::get_protocol() const noexcept
2998
{
×
2999
    return m_protocol;
×
3000
}
×
3001

3002
template <class T, int opt, class U>
3003
inline SocketBase::Option<T, opt, U>::Option(T init_value)
3004
    : m_value{init_value}
3005
{
9,892✔
3006
}
9,892✔
3007

3008
template <class T, int opt, class U>
3009
inline T SocketBase::Option<T, opt, U>::value() const
3010
{
4✔
3011
    return m_value;
4✔
3012
}
4✔
3013

3014
template <class T, int opt, class U>
3015
inline void SocketBase::Option<T, opt, U>::get(const SocketBase& sock, std::error_code& ec)
3016
{
4✔
3017
    union {
4✔
3018
        U value;
4✔
3019
        char strut[sizeof(U) + 1];
4✔
3020
    };
4✔
3021
    std::size_t value_size = sizeof strut;
4✔
3022
    sock.get_option(opt_enum(opt), &value, value_size, ec);
4✔
3023
    if (!ec) {
4✔
3024
        REALM_ASSERT(value_size == sizeof value);
4✔
3025
        m_value = T(value);
4✔
3026
    }
4✔
3027
}
4✔
3028

3029
template <class T, int opt, class U>
3030
inline void SocketBase::Option<T, opt, U>::set(SocketBase& sock, std::error_code& ec) const
3031
{
9,888✔
3032
    U value_to_set = U(m_value);
9,888✔
3033
    sock.set_option(opt_enum(opt), &value_to_set, sizeof value_to_set, ec);
9,888✔
3034
}
9,888✔
3035

3036
// ---------------- Socket ----------------
3037

3038
class Socket::ConnectOperBase : public Service::IoOper {
3039
public:
3040
    ConnectOperBase(std::size_t size, Socket& sock) noexcept
3041
        : IoOper{size}
3042
        , m_socket{&sock}
3043
    {
3,548✔
3044
    }
3,548✔
3045
    Want initiate(const Endpoint& ep)
3046
    {
3,548✔
3047
        REALM_ASSERT(this == m_socket->m_write_oper.get());
3,548✔
3048
        if (m_socket->initiate_async_connect(ep, m_error_code)) { // Throws
3,548✔
3049
            set_is_complete(true);                                // Failure, or immediate completion
2✔
3050
            return Want::nothing;
2✔
3051
        }
2✔
3052
        return Want::write;
3,546✔
3053
    }
3,546✔
3054
    Want advance() noexcept override final
3055
    {
3,534✔
3056
        REALM_ASSERT(!is_complete());
3,534✔
3057
        REALM_ASSERT(!is_canceled());
3,534✔
3058
        REALM_ASSERT(!m_error_code);
3,534✔
3059
        m_socket->finalize_async_connect(m_error_code);
3,534✔
3060
        set_is_complete(true);
3,534✔
3061
        return Want::nothing;
3,534✔
3062
    }
3,534✔
3063
    void recycle() noexcept override final
3064
    {
×
3065
        bool orphaned = !m_socket;
×
3066
        REALM_ASSERT(orphaned);
×
3067
        // Note: do_recycle() destroys this object.
3068
        do_recycle(orphaned);
×
3069
    }
×
3070
    void orphan() noexcept override final
3071
    {
8✔
3072
        m_socket = nullptr;
8✔
3073
    }
8✔
3074
    Service::Descriptor& descriptor() noexcept override final
3075
    {
×
3076
        return m_socket->m_desc;
×
3077
    }
×
3078

3079
protected:
3080
    Socket* m_socket;
3081
    std::error_code m_error_code;
3082
};
3083

3084
template <class H>
3085
class Socket::ConnectOper : public ConnectOperBase {
3086
public:
3087
    ConnectOper(std::size_t size, Socket& sock, H&& handler)
3088
        : ConnectOperBase{size, sock}
3089
        , m_handler{std::move(handler)}
3090
    {
3,443✔
3091
    }
3,443✔
3092
    void recycle_and_execute() override final
3093
    {
3,443✔
3094
        REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
3,443!
3095
        bool orphaned = !m_socket;
3,443✔
3096
        std::error_code ec = m_error_code;
3,443✔
3097
        if (is_canceled())
3,443✔
3098
            ec = util::error::operation_aborted;
11✔
3099
        // Note: do_recycle_and_execute() destroys this object.
1,700✔
3100
        do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
3,443✔
3101
    }
3,443✔
3102

3103
private:
3104
    H m_handler;
3105
};
3106

3107
inline Socket::Socket(Service& service)
3108
    : SocketBase{service}
3109
{
13,812✔
3110
}
13,812✔
3111

3112
inline Socket::Socket(Service& service, const StreamProtocol& prot, native_handle_type native_socket)
3113
    : SocketBase{service}
3114
{
2✔
3115
    assign(prot, native_socket); // Throws
2✔
3116
}
2✔
3117

3118
inline Socket::~Socket() noexcept {}
13,816✔
3119

3120
inline void Socket::connect(const Endpoint& ep)
3121
{
20✔
3122
    std::error_code ec;
20✔
3123
    if (connect(ep, ec)) // Throws
20✔
3124
        throw std::system_error(ec);
×
3125
}
20✔
3126

3127
inline std::size_t Socket::read(char* buffer, std::size_t size)
3128
{
×
3129
    std::error_code ec;
×
3130
    read(buffer, size, ec); // Throws
×
3131
    if (ec)
×
3132
        throw std::system_error(ec);
×
3133
    return size;
×
3134
}
×
3135

3136
inline std::size_t Socket::read(char* buffer, std::size_t size, std::error_code& ec)
3137
{
×
3138
    return StreamOps::read(*this, buffer, size, ec); // Throws
×
3139
}
×
3140

3141
inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab)
3142
{
8✔
3143
    std::error_code ec;
8✔
3144
    read(buffer, size, rab, ec); // Throws
8✔
3145
    if (ec)
8✔
3146
        throw std::system_error(ec);
×
3147
    return size;
8✔
3148
}
8✔
3149

3150
inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab, std::error_code& ec)
3151
{
16,402✔
3152
    int delim = std::char_traits<char>::eof();
16,402✔
3153
    return StreamOps::buffered_read(*this, buffer, size, delim, rab, ec); // Throws
16,402✔
3154
}
16,402✔
3155

3156
inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab)
3157
{
8✔
3158
    std::error_code ec;
8✔
3159
    std::size_t n = read_until(buffer, size, delim, rab, ec); // Throws
8✔
3160
    if (ec)
8✔
3161
        throw std::system_error(ec);
×
3162
    return n;
8✔
3163
}
8✔
3164

3165
inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab,
3166
                                      std::error_code& ec)
3167
{
8✔
3168
    int delim_2 = std::char_traits<char>::to_int_type(delim);
8✔
3169
    return StreamOps::buffered_read(*this, buffer, size, delim_2, rab, ec); // Throws
8✔
3170
}
8✔
3171

3172
inline std::size_t Socket::write(const char* data, std::size_t size)
3173
{
261,756✔
3174
    std::error_code ec;
261,756✔
3175
    write(data, size, ec); // Throws
261,756✔
3176
    if (ec)
261,756✔
3177
        throw std::system_error(ec);
×
3178
    return size;
261,756✔
3179
}
261,756✔
3180

3181
inline std::size_t Socket::write(const char* data, std::size_t size, std::error_code& ec)
3182
{
261,756✔
3183
    return StreamOps::write(*this, data, size, ec); // Throws
261,756✔
3184
}
261,756✔
3185

3186
inline std::size_t Socket::read_some(char* buffer, std::size_t size)
3187
{
4✔
3188
    std::error_code ec;
4✔
3189
    std::size_t n = read_some(buffer, size, ec); // Throws
4✔
3190
    if (ec)
4✔
3191
        throw std::system_error(ec);
4✔
3192
    return n;
×
3193
}
×
3194

3195
inline std::size_t Socket::read_some(char* buffer, std::size_t size, std::error_code& ec)
3196
{
8✔
3197
    return StreamOps::read_some(*this, buffer, size, ec); // Throws
8✔
3198
}
8✔
3199

3200
inline std::size_t Socket::write_some(const char* data, std::size_t size)
3201
{
×
3202
    std::error_code ec;
×
3203
    std::size_t n = write_some(data, size, ec); // Throws
×
3204
    if (ec)
×
3205
        throw std::system_error(ec);
×
3206
    return n;
×
3207
}
×
3208

3209
inline std::size_t Socket::write_some(const char* data, std::size_t size, std::error_code& ec)
3210
{
23✔
3211
    return StreamOps::write_some(*this, data, size, ec); // Throws
23✔
3212
}
23✔
3213

3214
template <class H>
3215
inline void Socket::async_connect(const Endpoint& ep, H&& handler)
3216
{
3,441✔
3217
    LendersConnectOperPtr op = Service::alloc<ConnectOper<H>>(m_write_oper, *this, std::move(handler)); // Throws
3,441✔
3218
    m_desc.initiate_oper(std::move(op), ep);                                                            // Throws
3,441✔
3219
}
3,441✔
3220

3221
template <class H>
3222
inline void Socket::async_read(char* buffer, std::size_t size, H&& handler)
3223
{
×
3224
    bool is_read_some = false;
×
3225
    StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws
×
3226
}
×
3227

3228
template <class H>
3229
inline void Socket::async_read(char* buffer, std::size_t size, ReadAheadBuffer& rab, H&& handler)
3230
{
628,638✔
3231
    int delim = std::char_traits<char>::eof();
628,638✔
3232
    StreamOps::async_buffered_read(*this, buffer, size, delim, rab, std::move(handler)); // Throws
628,638✔
3233
}
628,638✔
3234

3235
template <class H>
3236
inline void Socket::async_read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab, H&& handler)
3237
{
45,806✔
3238
    int delim_2 = std::char_traits<char>::to_int_type(delim);
45,806✔
3239
    StreamOps::async_buffered_read(*this, buffer, size, delim_2, rab, std::move(handler)); // Throws
45,806✔
3240
}
45,806✔
3241

3242
template <class H>
3243
inline void Socket::async_write(const char* data, std::size_t size, H&& handler)
3244
{
247,867✔
3245
    bool is_write_some = false;
247,867✔
3246
    StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws
247,867✔
3247
}
247,867✔
3248

3249
template <class H>
3250
inline void Socket::async_read_some(char* buffer, std::size_t size, H&& handler)
3251
{
345,276✔
3252
    bool is_read_some = true;
345,276✔
3253
    StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws
345,276✔
3254
}
345,276✔
3255

3256
template <class H>
3257
inline void Socket::async_write_some(const char* data, std::size_t size, H&& handler)
3258
{
341,880✔
3259
    bool is_write_some = true;
341,880✔
3260
    StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws
341,880✔
3261
}
341,880✔
3262

3263
inline void Socket::shutdown(shutdown_type what)
3264
{
18✔
3265
    std::error_code ec;
18✔
3266
    if (shutdown(what, ec)) // Throws
18✔
3267
        throw std::system_error(ec);
×
3268
}
18✔
3269

3270
inline void Socket::assign(const StreamProtocol& prot, native_handle_type native_socket)
3271
{
2✔
3272
    std::error_code ec;
2✔
3273
    if (assign(prot, native_socket, ec)) // Throws
2✔
3274
        throw std::system_error(ec);
×
3275
}
2✔
3276

3277
inline std::error_code Socket::assign(const StreamProtocol& prot, native_handle_type native_socket,
3278
                                      std::error_code& ec)
3279
{
2✔
3280
    return do_assign(prot, native_socket, ec); // Throws
2✔
3281
}
2✔
3282

3283
inline Socket& Socket::lowest_layer() noexcept
3284
{
11,314,350✔
3285
    return *this;
11,314,350✔
3286
}
11,314,350✔
3287

3288
inline void Socket::do_init_read_async(std::error_code&, Want& want) noexcept
3289
{
1,230,282✔
3290
    want = Want::read; // Wait for read readiness before proceeding
1,230,282✔
3291
}
1,230,282✔
3292

3293
inline void Socket::do_init_write_async(std::error_code&, Want& want) noexcept
3294
{
973,200✔
3295
    want = Want::write; // Wait for write readiness before proceeding
973,200✔
3296
}
973,200✔
3297

3298
inline std::size_t Socket::do_read_some_sync(char* buffer, std::size_t size, std::error_code& ec) noexcept
3299
{
131,108✔
3300
    return m_desc.read_some(buffer, size, ec);
131,108✔
3301
}
131,108✔
3302

3303
inline std::size_t Socket::do_write_some_sync(const char* data, std::size_t size, std::error_code& ec) noexcept
3304
{
261,779✔
3305
    return m_desc.write_some(data, size, ec);
261,779✔
3306
}
261,779✔
3307

3308
inline std::size_t Socket::do_read_some_async(char* buffer, std::size_t size, std::error_code& ec,
3309
                                              Want& want) noexcept
3310
{
1,134,200✔
3311
    std::error_code ec_2;
1,134,200✔
3312
    std::size_t n = m_desc.read_some(buffer, size, ec_2);
1,134,200✔
3313
    bool success = (!ec_2 || ec_2 == util::error::resource_unavailable_try_again);
1,134,200✔
3314
    if (REALM_UNLIKELY(!success)) {
1,134,200✔
3315
        ec = ec_2;
1,256✔
3316
        want = Want::nothing; // Failure
1,256✔
3317
        return 0;
1,256✔
3318
    }
1,256✔
3319
    ec = std::error_code();
1,132,944✔
3320
    want = Want::read; // Success
1,132,944✔
3321
    return n;
1,132,944✔
3322
}
1,132,944✔
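
`do_read_some_async()` above treats `resource_unavailable_try_again` as a non-error: the caller sees a cleared `ec` and `Want::read`, and only other errors surface as failures. The sketch below isolates that classification step so it can be exercised without a socket. `finish_read` and the local `Want` enum are hypothetical names introduced here, and the extra check for `operation_would_block` is an assumption added for platforms where `EWOULDBLOCK` differs from `EAGAIN`.

```cpp
#include <cstddef>
#include <system_error>

// Hypothetical mirror of the Want values used above.
enum class Want { nothing, read };

// Hypothetical helper: folds the raw result of a non-blocking read into
// (bytes, error, want). A "would block" result is not treated as a failure.
inline std::size_t finish_read(std::size_t n, std::error_code raw, std::error_code& ec, Want& want) noexcept
{
    bool would_block = (raw == std::errc::resource_unavailable_try_again ||
                        raw == std::errc::operation_would_block);
    if (raw && !would_block) {
        ec = raw;
        want = Want::nothing; // Hard failure: stop waiting on the descriptor
        return 0;
    }
    ec = std::error_code();
    want = Want::read; // Success or would-block: wait for read readiness again
    return n;
}
```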
3323

3324
inline std::size_t Socket::do_write_some_async(const char* data, std::size_t size, std::error_code& ec,
3325
                                               Want& want) noexcept
3326
{
561,682✔
3327
    std::error_code ec_2;
561,682✔
3328
    std::size_t n = m_desc.write_some(data, size, ec_2);
561,682✔
3329
    bool success = (!ec_2 || ec_2 == util::error::resource_unavailable_try_again);
561,682✔
3330
    if (REALM_UNLIKELY(!success)) {
561,682✔
3331
        ec = ec_2;
138✔
3332
        want = Want::nothing; // Failure
138✔
3333
        return 0;
138✔
3334
    }
138✔
3335
    ec = std::error_code();
561,544✔
3336
    want = Want::write; // Success
561,544✔
3337
    return n;
561,544✔
3338
}
561,544✔
3339

3340
// ---------------- Acceptor ----------------
3341

3342
class Acceptor::AcceptOperBase : public Service::IoOper {
3343
public:
3344
    AcceptOperBase(std::size_t size, Acceptor& a, Socket& s, Endpoint* e)
3345
        : IoOper{size}
3346
        , m_acceptor{&a}
3347
        , m_socket{s}
3348
        , m_endpoint{e}
3349
    {
10,158✔
3350
    }
10,158✔
3351
    Want initiate()
3352
    {
10,160✔
3353
        REALM_ASSERT(this == m_acceptor->m_read_oper.get());
10,160✔
3354
        REALM_ASSERT(!is_complete());
10,160✔
3355
        m_acceptor->m_desc.ensure_nonblocking_mode(); // Throws
10,160✔
3356
        return Want::read;
10,160✔
3357
    }
10,160✔
3358
    Want advance() noexcept override final
3359
    {
3,180✔
3360
        REALM_ASSERT(!is_complete());
3,180✔
3361
        REALM_ASSERT(!is_canceled());
3,180✔
3362
        REALM_ASSERT(!m_error_code);
3,180✔
3363
        REALM_ASSERT(!m_socket.is_open());
3,180✔
3364
        Want want = m_acceptor->do_accept_async(m_socket, m_endpoint, m_error_code);
3,180✔
3365
        if (want == Want::nothing)
3,180✔
3366
            set_is_complete(true); // Success or failure
2,190✔
3367
        return want;
3,180✔
3368
    }
3,180✔
3369
    void recycle() noexcept override final
3370
    {
7,956✔
3371
        bool orphaned = !m_acceptor;
7,956✔
3372
        REALM_ASSERT(orphaned);
7,956✔
3373
        // Note: do_recycle() destroys this object.
3,926✔
3374
        do_recycle(orphaned);
7,956✔
3375
    }
7,956✔
3376
    void orphan() noexcept override final
3377
    {
7,958✔
3378
        m_acceptor = nullptr;
7,958✔
3379
    }
7,958✔
3380
    Service::Descriptor& descriptor() noexcept override final
3381
    {
990✔
3382
        return m_acceptor->m_desc;
990✔
3383
    }
990✔
3384

3385
protected:
3386
    Acceptor* m_acceptor;
3387
    Socket& m_socket;           // May be dangling after cancellation
3388
    Endpoint* const m_endpoint; // May be dangling after cancellation
3389
    std::error_code m_error_code;
3390
};
3391

3392
template <class H>
3393
class Acceptor::AcceptOper : public AcceptOperBase {
3394
public:
3395
    AcceptOper(std::size_t size, Acceptor& a, Socket& s, Endpoint* e, H&& handler)
3396
        : AcceptOperBase{size, a, s, e}
3397
        , m_handler{std::move(handler)}
3398
    {
10,045✔
3399
    }
10,045✔
3400
    void recycle_and_execute() override final
3401
    {
2,095✔
3402
        REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
2,095!
3403
        REALM_ASSERT(is_canceled() || m_error_code || m_socket.is_open());
2,095!
3404
        bool orphaned = !m_acceptor;
2,095✔
3405
        std::error_code ec = m_error_code;
2,095✔
3406
        if (is_canceled())
2,095!
3407
            ec = util::error::operation_aborted;
7✔
3408
        // Note: do_recycle_and_execute() destroys this object.
1,042✔
3409
        do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws
2,095✔
3410
    }
2,095✔
3411

3412
private:
3413
    H m_handler;
3414
};
3415

3416
inline Acceptor::Acceptor(Service& service)
3417
    : SocketBase{service}
3418
{
8,206✔
3419
}
8,206✔
3420

3421
inline Acceptor::~Acceptor() noexcept {}
8,206✔
3422

3423
inline void Acceptor::listen(int backlog)
3424
{
8,198✔
3425
    std::error_code ec;
8,198✔
3426
    if (listen(backlog, ec)) // Throws
8,198✔
3427
        throw std::system_error(ec);
×
3428
}
8,198✔
3429

3430
inline void Acceptor::accept(Socket& sock)
3431
{
20✔
3432
    std::error_code ec;
20✔
3433
    if (accept(sock, ec)) // Throws
20✔
3434
        throw std::system_error(ec);
×
3435
}
20✔
3436

3437
inline void Acceptor::accept(Socket& sock, Endpoint& ep)
3438
{
2✔
3439
    std::error_code ec;
2✔
3440
    if (accept(sock, ep, ec)) // Throws
2✔
3441
        throw std::system_error(ec);
×
3442
}
2✔
3443

3444
inline std::error_code Acceptor::accept(Socket& sock, std::error_code& ec)
3445
{
20✔
3446
    Endpoint* ep = nullptr;
20✔
3447
    return accept(sock, ep, ec); // Throws
20✔
3448
}
20✔
3449

3450
inline std::error_code Acceptor::accept(Socket& sock, Endpoint& ep, std::error_code& ec)
3451
{
2✔
3452
    return accept(sock, &ep, ec); // Throws
2✔
3453
}
2✔
3454

3455
template <class H>
3456
inline void Acceptor::async_accept(Socket& sock, H&& handler)
3457
{
111✔
3458
    Endpoint* ep = nullptr;
111✔
3459
    async_accept(sock, ep, std::move(handler)); // Throws
111✔
3460
}
111✔
3461

3462
template <class H>
3463
inline void Acceptor::async_accept(Socket& sock, Endpoint& ep, H&& handler)
3464
{
9,936✔
3465
    async_accept(sock, &ep, std::move(handler)); // Throws
9,936✔
3466
}
9,936✔
3467

3468
inline std::error_code Acceptor::accept(Socket& socket, Endpoint* ep, std::error_code& ec)
3469
{
22✔
3470
    REALM_ASSERT(!m_read_oper || !m_read_oper->in_use());
22✔
3471
    if (REALM_UNLIKELY(socket.is_open()))
22✔
3472
        throw util::runtime_error("Socket is already open");
11✔
3473
    m_desc.ensure_blocking_mode(); // Throws
22✔
3474
    m_desc.accept(socket.m_desc, m_protocol, ep, ec);
22✔
3475
    return ec;
22✔
3476
}
22✔
3477

3478
inline Acceptor::Want Acceptor::do_accept_async(Socket& socket, Endpoint* ep, std::error_code& ec) noexcept
3479
{
3,180✔
3480
    std::error_code ec_2;
3,180✔
3481
    m_desc.accept(socket.m_desc, m_protocol, ep, ec_2);
3,180✔
3482
    if (ec_2 == util::error::resource_unavailable_try_again)
3,180✔
3483
        return Want::read;
990✔
3484
    ec = ec_2;
2,190✔
3485
    return Want::nothing;
2,190✔
3486
}
2,190✔
3487

3488
template <class H>
3489
inline void Acceptor::async_accept(Socket& sock, Endpoint* ep, H&& handler)
3490
{
10,047✔
3491
    if (REALM_UNLIKELY(sock.is_open()))
10,047✔
3492
        throw util::runtime_error("Socket is already open");
4,966✔
3493
    LendersAcceptOperPtr op = Service::alloc<AcceptOper<H>>(m_read_oper, *this, sock, ep,
10,047✔
3494
                                                            std::move(handler)); // Throws
10,047✔
3495
    m_desc.initiate_oper(std::move(op));                                         // Throws
10,047✔
3496
}
10,047✔
3497

3498
// ---------------- DeadlineTimer ----------------
3499

3500
template <class H>
3501
class DeadlineTimer::WaitOper : public Service::WaitOperBase {
3502
public:
3503
    WaitOper(std::size_t size, DeadlineTimer& timer, clock::time_point expiration_time, H&& handler)
3504
        : Service::WaitOperBase{size, timer, expiration_time}
3505
        , m_handler{std::move(handler)}
3506
    {
278,859✔
3507
    }
278,859✔
3508
    void recycle_and_execute() override final
3509
    {
271,040✔
3510
        bool orphaned = !m_timer;
271,040✔
3511
        Status status = Status::OK();
271,040✔
3512
        if (is_canceled())
271,040!
3513
            status = Status{ErrorCodes::OperationAborted, "Timer canceled"};
12,376✔
3514
        // Note: do_recycle_and_execute() destroys this object.
137,706✔
3515
        do_recycle_and_execute<H>(orphaned, m_handler, status); // Throws
271,040✔
3516
    }
271,040✔
3517

3518
private:
3519
    H m_handler;
3520
};
3521

3522
inline DeadlineTimer::DeadlineTimer(Service& service)
3523
    : m_service_impl{*service.m_impl}
3524
{
24,506✔
3525
}
24,506✔
3526

3527
inline DeadlineTimer::~DeadlineTimer() noexcept
3528
{
24,514✔
3529
    cancel();
24,514✔
3530
}
24,514✔
3531

3532
template <class R, class P, class H>
3533
inline void DeadlineTimer::async_wait(std::chrono::duration<R, P> delay, H&& handler)
3534
{
279,040✔
3535
    clock::time_point now = clock::now();
279,040✔
3536
    // FIXME: This method of detecting overflow does not work. Comparison
141,706✔
3537
    // between distinct duration types is not overflow safe. Overflow easily
141,706✔
3538
    // happens in the implied conversion of arguments to the common duration
141,706✔
3539
    // type (std::common_type<>).
141,706✔
3540
    auto max_add = clock::time_point::max() - now;
279,040✔
3541
    if (delay > max_add)
279,040✔
3542
        throw util::overflow_error("Expiration time overflow");
×
3543
    clock::time_point expiration_time = now + delay;
279,040✔
3544
    initiate_oper(Service::alloc<WaitOper<H>>(m_wait_oper, *this, expiration_time,
279,040✔
3545
                                              std::move(handler))); // Throws
279,040✔
3546
}
279,040✔
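
The FIXME in `async_wait()` above notes that `delay > max_add` compares two different duration types, and that the implied conversion to their common type can itself overflow. One possible way around this, offered only as a sketch and not as the project's fix, is to compare both values as floating-point seconds. `checked_expiration` is a hypothetical helper; it assumes `now` is not before the clock's epoch, so that `time_point::max() - now` is itself safe.

```cpp
#include <chrono>
#include <optional>

// Hypothetical helper, not part of realm-core. Returns std::nullopt when
// `now + delay` would not fit in Clock::time_point.
template <class Clock, class R, class P>
std::optional<typename Clock::time_point> checked_expiration(typename Clock::time_point now,
                                                             std::chrono::duration<R, P> delay)
{
    // Floating-point seconds: converting an integer duration to this type
    // does not overflow, at the cost of rounding for very large values.
    using fsec = std::chrono::duration<long double>;
    auto max_add = Clock::time_point::max() - now;
    // `>=` rather than `>` so that values rounded onto the limit are rejected.
    if (fsec(delay) >= fsec(max_add))
        return std::nullopt;
    return now + std::chrono::duration_cast<typename Clock::duration>(delay);
}
```

A caller would pass the clock explicitly, for example `checked_expiration<std::chrono::steady_clock>(now, delay)`, and throw or report an error when the result is empty.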
3547

3548
// ---------------- ReadAheadBuffer ----------------
3549

3550
inline ReadAheadBuffer::ReadAheadBuffer()
3551
    : m_buffer{new char[s_size]} // Throws
3552
{
13,458✔
3553
}
13,458✔
3554

3555
inline void ReadAheadBuffer::clear() noexcept
3556
{
×
3557
    m_begin = nullptr;
×
3558
    m_end = nullptr;
×
3559
}
×
3560

3561
inline bool ReadAheadBuffer::empty() const noexcept
3562
{
859,594✔
3563
    return (m_begin == m_end);
859,594✔
3564
}
859,594✔
3565

3566
template <class S>
3567
inline void ReadAheadBuffer::refill_sync(S& stream, std::error_code& ec) noexcept
3568
{
131,100✔
3569
    char* buffer = m_buffer.get();
131,100✔
3570
    std::size_t size = s_size;
131,100✔
3571
    static_assert(noexcept(stream.do_read_some_sync(buffer, size, ec)), "");
131,100✔
3572
    std::size_t n = stream.do_read_some_sync(buffer, size, ec);
131,100✔
3573
    if (REALM_UNLIKELY(n == 0))
131,100✔
3574
        return;
65,556✔
3575
    REALM_ASSERT(!ec);
131,092✔
3576
    REALM_ASSERT(n <= size);
131,092✔
3577
    m_begin = m_buffer.get();
131,092✔
3578
    m_end = m_begin + n;
131,092✔
3579
}
131,092✔
3580

3581
template <class S>
3582
inline bool ReadAheadBuffer::refill_async(S& stream, std::error_code& ec, Want& want) noexcept
3583
{
859,546✔
3584
    char* buffer = m_buffer.get();
859,546✔
3585
    std::size_t size = s_size;
859,546✔
3586
    static_assert(noexcept(stream.do_read_some_async(buffer, size, ec, want)), "");
859,546✔
3587
    std::size_t n = stream.do_read_some_async(buffer, size, ec, want);
859,546✔
3588
    // For any error other than end_of_input, do_read_some_async() is expected to return 0
399,652✔
3589
    if (n == 0)
859,546✔
3590
        return false;
52,648✔
3591
    REALM_ASSERT(!ec || ec == util::MiscExtErrors::end_of_input);
806,898✔
3592
    REALM_ASSERT(n <= size);
806,898✔
3593
    m_begin = m_buffer.get();
806,898✔
3594
    m_end = m_begin + n;
806,898✔
3595
    return true;
806,898✔
3596
}
806,898✔
3597

3598
} // namespace realm::sync::network