• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_121

21 Nov 2023 01:54PM UTC coverage: 92.117% (+0.4%) from 91.683%
thomas.goyne_121

push

Evergreen

jedelbo
Move bson files to core utils

92262 of 169120 branches covered (0.0%)

234642 of 254722 relevant lines covered (92.12%)

6329664.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.49
/src/realm/sync/network/network.cpp
1

2
#define _WINSOCK_DEPRECATED_NO_WARNINGS
3

4
#include <algorithm>
5
#include <cerrno>
6
#include <condition_variable>
7
#include <limits>
8
#include <mutex>
9
#include <stdexcept>
10
#include <thread>
11
#include <vector>
12

13
#include <fcntl.h>
14

15
#ifndef _WIN32
16
#include <netinet/tcp.h>
17
#include <unistd.h>
18
#include <poll.h>
19
#include <realm/util/to_string.hpp>
20
#endif
21

22
#include <realm/util/features.h>
23
#include <realm/util/optional.hpp>
24
#include <realm/util/misc_errors.hpp>
25
#include <realm/util/priority_queue.hpp>
26
#include <realm/sync/network/network.hpp>
27

28
#if defined _GNU_SOURCE && !REALM_ANDROID
29
#define HAVE_LINUX_PIPE2 1
30
#else
31
#define HAVE_LINUX_PIPE2 0
32
#endif
33

34
// Note: Linux specific accept4() is not available on Android.
35
#if defined _GNU_SOURCE && defined SOCK_NONBLOCK && defined SOCK_CLOEXEC && !REALM_ANDROID
36
#define HAVE_LINUX_ACCEPT4 1
37
#else
38
#define HAVE_LINUX_ACCEPT4 0
39
#endif
40

41
#if defined _GNU_SOURCE && defined SOCK_CLOEXEC
42
#define HAVE_LINUX_SOCK_CLOEXEC 1
43
#else
44
#define HAVE_LINUX_SOCK_CLOEXEC 0
45
#endif
46

47
#ifndef _WIN32
48

49
#if REALM_NETWORK_USE_EPOLL
50
#include <linux/version.h>
51
#include <sys/epoll.h>
52
#elif REALM_HAVE_KQUEUE
53
#include <sys/types.h>
54
#include <sys/event.h>
55
#include <sys/time.h>
56
#else
57
#include <poll.h>
58
#endif
59

60
#endif
61

62
// On Linux kernels earlier than 2.6.37, epoll can't handle timeout values
63
// bigger than (LONG_MAX - 999ULL)/HZ.  HZ in the wild can be as big as 1000,
64
// and LONG_MAX can be as small as (2**31)-1, so the largest number of
65
// milliseconds we can be sure to support on those early kernels is 2147482.
66
#if REALM_NETWORK_USE_EPOLL
67
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 37)
68
#define EPOLL_LARGE_TIMEOUT_BUG 1
69
#endif
70
#endif
71

72
using namespace realm::util;
73
using namespace realm::sync::network;
74

75

76
namespace {
77

78
using native_handle_type = SocketBase::native_handle_type;
79

80
#ifdef _WIN32
81

82
// This Winsock initialization call is required prior to any other Winsock API call
83
// made by the process. It is OK if a process calls it multiple times.
84
struct ProcessInitialization {
85
    ProcessInitialization()
86
    {
87
        WSADATA wsaData;
88
        int i = WSAStartup(MAKEWORD(2, 2), &wsaData);
89
        if (i != 0) {
90
            throw std::system_error(i, std::system_category(), "WSAStartup() Winsock initialization failed");
91
        }
92
    }
93

94
    ~ProcessInitialization()
95
    {
96
        // Must be called 1 time for each call to WSAStartup() that has taken place
97
        WSACleanup();
98
    }
99
};
100

101
// File-local instance whose constructor calls WSAStartup() and whose
// destructor calls WSACleanup() (see ProcessInitialization).
ProcessInitialization g_process_initialization;
102

103
std::error_code make_winsock_error_code(int error_code)
104
{
105
    switch (error_code) {
106
        case WSAEAFNOSUPPORT:
107
            return make_basic_system_error_code(EAFNOSUPPORT);
108
        case WSAEINVAL:
109
            return make_basic_system_error_code(EINVAL);
110
        case WSAECANCELLED:
111
            return make_basic_system_error_code(ECANCELED);
112
        case WSAECONNABORTED:
113
            return make_basic_system_error_code(ECONNABORTED);
114
        case WSAECONNRESET:
115
            return make_basic_system_error_code(ECONNRESET);
116
        case WSAEWOULDBLOCK:
117
            return make_basic_system_error_code(EAGAIN);
118
    }
119

120
    // Microsoft's STL can map win32 (and winsock!) error codes to known (posix-compatible) errc ones.
121
    auto ec = std::system_category().default_error_condition(error_code);
122
    if (ec.category() == std::generic_category())
123
        return make_basic_system_error_code(ec.value());
124
    return std::error_code(ec.value(), ec.category());
125
}
126

127
#endif // defined _WIN32
128

129
inline bool check_socket_error(int ret, std::error_code& ec)
130
{
49,506✔
131
#ifdef _WIN32
132
    if (REALM_UNLIKELY(ret == SOCKET_ERROR)) {
133
        ec = make_winsock_error_code(WSAGetLastError());
134
        return true;
135
    }
136
#else
137
    if (REALM_UNLIKELY(ret == -1)) {
49,506✔
138
        ec = make_basic_system_error_code(errno);
4✔
139
        return true;
4✔
140
    }
4✔
141
#endif
49,502✔
142
    return false;
49,502✔
143
}
49,502✔
144

145
// Set file status flag O_NONBLOCK if `value` is true, otherwise clear it.
146
//
147
// Note that these flags are set at the file description level, and are therfore
148
// shared between duplicated descriptors (dup()).
149
//
150
// `ec` untouched on success.
151
std::error_code set_nonblock_flag(native_handle_type fd, bool value, std::error_code& ec) noexcept
152
{
11,930✔
153
#ifdef _WIN32
154
    u_long flags = value ? 1 : 0;
155
    int r = ioctlsocket(fd, FIONBIO, &flags);
156
    if (r == SOCKET_ERROR) {
157
        ec = make_winsock_error_code(WSAGetLastError());
158
        return ec;
159
    }
160
#else
161
    int flags = ::fcntl(fd, F_GETFL, 0);
11,930✔
162
    if (REALM_UNLIKELY(flags == -1)) {
11,930✔
163
        ec = make_basic_system_error_code(errno);
×
164
        return ec;
×
165
    }
×
166
    flags &= ~O_NONBLOCK;
11,930✔
167
    flags |= (value ? O_NONBLOCK : 0);
5,940✔
168
    int ret = ::fcntl(fd, F_SETFL, flags);
11,930✔
169
    if (REALM_UNLIKELY(ret == -1)) {
11,930✔
170
        ec = make_basic_system_error_code(errno);
×
171
        return ec;
×
172
    }
×
173
#endif
11,930✔
174

5,868✔
175
    return std::error_code(); // Success
11,930✔
176
}
11,930✔
177

178
// Set file status flag O_NONBLOCK. See set_nonblock_flag(int, bool,
179
// std::error_code&) for details. Throws std::system_error on failure.
180
void set_nonblock_flag(native_handle_type fd, bool value = true)
181
{
11,926✔
182
    std::error_code ec;
11,926✔
183
    if (set_nonblock_flag(fd, value, ec))
11,926✔
184
        throw std::system_error(ec);
×
185
}
11,926✔
186

187
// Set file descriptor flag FD_CLOEXEC if `value` is true, otherwise clear it.
188
//
189
// Note that this method of setting FD_CLOEXEC is subject to a race condition if
190
// another thread calls any of the exec functions concurrently. For that reason,
191
// this function should only be used when there is no better alternative. For
192
// example, Linux generally offers ways to set this flag atomically with the
193
// creation of a new file descriptor.
194
//
195
// `ec` untouched on success.
196
std::error_code set_cloexec_flag(native_handle_type fd, bool value, std::error_code& ec) noexcept
197
{
24,690✔
198
#ifndef _WIN32
24,690✔
199
    int flags = ::fcntl(fd, F_GETFD, 0);
24,690✔
200
    if (REALM_UNLIKELY(flags == -1)) {
24,690✔
201
        ec = make_basic_system_error_code(errno);
×
202
        return ec;
×
203
    }
×
204
    flags &= ~FD_CLOEXEC;
24,690✔
205
    flags |= (value ? FD_CLOEXEC : 0);
✔
206
    int ret = ::fcntl(fd, F_SETFD, flags);
24,690✔
207
    if (REALM_UNLIKELY(ret == -1)) {
24,690✔
208
        ec = make_basic_system_error_code(errno);
×
209
        return ec;
×
210
    }
×
211
#endif
24,690✔
212
    return std::error_code(); // Success
24,690✔
213
}
24,690✔
214

215
// Set file descriptor flag FD_CLOEXEC. See set_cloexec_flag(int, bool,
216
// std::error_code&) for details. Throws std::system_error on failure.
217
REALM_UNUSED inline void set_cloexec_flag(native_handle_type fd, bool value = true)
218
{
17,544✔
219
    std::error_code ec;
17,544✔
220
    if (set_cloexec_flag(fd, value, ec))
17,544✔
221
        throw std::system_error(ec);
×
222
}
17,544✔
223

224

225
// Close `fd`, asserting that the handle was valid.
//
// On Windows the handle is first closed as a socket; if that fails it is
// closed as a generic handle instead.
inline void checked_close(native_handle_type fd) noexcept
{
#ifdef _WIN32
    if (closesocket(fd) == -1) {
        BOOL closed = CloseHandle((HANDLE)fd);
        REALM_ASSERT(closed || GetLastError() != ERROR_INVALID_HANDLE);
    }
#else
    const int status = ::close(fd);
    // Most errors from close() are tolerated and ignored, because the file
    // descriptor is assumed to be closed regardless (POSIX does not strictly
    // promise this, but we rely on it anyway). `EBADF` is the exception: it
    // would point to a bug in the implementation, so it must not be ignored.
    REALM_ASSERT(status != -1 || errno != EBADF);
#endif
}
57,426✔
242

243

244
class CloseGuard {
245
public:
246
    CloseGuard() noexcept {}
37,788✔
247
    explicit CloseGuard(native_handle_type fd) noexcept
248
        : m_fd{fd}
249
    {
20,616✔
250
        REALM_ASSERT(fd != -1);
20,616✔
251
    }
20,616✔
252
    CloseGuard(CloseGuard&& cg) noexcept
253
        : m_fd{cg.release()}
254
    {
×
255
    }
×
256
    ~CloseGuard() noexcept
257
    {
58,406✔
258
        if (m_fd != -1)
58,406✔
259
            checked_close(m_fd);
43,392✔
260
    }
58,406✔
261
    void reset(native_handle_type fd) noexcept
262
    {
36,804✔
263
        REALM_ASSERT(fd != -1);
36,804✔
264
        if (m_fd != -1)
36,804✔
265
            checked_close(m_fd);
×
266
        m_fd = fd;
36,804✔
267
    }
36,804✔
268
    operator native_handle_type() const noexcept
269
    {
1,846,322✔
270
        return m_fd;
1,846,322✔
271
    }
1,846,322✔
272
    native_handle_type release() noexcept
273
    {
14,030✔
274
        native_handle_type fd = m_fd;
14,030✔
275
        m_fd = -1;
14,030✔
276
        return fd;
14,030✔
277
    }
14,030✔
278

279
private:
280
    native_handle_type m_fd = -1;
281
};
282

283

284
#ifndef _WIN32
285

286
class WakeupPipe {
287
public:
288
    WakeupPipe()
289
    {
17,310✔
290
        int fildes[2];
17,310✔
291
#if HAVE_LINUX_PIPE2
8,538✔
292
        int flags = O_CLOEXEC;
8,538✔
293
        int ret = ::pipe2(fildes, flags);
8,538✔
294
#else
295
        int ret = ::pipe(fildes);
8,772✔
296
#endif
8,772✔
297
        if (REALM_UNLIKELY(ret == -1)) {
17,310✔
298
            std::error_code ec = make_basic_system_error_code(errno);
×
299
            throw std::system_error(ec);
×
300
        }
×
301
        m_read_fd.reset(fildes[0]);
17,310✔
302
        m_write_fd.reset(fildes[1]);
17,310✔
303
#if !HAVE_LINUX_PIPE2
8,772✔
304
        set_cloexec_flag(m_read_fd);  // Throws
8,772✔
305
        set_cloexec_flag(m_write_fd); // Throws
8,772✔
306
#endif
8,772✔
307
    }
17,310✔
308

309
    // Thread-safe.
310
    int wait_fd() const noexcept
311
    {
169,544✔
312
        return m_read_fd;
169,544✔
313
    }
169,544✔
314

315
    // Cause the wait descriptor (wait_fd()) to become readable within a short
316
    // amount of time.
317
    //
318
    // Thread-safe.
319
    void signal() noexcept
320
    {
1,200,968✔
321
        std::lock_guard lock{m_mutex};
1,200,968✔
322
        if (!m_signaled) {
1,200,968✔
323
            char c = 0;
263,170✔
324
            ssize_t ret = ::write(m_write_fd, &c, 1);
263,170✔
325
            REALM_ASSERT_RELEASE(ret == 1);
263,170✔
326
            m_signaled = true;
263,170✔
327
        }
263,170✔
328
    }
1,200,968✔
329

330
    // Must be called after the wait descriptor (wait_fd()) becomes readable.
331
    //
332
    // Thread-safe.
333
    void acknowledge_signal() noexcept
334
    {
262,606✔
335
        std::lock_guard lock{m_mutex};
262,606✔
336
        if (m_signaled) {
262,610✔
337
            char c;
262,588✔
338
            ssize_t ret = ::read(m_read_fd, &c, 1);
262,588✔
339
            REALM_ASSERT_RELEASE(ret == 1);
262,588✔
340
            m_signaled = false;
262,588✔
341
        }
262,588✔
342
    }
262,606✔
343

344
private:
345
    CloseGuard m_read_fd, m_write_fd;
346
    std::mutex m_mutex;
347
    bool m_signaled = false; // Protected by `m_mutex`.
348
};
349

350
#else // defined _WIN32
351

352
class WakeupPipe {
353
public:
354
    SOCKET wait_fd() const noexcept
355
    {
356
        return INVALID_SOCKET;
357
    }
358

359
    void signal() noexcept
360
    {
361
        m_signal_count++;
362
    }
363

364
    bool is_signaled() const noexcept
365
    {
366
        return m_signal_count > 0;
367
    }
368

369
    void acknowledge_signal() noexcept
370
    {
371
        m_signal_count--;
372
    }
373

374
private:
375
    std::atomic<uint32_t> m_signal_count = 0;
376
};
377

378
#endif // defined _WIN32
379

380

381
std::error_code translate_addrinfo_error(int err) noexcept
382
{
6✔
383
    switch (err) {
6✔
384
        case EAI_AGAIN:
2✔
385
            return ResolveErrors::host_not_found_try_again;
2✔
386
        case EAI_BADFLAGS:
✔
387
            return error::invalid_argument;
×
388
        case EAI_FAIL:
✔
389
            return ResolveErrors::no_recovery;
×
390
        case EAI_FAMILY:
✔
391
            return error::address_family_not_supported;
×
392
        case EAI_MEMORY:
✔
393
            return error::no_memory;
×
394
        case EAI_NONAME:
4✔
395
#if defined(EAI_ADDRFAMILY)
4✔
396
        case EAI_ADDRFAMILY:
4✔
397
#endif
4✔
398
#if defined(EAI_NODATA) && (EAI_NODATA != EAI_NONAME)
4✔
399
        case EAI_NODATA:
4✔
400
#endif
4✔
401
            return ResolveErrors::host_not_found;
4✔
402
        case EAI_SERVICE:
2✔
403
            return ResolveErrors::service_not_found;
×
404
        case EAI_SOCKTYPE:
2✔
405
            return ResolveErrors::socket_type_not_supported;
×
406
        default:
2✔
407
            return error::unknown;
×
408
    }
6✔
409
}
6✔
410

411

412
// Owner of a result list produced by getaddrinfo(). The list is released with
// freeaddrinfo() when the owner is destroyed. A null pointer is allowed and
// means that there is nothing to release.
struct GetaddrinfoResultOwner {
    struct addrinfo* ptr;

    // Takes ownership of `p` (may be null).
    GetaddrinfoResultOwner(struct addrinfo* p)
        : ptr{p}
    {
    }

    // Copying would make two owners release the same list, so it is
    // disallowed.
    GetaddrinfoResultOwner(const GetaddrinfoResultOwner&) = delete;
    GetaddrinfoResultOwner& operator=(const GetaddrinfoResultOwner&) = delete;

    ~GetaddrinfoResultOwner() noexcept
    {
        if (ptr)
            freeaddrinfo(ptr);
    }
};
424

425
} // unnamed namespace
426

427

428
// Keeps track of initiated I/O operations that did not complete immediately,
// and waits for the associated descriptors to become ready.
//
// One of three implementations is selected at compile time: epoll
// (REALM_NETWORK_USE_EPOLL), kqueue (REALM_HAVE_KQUEUE), or a fallback that
// keeps a vector of `pollfd` entries.
class Service::IoReactor {
public:
    IoReactor();
    ~IoReactor() noexcept;

    // Add an initiated I/O operation that did not complete immediately.
    void add_oper(Descriptor&, LendersIoOperPtr, Want);
    // In the epoll/kqueue implementation, moves all suspended read and write
    // operations of the descriptor to `completed_ops`.
    void remove_canceled_ops(Descriptor&, OperQueue<AsyncOper>& completed_ops) noexcept;

    bool wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
                          OperQueue<AsyncOper>& completed_ops);

    // The reactor is considered empty when no operations are currently managed
    // by it. An operation is managed by a reactor if it was added through
    // add_oper() and not yet passed out through `completed_ops` of
    // wait_and_advance().
    bool empty() const noexcept;

    // Cause wait_and_advance() to return within a short amount of time.
    //
    // Thread-safe.
    void interrupt() noexcept;

#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    // Add the descriptor to / remove the descriptor from the epoll or kqueue
    // instance.
    void register_desc(Descriptor&);
    void deregister_desc(Descriptor&) noexcept;
#endif

#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
    clock::duration get_and_reset_sleep_time() noexcept;
#endif

private:
#if REALM_NETWORK_USE_EPOLL

    // Maximum number of events fetched by a single epoll_wait() call.
    static constexpr int s_epoll_event_buffer_size = 256;
    const std::unique_ptr<epoll_event[]> m_epoll_event_buffer;
    const CloseGuard m_epoll_fd;

    static std::unique_ptr<epoll_event[]> make_epoll_event_buffer();
    static CloseGuard make_epoll_fd();

#elif REALM_HAVE_KQUEUE // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE

    // Maximum number of events fetched by a single kevent() call.
    static constexpr int s_kevent_buffer_size = 256;
    const std::unique_ptr<struct kevent[]> m_kevent_buffer;
    const CloseGuard m_kqueue_fd;

    static std::unique_ptr<struct kevent[]> make_kevent_buffer();
    static CloseGuard make_kqueue_fd();

#endif // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE

#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE

    // Operations whose descriptor has been reported ready for the kind of I/O
    // they are waiting for.
    OperQueue<IoOper> m_active_ops;

    // If there are already active operations, just activate as many additional
    // operations as can be done without blocking. Otherwise, block until at
    // least one operation can be activated or the timeout is reached. Then, if
    // the timeout was not reached, activate as many additional operations as
    // can be done without any further blocking.
    //
    // May occasionally return with no active operations and before the timeout
    // has been reached, but this can be assumed to happen rarely enough that it
    // will never amount to a performance problem.
    //
    // Argument `now` is unused if `timeout.time_since_epoch() <= 0`.
    //
    // Returns true if, and only if a wakeup pipe signal was
    // received. Operations may already have been activated in this case.
    bool wait_and_activate(clock::time_point timeout, clock::time_point now);

    void advance_active_ops(OperQueue<AsyncOper>& completed_ops) noexcept;

#else // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)

    struct OperSlot {
        std::size_t pollfd_slot_ndx = 0; // Zero when slot is unused
        OperQueue<IoOper> read_ops, write_ops;
    };

    std::vector<OperSlot> m_operations; // Indexed by file descriptor

    // First entry in `m_pollfd_slots` is always the read end of the wakeup
    // pipe. There is then an additional entry for each entry in `m_operations`
    // where `pollfd_slot_ndx` is nonzero. All entries always have `pollfd::fd`
    // >= 0.
    //
    // INVARIANT: m_pollfd_slots.size() == 1 + N, where N is the number of
    // entries in m_operations where pollfd_slot_ndx is nonzero.
    std::vector<pollfd> m_pollfd_slots;

    void discard_pollfd_slot_by_move_last_over(OperSlot&) noexcept;

#endif // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)

    // Number of operations currently managed by the reactor; empty() reports
    // whether this is zero.
    std::size_t m_num_operations = 0;
    // Signaled by interrupt().
    WakeupPipe m_wakeup_pipe;

#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
    clock::duration m_sleep_time = clock::duration::zero();
#endif
};
532

533

534
inline bool Service::IoReactor::empty() const noexcept
594,834✔
535
{
1,492,882✔
536
    return (m_num_operations == 0);
1,492,882✔
537
}
898,048✔
538

539

540
inline void Service::IoReactor::interrupt() noexcept
541
{
606,256✔
542
    m_wakeup_pipe.signal();
606,256✔
543
}
606,256✔
544

545

546
#if REALM_NETWORK_USE_EPOLL
547

548
inline Service::IoReactor::IoReactor()
549
    : m_epoll_event_buffer{make_epoll_event_buffer()} // Throws
550
    , m_epoll_fd{make_epoll_fd()}                     // Throws
551
    , m_wakeup_pipe{}                                 // Throws
552
{
553
    epoll_event event = epoll_event(); // Clear
554
    event.events = EPOLLIN;
555
    event.data.ptr = nullptr;
556
    int ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_wakeup_pipe.wait_fd(), &event);
557
    if (REALM_UNLIKELY(ret == -1)) {
558
        std::error_code ec = make_basic_system_error_code(errno);
559
        throw std::system_error(ec);
560
    }
561
}
562

563

564
// Nothing to do explicitly; the members release their own resources.
inline Service::IoReactor::~IoReactor() noexcept = default;
565

566

567
// Adds the descriptor to the epoll instance with edge-triggered notification
// for input, output, and peer hang-up. The address of `desc` is stored as the
// event's user data.
//
// Throws std::system_error if epoll_ctl() fails.
inline void Service::IoReactor::register_desc(Descriptor& desc)
{
    epoll_event registration{}; // Zero-initialized
    registration.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; // Enable edge triggering
    registration.data.ptr = &desc;
    const int status = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, desc.m_fd, &registration);
    if (REALM_UNLIKELY(status == -1))
        throw std::system_error(make_basic_system_error_code(errno));
}
578

579

580
// Removes the descriptor from the epoll instance. Failure is treated as a
// programming error.
inline void Service::IoReactor::deregister_desc(Descriptor& desc) noexcept
{
    epoll_event placeholder{}; // Zero-initialized
    const int status = epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, desc.m_fd, &placeholder);
    REALM_ASSERT(status != -1);
}
586

587

588
inline std::unique_ptr<epoll_event[]> Service::IoReactor::make_epoll_event_buffer()
589
{
590
    return std::make_unique<epoll_event[]>(s_epoll_event_buffer_size); // Throws
591
}
592

593

594
// Creates an epoll instance with the close-on-exec flag set and hands
// ownership of its descriptor to the returned guard.
//
// Throws std::system_error if epoll_create1() fails.
inline CloseGuard Service::IoReactor::make_epoll_fd()
{
    const int epoll_fd = epoll_create1(EPOLL_CLOEXEC);
    if (REALM_UNLIKELY(epoll_fd == -1))
        throw std::system_error(make_basic_system_error_code(errno));
    return CloseGuard{epoll_fd};
}
606

607

608
// epoll implementation of wait_and_activate().
//
// Blocking is only allowed when there are no active operations. Returns true
// if, and only if a wakeup pipe signal was received. Throws std::system_error
// if epoll_wait() fails for any reason other than EINTR.
bool Service::IoReactor::wait_and_activate(clock::time_point timeout, clock::time_point now)
{
    int wait_millis = 0; // Zero means: do not block
    const bool may_block = m_active_ops.empty();
    if (may_block && timeout.time_since_epoch().count() <= 0) {
        wait_millis = -1; // Allow indefinite blocking
    }
    else if (may_block && now < timeout) {
        const auto remaining = timeout - now;
        int millis_cap = std::numeric_limits<int>::max();
        // 17592186044415 is the largest value (45-bit signed integer)
        // guaranteed to be supported by std::chrono::milliseconds. In the
        // worst case, `int` is a 16-bit integer, meaning that we can only
        // wait about 30 seconds at a time. In the best case
        // (17592186044415) we can wait more than 500 years at a time. In
        // the typical case (`int` has 32 bits), we can wait 24 days at a
        // time.
        const long long max_chrono_millis = 17592186044415;
        if (max_chrono_millis < millis_cap)
            millis_cap = int(max_chrono_millis);
#if EPOLL_LARGE_TIMEOUT_BUG
        const long max_safe_millis = 2147482; // Circa 35 minutes
        if (max_safe_millis < millis_cap)
            millis_cap = int(max_safe_millis);
#endif
        if (remaining > std::chrono::milliseconds(millis_cap)) {
            wait_millis = millis_cap;
        }
        else {
            // Overflow is impossible here, due to the preceding check
            auto rounded_millis = std::chrono::duration_cast<std::chrono::milliseconds>(remaining);
            // The conversion to milliseconds rounds down when the tick
            // period of `remaining` is shorter than a millisecond, which it
            // usually is. That is a problem, because it can lead to
            // premature wakeups, which in turn could cause extraneous
            // iterations in the event loop. It is especially problematic
            // when a small remaining time is rounded down to zero
            // milliseconds, because that can easily produce a "busy wait"
            // condition for up to a millisecond every time this happens.
            // The solution is to round up instead of down.
            if (rounded_millis < remaining) {
                // This increment cannot overflow, because rounded_millis <
                // remaining <= millis_cap <= std::numeric_limits<int>::max().
                ++rounded_millis;
            }
            wait_millis = int(rounded_millis.count());
        }
    }

    // A second, non-blocking pass is made only when the first pass filled the
    // entire event buffer.
    for (int pass = 0; pass < 2; ++pass) {
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
        clock::time_point sleep_start_time = clock::now();
#endif
        const int num_events =
            epoll_wait(m_epoll_fd, m_epoll_event_buffer.get(), s_epoll_event_buffer_size, wait_millis);
        if (REALM_UNLIKELY(num_events == -1)) {
            const int err = errno;
            if (err != EINTR)
                throw std::system_error(make_basic_system_error_code(err));
            return false; // Infrequent premature return is ok
        }
        REALM_ASSERT(num_events >= 0);
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
        m_sleep_time += clock::now() - sleep_start_time;
#endif
        bool woken_by_pipe = false;
        for (int j = 0; j < num_events; ++j) {
            const epoll_event& event = m_epoll_event_buffer[j];
            // The wakeup pipe was registered with a null user data pointer
            if (REALM_UNLIKELY(!event.data.ptr)) {
                m_wakeup_pipe.acknowledge_signal();
                woken_by_pipe = true;
                continue;
            }
            Descriptor& desc = *static_cast<Descriptor*>(event.data.ptr);
            const bool readable = ((event.events & (EPOLLIN | EPOLLHUP | EPOLLERR)) != 0);
            if (readable && !desc.m_read_ready) {
                desc.m_read_ready = true;
                m_active_ops.push_back(desc.m_suspended_read_ops);
            }
            const bool writable = ((event.events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) != 0);
            if (writable && !desc.m_write_ready) {
                desc.m_write_ready = true;
                m_active_ops.push_back(desc.m_suspended_write_ops);
            }
            if ((event.events & EPOLLRDHUP) != 0)
                desc.m_imminent_end_of_input = true;
        }
        if (woken_by_pipe)
            return true;
        if (num_events < s_epoll_event_buffer_size)
            break;
        wait_millis = 0;
    }
    return false;
}
709

710

711
#elif REALM_HAVE_KQUEUE // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
712

713

714
inline Service::IoReactor::IoReactor()
715
    : m_kevent_buffer{make_kevent_buffer()} // Throws
716
    , m_kqueue_fd{make_kqueue_fd()}         // Throws
717
    , m_wakeup_pipe{}                       // Throws
718
{
8,772✔
719
    struct kevent event;
8,772✔
720
    EV_SET(&event, m_wakeup_pipe.wait_fd(), EVFILT_READ, EV_ADD, 0, 0, nullptr);
8,772✔
721
    int ret = ::kevent(m_kqueue_fd, &event, 1, nullptr, 0, nullptr);
8,772✔
722
    if (REALM_UNLIKELY(ret == -1)) {
8,772✔
723
        std::error_code ec = make_basic_system_error_code(errno);
724
        throw std::system_error(ec);
725
    }
726
}
8,772✔
727

728

729
// Nothing to do explicitly; the members release their own resources.
inline Service::IoReactor::~IoReactor() noexcept = default;
8,772✔
730

731

732
// Adds a read filter and a write filter for the descriptor to the kqueue
// instance. The address of `desc` is stored as the user data of both.
//
// Throws std::system_error if kevent() fails.
inline void Service::IoReactor::register_desc(Descriptor& desc)
{
    struct kevent changes[2];
    // EV_CLEAR enables edge-triggered behavior
    const auto add_flags = EV_ADD | EV_CLEAR;
    EV_SET(&changes[0], desc.m_fd, EVFILT_READ, add_flags, 0, 0, &desc);
    EV_SET(&changes[1], desc.m_fd, EVFILT_WRITE, add_flags, 0, 0, &desc);
    const int status = ::kevent(m_kqueue_fd, changes, 2, nullptr, 0, nullptr);
    if (REALM_UNLIKELY(status == -1))
        throw std::system_error(make_basic_system_error_code(errno));
}
7,058✔
744

745

746
// Removes the read and write filters of the descriptor from the kqueue
// instance. Failure is treated as a programming error.
inline void Service::IoReactor::deregister_desc(Descriptor& desc) noexcept
{
    struct kevent changes[2];
    EV_SET(&changes[0], desc.m_fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
    EV_SET(&changes[1], desc.m_fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
    const int status = ::kevent(m_kqueue_fd, changes, 2, nullptr, 0, nullptr);
    REALM_ASSERT(status != -1);
}
7,058✔
754

755

756
inline std::unique_ptr<struct kevent[]> Service::IoReactor::make_kevent_buffer()
757
{
8,772✔
758
    return std::make_unique<struct kevent[]>(s_kevent_buffer_size); // Throws
8,772✔
759
}
8,772✔
760

761

762
// Creates a kqueue instance and hands ownership of its descriptor to the
// returned guard.
//
// Throws std::system_error if kqueue() fails.
inline CloseGuard Service::IoReactor::make_kqueue_fd()
{
    const int kqueue_fd = ::kqueue();
    if (REALM_UNLIKELY(kqueue_fd == -1))
        throw std::system_error(make_basic_system_error_code(errno));
    return CloseGuard{kqueue_fd};
}
8,772✔
772

773

774
// kqueue implementation of wait_and_activate().
//
// Blocking is only allowed when there are no active operations. Returns true
// if, and only if a wakeup pipe signal was received. Throws std::system_error
// if kevent() fails for any reason other than EINTR.
bool Service::IoReactor::wait_and_activate(clock::time_point timeout, clock::time_point now)
{
    timespec wait_limit{}; // Zero means: do not block
    if (m_active_ops.empty()) {
        // ::kevent() silently clamps the wait time to 24 hours (86400
        // seconds). That is fine, because the caller tolerates a premature
        // return as long as it is infrequent enough to not pose a performance
        // problem.
        constexpr std::time_t max_wait_seconds = 86400;
        const bool no_deadline = (timeout.time_since_epoch().count() <= 0);
        if (no_deadline) {
            wait_limit.tv_sec = max_wait_seconds;
        }
        else if (now < timeout) {
            const auto remaining = timeout - now;
            const auto whole_secs = std::chrono::duration_cast<std::chrono::seconds>(remaining);
            const auto frac_nsecs = std::chrono::duration_cast<std::chrono::nanoseconds>(remaining - whole_secs);
            const auto clamped_secs = std::min(whole_secs.count(), std::chrono::seconds::rep(max_wait_seconds));
            wait_limit.tv_sec = std::time_t(clamped_secs);
            wait_limit.tv_nsec = long(frac_nsecs.count());
        }
    }

    // Further non-blocking passes are made only while the event buffer keeps
    // being filled completely.
    for (int pass = 0; pass < 4; ++pass) {
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
        clock::time_point sleep_start_time = clock::now();
#endif
        const int num_events =
            ::kevent(m_kqueue_fd, nullptr, 0, m_kevent_buffer.get(), s_kevent_buffer_size, &wait_limit);
        if (REALM_UNLIKELY(num_events == -1)) {
            const int err = errno;
            if (err != EINTR)
                throw std::system_error(make_basic_system_error_code(err));
            return false; // Infrequent premature return is ok
        }
        REALM_ASSERT(num_events >= 0);
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
        m_sleep_time += clock::now() - sleep_start_time;
#endif
        bool woken_by_pipe = false;
        for (int j = 0; j < num_events; ++j) {
            const struct kevent& event = m_kevent_buffer[j];
            // The wakeup pipe was registered with a null user data pointer
            if (REALM_UNLIKELY(!event.udata)) {
                REALM_ASSERT(m_wakeup_pipe.wait_fd() == int(event.ident));
                m_wakeup_pipe.acknowledge_signal();
                woken_by_pipe = true;
                continue;
            }
            Descriptor& desc = *static_cast<Descriptor*>(event.udata);
            REALM_ASSERT(desc.m_fd == int(event.ident));
            if (event.filter == EVFILT_READ) {
                if (!desc.m_read_ready) {
                    desc.m_read_ready = true;
                    m_active_ops.push_back(desc.m_suspended_read_ops);
                }
                if ((event.flags & EV_EOF) != 0)
                    desc.m_imminent_end_of_input = true;
            }
            else if (event.filter == EVFILT_WRITE) {
                if (!desc.m_write_ready) {
                    desc.m_write_ready = true;
                    m_active_ops.push_back(desc.m_suspended_write_ops);
                }
            }
        }
        if (woken_by_pipe)
            return true;
        if (num_events < s_kevent_buffer_size)
            break;
        // Reset to zero so that any additional opportunistic event
        // extractions do not block.
        wait_limit = timespec{};
    }
    return false;
}
1,097,328✔
850

851
#endif // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
852

853

854
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
855

856
// Hands a new I/O operation over to the reactor (epoll / kqueue flavor).
//
// The descriptor is registered with the reactor on first use. The operation
// is then either parked on the descriptor's suspended read/write queue, or,
// if the descriptor is already flagged ready for the wanted direction, put
// straight onto the queue of active operations. In both cases the count of
// pending operations is incremented.
//
// May throw (from register_desc()).
void Service::IoReactor::add_oper(Descriptor& desc, LendersIoOperPtr op, Want want)
{
    if (REALM_UNLIKELY(!desc.m_is_registered)) {
        register_desc(desc); // Throws
        desc.m_is_registered = true;
    }

    switch (want) {
        case Want::read:
            if (REALM_UNLIKELY(desc.m_read_ready))
                m_active_ops.push_back(std::move(op));
            else
                desc.m_suspended_read_ops.push_back(std::move(op));
            ++m_num_operations;
            return;
        case Want::write:
            if (REALM_UNLIKELY(desc.m_write_ready))
                m_active_ops.push_back(std::move(op));
            else
                desc.m_suspended_write_ops.push_back(std::move(op));
            ++m_num_operations;
            return;
        case Want::nothing:
            break;
    }

    // A new operation must want to read or to write. If the assertion is
    // compiled out, the operation is treated as active.
    REALM_ASSERT(false);
    m_active_ops.push_back(std::move(op));
    ++m_num_operations;
}
1,129,096✔
885

886

887
// Moves every operation that is suspended on `desc` (first the readers, then
// the writers) to `completed_ops`, decrementing the pending operation count
// once per moved operation.
//
// Note: Canceled operations that are currently active (in m_active_ops)
// will be removed later by advance_active_ops().
void Service::IoReactor::remove_canceled_ops(Descriptor& desc, OperQueue<AsyncOper>& completed_ops) noexcept
{
    auto drain = [&](auto& suspended_ops) noexcept {
        for (;;) {
            LendersIoOperPtr op = suspended_ops.pop_front();
            if (!op)
                break;
            completed_ops.push_back(std::move(op));
            --m_num_operations;
        }
    };

    drain(desc.m_suspended_read_ops);
    drain(desc.m_suspended_write_ops);
}
244,252✔
901

902

903
bool Service::IoReactor::wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
904
                                          OperQueue<AsyncOper>& completed_ops)
905
{
898,094✔
906
    clock::time_point now_2 = now;
898,094✔
907
    for (;;) {
1,097,244✔
908
        bool wakeup_pipe_signal = wait_and_activate(timeout, now_2); // Throws
1,097,244✔
909
        if (REALM_UNLIKELY(wakeup_pipe_signal)) {
1,097,244✔
910
            interrupted = true;
152,250✔
911
            return false;
152,250✔
912
        }
152,250✔
913
        advance_active_ops(completed_ops);
944,994✔
914
        if (!completed_ops.empty())
944,994✔
915
            return true;
738,756✔
916
        if (timeout.time_since_epoch().count() > 0) {
206,238✔
917
            now_2 = clock::now();
90,786✔
918
            bool timed_out = (now_2 >= timeout);
90,786✔
919
            if (timed_out)
90,786✔
920
                return false;
7,432✔
921
        }
90,786✔
922
    }
206,238✔
923
}
898,094✔
924

925

926
// Gives every currently active operation a chance to make progress.
//
// Canceled operations and operations that report Want::nothing are moved to
// `completed_ops` (decrementing the pending operation count). An operation
// that wants more reading or writing stays active if its descriptor is still
// flagged ready for that direction; otherwise it is parked on the
// descriptor's corresponding suspended queue.
void Service::IoReactor::advance_active_ops(OperQueue<AsyncOper>& completed_ops) noexcept
{
    OperQueue<IoOper> remaining_active_ops;
    for (;;) {
        LendersIoOperPtr op = m_active_ops.pop_front();
        if (!op)
            break;

        if (op->is_canceled()) {
            completed_ops.push_back(std::move(op));
            --m_num_operations;
            continue;
        }

        switch (op->advance()) {
            case Want::nothing:
                REALM_ASSERT(op->is_complete());
                completed_ops.push_back(std::move(op));
                --m_num_operations;
                continue;
            case Want::read: {
                Descriptor& desc = op->descriptor();
                if (REALM_UNLIKELY(desc.m_read_ready))
                    remaining_active_ops.push_back(std::move(op));
                else
                    desc.m_suspended_read_ops.push_back(std::move(op));
                continue;
            }
            case Want::write: {
                Descriptor& desc = op->descriptor();
                if (REALM_UNLIKELY(desc.m_write_ready))
                    remaining_active_ops.push_back(std::move(op));
                else
                    desc.m_suspended_write_ops.push_back(std::move(op));
                continue;
            }
        }

        // Not reachable for a valid `Want` value. If the assertion is
        // compiled out, the operation is kept active.
        REALM_ASSERT(false);
        remaining_active_ops.push_back(std::move(op));
    }
    m_active_ops.push_back(remaining_active_ops);
}
955,792✔
964

178,702✔
965

170,164✔
966
#else // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
170,164✔
967

968

170,164✔
969
inline Service::IoReactor::IoReactor()
970
    : m_wakeup_pipe{} // Throws
170,164✔
971
{
8,538✔
972
    pollfd slot = pollfd(); // Cleared slot
8,538✔
973
    slot.fd = m_wakeup_pipe.wait_fd();
8,538✔
974
    slot.events = POLLRDNORM;
975
    m_pollfd_slots.emplace_back(slot); // Throws
976
}
977

1,162,552✔
978

1,162,552✔
979
// When assertions are enabled, drains every per-descriptor read and write
// queue and verifies that the number of drained operations matches
// m_num_operations. Does nothing otherwise.
inline Service::IoReactor::~IoReactor() noexcept
{
#if REALM_ASSERTIONS_ENABLED
    std::size_t num_drained = 0;
    for (OperSlot& slot : m_operations) {
        while (slot.read_ops.pop_front())
            ++num_drained;
        while (slot.write_ops.pop_front())
            ++num_drained;
    }
    REALM_ASSERT(num_drained == m_num_operations);
#endif
}
747,294✔
993

747,294✔
994

747,294✔
995
// Hands a new I/O operation over to the reactor (poll() flavor).
//
// The descriptor value is used directly as an index into m_operations, which
// is grown on demand. A pollfd slot is allocated for the descriptor unless it
// already has one (slot index zero is reserved for the wake-up pipe, so zero
// doubles as the "no slot" marker). The operation is appended to the read or
// write queue of the descriptor, the matching event bit is set in the pollfd
// slot, and the count of pending operations is incremented.
//
// `want` must be Want::read or Want::write.
//
// May throw (when growing m_operations or m_pollfd_slots).
void Service::IoReactor::add_oper(Descriptor& desc, LendersIoOperPtr op, Want want)
{
    native_handle_type fd = desc.m_fd;

    // An invalid descriptor (e.g. -1) must never be used as an index. Without
    // these checks `std::size_t(fd) + 1` below could wrap around to zero, in
    // which case no slots would be added and the indexing of m_operations
    // would be out of bounds.
    REALM_ASSERT(fd >= 0);
    REALM_ASSERT(std::size_t(fd) < std::numeric_limits<std::size_t>::max());

    // Make sure there are enough slots in m_operations
    {
        std::size_t n = std::size_t(fd) + 1; // Cannot overflow due to the check above
        if (m_operations.size() < n)
            m_operations.resize(n); // Throws
    }

    // Allocate a pollfd_slot unless we already have one
    OperSlot& oper_slot = m_operations[fd];
    if (oper_slot.pollfd_slot_ndx == 0) {
        pollfd pollfd_slot = pollfd(); // Cleared slot
        pollfd_slot.fd = fd;
        std::size_t pollfd_slot_ndx = m_pollfd_slots.size();
        REALM_ASSERT(pollfd_slot_ndx > 0);
        m_pollfd_slots.emplace_back(pollfd_slot); // Throws
        oper_slot.pollfd_slot_ndx = pollfd_slot_ndx;
    }

    pollfd& pollfd_slot = m_pollfd_slots[oper_slot.pollfd_slot_ndx];
    REALM_ASSERT(pollfd_slot.fd == fd);
    // The requested events must mirror which queues are nonempty
    REALM_ASSERT(((pollfd_slot.events & POLLRDNORM) != 0) == !oper_slot.read_ops.empty());
    REALM_ASSERT(((pollfd_slot.events & POLLWRNORM) != 0) == !oper_slot.write_ops.empty());
    REALM_ASSERT((pollfd_slot.events & ~(POLLRDNORM | POLLWRNORM)) == 0);

    switch (want) {
        case Want::read:
            pollfd_slot.events |= POLLRDNORM;
            oper_slot.read_ops.push_back(std::move(op));
            ++m_num_operations;
            return;
        case Want::write:
            pollfd_slot.events |= POLLWRNORM;
            oper_slot.write_ops.push_back(std::move(op));
            ++m_num_operations;
            return;
        case Want::nothing:
            break;
    }

    // A new operation must want to read or to write
    REALM_ASSERT(false);
}
249,706✔
1040

249,706✔
1041

280,728✔
1042
// Moves every operation queued on `desc` (first the readers, then the
// writers) to `completed_ops`, decrementing the pending operation count once
// per moved operation, and then releases the pollfd slot of the descriptor.
//
// The descriptor must currently have a pollfd slot and at least one queued
// operation.
void Service::IoReactor::remove_canceled_ops(Descriptor& desc, OperQueue<AsyncOper>& completed_ops) noexcept
{
    native_handle_type fd = desc.m_fd;
    REALM_ASSERT(fd >= 0);
    REALM_ASSERT(std::size_t(fd) < m_operations.size());

    OperSlot& oper_slot = m_operations[fd];
    REALM_ASSERT(oper_slot.pollfd_slot_ndx > 0);
    REALM_ASSERT(!oper_slot.read_ops.empty() || !oper_slot.write_ops.empty());

    pollfd& pollfd_slot = m_pollfd_slots[oper_slot.pollfd_slot_ndx];
    REALM_ASSERT(pollfd_slot.fd == fd);

    auto drain = [&](auto& queued_ops) noexcept {
        for (;;) {
            LendersIoOperPtr op = queued_ops.pop_front();
            if (!op)
                break;
            completed_ops.push_back(std::move(op));
            --m_num_operations;
        }
    };
    drain(oper_slot.read_ops);
    drain(oper_slot.write_ops);

    discard_pollfd_slot_by_move_last_over(oper_slot);
}
740,324✔
1062

399,768✔
1063

1064
bool Service::IoReactor::wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
399,768✔
1065
                                          OperQueue<AsyncOper>& completed_ops)
399,768✔
1066
{
399,768✔
1067
#ifdef _WIN32
399,768✔
1068
    using nfds_type = std::size_t;
399,768✔
1069
#else
399,768✔
1070
    using nfds_type = nfds_t;
399,768✔
1071
#endif
399,768✔
1072
    clock::time_point now_2 = now;
399,768✔
1073
    std::size_t num_ready_descriptors = 0;
399,768✔
1074
    {
399,768✔
1075
        // std::vector guarantees contiguous storage
1076
        pollfd* fds = &m_pollfd_slots.front();
399,768✔
1077
        nfds_type nfds = nfds_type(m_pollfd_slots.size());
1,164✔
1078
        for (;;) {
1,164✔
1079
            int max_wait_millis = -1; // Wait indefinitely
398,604✔
1080
            if (timeout.time_since_epoch().count() > 0) {
398,604✔
1081
                if (now_2 >= timeout)
398,604✔
1082
                    return false; // No operations completed
398,604✔
1083
                auto diff = timeout - now_2;
398,604✔
1084
                int max_int_millis = std::numeric_limits<int>::max();
398,604✔
1085
                // 17592186044415 is the largest value (45-bit signed integer)
398,604✔
1086
                // garanteed to be supported by std::chrono::milliseconds. In
398,604✔
1087
                // the worst case, `int` is a 16-bit integer, meaning that we
398,604✔
1088
                // can only wait about 30 seconds at a time. In the best case
398,604✔
1089
                // (17592186044415) we can wait more than 500 years at a
398,604✔
1090
                // time. In the typical case (`int` has 32 bits), we can wait 24
398,604✔
1091
                // days at a time.
398,604✔
1092
                long long max_chrono_millis = 17592186044415;
398,604✔
1093
                if (max_int_millis > max_chrono_millis)
398,590✔
1094
                    max_int_millis = int(max_chrono_millis);
398,590✔
1095
                if (diff > std::chrono::milliseconds(max_int_millis)) {
398,590✔
1096
                    max_wait_millis = max_int_millis;
398,590✔
1097
                }
398,590✔
1098
                else {
398,604✔
1099
                    // Overflow is impossible here, due to the preceeding check
398,604✔
1100
                    auto diff_millis = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
399,768✔
1101
                    // The conversion to milliseconds will round down if the
740,324✔
1102
                    // tick period of `diff` is less than a millisecond, which
740,324✔
1103
                    // it usually is. This is a problem, because it can lead to
1104
                    // premature wakeups, which in turn could cause extranous
1105
                    // iterations in the event loop. This is especially
1106
                    // problematic when a small `diff` is rounded down to zero
1107
                    // milliseconds, becuase that can easily produce a "busy
1108
                    // wait" condition for up to a millisecond every time this
1109
                    // happens. Obviously, the solution is to round up, instead
1110
                    // of down.
1111
                    if (diff_millis < diff) {
1112
                        // Note that the following increment cannot overflow,
1113
                        // because diff_millis < diff <= max_int_millis <=
1114
                        // std::numeric_limits<int>::max().
1115
                        ++diff_millis;
1116
                    }
1117
                    max_wait_millis = int(diff_millis.count());
1118
                }
1119
            }
1120

1121
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1122
            clock::time_point sleep_start_time = clock::now();
1123
#endif
1124

1125
#ifdef _WIN32
1126
            max_wait_millis = 1000;
1127

1128
            // Windows does not have a single API call to wait for pipes and
1129
            // sockets with a timeout. So we repeatedly poll them individually
1130
            // in a loop until max_wait_millis has elapsed or an event happend.
1131
            //
1132
            // FIXME: Maybe switch to Windows IOCP instead.
1133

1134
            // Following variable is the poll time for the sockets in
1135
            // miliseconds. Adjust it to find a balance between CPU usage and
1136
            // response time:
1137
            constexpr INT socket_poll_timeout = 10;
1138

1139
            for (size_t t = 0; t < m_pollfd_slots.size(); t++)
1140
                m_pollfd_slots[t].revents = 0;
1141

1142
            using namespace std::chrono;
1143
            auto started = steady_clock::now();
1144
            int ret = 0;
1145

1146
            for (;;) {
1147
                if (m_pollfd_slots.size() > 1) {
1148
                    // Poll all network sockets
740,324✔
1149
                    ret = WSAPoll(LPWSAPOLLFD(&m_pollfd_slots[1]), ULONG(m_pollfd_slots.size() - 1),
740,324✔
1150
                                  socket_poll_timeout);
740,324✔
1151
                    REALM_ASSERT(ret != SOCKET_ERROR);
740,324✔
1152
                }
1153

1154
                if (m_wakeup_pipe.is_signaled()) {
1155
                    m_pollfd_slots[0].revents = POLLIN;
1156
                    ret++;
1157
                }
1158

1159
                if (ret != 0 ||
1160
                    (duration_cast<milliseconds>(steady_clock::now() - started).count() >= max_wait_millis)) {
1161
                    break;
740,324✔
1162
                }
740,324✔
1163

742,128✔
1164
                // If we don't have any sockets to poll for (m_pollfd_slots is less than 2) and no one signals
742,128✔
1165
                // the wakeup pipe, we'd be stuck busy waiting for either condition to become true.
742,128✔
1166
                std::this_thread::sleep_for(std::chrono::milliseconds(socket_poll_timeout));
742,128✔
1167
            }
742,128✔
1168

2,147,483,647✔
1169
#else // !defined _WIN32
2,147,483,647✔
1170
            int ret = ::poll(fds, nfds, max_wait_millis);
2,147,483,647✔
1171
#endif
1172
            bool interrupted_2 = false;
2,147,483,647✔
1173
            if (REALM_UNLIKELY(ret == -1)) {
741,066✔
1174
#ifndef _WIN32
741,066✔
1175
                int err = errno;
741,066✔
1176
                if (REALM_UNLIKELY(err != EINTR)) {
3,314✔
1177
                    std::error_code ec = make_basic_system_error_code(err);
737,752✔
1178
                    throw std::system_error(ec);
737,752✔
1179
                }
737,752✔
1180
#endif
110,374✔
1181
                interrupted_2 = true;
110,374✔
1182
            }
110,374✔
1183

110,374✔
1184
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
110,374✔
1185
            m_sleep_time += clock::now() - sleep_start_time;
627,378✔
1186
#endif
627,378✔
1187

627,378✔
1188
            if (REALM_LIKELY(!interrupted_2)) {
627,378✔
1189
                REALM_ASSERT(ret >= 0);
1,386,722✔
1190
                num_ready_descriptors = ret;
759,344✔
1191
                break;
759,344✔
1192
            }
759,344✔
1193

76,410✔
1194
            // Retry on interruption by system signal
76,410✔
1195
            if (timeout.time_since_epoch().count() > 0)
76,410✔
1196
                now_2 = clock::now();
682,934✔
1197
        }
682,934✔
1198
    }
682,934✔
1199

682,934✔
1200
    if (num_ready_descriptors == 0)
682,934✔
1201
        return false; // No operations completed
682,934✔
1202

110✔
1203
    // Check wake-up descriptor
110✔
1204
    if (m_pollfd_slots[0].revents != 0) {
110✔
1205
        REALM_ASSERT((m_pollfd_slots[0].revents & POLLNVAL) == 0);
110✔
1206
        m_wakeup_pipe.acknowledge_signal();
34✔
1207
        interrupted = true;
110✔
1208
        return false;
682,934✔
1209
    }
682,934✔
1210

682,934✔
1211
    std::size_t orig_num_operations = m_num_operations;
682,934✔
1212
    std::size_t num_pollfd_slots = m_pollfd_slots.size();
682,934✔
1213
    std::size_t pollfd_slot_ndx = 1;
811,500✔
1214
    while (pollfd_slot_ndx < num_pollfd_slots && num_ready_descriptors > 0) {
1,625,042✔
1215
        pollfd& pollfd_slot = m_pollfd_slots[pollfd_slot_ndx];
811,202✔
1216
        REALM_ASSERT(pollfd_slot.fd >= 0);
811,202✔
1217
        if (REALM_LIKELY(pollfd_slot.revents == 0)) {
667,814✔
1218
            ++pollfd_slot_ndx;
667,814✔
1219
            continue;
667,814✔
1220
        }
667,814✔
1221
        --num_ready_descriptors;
667,814✔
1222

145,710✔
1223
        REALM_ASSERT((pollfd_slot.revents & POLLNVAL) == 0);
145,710✔
1224

145,710✔
1225
        // Treat errors like read and/or write-readiness
18✔
1226
        if ((pollfd_slot.revents & (POLLHUP | POLLERR)) != 0) {
18✔
1227
            REALM_ASSERT((pollfd_slot.events & (POLLRDNORM | POLLWRNORM)) != 0);
18✔
1228
            if ((pollfd_slot.events & POLLRDNORM) != 0)
1229
                pollfd_slot.revents |= POLLRDNORM;
1230
            if ((pollfd_slot.events & POLLWRNORM) != 0)
1231
                pollfd_slot.revents |= POLLWRNORM;
811,500✔
1232
        }
682,934✔
1233

682,934✔
1234
        OperSlot& oper_slot = m_operations[pollfd_slot.fd];
682,934✔
1235
        REALM_ASSERT(oper_slot.pollfd_slot_ndx == pollfd_slot_ndx);
532,056✔
1236

532,056✔
1237
        OperQueue<IoOper> new_read_ops, new_write_ops;
532,056✔
1238
        auto advance_ops = [&](OperQueue<IoOper>& ops) noexcept {
532,056✔
1239
            while (LendersIoOperPtr op = ops.pop_front()) {
682,934✔
1240
                Want want = op->advance();
682,934✔
1241
                switch (want) {
682,934✔
1242
                    case Want::nothing:
281,450✔
1243
                        REALM_ASSERT(op->is_complete());
281,450✔
1244
                        completed_ops.push_back(std::move(op));
281,450✔
1245
                        --m_num_operations;
281,450✔
1246
                        continue;
682,934✔
1247
                    case Want::read:
682,934✔
1248
                        new_read_ops.push_back(std::move(op));
145,702✔
1249
                        continue;
145,702✔
1250
                    case Want::write:
145,702✔
1251
                        new_write_ops.push_back(std::move(op));
682,934✔
1252
                        continue;
682,934✔
1253
                }
18✔
1254
                REALM_ASSERT(false);
18✔
1255
            }
18✔
1256
        };
682,934✔
1257

682,934✔
1258
        // Check read-readiness
469,916✔
1259
        if ((pollfd_slot.revents & POLLRDNORM) != 0) {
469,916✔
1260
            REALM_ASSERT(!oper_slot.read_ops.empty());
469,916✔
1261
            advance_ops(oper_slot.read_ops);
213,018✔
1262
            pollfd_slot.events &= ~POLLRDNORM;
213,018✔
1263
        }
213,018✔
1264

682,934✔
1265
        // Check write-readiness
627,378✔
1266
        if ((pollfd_slot.revents & POLLWRNORM) != 0) {
627,378✔
1267
            REALM_ASSERT(!oper_slot.write_ops.empty());
627,378✔
1268
            advance_ops(oper_slot.write_ops);
627,378✔
1269
            pollfd_slot.events &= ~POLLWRNORM;
627,378✔
1270
        }
627,378✔
1271

1272
        if (!new_read_ops.empty()) {
1273
            oper_slot.read_ops.push_back(new_read_ops);
1274
            pollfd_slot.events |= POLLRDNORM;
747,004✔
1275
        }
747,004✔
1276

747,004✔
1277
        if (!new_write_ops.empty()) {
747,004✔
1278
            oper_slot.write_ops.push_back(new_write_ops);
58,978✔
1279
            pollfd_slot.events |= POLLWRNORM;
58,978✔
1280
        }
58,978✔
1281

58,978✔
1282
        if (pollfd_slot.events == 0) {
747,004✔
1283
            discard_pollfd_slot_by_move_last_over(oper_slot);
747,004✔
1284
            --num_pollfd_slots;
1285
        }
1286
        else {
1287
            ++pollfd_slot_ndx;
1288
        }
1289
    }
1290

1291
    REALM_ASSERT(num_ready_descriptors == 0);
1292

1293
    bool any_operations_completed = (m_num_operations < orig_num_operations);
1294
    return any_operations_completed;
1295
}
1296

8,538✔
1297

8,538✔
1298
// Releases the pollfd slot owned by `oper_slot`. To keep m_pollfd_slots
// dense, the last pollfd slot is copied into the vacated position (unless
// the vacated position is itself the last one) and the owner of the moved
// slot is told about its new index. Finally the last slot is dropped.
void Service::IoReactor::discard_pollfd_slot_by_move_last_over(OperSlot& oper_slot) noexcept
{
    const std::size_t vacated_ndx = oper_slot.pollfd_slot_ndx;
    oper_slot.pollfd_slot_ndx = 0; // Zero means "no pollfd slot"

    const bool is_last_slot = !(vacated_ndx < m_pollfd_slots.size() - 1);
    if (!is_last_slot) {
        pollfd& tail_slot = m_pollfd_slots.back();
        m_operations[tail_slot.fd].pollfd_slot_ndx = vacated_ndx;
        m_pollfd_slots[vacated_ndx] = tail_slot;
    }
    m_pollfd_slots.pop_back();
}
822✔
1309

822✔
1310
#endif // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
8,538✔
1311

8,538✔
1312

8,538✔
1313
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
8,538✔
1314

1315
// Returns the sleep time accumulated in m_sleep_time and resets the
// accumulator to zero.
auto Service::IoReactor::get_and_reset_sleep_time() noexcept -> clock::duration
{
    const clock::duration accumulated = m_sleep_time;
    m_sleep_time = clock::duration::zero();
    return accumulated;
}
1321

4,430✔
1322
#endif // REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
4,430✔
1323

4,430✔
1324

1325
class Service::Impl {
1326
public:
8,372✔
1327
    Service& service;
8,372✔
1328
    IoReactor io_reactor;
8,372✔
1329

8,372✔
1330
    Impl(Service& s)
1331
        : service{s}
8,372✔
1332
        , io_reactor{} // Throws
8,372✔
1333
    {
17,144✔
1334
    }
17,144✔
1335

1336
    // If the resolver thread was started, asks it to stop and joins it. Then
    // discards all operations still queued as completed.
    ~Impl()
    {
        if (m_resolver_thread.joinable()) {
            {
                std::lock_guard guard{m_mutex};
                m_stop_resolver_thread = true;
                m_resolver_cond.notify_all();
            }
            // The mutex must be released before joining
            m_resolver_thread.join();
        }

        // Avoid calls to recycle_post_oper() after destruction has begun.
        m_completed_operations.clear();
    }
10,404✔
1351

1,632✔
1352
    // When REALM_UTIL_NETWORK_EVENT_LOOP_METRICS is defined: (re)arms a 30
    // second timer. When it fires, two ratios are computed over the time
    // elapsed since the previous report and passed to `handler`:
    //
    //  - saturation: the share of the elapsed time not spent sleeping in the
    //    reactor.
    //  - inefficiency: the share of the elapsed time that was neither spent
    //    sleeping nor accounted to m_handler_exec_time, plus the time since
    //    m_handler_exec_start_time.
    //
    // The bookkeeping is then reset and the timer is armed again with the
    // same handler.
    //
    // Without that macro, `handler` is ignored.
    void report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler> handler)
    {
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
        m_event_loop_metrics_timer.emplace(service);
        m_event_loop_metrics_timer->async_wait(
            std::chrono::seconds{30}, [this, handler = std::move(handler)](Status status) {
                REALM_ASSERT(status.is_ok());
                clock::time_point sample_time = clock::now();
                clock::duration window = sample_time - m_event_loop_metrics_start_time;
                clock::duration asleep = io_reactor.get_and_reset_sleep_time();
                clock::duration awake = window - asleep;
                double saturation = double(awake.count()) / double(window.count());

                clock::duration overhead = awake - m_handler_exec_time;
                overhead += sample_time - m_handler_exec_start_time;
                double inefficiency = double(overhead.count()) / double(window.count());

                // Start a new measurement window
                m_handler_exec_time = clock::duration::zero();
                m_handler_exec_start_time = sample_time;
                m_event_loop_metrics_start_time = sample_time;

                handler(saturation, inefficiency);             // Throws
                report_event_loop_metrics(std::move(handler)); // Throws
            });                                                // Throws
#else
        static_cast<void>(handler);
#endif
    }
243,648✔
1377

341,120✔
1378
    void run()
341,120✔
1379
    {
345,422✔
1380
        run_impl(true);
345,422✔
1381
    }
589,070✔
1382

584,768✔
1383
    void run_until_stopped()
584,768✔
1384
    {
589,332✔
1385
        run_impl(false);
589,332✔
1386
    }
589,332✔
1387

584,768✔
1388
    void stop() noexcept
584,768✔
1389
    {
8,610✔
1390
        {
8,610✔
1391
            std::lock_guard lock{m_mutex};
593,870✔
1392
            if (m_stopped)
593,870✔
1393
                return;
585,260✔
1394
            m_stopped = true;
593,870✔
1395
        }
593,870✔
1396
        io_reactor.interrupt();
593,870✔
1397
    }
593,870✔
1398

585,260✔
1399
    void reset() noexcept
585,260✔
1400
    {
251,110✔
1401
        std::lock_guard lock{m_mutex};
585,268✔
1402
        m_stopped = false;
585,268✔
1403
    }
8✔
1404

1405
    static Endpoint::List resolve(const Resolver::Query&, std::error_code&);
1406

1407
    // Queues a resolve operation and notifies the resolver condition
    // variable. The resolver thread is started on first use.
    //
    // May throw (when queuing the operation or starting the thread).
    void add_resolve_oper(LendersResolveOperPtr op)
    {
        {
            std::lock_guard guard{m_mutex};
            m_resolve_operations.push_back(std::move(op)); // Throws
            m_resolver_cond.notify_all();
        }

        if (!m_resolver_thread.joinable()) {
            // Not yet started
            m_resolver_thread = std::thread{[this]() noexcept {
                resolver_thread();
            }};
        }
    }
960✔
1422

1423
    // Adds a wait operation to the queue of pending wait operations. May
    // throw.
    void add_wait_oper(LendersWaitOperPtr op)
    {
        LendersWaitOperPtr pending = std::move(op);
        m_wait_operations.push(std::move(pending)); // Throws
    }
916,726✔
1427

645,592✔
1428
    // Constructs a post operation of `size` bytes by way of `constr`, queues
    // it on m_completed_operations_2, and interrupts the I/O reactor.
    //
    // The memory of the cached unused operation (m_post_oper) is reused when
    // it is large enough; otherwise new memory is allocated.
    //
    // May throw (memory allocation, or from `constr`).
    void post(PostOperConstr constr, std::size_t size, void* cookie)
    {
        {
            std::lock_guard guard{m_mutex};
            std::unique_ptr<char[]> storage;
            const bool can_reuse = (m_post_oper && m_post_oper->m_size >= size);
            if (!can_reuse) {
                // Allocate new memory
                storage.reset(new char[size]); // Throws
            }
            else {
                // Reuse old memory
                AsyncOper* unused = m_post_oper.release();
                REALM_ASSERT(dynamic_cast<UnusedOper*>(unused));
                static_cast<UnusedOper*>(unused)->UnusedOper::~UnusedOper(); // Static dispatch
                storage.reset(static_cast<char*>(static_cast<void*>(unused)));
            }

            // `storage` keeps owning the memory until construction of the
            // operation has succeeded.
            std::unique_ptr<PostOperBase, LendersOperDeleter> post_op;
            post_op.reset((*constr)(storage.get(), size, *this, cookie)); // Throws
            storage.release();
            m_completed_operations_2.push_back(std::move(post_op));
        }
        io_reactor.interrupt();
    }
596,640✔
1452

1453
    // Destroys the post operation `op` and constructs an UnusedOper in its
    // memory. That unused operation becomes the cached one (m_post_oper) if
    // there is no cached one yet, or if the cached one is smaller. The one
    // that is not kept is destroyed after the mutex has been released.
    void recycle_post_oper(PostOperBase* op) noexcept
    {
        const std::size_t size = op->m_size;
        op->~PostOperBase();                              // Dynamic dispatch
        OwnersOperPtr recycled(new (op) UnusedOper(size)); // Does not throw

        // Keep the larger memory chunk (`recycled` or m_post_oper)
        {
            std::lock_guard guard{m_mutex};
            const bool keep_cached = (m_post_oper && !(m_post_oper->m_size < size));
            if (!keep_cached)
                swap(recycled, m_post_oper);
        }
    }
664,362✔
1466

67,032✔
1467
    // Queues the trigger operation `op` on m_completed_operations_2 and
    // interrupts the I/O reactor, unless `op` is already flagged as in use,
    // in which case nothing happens.
    void trigger_exec(TriggerExecOperBase& op) noexcept
    {
        {
            std::lock_guard guard{m_mutex};
            if (op.m_in_use)
                return;
            op.m_in_use = true;

            bind_ptr<TriggerExecOperBase> counted_ref{&op}; // Increment use count
            LendersOperPtr lent{counted_ref.release()};
            m_completed_operations_2.push_back(std::move(lent));
        }
        io_reactor.interrupt();
    }
1480

1481
    // Clears the "in use" flag of the trigger operation (under the mutex).
    void reset_trigger_exec(TriggerExecOperBase& op) noexcept
    {
        std::lock_guard guard{m_mutex};
        op.m_in_use = false;
    }
8,624✔
1486

8,624✔
1487
    // Appends `op` to the queue of completed operations.
    void add_completed_oper(LendersOperPtr op) noexcept
    {
        LendersOperPtr finished = std::move(op);
        m_completed_operations.push_back(std::move(finished));
    }
1,585,676✔
1491

8,350✔
1492
    // Asks the I/O reactor to move the operations it removes for `desc` onto
    // the queue of completed operations.
    void remove_canceled_ops(Descriptor& desc) noexcept
    {
        OperQueue<AsyncOper>& sink = m_completed_operations;
        io_reactor.remove_canceled_ops(desc, sink);
    }
1,259,000✔
1496

1,014,748✔
1497
    // Cancels the resolve operation while holding the mutex.
    void cancel_resolve_oper(ResolveOperBase& op) noexcept
    {
        std::lock_guard guard{m_mutex};
        op.cancel();
    }
3,545,770✔
1502

2,635,966✔
1503
    // Removes the wait operation `op` from the queue of pending wait
    // operations and appends it to the queue of completed operations.
    //
    // The search is narrowed to the queued operations whose expiration time
    // equals that of `op`; among those, `op` is identified by address. It
    // must be present in the queue.
    void cancel_incomplete_wait_oper(WaitOperBase& op) noexcept
    {
        const auto same_expiration = std::equal_range(m_wait_operations.begin(), m_wait_operations.end(),
                                                      op.m_expiration_time, WaitOperCompare{});
        auto match = same_expiration.first;
        while (match != same_expiration.second && !(&**match == &op))
            ++match;
        REALM_ASSERT(match != same_expiration.second);
        m_completed_operations.push_back(m_wait_operations.erase(match));
    }
751,718✔
1514

258✔
1515
private:
258✔
1516
    OperQueue<AsyncOper> m_completed_operations; // Completed, canceled, and post operations
258✔
1517

258✔
1518
    struct WaitOperCompare {
258✔
1519
        bool operator()(const LendersWaitOperPtr& a, clock::time_point b)
258✔
1520
        {
14,486✔
1521
            return a->m_expiration_time > b;
14,486✔
1522
        }
14,486✔
1523
        bool operator()(clock::time_point a, const LendersWaitOperPtr& b)
258✔
1524
        {
12,850✔
1525
            return a > b->m_expiration_time;
12,850✔
1526
        }
12,850✔
1527
        bool operator()(const LendersWaitOperPtr& a, const LendersWaitOperPtr& b)
258✔
1528
        {
60,118✔
1529
            return a->m_expiration_time > b->m_expiration_time;
60,118✔
1530
        }
60,118✔
1531
    };
258✔
1532

258✔
1533
    using WaitQueue = util::PriorityQueue<LendersWaitOperPtr, std::vector<LendersWaitOperPtr>, WaitOperCompare>;
740,980✔
1534
    WaitQueue m_wait_operations;
740,980✔
1535

740,980✔
1536
    std::mutex m_mutex;
740,980✔
1537
    OwnersOperPtr m_post_oper;                       // Protected by `m_mutex`
489,238✔
1538
    OperQueue<ResolveOperBase> m_resolve_operations; // Protected by `m_mutex`
251,742✔
1539
    OperQueue<AsyncOper> m_completed_operations_2;   // Protected by `m_mutex`
110,374✔
1540
    bool m_stopped = false;                          // Protected by `m_mutex`
141,368✔
1541
    bool m_stop_resolver_thread = false;             // Protected by `m_mutex`
141,368✔
1542
    bool m_resolve_in_progress = false;              // Protected by `m_mutex`
141,368✔
1543
    std::condition_variable m_resolver_cond;         // Protected by `m_mutex`
1544

1,031,134✔
1545
    std::thread m_resolver_thread;
1,031,134✔
1546

1,328,320✔
1547
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1,328,320✔
1548
    util::Optional<DeadlineTimer> m_event_loop_metrics_timer;
580,128✔
1549
    clock::time_point m_event_loop_metrics_start_time = clock::now();
748,192✔
1550
    clock::time_point m_handler_exec_start_time;
748,192✔
1551
    clock::duration m_handler_exec_time = clock::duration::zero();
449,500✔
1552
#endif
298,692✔
1553
    void run_impl(bool return_when_idle)
298,692✔
1554
    {
307,558✔
1555
        bool no_incomplete_resolve_operations;
307,558✔
1556

1,031,134✔
1557
    on_handlers_executed_or_interrupted : {
2,353,974✔
1558
        std::lock_guard lock{m_mutex};
1,322,840✔
1559
        if (m_stopped)
1,322,840✔
1560
            return;
749,528✔
1561
        // Note: Order of post operations must be preserved.
740,940✔
1562
        m_completed_operations.push_back(m_completed_operations_2);
2,055,192✔
1563
        no_incomplete_resolve_operations = (!m_resolve_in_progress && m_resolve_operations.empty());
1,714,048✔
1564

740,940✔
1565
        if (m_completed_operations.empty())
2,055,192✔
1566
            goto on_time_progressed;
1,881,230✔
1567
    }
1,903,008✔
1568

1569
    on_operations_completed : {
1,162,068✔
1570
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
2,635,566✔
1571
        m_handler_exec_start_time = clock::now();
2,635,566✔
1572
#endif
2,635,566✔
1573
        while (LendersOperPtr op = m_completed_operations.pop_front())
3,708,452✔
1574
            execute(op); // Throws
2,546,384✔
1575
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
822✔
1576
        m_handler_exec_time += clock::now() - m_handler_exec_start_time;
822✔
1577
#endif
2,454✔
1578
        goto on_handlers_executed_or_interrupted;
1,164,522✔
1579
    }
1,150,476✔
1580

2,454✔
1581
    on_time_progressed : {
1,149,654✔
1582
        clock::time_point now = clock::now();
1,149,654✔
1583
        if (process_timers(now))
1,149,654✔
1584
            goto on_operations_completed;
253,268✔
1585

4,084✔
1586
        bool no_incomplete_operations =
898,838✔
1587
            (io_reactor.empty() && m_wait_operations.empty() && no_incomplete_resolve_operations);
899,662✔
1588
        if (no_incomplete_operations && return_when_idle) {
898,030✔
1589
            // We can only get to this point when there are no completion
1,632✔
1590
            // handlers ready to execute. It happens either because of a
1,632✔
1591
            // fall-through from on_operations_completed, or because of a
1,632✔
1592
            // jump to on_time_progressed, but that only happens if no
2✔
1593
            // completions handlers became ready during
1,630✔
1594
            // wait_and_process_io().
1,630✔
1595
            //
1,630✔
1596
            // We can also only get to this point when there are no
1,630✔
1597
            // asynchronous operations in progress (due to the preceeding
1,630✔
1598
            // if-condition.
1599
            //
1600
            // It is possible that an other thread has added new post
1,630✔
1601
            // operations since we checked, but there is really no point in
1,630✔
1602
            // rechecking that, as it is always possible, even after a
822✔
1603
            // recheck, that new post handlers get added after we decide to
1604
            // return, but before we actually do return. Also, if would
1605
            // offer no additional guarantees to the application.
1606
            return; // Out of work
262✔
1607
        }
262✔
1608

5,572✔
1609
        // Blocking wait for I/O
5,572✔
1610
        bool interrupted = false;
902,518✔
1611
        if (wait_and_process_io(now, interrupted)) // Throws
902,518✔
1612
            goto on_operations_completed;
744,464✔
1613
        if (interrupted)
163,626✔
1614
            goto on_handlers_executed_or_interrupted;
157,818✔
1615
        goto on_time_progressed;
11,380✔
1616
    }
11,380✔
1617
    }
11,380✔
1618
    // Completes every wait operation whose expiration time is not later than
    // `now`, moving each one to `m_completed_operations`. Returns true if at
    // least one operation was completed.
    bool process_timers(clock::time_point now)
    {
        bool completed_any = false;
        while (!m_wait_operations.empty()) {
            auto& next = m_wait_operations.top();
            // Stop at the first operation that has not expired yet.
            if (now < next->m_expiration_time)
                break;
            next->complete();
            m_completed_operations.push_back(m_wait_operations.pop_top());
            completed_any = true;
        }
        return completed_any;
    }
1,147,812✔
1633

4✔
1634
    // Hands control to the I/O reactor until something happens. The deadline
    // passed on is the expiration time of the top wait operation, or a
    // default-constructed time point when no wait operation is pending
    // (NOTE(review): presumably the reactor treats that value as "no
    // timeout" -- confirm against wait_and_advance()). Returns the reactor's
    // result; `interrupted` is an out-parameter filled in by the reactor.
    bool wait_and_process_io(clock::time_point now, bool& interrupted)
    {
        clock::time_point deadline{};
        if (!m_wait_operations.empty())
            deadline = m_wait_operations.top()->m_expiration_time;
        return io_reactor.wait_and_advance(deadline, now, interrupted, m_completed_operations); // Throws
    }
903,560✔
1643

5,568✔
1644
    // Releases the operation from `lenders_ptr` and then invokes
    // recycle_and_execute() on it. May throw.
    static void execute(LendersOperPtr& lenders_ptr)
    {
        auto* oper = lenders_ptr.release();
        oper->recycle_and_execute(); // Throws
    }
2,551,018✔
1648

5,568✔
1649
    void resolver_thread() noexcept
5,568✔
1650
    {
6,530✔
1651
        LendersResolveOperPtr op;
6,530✔
1652
        for (;;) {
8,254✔
1653
            {
8,254✔
1654
                std::unique_lock lock{m_mutex};
8,254✔
1655
                if (op) {
8,254✔
1656
                    m_completed_operations_2.push_back(std::move(op));
7,292✔
1657
                    io_reactor.interrupt();
7,292✔
1658
                }
12,860✔
1659
                m_resolve_in_progress = false;
8,254✔
1660
                while (m_resolve_operations.empty() && !m_stop_resolver_thread)
9,976✔
1661
                    m_resolver_cond.wait(lock);
7,290✔
1662
                if (m_stop_resolver_thread)
8,254✔
1663
                    return;
6,530✔
1664
                op = m_resolve_operations.pop_front();
7,292✔
1665
                m_resolve_in_progress = true;
7,292✔
1666
                if (op->is_canceled())
7,292✔
1667
                    continue;
5,570✔
1668
            }
7,290✔
1669
            try {
7,290✔
1670
                op->m_endpoints = resolve(op->m_query, op->m_error_code); // Throws only std::bad_alloc
7,290✔
1671
            }
1,722✔
1672
            catch (std::bad_alloc&) {
×
1673
                op->m_error_code = make_basic_system_error_code(ENOMEM);
×
1674
            }
5,568✔
1675
            op->complete();
7,290✔
1676
        }
7,290✔
1677
    }
6,530✔
1678
};
5,568✔
1679

5,568✔
1680

5,568✔
1681
// This function promises to only ever throw std::bad_alloc.
5,568✔
1682
Endpoint::List Service::Impl::resolve(const Resolver::Query& query, std::error_code& ec)
1683
{
5,764✔
1684
    Endpoint::List list;
5,764✔
1685

1686
    using addrinfo_type = struct addrinfo;
14,302✔
1687
    addrinfo_type hints = addrinfo_type(); // Clear
14,302✔
1688
    hints.ai_flags = query.m_flags;
5,764✔
1689
    hints.ai_family = query.m_protocol.m_family;
5,764✔
1690
    hints.ai_socktype = query.m_protocol.m_socktype;
14,302✔
1691
    hints.ai_protocol = query.m_protocol.m_protocol;
5,764✔
1692

1693
    const char* query_host = query.m_host.empty() ? 0 : query.m_host.c_str();
5,764✔
1694
    const char* query_service = query.m_service.empty() ? 0 : query.m_service.c_str();
8,236✔
1695
    struct addrinfo* first = nullptr;
9,958✔
1696
    int ret = ::getaddrinfo(query_host, query_service, &hints, &first);
9,958✔
1697
    if (REALM_UNLIKELY(ret != 0)) {
5,764✔
1698
#ifdef EAI_SYSTEM
2✔
1699
        if (ret == EAI_SYSTEM) {
2✔
1700
            if (errno != 0) {
4,430!
1701
                ec = make_basic_system_error_code(errno);
4,430✔
1702
            }
4,430✔
1703
            else {
1704
                ec = error::unknown;
1705
            }
1706
            return list;
8,372✔
1707
        }
8,372✔
1708
#endif
8,374✔
1709
        ec = translate_addrinfo_error(ret);
2✔
1710
        return list;
2✔
1711
    }
2✔
1712

8✔
1713
    GetaddrinfoResultOwner gro(first);
5,770✔
1714

8✔
1715
    // Count number of IPv4/IPv6 endpoints
1716
    std::size_t num_endpoints = 0;
5,762✔
1717
    {
5,762✔
1718
        struct addrinfo* curr = first;
590,880✔
1719
        while (curr) {
598,474✔
1720
            bool ip_v4 = curr->ai_family == AF_INET;
592,712✔
1721
            bool ip_v6 = curr->ai_family == AF_INET6;
7,594✔
1722
            if (ip_v4 || ip_v6)
7,594✔
1723
                ++num_endpoints;
7,594✔
1724
            curr = curr->ai_next;
592,868✔
1725
        }
592,868✔
1726
    }
591,036✔
1727
    REALM_ASSERT(num_endpoints >= 1);
5,762✔
1728

1729
    // Copy the IPv4/IPv6 endpoints
1730
    list.m_endpoints.set_size(num_endpoints); // Throws
5,762✔
1731
    struct addrinfo* curr = first;
5,762✔
1732
    std::size_t endpoint_ndx = 0;
5,762✔
1733
    while (curr) {
13,356✔
1734
        bool ip_v4 = curr->ai_family == AF_INET;
7,594✔
1735
        bool ip_v6 = curr->ai_family == AF_INET6;
7,594✔
1736
        if (ip_v4 || ip_v6) {
7,594✔
1737
            REALM_ASSERT((ip_v4 && curr->ai_addrlen == sizeof(Endpoint::sockaddr_ip_v4_type)) ||
7,594✔
1738
                         (ip_v6 && curr->ai_addrlen == sizeof(Endpoint::sockaddr_ip_v6_type)));
7,594✔
1739
            Endpoint& ep = list.m_endpoints[endpoint_ndx];
7,594✔
1740
            ep.m_protocol.m_family = curr->ai_family;
7,594✔
1741
            ep.m_protocol.m_socktype = curr->ai_socktype;
7,594✔
1742
            ep.m_protocol.m_protocol = curr->ai_protocol;
7,594✔
1743
            if (ip_v4) {
8,656✔
1744
                ep.m_sockaddr_union.m_ip_v4 = reinterpret_cast<Endpoint::sockaddr_ip_v4_type&>(*curr->ai_addr);
6,824✔
1745
            }
6,824✔
1746
            else {
2,894✔
1747
                ep.m_sockaddr_union.m_ip_v6 = reinterpret_cast<Endpoint::sockaddr_ip_v6_type&>(*curr->ai_addr);
2,894✔
1748
            }
2,894✔
1749
            ++endpoint_ndx;
8,656✔
1750
        }
8,656✔
1751
        curr = curr->ai_next;
8,656✔
1752
    }
8,656✔
1753

1,062✔
1754
    ec = std::error_code(); // Success
6,824✔
1755
    return list;
6,824✔
1756
}
6,824✔
1757

1,062✔
1758

1,062✔
1759
// Creates the implementation object, handing it a reference to this service.
Service::Service()
    : m_impl(std::make_unique<Impl>(*this)) // Throws
{
}
9,834✔
1763

1,062✔
1764

1,040✔
1765
// Defined out of line, in the translation unit that defines `Impl`.
Service::~Service() noexcept = default;
9,834✔
1766

1767

1768
void Service::run()
1769
{
4,302✔
1770
    m_impl->run(); // Throws
4,302✔
1771
}
4,302✔
1772

1773

1774
void Service::run_until_stopped()
1775
{
4,564✔
1776
    m_impl->run_until_stopped();
4,564✔
1777
}
4,564✔
1778

1779

1,062✔
1780
void Service::stop() noexcept
1781
{
8,610✔
1782
    m_impl->stop();
8,610✔
1783
}
8,610✔
1784

1785

1786
void Service::reset() noexcept
1787
{
8✔
1788
    m_impl->reset();
8✔
1789
}
1,070✔
1790

1,062✔
1791

1,062✔
1792
// Passes ownership of `handler` on to Impl::report_event_loop_metrics().
void Service::report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler> handler)
{
    Impl& impl = *m_impl;
    impl.report_event_loop_metrics(std::move(handler)); // Throws
}
1796

1797

1798
// Forwards the post-operation constructor, its size and its cookie to
// Impl::post().
void Service::do_post(PostOperConstr constr, std::size_t size, void* cookie)
{
    Impl& impl = *m_impl;
    impl.post(constr, size, cookie); // Throws
}
596,948✔
1802

1803

1804
// Static trampoline onto Impl::recycle_post_oper().
void Service::recycle_post_oper(Impl& impl, PostOperBase* op) noexcept
{
    PostOperBase* const post_oper = op;
    impl.recycle_post_oper(post_oper);
}
598,422✔
1808

1,062✔
1809

1,062✔
1810
// Static trampoline onto Impl::trigger_exec().
void Service::trigger_exec(Impl& impl, TriggerExecOperBase& op) noexcept
{
    TriggerExecOperBase& trigger_oper = op;
    impl.trigger_exec(trigger_oper);
}
1814

1,062✔
1815

1816
// Static trampoline onto Impl::reset_trigger_exec().
void Service::reset_trigger_exec(Impl& impl, TriggerExecOperBase& op) noexcept
{
    TriggerExecOperBase& trigger_oper = op;
    impl.reset_trigger_exec(trigger_oper);
}
1820

1821

1822
void Service::Descriptor::accept(Descriptor& desc, StreamProtocol protocol, Endpoint* ep,
1,062✔
1823
                                 std::error_code& ec) noexcept
1,062✔
1824
{
3,168✔
1825
    REALM_ASSERT(is_open());
3,168✔
1826

1,062✔
1827
    union union_type {
3,168✔
1828
        Endpoint::sockaddr_union_type m_sockaddr_union;
3,168✔
1829
        char m_extra_byte[sizeof(Endpoint::sockaddr_union_type) + 1];
3,168✔
1830
    };
3,168✔
1831
    union_type buffer;
3,168✔
1832
    struct sockaddr* addr = &buffer.m_sockaddr_union.m_base;
3,168✔
1833
    socklen_t addr_len = sizeof buffer;
3,168✔
1834
    CloseGuard new_sock_fd;
3,168✔
1835
    for (;;) {
3,168✔
1836
#if HAVE_LINUX_ACCEPT4
1837
        // On Linux (HAVE_LINUX_ACCEPT4), make the accepted socket inherit the
1838
        // O_NONBLOCK status flag from the accepting socket to avoid an extra
1839
        // call to fcntl(). Note, it is deemed most likely that the accepted
1840
        // socket is going to be used in nonblocking when, and only when the
1841
        // accepting socket is used in nonblocking mode. Other platforms are
1842
        // handled below.
1843
        int flags = SOCK_CLOEXEC;
1844
        if (!in_blocking_mode())
1845
            flags |= SOCK_NONBLOCK;
1,062✔
1846
        native_handle_type ret = ::accept4(m_fd, addr, &addr_len, flags);
1,062✔
1847
#else
1,062✔
1848
        native_handle_type ret = ::accept(m_fd, addr, &addr_len);
3,168✔
1849
#endif
3,048✔
1850
#ifdef _WIN32
942✔
1851
        if (ret == INVALID_SOCKET) {
942✔
1852
            int err = WSAGetLastError();
1,062✔
1853
            if (err == WSAEINTR)
1,062✔
1854
                continue; // Retry on interruption by system signal
1855
            set_read_ready(err != WSAEWOULDBLOCK);
1856
            ec = make_winsock_error_code(err); // Failure
1857
            return;
1,012,100✔
1858
        }
1,012,100✔
1859
#else
1860
        if (REALM_UNLIKELY(ret == -1)) {
2,106✔
1861
            int err = errno;
984✔
1862
            if (err == EINTR)
1,013,084✔
1863
                continue; // Retry on interruption by system signal
1,010,748✔
1864
            if (err == EWOULDBLOCK)
984✔
1865
                err = EAGAIN;
984✔
1866
            set_read_ready(err != EAGAIN);
984✔
1867
            ec = make_basic_system_error_code(err); // Failure
984✔
1868
            return;
984✔
1869
        }
984✔
1870
#endif
1,122✔
1871
        new_sock_fd.reset(ret);
1,122✔
1872

1873
#if REALM_PLATFORM_APPLE
1,122✔
1874
        int optval = 1;
1,122✔
1875
        ret = ::setsockopt(new_sock_fd, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof optval);
1,122✔
1876
        if (REALM_UNLIKELY(ret == -1)) {
1,011,870✔
1877
            // setsockopt() reports EINVAL if the other side disconnected while
1,010,748✔
1878
            // the connection was waiting in the listen queue.
254✔
1879
            int err = errno;
254✔
1880
            if (err == EINVAL) {
254!
1881
                continue;
254✔
1882
            }
254✔
1883
            ec = make_basic_system_error_code(err);
254✔
1884
            return;
254✔
1885
        }
254✔
1886
#endif
1,376✔
1887

254✔
1888
        set_read_ready(true);
1,011,616✔
1889
        break;
1,011,616✔
1890
    }
1,680✔
1891
    socklen_t expected_addr_len =
1,680✔
1892
        protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
1,342✔
1893
    if (REALM_UNLIKELY(addr_len != expected_addr_len))
1,680✔
1894
        REALM_TERMINATE("Unexpected peer address length");
1,009,936✔
1895

1,009,936✔
1896
#if !HAVE_LINUX_ACCEPT4
1,011,058✔
1897
    {
1,122✔
1898
        bool value = true;
1,122✔
1899
        if (REALM_UNLIKELY(set_cloexec_flag(new_sock_fd, value, ec)))
1,122✔
1900
            return;
1901
    }
1,122✔
1902
#endif
1,122✔
1903

1904
    // On some platforms (such as Mac OS X), the accepted socket automatically
1905
    // inherits file status flags from the accepting socket, but on other
1906
    // systems, this is not the case. In the case of Linux (HAVE_LINUX_ACCEPT4),
1907
    // the inheriting behaviour is obtained by using the Linux specific
1908
    // accept4() system call.
1909
    //
1910
    // For other platforms, we need to be sure that m_in_blocking_mode for the
1911
    // new socket is initialized to reflect the actual state of O_NONBLOCK on
1912
    // the new socket.
1913
    //
1914
    // Note: This implementation currently never modifies status flags other
1915
    // than O_NONBLOCK, so we only need to consider that flag.
1916

1917
#if !REALM_PLATFORM_APPLE && !HAVE_LINUX_ACCEPT4
1918
    // Make the accepted socket inherit the state of O_NONBLOCK from the
1919
    // accepting socket.
1920
    {
1921
        bool value = !m_in_blocking_mode;
1922
        if (::set_nonblock_flag(new_sock_fd, value, ec))
1923
            return;
1924
    }
1925
#endif
1926

1927
    desc.assign(new_sock_fd.release(), m_in_blocking_mode);
1,122✔
1928
    desc.set_write_ready(true);
1,122✔
1929
    if (ep) {
1,011,058✔
1930
        ep->m_protocol = protocol;
1,010,938✔
1931
        ep->m_sockaddr_union = buffer.m_sockaddr_union;
1,010,938✔
1932
    }
1,010,938✔
1933
    ec = std::error_code(); // Success
1,011,058✔
1934
}
1,013,222✔
1935

1936

1937
std::size_t Service::Descriptor::read_some(char* buffer, std::size_t size, std::error_code& ec) noexcept
1938
{
1,848,320✔
1939
    if (REALM_UNLIKELY(assume_read_would_block())) {
1,848,320✔
1940
        ec = error::resource_unavailable_try_again; // Failure
494✔
1941
        return 0;
494✔
1942
    }
494✔
1943
    for (;;) {
1,847,980✔
1944
        int flags = 0;
1,847,274✔
1945
#ifdef _WIN32
713,964✔
1946
        ssize_t ret = ::recv(m_fd, buffer, int(size), flags);
713,964✔
1947
        if (ret == SOCKET_ERROR) {
713,964✔
1948
            int err = WSAGetLastError();
713,964✔
1949
            // Retry on interruption by system signal
1950
            if (err == WSAEINTR)
1951
                continue;
1952
            set_read_ready(err != WSAEWOULDBLOCK);
1953
            ec = make_winsock_error_code(err); // Failure
1954
            return 0;
1955
        }
1956
#else
1957
        ssize_t ret = ::recv(m_fd, buffer, size, flags);
1,133,310✔
1958
        if (ret == -1) {
1,133,310✔
1959
            int err = errno;
61,258✔
1960
            // Retry on interruption by system signal
1961
            if (err == EINTR)
775,222✔
1962
                continue;
713,964✔
1963
            if (err == EWOULDBLOCK)
61,688✔
1964
                err = EAGAIN;
61,514✔
1965
            set_read_ready(err != EAGAIN);
61,688✔
1966
            ec = make_basic_system_error_code(err); // Failure
61,688✔
1967
            return 0;
61,258✔
1968
        }
61,258✔
1969
#endif
1,072,052✔
1970
        if (REALM_UNLIKELY(ret == 0)) {
1,072,052✔
1971
            set_read_ready(true);
426✔
1972
            ec = MiscExtErrors::end_of_input;
426✔
1973
            return 0;
426✔
1974
        }
426✔
1975
        REALM_ASSERT(ret > 0);
1,072,056✔
1976
        std::size_t n = std::size_t(ret);
1,072,056✔
1977
        REALM_ASSERT(n <= size);
1,072,056✔
1978
#if REALM_NETWORK_USE_EPOLL
430✔
1979
        // On Linux a partial read (n < size) on a nonblocking stream-mode
430✔
1980
        // socket is guaranteed to only ever happen if a complete read would
430✔
1981
        // have been impossible without blocking (i.e., without failing with
713,534✔
1982
        // EAGAIN/EWOULDBLOCK), or if the end of input from the remote peer was
713,534✔
1983
        // detected by the Linux kernel.
713,534✔
1984
        //
713,534✔
1985
        // Further more, after a partial read, and when working with Linux epoll
1986
        // in edge-triggered mode (EPOLLET), it is safe to suspend further
1987
        // reading until a new read-readiness notification is received, provided
1988
        // that we registered interest in EPOLLRDHUP events, and an EPOLLRDHUP
1989
        // event was not received prior to the partial read. This is safe in the
1990
        // sense that reading is guaranteed to be resumed in a timely fashion
1991
        // (without unnessesary blocking), and in a manner that is free of race
1992
        // conditions. Note in particular that if a read was partial because the
1993
        // kernel had detected the end of input prior to that read, but the
1994
        // EPOLLRDHUP event was not received prior the that read, then reading
1995
        // will still be resumed immediately by the pending EPOLLRDHUP event.
1996
        //
1997
        // Note that without this extra "loss of read-readiness" trigger, it
1998
        // would have been necessary for the caller to immediately follow up
1999
        // with an (otherwise redundant) additional invocation of read_some()
2000
        // just to detect the loss of read-readiness.
2001
        //
2002
        // FIXME: Will this scheme also work with Kqueue on FreeBSD and macOS?
2003
        // In particular, do we know that a partial read (n < size) on a
2004
        // nonblocking stream-mode socket is guaranteed to only ever happen if a
2005
        // complete read would have been impossible without blocking, or if the
2006
        // end of input from the remote peer was detected by the FreeBSD and/or
2007
        // macOS kernel? See http://stackoverflow.com/q/40123626/1698548.
2008
        set_read_ready(n == size || m_imminent_end_of_input);
2009
#else
2010
        set_read_ready(true);
1,785,160✔
2011
#endif
1,785,160✔
2012
        ec = std::error_code(); // Success
1,785,160✔
2013
        return n;
1,785,160✔
2014
    }
1,785,160✔
2015
}
1,847,826✔
2016

2017

2018
std::size_t Service::Descriptor::write_some(const char* data, std::size_t size, std::error_code& ec) noexcept
2019
{
793,584✔
2020
    if (REALM_UNLIKELY(assume_write_would_block())) {
793,584✔
2021
        ec = error::resource_unavailable_try_again; // Failure
106,876✔
2022
        return 0;
106,876✔
2023
    }
106,876✔
2024
    for (;;) {
687,026✔
2025
        int flags = 0;
687,026✔
2026
#ifdef __linux__
2027
        // Prevent SIGPIPE when remote peer has closed the connection.
2028
        flags |= MSG_NOSIGNAL;
2029
#endif
5,866✔
2030
#ifdef _WIN32
5,866✔
2031
        ssize_t ret = ::send(m_fd, data, int(size), flags);
5,866✔
2032
        if (ret == SOCKET_ERROR) {
2033
            int err = WSAGetLastError();
2034
            // Retry on interruption by system signal
2035
            if (err == WSAEINTR)
1,787,996✔
2036
                continue;
1,787,996✔
2037
            set_write_ready(err != WSAEWOULDBLOCK);
645,696✔
2038
            ec = make_winsock_error_code(err); // Failure
645,696✔
2039
            return 0;
645,696✔
2040
        }
645,696✔
2041
#else
1,142,300✔
2042
        ssize_t ret = ::send(m_fd, data, size, flags);
1,829,326✔
2043
        if (ret == -1) {
1,829,326✔
2044
            int err = errno;
5,170✔
2045
            // Retry on interruption by system signal
2046
            if (err == EINTR)
5,170✔
2047
                continue;
6,886✔
2048
#if REALM_PLATFORM_APPLE
12,056✔
2049
            // The macOS kernel can generate an undocumented EPROTOTYPE in
6,886✔
2050
            // certain cases where the peer has closed the connection (in
6,886✔
2051
            // tcp_usr_send() in bsd/netinet/tcp_usrreq.c) See also
2052
            // http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/.
2053
            if (REALM_UNLIKELY(err == EPROTOTYPE))
5,170✔
2054
                err = EPIPE;
4✔
2055
#endif
5,170✔
2056
            if (err == EWOULDBLOCK)
5,170✔
2057
                err = EAGAIN;
5,054✔
2058
            set_write_ready(err != EAGAIN);
5,170✔
2059
            ec = make_basic_system_error_code(err); // Failure
5,170✔
2060
            return 0;
5,170✔
2061
        }
5,170✔
2062
#endif
681,856✔
2063
        REALM_ASSERT(ret >= 0);
681,856✔
2064
        std::size_t n = std::size_t(ret);
681,856✔
2065
        REALM_ASSERT(n <= size);
681,856✔
2066
#if REALM_NETWORK_USE_EPOLL
2067
        // On Linux a partial write (n < size) on a nonblocking stream-mode
2068
        // socket is guaranteed to only ever happen if a complete write would
3,942✔
2069
        // have been impossible without blocking (i.e., without failing with
3,942✔
2070
        // EAGAIN/EWOULDBLOCK).
3,942✔
2071
        //
2072
        // Further more, after a partial write, and when working with Linux
2073
        // epoll in edge-triggered mode (EPOLLET), it is safe to suspend further
2074
        // writing until a new write-readiness notification is received. This is
5,576✔
2075
        // safe in the sense that writing is guaranteed to be resumed in a
5,576✔
2076
        // timely fashion (without unnessesary blocking), and in a manner that
6✔
2077
        // is free of race conditions.
6✔
2078
        //
6✔
2079
        // Note that without this extra "loss of write-readiness" trigger, it
5,576✔
2080
        // would have been necessary for the caller to immediately follow up
2081
        // with an (otherwise redundant) additional invocation of write_some()
2082
        // just to detect the loss of write-readiness.
2083
        //
1,632✔
2084
        // FIXME: Will this scheme also work with Kqueue on FreeBSD and macOS?
1,632✔
2085
        // In particular, do we know that a partial write (n < size) on a
1,632✔
2086
        // nonblocking stream-mode socket is guaranteed to only ever happen if a
2087
        // complete write would have been impossible without blocking? See
2088
        // http://stackoverflow.com/q/40123626/1698548.
2089
        set_write_ready(n == size);
234✔
2090
#else
234✔
2091
        set_write_ready(true);
682,090✔
2092
#endif
681,856✔
2093
        ec = std::error_code(); // Success
681,856✔
2094
        return n;
681,856✔
2095
    }
1,223,422✔
2096
}
1,228,274✔
2097

541,566✔
2098

259,184✔
2099
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
259,184✔
2100

259,338✔
2101
void Service::Descriptor::deregister_for_async() noexcept
259,184✔
2102
{
548,624✔
2103
    service_impl.io_reactor.deregister_desc(*this);
257,090✔
2104
}
257,090✔
2105

249,962✔
2106
#endif // REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
250,032✔
2107

541,566✔
2108

280,680✔
2109
void Service::Descriptor::set_nonblock_flag(bool value)
541,566✔
2110
{
6,060✔
2111
    ::set_nonblock_flag(m_fd, value); // Throws
6,060✔
2112
}
6,060✔
2113

4,060✔
2114

4,060✔
2115
// Routes a freshly initiated I/O operation. If it wants nothing it must
// already be complete and goes straight to the service's completed
// operations; otherwise it must be incomplete and is handed to the I/O
// reactor together with what it is waiting for.
void Service::Descriptor::add_initiated_oper(LendersIoOperPtr op, Want want)
{
    if (REALM_UNLIKELY(want == Want::nothing)) {
        REALM_ASSERT(op->is_complete());
        service_impl.add_completed_oper(std::move(op));
    }
    else {
        REALM_ASSERT(!op->is_complete());
        service_impl.io_reactor.add_oper(*this, std::move(op), want); // Throws
    }
}
1,132,954✔
2125

4,060✔
2126

4,060✔
2127
void Service::Descriptor::do_close() noexcept
4,060✔
2128
{
11,208✔
2129
    checked_close(m_fd);
7,148✔
2130
    m_fd = -1;
7,148✔
2131
}
7,148✔
2132

9,626✔
2133

9,626✔
2134
// Gives up the underlying handle without closing it: returns the handle and
// leaves this descriptor holding -1.
auto Service::Descriptor::do_release() noexcept -> native_handle_type
{
    const native_handle_type released = m_fd;
    m_fd = -1;
    return released;
}
9,626✔
2140

9,626✔
2141

9,626✔
2142
// Returns the service that owns the implementation this resolver refers to.
Service& Resolver::get_service() noexcept
{
    Service& owner = m_service_impl.service;
    return owner;
}
9,626✔
2146

9,626✔
2147

9,626✔
2148
// Synchronous resolution; forwards to the static Service::Impl::resolve().
Endpoint::List Resolver::resolve(const Query& query, std::error_code& ec)
{
    Endpoint::List endpoints = Service::Impl::resolve(query, ec); // Throws
    return endpoints;
}
13,668✔
2152

9,626✔
2153

2154
void Resolver::cancel() noexcept
2155
{
5,766✔
2156
    if (m_resolve_oper && m_resolve_oper->in_use() && !m_resolve_oper->is_canceled()) {
15,392✔
2157
        Service::ResolveOperBase& op = static_cast<Service::ResolveOperBase&>(*m_resolve_oper);
9,632✔
2158
        m_service_impl.cancel_resolve_oper(op);
6✔
2159
    }
6✔
2160
}
5,766✔
2161

5,820✔
2162

5,820✔
2163
// Hands the resolve operation over to the service implementation.
void Resolver::initiate_oper(Service::LendersResolveOperPtr op)
{
    auto& impl = m_service_impl;
    impl.add_resolve_oper(std::move(op)); // Throws
}
7,542✔
2167

5,820✔
2168

5,820✔
2169
// Returns the service reached through this socket's descriptor.
Service& SocketBase::get_service() noexcept
{
    Service& owner = m_desc.service_impl.service;
    return owner;
}
246✔
2173

2174

2175
void SocketBase::cancel() noexcept
5,820✔
2176
{
506,780✔
2177
    bool any_incomplete = false;
506,780✔
2178
    if (m_read_oper && m_read_oper->in_use() && !m_read_oper->is_canceled()) {
506,780✔
2179
        m_read_oper->cancel();
230,026✔
2180
        if (!m_read_oper->is_complete())
230,026✔
2181
            any_incomplete = true;
230,030✔
2182
    }
230,026✔
2183
    if (m_write_oper && m_write_oper->in_use() && !m_write_oper->is_canceled()) {
506,780✔
2184
        m_write_oper->cancel();
215,436✔
2185
        if (!m_write_oper->is_complete())
215,436✔
2186
            any_incomplete = true;
215,392✔
2187
    }
215,436✔
2188
    if (any_incomplete)
506,780✔
2189
        m_desc.service_impl.remove_canceled_ops(m_desc);
244,272✔
2190
}
512,600✔
2191

2192

2193
std::error_code SocketBase::bind(const Endpoint& ep, std::error_code& ec)
2194
{
4,162✔
2195
    if (!is_open()) {
4,162✔
2196
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
×
2197
            return ec;
2198
    }
4,162✔
2199

2200
    native_handle_type sock_fd = m_desc.native_handle();
4,162✔
2201
    socklen_t addr_len =
9,982✔
2202
        ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
9,636✔
2203

5,820✔
2204
    int ret = ::bind(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
9,982✔
2205
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
9,982✔
2206
        return ec;
5,820✔
2207
    ec = std::error_code(); // Success
9,982✔
2208
    return ec;
4,162✔
2209
}
4,162✔
2210

2211

2✔
2212
// Returns the local endpoint of this socket as reported by getsockname().
//
// On failure `ec` is set and a default-constructed endpoint is returned; on
// success `ec` is cleared. Throws util::runtime_error if the reported address
// length does not match the socket's protocol.
Endpoint SocketBase::local_endpoint(std::error_code& ec) const
{
    Endpoint ep;
    // One byte larger than the address union, so the reported length of an
    // over-long address differs from both expected lengths and is caught by
    // the length check below.
    union union_type {
        Endpoint::sockaddr_union_type m_sockaddr_union;
        char m_extra_byte[sizeof(Endpoint::sockaddr_union_type) + 1];
    };
    native_handle_type sock_fd = m_desc.native_handle();
    union_type buffer;
    struct sockaddr* addr = &buffer.m_sockaddr_union.m_base;
    socklen_t addr_len = sizeof buffer;
    int ret = ::getsockname(sock_fd, addr, &addr_len);
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
        return ep;

    socklen_t expected_addr_len =
        m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
    if (addr_len != expected_addr_len)
        throw util::runtime_error("Unexpected local address length");
    ep.m_protocol = m_protocol;
    ep.m_sockaddr_union = buffer.m_sockaddr_union;
    ec = std::error_code(); // Success
#ifdef _WIN32
    // On Windows the reported IPv4 address is replaced with the loopback
    // address. NOTE(review): the motivation is not visible here -- presumably
    // a wildcard-bound address is not connectable on Windows; confirm. The
    // override only applies to IPv4 sockets: writing `m_ip_v4.sin_addr` on an
    // IPv6 socket would overwrite unrelated bytes of the IPv6 address
    // structure.
    if (m_protocol.is_ip_v4())
        ep.m_sockaddr_union.m_ip_v4.sin_addr.s_addr = inet_addr("127.0.0.1");
#endif
    return ep;
}
9,918✔
2239

4✔
2240

4✔
2241
std::error_code SocketBase::open(const StreamProtocol& prot, std::error_code& ec)
4✔
2242
{
6,028✔
2243
    if (REALM_UNLIKELY(is_open()))
6,028✔
2244
        throw util::runtime_error("Socket is already open");
4✔
2245
    int type = prot.m_socktype;
6,028✔
2246
#if HAVE_LINUX_SOCK_CLOEXEC
4✔
2247
    type |= SOCK_CLOEXEC;
2248
#endif
2249
    native_handle_type ret = ::socket(prot.m_family, type, prot.m_protocol);
6,024✔
2250
#ifdef _WIN32
4,908✔
2251
    if (REALM_UNLIKELY(ret == INVALID_SOCKET)) {
4,908✔
2252
        ec = make_winsock_error_code(WSAGetLastError());
4,908✔
2253
        return ec;
4,908✔
2254
    }
4,908✔
2255
#else
4,908✔
2256
    if (REALM_UNLIKELY(ret == -1)) {
10,932✔
2257
        ec = make_basic_system_error_code(errno);
4,908✔
2258
        return ec;
4,908✔
2259
    }
4,908✔
2260
#endif
10,932✔
2261

2262
    CloseGuard sock_fd{ret};
6,024✔
2263

2264
#if !HAVE_LINUX_SOCK_CLOEXEC
10,936✔
2265
    {
10,936✔
2266
        bool value = true;
9,966✔
2267
        if (REALM_UNLIKELY(set_cloexec_flag(sock_fd, value, ec)))
9,966✔
2268
            return ec;
3,942✔
2269
    }
9,966✔
2270
#endif
6,024✔
2271

2272
#if REALM_PLATFORM_APPLE
6,024✔
2273
    {
6,024✔
2274
        int optval = 1;
6,024✔
2275
        int ret = setsockopt(sock_fd, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof optval);
6,024✔
2276
        if (REALM_UNLIKELY(ret == -1)) {
6,024✔
2277
            ec = make_basic_system_error_code(errno);
×
2278
            return ec;
×
2279
        }
×
2280
    }
6,994✔
2281
#endif
6,994✔
2282

970✔
2283
    bool in_blocking_mode = true; // New sockets are in blocking mode by default
6,994✔
2284
    m_desc.assign(sock_fd.release(), in_blocking_mode);
6,024✔
2285
    m_protocol = prot;
6,024✔
2286
    ec = std::error_code(); // Success
6,024✔
2287
    return ec;
6,024✔
2288
}
6,024✔
2289

2290

32✔
2291
std::error_code SocketBase::do_assign(const StreamProtocol& prot, native_handle_type sock_fd, std::error_code& ec)
32✔
2292
{
34✔
2293
    if (REALM_UNLIKELY(is_open()))
34✔
2294
        throw util::runtime_error("Socket is already open");
30✔
2295

30✔
2296
    // We need to know whether the specified socket is in blocking or in
32✔
2297
    // nonblocking mode. Rather than reading the current mode, we set it to
32✔
2298
    // blocking mode (disable nonblocking mode), and initialize
32✔
2299
    // `m_in_blocking_mode` to true.
32✔
2300
    {
34✔
2301
        bool value = false;
34✔
2302
        if (::set_nonblock_flag(sock_fd, value, ec))
34✔
2303
            return ec;
32✔
2304
    }
34✔
2305

32✔
2306
    bool in_blocking_mode = true; // New sockets are in blocking mode by default
34✔
2307
    m_desc.assign(sock_fd, in_blocking_mode);
34✔
2308
    m_protocol = prot;
34✔
2309
    ec = std::error_code(); // Success
2✔
2310
    return ec;
2✔
2311
}
2✔
2312

28✔
2313

28✔
2314
// Reads the socket option identified by `opt` into the buffer `value_data`.
//
// On entry `value_size` is the size of the buffer; on success it is updated to
// the size reported by getsockopt() and `ec` is cleared. On failure `ec` is
// set by check_socket_error() and `value_size` is left unchanged.
void SocketBase::get_option(opt_enum opt, void* value_data, std::size_t& value_size, std::error_code& ec) const
{
    int native_level = 0;
    int native_name = 0;
    map_option(opt, native_level, native_name);

    socklen_t native_size = static_cast<socklen_t>(value_size);
    char* const out_buffer = static_cast<char*>(value_data);
    int status = ::getsockopt(m_desc.native_handle(), native_level, native_name, out_buffer, &native_size);
    if (REALM_UNLIKELY(check_socket_error(status, ec)))
        return;

    value_size = static_cast<std::size_t>(native_size);
    ec = std::error_code(); // Success
}
1,726✔
2328

1,724✔
2329

1,724✔
2330
// Sets the socket option identified by `opt` to the `value_size` bytes found
// at `value_data`.
//
// On failure `ec` is set by check_socket_error(); on success it is cleared.
void SocketBase::set_option(opt_enum opt, const void* value_data, std::size_t value_size, std::error_code& ec)
{
    int native_level = 0;
    int native_name = 0;
    map_option(opt, native_level, native_name);

    const char* const in_buffer = static_cast<const char*>(value_data);
    const socklen_t native_size = static_cast<socklen_t>(value_size);
    int status = ::setsockopt(m_desc.native_handle(), native_level, native_name, in_buffer, native_size);
    if (REALM_UNLIKELY(check_socket_error(status, ec)))
        return;

    ec = std::error_code(); // Success
}
6,676✔
2342

1,722✔
2343

1,722✔
2344
// Translates the portable option identifier `opt` into the native
// (`level`, `option_name`) pair expected by getsockopt() / setsockopt().
//
// An identifier not handled by the switch trips REALM_ASSERT(false).
void SocketBase::map_option(opt_enum opt, int& level, int& option_name) const
{
    auto select = [&level, &option_name](int native_level, int native_name) {
        level = native_level;
        option_name = native_name;
    };

    switch (opt) {
        case opt_ReuseAddr:
            select(SOL_SOCKET, SO_REUSEADDR);
            return;
        case opt_Linger:
#if REALM_PLATFORM_APPLE
            // By default, SO_LINGER on Darwin counts in "ticks" rather than
            // seconds; SO_LINGER_SEC keeps the unit consistent across
            // platforms.
            select(SOL_SOCKET, SO_LINGER_SEC);
#else
            select(SOL_SOCKET, SO_LINGER);
#endif // REALM_PLATFORM_APPLE
            return;
        case opt_NoDelay:
            select(IPPROTO_TCP, TCP_NODELAY); // Specified by POSIX.1-2001
            return;
    }
    REALM_ASSERT(false);
}
1,718✔
2368

1,718✔
2369

1,718✔
2370
std::error_code Socket::connect(const Endpoint& ep, std::error_code& ec)
1,718✔
2371
{
1,754✔
2372
    REALM_ASSERT(!m_write_oper || !m_write_oper->in_use());
1,754!
2373

1,718✔
2374
    if (!is_open()) {
1,754✔
2375
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
1,752✔
2376
            return ec;
×
2377
    }
36✔
2378

2379
    m_desc.ensure_blocking_mode(); // Throws
1,754✔
2380

1,718✔
2381
    native_handle_type sock_fd = m_desc.native_handle();
36✔
2382
    socklen_t addr_len =
36✔
2383
        (ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type));
22✔
2384
    int ret = ::connect(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
4,096✔
2385
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
4,096✔
2386
        return ec;
4,064✔
2387
    ec = std::error_code(); // Success
4,092✔
2388
    return ec;
4,092✔
2389
}
4,092✔
2390

4,060✔
2391

4,060✔
2392
std::error_code Socket::shutdown(shutdown_type what, std::error_code& ec)
2393
{
28✔
2394
    native_handle_type sock_fd = m_desc.native_handle();
28✔
2395
    int how = what;
28✔
2396
    int ret = ::shutdown(sock_fd, how);
28✔
2397
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
28✔
2398
        return ec;
2399
    ec = std::error_code(); // Success
28✔
2400
    return ec;
28✔
2401
}
11,986✔
2402

11,958✔
2403

9,930✔
2404
bool Socket::initiate_async_connect(const Endpoint& ep, std::error_code& ec)
9,930✔
2405
{
11,752✔
2406
    if (!is_open()) {
11,752✔
2407
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
11,750✔
2408
            return true; // Failure
9,930✔
2409
    }
11,752✔
2410
    m_desc.ensure_nonblocking_mode(); // Throws
13,780✔
2411

2412
    // Initiate connect operation.
2413
    native_handle_type sock_fd = m_desc.native_handle();
1,822✔
2414
    socklen_t addr_len =
311,942✔
2415
        ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
311,598✔
2416
    int ret = ::connect(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
311,942✔
2417
    if (ret != -1) {
1,822✔
2418
        ec = std::error_code(); // Success
2419
        return true;            // Immediate completion.
2420
    }
995,172✔
2421

995,172✔
2422
    // EINPROGRESS (and on Windows, also WSAEWOULDBLOCK) indicates that the
995,172✔
2423
    // underlying connect operation was successfully initiated, but not
995,172✔
2424
    // immediately completed, and EALREADY indicates that an underlying connect
995,172✔
2425
    // operation was already initiated, and still not completed, presumably
995,172✔
2426
    // because a previous call to connect() or async_connect() failed, or was
240,370✔
2427
    // canceled.
754,802✔
2428

754,802✔
2429
#ifdef _WIN32
754,802✔
2430
    int err = WSAGetLastError();
754,802✔
2431
    if (err != WSAEWOULDBLOCK) {
754,802✔
2432
        ec = make_winsock_error_code(err);
754,802✔
2433
        return true; // Failure
754,802✔
2434
    }
419,172✔
2435
#else
2436
    int err = errno;
420,994✔
2437
    if (REALM_UNLIKELY(err != EINPROGRESS && err != EALREADY)) {
337,452✔
2438
        ec = make_basic_system_error_code(err);
335,630✔
2439
        return true; // Failure
313,988✔
2440
    }
21,642✔
2441
#endif
23,464✔
2442

21,642✔
2443
    return false; // Successful initiation, but no immediate completion.
756,624✔
2444
}
756,624✔
2445

2446

2447
std::error_code Socket::finalize_async_connect(std::error_code& ec) noexcept
2448
{
1,816✔
2449
    native_handle_type sock_fd = m_desc.native_handle();
1,816✔
2450
    int connect_errno = 0;
1,818✔
2451
    socklen_t connect_errno_size = sizeof connect_errno;
1,818✔
2452
    int ret =
1,818✔
2453
        ::getsockopt(sock_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&connect_errno), &connect_errno_size);
1,818✔
2454
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
1,818✔
2455
        return ec; // getsockopt() failed
2✔
2456
    if (REALM_UNLIKELY(connect_errno)) {
1,818✔
2457
        ec = make_basic_system_error_code(connect_errno);
6✔
2458
        return ec; // connect failed
6✔
2459
    }
6✔
2460
    return std::error_code(); // Success
1,814✔
2461
}
1,814✔
2462

2463

2464
std::error_code Acceptor::listen(int backlog, std::error_code& ec)
2465
{
4,162✔
2466
    native_handle_type sock_fd = m_desc.native_handle();
4,162✔
2467
    int ret = ::listen(sock_fd, backlog);
4,162✔
2468
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
4,162✔
2469
        return ec;
×
2470
    ec = std::error_code(); // Success
4,162✔
2471
    return ec;
4,162✔
2472
}
4,162✔
2473

2474

2475
// Returns the `service` member of this timer's service implementation.
Service& DeadlineTimer::get_service() noexcept
{
    Service& owning_service = m_service_impl.service;
    return owning_service;
}
×
2479

2480

2481
void DeadlineTimer::cancel() noexcept
2482
{
12,570✔
2483
    if (m_wait_oper && m_wait_oper->in_use() && !m_wait_oper->is_canceled()) {
12,570✔
2484
        m_wait_oper->cancel();
10,480✔
2485
        if (!m_wait_oper->is_complete()) {
10,480✔
2486
            using WaitOperBase = Service::WaitOperBase;
10,480✔
2487
            WaitOperBase& wait_operation = static_cast<WaitOperBase&>(*m_wait_oper);
10,480✔
2488
            m_service_impl.cancel_incomplete_wait_oper(wait_operation);
10,480✔
2489
        }
10,480✔
2490
    }
10,480✔
2491
}
12,570✔
2492

2493

2494
// Hands the wait operation `op` over to the service implementation by moving
// it into add_wait_oper().
void DeadlineTimer::initiate_oper(Service::LendersWaitOperPtr op)
{
    auto& service_impl = m_service_impl;
    service_impl.add_wait_oper(std::move(op)); // Throws
}
271,140✔
2498

2499

2500
bool ReadAheadBuffer::read(char*& begin, char* end, int delim, std::error_code& ec) noexcept
2501
{
1,023,234✔
2502
    std::size_t in_avail = m_end - m_begin;
1,023,234✔
2503
    std::size_t out_avail = end - begin;
1,023,234✔
2504
    std::size_t n = std::min(in_avail, out_avail);
1,023,234✔
2505
    // If n is 0, return whether or not the read expects 0 bytes for the completed response
2506
    if (n == 0)
1,023,234✔
2507
        return out_avail == 0;
256,208✔
2508

2509
    bool delim_mode = (delim != std::char_traits<char>::eof());
767,026✔
2510
    char* i =
767,026✔
2511
        (!delim_mode ? m_begin + n : std::find(m_begin, m_begin + n, std::char_traits<char>::to_char_type(delim)));
743,152✔
2512
    begin = std::copy(m_begin, i, begin);
767,026✔
2513
    m_begin = i;
767,026✔
2514
    if (begin == end) {
767,026✔
2515
        if (delim_mode)
430,022✔
2516
            ec = MiscExtErrors::delim_not_found;
2517
    }
430,022✔
2518
    else {
337,004✔
2519
        if (m_begin == m_end)
337,004✔
2520
            return false;
313,572✔
2521
        REALM_ASSERT(delim_mode);
23,432✔
2522
        *begin++ = *m_begin++; // Transfer delimiter
23,432✔
2523
    }
23,432✔
2524
    return true;
453,454✔
2525
}
767,030✔
2526

4✔
2527

2528
namespace realm::sync::network {
2529

4✔
2530
// Returns the name of the local host as reported by gethostname().
//
// A small stack buffer is tried first, then a larger heap buffer.
//
// Throws std::system_error if gethostname() fails for both buffers, or if the
// name does not fit, including its null terminator, in the larger buffer (in
// which case the error value is ENAMETOOLONG). Allocating the larger buffer
// may also throw.
std::string host_name()
{
    int last_error = 0;

    // Queries the host name into `buffer` and, on success, stores it in
    // `name`. On failure the reason is recorded in `last_error`.
    auto try_query = [&last_error](char* buffer, std::size_t size, std::string& name) {
        if (::gethostname(buffer, size) == -1) {
            last_error = errno; // Capture before anything can overwrite it
            return false;
        }
        // POSIX allows for gethostname() to report success even if the buffer
        // is too small to hold the name, and in that case POSIX requires that
        // the buffer is filled, but not that it contains a final
        // null-termination. So check that a null-termination was included.
        char* const end = buffer + size;
        char* const terminator = std::find(buffer, end, '\0');
        if (terminator == end) {
            // Success was reported, so errno carries no meaningful value.
            last_error = ENAMETOOLONG;
            return false;
        }
        name.assign(buffer, terminator);
        return true;
    };

    std::string name;

    char small_stack_buffer[256];
    if (try_query(small_stack_buffer, sizeof small_stack_buffer, name))
        return name;

    constexpr std::size_t large_heap_buffer_size = 4096;
    std::string large_heap_buffer(large_heap_buffer_size, '\0'); // Throws
    if (try_query(&large_heap_buffer[0], large_heap_buffer.size(), name))
        return name;

    throw std::system_error(last_error, std::system_category(), "gethostname() failed");
}
2556

2557

2558
// Parses the textual IP address `c_str` into an Address.
//
// The string is first tried as an IPv6 address and, if that fails, as an IPv4
// address. On success `ec` is cleared and the parsed address is returned (with
// `m_is_ip_v6` set for IPv6). If neither form parses, `ec` is set to
// error::invalid_argument and a default-constructed Address is returned.
Address make_address(const char* c_str, std::error_code& ec) noexcept
{
    // FIXME: Currently, `addr.m_ip_v6_scope_id` is always set to zero. It
    // needs to be set based on a combined inspection of the original string
    // representation, and the parsed address. The following code is "borrowed"
    // from ASIO:
    /*
        *scope_id = 0;
        if (const char* if_name = strchr(src, '%'))
        {
          in6_addr_type* ipv6_address = static_cast<in6_addr_type*>(dest);
          bool is_link_local = ((ipv6_address->s6_addr[0] == 0xfe)
              && ((ipv6_address->s6_addr[1] & 0xc0) == 0x80));
          bool is_multicast_link_local = ((ipv6_address->s6_addr[0] == 0xff)
              && ((ipv6_address->s6_addr[1] & 0x0f) == 0x02));
          if (is_link_local || is_multicast_link_local)
            *scope_id = if_nametoindex(if_name + 1);
          if (*scope_id == 0)
            *scope_id = atoi(if_name + 1);
        }
    */

    Address parsed;

    const int ip_v6_result = ::inet_pton(AF_INET6, c_str, &parsed.m_union);
    REALM_ASSERT(ip_v6_result == 0 || ip_v6_result == 1);
    const bool is_ip_v6 = (ip_v6_result == 1);

    if (!is_ip_v6) {
        // Not an IPv6 address; fall back to IPv4.
        const int ip_v4_result = ::inet_pton(AF_INET, c_str, &parsed.m_union);
        REALM_ASSERT(ip_v4_result == 0 || ip_v4_result == 1);
        if (ip_v4_result != 1) {
            ec = error::invalid_argument;
            return Address();
        }
    }

    if (is_ip_v6)
        parsed.m_is_ip_v6 = true;
    ec = std::error_code(); // Success (IPv6 or IPv4)
    return parsed;
}
2597

2598
class ResolveErrorCategory : public std::error_category {
2599
public:
2600
    const char* name() const noexcept final
2601
    {
2602
        return "realm.sync.network.resolve";
2603
    }
2604

2605
    std::string message(int value) const final
2606
    {
4✔
2607
        switch (ResolveErrors(value)) {
4✔
2608
            case ResolveErrors::host_not_found:
4✔
2609
                return "Host not found (authoritative)";
4✔
2610
            case ResolveErrors::host_not_found_try_again:
2✔
2611
                return "Host not found (non-authoritative)";
2612
            case ResolveErrors::no_data:
2✔
2613
                return "The query is valid but does not have associated address data";
2614
            case ResolveErrors::no_recovery:
2✔
2615
                return "A non-recoverable error occurred";
2616
            case ResolveErrors::service_not_found:
2✔
2617
                return "The service is not supported for the given socket type";
2618
            case ResolveErrors::socket_type_not_supported:
2✔
2619
                return "The socket type is not supported";
2620
        }
2621
        REALM_ASSERT(false);
2622
        return {};
2623
    }
2624
};
2625

2626
const std::error_category& resolve_error_category() noexcept
2627
{
2✔
2628
    static const ResolveErrorCategory resolve_error_category;
2✔
2629
    return resolve_error_category;
2✔
2630
}
2✔
2631

2632
std::error_code make_error_code(ResolveErrors err)
2633
{
2✔
2634
    return std::error_code(int(err), resolve_error_category());
2✔
2635
}
2✔
2636

2637
} // namespace realm::sync::network
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc