• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jonathan.reams_2947

01 Dec 2023 08:08PM UTC coverage: 91.739% (+0.04%) from 91.695%
jonathan.reams_2947

Pull #7160

Evergreen

jbreams
allow handle_error to decide resumability
Pull Request #7160: Prevent resuming a session that has not been fully shut down

92428 of 169414 branches covered (0.0%)

315 of 349 new or added lines in 14 files covered. (90.26%)

80 existing lines in 14 files now uncovered.

232137 of 253041 relevant lines covered (91.74%)

6882826.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.86
/src/realm/sync/network/network.cpp
1

2
#define _WINSOCK_DEPRECATED_NO_WARNINGS
3

4
#include <algorithm>
5
#include <cerrno>
6
#include <condition_variable>
7
#include <limits>
8
#include <mutex>
9
#include <stdexcept>
10
#include <thread>
11
#include <vector>
12

13
#include <fcntl.h>
14

15
#ifndef _WIN32
16
#include <netinet/tcp.h>
17
#include <unistd.h>
18
#include <poll.h>
19
#include <realm/util/to_string.hpp>
20
#endif
21

22
#include <realm/util/features.h>
23
#include <realm/util/optional.hpp>
24
#include <realm/util/misc_errors.hpp>
25
#include <realm/util/priority_queue.hpp>
26
#include <realm/sync/network/network.hpp>
27

28
#if defined _GNU_SOURCE && !REALM_ANDROID
29
#define HAVE_LINUX_PIPE2 1
30
#else
31
#define HAVE_LINUX_PIPE2 0
32
#endif
33

34
// Note: Linux specific accept4() is not available on Android.
35
#if defined _GNU_SOURCE && defined SOCK_NONBLOCK && defined SOCK_CLOEXEC && !REALM_ANDROID
36
#define HAVE_LINUX_ACCEPT4 1
37
#else
38
#define HAVE_LINUX_ACCEPT4 0
39
#endif
40

41
#if defined _GNU_SOURCE && defined SOCK_CLOEXEC
42
#define HAVE_LINUX_SOCK_CLOEXEC 1
43
#else
44
#define HAVE_LINUX_SOCK_CLOEXEC 0
45
#endif
46

47
#ifndef _WIN32
48

49
#if REALM_NETWORK_USE_EPOLL
50
#include <linux/version.h>
51
#include <sys/epoll.h>
52
#elif REALM_HAVE_KQUEUE
53
#include <sys/types.h>
54
#include <sys/event.h>
55
#include <sys/time.h>
56
#else
57
#include <poll.h>
58
#endif
59

60
#endif
61

62
// On Linux kernels earlier than 2.6.37, epoll can't handle timeout values
63
// bigger than (LONG_MAX - 999ULL)/HZ.  HZ in the wild can be as big as 1000,
64
// and LONG_MAX can be as small as (2**31)-1, so the largest number of
65
// milliseconds we can be sure to support on those early kernels is 2147482.
66
#if REALM_NETWORK_USE_EPOLL
67
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 37)
68
#define EPOLL_LARGE_TIMEOUT_BUG 1
69
#endif
70
#endif
71

72
using namespace realm::util;
73
using namespace realm::sync::network;
74

75

76
namespace {
77

78
using native_handle_type = SocketBase::native_handle_type;
79

80
#ifdef _WIN32
81

82
// This Winsock initialization call is required prior to any other Winsock API call
83
// made by the process. It is OK if a process calls it multiple times.
84
struct ProcessInitialization {
85
    ProcessInitialization()
86
    {
87
        WSADATA wsaData;
88
        int i = WSAStartup(MAKEWORD(2, 2), &wsaData);
89
        if (i != 0) {
90
            throw std::system_error(i, std::system_category(), "WSAStartup() Winsock initialization failed");
91
        }
92
    }
93

94
    ~ProcessInitialization()
95
    {
96
        // Must be called 1 time for each call to WSAStartup() that has taken place
97
        WSACleanup();
98
    }
99
};
100

101
ProcessInitialization g_process_initialization;
102

103
std::error_code make_winsock_error_code(int error_code)
104
{
105
    switch (error_code) {
106
        case WSAEAFNOSUPPORT:
107
            return make_basic_system_error_code(EAFNOSUPPORT);
108
        case WSAEINVAL:
109
            return make_basic_system_error_code(EINVAL);
110
        case WSAECANCELLED:
111
            return make_basic_system_error_code(ECANCELED);
112
        case WSAECONNABORTED:
113
            return make_basic_system_error_code(ECONNABORTED);
114
        case WSAECONNRESET:
115
            return make_basic_system_error_code(ECONNRESET);
116
        case WSAEWOULDBLOCK:
117
            return make_basic_system_error_code(EAGAIN);
118
    }
119

120
    // Microsoft's STL can map win32 (and winsock!) error codes to known (posix-compatible) errc ones.
121
    auto ec = std::system_category().default_error_condition(error_code);
122
    if (ec.category() == std::generic_category())
123
        return make_basic_system_error_code(ec.value());
124
    return std::error_code(ec.value(), ec.category());
125
}
126

127
#endif // defined _WIN32
128

129
inline bool check_socket_error(int ret, std::error_code& ec)
130
{
51,950✔
131
#ifdef _WIN32
132
    if (REALM_UNLIKELY(ret == SOCKET_ERROR)) {
133
        ec = make_winsock_error_code(WSAGetLastError());
134
        return true;
135
    }
136
#else
137
    if (REALM_UNLIKELY(ret == -1)) {
51,950✔
138
        ec = make_basic_system_error_code(errno);
4✔
139
        return true;
4✔
140
    }
4✔
141
#endif
51,946✔
142
    return false;
51,946✔
143
}
51,946✔
144

145
// Set file status flag O_NONBLOCK if `value` is true, otherwise clear it.
146
//
147
// Note that these flags are set at the file description level, and are therfore
148
// shared between duplicated descriptors (dup()).
149
//
150
// `ec` untouched on success.
151
std::error_code set_nonblock_flag(native_handle_type fd, bool value, std::error_code& ec) noexcept
152
{
12,424✔
153
#ifdef _WIN32
154
    u_long flags = value ? 1 : 0;
155
    int r = ioctlsocket(fd, FIONBIO, &flags);
156
    if (r == SOCKET_ERROR) {
157
        ec = make_winsock_error_code(WSAGetLastError());
158
        return ec;
159
    }
160
#else
161
    int flags = ::fcntl(fd, F_GETFL, 0);
12,424✔
162
    if (REALM_UNLIKELY(flags == -1)) {
12,424✔
163
        ec = make_basic_system_error_code(errno);
×
164
        return ec;
×
165
    }
×
166
    flags &= ~O_NONBLOCK;
12,424✔
167
    flags |= (value ? O_NONBLOCK : 0);
6,180✔
168
    int ret = ::fcntl(fd, F_SETFL, flags);
12,424✔
169
    if (REALM_UNLIKELY(ret == -1)) {
12,424✔
170
        ec = make_basic_system_error_code(errno);
×
171
        return ec;
×
172
    }
×
173
#endif
12,424✔
174

6,108✔
175
    return std::error_code(); // Success
12,424✔
176
}
12,424✔
177

178
// Set file status flag O_NONBLOCK. See set_nonblock_flag(int, bool,
179
// std::error_code&) for details. Throws std::system_error on failure.
180
void set_nonblock_flag(native_handle_type fd, bool value = true)
181
{
12,420✔
182
    std::error_code ec;
12,420✔
183
    if (set_nonblock_flag(fd, value, ec))
12,420✔
184
        throw std::system_error(ec);
×
185
}
12,420✔
186

187
// Set file descriptor flag FD_CLOEXEC if `value` is true, otherwise clear it.
188
//
189
// Note that this method of setting FD_CLOEXEC is subject to a race condition if
190
// another thread calls any of the exec functions concurrently. For that reason,
191
// this function should only be used when there is no better alternative. For
192
// example, Linux generally offers ways to set this flag atomically with the
193
// creation of a new file descriptor.
194
//
195
// `ec` untouched on success.
196
std::error_code set_cloexec_flag(native_handle_type fd, bool value, std::error_code& ec) noexcept
197
{
25,932✔
198
#ifndef _WIN32
25,932✔
199
    int flags = ::fcntl(fd, F_GETFD, 0);
25,932✔
200
    if (REALM_UNLIKELY(flags == -1)) {
25,932✔
201
        ec = make_basic_system_error_code(errno);
×
202
        return ec;
×
203
    }
×
204
    flags &= ~FD_CLOEXEC;
25,932✔
205
    flags |= (value ? FD_CLOEXEC : 0);
✔
206
    int ret = ::fcntl(fd, F_SETFD, flags);
25,932✔
207
    if (REALM_UNLIKELY(ret == -1)) {
25,932✔
208
        ec = make_basic_system_error_code(errno);
×
209
        return ec;
×
210
    }
×
211
#endif
25,932✔
212
    return std::error_code(); // Success
25,932✔
213
}
25,932✔
214

215
// Set file descriptor flag FD_CLOEXEC. See set_cloexec_flag(int, bool,
216
// std::error_code&) for details. Throws std::system_error on failure.
217
REALM_UNUSED inline void set_cloexec_flag(native_handle_type fd, bool value = true)
218
{
18,524✔
219
    std::error_code ec;
18,524✔
220
    if (set_cloexec_flag(fd, value, ec))
18,524✔
221
        throw std::system_error(ec);
×
222
}
18,524✔
223

224

225
// Close `fd`, asserting that the handle was valid.
//
// On Windows the handle is first closed as a socket; if that fails it is
// closed as a generic HANDLE instead.
inline void checked_close(native_handle_type fd) noexcept
{
#ifdef _WIN32
    if (closesocket(fd) == -1) {
        BOOL closed = CloseHandle((HANDLE)fd);
        REALM_ASSERT(closed || GetLastError() != ERROR_INVALID_HANDLE);
    }
#else
    if (::close(fd) == -1) {
        // Various errors from close() are tolerated and must be ignored,
        // since the file descriptor is closed in any case (not necessarily
        // according to POSIX, but we assume so anyway). `EBADF` is the
        // exception: it would indicate an implementation bug.
        REALM_ASSERT(errno != EBADF);
    }
#endif
}
242

243

244
class CloseGuard {
245
public:
246
    CloseGuard() noexcept {}
39,738✔
247
    explicit CloseGuard(native_handle_type fd) noexcept
248
        : m_fd{fd}
249
    {
21,604✔
250
        REALM_ASSERT(fd != -1);
21,604✔
251
    }
21,604✔
252
    CloseGuard(CloseGuard&& cg) noexcept
253
        : m_fd{cg.release()}
254
    {
×
255
    }
×
256
    ~CloseGuard() noexcept
257
    {
61,342✔
258
        if (m_fd != -1)
61,342✔
259
            checked_close(m_fd);
45,826✔
260
    }
61,342✔
261
    void reset(native_handle_type fd) noexcept
262
    {
38,746✔
263
        REALM_ASSERT(fd != -1);
38,746✔
264
        if (m_fd != -1)
38,746✔
265
            checked_close(m_fd);
×
266
        m_fd = fd;
38,746✔
267
    }
38,746✔
268
    operator native_handle_type() const noexcept
269
    {
1,826,660✔
270
        return m_fd;
1,826,660✔
271
    }
1,826,660✔
272
    native_handle_type release() noexcept
273
    {
14,524✔
274
        native_handle_type fd = m_fd;
14,524✔
275
        m_fd = -1;
14,524✔
276
        return fd;
14,524✔
277
    }
14,524✔
278

279
private:
280
    native_handle_type m_fd = -1;
281
};
282

283

284
#ifndef _WIN32
285

286
class WakeupPipe {
287
public:
288
    WakeupPipe()
289
    {
18,282✔
290
        int fildes[2];
18,282✔
291
#if HAVE_LINUX_PIPE2
9,020✔
292
        int flags = O_CLOEXEC;
9,020✔
293
        int ret = ::pipe2(fildes, flags);
9,020✔
294
#else
295
        int ret = ::pipe(fildes);
9,262✔
296
#endif
9,262✔
297
        if (REALM_UNLIKELY(ret == -1)) {
18,282✔
298
            std::error_code ec = make_basic_system_error_code(errno);
×
299
            throw std::system_error(ec);
×
300
        }
×
301
        m_read_fd.reset(fildes[0]);
18,282✔
302
        m_write_fd.reset(fildes[1]);
18,282✔
303
#if !HAVE_LINUX_PIPE2
9,262✔
304
        set_cloexec_flag(m_read_fd);  // Throws
9,262✔
305
        set_cloexec_flag(m_write_fd); // Throws
9,262✔
306
#endif
9,262✔
307
    }
18,282✔
308

309
    // Thread-safe.
310
    int wait_fd() const noexcept
311
    {
166,222✔
312
        return m_read_fd;
166,222✔
313
    }
166,222✔
314

315
    // Cause the wait descriptor (wait_fd()) to become readable within a short
316
    // amount of time.
317
    //
318
    // Thread-safe.
319
    void signal() noexcept
320
    {
1,195,658✔
321
        std::lock_guard lock{m_mutex};
1,195,658✔
322
        if (!m_signaled) {
1,195,658✔
323
            char c = 0;
260,522✔
324
            ssize_t ret = ::write(m_write_fd, &c, 1);
260,522✔
325
            REALM_ASSERT_RELEASE(ret == 1);
260,522✔
326
            m_signaled = true;
260,522✔
327
        }
260,522✔
328
    }
1,195,658✔
329

330
    // Must be called after the wait descriptor (wait_fd()) becomes readable.
331
    //
332
    // Thread-safe.
333
    void acknowledge_signal() noexcept
334
    {
259,896✔
335
        std::lock_guard lock{m_mutex};
259,896✔
336
        if (m_signaled) {
259,930✔
337
            char c;
259,930✔
338
            ssize_t ret = ::read(m_read_fd, &c, 1);
259,930✔
339
            REALM_ASSERT_RELEASE(ret == 1);
259,930✔
340
            m_signaled = false;
259,930✔
341
        }
259,930✔
342
    }
259,896✔
343

344
private:
345
    CloseGuard m_read_fd, m_write_fd;
346
    std::mutex m_mutex;
347
    bool m_signaled = false; // Protected by `m_mutex`.
348
};
349

350
#else // defined _WIN32
351

352
class WakeupPipe {
353
public:
354
    SOCKET wait_fd() const noexcept
355
    {
356
        return INVALID_SOCKET;
357
    }
358

359
    void signal() noexcept
360
    {
361
        m_signal_count++;
362
    }
363

364
    bool is_signaled() const noexcept
365
    {
366
        return m_signal_count > 0;
367
    }
368

369
    void acknowledge_signal() noexcept
370
    {
371
        m_signal_count--;
372
    }
373

374
private:
375
    std::atomic<uint32_t> m_signal_count = 0;
376
};
377

378
#endif // defined _WIN32
379

380

381
std::error_code translate_addrinfo_error(int err) noexcept
382
{
8✔
383
    switch (err) {
8✔
384
        case EAI_AGAIN:
2✔
385
            return ResolveErrors::host_not_found_try_again;
2✔
386
        case EAI_BADFLAGS:
✔
387
            return error::invalid_argument;
×
388
        case EAI_FAIL:
✔
389
            return ResolveErrors::no_recovery;
×
390
        case EAI_FAMILY:
✔
391
            return error::address_family_not_supported;
×
392
        case EAI_MEMORY:
✔
393
            return error::no_memory;
×
394
        case EAI_NONAME:
6✔
395
#if defined(EAI_ADDRFAMILY)
6✔
396
        case EAI_ADDRFAMILY:
6✔
397
#endif
6✔
398
#if defined(EAI_NODATA) && (EAI_NODATA != EAI_NONAME)
6✔
399
        case EAI_NODATA:
6✔
400
#endif
6✔
401
            return ResolveErrors::host_not_found;
6✔
402
        case EAI_SERVICE:
2✔
403
            return ResolveErrors::service_not_found;
×
404
        case EAI_SOCKTYPE:
2✔
405
            return ResolveErrors::socket_type_not_supported;
×
406
        default:
2✔
407
            return error::unknown;
×
408
    }
8✔
409
}
8✔
410

411

412
// Owns an address list and releases it with freeaddrinfo() on destruction. A
// null pointer is allowed and means there is nothing to free.
//
// Copying is disabled: two owners of the same list would each call
// freeaddrinfo() on it.
struct GetaddrinfoResultOwner {
    struct addrinfo* ptr;

    // Takes ownership of `p` (may be null).
    GetaddrinfoResultOwner(struct addrinfo* p)
        : ptr{p}
    {
    }

    GetaddrinfoResultOwner(const GetaddrinfoResultOwner&) = delete;
    GetaddrinfoResultOwner& operator=(const GetaddrinfoResultOwner&) = delete;

    ~GetaddrinfoResultOwner() noexcept
    {
        if (ptr)
            freeaddrinfo(ptr);
    }
};
424

425
} // unnamed namespace
426

427

428
// Tracks I/O operations that could not complete immediately and waits for the
// underlying descriptors to become ready. The waiting mechanism is chosen at
// compile time: epoll, kqueue, or poll.
class Service::IoReactor {
public:
    IoReactor();
    ~IoReactor() noexcept;

    // Hand over an initiated I/O operation that did not complete immediately.
    void add_oper(Descriptor&, LendersIoOperPtr, Want);
    void remove_canceled_ops(Descriptor&, OperQueue<AsyncOper>& completed_ops) noexcept;

    bool wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
                          OperQueue<AsyncOper>& completed_ops);

    // True when the reactor currently manages no operations. An operation is
    // managed from the moment it is added through add_oper() until it is
    // passed out through `completed_ops` of wait_and_advance().
    bool empty() const noexcept;

    // Make wait_and_advance() return within a short amount of time.
    //
    // Thread-safe.
    void interrupt() noexcept;

#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    void register_desc(Descriptor&);
    void deregister_desc(Descriptor&) noexcept;
#endif

#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
    clock::duration get_and_reset_sleep_time() noexcept;
#endif

private:
#if REALM_NETWORK_USE_EPOLL

    static constexpr int s_epoll_event_buffer_size = 256;
    const std::unique_ptr<epoll_event[]> m_epoll_event_buffer;
    const CloseGuard m_epoll_fd;

    static std::unique_ptr<epoll_event[]> make_epoll_event_buffer();
    static CloseGuard make_epoll_fd();

#elif REALM_HAVE_KQUEUE // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE

    static constexpr int s_kevent_buffer_size = 256;
    const std::unique_ptr<struct kevent[]> m_kevent_buffer;
    const CloseGuard m_kqueue_fd;

    static std::unique_ptr<struct kevent[]> make_kevent_buffer();
    static CloseGuard make_kqueue_fd();

#endif // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE

#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE

    OperQueue<IoOper> m_active_ops;

    // When operations are already active, only activate whatever additional
    // operations can be activated without blocking. When none are active,
    // block until at least one operation can be activated or the timeout is
    // reached; unless the timeout was reached, then also activate as many
    // further operations as possible without blocking again.
    //
    // May occasionally return before the timeout with nothing activated, but
    // this is assumed to be rare enough that it never becomes a performance
    // problem.
    //
    // Argument `now` is unused if `timeout.time_since_epoch() <= 0`.
    //
    // Returns true if, and only if a wakeup pipe signal was received.
    // Operations may already have been activated in that case.
    bool wait_and_activate(clock::time_point timeout, clock::time_point now);

    void advance_active_ops(OperQueue<AsyncOper>& completed_ops) noexcept;

#else // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)

    struct OperSlot {
        std::size_t pollfd_slot_ndx = 0; // Zero when slot is unused
        OperQueue<IoOper> read_ops, write_ops;
    };

    std::vector<OperSlot> m_operations; // Indexed by file descriptor

    // The first entry of `m_pollfd_slots` is always the read end of the
    // wakeup pipe. Every entry of `m_operations` with a nonzero
    // `pollfd_slot_ndx` contributes one additional entry. All entries always
    // have `pollfd::fd` >= 0.
    //
    // INVARIANT: m_pollfd_slots.size() == 1 + N, where N is the number of
    // entries in m_operations where pollfd_slot_ndx is nonzero.
    std::vector<pollfd> m_pollfd_slots;

    void discard_pollfd_slot_by_move_last_over(OperSlot&) noexcept;

#endif // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)

    std::size_t m_num_operations = 0;
    WakeupPipe m_wakeup_pipe;

#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
    clock::duration m_sleep_time = clock::duration::zero();
#endif
};
532

533

534
inline bool Service::IoReactor::empty() const noexcept
535
{
1,634,342✔
536
    return (m_num_operations == 0);
1,634,342✔
537
}
1,634,342✔
538

539

540
inline void Service::IoReactor::interrupt() noexcept
541
{
1,195,836✔
542
    m_wakeup_pipe.signal();
1,195,836✔
543
}
1,195,836✔
544

545

546
#if REALM_NETWORK_USE_EPOLL
547

548
inline Service::IoReactor::IoReactor()
549
    : m_epoll_event_buffer{make_epoll_event_buffer()} // Throws
550
    , m_epoll_fd{make_epoll_fd()}                     // Throws
551
    , m_wakeup_pipe{}                                 // Throws
552
{
553
    epoll_event event = epoll_event(); // Clear
554
    event.events = EPOLLIN;
555
    event.data.ptr = nullptr;
556
    int ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_wakeup_pipe.wait_fd(), &event);
557
    if (REALM_UNLIKELY(ret == -1)) {
558
        std::error_code ec = make_basic_system_error_code(errno);
559
        throw std::system_error(ec);
560
    }
561
}
562

563

564
inline Service::IoReactor::~IoReactor() noexcept {}
565

566

567
// Add `desc` to the epoll instance, edge-triggered, for read, write and
// peer-hangup events. The address of `desc` is stored as the event's user
// data. Throws std::system_error on failure.
inline void Service::IoReactor::register_desc(Descriptor& desc)
{
    epoll_event registration{}; // Zero-initialized
    registration.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; // EPOLLET enables edge triggering
    registration.data.ptr = &desc;
    const int status = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, desc.m_fd, &registration);
    if (REALM_UNLIKELY(status == -1)) {
        const std::error_code failure = make_basic_system_error_code(errno);
        throw std::system_error(failure);
    }
}
578

579

580
// Remove `desc` from the epoll instance. Failure is treated as a programming
// error and asserted on.
inline void Service::IoReactor::deregister_desc(Descriptor& desc) noexcept
{
    epoll_event ignored{}; // Zero-initialized
    int status = epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, desc.m_fd, &ignored);
    REALM_ASSERT(status != -1);
}
586

587

588
inline std::unique_ptr<epoll_event[]> Service::IoReactor::make_epoll_event_buffer()
589
{
590
    return std::make_unique<epoll_event[]>(s_epoll_event_buffer_size); // Throws
591
}
592

593

594
// Create a close-on-exec epoll instance and wrap its descriptor in a guard.
// Throws std::system_error if the instance cannot be created.
inline CloseGuard Service::IoReactor::make_epoll_fd()
{
    const int epoll_fd = epoll_create1(EPOLL_CLOEXEC);
    if (REALM_UNLIKELY(epoll_fd == -1)) {
        const std::error_code failure = make_basic_system_error_code(errno);
        throw std::system_error(failure);
    }
    return CloseGuard{epoll_fd};
}
606

607

608
// epoll flavor of wait_and_activate().
//
// Blocks only when no operations are active. Returns true if, and only if,
// a wakeup pipe event was seen; returns false otherwise, including when
// epoll_wait() is interrupted by a signal (EINTR). Any other epoll_wait()
// failure is thrown as std::system_error.
bool Service::IoReactor::wait_and_activate(clock::time_point timeout, clock::time_point now)
{
    // Zero means "poll without blocking"; -1 means "block indefinitely".
    int wait_millis = 0;
    const bool may_block = m_active_ops.empty();
    if (may_block) {
        const bool no_deadline = (timeout.time_since_epoch().count() <= 0);
        if (no_deadline) {
            wait_millis = -1; // Allow indefinite blocking
        }
        else if (now < timeout) {
            const auto remaining = timeout - now;
            int cap_millis = std::numeric_limits<int>::max();
            // 17592186044415 is the largest value (45-bit signed integer)
            // guaranteed to be supported by std::chrono::milliseconds. In the
            // worst case, `int` is a 16-bit integer, meaning that we can only
            // wait about 30 seconds at a time. In the best case
            // (17592186044415) we can wait more than 500 years at a time. In
            // the typical case (`int` has 32 bits), we can wait 24 days at a
            // time.
            long long max_chrono_millis = 17592186044415;
            if (max_chrono_millis < cap_millis)
                cap_millis = int(max_chrono_millis);
#if EPOLL_LARGE_TIMEOUT_BUG
            long max_safe_millis = 2147482; // Circa 35 minutes
            if (max_safe_millis < cap_millis)
                cap_millis = int(max_safe_millis);
#endif
            if (remaining > std::chrono::milliseconds(cap_millis)) {
                wait_millis = cap_millis;
            }
            else {
                // Overflow is impossible here, due to the preceding check
                auto rounded = std::chrono::duration_cast<std::chrono::milliseconds>(remaining);
                // The conversion to milliseconds rounds down if the tick
                // period of `remaining` is finer than a millisecond, which it
                // usually is. Rounding down leads to premature wakeups and
                // thereby to extraneous iterations of the event loop. It is
                // worst when a small remainder is rounded down to zero
                // milliseconds, because that can easily produce a "busy wait"
                // lasting up to a millisecond each time. The solution is to
                // round up instead of down.
                if (rounded < remaining) {
                    // This increment cannot overflow, because
                    // rounded < remaining <= cap_millis <=
                    // std::numeric_limits<int>::max().
                    ++rounded;
                }
                wait_millis = int(rounded.count());
            }
        }
    }

    // A full event buffer suggests more events may be pending, so one extra
    // non-blocking extraction round is allowed.
    for (int round = 0; round < 2; ++round) {
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
        clock::time_point sleep_start_time = clock::now();
#endif
        const int num_events =
            epoll_wait(m_epoll_fd, m_epoll_event_buffer.get(), s_epoll_event_buffer_size, wait_millis);
        if (REALM_UNLIKELY(num_events == -1)) {
            const int err = errno;
            if (err != EINTR) {
                const std::error_code failure = make_basic_system_error_code(err);
                throw std::system_error(failure);
            }
            return false; // Infrequent premature return is ok
        }
        REALM_ASSERT(num_events >= 0);
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
        m_sleep_time += clock::now() - sleep_start_time;
#endif
        bool woken_by_pipe = false;
        for (int k = 0; k < num_events; ++k) {
            const epoll_event& ev = m_epoll_event_buffer[k];
            // The wakeup pipe is the only registration with a null data pointer
            if (REALM_UNLIKELY(!ev.data.ptr)) {
                m_wakeup_pipe.acknowledge_signal();
                woken_by_pipe = true;
                continue;
            }
            Descriptor& desc = *static_cast<Descriptor*>(ev.data.ptr);
            const bool read_event = (ev.events & (EPOLLIN | EPOLLHUP | EPOLLERR)) != 0;
            const bool write_event = (ev.events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) != 0;
            if (read_event && !desc.m_read_ready) {
                desc.m_read_ready = true;
                m_active_ops.push_back(desc.m_suspended_read_ops);
            }
            if (write_event && !desc.m_write_ready) {
                desc.m_write_ready = true;
                m_active_ops.push_back(desc.m_suspended_write_ops);
            }
            if ((ev.events & EPOLLRDHUP) != 0)
                desc.m_imminent_end_of_input = true;
        }
        if (woken_by_pipe)
            return true;
        if (num_events < s_epoll_event_buffer_size)
            break;
        wait_millis = 0; // Never block during the follow-up round
    }
    return false;
}
709

710

711
#elif REALM_HAVE_KQUEUE // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
712

713

714
inline Service::IoReactor::IoReactor()
715
    : m_kevent_buffer{make_kevent_buffer()} // Throws
716
    , m_kqueue_fd{make_kqueue_fd()}         // Throws
717
    , m_wakeup_pipe{}                       // Throws
718
{
9,262✔
719
    struct kevent event;
9,262✔
720
    EV_SET(&event, m_wakeup_pipe.wait_fd(), EVFILT_READ, EV_ADD, 0, 0, nullptr);
9,262✔
721
    int ret = ::kevent(m_kqueue_fd, &event, 1, nullptr, 0, nullptr);
9,262✔
722
    if (REALM_UNLIKELY(ret == -1)) {
9,262✔
723
        std::error_code ec = make_basic_system_error_code(errno);
724
        throw std::system_error(ec);
725
    }
726
}
9,262✔
727

728

729
inline Service::IoReactor::~IoReactor() noexcept {}
9,262✔
730

731

732
// Add read and write filters for `desc` to the kqueue, both edge-triggered.
// The address of `desc` is stored as the user data of each filter. Throws
// std::system_error on failure.
inline void Service::IoReactor::register_desc(Descriptor& desc)
{
    struct kevent changes[2];
    // EV_CLEAR enables edge-triggered behavior
    const unsigned short add_edge_triggered = EV_ADD | EV_CLEAR;
    EV_SET(&changes[0], desc.m_fd, EVFILT_READ, add_edge_triggered, 0, 0, &desc);
    EV_SET(&changes[1], desc.m_fd, EVFILT_WRITE, add_edge_triggered, 0, 0, &desc);
    const int status = ::kevent(m_kqueue_fd, changes, 2, nullptr, 0, nullptr);
    if (REALM_UNLIKELY(status == -1)) {
        const std::error_code failure = make_basic_system_error_code(errno);
        throw std::system_error(failure);
    }
}
744

745

746
// Remove the read and write filters of `desc` from the kqueue. Failure is
// treated as a programming error and asserted on.
inline void Service::IoReactor::deregister_desc(Descriptor& desc) noexcept
{
    struct kevent changes[2];
    EV_SET(&changes[0], desc.m_fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
    EV_SET(&changes[1], desc.m_fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
    int status = ::kevent(m_kqueue_fd, changes, 2, nullptr, 0, nullptr);
    REALM_ASSERT(status != -1);
}
754

755

756
inline std::unique_ptr<struct kevent[]> Service::IoReactor::make_kevent_buffer()
757
{
9,262✔
758
    return std::make_unique<struct kevent[]>(s_kevent_buffer_size); // Throws
9,262✔
759
}
9,262✔
760

761

762
// Create a kqueue and wrap its descriptor in a guard. Throws
// std::system_error if the kqueue cannot be created.
inline CloseGuard Service::IoReactor::make_kqueue_fd()
{
    const int kqueue_fd = ::kqueue();
    if (REALM_UNLIKELY(kqueue_fd == -1)) {
        const std::error_code failure = make_basic_system_error_code(errno);
        throw std::system_error(failure);
    }
    return CloseGuard{kqueue_fd};
}
772

773

774
bool Service::IoReactor::wait_and_activate(clock::time_point timeout, clock::time_point now)
775
{
1,083,792✔
776
    timespec max_wait_time{}; // Clear to zero
1,083,792✔
777
    bool allow_blocking_wait = m_active_ops.empty();
1,083,792✔
778
    if (allow_blocking_wait) {
1,083,792✔
779
        // Note that ::kevent() will silently clamp `max_wait_time` to 24 hours
780
        // (86400 seconds), but that is ok, because the caller is prepared for
781
        // premature return as long as it happens infrequently enough to not
782
        // pose a performance problem.
783
        constexpr std::time_t max_wait_seconds = 86400;
148,140✔
784
        if (timeout.time_since_epoch().count() <= 0) {
148,140✔
785
            max_wait_time.tv_sec = max_wait_seconds;
16,824✔
786
        }
16,824✔
787
        else if (now < timeout) {
131,320✔
788
            auto diff = timeout - now;
131,320✔
789
            auto secs = std::chrono::duration_cast<std::chrono::seconds>(diff);
131,320✔
790
            auto nsecs = std::chrono::duration_cast<std::chrono::nanoseconds>(diff - secs);
131,320✔
791
            auto secs_2 = std::min(secs.count(), std::chrono::seconds::rep(max_wait_seconds));
131,320✔
792
            max_wait_time.tv_sec = std::time_t(secs_2);
131,320✔
793
            max_wait_time.tv_nsec = long(nsecs.count());
131,320✔
794
        }
131,320✔
795
    }
148,140✔
796
    for (int i = 0; i < 4; ++i) {
2,147,483,647✔
797
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
798
        clock::time_point sleep_start_time = clock::now();
799
#endif
800
        int ret = ::kevent(m_kqueue_fd, nullptr, 0, m_kevent_buffer.get(), s_kevent_buffer_size, &max_wait_time);
1,083,868✔
801
        if (REALM_UNLIKELY(ret == -1)) {
1,083,868✔
802
            int err = errno;
803
            if (err == EINTR)
×
804
                return false; // Infrequent premature return is ok
805
            std::error_code ec = make_basic_system_error_code(err);
806
            throw std::system_error(ec);
807
        }
808
        REALM_ASSERT(ret >= 0);
1,083,868✔
809
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
810
        m_sleep_time += clock::now() - sleep_start_time;
811
#endif
812
        int n = ret;
1,083,868✔
813
        bool got_wakeup_pipe_signal = false;
1,083,868✔
814
        for (int j = 0; j < n; ++j) {
1,673,970✔
815
            const struct kevent& event = m_kevent_buffer[j];
590,102✔
816
            bool is_wakeup_pipe_signal = !event.udata;
590,102✔
817
            if (REALM_UNLIKELY(is_wakeup_pipe_signal)) {
590,102✔
818
                REALM_ASSERT(m_wakeup_pipe.wait_fd() == int(event.ident));
147,976✔
819
                m_wakeup_pipe.acknowledge_signal();
147,976✔
820
                got_wakeup_pipe_signal = true;
147,976✔
821
                continue;
147,976✔
822
            }
147,976✔
823
            Descriptor& desc = *static_cast<Descriptor*>(event.udata);
442,126✔
824
            REALM_ASSERT(desc.m_fd == int(event.ident));
442,126✔
825
            if (event.filter == EVFILT_READ) {
442,126✔
826
                if (!desc.m_read_ready) {
238,082✔
827
                    desc.m_read_ready = true;
53,176✔
828
                    m_active_ops.push_back(desc.m_suspended_read_ops);
53,176✔
829
                }
53,176✔
830
                if ((event.flags & EV_EOF) != 0)
238,082✔
831
                    desc.m_imminent_end_of_input = true;
720✔
832
            }
238,082✔
833
            if (event.filter == EVFILT_WRITE) {
442,126✔
834
                if (!desc.m_write_ready) {
204,074✔
835
                    desc.m_write_ready = true;
5,098✔
836
                    m_active_ops.push_back(desc.m_suspended_write_ops);
5,098✔
837
                }
5,098✔
838
            }
204,074✔
839
        }
442,126✔
840
        if (got_wakeup_pipe_signal)
1,083,868✔
841
            return true;
147,982✔
842
        if (n < s_kevent_buffer_size)
935,886✔
843
            break;
936,128✔
844
        // Clear to zero to disable blocking for any additional opportunistic
845
        // event extractions.
846
        max_wait_time = timespec{};
2,147,483,647✔
847
    }
2,147,483,647✔
848
    return false;
935,810✔
849
}
1,083,792✔
850

851
#endif // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
852

853

854
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
855

856
// Register `op` with the reactor on behalf of `desc`.
//
// The descriptor is registered with the underlying event mechanism on first
// use. If the descriptor is already flagged ready for the wanted direction,
// the operation goes straight to the queue of active operations; otherwise
// it is parked on the descriptor's queue of suspended read or write
// operations. `want` is expected to be `Want::read` or `Want::write`; any
// other value trips an assertion.
void Service::IoReactor::add_oper(Descriptor& desc, LendersIoOperPtr op, Want want)
{
    if (REALM_UNLIKELY(!desc.m_is_registered)) {
        register_desc(desc); // Throws
        desc.m_is_registered = true;
    }

    // Queue on which the operation must be parked, or null if the operation
    // can be made active right away.
    OperQueue<IoOper>* parking_queue = nullptr;
    bool want_is_valid = false;
    switch (want) {
        case Want::read:
            want_is_valid = true;
            if (!desc.m_read_ready)
                parking_queue = &desc.m_suspended_read_ops;
            break;
        case Want::write:
            want_is_valid = true;
            if (!desc.m_write_ready)
                parking_queue = &desc.m_suspended_write_ops;
            break;
        case Want::nothing:
            break;
    }
    REALM_ASSERT(want_is_valid);

    if (parking_queue != nullptr) {
        parking_queue->push_back(std::move(op));
    }
    else {
        m_active_ops.push_back(std::move(op));
    }
    ++m_num_operations;
}
1,106,518✔
885

886

887
// Move every suspended read and write operation of `desc` to
// `completed_ops`, decrementing the count of pending operations for each.
//
// Note: Canceled operations that are currently active (in m_active_ops)
// will be removed later by advance_active_ops().
void Service::IoReactor::remove_canceled_ops(Descriptor& desc, OperQueue<AsyncOper>& completed_ops) noexcept
{
    auto drain = [&](auto& suspended_ops) noexcept {
        for (;;) {
            LendersIoOperPtr op = suspended_ops.pop_front();
            if (!op)
                return;
            completed_ops.push_back(std::move(op));
            --m_num_operations;
        }
    };

    // Reads are drained before writes, matching the order in which they end
    // up in `completed_ops`.
    drain(desc.m_suspended_read_ops);
    drain(desc.m_suspended_write_ops);
}
246,362✔
901

902

903
bool Service::IoReactor::wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
904
                                          OperQueue<AsyncOper>& completed_ops)
905
{
885,066✔
906
    clock::time_point now_2 = now;
885,066✔
907
    for (;;) {
1,083,786✔
908
        bool wakeup_pipe_signal = wait_and_activate(timeout, now_2); // Throws
1,083,786✔
909
        if (REALM_UNLIKELY(wakeup_pipe_signal)) {
1,083,786✔
910
            interrupted = true;
147,980✔
911
            return false;
147,980✔
912
        }
147,980✔
913
        advance_active_ops(completed_ops);
935,806✔
914
        if (!completed_ops.empty())
935,806✔
915
            return true;
730,506✔
916
        if (timeout.time_since_epoch().count() > 0) {
205,300✔
917
            now_2 = clock::now();
89,164✔
918
            bool timed_out = (now_2 >= timeout);
89,164✔
919
            if (timed_out)
89,164✔
920
                return false;
6,740✔
921
        }
89,164✔
922
    }
205,300✔
923
}
885,066✔
924

925

926
// Give every active operation a chance to make progress.
//
// Canceled and completed operations are moved to `completed_ops`. An
// operation that wants to read or write again stays active if its descriptor
// is still flagged ready for that direction, and is otherwise parked on the
// descriptor's corresponding queue of suspended operations.
void Service::IoReactor::advance_active_ops(OperQueue<AsyncOper>& completed_ops) noexcept
{
    auto retire = [&](LendersIoOperPtr& op) noexcept {
        completed_ops.push_back(std::move(op));
        --m_num_operations;
    };

    OperQueue<IoOper> retained_ops;
    for (;;) {
        LendersIoOperPtr op = m_active_ops.pop_front();
        if (!op)
            break;

        if (op->is_canceled()) {
            retire(op);
            continue;
        }

        const Want want = op->advance();
        if (want == Want::nothing) {
            REALM_ASSERT(op->is_complete());
            retire(op);
            continue;
        }

        if (want == Want::read) {
            Descriptor& desc = op->descriptor();
            if (!desc.m_read_ready) {
                desc.m_suspended_read_ops.push_back(std::move(op));
                continue;
            }
        }
        else if (want == Want::write) {
            Descriptor& desc = op->descriptor();
            if (!desc.m_write_ready) {
                desc.m_suspended_write_ops.push_back(std::move(op));
                continue;
            }
        }
        else {
            REALM_ASSERT(false);
        }

        // The descriptor is still ready, so the operation remains active
        retained_ops.push_back(std::move(op));
    }
    m_active_ops.push_back(retained_ops);
}
937,346✔
964

965

966
#else // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
967

968

969
inline Service::IoReactor::IoReactor()
970
    : m_wakeup_pipe{} // Throws
971
{
9,020✔
972
    pollfd slot = pollfd(); // Cleared slot
9,020✔
973
    slot.fd = m_wakeup_pipe.wait_fd();
9,020✔
974
    slot.events = POLLRDNORM;
9,020✔
975
    m_pollfd_slots.emplace_back(slot); // Throws
9,020✔
976
}
9,020✔
977

978

979
// When assertions are enabled, verify on destruction that the number of
// operations still queued across all slots equals m_num_operations. The
// queues are drained as a side effect of counting.
inline Service::IoReactor::~IoReactor() noexcept
{
#if REALM_ASSERTIONS_ENABLED
    auto drain_and_count = [](auto& ops) noexcept {
        std::size_t count = 0;
        while (ops.pop_front())
            ++count;
        return count;
    };

    std::size_t num_queued = 0;
    for (OperSlot& slot : m_operations) {
        num_queued += drain_and_count(slot.read_ops);
        num_queued += drain_and_count(slot.write_ops);
    }
    REALM_ASSERT(num_queued == m_num_operations);
#endif
}
9,020✔
993

994

995
// Register `op` as a pending read or write operation on the descriptor
// `desc` (poll() based reactor).
//
// The descriptor value is used directly as an index into m_operations, which
// is grown on demand. A pollfd slot is allocated for the descriptor unless it
// already has one, and the slot's event mask is extended according to `want`.
// `want` is expected to be `Want::read` or `Want::write`; any other value
// trips an assertion and the operation is not counted.
void Service::IoReactor::add_oper(Descriptor& desc, LendersIoOperPtr op, Want want)
{
    native_handle_type fd = desc.m_fd;

    // The descriptor must be valid. A negative value (or an all-ones value
    // for an unsigned handle type) would make `std::size_t(fd) + 1` wrap
    // around to zero below, skip the resize, and then index m_operations out
    // of bounds.
    REALM_ASSERT(fd >= 0);
    REALM_ASSERT(std::size_t(fd) < std::numeric_limits<std::size_t>::max());

    // Make sure there are enough slots in m_operations
    {
        std::size_t n = std::size_t(fd) + 1; // Cannot overflow due to preceding check
        if (m_operations.size() < n)
            m_operations.resize(n); // Throws
    }

    // Allocate a pollfd_slot unless we already have one. Index zero means
    // "no slot", because slot zero is never handed out here (see the
    // assertion below).
    OperSlot& oper_slot = m_operations[fd];
    if (oper_slot.pollfd_slot_ndx == 0) {
        pollfd pollfd_slot = pollfd(); // Cleared slot
        pollfd_slot.fd = fd;
        std::size_t pollfd_slot_ndx = m_pollfd_slots.size();
        REALM_ASSERT(pollfd_slot_ndx > 0);
        m_pollfd_slots.emplace_back(pollfd_slot); // Throws
        oper_slot.pollfd_slot_ndx = pollfd_slot_ndx;
    }

    // The event mask of the slot must mirror exactly which of the two
    // operation queues are non-empty.
    pollfd& pollfd_slot = m_pollfd_slots[oper_slot.pollfd_slot_ndx];
    REALM_ASSERT(pollfd_slot.fd == fd);
    REALM_ASSERT(((pollfd_slot.events & POLLRDNORM) != 0) == !oper_slot.read_ops.empty());
    REALM_ASSERT(((pollfd_slot.events & POLLWRNORM) != 0) == !oper_slot.write_ops.empty());
    REALM_ASSERT((pollfd_slot.events & ~(POLLRDNORM | POLLWRNORM)) == 0);
    switch (want) {
        case Want::nothing:
            break;
        case Want::read:
            pollfd_slot.events |= POLLRDNORM;
            oper_slot.read_ops.push_back(std::move(op));
            goto finish;
        case Want::write:
            pollfd_slot.events |= POLLWRNORM;
            oper_slot.write_ops.push_back(std::move(op));
            goto finish;
    }
    REALM_ASSERT(false);
    return;

finish:
    ++m_num_operations;
}
1,087,974✔
1040

1041

1042
// Move all pending read and write operations of `desc` to `completed_ops`
// and release the pollfd slot of the descriptor (poll() based reactor).
//
// The descriptor is expected to be valid, to have a pollfd slot, and to have
// at least one pending operation; this is checked by assertions.
void Service::IoReactor::remove_canceled_ops(Descriptor& desc, OperQueue<AsyncOper>& completed_ops) noexcept
{
    const native_handle_type fd = desc.m_fd;
    REALM_ASSERT(fd >= 0);
    REALM_ASSERT(std::size_t(fd) < m_operations.size());

    OperSlot& slot = m_operations[fd];
    REALM_ASSERT(slot.pollfd_slot_ndx > 0);
    REALM_ASSERT(!slot.read_ops.empty() || !slot.write_ops.empty());
    REALM_ASSERT(m_pollfd_slots[slot.pollfd_slot_ndx].fd == fd);

    auto drain = [&](auto& pending_ops) noexcept {
        for (;;) {
            LendersIoOperPtr op = pending_ops.pop_front();
            if (!op)
                return;
            completed_ops.push_back(std::move(op));
            --m_num_operations;
        }
    };
    drain(slot.read_ops);
    drain(slot.write_ops);

    // Nothing is pending on this descriptor any more, so stop polling it
    discard_pollfd_slot_by_move_last_over(slot);
}
242,480✔
1062

1063

1064
bool Service::IoReactor::wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
1065
                                          OperQueue<AsyncOper>& completed_ops)
1066
{
748,960✔
1067
#ifdef _WIN32
1068
    using nfds_type = std::size_t;
1069
#else
1070
    using nfds_type = nfds_t;
748,960✔
1071
#endif
748,960✔
1072
    clock::time_point now_2 = now;
748,960✔
1073
    std::size_t num_ready_descriptors = 0;
748,960✔
1074
    {
748,960✔
1075
        // std::vector guarantees contiguous storage
748,960✔
1076
        pollfd* fds = &m_pollfd_slots.front();
748,960✔
1077
        nfds_type nfds = nfds_type(m_pollfd_slots.size());
748,960✔
1078
        for (;;) {
748,960✔
1079
            int max_wait_millis = -1; // Wait indefinitely
748,574✔
1080
            if (timeout.time_since_epoch().count() > 0) {
748,574✔
1081
                if (now_2 >= timeout)
406,322✔
1082
                    return false; // No operations completed
1083
                auto diff = timeout - now_2;
406,322✔
1084
                int max_int_millis = std::numeric_limits<int>::max();
406,322✔
1085
                // 17592186044415 is the largest value (45-bit signed integer)
406,322✔
1086
                // garanteed to be supported by std::chrono::milliseconds. In
406,322✔
1087
                // the worst case, `int` is a 16-bit integer, meaning that we
406,322✔
1088
                // can only wait about 30 seconds at a time. In the best case
406,322✔
1089
                // (17592186044415) we can wait more than 500 years at a
406,322✔
1090
                // time. In the typical case (`int` has 32 bits), we can wait 24
406,322✔
1091
                // days at a time.
406,322✔
1092
                long long max_chrono_millis = 17592186044415;
406,322✔
1093
                if (max_int_millis > max_chrono_millis)
406,322✔
1094
                    max_int_millis = int(max_chrono_millis);
1095
                if (diff > std::chrono::milliseconds(max_int_millis)) {
406,322✔
1096
                    max_wait_millis = max_int_millis;
1,738✔
1097
                }
1,738✔
1098
                else {
404,584✔
1099
                    // Overflow is impossible here, due to the preceeding check
404,584✔
1100
                    auto diff_millis = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
404,584✔
1101
                    // The conversion to milliseconds will round down if the
404,584✔
1102
                    // tick period of `diff` is less than a millisecond, which
404,584✔
1103
                    // it usually is. This is a problem, because it can lead to
404,584✔
1104
                    // premature wakeups, which in turn could cause extranous
404,584✔
1105
                    // iterations in the event loop. This is especially
404,584✔
1106
                    // problematic when a small `diff` is rounded down to zero
404,584✔
1107
                    // milliseconds, becuase that can easily produce a "busy
404,584✔
1108
                    // wait" condition for up to a millisecond every time this
404,584✔
1109
                    // happens. Obviously, the solution is to round up, instead
404,584✔
1110
                    // of down.
404,584✔
1111
                    if (diff_millis < diff) {
404,584✔
1112
                        // Note that the following increment cannot overflow,
404,498✔
1113
                        // because diff_millis < diff <= max_int_millis <=
404,498✔
1114
                        // std::numeric_limits<int>::max().
404,498✔
1115
                        ++diff_millis;
404,498✔
1116
                    }
404,498✔
1117
                    max_wait_millis = int(diff_millis.count());
404,584✔
1118
                }
404,584✔
1119
            }
406,322✔
1120

748,574✔
1121
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1122
            clock::time_point sleep_start_time = clock::now();
1123
#endif
1124

748,574✔
1125
#ifdef _WIN32
1126
            max_wait_millis = 1000;
1127

1128
            // Windows does not have a single API call to wait for pipes and
1129
            // sockets with a timeout. So we repeatedly poll them individually
1130
            // in a loop until max_wait_millis has elapsed or an event happend.
1131
            //
1132
            // FIXME: Maybe switch to Windows IOCP instead.
1133

1134
            // Following variable is the poll time for the sockets in
1135
            // miliseconds. Adjust it to find a balance between CPU usage and
1136
            // response time:
1137
            constexpr INT socket_poll_timeout = 10;
1138

1139
            for (size_t t = 0; t < m_pollfd_slots.size(); t++)
1140
                m_pollfd_slots[t].revents = 0;
1141

1142
            using namespace std::chrono;
1143
            auto started = steady_clock::now();
1144
            int ret = 0;
1145

1146
            for (;;) {
1147
                if (m_pollfd_slots.size() > 1) {
1148
                    // Poll all network sockets
1149
                    ret = WSAPoll(LPWSAPOLLFD(&m_pollfd_slots[1]), ULONG(m_pollfd_slots.size() - 1),
1150
                                  socket_poll_timeout);
1151
                    REALM_ASSERT(ret != SOCKET_ERROR);
1152
                }
1153

1154
                if (m_wakeup_pipe.is_signaled()) {
1155
                    m_pollfd_slots[0].revents = POLLIN;
1156
                    ret++;
1157
                }
1158

1159
                if (ret != 0 ||
1160
                    (duration_cast<milliseconds>(steady_clock::now() - started).count() >= max_wait_millis)) {
1161
                    break;
1162
                }
1163

1164
                // If we don't have any sockets to poll for (m_pollfd_slots is less than 2) and no one signals
1165
                // the wakeup pipe, we'd be stuck busy waiting for either condition to become true.
1166
                std::this_thread::sleep_for(std::chrono::milliseconds(socket_poll_timeout));
1167
            }
1168

1169
#else // !defined _WIN32
1170
            int ret = ::poll(fds, nfds, max_wait_millis);
748,574✔
1171
#endif
748,574✔
1172
            bool interrupted_2 = false;
748,574✔
1173
            if (REALM_UNLIKELY(ret == -1)) {
748,574✔
1174
#ifndef _WIN32
1175
                int err = errno;
1176
                if (REALM_UNLIKELY(err != EINTR)) {
1177
                    std::error_code ec = make_basic_system_error_code(err);
1178
                    throw std::system_error(ec);
1179
                }
1180
#endif
1181
                interrupted_2 = true;
1182
            }
1183

748,574✔
1184
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1185
            m_sleep_time += clock::now() - sleep_start_time;
1186
#endif
1187

748,574✔
1188
            if (REALM_LIKELY(!interrupted_2)) {
749,458✔
1189
                REALM_ASSERT(ret >= 0);
749,458✔
1190
                num_ready_descriptors = ret;
749,458✔
1191
                break;
749,458✔
1192
            }
749,458✔
1193

2,147,483,647✔
1194
            // Retry on interruption by system signal
2,147,483,647✔
1195
            if (timeout.time_since_epoch().count() > 0)
2,147,483,647✔
1196
                now_2 = clock::now();
1197
        }
2,147,483,647✔
1198
    }
748,960✔
1199

748,960✔
1200
    if (num_ready_descriptors == 0)
748,960✔
1201
        return false; // No operations completed
4,090✔
1202

744,870✔
1203
    // Check wake-up descriptor
744,870✔
1204
    if (m_pollfd_slots[0].revents != 0) {
744,870✔
1205
        REALM_ASSERT((m_pollfd_slots[0].revents & POLLNVAL) == 0);
111,952✔
1206
        m_wakeup_pipe.acknowledge_signal();
111,952✔
1207
        interrupted = true;
111,952✔
1208
        return false;
111,952✔
1209
    }
111,952✔
1210

632,918✔
1211
    std::size_t orig_num_operations = m_num_operations;
632,918✔
1212
    std::size_t num_pollfd_slots = m_pollfd_slots.size();
632,918✔
1213
    std::size_t pollfd_slot_ndx = 1;
632,918✔
1214
    while (pollfd_slot_ndx < num_pollfd_slots && num_ready_descriptors > 0) {
1,398,676✔
1215
        pollfd& pollfd_slot = m_pollfd_slots[pollfd_slot_ndx];
765,758✔
1216
        REALM_ASSERT(pollfd_slot.fd >= 0);
765,758✔
1217
        if (REALM_LIKELY(pollfd_slot.revents == 0)) {
765,758✔
1218
            ++pollfd_slot_ndx;
76,968✔
1219
            continue;
76,968✔
1220
        }
76,968✔
1221
        --num_ready_descriptors;
688,790✔
1222

688,790✔
1223
        REALM_ASSERT((pollfd_slot.revents & POLLNVAL) == 0);
688,790✔
1224

688,790✔
1225
        // Treat errors like read and/or write-readiness
688,790✔
1226
        if ((pollfd_slot.revents & (POLLHUP | POLLERR)) != 0) {
688,790✔
1227
            REALM_ASSERT((pollfd_slot.events & (POLLRDNORM | POLLWRNORM)) != 0);
782✔
1228
            if ((pollfd_slot.events & POLLRDNORM) != 0)
782✔
1229
                pollfd_slot.revents |= POLLRDNORM;
782✔
1230
            if ((pollfd_slot.events & POLLWRNORM) != 0)
782✔
1231
                pollfd_slot.revents |= POLLWRNORM;
26✔
1232
        }
782✔
1233

688,790✔
1234
        OperSlot& oper_slot = m_operations[pollfd_slot.fd];
688,790✔
1235
        REALM_ASSERT(oper_slot.pollfd_slot_ndx == pollfd_slot_ndx);
688,790✔
1236

688,790✔
1237
        OperQueue<IoOper> new_read_ops, new_write_ops;
688,790✔
1238
        auto advance_ops = [&](OperQueue<IoOper>& ops) noexcept {
811,574✔
1239
            while (LendersIoOperPtr op = ops.pop_front()) {
1,624,604✔
1240
                Want want = op->advance();
811,790✔
1241
                switch (want) {
811,790✔
1242
                    case Want::nothing:
667,408✔
1243
                        REALM_ASSERT(op->is_complete());
667,408✔
1244
                        completed_ops.push_back(std::move(op));
667,408✔
1245
                        --m_num_operations;
667,408✔
1246
                        continue;
667,408✔
1247
                    case Want::read:
145,604✔
1248
                        new_read_ops.push_back(std::move(op));
145,604✔
1249
                        continue;
145,604✔
1250
                    case Want::write:
18✔
1251
                        new_write_ops.push_back(std::move(op));
18✔
1252
                        continue;
18✔
1253
                }
1254
                REALM_ASSERT(false);
1255
            }
1256
        };
811,574✔
1257

688,790✔
1258
        // Check read-readiness
688,790✔
1259
        if ((pollfd_slot.revents & POLLRDNORM) != 0) {
688,790✔
1260
            REALM_ASSERT(!oper_slot.read_ops.empty());
532,024✔
1261
            advance_ops(oper_slot.read_ops);
532,024✔
1262
            pollfd_slot.events &= ~POLLRDNORM;
532,024✔
1263
        }
532,024✔
1264

688,790✔
1265
        // Check write-readiness
688,790✔
1266
        if ((pollfd_slot.revents & POLLWRNORM) != 0) {
688,790✔
1267
            REALM_ASSERT(!oper_slot.write_ops.empty());
280,814✔
1268
            advance_ops(oper_slot.write_ops);
280,814✔
1269
            pollfd_slot.events &= ~POLLWRNORM;
280,814✔
1270
        }
280,814✔
1271

688,790✔
1272
        if (!new_read_ops.empty()) {
688,790✔
1273
            oper_slot.read_ops.push_back(new_read_ops);
145,604✔
1274
            pollfd_slot.events |= POLLRDNORM;
145,604✔
1275
        }
145,604✔
1276

688,790✔
1277
        if (!new_write_ops.empty()) {
688,790✔
1278
            oper_slot.write_ops.push_back(new_write_ops);
18✔
1279
            pollfd_slot.events |= POLLWRNORM;
18✔
1280
        }
18✔
1281

688,790✔
1282
        if (pollfd_slot.events == 0) {
688,790✔
1283
            discard_pollfd_slot_by_move_last_over(oper_slot);
476,132✔
1284
            --num_pollfd_slots;
476,132✔
1285
        }
476,132✔
1286
        else {
212,658✔
1287
            ++pollfd_slot_ndx;
212,658✔
1288
        }
212,658✔
1289
    }
688,790✔
1290

632,918✔
1291
    REALM_ASSERT(num_ready_descriptors == 0);
632,918✔
1292

632,918✔
1293
    bool any_operations_completed = (m_num_operations < orig_num_operations);
632,918✔
1294
    return any_operations_completed;
632,918✔
1295
}
632,918✔
1296

1297

1298
// Release the pollfd slot referred to by `oper_slot`.
//
// The vacated position is filled by moving the last pollfd slot into it
// (updating the index stored for the moved descriptor), after which the last
// entry is removed. `oper_slot` is marked as having no pollfd slot.
void Service::IoReactor::discard_pollfd_slot_by_move_last_over(OperSlot& oper_slot) noexcept
{
    const std::size_t vacated_ndx = oper_slot.pollfd_slot_ndx;
    const std::size_t last_ndx = m_pollfd_slots.size() - 1;
    oper_slot.pollfd_slot_ndx = 0; // Mark unused
    if (vacated_ndx < last_ndx) {
        const pollfd moved_slot = m_pollfd_slots[last_ndx];
        m_operations[moved_slot.fd].pollfd_slot_ndx = vacated_ndx;
        m_pollfd_slots[vacated_ndx] = moved_slot;
    }
    m_pollfd_slots.pop_back();
}
717,630✔
1309

1310
#endif // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
1311

1312

1313
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1314

1315
// Return the sleep time accumulated in m_sleep_time since the previous call,
// and reset the accumulator to zero.
auto Service::IoReactor::get_and_reset_sleep_time() noexcept -> clock::duration
{
    const clock::duration accumulated = m_sleep_time;
    m_sleep_time = clock::duration::zero();
    return accumulated;
}
1321

1322
#endif // REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1323

1324

1325
class Service::Impl {
1326
public:
1327
    Service& service;
1328
    IoReactor io_reactor;
1329

1330
    Impl(Service& s)
1331
        : service{s}
1332
        , io_reactor{} // Throws
1333
    {
18,282✔
1334
    }
18,282✔
1335

1336
    // If the resolver thread was started, ask it to stop and wait for it to
    // terminate. Then discard all completed operations.
    ~Impl()
    {
        if (m_resolver_thread.joinable()) {
            {
                std::lock_guard lock{m_mutex};
                m_stop_resolver_thread = true;
                m_resolver_cond.notify_all();
            }
            m_resolver_thread.join();
        }

        // Avoid calls to recycle_post_oper() after destruction has begun.
        m_completed_operations.clear();
    }
18,282✔
1351

1352
    // When REALM_UTIL_NETWORK_EVENT_LOOP_METRICS is defined, arrange for
    // `handler` to be called every 30 seconds with two ratios computed over
    // the elapsed period:
    //
    //  - saturation: the fraction of time the event loop was not sleeping,
    //  - inefficiency: the fraction of time spent neither sleeping nor
    //    executing completion handlers.
    //
    // After each report, the timer is rearmed by a recursive call. When the
    // macro is not defined, `handler` is ignored.
    void report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler> handler)
    {
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
        m_event_loop_metrics_timer.emplace(service);
        // The lambda must be `mutable`: the captured handler is handed over
        // to the next round by moving it, which is impossible through the
        // const call operator of a non-mutable lambda.
        m_event_loop_metrics_timer->async_wait(
            std::chrono::seconds{30}, [this, handler = std::move(handler)](Status status) mutable {
                REALM_ASSERT(status.is_ok());
                clock::time_point now = clock::now();
                clock::duration elapsed_time = now - m_event_loop_metrics_start_time;
                clock::duration sleep_time = io_reactor.get_and_reset_sleep_time();
                clock::duration nonsleep_time = elapsed_time - sleep_time;
                double saturation = double(nonsleep_time.count()) / double(elapsed_time.count());
                clock::duration internal_exec_time = nonsleep_time - m_handler_exec_time;
                // This lambda is itself running as a handler, so the time
                // since the current batch of handlers started has not yet
                // been added to m_handler_exec_time.
                internal_exec_time += now - m_handler_exec_start_time;
                double inefficiency = double(internal_exec_time.count()) / double(elapsed_time.count());
                m_event_loop_metrics_start_time = now;
                m_handler_exec_start_time = now;
                m_handler_exec_time = clock::duration::zero();
                handler(saturation, inefficiency);             // Throws
                report_event_loop_metrics(std::move(handler)); // Throws
            });                                                // Throws
#else
        static_cast<void>(handler);
#endif
    }
×
1377

1378
    void run()
1379
    {
8,980✔
1380
        run_impl(true);
8,980✔
1381
    }
8,980✔
1382

1383
    void run_until_stopped()
1384
    {
9,482✔
1385
        run_impl(false);
9,482✔
1386
    }
9,482✔
1387

1388
    void stop() noexcept
1389
    {
17,954✔
1390
        {
17,954✔
1391
            std::lock_guard lock{m_mutex};
17,954✔
1392
            if (m_stopped)
17,954✔
1393
                return;
×
1394
            m_stopped = true;
17,954✔
1395
        }
17,954✔
1396
        io_reactor.interrupt();
17,954✔
1397
    }
17,954✔
1398

1399
    void reset() noexcept
1400
    {
16✔
1401
        std::lock_guard lock{m_mutex};
16✔
1402
        m_stopped = false;
16✔
1403
    }
16✔
1404

1405
    static Endpoint::List resolve(const Resolver::Query&, std::error_code&);
1406

1407
    // Queue a resolve operation and notify waiters on m_resolver_cond. The
    // resolver thread is started on first use, that is, if it is not
    // already joinable.
    void add_resolve_oper(LendersResolveOperPtr op)
    {
        {
            std::lock_guard lock{m_mutex};
            m_resolve_operations.push_back(std::move(op)); // Throws
            m_resolver_cond.notify_all();
        }
        if (!m_resolver_thread.joinable()) {
            auto thread_main = [this]() noexcept {
                resolver_thread();
            };
            m_resolver_thread = std::thread{std::move(thread_main)};
        }
    }
1,766✔
1422

1423
    // Add a wait operation to the priority queue of pending wait
    // operations.
    void add_wait_oper(LendersWaitOperPtr op)
    {
        LendersWaitOperPtr pending = std::move(op);
        m_wait_operations.push(std::move(pending)); // Throws
    }
535,202✔
1427

1428
    // Construct a post operation of `size` bytes by way of `constr`, append
    // it to m_completed_operations_2, and interrupt the I/O reactor.
    //
    // The memory chunk retained in m_post_oper is reused if it is large
    // enough; otherwise a new chunk is allocated. `cookie` is passed through
    // to `constr` unchanged.
    void post(PostOperConstr constr, std::size_t size, void* cookie)
    {
        {
            std::lock_guard lock{m_mutex};
            const bool can_reuse = (m_post_oper && m_post_oper->m_size >= size);
            std::unique_ptr<char[]> chunk;
            if (!can_reuse) {
                // Allocate new memory
                chunk.reset(new char[size]); // Throws
            }
            else {
                // Reuse old memory
                AsyncOper* unused = m_post_oper.release();
                REALM_ASSERT(dynamic_cast<UnusedOper*>(unused));
                static_cast<UnusedOper*>(unused)->UnusedOper::~UnusedOper(); // Static dispatch
                chunk.reset(static_cast<char*>(static_cast<void*>(unused)));
            }

            // `chunk` keeps ownership of the memory until construction has
            // succeeded, after which the new operation owns it.
            std::unique_ptr<PostOperBase, LendersOperDeleter> post_op;
            post_op.reset((*constr)(chunk.get(), size, *this, cookie)); // Throws
            chunk.release();
            m_completed_operations_2.push_back(std::move(post_op));
        }
        io_reactor.interrupt();
    }
1,175,444✔
1452

1453
    // Destroy the post operation `op` and turn its memory into an
    // UnusedOper of the same recorded size. Of that chunk and the one
    // currently held in m_post_oper, the larger one is retained for reuse
    // by post().
    void recycle_post_oper(PostOperBase* op) noexcept
    {
        const std::size_t chunk_size = op->m_size;
        op->~PostOperBase();                                   // Dynamic dispatch
        OwnersOperPtr recycled(new (op) UnusedOper(chunk_size)); // Does not throw

        // Keep the larger memory chunk (`recycled` or m_post_oper)
        std::lock_guard lock{m_mutex};
        const bool retained_is_smaller = (!m_post_oper || m_post_oper->m_size < chunk_size);
        if (retained_is_smaller)
            swap(recycled, m_post_oper);
    }
1,175,948✔
1466

1467
    // Schedule `op` for execution unless it is already marked as in use.
    // The operation is marked as in use, appended to
    // m_completed_operations_2, and the I/O reactor is interrupted.
    void trigger_exec(TriggerExecOperBase& op) noexcept
    {
        {
            std::lock_guard lock{m_mutex};
            if (op.m_in_use)
                return;
            op.m_in_use = true;
            bind_ptr<TriggerExecOperBase> counted_ref{&op}; // Increment use count
            m_completed_operations_2.push_back(LendersOperPtr{counted_ref.release()});
        }
        io_reactor.interrupt();
    }
×
1480

1481
    // Mark `op` as no longer in use, such that trigger_exec() will
    // schedule it again.
    void reset_trigger_exec(TriggerExecOperBase& op) noexcept
    {
        const std::lock_guard guard{m_mutex};
        op.m_in_use = false;
    }
×
1486

1487
    // Append `op` to the queue of completed operations.
    void add_completed_oper(LendersOperPtr op) noexcept
    {
        LendersOperPtr completed = std::move(op);
        m_completed_operations.push_back(std::move(completed));
    }
1,198,946✔
1491

1492
    // Have the I/O reactor move the canceled operations of `desc` to the
    // queue of completed operations.
    void remove_canceled_ops(Descriptor& desc) noexcept
    {
        OperQueue<AsyncOper>& completed = m_completed_operations;
        io_reactor.remove_canceled_ops(desc, completed);
    }
488,834✔
1496

1497
    // Cancel the resolve operation `op` while holding m_mutex.
    void cancel_resolve_oper(ResolveOperBase& op) noexcept
    {
        const std::lock_guard guard{m_mutex};
        op.cancel();
    }
12✔
1502

1503
    // Remove the pending wait operation `op` from m_wait_operations and
    // append it to the queue of completed operations.
    //
    // The search is narrowed to the operations that share the expiration
    // time of `op`, among which `op` is then identified by address. It is
    // asserted that `op` is found.
    void cancel_incomplete_wait_oper(WaitOperBase& op) noexcept
    {
        auto [range_begin, range_end] = std::equal_range(m_wait_operations.begin(), m_wait_operations.end(),
                                                         op.m_expiration_time, WaitOperCompare{});
        auto found = range_begin;
        while (found != range_end && &**found != &op)
            ++found;
        REALM_ASSERT(found != range_end);
        m_completed_operations.push_back(m_wait_operations.erase(found));
    }
20,934✔
1514

1515
private:
1516
    OperQueue<AsyncOper> m_completed_operations; // Completed, canceled, and post operations
1517

1518
    struct WaitOperCompare {
1519
        bool operator()(const LendersWaitOperPtr& a, clock::time_point b)
1520
        {
28,280✔
1521
            return a->m_expiration_time > b;
28,280✔
1522
        }
28,280✔
1523
        bool operator()(clock::time_point a, const LendersWaitOperPtr& b)
1524
        {
25,046✔
1525
            return a > b->m_expiration_time;
25,046✔
1526
        }
25,046✔
1527
        bool operator()(const LendersWaitOperPtr& a, const LendersWaitOperPtr& b)
1528
        {
142,014✔
1529
            return a->m_expiration_time > b->m_expiration_time;
142,014✔
1530
        }
142,014✔
1531
    };
1532

1533
    using WaitQueue = util::PriorityQueue<LendersWaitOperPtr, std::vector<LendersWaitOperPtr>, WaitOperCompare>;
1534
    WaitQueue m_wait_operations;
1535

1536
    std::mutex m_mutex;
1537
    OwnersOperPtr m_post_oper;                       // Protected by `m_mutex`
1538
    OperQueue<ResolveOperBase> m_resolve_operations; // Protected by `m_mutex`
1539
    OperQueue<AsyncOper> m_completed_operations_2;   // Protected by `m_mutex`
1540
    bool m_stopped = false;                          // Protected by `m_mutex`
1541
    bool m_stop_resolver_thread = false;             // Protected by `m_mutex`
1542
    bool m_resolve_in_progress = false;              // Protected by `m_mutex`
1543
    std::condition_variable m_resolver_cond;         // Protected by `m_mutex`
1544

1545
    std::thread m_resolver_thread;
1546

1547
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1548
    util::Optional<DeadlineTimer> m_event_loop_metrics_timer;
1549
    clock::time_point m_event_loop_metrics_start_time = clock::now();
1550
    clock::time_point m_handler_exec_start_time;
1551
    clock::duration m_handler_exec_time = clock::duration::zero();
1552
#endif
1553
    void run_impl(bool return_when_idle)
1554
    {
18,460✔
1555
        bool no_incomplete_resolve_operations;
18,460✔
1556

9,104✔
1557
    on_handlers_executed_or_interrupted : {
2,296,476✔
1558
        std::lock_guard lock{m_mutex};
2,296,476✔
1559
        if (m_stopped)
2,296,476✔
1560
            return;
17,910✔
1561
        // Note: Order of post operations must be preserved.
990,892✔
1562
        m_completed_operations.push_back(m_completed_operations_2);
2,278,566✔
1563
        no_incomplete_resolve_operations = (!m_resolve_in_progress && m_resolve_operations.empty());
2,280,034✔
1564

990,892✔
1565
        if (m_completed_operations.empty())
2,278,566✔
1566
            goto on_time_progressed;
1,980,498✔
1567
    }
2,019,676✔
1568

880,284✔
1569
    on_operations_completed : {
2,019,676✔
1570
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1571
        m_handler_exec_start_time = clock::now();
1572
#endif
1573
        while (LendersOperPtr op = m_completed_operations.pop_front())
7,086,910✔
1574
            execute(op); // Throws
5,067,234✔
1575
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1576
        m_handler_exec_time += clock::now() - m_handler_exec_start_time;
1577
#endif
1578
        goto on_handlers_executed_or_interrupted;
2,019,676✔
1579
    }
2,127,552✔
1580

996,628✔
1581
    on_time_progressed : {
2,127,552✔
1582
        clock::time_point now = clock::now();
2,127,552✔
1583
        if (process_timers(now))
2,127,552✔
1584
            goto on_operations_completed;
494,308✔
1585

748,706✔
1586
        bool no_incomplete_operations =
1,633,244✔
1587
            (io_reactor.empty() && m_wait_operations.empty() && no_incomplete_resolve_operations);
1,633,244✔
1588
        if (no_incomplete_operations && return_when_idle) {
1,633,244✔
1589
            // We can only get to this point when there are no completion
258✔
1590
            // handlers ready to execute. It happens either because of a
258✔
1591
            // fall-through from on_operations_completed, or because of a
258✔
1592
            // jump to on_time_progressed, but that only happens if no
258✔
1593
            // completions handlers became ready during
258✔
1594
            // wait_and_process_io().
258✔
1595
            //
258✔
1596
            // We can also only get to this point when there are no
258✔
1597
            // asynchronous operations in progress (due to the preceeding
258✔
1598
            // if-condition.
258✔
1599
            //
258✔
1600
            // It is possible that an other thread has added new post
258✔
1601
            // operations since we checked, but there is really no point in
258✔
1602
            // rechecking that, as it is always possible, even after a
258✔
1603
            // recheck, that new post handlers get added after we decide to
258✔
1604
            // return, but before we actually do return. Also, if would
258✔
1605
            // offer no additional guarantees to the application.
258✔
1606
            return; // Out of work
520✔
1607
        }
520✔
1608

748,448✔
1609
        // Blocking wait for I/O
748,448✔
1610
        bool interrupted = false;
1,632,724✔
1611
        if (wait_and_process_io(now, interrupted)) // Throws
1,632,724✔
1612
            goto on_operations_completed;
1,226,290✔
1613
        if (interrupted)
406,434✔
1614
            goto on_handlers_executed_or_interrupted;
259,934✔
1615
        goto on_time_progressed;
146,500✔
1616
    }
146,500✔
1617
    }
146,500✔
1618
    /// Completes every wait operation at the top of `m_wait_operations` whose
    /// expiration time is not later than `now`, moving each one to
    /// `m_completed_operations`.
    ///
    /// \return True if at least one wait operation was completed.
    bool process_timers(clock::time_point now)
    {
        bool expired_any = false;
        while (!m_wait_operations.empty()) {
            auto& next = m_wait_operations.top();
            if (now < next->m_expiration_time)
                break; // The top entry has not expired yet
            next->complete();
            m_completed_operations.push_back(m_wait_operations.pop_top());
            expired_any = true;
        }
        return expired_any;
    }
2,128,358✔
1633

1634
    /// Hands control to the I/O reactor via `wait_and_advance()`, which appends
    /// to `m_completed_operations`.
    ///
    /// The timeout passed to the reactor is the expiration time of the top wait
    /// operation, or a value-initialized time point when there are no wait
    /// operations.
    ///
    /// \param interrupted Forwarded to the reactor, which uses it to report
    /// whether the wait was interrupted.
    /// \return The value returned by `wait_and_advance()`.
    bool wait_and_process_io(clock::time_point now, bool& interrupted)
    {
        clock::time_point timeout =
            m_wait_operations.empty() ? clock::time_point{} : m_wait_operations.top()->m_expiration_time;
        return io_reactor.wait_and_advance(timeout, now, interrupted, m_completed_operations); // Throws
    }
1,633,924✔
1643

1644
    /// Releases the operation from `lenders_ptr` and invokes
    /// `recycle_and_execute()` on it. After the call, `lenders_ptr` no longer
    /// refers to the operation.
    static void execute(LendersOperPtr& lenders_ptr)
    {
        auto* oper = lenders_ptr.release();
        oper->recycle_and_execute(); // Throws
    }
5,065,664✔
1648

1649
    void resolver_thread() noexcept
1650
    {
1,774✔
1651
        LendersResolveOperPtr op;
1,774✔
1652
        for (;;) {
5,144✔
1653
            {
5,144✔
1654
                std::unique_lock lock{m_mutex};
5,144✔
1655
                if (op) {
5,144✔
1656
                    m_completed_operations_2.push_back(std::move(op));
3,370✔
1657
                    io_reactor.interrupt();
3,370✔
1658
                }
3,370✔
1659
                m_resolve_in_progress = false;
5,144✔
1660
                while (m_resolve_operations.empty() && !m_stop_resolver_thread)
8,508✔
1661
                    m_resolver_cond.wait(lock);
3,364✔
1662
                if (m_stop_resolver_thread)
5,144✔
1663
                    return;
1,774✔
1664
                op = m_resolve_operations.pop_front();
3,370✔
1665
                m_resolve_in_progress = true;
3,370✔
1666
                if (op->is_canceled())
3,370✔
1667
                    continue;
4✔
1668
            }
3,366✔
1669
            try {
3,366✔
1670
                op->m_endpoints = resolve(op->m_query, op->m_error_code); // Throws only std::bad_alloc
3,366✔
1671
            }
3,366✔
1672
            catch (std::bad_alloc&) {
1,632✔
1673
                op->m_error_code = make_basic_system_error_code(ENOMEM);
×
1674
            }
×
1675
            op->complete();
3,366✔
1676
        }
3,366✔
1677
    }
1,774✔
1678
};
1679

1680

1681
// This function promises to only ever throw std::bad_alloc.
1682
Endpoint::List Service::Impl::resolve(const Resolver::Query& query, std::error_code& ec)
1683
{
11,834✔
1684
    Endpoint::List list;
11,834✔
1685

5,814✔
1686
    using addrinfo_type = struct addrinfo;
11,834✔
1687
    addrinfo_type hints = addrinfo_type(); // Clear
11,834✔
1688
    hints.ai_flags = query.m_flags;
11,834✔
1689
    hints.ai_family = query.m_protocol.m_family;
11,834✔
1690
    hints.ai_socktype = query.m_protocol.m_socktype;
11,834✔
1691
    hints.ai_protocol = query.m_protocol.m_protocol;
11,834✔
1692

5,814✔
1693
    const char* query_host = query.m_host.empty() ? 0 : query.m_host.c_str();
11,834✔
1694
    const char* query_service = query.m_service.empty() ? 0 : query.m_service.c_str();
10,098✔
1695
    struct addrinfo* first = nullptr;
11,834✔
1696
    int ret = ::getaddrinfo(query_host, query_service, &hints, &first);
11,834✔
1697
    if (REALM_UNLIKELY(ret != 0)) {
11,834✔
1698
#ifdef EAI_SYSTEM
8✔
1699
        if (ret == EAI_SYSTEM) {
8✔
1700
            if (errno != 0) {
×
1701
                ec = make_basic_system_error_code(errno);
×
1702
            }
×
1703
            else {
×
1704
                ec = error::unknown;
×
1705
            }
×
1706
            return list;
×
1707
        }
×
1708
#endif
8✔
1709
        ec = translate_addrinfo_error(ret);
8✔
1710
        return list;
8✔
1711
    }
8✔
1712

5,810✔
1713
    GetaddrinfoResultOwner gro(first);
11,826✔
1714

5,810✔
1715
    // Count number of IPv4/IPv6 endpoints
5,810✔
1716
    std::size_t num_endpoints = 0;
11,826✔
1717
    {
11,826✔
1718
        struct addrinfo* curr = first;
11,826✔
1719
        while (curr) {
25,492✔
1720
            bool ip_v4 = curr->ai_family == AF_INET;
13,666✔
1721
            bool ip_v6 = curr->ai_family == AF_INET6;
13,666✔
1722
            if (ip_v4 || ip_v6)
13,666✔
1723
                ++num_endpoints;
13,666✔
1724
            curr = curr->ai_next;
13,666✔
1725
        }
13,666✔
1726
    }
11,826✔
1727
    REALM_ASSERT(num_endpoints >= 1);
11,826✔
1728

5,810✔
1729
    // Copy the IPv4/IPv6 endpoints
5,810✔
1730
    list.m_endpoints.set_size(num_endpoints); // Throws
11,826✔
1731
    struct addrinfo* curr = first;
11,826✔
1732
    std::size_t endpoint_ndx = 0;
11,826✔
1733
    while (curr) {
25,492✔
1734
        bool ip_v4 = curr->ai_family == AF_INET;
13,666✔
1735
        bool ip_v6 = curr->ai_family == AF_INET6;
13,666✔
1736
        if (ip_v4 || ip_v6) {
13,666✔
1737
            REALM_ASSERT((ip_v4 && curr->ai_addrlen == sizeof(Endpoint::sockaddr_ip_v4_type)) ||
13,666✔
1738
                         (ip_v6 && curr->ai_addrlen == sizeof(Endpoint::sockaddr_ip_v6_type)));
13,666✔
1739
            Endpoint& ep = list.m_endpoints[endpoint_ndx];
13,666✔
1740
            ep.m_protocol.m_family = curr->ai_family;
13,666✔
1741
            ep.m_protocol.m_socktype = curr->ai_socktype;
13,666✔
1742
            ep.m_protocol.m_protocol = curr->ai_protocol;
13,666✔
1743
            if (ip_v4) {
13,666✔
1744
                ep.m_sockaddr_union.m_ip_v4 = reinterpret_cast<Endpoint::sockaddr_ip_v4_type&>(*curr->ai_addr);
11,826✔
1745
            }
11,826✔
1746
            else {
1,840✔
1747
                ep.m_sockaddr_union.m_ip_v6 = reinterpret_cast<Endpoint::sockaddr_ip_v6_type&>(*curr->ai_addr);
1,840✔
1748
            }
1,840✔
1749
            ++endpoint_ndx;
13,666✔
1750
        }
13,666✔
1751
        curr = curr->ai_next;
13,666✔
1752
    }
13,666✔
1753

5,810✔
1754
    ec = std::error_code(); // Success
11,826✔
1755
    return list;
11,826✔
1756
}
11,826✔
1757

1758

1759
Service::Service()
1760
    : m_impl{std::make_unique<Impl>(*this)} // Throws
1761
{
18,282✔
1762
}
18,282✔
1763

1764

1765
// Defined out of line, in the same translation unit as the other members that
// use `Impl`.
Service::~Service() noexcept = default;
18,282✔
1766

1767

1768
void Service::run()
1769
{
8,980✔
1770
    m_impl->run(); // Throws
8,980✔
1771
}
8,980✔
1772

1773

1774
void Service::run_until_stopped()
1775
{
9,482✔
1776
    m_impl->run_until_stopped();
9,482✔
1777
}
9,482✔
1778

1779

1780
void Service::stop() noexcept
1781
{
17,954✔
1782
    m_impl->stop();
17,954✔
1783
}
17,954✔
1784

1785

1786
void Service::reset() noexcept
1787
{
16✔
1788
    m_impl->reset();
16✔
1789
}
16✔
1790

1791

1792
// Transfers ownership of `handler` to Impl::report_event_loop_metrics().
void Service::report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler> handler)
{
    m_impl->report_event_loop_metrics(std::move(handler)); // Throws
}
×
1796

1797

1798
// Forwards the post-operation constructor, its size, and the opaque cookie to
// Impl::post().
void Service::do_post(PostOperConstr constr, std::size_t size, void* cookie)
{
    m_impl->post(constr, size, cookie); // Throws
}
1,175,802✔
1802

1803

1804
// Forwards to Impl::recycle_post_oper() on the given implementation object.
void Service::recycle_post_oper(Impl& impl, PostOperBase* op) noexcept
{
    impl.recycle_post_oper(op);
}
1,175,952✔
1808

1809

1810
// Forwards to Impl::trigger_exec() on the given implementation object.
void Service::trigger_exec(Impl& impl, TriggerExecOperBase& op) noexcept
{
    impl.trigger_exec(op);
}
×
1814

1815

1816
// Forwards to Impl::reset_trigger_exec() on the given implementation object.
void Service::reset_trigger_exec(Impl& impl, TriggerExecOperBase& op) noexcept
{
    impl.reset_trigger_exec(op);
}
×
1820

1821

1822
void Service::Descriptor::accept(Descriptor& desc, StreamProtocol protocol, Endpoint* ep,
1823
                                 std::error_code& ec) noexcept
1824
{
3,174✔
1825
    REALM_ASSERT(is_open());
3,174✔
1826

1,052✔
1827
    union union_type {
3,174✔
1828
        Endpoint::sockaddr_union_type m_sockaddr_union;
3,174✔
1829
        char m_extra_byte[sizeof(Endpoint::sockaddr_union_type) + 1];
3,174✔
1830
    };
3,174✔
1831
    union_type buffer;
3,174✔
1832
    struct sockaddr* addr = &buffer.m_sockaddr_union.m_base;
3,174✔
1833
    socklen_t addr_len = sizeof buffer;
3,174✔
1834
    CloseGuard new_sock_fd;
3,174✔
1835
    for (;;) {
3,174✔
1836
#if HAVE_LINUX_ACCEPT4
1,052✔
1837
        // On Linux (HAVE_LINUX_ACCEPT4), make the accepted socket inherit the
1,052✔
1838
        // O_NONBLOCK status flag from the accepting socket to avoid an extra
1,052✔
1839
        // call to fcntl(). Note, it is deemed most likely that the accepted
1,052✔
1840
        // socket is going to be used in nonblocking when, and only when the
1,052✔
1841
        // accepting socket is used in nonblocking mode. Other platforms are
1,052✔
1842
        // handled below.
1,052✔
1843
        int flags = SOCK_CLOEXEC;
1,052✔
1844
        if (!in_blocking_mode())
1,052✔
1845
            flags |= SOCK_NONBLOCK;
1,030✔
1846
        native_handle_type ret = ::accept4(m_fd, addr, &addr_len, flags);
1,052✔
1847
#else
1848
        native_handle_type ret = ::accept(m_fd, addr, &addr_len);
2,122✔
1849
#endif
2,122✔
1850
#ifdef _WIN32
1851
        if (ret == INVALID_SOCKET) {
1852
            int err = WSAGetLastError();
1853
            if (err == WSAEINTR)
1854
                continue; // Retry on interruption by system signal
1855
            set_read_ready(err != WSAEWOULDBLOCK);
1856
            ec = make_winsock_error_code(err); // Failure
1857
            return;
1858
        }
1859
#else
1860
        if (REALM_UNLIKELY(ret == -1)) {
3,174✔
1861
            int err = errno;
992✔
1862
            if (err == EINTR)
992✔
1863
                continue; // Retry on interruption by system signal
×
1864
            if (err == EWOULDBLOCK)
992✔
1865
                err = EAGAIN;
992✔
1866
            set_read_ready(err != EAGAIN);
992✔
1867
            ec = make_basic_system_error_code(err); // Failure
992✔
1868
            return;
992✔
1869
        }
992✔
1870
#endif
2,182✔
1871
        new_sock_fd.reset(ret);
2,182✔
1872

1,052✔
1873
#if REALM_PLATFORM_APPLE
1,130✔
1874
        int optval = 1;
1,130✔
1875
        ret = ::setsockopt(new_sock_fd, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof optval);
1,130✔
1876
        if (REALM_UNLIKELY(ret == -1)) {
1,130✔
1877
            // setsockopt() reports EINVAL if the other side disconnected while
1878
            // the connection was waiting in the listen queue.
1879
            int err = errno;
1880
            if (err == EINVAL) {
×
1881
                continue;
1882
            }
1883
            ec = make_basic_system_error_code(err);
1884
            return;
1885
        }
1886
#endif
1,130✔
1887

1,052✔
1888
        set_read_ready(true);
2,182✔
1889
        break;
2,182✔
1890
    }
2,182✔
1891
    socklen_t expected_addr_len =
2,182✔
1892
        protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
1,844✔
1893
    if (REALM_UNLIKELY(addr_len != expected_addr_len))
2,182✔
1894
        REALM_TERMINATE("Unexpected peer address length");
1895

1,052✔
1896
#if !HAVE_LINUX_ACCEPT4
1,130✔
1897
    {
1,130✔
1898
        bool value = true;
1,130✔
1899
        if (REALM_UNLIKELY(set_cloexec_flag(new_sock_fd, value, ec)))
1,130✔
1900
            return;
1901
    }
1,130✔
1902
#endif
1,130✔
1903

1,052✔
1904
    // On some platforms (such as Mac OS X), the accepted socket automatically
1,052✔
1905
    // inherits file status flags from the accepting socket, but on other
1,052✔
1906
    // systems, this is not the case. In the case of Linux (HAVE_LINUX_ACCEPT4),
1,052✔
1907
    // the inheriting behaviour is obtained by using the Linux specific
1,052✔
1908
    // accept4() system call.
1,052✔
1909
    //
1,052✔
1910
    // For other platforms, we need to be sure that m_in_blocking_mode for the
1,052✔
1911
    // new socket is initialized to reflect the actual state of O_NONBLOCK on
1,052✔
1912
    // the new socket.
1,052✔
1913
    //
1,052✔
1914
    // Note: This implementation currently never modifies status flags other
1,052✔
1915
    // than O_NONBLOCK, so we only need to consider that flag.
1,052✔
1916

1,052✔
1917
#if !REALM_PLATFORM_APPLE && !HAVE_LINUX_ACCEPT4
1918
    // Make the accepted socket inherit the state of O_NONBLOCK from the
1919
    // accepting socket.
1920
    {
1921
        bool value = !m_in_blocking_mode;
1922
        if (::set_nonblock_flag(new_sock_fd, value, ec))
1923
            return;
1924
    }
1925
#endif
1926

1,052✔
1927
    desc.assign(new_sock_fd.release(), m_in_blocking_mode);
2,182✔
1928
    desc.set_write_ready(true);
2,182✔
1929
    if (ep) {
2,182✔
1930
        ep->m_protocol = protocol;
1,942✔
1931
        ep->m_sockaddr_union = buffer.m_sockaddr_union;
1,942✔
1932
    }
1,942✔
1933
    ec = std::error_code(); // Success
2,182✔
1934
}
2,182✔
1935

1936

1937
std::size_t Service::Descriptor::read_some(char* buffer, std::size_t size, std::error_code& ec) noexcept
1938
{
2,135,832✔
1939
    if (REALM_UNLIKELY(assume_read_would_block())) {
2,135,832✔
1940
        ec = error::resource_unavailable_try_again; // Failure
156✔
1941
        return 0;
156✔
1942
    }
156✔
1943
    for (;;) {
2,135,676✔
1944
        int flags = 0;
2,134,858✔
1945
#ifdef _WIN32
1946
        ssize_t ret = ::recv(m_fd, buffer, int(size), flags);
1947
        if (ret == SOCKET_ERROR) {
1948
            int err = WSAGetLastError();
1949
            // Retry on interruption by system signal
1950
            if (err == WSAEINTR)
1951
                continue;
1952
            set_read_ready(err != WSAEWOULDBLOCK);
1953
            ec = make_winsock_error_code(err); // Failure
1954
            return 0;
1955
        }
1956
#else
1957
        ssize_t ret = ::recv(m_fd, buffer, size, flags);
2,134,858✔
1958
        if (ret == -1) {
2,134,858✔
1959
            int err = errno;
51,002✔
1960
            // Retry on interruption by system signal
244✔
1961
            if (err == EINTR)
51,002✔
1962
                continue;
244✔
1963
            if (err == EWOULDBLOCK)
51,002✔
1964
                err = EAGAIN;
50,816✔
1965
            set_read_ready(err != EAGAIN);
51,002✔
1966
            ec = make_basic_system_error_code(err); // Failure
51,002✔
1967
            return 0;
51,002✔
1968
        }
51,002✔
1969
#endif
2,083,856✔
1970
        if (REALM_UNLIKELY(ret == 0)) {
2,083,856✔
1971
            set_read_ready(true);
1,004✔
1972
            ec = MiscExtErrors::end_of_input;
1,004✔
1973
            return 0;
1,004✔
1974
        }
1,004✔
1975
        REALM_ASSERT(ret > 0);
2,082,852✔
1976
        std::size_t n = std::size_t(ret);
2,082,852✔
1977
        REALM_ASSERT(n <= size);
2,082,852✔
1978
#if REALM_NETWORK_USE_EPOLL
1979
        // On Linux a partial read (n < size) on a nonblocking stream-mode
1980
        // socket is guaranteed to only ever happen if a complete read would
1981
        // have been impossible without blocking (i.e., without failing with
1982
        // EAGAIN/EWOULDBLOCK), or if the end of input from the remote peer was
1983
        // detected by the Linux kernel.
1984
        //
1985
        // Further more, after a partial read, and when working with Linux epoll
1986
        // in edge-triggered mode (EPOLLET), it is safe to suspend further
1987
        // reading until a new read-readiness notification is received, provided
1988
        // that we registered interest in EPOLLRDHUP events, and an EPOLLRDHUP
1989
        // event was not received prior to the partial read. This is safe in the
1990
        // sense that reading is guaranteed to be resumed in a timely fashion
1991
        // (without unnessesary blocking), and in a manner that is free of race
1992
        // conditions. Note in particular that if a read was partial because the
1993
        // kernel had detected the end of input prior to that read, but the
1994
        // EPOLLRDHUP event was not received prior the that read, then reading
1995
        // will still be resumed immediately by the pending EPOLLRDHUP event.
1996
        //
1997
        // Note that without this extra "loss of read-readiness" trigger, it
1998
        // would have been necessary for the caller to immediately follow up
1999
        // with an (otherwise redundant) additional invocation of read_some()
2000
        // just to detect the loss of read-readiness.
2001
        //
2002
        // FIXME: Will this scheme also work with Kqueue on FreeBSD and macOS?
2003
        // In particular, do we know that a partial read (n < size) on a
2004
        // nonblocking stream-mode socket is guaranteed to only ever happen if a
2005
        // complete read would have been impossible without blocking, or if the
2006
        // end of input from the remote peer was detected by the FreeBSD and/or
2007
        // macOS kernel? See http://stackoverflow.com/q/40123626/1698548.
2008
        set_read_ready(n == size || m_imminent_end_of_input);
2009
#else
2010
        set_read_ready(true);
2,082,852✔
2011
#endif
2,082,852✔
2012
        ec = std::error_code(); // Success
2,082,852✔
2013
        return n;
2,082,852✔
2014
    }
2,082,852✔
2015
}
2,135,676✔
2016

2017

2018
std::size_t Service::Descriptor::write_some(const char* data, std::size_t size, std::error_code& ec) noexcept
2019
{
1,488,918✔
2020
    if (REALM_UNLIKELY(assume_write_would_block())) {
1,488,918✔
2021
        ec = error::resource_unavailable_try_again; // Failure
93,416✔
2022
        return 0;
93,416✔
2023
    }
93,416✔
2024
    for (;;) {
1,395,582✔
2025
        int flags = 0;
1,395,478✔
2026
#ifdef __linux__
717,712✔
2027
        // Prevent SIGPIPE when remote peer has closed the connection.
717,712✔
2028
        flags |= MSG_NOSIGNAL;
717,712✔
2029
#endif
717,712✔
2030
#ifdef _WIN32
2031
        ssize_t ret = ::send(m_fd, data, int(size), flags);
2032
        if (ret == SOCKET_ERROR) {
2033
            int err = WSAGetLastError();
2034
            // Retry on interruption by system signal
2035
            if (err == WSAEINTR)
2036
                continue;
2037
            set_write_ready(err != WSAEWOULDBLOCK);
2038
            ec = make_winsock_error_code(err); // Failure
2039
            return 0;
2040
        }
2041
#else
2042
        ssize_t ret = ::send(m_fd, data, size, flags);
1,395,478✔
2043
        if (ret == -1) {
1,395,478✔
2044
            int err = errno;
3,808✔
2045
            // Retry on interruption by system signal
428✔
2046
            if (err == EINTR)
3,808✔
2047
                continue;
428✔
2048
#if REALM_PLATFORM_APPLE
3,380✔
2049
            // The macOS kernel can generate an undocumented EPROTOTYPE in
2050
            // certain cases where the peer has closed the connection (in
2051
            // tcp_usr_send() in bsd/netinet/tcp_usrreq.c) See also
2052
            // http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/.
2053
            if (REALM_UNLIKELY(err == EPROTOTYPE))
3,380✔
2054
                err = EPIPE;
2055
#endif
3,380✔
2056
            if (err == EWOULDBLOCK)
3,808✔
2057
                err = EAGAIN;
3,684✔
2058
            set_write_ready(err != EAGAIN);
3,808✔
2059
            ec = make_basic_system_error_code(err); // Failure
3,808✔
2060
            return 0;
3,808✔
2061
        }
3,808✔
2062
#endif
1,391,670✔
2063
        REALM_ASSERT(ret >= 0);
1,391,670✔
2064
        std::size_t n = std::size_t(ret);
1,391,670✔
2065
        REALM_ASSERT(n <= size);
1,391,670✔
2066
#if REALM_NETWORK_USE_EPOLL
2067
        // On Linux a partial write (n < size) on a nonblocking stream-mode
2068
        // socket is guaranteed to only ever happen if a complete write would
2069
        // have been impossible without blocking (i.e., without failing with
2070
        // EAGAIN/EWOULDBLOCK).
2071
        //
2072
        // Further more, after a partial write, and when working with Linux
2073
        // epoll in edge-triggered mode (EPOLLET), it is safe to suspend further
2074
        // writing until a new write-readiness notification is received. This is
2075
        // safe in the sense that writing is guaranteed to be resumed in a
2076
        // timely fashion (without unnessesary blocking), and in a manner that
2077
        // is free of race conditions.
2078
        //
2079
        // Note that without this extra "loss of write-readiness" trigger, it
2080
        // would have been necessary for the caller to immediately follow up
2081
        // with an (otherwise redundant) additional invocation of write_some()
2082
        // just to detect the loss of write-readiness.
2083
        //
2084
        // FIXME: Will this scheme also work with Kqueue on FreeBSD and macOS?
2085
        // In particular, do we know that a partial write (n < size) on a
2086
        // nonblocking stream-mode socket is guaranteed to only ever happen if a
2087
        // complete write would have been impossible without blocking? See
2088
        // http://stackoverflow.com/q/40123626/1698548.
2089
        set_write_ready(n == size);
2090
#else
2091
        set_write_ready(true);
1,391,670✔
2092
#endif
1,391,670✔
2093
        ec = std::error_code(); // Success
1,391,670✔
2094
        return n;
1,391,670✔
2095
    }
1,391,670✔
2096
}
1,395,502✔
2097

2098

2099
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
2100

2101
void Service::Descriptor::deregister_for_async() noexcept
2102
{
7,320✔
2103
    service_impl.io_reactor.deregister_desc(*this);
7,320✔
2104
}
7,320✔
2105

2106
#endif // REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
2107

2108

2109
void Service::Descriptor::set_nonblock_flag(bool value)
2110
{
12,420✔
2111
    ::set_nonblock_flag(m_fd, value); // Throws
12,420✔
2112
}
12,420✔
2113

2114

2115
// Takes over a freshly initiated I/O operation.
//
// When `want` is `Want::nothing` the operation must already be complete, and
// it is handed to the service as a completed operation. Otherwise it must be
// incomplete, and it is registered with the I/O reactor for this descriptor.
void Service::Descriptor::add_initiated_oper(LendersIoOperPtr op, Want want)
{
    if (REALM_UNLIKELY(want == Want::nothing)) {
        REALM_ASSERT(op->is_complete());
        service_impl.add_completed_oper(std::move(op));
    }
    else {
        REALM_ASSERT(!op->is_complete());
        service_impl.io_reactor.add_oper(*this, std::move(op), want); // Throws
    }
}
2,192,222✔
2125

2126

2127
void Service::Descriptor::do_close() noexcept
2128
{
14,528✔
2129
    checked_close(m_fd);
14,528✔
2130
    m_fd = -1;
14,528✔
2131
}
14,528✔
2132

2133

2134
// Returns the native handle without closing it, and resets `m_fd` to -1 so
// that this descriptor no longer refers to it.
auto Service::Descriptor::do_release() noexcept -> native_handle_type
{
    const native_handle_type released = m_fd;
    m_fd = -1;
    return released;
}
×
2140

2141

2142
// Returns the service that this resolver's implementation object refers to.
Service& Resolver::get_service() noexcept
{
    return m_service_impl.service;
}
×
2146

2147

2148
// Synchronous lookup; forwards to the static Service::Impl::resolve().
Endpoint::List Resolver::resolve(const Query& query, std::error_code& ec)
{
    return Service::Impl::resolve(query, ec); // Throws
}
8,468✔
2152

2153

2154
void Resolver::cancel() noexcept
2155
{
11,840✔
2156
    if (m_resolve_oper && m_resolve_oper->in_use() && !m_resolve_oper->is_canceled()) {
11,840✔
2157
        Service::ResolveOperBase& op = static_cast<Service::ResolveOperBase&>(*m_resolve_oper);
12✔
2158
        m_service_impl.cancel_resolve_oper(op);
12✔
2159
    }
12✔
2160
}
11,840✔
2161

2162

2163
// Hands the resolve operation over to the service implementation.
void Resolver::initiate_oper(Service::LendersResolveOperPtr op)
{
    m_service_impl.add_resolve_oper(std::move(op)); // Throws
}
3,368✔
2167

2168

2169
// Returns the service that this socket's descriptor is associated with.
Service& SocketBase::get_service() noexcept
{
    auto& impl = m_desc.service_impl;
    return impl.service;
}
480✔
2173

2174

2175
void SocketBase::cancel() noexcept
2176
{
1,005,796✔
2177
    bool any_incomplete = false;
1,005,796✔
2178
    if (m_read_oper && m_read_oper->in_use() && !m_read_oper->is_canceled()) {
1,005,796✔
2179
        m_read_oper->cancel();
440,524✔
2180
        if (!m_read_oper->is_complete())
440,524✔
2181
            any_incomplete = true;
440,624✔
2182
    }
440,524✔
2183
    if (m_write_oper && m_write_oper->in_use() && !m_write_oper->is_canceled()) {
1,005,796✔
2184
        m_write_oper->cancel();
413,666✔
2185
        if (!m_write_oper->is_complete())
413,666✔
2186
            any_incomplete = true;
413,538✔
2187
    }
413,666✔
2188
    if (any_incomplete)
1,005,796✔
2189
        m_desc.service_impl.remove_canceled_ops(m_desc);
488,854✔
2190
}
1,005,796✔
2191

2192

2193
std::error_code SocketBase::bind(const Endpoint& ep, std::error_code& ec)
2194
{
8,706✔
2195
    if (!is_open()) {
8,706✔
2196
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
×
2197
            return ec;
×
2198
    }
8,706✔
2199

4,300✔
2200
    native_handle_type sock_fd = m_desc.native_handle();
8,706✔
2201
    socklen_t addr_len =
8,706✔
2202
        ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
8,364✔
2203

4,300✔
2204
    int ret = ::bind(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
8,706✔
2205
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
8,706✔
2206
        return ec;
4,300✔
2207
    ec = std::error_code(); // Success
8,706✔
2208
    return ec;
8,706✔
2209
}
8,706✔
2210

2211

2212
// Returns the local endpoint of this socket as reported by getsockname().
//
// On failure `ec` is set and a default-constructed endpoint is returned. On
// success `ec` is cleared. Throws util::runtime_error if the length of the
// reported address does not match the address type implied by `m_protocol`.
Endpoint SocketBase::local_endpoint(std::error_code& ec) const
{
    Endpoint ep;
    // The buffer is one byte larger than the largest supported address type.
    union union_type {
        Endpoint::sockaddr_union_type m_sockaddr_union;
        char m_extra_byte[sizeof(Endpoint::sockaddr_union_type) + 1];
    };
    native_handle_type sock_fd = m_desc.native_handle();
    union_type buffer;
    struct sockaddr* addr = &buffer.m_sockaddr_union.m_base;
    socklen_t addr_len = sizeof buffer;
    int ret = ::getsockname(sock_fd, addr, &addr_len);
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
        return ep;

    socklen_t expected_addr_len =
        m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
    if (addr_len != expected_addr_len)
        throw util::runtime_error("Unexpected local address length");
    ep.m_protocol = m_protocol;
    ep.m_sockaddr_union = buffer.m_sockaddr_union;
    ec = std::error_code(); // Success
#ifdef _WIN32
    // On Windows the reported IPv4 address is replaced by the loopback address.
    // The override is restricted to IPv4 sockets: writing the IPv4 address
    // member of an endpoint that holds an IPv6 address would overwrite bytes of
    // the IPv6 structure instead (presumably the flow-info field, assuming the
    // IPv6 type is `sockaddr_in6` - TODO confirm).
    // NOTE(review): the reason for the loopback override itself is not visible
    // here - confirm that it is still needed.
    if (m_protocol.is_ip_v4())
        ep.m_sockaddr_union.m_ip_v4.sin_addr.s_addr = inet_addr("127.0.0.1");
#endif
    return ep;
}
20,526✔
2239

2240

2241
std::error_code SocketBase::open(const StreamProtocol& prot, std::error_code& ec)
2242
{
12,342✔
2243
    if (REALM_UNLIKELY(is_open()))
12,342✔
2244
        throw util::runtime_error("Socket is already open");
6,064✔
2245
    int type = prot.m_socktype;
12,342✔
2246
#if HAVE_LINUX_SOCK_CLOEXEC
6,064✔
2247
    type |= SOCK_CLOEXEC;
6,064✔
2248
#endif
6,064✔
2249
    native_handle_type ret = ::socket(prot.m_family, type, prot.m_protocol);
12,342✔
2250
#ifdef _WIN32
2251
    if (REALM_UNLIKELY(ret == INVALID_SOCKET)) {
2252
        ec = make_winsock_error_code(WSAGetLastError());
2253
        return ec;
2254
    }
2255
#else
2256
    if (REALM_UNLIKELY(ret == -1)) {
12,342✔
2257
        ec = make_basic_system_error_code(errno);
×
2258
        return ec;
×
2259
    }
×
2260
#endif
12,342✔
2261

6,064✔
2262
    CloseGuard sock_fd{ret};
12,342✔
2263

6,064✔
2264
#if !HAVE_LINUX_SOCK_CLOEXEC
6,278✔
2265
    {
6,278✔
2266
        bool value = true;
6,278✔
2267
        if (REALM_UNLIKELY(set_cloexec_flag(sock_fd, value, ec)))
6,278✔
2268
            return ec;
2269
    }
6,278✔
2270
#endif
6,278✔
2271

6,064✔
2272
#if REALM_PLATFORM_APPLE
6,278✔
2273
    {
6,278✔
2274
        int optval = 1;
6,278✔
2275
        int ret = setsockopt(sock_fd, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof optval);
6,278✔
2276
        if (REALM_UNLIKELY(ret == -1)) {
6,278✔
2277
            ec = make_basic_system_error_code(errno);
2278
            return ec;
2279
        }
2280
    }
6,278✔
2281
#endif
6,278✔
2282

6,064✔
2283
    bool in_blocking_mode = true; // New sockets are in blocking mode by default
12,342✔
2284
    m_desc.assign(sock_fd.release(), in_blocking_mode);
12,342✔
2285
    m_protocol = prot;
12,342✔
2286
    ec = std::error_code(); // Success
12,342✔
2287
    return ec;
12,342✔
2288
}
12,342✔
2289

2290

2291
std::error_code SocketBase::do_assign(const StreamProtocol& prot, native_handle_type sock_fd, std::error_code& ec)
2292
{
4✔
2293
    if (REALM_UNLIKELY(is_open()))
4✔
2294
        throw util::runtime_error("Socket is already open");
2✔
2295

2✔
2296
    // We need to know whether the specified socket is in blocking or in
2✔
2297
    // nonblocking mode. Rather than reading the current mode, we set it to
2✔
2298
    // blocking mode (disable nonblocking mode), and initialize
2✔
2299
    // `m_in_blocking_mode` to true.
2✔
2300
    {
4✔
2301
        bool value = false;
4✔
2302
        if (::set_nonblock_flag(sock_fd, value, ec))
4✔
2303
            return ec;
×
2304
    }
4✔
2305

2✔
2306
    bool in_blocking_mode = true; // New sockets are in blocking mode by default
4✔
2307
    m_desc.assign(sock_fd, in_blocking_mode);
4✔
2308
    m_protocol = prot;
4✔
2309
    ec = std::error_code(); // Success
4✔
2310
    return ec;
4✔
2311
}
4✔
2312

2313

2314
// Read the socket option identified by `opt` into the buffer `value_data`.
//
// On entry `value_size` is the size of the buffer; on success it is updated to
// the size reported by getsockopt() and `ec` is cleared. On failure `ec` is
// set by check_socket_error() and `value_size` is left untouched.
void SocketBase::get_option(opt_enum opt, void* value_data, std::size_t& value_size, std::error_code& ec) const
{
    int sockopt_level = 0;
    int sockopt_name = 0;
    map_option(opt, sockopt_level, sockopt_name);

    socklen_t reported_len = static_cast<socklen_t>(value_size);
    char* const out = static_cast<char*>(value_data);
    int status = ::getsockopt(m_desc.native_handle(), sockopt_level, sockopt_name, out, &reported_len);
    if (REALM_LIKELY(!check_socket_error(status, ec))) {
        value_size = static_cast<std::size_t>(reported_len);
        ec = std::error_code(); // Success
    }
}
8✔
2328

2329

2330
// Set the socket option identified by `opt` from the `value_size` bytes at
// `value_data`.
//
// On failure `ec` is set by check_socket_error(); on success `ec` is cleared.
void SocketBase::set_option(opt_enum opt, const void* value_data, std::size_t value_size, std::error_code& ec)
{
    int sockopt_level = 0;
    int sockopt_name = 0;
    map_option(opt, sockopt_level, sockopt_name);

    const char* const in = static_cast<const char*>(value_data);
    const socklen_t in_len = static_cast<socklen_t>(value_size);
    int status = ::setsockopt(m_desc.native_handle(), sockopt_level, sockopt_name, in, in_len);
    if (REALM_LIKELY(!check_socket_error(status, ec)))
        ec = std::error_code(); // Success
}
10,344✔
2342

2343

2344
// Translate the portable option identifier `opt` into the (level, name) pair
// expected by getsockopt() / setsockopt(). An unrecognized `opt` value trips
// an assertion.
void SocketBase::map_option(opt_enum opt, int& level, int& option_name) const
{
    bool recognized = false;
    switch (opt) {
        case opt_ReuseAddr:
            level = SOL_SOCKET;
            option_name = SO_REUSEADDR;
            recognized = true;
            break;
        case opt_Linger:
            level = SOL_SOCKET;
#if REALM_PLATFORM_APPLE
            // By default, SO_LINGER on Darwin uses "ticks" instead of
            // seconds for better accuracy, but we want to be cross-platform.
            option_name = SO_LINGER_SEC;
#else
            option_name = SO_LINGER;
#endif // REALM_PLATFORM_APPLE
            recognized = true;
            break;
        case opt_NoDelay:
            level = IPPROTO_TCP;
            option_name = TCP_NODELAY; // Specified by POSIX.1-2001
            recognized = true;
            break;
    }
    if (!recognized) {
        REALM_ASSERT(false);
    }
}
×
2368

2369

2370
std::error_code Socket::connect(const Endpoint& ep, std::error_code& ec)
2371
{
68✔
2372
    REALM_ASSERT(!m_write_oper || !m_write_oper->in_use());
68!
2373

32✔
2374
    if (!is_open()) {
68✔
2375
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
64✔
2376
            return ec;
30✔
2377
    }
68✔
2378

32✔
2379
    m_desc.ensure_blocking_mode(); // Throws
68✔
2380

32✔
2381
    native_handle_type sock_fd = m_desc.native_handle();
68✔
2382
    socklen_t addr_len =
68✔
2383
        (ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type));
54✔
2384
    int ret = ::connect(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
68✔
2385
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
68✔
2386
        return ec;
36✔
2387
    ec = std::error_code(); // Success
64✔
2388
    return ec;
64✔
2389
}
64✔
2390

2391

2392
std::error_code Socket::shutdown(shutdown_type what, std::error_code& ec)
2393
{
56✔
2394
    native_handle_type sock_fd = m_desc.native_handle();
56✔
2395
    int how = what;
56✔
2396
    int ret = ::shutdown(sock_fd, how);
56✔
2397
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
56✔
2398
        return ec;
28✔
2399
    ec = std::error_code(); // Success
56✔
2400
    return ec;
56✔
2401
}
56✔
2402

2403

2404
bool Socket::initiate_async_connect(const Endpoint& ep, std::error_code& ec)
2405
{
3,560✔
2406
    if (!is_open()) {
3,560✔
2407
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
3,556✔
2408
            return true; // Failure
1,726✔
2409
    }
3,560✔
2410
    m_desc.ensure_nonblocking_mode(); // Throws
3,560✔
2411

1,728✔
2412
    // Initiate connect operation.
1,728✔
2413
    native_handle_type sock_fd = m_desc.native_handle();
3,560✔
2414
    socklen_t addr_len =
3,560✔
2415
        ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
3,220✔
2416
    int ret = ::connect(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
3,560✔
2417
    if (ret != -1) {
3,560✔
2418
        ec = std::error_code(); // Success
2✔
2419
        return true;            // Immediate completion.
2✔
2420
    }
2✔
2421

1,726✔
2422
    // EINPROGRESS (and on Windows, also WSAEWOULDBLOCK) indicates that the
1,726✔
2423
    // underlying connect operation was successfully initiated, but not
1,726✔
2424
    // immediately completed, and EALREADY indicates that an underlying connect
1,726✔
2425
    // operation was already initiated, and still not completed, presumably
1,726✔
2426
    // because a previous call to connect() or async_connect() failed, or was
1,726✔
2427
    // canceled.
1,726✔
2428

1,726✔
2429
#ifdef _WIN32
2430
    int err = WSAGetLastError();
2431
    if (err != WSAEWOULDBLOCK) {
2432
        ec = make_winsock_error_code(err);
2433
        return true; // Failure
2434
    }
2435
#else
2436
    int err = errno;
3,558✔
2437
    if (REALM_UNLIKELY(err != EINPROGRESS && err != EALREADY)) {
3,558✔
2438
        ec = make_basic_system_error_code(err);
×
2439
        return true; // Failure
×
2440
    }
×
2441
#endif
3,558✔
2442

1,726✔
2443
    return false; // Successful initiation, but no immediate completion.
3,558✔
2444
}
3,558✔
2445

2446

2447
std::error_code Socket::finalize_async_connect(std::error_code& ec) noexcept
2448
{
3,548✔
2449
    native_handle_type sock_fd = m_desc.native_handle();
3,548✔
2450
    int connect_errno = 0;
3,548✔
2451
    socklen_t connect_errno_size = sizeof connect_errno;
3,548✔
2452
    int ret =
3,548✔
2453
        ::getsockopt(sock_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&connect_errno), &connect_errno_size);
3,548✔
2454
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
3,548✔
2455
        return ec; // getsockopt() failed
1,720✔
2456
    if (REALM_UNLIKELY(connect_errno)) {
3,548✔
UNCOV
2457
        ec = make_basic_system_error_code(connect_errno);
×
UNCOV
2458
        return ec; // connect failed
×
UNCOV
2459
    }
×
2460
    return std::error_code(); // Success
3,548✔
2461
}
3,548✔
2462

2463

2464
std::error_code Acceptor::listen(int backlog, std::error_code& ec)
2465
{
8,706✔
2466
    native_handle_type sock_fd = m_desc.native_handle();
8,706✔
2467
    int ret = ::listen(sock_fd, backlog);
8,706✔
2468
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
8,706✔
2469
        return ec;
4,300✔
2470
    ec = std::error_code(); // Success
8,706✔
2471
    return ec;
8,706✔
2472
}
8,706✔
2473

2474

2475
// Return the Service object referenced by this timer's service
// implementation.
Service& DeadlineTimer::get_service() noexcept
{
    Service& owner = m_service_impl.service;
    return owner;
}
×
2479

2480

2481
void DeadlineTimer::cancel() noexcept
2482
{
25,132✔
2483
    if (m_wait_oper && m_wait_oper->in_use() && !m_wait_oper->is_canceled()) {
25,132✔
2484
        m_wait_oper->cancel();
20,934✔
2485
        if (!m_wait_oper->is_complete()) {
20,934✔
2486
            using WaitOperBase = Service::WaitOperBase;
20,934✔
2487
            WaitOperBase& wait_operation = static_cast<WaitOperBase&>(*m_wait_oper);
20,934✔
2488
            m_service_impl.cancel_incomplete_wait_oper(wait_operation);
20,934✔
2489
        }
20,934✔
2490
    }
20,934✔
2491
}
25,132✔
2492

2493

2494
// Transfer the wait operation `op` to the service implementation.
void DeadlineTimer::initiate_oper(Service::LendersWaitOperPtr op)
{
    auto& impl = m_service_impl;
    impl.add_wait_oper(std::move(op)); // Throws
}
535,214✔
2498

2499

2500
bool ReadAheadBuffer::read(char*& begin, char* end, int delim, std::error_code& ec) noexcept
2501
{
2,010,646✔
2502
    std::size_t in_avail = m_end - m_begin;
2,010,646✔
2503
    std::size_t out_avail = end - begin;
2,010,646✔
2504
    std::size_t n = std::min(in_avail, out_avail);
2,010,646✔
2505
    // If n is 0, return whether or not the read expects 0 bytes for the completed response
996,810✔
2506
    if (n == 0)
2,010,646✔
2507
        return out_avail == 0;
506,300✔
2508

751,158✔
2509
    bool delim_mode = (delim != std::char_traits<char>::eof());
1,504,346✔
2510
    char* i =
1,504,346✔
2511
        (!delim_mode ? m_begin + n : std::find(m_begin, m_begin + n, std::char_traits<char>::to_char_type(delim)));
1,480,090✔
2512
    begin = std::copy(m_begin, i, begin);
1,504,346✔
2513
    m_begin = i;
1,504,346✔
2514
    if (begin == end) {
1,504,346✔
2515
        if (delim_mode)
830,950✔
2516
            ec = MiscExtErrors::delim_not_found;
×
2517
    }
830,950✔
2518
    else {
673,396✔
2519
        if (m_begin == m_end)
673,396✔
2520
            return false;
628,176✔
2521
        REALM_ASSERT(delim_mode);
45,220✔
2522
        *begin++ = *m_begin++; // Transfer delimiter
45,220✔
2523
    }
45,220✔
2524
    return true;
1,187,408✔
2525
}
1,504,346✔
2526

2527

2528
namespace realm::sync::network {
2529

2530
// Return the name of the local host as reported by gethostname().
//
// A 256-byte stack buffer is tried first. If that call fails, or its result
// is not null-terminated, the lookup is repeated with a 4096-byte heap
// buffer.
//
// Throws std::system_error (system category) if the second attempt fails, in
// which case the error code is the errno value of that call, or if the name
// does not fit in the large buffer, in which case the error code is
// ENAMETOOLONG. Allocation of the heap buffer may throw as well.
std::string host_name()
{
    // POSIX allows for gethostname() to report success even if the buffer is
    // too small to hold the name, and in that case POSIX requires that the
    // buffer is filled, but not that it contains a final null-termination.
    char small_stack_buffer[256];
    if (::gethostname(small_stack_buffer, sizeof small_stack_buffer) != -1) {
        // Check that a null-termination was included
        char* const end = small_stack_buffer + sizeof small_stack_buffer;
        char* const terminator = std::find(small_stack_buffer, end, '\0');
        if (terminator != end)
            return std::string(small_stack_buffer, terminator);
    }

    constexpr std::size_t large_heap_buffer_size = 4096;
    std::unique_ptr<char[]> large_heap_buffer(new char[large_heap_buffer_size]); // Throws
    if (::gethostname(large_heap_buffer.get(), large_heap_buffer_size) == -1) {
        // Capture errno right away so that nothing can overwrite it before it
        // is reported.
        const int err = errno;
        throw std::system_error(err, std::system_category(), "gethostname() failed");
    }

    // Check that a null-termination was included
    char* const end = large_heap_buffer.get() + large_heap_buffer_size;
    char* const terminator = std::find(large_heap_buffer.get(), end, '\0');
    if (terminator == end) {
        // The call reported success but the name was truncated. errno was not
        // set by that call, so report a meaningful error instead of whatever
        // stale value errno happens to hold.
        throw std::system_error(ENAMETOOLONG, std::system_category(), "gethostname() failed");
    }
    return std::string(large_heap_buffer.get(), terminator);
}
×
2556

2557

2558
// Parse the textual IP address `c_str`, trying IPv6 first and then IPv4.
//
// On success the parsed address is returned and `ec` is cleared. If the
// string parses as neither, `ec` is set to error::invalid_argument and a
// default-constructed Address is returned.
//
// FIXME: Currently, `addr.m_ip_v6_scope_id` is always set to zero. It needs
// to be set based on a combined inspection of the original string
// representation, and the parsed address. The following code is "borrowed"
// from ASIO:
/*
    *scope_id = 0;
    if (const char* if_name = strchr(src, '%'))
    {
      in6_addr_type* ipv6_address = static_cast<in6_addr_type*>(dest);
      bool is_link_local = ((ipv6_address->s6_addr[0] == 0xfe)
          && ((ipv6_address->s6_addr[1] & 0xc0) == 0x80));
      bool is_multicast_link_local = ((ipv6_address->s6_addr[0] == 0xff)
          && ((ipv6_address->s6_addr[1] & 0x0f) == 0x02));
      if (is_link_local || is_multicast_link_local)
        *scope_id = if_nametoindex(if_name + 1);
      if (*scope_id == 0)
        *scope_id = atoi(if_name + 1);
    }
*/
Address make_address(const char* c_str, std::error_code& ec) noexcept
{
    Address addr;

    int ret = ::inet_pton(AF_INET6, c_str, &addr.m_union);
    REALM_ASSERT(ret == 0 || ret == 1);
    if (ret == 1) {
        addr.m_is_ip_v6 = true;
    }
    else {
        // Not an IPv6 address; fall back to IPv4.
        ret = ::inet_pton(AF_INET, c_str, &addr.m_union);
        REALM_ASSERT(ret == 0 || ret == 1);
        if (ret != 1) {
            ec = error::invalid_argument;
            return Address();
        }
    }

    ec = std::error_code(); // Success (IPv6 or IPv4)
    return addr;
}
×
2597

2598
class ResolveErrorCategory : public std::error_category {
2599
public:
2600
    const char* name() const noexcept final
2601
    {
×
2602
        return "realm.sync.network.resolve";
×
2603
    }
×
2604

2605
    std::string message(int value) const final
2606
    {
8✔
2607
        switch (ResolveErrors(value)) {
8✔
2608
            case ResolveErrors::host_not_found:
4✔
2609
                return "Host not found (authoritative)";
4✔
2610
            case ResolveErrors::host_not_found_try_again:
4✔
2611
                return "Host not found (non-authoritative)";
4✔
2612
            case ResolveErrors::no_data:
✔
2613
                return "The query is valid but does not have associated address data";
×
2614
            case ResolveErrors::no_recovery:
✔
2615
                return "A non-recoverable error occurred";
×
2616
            case ResolveErrors::service_not_found:
✔
2617
                return "The service is not supported for the given socket type";
×
2618
            case ResolveErrors::socket_type_not_supported:
✔
2619
                return "The socket type is not supported";
×
2620
        }
×
2621
        REALM_ASSERT(false);
×
2622
        return {};
×
2623
    }
×
2624
};
2625

2626
const std::error_category& resolve_error_category() noexcept
2627
{
8✔
2628
    static const ResolveErrorCategory resolve_error_category;
8✔
2629
    return resolve_error_category;
8✔
2630
}
8✔
2631

2632
std::error_code make_error_code(ResolveErrors err)
2633
{
8✔
2634
    return std::error_code(int(err), resolve_error_category());
8✔
2635
}
8✔
2636

2637
} // namespace realm::sync::network
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc