
realm / realm-core / build 1772 (push, Evergreen, web-flow)

20 Oct 2023 07:34PM UTC coverage: 91.61% (+0.04%) from 91.567%

Added sync socket result enum for sync socket callback handlers in C API (#7015)

* Added sync socket result enum for sync socket callback handlers in C API
* Updated changelog
* CAPI: timer callbacks are now released by canceled/complete function
* Updated c_api tests to use all sync socket c_api functions
* Additional updates to sync socket c api test
* Added CAPI write callback manager to manage async write callbacks
* Pass error codes up to default socket provider for async_write_binary() callbacks
* Removed async write callback manager from CAPI
* Updated changelog after release
* clang format and updates from review
* Update async_write_binary() error handling to not throw exception
* Updated a few comments
* Another comment update
* Updates from review
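
The first bullet above is the heart of the change: platform networking callbacks report their outcome to the sync client as a result code instead of throwing across the C boundary. As a rough sketch only (the names below are hypothetical, not the actual realm-core C API), such a callback contract could look like this:

// Hypothetical sketch of a result enum returned by sync socket callback handlers.
typedef enum example_sync_socket_callback_result {
    EXAMPLE_SYNC_SOCKET_SUCCESS = 0,       // operation completed normally
    EXAMPLE_SYNC_SOCKET_OPERATION_ABORTED, // e.g. a canceled timer or write
    EXAMPLE_SYNC_SOCKET_RUNTIME_ERROR      // any other failure
} example_sync_socket_callback_result_e;

// A completion callback (e.g. for an async binary write) then reports status by
// value rather than by exception.
typedef void (*example_write_callback)(void* userdata, example_sync_socket_callback_result_e result);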

94360 of 173622 branches covered (54.35%)

111 of 150 new or added lines in 8 files covered. (74.0%)

66 existing lines in 17 files now uncovered.

230703 of 251832 relevant lines covered (91.61%)

6730695.32 hits per line
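
For reference, the figures above are plain ratios of the counts shown: 230703 / 251832 ≈ 0.9161, i.e. the 91.61% line coverage, and 94360 / 173622 ≈ 0.5435, i.e. roughly 54.35% branch coverage.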

Source File: /src/realm/sync/network/network.cpp (89.66% covered)
1

2
#define _WINSOCK_DEPRECATED_NO_WARNINGS
3

4
#include <algorithm>
5
#include <cerrno>
6
#include <condition_variable>
7
#include <limits>
8
#include <mutex>
9
#include <stdexcept>
10
#include <thread>
11
#include <vector>
12

13
#include <fcntl.h>
14

15
#ifndef _WIN32
16
#include <netinet/tcp.h>
17
#include <unistd.h>
18
#include <poll.h>
19
#include <realm/util/to_string.hpp>
20
#endif
21

22
#include <realm/util/features.h>
23
#include <realm/util/optional.hpp>
24
#include <realm/util/misc_errors.hpp>
25
#include <realm/util/priority_queue.hpp>
26
#include <realm/sync/network/network.hpp>
27

28
#if defined _GNU_SOURCE && !REALM_ANDROID
29
#define HAVE_LINUX_PIPE2 1
30
#else
31
#define HAVE_LINUX_PIPE2 0
32
#endif
33

34
// Note: Linux specific accept4() is not available on Android.
35
#if defined _GNU_SOURCE && defined SOCK_NONBLOCK && defined SOCK_CLOEXEC && !REALM_ANDROID
36
#define HAVE_LINUX_ACCEPT4 1
37
#else
38
#define HAVE_LINUX_ACCEPT4 0
39
#endif
40

41
#if defined _GNU_SOURCE && defined SOCK_CLOEXEC
42
#define HAVE_LINUX_SOCK_CLOEXEC 1
43
#else
44
#define HAVE_LINUX_SOCK_CLOEXEC 0
45
#endif
46

47
#ifndef _WIN32
48

49
#if REALM_NETWORK_USE_EPOLL
50
#include <linux/version.h>
51
#include <sys/epoll.h>
52
#elif REALM_HAVE_KQUEUE
53
#include <sys/types.h>
54
#include <sys/event.h>
55
#include <sys/time.h>
56
#else
57
#include <poll.h>
58
#endif
59

60
#endif
61

62
// On Linux kernels earlier than 2.6.37, epoll can't handle timeout values
63
// bigger than (LONG_MAX - 999ULL)/HZ.  HZ in the wild can be as big as 1000,
64
// and LONG_MAX can be as small as (2**31)-1, so the largest number of
65
// milliseconds we can be sure to support on those early kernels is 2147482.
66
#if REALM_NETWORK_USE_EPOLL
67
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 37)
68
#define EPOLL_LARGE_TIMEOUT_BUG 1
69
#endif
70
#endif
71

72
using namespace realm::util;
73
using namespace realm::sync::network;
74

75

76
namespace {
77

78
using native_handle_type = SocketBase::native_handle_type;
79

80
#ifdef _WIN32
81

82
// This Winsock initialization call is required prior to any other Winsock API call
83
// made by the process. It is OK if a process calls it multiple times.
84
struct ProcessInitialization {
85
    ProcessInitialization()
86
    {
87
        WSADATA wsaData;
88
        int i = WSAStartup(MAKEWORD(2, 2), &wsaData);
89
        if (i != 0) {
90
            throw std::system_error(i, std::system_category(), "WSAStartup() Winsock initialization failed");
91
        }
92
    }
93

94
    ~ProcessInitialization()
95
    {
96
        // Must be called 1 time for each call to WSAStartup() that has taken place
97
        WSACleanup();
98
    }
99
};
100

101
ProcessInitialization g_process_initialization;
102

103
std::error_code make_winsock_error_code(int error_code)
104
{
105
    switch (error_code) {
106
        case WSAEAFNOSUPPORT:
107
            return make_basic_system_error_code(EAFNOSUPPORT);
108
        case WSAEINVAL:
109
            return make_basic_system_error_code(EINVAL);
110
        case WSAECANCELLED:
111
            return make_basic_system_error_code(ECANCELED);
112
        case WSAECONNABORTED:
113
            return make_basic_system_error_code(ECONNABORTED);
114
        case WSAECONNRESET:
115
            return make_basic_system_error_code(ECONNRESET);
116
        case WSAEWOULDBLOCK:
117
            return make_basic_system_error_code(EAGAIN);
118
    }
119

120
    // Microsoft's STL can map win32 (and winsock!) error codes to known (posix-compatible) errc ones.
121
    auto ec = std::system_category().default_error_condition(error_code);
122
    if (ec.category() == std::generic_category())
123
        return make_basic_system_error_code(ec.value());
124
    return std::error_code(ec.value(), ec.category());
125
}
126

127
#endif // defined _WIN32
128

129
inline bool check_socket_error(int ret, std::error_code& ec)
130
{
49,466✔
131
#ifdef _WIN32
132
    if (REALM_UNLIKELY(ret == SOCKET_ERROR)) {
133
        ec = make_winsock_error_code(WSAGetLastError());
134
        return true;
135
    }
136
#else
137
    if (REALM_UNLIKELY(ret == -1)) {
49,466✔
138
        ec = make_basic_system_error_code(errno);
4✔
139
        return true;
4✔
140
    }
4✔
141
#endif
49,462✔
142
    return false;
49,462✔
143
}
49,462✔
144

145
// Set file status flag O_NONBLOCK if `value` is true, otherwise clear it.
146
//
147
// Note that these flags are set at the file description level, and are therfore
148
// shared between duplicated descriptors (dup()).
149
//
150
// `ec` untouched on success.
151
std::error_code set_nonblock_flag(native_handle_type fd, bool value, std::error_code& ec) noexcept
152
{
11,900✔
153
#ifdef _WIN32
154
    u_long flags = value ? 1 : 0;
155
    int r = ioctlsocket(fd, FIONBIO, &flags);
156
    if (r == SOCKET_ERROR) {
157
        ec = make_winsock_error_code(WSAGetLastError());
158
        return ec;
159
    }
160
#else
161
    int flags = ::fcntl(fd, F_GETFL, 0);
11,900✔
162
    if (REALM_UNLIKELY(flags == -1)) {
11,900✔
163
        ec = make_basic_system_error_code(errno);
×
164
        return ec;
×
165
    }
×
166
    flags &= ~O_NONBLOCK;
11,900✔
167
    flags |= (value ? O_NONBLOCK : 0);
6,000✔
168
    int ret = ::fcntl(fd, F_SETFL, flags);
11,900✔
169
    if (REALM_UNLIKELY(ret == -1)) {
11,900✔
170
        ec = make_basic_system_error_code(errno);
×
171
        return ec;
×
172
    }
×
173
#endif
11,900✔
174

5,930✔
175
    return std::error_code(); // Success
11,900✔
176
}
11,900✔
177

178
// Set file status flag O_NONBLOCK. See set_nonblock_flag(int, bool,
179
// std::error_code&) for details. Throws std::system_error on failure.
180
void set_nonblock_flag(native_handle_type fd, bool value = true)
181
{
11,898✔
182
    std::error_code ec;
11,898✔
183
    if (set_nonblock_flag(fd, value, ec))
11,898✔
184
        throw std::system_error(ec);
×
185
}
11,898✔
186

187
// Set file descriptor flag FD_CLOEXEC if `value` is true, otherwise clear it.
188
//
189
// Note that this method of setting FD_CLOEXEC is subject to a race condition if
190
// another thread calls any of the exec functions concurrently. For that reason,
191
// this function should only be used when there is no better alternative. For
192
// example, Linux generally offers ways to set this flag atomically with the
193
// creation of a new file descriptor.
194
//
195
// `ec` untouched on success.
196
std::error_code set_cloexec_flag(native_handle_type fd, bool value, std::error_code& ec) noexcept
197
{
24,532✔
198
#ifndef _WIN32
24,532✔
199
    int flags = ::fcntl(fd, F_GETFD, 0);
24,532✔
200
    if (REALM_UNLIKELY(flags == -1)) {
24,532✔
201
        ec = make_basic_system_error_code(errno);
×
202
        return ec;
×
203
    }
×
204
    flags &= ~FD_CLOEXEC;
24,532✔
205
    flags |= (value ? FD_CLOEXEC : 0);
✔
206
    int ret = ::fcntl(fd, F_SETFD, flags);
24,532✔
207
    if (REALM_UNLIKELY(ret == -1)) {
24,532✔
208
        ec = make_basic_system_error_code(errno);
×
209
        return ec;
×
210
    }
×
211
#endif
24,532✔
212
    return std::error_code(); // Success
24,532✔
213
}
24,532✔
214

215
// Set file descriptor flag FD_CLOEXEC. See set_cloexec_flag(int, bool,
216
// std::error_code&) for details. Throws std::system_error on failure.
217
REALM_UNUSED inline void set_cloexec_flag(native_handle_type fd, bool value = true)
218
{
17,536✔
219
    std::error_code ec;
17,536✔
220
    if (set_cloexec_flag(fd, value, ec))
17,536✔
221
        throw std::system_error(ec);
×
222
}
17,536✔
223

224

225
inline void checked_close(native_handle_type fd) noexcept
226
{
57,390✔
227
#ifdef _WIN32
228
    int status = closesocket(fd);
229
    if (status == -1) {
230
        BOOL b = CloseHandle((HANDLE)fd);
231
        REALM_ASSERT(b || GetLastError() != ERROR_INVALID_HANDLE);
232
    }
233
#else
234
    int ret = ::close(fd);
57,390✔
235
    // We can accept various errors from close(), but they must be ignored as
24,088✔
236
    // the file descriptor is closed in any case (not necessarily according to
24,088✔
237
    // POSIX, but we shall assume it anyway). `EBADF`, however, would indicate
24,088✔
238
    // an implementation bug, so we don't want to ignore that.
24,088✔
239
    REALM_ASSERT(ret != -1 || errno != EBADF);
57,390!
240
#endif
57,390✔
241
}
57,390✔
242

243

244
class CloseGuard {
245
public:
246
    CloseGuard() noexcept {}
37,716✔
247
    explicit CloseGuard(native_handle_type fd) noexcept
248
        : m_fd{fd}
249
    {
20,586✔
250
        REALM_ASSERT(fd != -1);
20,586✔
251
    }
20,586✔
252
    CloseGuard(CloseGuard&& cg) noexcept
253
        : m_fd{cg.release()}
254
    {
×
255
    }
×
256
    ~CloseGuard() noexcept
257
    {
58,310✔
258
        if (m_fd != -1)
58,310✔
259
            checked_close(m_fd);
43,356✔
260
    }
58,310✔
261
    void reset(native_handle_type fd) noexcept
262
    {
36,800✔
263
        REALM_ASSERT(fd != -1);
36,800✔
264
        if (m_fd != -1)
36,800✔
265
            checked_close(m_fd);
×
266
        m_fd = fd;
36,800✔
267
    }
36,800✔
268
    operator native_handle_type() const noexcept
269
    {
1,757,706✔
270
        return m_fd;
1,757,706✔
271
    }
1,757,706✔
272
    native_handle_type release() noexcept
273
    {
14,030✔
274
        native_handle_type fd = m_fd;
14,030✔
275
        m_fd = -1;
14,030✔
276
        return fd;
14,030✔
277
    }
14,030✔
278

279
private:
280
    native_handle_type m_fd = -1;
281
};
282

283

284
#ifndef _WIN32
285

286
class WakeupPipe {
287
public:
288
    WakeupPipe()
289
    {
17,290✔
290
        int fildes[2];
17,290✔
291
#if HAVE_LINUX_PIPE2
8,522✔
292
        int flags = O_CLOEXEC;
8,522✔
293
        int ret = ::pipe2(fildes, flags);
8,522✔
294
#else
295
        int ret = ::pipe(fildes);
8,768✔
296
#endif
8,768✔
297
        if (REALM_UNLIKELY(ret == -1)) {
17,290✔
298
            std::error_code ec = make_basic_system_error_code(errno);
×
299
            throw std::system_error(ec);
×
300
        }
×
301
        m_read_fd.reset(fildes[0]);
17,290✔
302
        m_write_fd.reset(fildes[1]);
17,290✔
303
#if !HAVE_LINUX_PIPE2
8,768✔
304
        set_cloexec_flag(m_read_fd);  // Throws
8,768✔
305
        set_cloexec_flag(m_write_fd); // Throws
8,768✔
306
#endif
8,768✔
307
    }
17,290✔
308

309
    // Thread-safe.
310
    int wait_fd() const noexcept
311
    {
147,382✔
312
        return m_read_fd;
147,382✔
313
    }
147,382✔
314

315
    // Cause the wait descriptor (wait_fd()) to become readable within a short
316
    // amount of time.
317
    //
318
    // Thread-safe.
319
    void signal() noexcept
320
    {
1,183,610✔
321
        std::lock_guard lock{m_mutex};
1,183,610✔
322
        if (!m_signaled) {
1,183,610✔
323
            char c = 0;
244,130✔
324
            ssize_t ret = ::write(m_write_fd, &c, 1);
244,130✔
325
            REALM_ASSERT_RELEASE(ret == 1);
244,130✔
326
            m_signaled = true;
244,130✔
327
        }
244,130✔
328
    }
1,183,610✔
329

330
    // Must be called after the wait descriptor (wait_fd()) becomes readable.
331
    //
332
    // Thread-safe.
333
    void acknowledge_signal() noexcept
334
    {
243,532✔
335
        std::lock_guard lock{m_mutex};
243,532✔
336
        if (m_signaled) {
243,566✔
337
            char c;
243,566✔
338
            ssize_t ret = ::read(m_read_fd, &c, 1);
243,566✔
339
            REALM_ASSERT_RELEASE(ret == 1);
243,566✔
340
            m_signaled = false;
243,566✔
341
        }
243,566✔
342
    }
243,532✔
343

344
private:
345
    CloseGuard m_read_fd, m_write_fd;
346
    std::mutex m_mutex;
347
    bool m_signaled = false; // Protected by `m_mutex`.
348
};
349

350
#else // defined _WIN32
351

352
class WakeupPipe {
353
public:
354
    SOCKET wait_fd() const noexcept
355
    {
356
        return INVALID_SOCKET;
357
    }
358

359
    void signal() noexcept
360
    {
361
        m_signal_count++;
362
    }
363

364
    bool is_signaled() const noexcept
365
    {
366
        return m_signal_count > 0;
367
    }
368

369
    void acknowledge_signal() noexcept
370
    {
371
        m_signal_count--;
372
    }
373

374
private:
375
    std::atomic<uint32_t> m_signal_count = 0;
376
};
377

378
#endif // defined _WIN32
379

380

381
std::error_code translate_addrinfo_error(int err) noexcept
382
{
6✔
383
    switch (err) {
6✔
384
        case EAI_AGAIN:
2✔
385
            return ResolveErrors::host_not_found_try_again;
2✔
386
        case EAI_BADFLAGS:
✔
387
            return error::invalid_argument;
×
388
        case EAI_FAIL:
✔
389
            return ResolveErrors::no_recovery;
×
390
        case EAI_FAMILY:
✔
391
            return error::address_family_not_supported;
×
392
        case EAI_MEMORY:
✔
393
            return error::no_memory;
×
394
        case EAI_NONAME:
4✔
395
#if defined(EAI_ADDRFAMILY)
4✔
396
        case EAI_ADDRFAMILY:
4✔
397
#endif
4✔
398
#if defined(EAI_NODATA) && (EAI_NODATA != EAI_NONAME)
4✔
399
        case EAI_NODATA:
4✔
400
#endif
4✔
401
            return ResolveErrors::host_not_found;
4✔
UNCOV
402
        case EAI_SERVICE:
✔
403
            return ResolveErrors::service_not_found;
×
UNCOV
404
        case EAI_SOCKTYPE:
✔
405
            return ResolveErrors::socket_type_not_supported;
×
UNCOV
406
        default:
✔
407
            return error::unknown;
×
408
    }
6✔
409
}
6✔
410

411

412
struct GetaddrinfoResultOwner {
413
    struct addrinfo* ptr;
414
    GetaddrinfoResultOwner(struct addrinfo* p)
415
        : ptr{p}
416
    {
11,302✔
417
    }
11,302✔
418
    ~GetaddrinfoResultOwner() noexcept
419
    {
11,302✔
420
        if (ptr)
11,302✔
421
            freeaddrinfo(ptr);
11,302✔
422
    }
11,302✔
423
};
424

425
} // unnamed namespace
426

427

428
class Service::IoReactor {
429
public:
430
    IoReactor();
431
    ~IoReactor() noexcept;
432

433
    // Add an initiated I/O operation that did not complete immediately.
434
    void add_oper(Descriptor&, LendersIoOperPtr, Want);
435
    void remove_canceled_ops(Descriptor&, OperQueue<AsyncOper>& completed_ops) noexcept;
436

437
    bool wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
438
                          OperQueue<AsyncOper>& completed_ops);
439

440
    // The reactor is considered empty when no operations are currently managed
441
    // by it. An operation is managed by a reactor if it was added through
442
    // add_oper() and not yet passed out through `completed_ops` of
443
    // wait_and_advance().
444
    bool empty() const noexcept;
445

446
    // Cause wait_and_advance() to return within a short amount of time.
447
    //
448
    // Thread-safe.
449
    void interrupt() noexcept;
450

451
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
452
    void register_desc(Descriptor&);
453
    void deregister_desc(Descriptor&) noexcept;
454
#endif
455

456
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
457
    clock::duration get_and_reset_sleep_time() noexcept;
458
#endif
459

460
private:
461
#if REALM_NETWORK_USE_EPOLL
462

463
    static constexpr int s_epoll_event_buffer_size = 256;
464
    const std::unique_ptr<epoll_event[]> m_epoll_event_buffer;
465
    const CloseGuard m_epoll_fd;
466

467
    static std::unique_ptr<epoll_event[]> make_epoll_event_buffer();
468
    static CloseGuard make_epoll_fd();
469

470
#elif REALM_HAVE_KQUEUE // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
471

472
    static constexpr int s_kevent_buffer_size = 256;
473
    const std::unique_ptr<struct kevent[]> m_kevent_buffer;
474
    const CloseGuard m_kqueue_fd;
475

476
    static std::unique_ptr<struct kevent[]> make_kevent_buffer();
477
    static CloseGuard make_kqueue_fd();
478

479
#endif // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
480

481
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
482

483
    OperQueue<IoOper> m_active_ops;
484

485
    // If there are already active operations, just activate as many additional
486
    // operations as can be done without blocking. Otherwise, block until at
487
    // least one operation can be activated or the timeout is reached. Then, if
488
    // the timeout was not reached, activate as many additional operations as
489
    // can be done without any further blocking.
490
    //
491
    // May occasionally return with no active operations and before the timeout
492
    // has been reached, but this can be assumed to happen rarely enough that it
493
    // will never amount to a performance problem.
494
    //
495
    // Argument `now` is unused if `timeout.time_since_epoch() <= 0`.
496
    //
497
    // Returns true if, and only if a wakeup pipe signal was
498
    // received. Operations may already have been activated in this case.
499
    bool wait_and_activate(clock::time_point timeout, clock::time_point now);
500

501
    void advance_active_ops(OperQueue<AsyncOper>& completed_ops) noexcept;
502

503
#else // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
504

505
    struct OperSlot {
506
        std::size_t pollfd_slot_ndx = 0; // Zero when slot is unused
507
        OperQueue<IoOper> read_ops, write_ops;
508
    };
509

510
    std::vector<OperSlot> m_operations; // Indexed by file descriptor
511

512
    // First entry in `m_pollfd_slots` is always the read end of the wakeup
513
    // pipe. There is then an additional entry for each entry in `m_operations`
514
    // where `pollfd_slot_ndx` is nonzero. All entries always have `pollfd::fd`
515
    // >= 0.
516
    //
517
    // INVARIANT: m_pollfd_slots.size() == 1 + N, where N is the number of
518
    // entries in m_operations where pollfd_slot_ndx is nonzero.
519
    std::vector<pollfd> m_pollfd_slots;
520

521
    void discard_pollfd_slot_by_move_last_over(OperSlot&) noexcept;
522

523
#endif // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
524

525
    std::size_t m_num_operations = 0;
526
    WakeupPipe m_wakeup_pipe;
527

528
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
529
    clock::duration m_sleep_time = clock::duration::zero();
530
#endif
531
};
532

533

534
inline bool Service::IoReactor::empty() const noexcept
535
{
1,623,848✔
536
    return (m_num_operations == 0);
1,623,848✔
537
}
1,623,848✔
538

539

540
inline void Service::IoReactor::interrupt() noexcept
541
{
1,183,736✔
542
    m_wakeup_pipe.signal();
1,183,736✔
543
}
1,183,736✔
544

545

546
#if REALM_NETWORK_USE_EPOLL
547

548
inline Service::IoReactor::IoReactor()
549
    : m_epoll_event_buffer{make_epoll_event_buffer()} // Throws
550
    , m_epoll_fd{make_epoll_fd()}                     // Throws
551
    , m_wakeup_pipe{}                                 // Throws
552
{
553
    epoll_event event = epoll_event(); // Clear
554
    event.events = EPOLLIN;
555
    event.data.ptr = nullptr;
556
    int ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_wakeup_pipe.wait_fd(), &event);
557
    if (REALM_UNLIKELY(ret == -1)) {
558
        std::error_code ec = make_basic_system_error_code(errno);
559
        throw std::system_error(ec);
560
    }
561
}
562

563

564
inline Service::IoReactor::~IoReactor() noexcept {}
565

566

567
inline void Service::IoReactor::register_desc(Descriptor& desc)
568
{
569
    epoll_event event = epoll_event();                        // Clear
570
    event.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; // Enable edge triggering
571
    event.data.ptr = &desc;
572
    int ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, desc.m_fd, &event);
573
    if (REALM_UNLIKELY(ret == -1)) {
574
        std::error_code ec = make_basic_system_error_code(errno);
575
        throw std::system_error(ec);
576
    }
577
}
578

579

580
inline void Service::IoReactor::deregister_desc(Descriptor& desc) noexcept
581
{
582
    epoll_event event = epoll_event(); // Clear
583
    int ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, desc.m_fd, &event);
584
    REALM_ASSERT(ret != -1);
585
}
586

587

588
inline std::unique_ptr<epoll_event[]> Service::IoReactor::make_epoll_event_buffer()
589
{
590
    return std::make_unique<epoll_event[]>(s_epoll_event_buffer_size); // Throws
591
}
592

593

594
inline CloseGuard Service::IoReactor::make_epoll_fd()
595
{
596
    int flags = 0;
597
    flags |= EPOLL_CLOEXEC;
598
    int ret = epoll_create1(flags);
599
    if (REALM_UNLIKELY(ret == -1)) {
600
        std::error_code ec = make_basic_system_error_code(errno);
601
        throw std::system_error(ec);
602
    }
603
    int epoll_fd = ret;
604
    return CloseGuard{epoll_fd};
605
}
606

607

608
bool Service::IoReactor::wait_and_activate(clock::time_point timeout, clock::time_point now)
609
{
610
    int max_wait_millis = 0;
611
    bool allow_blocking_wait = m_active_ops.empty();
612
    if (allow_blocking_wait) {
613
        if (timeout.time_since_epoch().count() <= 0) {
614
            max_wait_millis = -1; // Allow indefinite blocking
615
        }
616
        else if (now < timeout) {
617
            auto diff = timeout - now;
618
            int max_int_millis = std::numeric_limits<int>::max();
619
            // 17592186044415 is the largest value (45-bit signed integer)
620
            // garanteed to be supported by std::chrono::milliseconds. In the
621
            // worst case, `int` is a 16-bit integer, meaning that we can only
622
            // wait about 30 seconds at a time. In the best case
623
            // (17592186044415) we can wait more than 500 years at a time. In
624
            // the typical case (`int` has 32 bits), we can wait 24 days at a
625
            // time.
626
            long long max_chrono_millis = 17592186044415;
627
            if (max_chrono_millis < max_int_millis)
628
                max_int_millis = int(max_chrono_millis);
629
#if EPOLL_LARGE_TIMEOUT_BUG
630
            long max_safe_millis = 2147482; // Circa 35 minutes
631
            if (max_safe_millis < max_int_millis)
632
                max_int_millis = int(max_safe_millis);
633
#endif
634
            if (diff > std::chrono::milliseconds(max_int_millis)) {
635
                max_wait_millis = max_int_millis;
636
            }
637
            else {
638
                // Overflow is impossible here, due to the preceding check
639
                auto diff_millis = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
640
                // The conversion to milliseconds will round down if the tick
641
                // period of `diff` is less than a millisecond, which it usually
642
                // is. This is a problem, because it can lead to premature
643
                // wakeups, which in turn could cause extranous iterations in
644
                // the event loop. This is especially problematic when a small
645
                // `diff` is rounded down to zero milliseconds, becuase that can
646
                // easily produce a "busy wait" condition for up to a
647
                // millisecond every time this happens. Obviously, the solution
648
                // is to round up, instead of down.
649
                if (diff_millis < diff) {
650
                    // Note that the following increment cannot overflow,
651
                    // because diff_millis < diff <= max_int_millis <=
652
                    // std::numeric_limits<int>::max().
653
                    ++diff_millis;
654
                }
655
                max_wait_millis = int(diff_millis.count());
656
            }
657
        }
658
    }
659
    for (int i = 0; i < 2; ++i) {
660
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
661
        clock::time_point sleep_start_time = clock::now();
662
#endif
663
        int ret = epoll_wait(m_epoll_fd, m_epoll_event_buffer.get(), s_epoll_event_buffer_size, max_wait_millis);
664
        if (REALM_UNLIKELY(ret == -1)) {
665
            int err = errno;
666
            if (err == EINTR)
667
                return false; // Infrequent premature return is ok
668
            std::error_code ec = make_basic_system_error_code(err);
669
            throw std::system_error(ec);
670
        }
671
        REALM_ASSERT(ret >= 0);
672
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
673
        m_sleep_time += clock::now() - sleep_start_time;
674
#endif
675
        int n = ret;
676
        bool got_wakeup_pipe_signal = false;
677
        for (int j = 0; j < n; ++j) {
678
            const epoll_event& event = m_epoll_event_buffer[j];
679
            bool is_wakeup_pipe_signal = !event.data.ptr;
680
            if (REALM_UNLIKELY(is_wakeup_pipe_signal)) {
681
                m_wakeup_pipe.acknowledge_signal();
682
                got_wakeup_pipe_signal = true;
683
                continue;
684
            }
685
            Descriptor& desc = *static_cast<Descriptor*>(event.data.ptr);
686
            if ((event.events & (EPOLLIN | EPOLLHUP | EPOLLERR)) != 0) {
687
                if (!desc.m_read_ready) {
688
                    desc.m_read_ready = true;
689
                    m_active_ops.push_back(desc.m_suspended_read_ops);
690
                }
691
            }
692
            if ((event.events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) != 0) {
693
                if (!desc.m_write_ready) {
694
                    desc.m_write_ready = true;
695
                    m_active_ops.push_back(desc.m_suspended_write_ops);
696
                }
697
            }
698
            if ((event.events & EPOLLRDHUP) != 0)
699
                desc.m_imminent_end_of_input = true;
700
        }
701
        if (got_wakeup_pipe_signal)
702
            return true;
703
        if (n < s_epoll_event_buffer_size)
704
            break;
705
        max_wait_millis = 0;
706
    }
707
    return false;
708
}
709

710

711
#elif REALM_HAVE_KQUEUE // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
712

713

714
inline Service::IoReactor::IoReactor()
715
    : m_kevent_buffer{make_kevent_buffer()} // Throws
716
    , m_kqueue_fd{make_kqueue_fd()}         // Throws
717
    , m_wakeup_pipe{}                       // Throws
718
{
8,768✔
719
    struct kevent event;
8,768✔
720
    EV_SET(&event, m_wakeup_pipe.wait_fd(), EVFILT_READ, EV_ADD, 0, 0, nullptr);
8,768✔
721
    int ret = ::kevent(m_kqueue_fd, &event, 1, nullptr, 0, nullptr);
8,768✔
722
    if (REALM_UNLIKELY(ret == -1)) {
8,768✔
723
        std::error_code ec = make_basic_system_error_code(errno);
724
        throw std::system_error(ec);
725
    }
726
}
8,768✔
727

728

729
inline Service::IoReactor::~IoReactor() noexcept {}
8,768✔
730

731

732
inline void Service::IoReactor::register_desc(Descriptor& desc)
733
{
6,906✔
734
    struct kevent events[2];
6,906✔
735
    // EV_CLEAR enables edge-triggered behavior
736
    EV_SET(&events[0], desc.m_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &desc);
6,906✔
737
    EV_SET(&events[1], desc.m_fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, &desc);
6,906✔
738
    int ret = ::kevent(m_kqueue_fd, events, 2, nullptr, 0, nullptr);
6,906✔
739
    if (REALM_UNLIKELY(ret == -1)) {
6,906✔
740
        std::error_code ec = make_basic_system_error_code(errno);
741
        throw std::system_error(ec);
742
    }
743
}
6,906✔
744

745

746
inline void Service::IoReactor::deregister_desc(Descriptor& desc) noexcept
747
{
6,906✔
748
    struct kevent events[2];
6,906✔
749
    EV_SET(&events[0], desc.m_fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
6,906✔
750
    EV_SET(&events[1], desc.m_fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
6,906✔
751
    int ret = ::kevent(m_kqueue_fd, events, 2, nullptr, 0, nullptr);
6,906✔
752
    REALM_ASSERT(ret != -1);
6,906✔
753
}
6,906✔
754

755

756
inline std::unique_ptr<struct kevent[]> Service::IoReactor::make_kevent_buffer()
757
{
8,768✔
758
    return std::make_unique<struct kevent[]>(s_kevent_buffer_size); // Throws
8,768✔
759
}
8,768✔
760

761

762
inline CloseGuard Service::IoReactor::make_kqueue_fd()
763
{
8,768✔
764
    int ret = ::kqueue();
8,768✔
765
    if (REALM_UNLIKELY(ret == -1)) {
8,768✔
766
        std::error_code ec = make_basic_system_error_code(errno);
767
        throw std::system_error(ec);
768
    }
769
    int epoll_fd = ret;
8,768✔
770
    return CloseGuard{epoll_fd};
8,768✔
771
}
8,768✔
772

773

774
bool Service::IoReactor::wait_and_activate(clock::time_point timeout, clock::time_point now)
775
{
1,069,388✔
776
    timespec max_wait_time{}; // Clear to zero
1,069,388✔
777
    bool allow_blocking_wait = m_active_ops.empty();
1,069,388✔
778
    if (allow_blocking_wait) {
1,069,388✔
779
        // Note that ::kevent() will silently clamp `max_wait_time` to 24 hours
780
        // (86400 seconds), but that is ok, because the caller is prepared for
781
        // premature return as long as it happens infrequently enough to not
782
        // pose a performance problem.
783
        constexpr std::time_t max_wait_seconds = 86400;
140,486✔
784
        if (timeout.time_since_epoch().count() <= 0) {
140,486✔
785
            max_wait_time.tv_sec = max_wait_seconds;
16,230✔
786
        }
16,230✔
787
        else if (now < timeout) {
124,256✔
788
            auto diff = timeout - now;
124,236✔
789
            auto secs = std::chrono::duration_cast<std::chrono::seconds>(diff);
124,236✔
790
            auto nsecs = std::chrono::duration_cast<std::chrono::nanoseconds>(diff - secs);
124,236✔
791
            auto secs_2 = std::min(secs.count(), std::chrono::seconds::rep(max_wait_seconds));
124,236✔
792
            max_wait_time.tv_sec = std::time_t(secs_2);
124,236✔
793
            max_wait_time.tv_nsec = long(nsecs.count());
124,236✔
794
        }
124,236✔
795
    }
140,486✔
796
    for (int i = 0; i < 4; ++i) {
2,147,483,647✔
797
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
798
        clock::time_point sleep_start_time = clock::now();
799
#endif
800
        int ret = ::kevent(m_kqueue_fd, nullptr, 0, m_kevent_buffer.get(), s_kevent_buffer_size, &max_wait_time);
1,069,454✔
801
        if (REALM_UNLIKELY(ret == -1)) {
1,069,454✔
802
            int err = errno;
803
            if (err == EINTR)
×
804
                return false; // Infrequent premature return is ok
805
            std::error_code ec = make_basic_system_error_code(err);
806
            throw std::system_error(ec);
807
        }
808
        REALM_ASSERT(ret >= 0);
1,069,454✔
809
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
810
        m_sleep_time += clock::now() - sleep_start_time;
811
#endif
812
        int n = ret;
1,069,454✔
813
        bool got_wakeup_pipe_signal = false;
1,069,454✔
814
        for (int j = 0; j < n; ++j) {
1,600,192✔
815
            const struct kevent& event = m_kevent_buffer[j];
530,738✔
816
            bool is_wakeup_pipe_signal = !event.udata;
530,738✔
817
            if (REALM_UNLIKELY(is_wakeup_pipe_signal)) {
530,738✔
818
                REALM_ASSERT(m_wakeup_pipe.wait_fd() == int(event.ident));
130,118✔
819
                m_wakeup_pipe.acknowledge_signal();
130,118✔
820
                got_wakeup_pipe_signal = true;
130,118✔
821
                continue;
130,118✔
822
            }
130,118✔
823
            Descriptor& desc = *static_cast<Descriptor*>(event.udata);
400,620✔
824
            REALM_ASSERT(desc.m_fd == int(event.ident));
400,620✔
825
            if (event.filter == EVFILT_READ) {
400,620✔
826
                if (!desc.m_read_ready) {
207,802✔
827
                    desc.m_read_ready = true;
54,052✔
828
                    m_active_ops.push_back(desc.m_suspended_read_ops);
54,052✔
829
                }
54,052✔
830
                if ((event.flags & EV_EOF) != 0)
207,802✔
831
                    desc.m_imminent_end_of_input = true;
662✔
832
            }
207,802✔
833
            if (event.filter == EVFILT_WRITE) {
400,620✔
834
                if (!desc.m_write_ready) {
192,852✔
835
                    desc.m_write_ready = true;
5,024✔
836
                    m_active_ops.push_back(desc.m_suspended_write_ops);
5,024✔
837
                }
5,024✔
838
            }
192,852✔
839
        }
400,620✔
840
        if (got_wakeup_pipe_signal)
1,069,454✔
841
            return true;
130,136✔
842
        if (n < s_kevent_buffer_size)
939,318✔
843
            break;
939,396✔
844
        // Clear to zero to disable blocking for any additional opportunistic
845
        // event extractions.
846
        max_wait_time = timespec{};
2,147,483,647✔
847
    }
2,147,483,647✔
848
    return false;
939,252✔
849
}
1,069,388✔
850

851
#endif // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
852

853

854
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
855

856
void Service::IoReactor::add_oper(Descriptor& desc, LendersIoOperPtr op, Want want)
857
{
1,113,076✔
858
    if (REALM_UNLIKELY(!desc.m_is_registered)) {
1,113,076✔
859
        register_desc(desc); // Throws
6,906✔
860
        desc.m_is_registered = true;
6,906✔
861
    }
6,906✔
862

863
    switch (want) {
1,113,076✔
864
        case Want::read:
619,312✔
865
            if (REALM_UNLIKELY(desc.m_read_ready))
619,312✔
866
                goto active;
608,132✔
867
            desc.m_suspended_read_ops.push_back(std::move(op));
11,180✔
868
            goto proceed;
11,180✔
869
        case Want::write:
494,182✔
870
            if (REALM_UNLIKELY(desc.m_write_ready))
494,182✔
871
                goto active;
488,858✔
872
            desc.m_suspended_write_ops.push_back(std::move(op));
5,324✔
873
            goto proceed;
5,324✔
874
        case Want::nothing:
2✔
875
            break;
876
    }
877
    REALM_ASSERT(false);
878

879
active:
1,096,696✔
880
    m_active_ops.push_back(std::move(op));
1,096,696✔
881

882
proceed:
1,112,686✔
883
    ++m_num_operations;
1,112,686✔
884
}
1,112,686✔
885

886

887
void Service::IoReactor::remove_canceled_ops(Descriptor& desc, OperQueue<AsyncOper>& completed_ops) noexcept
888
{
255,152✔
889
    // Note: Canceled operations that are currently active (in m_active_ops)
890
    // will be removed later by advance_active_ops().
891

892
    while (LendersIoOperPtr op = desc.m_suspended_read_ops.pop_front()) {
263,946✔
893
        completed_ops.push_back(std::move(op));
8,794✔
894
        --m_num_operations;
8,794✔
895
    }
8,794✔
896
    while (LendersIoOperPtr op = desc.m_suspended_write_ops.pop_front()) {
258,510✔
897
        completed_ops.push_back(std::move(op));
3,358✔
898
        --m_num_operations;
3,358✔
899
    }
3,358✔
900
}
255,152✔
901

902

903
bool Service::IoReactor::wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
904
                                          OperQueue<AsyncOper>& completed_ops)
905
{
871,832✔
906
    clock::time_point now_2 = now;
871,832✔
907
    for (;;) {
1,069,308✔
908
        bool wakeup_pipe_signal = wait_and_activate(timeout, now_2); // Throws
1,069,308✔
909
        if (REALM_UNLIKELY(wakeup_pipe_signal)) {
1,069,308✔
910
            interrupted = true;
130,134✔
911
            return false;
130,134✔
912
        }
130,134✔
913
        advance_active_ops(completed_ops);
939,174✔
914
        if (!completed_ops.empty())
939,174✔
915
            return true;
734,394✔
916
        if (timeout.time_since_epoch().count() > 0) {
204,780✔
917
            now_2 = clock::now();
88,854✔
918
            bool timed_out = (now_2 >= timeout);
88,854✔
919
            if (timed_out)
88,854✔
920
                return false;
7,434✔
921
        }
88,854✔
922
    }
204,780✔
923
}
871,832✔
924

925

926
void Service::IoReactor::advance_active_ops(OperQueue<AsyncOper>& completed_ops) noexcept
927
{
939,266✔
928
    OperQueue<IoOper> new_active_ops;
939,266✔
929
    while (LendersIoOperPtr op = m_active_ops.pop_front()) {
2,244,250✔
930
        if (op->is_canceled()) {
1,304,286✔
931
            completed_ops.push_back(std::move(op));
434,126✔
932
            --m_num_operations;
434,126✔
933
            continue;
434,126✔
934
        }
434,126✔
935
        Want want = op->advance();
870,160✔
936
        switch (want) {
870,160✔
937
            case Want::nothing:
666,552✔
938
                REALM_ASSERT(op->is_complete());
666,552✔
939
                completed_ops.push_back(std::move(op));
666,552✔
940
                --m_num_operations;
666,552✔
941
                continue;
666,552✔
942
            case Want::read: {
201,036✔
943
                Descriptor& desc = op->descriptor();
201,036✔
944
                if (REALM_UNLIKELY(desc.m_read_ready))
201,036✔
945
                    goto still_active;
149,462✔
946
                desc.m_suspended_read_ops.push_back(std::move(op));
51,574✔
947
                continue;
51,574✔
948
            }
51,574✔
949
            case Want::write: {
3,274✔
950
                Descriptor& desc = op->descriptor();
3,274✔
951
                if (REALM_UNLIKELY(desc.m_write_ready))
3,274✔
952
                    goto still_active;
710✔
953
                desc.m_suspended_write_ops.push_back(std::move(op));
2,564✔
954
                continue;
2,564✔
955
            }
2,564✔
956
        }
957
        REALM_ASSERT(false);
958

959
    still_active:
150,168✔
960
        new_active_ops.push_back(std::move(op));
150,168✔
961
    }
150,168✔
962
    m_active_ops.push_back(new_active_ops);
939,964✔
963
}
939,964✔
964

965

966
#else // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
967

968

969
inline Service::IoReactor::IoReactor()
970
    : m_wakeup_pipe{} // Throws
971
{
8,522✔
972
    pollfd slot = pollfd(); // Cleared slot
8,522✔
973
    slot.fd = m_wakeup_pipe.wait_fd();
8,522✔
974
    slot.events = POLLRDNORM;
8,522✔
975
    m_pollfd_slots.emplace_back(slot); // Throws
8,522✔
976
}
8,522✔
977

978

979
inline Service::IoReactor::~IoReactor() noexcept
980
{
8,526✔
981
#if REALM_ASSERTIONS_ENABLED
8,526✔
982
    std::size_t n = 0;
8,526✔
983
    for (std::size_t i = 0; i < m_operations.size(); ++i) {
190,640✔
984
        OperSlot& oper_slot = m_operations[i];
182,114✔
985
        while (oper_slot.read_ops.pop_front())
182,114✔
986
            ++n;
987
        while (oper_slot.write_ops.pop_front())
182,114✔
988
            ++n;
989
    }
182,114✔
990
    REALM_ASSERT(n == m_num_operations);
8,526✔
991
#endif
8,526✔
992
}
8,526✔
993

994

995
void Service::IoReactor::add_oper(Descriptor& desc, LendersIoOperPtr op, Want want)
996
{
1,127,564✔
997
    native_handle_type fd = desc.m_fd;
1,127,564✔
998

1,127,564✔
999
    // Make sure there are enough slots in m_operations
1,127,564✔
1000
    {
1,127,564✔
1001
        std::size_t n = std::size_t(fd) + 1; // FIXME: Check for arithmetic overflow
1,127,564✔
1002
        if (m_operations.size() < n)
1,127,564✔
1003
            m_operations.resize(n); // Throws
5,650✔
1004
    }
1,127,564✔
1005

1,127,564✔
1006
    // Allocate a pollfd_slot unless we already have one
1,127,564✔
1007
    OperSlot& oper_slot = m_operations[fd];
1,127,564✔
1008
    if (oper_slot.pollfd_slot_ndx == 0) {
1,127,564✔
1009
        pollfd pollfd_slot = pollfd(); // Cleared slot
734,400✔
1010
        pollfd_slot.fd = fd;
734,400✔
1011
        std::size_t pollfd_slot_ndx = m_pollfd_slots.size();
734,400✔
1012
        REALM_ASSERT(pollfd_slot_ndx > 0);
734,400✔
1013
        m_pollfd_slots.emplace_back(pollfd_slot); // Throws
734,400✔
1014
        oper_slot.pollfd_slot_ndx = pollfd_slot_ndx;
734,400✔
1015
    }
734,400✔
1016

1,127,564✔
1017
    pollfd& pollfd_slot = m_pollfd_slots[oper_slot.pollfd_slot_ndx];
1,127,564✔
1018
    REALM_ASSERT(pollfd_slot.fd == fd);
1,127,564✔
1019
    REALM_ASSERT(((pollfd_slot.events & POLLRDNORM) != 0) == !oper_slot.read_ops.empty());
1,127,564✔
1020
    REALM_ASSERT(((pollfd_slot.events & POLLWRNORM) != 0) == !oper_slot.write_ops.empty());
1,127,564✔
1021
    REALM_ASSERT((pollfd_slot.events & ~(POLLRDNORM | POLLWRNORM)) == 0);
1,127,564✔
1022
    switch (want) {
1,127,564✔
1023
        case Want::nothing:
1024
            break;
1025
        case Want::read:
625,382✔
1026
            pollfd_slot.events |= POLLRDNORM;
625,382✔
1027
            oper_slot.read_ops.push_back(std::move(op));
625,382✔
1028
            goto finish;
625,382✔
1029
        case Want::write:
504,860✔
1030
            pollfd_slot.events |= POLLWRNORM;
504,860✔
1031
            oper_slot.write_ops.push_back(std::move(op));
504,860✔
1032
            goto finish;
504,860✔
1033
    }
1034
    REALM_ASSERT(false);
1035
    return;
1036

1,128,832✔
1037
finish:
1,128,832✔
1038
    ++m_num_operations;
1,128,832✔
1039
}
1,128,832✔
1040

1041

1042
void Service::IoReactor::remove_canceled_ops(Descriptor& desc, OperQueue<AsyncOper>& completed_ops) noexcept
1043
{
259,966✔
1044
    native_handle_type fd = desc.m_fd;
259,966✔
1045
    REALM_ASSERT(fd >= 0);
259,966✔
1046
    REALM_ASSERT(std::size_t(fd) < m_operations.size());
259,966✔
1047
    OperSlot& oper_slot = m_operations[fd];
259,966✔
1048
    REALM_ASSERT(oper_slot.pollfd_slot_ndx > 0);
259,966✔
1049
    REALM_ASSERT(!oper_slot.read_ops.empty() || !oper_slot.write_ops.empty());
259,966✔
1050
    pollfd& pollfd_slot = m_pollfd_slots[oper_slot.pollfd_slot_ndx];
259,966✔
1051
    REALM_ASSERT(pollfd_slot.fd == fd);
259,966✔
1052
    while (LendersIoOperPtr op = oper_slot.read_ops.pop_front()) {
496,834✔
1053
        completed_ops.push_back(std::move(op));
236,868✔
1054
        --m_num_operations;
236,868✔
1055
    }
236,868✔
1056
    while (LendersIoOperPtr op = oper_slot.write_ops.pop_front()) {
484,046✔
1057
        completed_ops.push_back(std::move(op));
224,080✔
1058
        --m_num_operations;
224,080✔
1059
    }
224,080✔
1060
    discard_pollfd_slot_by_move_last_over(oper_slot);
259,966✔
1061
}
259,966✔
1062

1063

1064
bool Service::IoReactor::wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
1065
                                          OperQueue<AsyncOper>& completed_ops)
1066
{
751,660✔
1067
#ifdef _WIN32
1068
    using nfds_type = std::size_t;
1069
#else
1070
    using nfds_type = nfds_t;
751,660✔
1071
#endif
751,660✔
1072
    clock::time_point now_2 = now;
751,660✔
1073
    std::size_t num_ready_descriptors = 0;
751,660✔
1074
    {
751,660✔
1075
        // std::vector guarantees contiguous storage
751,660✔
1076
        pollfd* fds = &m_pollfd_slots.front();
751,660✔
1077
        nfds_type nfds = nfds_type(m_pollfd_slots.size());
751,660✔
1078
        for (;;) {
751,660✔
1079
            int max_wait_millis = -1; // Wait indefinitely
751,512✔
1080
            if (timeout.time_since_epoch().count() > 0) {
751,512✔
1081
                if (now_2 >= timeout)
408,742✔
1082
                    return false; // No operations completed
1083
                auto diff = timeout - now_2;
408,742✔
1084
                int max_int_millis = std::numeric_limits<int>::max();
408,742✔
1085
                // 17592186044415 is the largest value (45-bit signed integer)
408,742✔
1086
                // garanteed to be supported by std::chrono::milliseconds. In
408,742✔
1087
                // the worst case, `int` is a 16-bit integer, meaning that we
408,742✔
1088
                // can only wait about 30 seconds at a time. In the best case
408,742✔
1089
                // (17592186044415) we can wait more than 500 years at a
408,742✔
1090
                // time. In the typical case (`int` has 32 bits), we can wait 24
408,742✔
1091
                // days at a time.
408,742✔
1092
                long long max_chrono_millis = 17592186044415;
408,742✔
1093
                if (max_int_millis > max_chrono_millis)
408,742✔
1094
                    max_int_millis = int(max_chrono_millis);
1095
                if (diff > std::chrono::milliseconds(max_int_millis)) {
408,742✔
1096
                    max_wait_millis = max_int_millis;
3,522✔
1097
                }
3,522✔
1098
                else {
405,220✔
1099
                    // Overflow is impossible here, due to the preceeding check
405,220✔
1100
                    auto diff_millis = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
405,220✔
1101
                    // The conversion to milliseconds will round down if the
405,220✔
1102
                    // tick period of `diff` is less than a millisecond, which
405,220✔
1103
                    // it usually is. This is a problem, because it can lead to
405,220✔
1104
                    // premature wakeups, which in turn could cause extranous
405,220✔
1105
                    // iterations in the event loop. This is especially
405,220✔
1106
                    // problematic when a small `diff` is rounded down to zero
405,220✔
1107
                    // milliseconds, becuase that can easily produce a "busy
405,220✔
1108
                    // wait" condition for up to a millisecond every time this
405,220✔
1109
                    // happens. Obviously, the solution is to round up, instead
405,220✔
1110
                    // of down.
405,220✔
1111
                    if (diff_millis < diff) {
405,238✔
1112
                        // Note that the following increment cannot overflow,
405,238✔
1113
                        // because diff_millis < diff <= max_int_millis <=
405,238✔
1114
                        // std::numeric_limits<int>::max().
405,238✔
1115
                        ++diff_millis;
405,238✔
1116
                    }
405,238✔
1117
                    max_wait_millis = int(diff_millis.count());
405,220✔
1118
                }
405,220✔
1119
            }
408,742✔
1120

751,512✔
1121
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1122
            clock::time_point sleep_start_time = clock::now();
1123
#endif
1124

751,512✔
1125
#ifdef _WIN32
1126
            max_wait_millis = 1000;
1127

1128
            // Windows does not have a single API call to wait for pipes and
1129
            // sockets with a timeout. So we repeatedly poll them individually
1130
            // in a loop until max_wait_millis has elapsed or an event happend.
1131
            //
1132
            // FIXME: Maybe switch to Windows IOCP instead.
1133

1134
            // Following variable is the poll time for the sockets in
1135
            // miliseconds. Adjust it to find a balance between CPU usage and
1136
            // response time:
1137
            constexpr INT socket_poll_timeout = 10;
1138

1139
            for (size_t t = 0; t < m_pollfd_slots.size(); t++)
1140
                m_pollfd_slots[t].revents = 0;
1141

1142
            using namespace std::chrono;
1143
            auto started = steady_clock::now();
1144
            int ret = 0;
1145

1146
            for (;;) {
1147
                if (m_pollfd_slots.size() > 1) {
1148
                    // Poll all network sockets
1149
                    ret = WSAPoll(LPWSAPOLLFD(&m_pollfd_slots[1]), ULONG(m_pollfd_slots.size() - 1),
1150
                                  socket_poll_timeout);
1151
                    REALM_ASSERT(ret != SOCKET_ERROR);
1152
                }
1153

1154
                if (m_wakeup_pipe.is_signaled()) {
1155
                    m_pollfd_slots[0].revents = POLLIN;
1156
                    ret++;
1157
                }
1158

1159
                if (ret != 0 ||
1160
                    (duration_cast<milliseconds>(steady_clock::now() - started).count() >= max_wait_millis)) {
1161
                    break;
1162
                }
1163

1164
                // If we don't have any sockets to poll for (m_pollfd_slots is less than 2) and no one signals
1165
                // the wakeup pipe, we'd be stuck busy waiting for either condition to become true.
1166
                std::this_thread::sleep_for(std::chrono::milliseconds(socket_poll_timeout));
1167
            }
1168

1169
#else // !defined _WIN32
1170
            int ret = ::poll(fds, nfds, max_wait_millis);
751,512✔
1171
#endif
751,512✔
1172
            bool interrupted_2 = false;
751,512✔
1173
            if (REALM_UNLIKELY(ret == -1)) {
751,512✔
1174
#ifndef _WIN32
1175
                int err = errno;
1176
                if (REALM_UNLIKELY(err != EINTR)) {
1177
                    std::error_code ec = make_basic_system_error_code(err);
1178
                    throw std::system_error(ec);
1179
                }
1180
#endif
1181
                interrupted_2 = true;
1182
            }
1183

751,512✔
1184
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1185
            m_sleep_time += clock::now() - sleep_start_time;
1186
#endif
1187

751,512✔
1188
            if (REALM_LIKELY(!interrupted_2)) {
751,848✔
1189
                REALM_ASSERT(ret >= 0);
751,848✔
1190
                num_ready_descriptors = ret;
751,848✔
1191
                break;
751,848✔
1192
            }
751,848✔
1193

2,147,483,647✔
1194
            // Retry on interruption by system signal
2,147,483,647✔
1195
            if (timeout.time_since_epoch().count() > 0)
2,147,483,647✔
1196
                now_2 = clock::now();
1197
        }
2,147,483,647✔
1198
    }
751,660✔
1199

751,660✔
1200
    if (num_ready_descriptors == 0)
751,660✔
1201
        return false; // No operations completed
4,006✔
1202

747,654✔
1203
    // Check wake-up descriptor
747,654✔
1204
    if (m_pollfd_slots[0].revents != 0) {
747,654✔
1205
        REALM_ASSERT((m_pollfd_slots[0].revents & POLLNVAL) == 0);
113,446✔
1206
        m_wakeup_pipe.acknowledge_signal();
113,446✔
1207
        interrupted = true;
113,446✔
1208
        return false;
113,446✔
1209
    }
113,446✔
1210

634,208✔
1211
    std::size_t orig_num_operations = m_num_operations;
634,208✔
1212
    std::size_t num_pollfd_slots = m_pollfd_slots.size();
634,208✔
1213
    std::size_t pollfd_slot_ndx = 1;
634,208✔
1214
    while (pollfd_slot_ndx < num_pollfd_slots && num_ready_descriptors > 0) {
1,401,052✔
1215
        pollfd& pollfd_slot = m_pollfd_slots[pollfd_slot_ndx];
766,844✔
1216
        REALM_ASSERT(pollfd_slot.fd >= 0);
766,844✔
1217
        if (REALM_LIKELY(pollfd_slot.revents == 0)) {
766,844✔
1218
            ++pollfd_slot_ndx;
76,270✔
1219
            continue;
76,270✔
1220
        }
76,270✔
1221
        --num_ready_descriptors;
690,574✔
1222

690,574✔
1223
        REALM_ASSERT((pollfd_slot.revents & POLLNVAL) == 0);
690,574✔
1224

690,574✔
1225
        // Treat errors like read and/or write-readiness
690,574✔
1226
        if ((pollfd_slot.revents & (POLLHUP | POLLERR)) != 0) {
690,574✔
1227
            REALM_ASSERT((pollfd_slot.events & (POLLRDNORM | POLLWRNORM)) != 0);
500✔
1228
            if ((pollfd_slot.events & POLLRDNORM) != 0)
500✔
1229
                pollfd_slot.revents |= POLLRDNORM;
500✔
1230
            if ((pollfd_slot.events & POLLWRNORM) != 0)
500✔
1231
                pollfd_slot.revents |= POLLWRNORM;
62✔
1232
        }
500✔
1233

690,574✔
1234
        OperSlot& oper_slot = m_operations[pollfd_slot.fd];
690,574✔
1235
        REALM_ASSERT(oper_slot.pollfd_slot_ndx == pollfd_slot_ndx);
690,574✔
1236

690,574✔
1237
        OperQueue<IoOper> new_read_ops, new_write_ops;
690,574✔
1238
        auto advance_ops = [&](OperQueue<IoOper>& ops) noexcept {
816,090✔
1239
            while (LendersIoOperPtr op = ops.pop_front()) {
1,632,866✔
1240
                Want want = op->advance();
816,170✔
1241
                switch (want) {
816,170✔
1242
                    case Want::nothing:
670,890✔
1243
                        REALM_ASSERT(op->is_complete());
670,890✔
1244
                        completed_ops.push_back(std::move(op));
670,890✔
1245
                        --m_num_operations;
670,890✔
1246
                        continue;
670,890✔
1247
                    case Want::read:
145,868✔
1248
                        new_read_ops.push_back(std::move(op));
145,868✔
1249
                        continue;
145,868✔
1250
                    case Want::write:
18✔
1251
                        new_write_ops.push_back(std::move(op));
18✔
1252
                        continue;
18✔
1253
                }
1254
                REALM_ASSERT(false);
1255
            }
1256
        };
816,090✔
1257

690,574✔
1258
        // Check read-readiness
690,574✔
1259
        if ((pollfd_slot.revents & POLLRDNORM) != 0) {
690,574✔
1260
            REALM_ASSERT(!oper_slot.read_ops.empty());
535,238✔
1261
            advance_ops(oper_slot.read_ops);
535,238✔
1262
            pollfd_slot.events &= ~POLLRDNORM;
535,238✔
1263
        }
535,238✔
1264

690,574✔
1265
        // Check write-readiness
690,574✔
1266
        if ((pollfd_slot.revents & POLLWRNORM) != 0) {
690,574✔
1267
            REALM_ASSERT(!oper_slot.write_ops.empty());
281,386✔
1268
            advance_ops(oper_slot.write_ops);
281,386✔
1269
            pollfd_slot.events &= ~POLLWRNORM;
281,386✔
1270
        }
281,386✔
1271

690,574✔
1272
        if (!new_read_ops.empty()) {
690,574✔
1273
            oper_slot.read_ops.push_back(new_read_ops);
145,866✔
1274
            pollfd_slot.events |= POLLRDNORM;
145,866✔
1275
        }
145,866✔
1276

690,574✔
1277
        if (!new_write_ops.empty()) {
690,574✔
1278
            oper_slot.write_ops.push_back(new_write_ops);
18✔
1279
            pollfd_slot.events |= POLLWRNORM;
18✔
1280
        }
18✔
1281

690,574✔
1282
        if (pollfd_slot.events == 0) {
690,574✔
1283
            discard_pollfd_slot_by_move_last_over(oper_slot);
475,518✔
1284
            --num_pollfd_slots;
475,518✔
1285
        }
475,518✔
1286
        else {
215,056✔
1287
            ++pollfd_slot_ndx;
215,056✔
1288
        }
215,056✔
1289
    }
690,574✔
1290

634,208✔
1291
    REALM_ASSERT(num_ready_descriptors == 0);
634,208✔
1292

634,208✔
1293
    bool any_operations_completed = (m_num_operations < orig_num_operations);
634,208✔
1294
    return any_operations_completed;
634,208✔
1295
}
634,208✔
1296

1297

1298
void Service::IoReactor::discard_pollfd_slot_by_move_last_over(OperSlot& oper_slot) noexcept
1299
{
734,800✔
1300
    std::size_t pollfd_slot_ndx = oper_slot.pollfd_slot_ndx;
734,800✔
1301
    oper_slot.pollfd_slot_ndx = 0; // Mark unused
734,800✔
1302
    if (pollfd_slot_ndx < m_pollfd_slots.size() - 1) {
734,800✔
1303
        pollfd& last_pollfd_slot = m_pollfd_slots.back();
58,818✔
1304
        m_operations[last_pollfd_slot.fd].pollfd_slot_ndx = pollfd_slot_ndx;
58,818✔
1305
        m_pollfd_slots[pollfd_slot_ndx] = last_pollfd_slot;
58,818✔
1306
    }
58,818✔
1307
    m_pollfd_slots.pop_back();
734,800✔
1308
}
734,800✔
1309

1310
#endif // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
1311

1312

1313
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1314

1315
auto Service::IoReactor::get_and_reset_sleep_time() noexcept -> clock::duration
1316
{
1317
    clock::duration sleep_time = m_sleep_time;
1318
    m_sleep_time = clock::duration::zero();
1319
    return sleep_time;
1320
}
1321

1322
#endif // REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1323

1324

1325
class Service::Impl {
1326
public:
1327
    Service& service;
1328
    IoReactor io_reactor;
1329

1330
    Impl(Service& s)
1331
        : service{s}
1332
        , io_reactor{} // Throws
1333
    {
17,290✔
1334
    }
17,290✔
1335

1336
    ~Impl()
1337
    {
17,294✔
1338
        bool resolver_thread_started = m_resolver_thread.joinable();
17,294✔
1339
        if (resolver_thread_started) {
17,294✔
1340
            {
1,768✔
1341
                std::lock_guard lock{m_mutex};
1,768✔
1342
                m_stop_resolver_thread = true;
1,768✔
1343
                m_resolver_cond.notify_all();
1,768✔
1344
            }
1,768✔
1345
            m_resolver_thread.join();
1,768✔
1346
        }
1,768✔
1347

8,526✔
1348
        // Avoid calls to recycle_post_oper() after destruction has begun.
8,526✔
1349
        m_completed_operations.clear();
17,294✔
1350
    }
17,294✔
1351

1352
    void report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler> handler)
1353
    {
×
1354
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1355
        m_event_loop_metrics_timer.emplace(service);
1356
        m_event_loop_metrics_timer->async_wait(
1357
            std::chrono::seconds{30}, [this, handler = std::move(handler)](Status status) {
1358
                REALM_ASSERT(status.is_ok());
1359
                clock::time_point now = clock::now();
1360
                clock::duration elapsed_time = now - m_event_loop_metrics_start_time;
1361
                clock::duration sleep_time = io_reactor.get_and_reset_sleep_time();
1362
                clock::duration nonsleep_time = elapsed_time - sleep_time;
1363
                double saturation = double(nonsleep_time.count()) / double(elapsed_time.count());
1364
                clock::duration internal_exec_time = nonsleep_time - m_handler_exec_time;
1365
                internal_exec_time += now - m_handler_exec_start_time;
1366
                double inefficiency = double(internal_exec_time.count()) / double(elapsed_time.count());
1367
                m_event_loop_metrics_start_time = now;
1368
                m_handler_exec_start_time = now;
1369
                m_handler_exec_time = clock::duration::zero();
1370
                handler(saturation, inefficiency);             // Throws
1371
                report_event_loop_metrics(std::move(handler)); // Throws
1372
            });                                                // Throws
1373
#else
1374
        static_cast<void>(handler);
×
1375
#endif
×
1376
    }
×
1377

1378
    void run()
1379
    {
8,492✔
1380
        run_impl(true);
8,492✔
1381
    }
8,492✔
1382

1383
    void run_until_stopped()
1384
    {
8,982✔
1385
        run_impl(false);
8,982✔
1386
    }
8,982✔
1387

1388
    void stop() noexcept
1389
    {
16,966✔
1390
        {
16,966✔
1391
            std::lock_guard lock{m_mutex};
16,966✔
1392
            if (m_stopped)
16,966✔
1393
                return;
×
1394
            m_stopped = true;
16,966✔
1395
        }
16,966✔
1396
        io_reactor.interrupt();
16,966✔
1397
    }
16,966✔
1398

1399
    void reset() noexcept
1400
    {
16✔
1401
        std::lock_guard lock{m_mutex};
16✔
1402
        m_stopped = false;
16✔
1403
    }
16✔
1404

1405
    static Endpoint::List resolve(const Resolver::Query&, std::error_code&);
1406

1407
    void add_resolve_oper(LendersResolveOperPtr op)
1408
    {
3,328✔
1409
        {
3,328✔
1410
            std::lock_guard lock{m_mutex};
3,328✔
1411
            m_resolve_operations.push_back(std::move(op)); // Throws
3,328✔
1412
            m_resolver_cond.notify_all();
3,328✔
1413
        }
3,328✔
1414
        bool resolver_thread_started = m_resolver_thread.joinable();
3,328✔
1415
        if (resolver_thread_started)
3,328✔
1416
            return;
1,564✔
1417
        auto func = [this]() noexcept {
1,768✔
1418
            resolver_thread();
1,768✔
1419
        };
1,768✔
1420
        m_resolver_thread = std::thread{std::move(func)};
1,764✔
1421
    }
1,764✔
1422

1423
    void add_wait_oper(LendersWaitOperPtr op)
1424
    {
568,054✔
1425
        m_wait_operations.push(std::move(op)); // Throws
568,054✔
1426
    }
568,054✔
1427

1428
    void post(PostOperConstr constr, std::size_t size, void* cookie)
1429
    {
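        // Note: a single chunk of memory, m_post_oper, is cached for reuse by
        // post operations. If the cached chunk is at least `size` bytes it is
        // reused here; otherwise a new chunk is allocated. recycle_post_oper()
        // below keeps the larger of the two chunks in the cache.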
1,164,128✔
1430
        {
1,164,128✔
1431
            std::lock_guard lock{m_mutex};
1,164,128✔
1432
            std::unique_ptr<char[]> mem;
1,164,128✔
1433
            if (m_post_oper && m_post_oper->m_size >= size) {
1,164,128✔
1434
                // Reuse old memory
210,168✔
1435
                AsyncOper* op = m_post_oper.release();
367,982✔
1436
                REALM_ASSERT(dynamic_cast<UnusedOper*>(op));
367,982✔
1437
                static_cast<UnusedOper*>(op)->UnusedOper::~UnusedOper(); // Static dispatch
367,982✔
1438
                mem.reset(static_cast<char*>(static_cast<void*>(op)));
367,982✔
1439
            }
367,982✔
1440
            else {
796,146✔
1441
                // Allocate new memory
382,350✔
1442
                mem.reset(new char[size]); // Throws
796,146✔
1443
            }
796,146✔
1444

592,518✔
1445
            std::unique_ptr<PostOperBase, LendersOperDeleter> op;
1,164,128✔
1446
            op.reset((*constr)(mem.get(), size, *this, cookie)); // Throws
1,164,128✔
1447
            mem.release();
1,164,128✔
1448
            m_completed_operations_2.push_back(std::move(op));
1,164,128✔
1449
        }
1,164,128✔
1450
        io_reactor.interrupt();
1,164,128✔
1451
    }
1,164,128✔
1452

1453
    void recycle_post_oper(PostOperBase* op) noexcept
1454
    {
1,164,464✔
1455
        std::size_t size = op->m_size;
1,164,464✔
1456
        op->~PostOperBase();                           // Dynamic dispatch
1,164,464✔
1457
        OwnersOperPtr op_2(new (op) UnusedOper(size)); // Does not throw
1,164,464✔
1458

592,710✔
1459
        // Keep the larger memory chunk (`op_2` or m_post_oper)
592,710✔
1460
        {
1,164,464✔
1461
            std::lock_guard lock{m_mutex};
1,164,464✔
1462
            if (!m_post_oper || m_post_oper->m_size < size)
1,164,464✔
1463
                swap(op_2, m_post_oper);
382,986✔
1464
        }
1,164,464✔
1465
    }
1,164,464✔
1466

1467
    void trigger_exec(TriggerExecOperBase& op) noexcept
1468
    {
×
1469
        {
×
1470
            std::lock_guard lock{m_mutex};
×
1471
            if (op.m_in_use)
×
1472
                return;
×
1473
            op.m_in_use = true;
×
1474
            bind_ptr<TriggerExecOperBase> op_2{&op}; // Increment use count
×
1475
            LendersOperPtr op_3{op_2.release()};
×
1476
            m_completed_operations_2.push_back(std::move(op_3));
×
1477
        }
×
1478
        io_reactor.interrupt();
×
1479
    }
×
1480

1481
    void reset_trigger_exec(TriggerExecOperBase& op) noexcept
1482
    {
×
1483
        std::lock_guard lock{m_mutex};
×
1484
        op.m_in_use = false;
×
1485
    }
×
1486

1487
    void add_completed_oper(LendersOperPtr op) noexcept
1488
    {
1,189,314✔
1489
        m_completed_operations.push_back(std::move(op));
1,189,314✔
1490
    }
1,189,314✔
1491

1492
    void remove_canceled_ops(Descriptor& desc) noexcept
1493
    {
515,156✔
1494
        io_reactor.remove_canceled_ops(desc, m_completed_operations);
515,156✔
1495
    }
515,156✔
1496

1497
    void cancel_resolve_oper(ResolveOperBase& op) noexcept
1498
    {
10✔
1499
        std::lock_guard lock{m_mutex};
10✔
1500
        op.cancel();
10✔
1501
    }
10✔
1502

1503
    void cancel_incomplete_wait_oper(WaitOperBase& op) noexcept
1504
    {
20,352✔
1505
        auto p = std::equal_range(m_wait_operations.begin(), m_wait_operations.end(), op.m_expiration_time,
20,352✔
1506
                                  WaitOperCompare{});
20,352✔
1507
        auto pred = [&op](const LendersWaitOperPtr& op_2) {
20,354✔
1508
            return &*op_2 == &op;
20,352✔
1509
        };
20,352✔
1510
        auto i = std::find_if(p.first, p.second, pred);
20,352✔
1511
        REALM_ASSERT(i != p.second);
20,352✔
1512
        m_completed_operations.push_back(m_wait_operations.erase(i));
20,352✔
1513
    }
20,352✔
1514

1515
private:
1516
    OperQueue<AsyncOper> m_completed_operations; // Completed, canceled, and post operations
1517

1518
    struct WaitOperCompare {
1519
        bool operator()(const LendersWaitOperPtr& a, clock::time_point b)
1520
        {
27,158✔
1521
            return a->m_expiration_time > b;
27,158✔
1522
        }
27,158✔
1523
        bool operator()(clock::time_point a, const LendersWaitOperPtr& b)
1524
        {
23,942✔
1525
            return a > b->m_expiration_time;
23,942✔
1526
        }
23,942✔
1527
        bool operator()(const LendersWaitOperPtr& a, const LendersWaitOperPtr& b)
1528
        {
148,336✔
1529
            return a->m_expiration_time > b->m_expiration_time;
148,336✔
1530
        }
148,336✔
1531
    };
1532

1533
    using WaitQueue = util::PriorityQueue<LendersWaitOperPtr, std::vector<LendersWaitOperPtr>, WaitOperCompare>;
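    // Note: WaitOperCompare inverts the natural ordering of expiration times,
    // so the wait operation with the earliest expiration time sits at the top
    // of the priority queue. process_timers() and wait_and_process_io() rely
    // on top() being the next timer to expire.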
1534
    WaitQueue m_wait_operations;
1535

1536
    std::mutex m_mutex;
1537
    OwnersOperPtr m_post_oper;                       // Protected by `m_mutex`
1538
    OperQueue<ResolveOperBase> m_resolve_operations; // Protected by `m_mutex`
1539
    OperQueue<AsyncOper> m_completed_operations_2;   // Protected by `m_mutex`
1540
    bool m_stopped = false;                          // Protected by `m_mutex`
1541
    bool m_stop_resolver_thread = false;             // Protected by `m_mutex`
1542
    bool m_resolve_in_progress = false;              // Protected by `m_mutex`
1543
    std::condition_variable m_resolver_cond;         // Protected by `m_mutex`
1544

1545
    std::thread m_resolver_thread;
1546

1547
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1548
    util::Optional<DeadlineTimer> m_event_loop_metrics_timer;
1549
    clock::time_point m_event_loop_metrics_start_time = clock::now();
1550
    clock::time_point m_handler_exec_start_time;
1551
    clock::duration m_handler_exec_time = clock::duration::zero();
1552
#endif
1553
    void run_impl(bool return_when_idle)
1554
    {
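        // Overview: run_impl() is a small goto-based state machine with three
        // labels. on_handlers_executed_or_interrupted collects newly posted
        // operations and checks for stop(); on_operations_completed executes
        // every ready completion handler; on_time_progressed fires expired
        // timers and then blocks in the reactor waiting for I/O. When
        // `return_when_idle` is true, the loop returns as soon as there is no
        // outstanding work.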
17,470✔
1555
        bool no_incomplete_resolve_operations;
17,470✔
1556

8,608✔
1557
    on_handlers_executed_or_interrupted : {
2,302,862✔
1558
        std::lock_guard lock{m_mutex};
2,302,862✔
1559
        if (m_stopped)
2,302,862✔
1560
            return;
16,922✔
1561
        // Note: Order of post operations must be preserved.
1,013,940✔
1562
        m_completed_operations.push_back(m_completed_operations_2);
2,285,940✔
1563
        no_incomplete_resolve_operations = (!m_resolve_in_progress && m_resolve_operations.empty());
2,287,328✔
1564

1,013,940✔
1565
        if (m_completed_operations.empty())
2,285,940✔
1566
            goto on_time_progressed;
2,003,312✔
1567
    }
2,043,240✔
1568

901,552✔
1569
    on_operations_completed : {
2,043,240✔
1570
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1571
        m_handler_exec_start_time = clock::now();
1572
#endif
1573
        while (LendersOperPtr op = m_completed_operations.pop_front())
7,156,370✔
1574
            execute(op); // Throws
5,113,130✔
1575
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1576
        m_handler_exec_time += clock::now() - m_handler_exec_start_time;
1577
#endif
1578
        goto on_handlers_executed_or_interrupted;
2,043,240✔
1579
    }
2,151,372✔
1580

1,017,346✔
1581
    on_time_progressed : {
2,151,372✔
1582
        clock::time_point now = clock::now();
2,151,372✔
1583
        if (process_timers(now))
2,151,372✔
1584
            goto on_operations_completed;
527,656✔
1585

752,314✔
1586
        bool no_incomplete_operations =
1,623,716✔
1587
            (io_reactor.empty() && m_wait_operations.empty() && no_incomplete_resolve_operations);
1,623,716✔
1588
        if (no_incomplete_operations && return_when_idle) {
1,623,716✔
1589
            // We can only get to this point when there are no completion
            // handlers ready to execute. It happens either because of a
            // fall-through from on_operations_completed, or because of a
            // jump to on_time_progressed, but that only happens if no
            // completion handlers became ready during
            // wait_and_process_io().
            //
            // We can also only get to this point when there are no
            // asynchronous operations in progress (due to the preceding
            // if-condition).
            //
            // It is possible that another thread has added new post
            // operations since we checked, but there is really no point in
            // rechecking that, as it is always possible, even after a
            // recheck, that new post handlers get added after we decide to
            // return, but before we actually do return. Also, it would
            // offer no additional guarantees to the application.
            return; // Out of work
        }
520✔
1608

752,056✔
1609
        // Blocking wait for I/O
752,056✔
1610
        bool interrupted = false;
1,623,196✔
1611
        if (wait_and_process_io(now, interrupted)) // Throws
1,623,196✔
1612
            goto on_operations_completed;
1,231,386✔
1613
        if (interrupted)
391,810✔
1614
            goto on_handlers_executed_or_interrupted;
243,578✔
1615
        goto on_time_progressed;
148,232✔
1616
    }
148,232✔
1617
    }
148,232✔
1618
    bool process_timers(clock::time_point now)
1619
    {
2,151,426✔
1620
        bool any_operations_completed = false;
2,151,426✔
1621
        for (;;) {
2,696,908✔
1622
            if (m_wait_operations.empty())
2,696,908✔
1623
                break;
1,010,960✔
1624
            auto& op = m_wait_operations.top();
1,685,948✔
1625
            if (now < op->m_expiration_time)
1,685,948✔
1626
                break;
1,140,854✔
1627
            op->complete();
545,094✔
1628
            m_completed_operations.push_back(m_wait_operations.pop_top());
545,094✔
1629
            any_operations_completed = true;
545,094✔
1630
        }
545,094✔
1631
        return any_operations_completed;
2,151,426✔
1632
    }
2,151,426✔
1633

1634
    bool wait_and_process_io(clock::time_point now, bool& interrupted)
1635
    {
1,623,424✔
1636
        clock::time_point timeout;
1,623,424✔
1637
        if (!m_wait_operations.empty())
1,623,424✔
1638
            timeout = m_wait_operations.top()->m_expiration_time;
1,026,520✔
1639
        bool operations_completed = io_reactor.wait_and_advance(timeout, now, interrupted,
1,623,424✔
1640
                                                                m_completed_operations); // Throws
1,623,424✔
1641
        return operations_completed;
1,623,424✔
1642
    }
1,623,424✔
1643

1644
    static void execute(LendersOperPtr& lenders_ptr)
1645
    {
5,111,312✔
1646
        lenders_ptr.release()->recycle_and_execute(); // Throws
5,111,312✔
1647
    }
5,111,312✔
1648

1649
    void resolver_thread() noexcept
1650
    {
1,768✔
1651
        LendersResolveOperPtr op;
1,768✔
1652
        for (;;) {
5,100✔
1653
            {
5,100✔
1654
                std::unique_lock lock{m_mutex};
5,100✔
1655
                if (op) {
5,100✔
1656
                    m_completed_operations_2.push_back(std::move(op));
3,332✔
1657
                    io_reactor.interrupt();
3,332✔
1658
                }
3,332✔
1659
                m_resolve_in_progress = false;
5,100✔
1660
                while (m_resolve_operations.empty() && !m_stop_resolver_thread)
8,428✔
1661
                    m_resolver_cond.wait(lock);
3,328✔
1662
                if (m_stop_resolver_thread)
5,100✔
1663
                    return;
1,768✔
1664
                op = m_resolve_operations.pop_front();
3,332✔
1665
                m_resolve_in_progress = true;
3,332✔
1666
                if (op->is_canceled())
3,332✔
1667
                    continue;
4✔
1668
            }
3,328✔
1669
            try {
3,328✔
1670
                op->m_endpoints = resolve(op->m_query, op->m_error_code); // Throws only std::bad_alloc
3,328✔
1671
            }
3,328✔
1672
            catch (std::bad_alloc&) {
1,694✔
1673
                op->m_error_code = make_basic_system_error_code(ENOMEM);
×
1674
            }
×
1675
            op->complete();
3,328✔
1676
        }
3,328✔
1677
    }
1,768✔
1678
};
1679

1680

1681
// This function promises to only ever throw std::bad_alloc.
1682
Endpoint::List Service::Impl::resolve(const Resolver::Query& query, std::error_code& ec)
1683
{
11,304✔
1684
    Endpoint::List list;
11,304✔
1685

5,630✔
1686
    using addrinfo_type = struct addrinfo;
11,304✔
1687
    addrinfo_type hints = addrinfo_type(); // Clear
11,304✔
1688
    hints.ai_flags = query.m_flags;
11,304✔
1689
    hints.ai_family = query.m_protocol.m_family;
11,304✔
1690
    hints.ai_socktype = query.m_protocol.m_socktype;
11,304✔
1691
    hints.ai_protocol = query.m_protocol.m_protocol;
11,304✔
1692

5,630✔
1693
    const char* query_host = query.m_host.empty() ? 0 : query.m_host.c_str();
11,304✔
1694
    const char* query_service = query.m_service.empty() ? 0 : query.m_service.c_str();
9,670✔
1695
    struct addrinfo* first = nullptr;
11,304✔
1696
    int ret = ::getaddrinfo(query_host, query_service, &hints, &first);
11,304✔
1697
    if (REALM_UNLIKELY(ret != 0)) {
11,304✔
1698
#ifdef EAI_SYSTEM
6✔
1699
        if (ret == EAI_SYSTEM) {
6✔
1700
            if (errno != 0) {
×
1701
                ec = make_basic_system_error_code(errno);
×
1702
            }
×
1703
            else {
×
1704
                ec = error::unknown;
×
1705
            }
×
1706
            return list;
×
1707
        }
×
1708
#endif
6✔
1709
        ec = translate_addrinfo_error(ret);
6✔
1710
        return list;
6✔
1711
    }
6✔
1712

5,628✔
1713
    GetaddrinfoResultOwner gro(first);
11,298✔
1714

5,628✔
1715
    // Count number of IPv4/IPv6 endpoints
5,628✔
1716
    std::size_t num_endpoints = 0;
11,298✔
1717
    {
11,298✔
1718
        struct addrinfo* curr = first;
11,298✔
1719
        while (curr) {
24,346✔
1720
            bool ip_v4 = curr->ai_family == AF_INET;
13,048✔
1721
            bool ip_v6 = curr->ai_family == AF_INET6;
13,048✔
1722
            if (ip_v4 || ip_v6)
13,048✔
1723
                ++num_endpoints;
13,048✔
1724
            curr = curr->ai_next;
13,048✔
1725
        }
13,048✔
1726
    }
11,298✔
1727
    REALM_ASSERT(num_endpoints >= 1);
11,298✔
1728

5,628✔
1729
    // Copy the IPv4/IPv6 endpoints
5,628✔
1730
    list.m_endpoints.set_size(num_endpoints); // Throws
11,298✔
1731
    struct addrinfo* curr = first;
11,298✔
1732
    std::size_t endpoint_ndx = 0;
11,298✔
1733
    while (curr) {
24,346✔
1734
        bool ip_v4 = curr->ai_family == AF_INET;
13,048✔
1735
        bool ip_v6 = curr->ai_family == AF_INET6;
13,048✔
1736
        if (ip_v4 || ip_v6) {
13,048✔
1737
            REALM_ASSERT((ip_v4 && curr->ai_addrlen == sizeof(Endpoint::sockaddr_ip_v4_type)) ||
13,048✔
1738
                         (ip_v6 && curr->ai_addrlen == sizeof(Endpoint::sockaddr_ip_v6_type)));
13,048✔
1739
            Endpoint& ep = list.m_endpoints[endpoint_ndx];
13,048✔
1740
            ep.m_protocol.m_family = curr->ai_family;
13,048✔
1741
            ep.m_protocol.m_socktype = curr->ai_socktype;
13,048✔
1742
            ep.m_protocol.m_protocol = curr->ai_protocol;
13,048✔
1743
            if (ip_v4) {
13,048✔
1744
                ep.m_sockaddr_union.m_ip_v4 = reinterpret_cast<Endpoint::sockaddr_ip_v4_type&>(*curr->ai_addr);
11,302✔
1745
            }
11,302✔
1746
            else {
1,746✔
1747
                ep.m_sockaddr_union.m_ip_v6 = reinterpret_cast<Endpoint::sockaddr_ip_v6_type&>(*curr->ai_addr);
1,746✔
1748
            }
1,746✔
1749
            ++endpoint_ndx;
13,048✔
1750
        }
13,048✔
1751
        curr = curr->ai_next;
13,048✔
1752
    }
13,048✔
1753

5,628✔
1754
    ec = std::error_code(); // Success
11,298✔
1755
    return list;
11,298✔
1756
}
11,298✔
1757

1758

1759
Service::Service()
1760
    : m_impl{std::make_unique<Impl>(*this)} // Throws
1761
{
17,290✔
1762
}
17,290✔
1763

1764

1765
Service::~Service() noexcept {}
17,294✔
1766

1767

1768
void Service::run()
1769
{
8,492✔
1770
    m_impl->run(); // Throws
8,492✔
1771
}
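
// A minimal usage sketch (illustrative only, not part of this translation
// unit), using only classes defined in this file:
//
//     realm::sync::network::Service service;
//     realm::sync::network::DeadlineTimer timer{service};
//     timer.async_wait(std::chrono::seconds{1}, [](realm::Status status) {
//         // status.is_ok() when the timer fired; an error (e.g. cancellation) otherwise
//     });
//     service.run(); // returns once no more work is pending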
8,492✔
1772

1773

1774
void Service::run_until_stopped()
1775
{
8,982✔
1776
    m_impl->run_until_stopped();
8,982✔
1777
}
8,982✔
1778

1779

1780
void Service::stop() noexcept
1781
{
16,966✔
1782
    m_impl->stop();
16,966✔
1783
}
16,966✔
1784

1785

1786
void Service::reset() noexcept
1787
{
16✔
1788
    m_impl->reset();
16✔
1789
}
16✔
1790

1791

1792
void Service::report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler> handler)
1793
{
×
1794
    m_impl->report_event_loop_metrics(std::move(handler)); // Throws
×
1795
}
×
1796

1797

1798
void Service::do_post(PostOperConstr constr, std::size_t size, void* cookie)
1799
{
1,164,408✔
1800
    m_impl->post(constr, size, cookie); // Throws
1,164,408✔
1801
}
1,164,408✔
1802

1803

1804
void Service::recycle_post_oper(Impl& impl, PostOperBase* op) noexcept
1805
{
1,164,448✔
1806
    impl.recycle_post_oper(op);
1,164,448✔
1807
}
1,164,448✔
1808

1809

1810
void Service::trigger_exec(Impl& impl, TriggerExecOperBase& op) noexcept
1811
{
×
1812
    impl.trigger_exec(op);
×
1813
}
×
1814

1815

1816
void Service::reset_trigger_exec(Impl& impl, TriggerExecOperBase& op) noexcept
1817
{
×
1818
    impl.reset_trigger_exec(op);
×
1819
}
×
1820

1821

1822
void Service::Descriptor::accept(Descriptor& desc, StreamProtocol protocol, Endpoint* ep,
1823
                                 std::error_code& ec) noexcept
1824
{
3,136✔
1825
    REALM_ASSERT(is_open());
3,136✔
1826

1,150✔
1827
    union union_type {
3,136✔
1828
        Endpoint::sockaddr_union_type m_sockaddr_union;
3,136✔
1829
        char m_extra_byte[sizeof(Endpoint::sockaddr_union_type) + 1];
3,136✔
1830
    };
3,136✔
1831
    union_type buffer;
3,136✔
1832
    struct sockaddr* addr = &buffer.m_sockaddr_union.m_base;
3,136✔
1833
    socklen_t addr_len = sizeof buffer;
3,136✔
1834
    CloseGuard new_sock_fd;
3,136✔
1835
    for (;;) {
3,136✔
1836
#if HAVE_LINUX_ACCEPT4
1,150✔
1837
        // On Linux (HAVE_LINUX_ACCEPT4), make the accepted socket inherit the
        // O_NONBLOCK status flag from the accepting socket to avoid an extra
        // call to fcntl(). Note, it is deemed most likely that the accepted
        // socket is going to be used in nonblocking mode when, and only when,
        // the accepting socket is used in nonblocking mode. Other platforms
        // are handled below.
        int flags = SOCK_CLOEXEC;
1,150✔
1844
        if (!in_blocking_mode())
1,150✔
1845
            flags |= SOCK_NONBLOCK;
1,128✔
1846
        native_handle_type ret = ::accept4(m_fd, addr, &addr_len, flags);
1,150✔
1847
#else
1848
        native_handle_type ret = ::accept(m_fd, addr, &addr_len);
1,986✔
1849
#endif
1,986✔
1850
#ifdef _WIN32
1851
        if (ret == INVALID_SOCKET) {
1852
            int err = WSAGetLastError();
1853
            if (err == WSAEINTR)
1854
                continue; // Retry on interruption by system signal
1855
            set_read_ready(err != WSAEWOULDBLOCK);
1856
            ec = make_winsock_error_code(err); // Failure
1857
            return;
1858
        }
1859
#else
1860
        if (REALM_UNLIKELY(ret == -1)) {
3,136✔
1861
            int err = errno;
924✔
1862
            if (err == EINTR)
924✔
1863
                continue; // Retry on interruption by system signal
×
1864
            if (err == EWOULDBLOCK)
924✔
1865
                err = EAGAIN;
924✔
1866
            set_read_ready(err != EAGAIN);
924✔
1867
            ec = make_basic_system_error_code(err); // Failure
924✔
1868
            return;
924✔
1869
        }
924✔
1870
#endif
2,212✔
1871
        new_sock_fd.reset(ret);
2,212✔
1872

1,150✔
1873
#if REALM_PLATFORM_APPLE
1,062✔
1874
        int optval = 1;
1,062✔
1875
        ret = ::setsockopt(new_sock_fd, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof optval);
1,062✔
1876
        if (REALM_UNLIKELY(ret == -1)) {
1,062✔
1877
            // setsockopt() reports EINVAL if the other side disconnected while
1878
            // the connection was waiting in the listen queue.
1879
            int err = errno;
1880
            if (err == EINVAL) {
×
1881
                continue;
1882
            }
1883
            ec = make_basic_system_error_code(err);
1884
            return;
1885
        }
1886
#endif
1,062✔
1887

1,150✔
1888
        set_read_ready(true);
2,212✔
1889
        break;
2,212✔
1890
    }
2,212✔
1891
    socklen_t expected_addr_len =
2,212✔
1892
        protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
1,876✔
1893
    if (REALM_UNLIKELY(addr_len != expected_addr_len))
2,212✔
1894
        REALM_TERMINATE("Unexpected peer address length");
1,150✔
1895

1,150✔
1896
#if !HAVE_LINUX_ACCEPT4
1,062✔
1897
    {
1,062✔
1898
        bool value = true;
1,062✔
1899
        if (REALM_UNLIKELY(set_cloexec_flag(new_sock_fd, value, ec)))
1,062✔
1900
            return;
1901
    }
1,062✔
1902
#endif
1,062✔
1903

1,150✔
1904
    // On some platforms (such as Mac OS X), the accepted socket automatically
    // inherits file status flags from the accepting socket, but on other
    // systems, this is not the case. In the case of Linux (HAVE_LINUX_ACCEPT4),
    // the inheriting behaviour is obtained by using the Linux specific
    // accept4() system call.
    //
    // For other platforms, we need to be sure that m_in_blocking_mode for the
    // new socket is initialized to reflect the actual state of O_NONBLOCK on
    // the new socket.
    //
    // Note: This implementation currently never modifies status flags other
    // than O_NONBLOCK, so we only need to consider that flag.
1,150✔
1916

1,150✔
1917
#if !REALM_PLATFORM_APPLE && !HAVE_LINUX_ACCEPT4
1918
    // Make the accepted socket inherit the state of O_NONBLOCK from the
1919
    // accepting socket.
1920
    {
1921
        bool value = !m_in_blocking_mode;
1922
        if (::set_nonblock_flag(new_sock_fd, value, ec))
1923
            return;
1924
    }
1925
#endif
1926

1,150✔
1927
    desc.assign(new_sock_fd.release(), m_in_blocking_mode);
2,212✔
1928
    desc.set_write_ready(true);
2,212✔
1929
    if (ep) {
2,212✔
1930
        ep->m_protocol = protocol;
1,970✔
1931
        ep->m_sockaddr_union = buffer.m_sockaddr_union;
1,970✔
1932
    }
1,970✔
1933
    ec = std::error_code(); // Success
2,212✔
1934
}
2,212✔
1935

1936

1937
std::size_t Service::Descriptor::read_some(char* buffer, std::size_t size, std::error_code& ec) noexcept
1938
{
2,133,446✔
1939
    if (REALM_UNLIKELY(assume_read_would_block())) {
2,133,446✔
1940
        ec = error::resource_unavailable_try_again; // Failure
322✔
1941
        return 0;
322✔
1942
    }
322✔
1943
    for (;;) {
2,133,124✔
1944
        int flags = 0;
2,132,104✔
1945
#ifdef _WIN32
1946
        ssize_t ret = ::recv(m_fd, buffer, int(size), flags);
1947
        if (ret == SOCKET_ERROR) {
1948
            int err = WSAGetLastError();
1949
            // Retry on interruption by system signal
1950
            if (err == WSAEINTR)
1951
                continue;
1952
            set_read_ready(err != WSAEWOULDBLOCK);
1953
            ec = make_winsock_error_code(err); // Failure
1954
            return 0;
1955
        }
1956
#else
1957
        ssize_t ret = ::recv(m_fd, buffer, size, flags);
2,132,104✔
1958
        if (ret == -1) {
2,132,104✔
1959
            int err = errno;
52,072✔
1960
            // Retry on interruption by system signal
280✔
1961
            if (err == EINTR)
52,072✔
1962
                continue;
280✔
1963
            if (err == EWOULDBLOCK)
52,072✔
1964
                err = EAGAIN;
51,922✔
1965
            set_read_ready(err != EAGAIN);
52,072✔
1966
            ec = make_basic_system_error_code(err); // Failure
52,072✔
1967
            return 0;
52,072✔
1968
        }
52,072✔
1969
#endif
2,080,032✔
1970
        if (REALM_UNLIKELY(ret == 0)) {
2,080,032✔
1971
            set_read_ready(true);
1,032✔
1972
            ec = MiscExtErrors::end_of_input;
1,032✔
1973
            return 0;
1,032✔
1974
        }
1,032✔
1975
        REALM_ASSERT(ret > 0);
2,079,000✔
1976
        std::size_t n = std::size_t(ret);
2,079,000✔
1977
        REALM_ASSERT(n <= size);
2,079,000✔
1978
#if REALM_NETWORK_USE_EPOLL
1979
        // On Linux a partial read (n < size) on a nonblocking stream-mode
        // socket is guaranteed to only ever happen if a complete read would
        // have been impossible without blocking (i.e., without failing with
        // EAGAIN/EWOULDBLOCK), or if the end of input from the remote peer was
        // detected by the Linux kernel.
        //
        // Furthermore, after a partial read, and when working with Linux epoll
        // in edge-triggered mode (EPOLLET), it is safe to suspend further
        // reading until a new read-readiness notification is received, provided
        // that we registered interest in EPOLLRDHUP events, and an EPOLLRDHUP
        // event was not received prior to the partial read. This is safe in the
        // sense that reading is guaranteed to be resumed in a timely fashion
        // (without unnecessary blocking), and in a manner that is free of race
        // conditions. Note in particular that if a read was partial because the
        // kernel had detected the end of input prior to that read, but the
        // EPOLLRDHUP event was not received prior to that read, then reading
        // will still be resumed immediately by the pending EPOLLRDHUP event.
        //
        // Note that without this extra "loss of read-readiness" trigger, it
        // would have been necessary for the caller to immediately follow up
        // with an (otherwise redundant) additional invocation of read_some()
        // just to detect the loss of read-readiness.
        //
        // FIXME: Will this scheme also work with Kqueue on FreeBSD and macOS?
        // In particular, do we know that a partial read (n < size) on a
        // nonblocking stream-mode socket is guaranteed to only ever happen if a
        // complete read would have been impossible without blocking, or if the
        // end of input from the remote peer was detected by the FreeBSD and/or
        // macOS kernel? See http://stackoverflow.com/q/40123626/1698548.
        set_read_ready(n == size || m_imminent_end_of_input);
2009
#else
2010
        set_read_ready(true);
2,079,000✔
2011
#endif
2,079,000✔
2012
        ec = std::error_code(); // Success
2,079,000✔
2013
        return n;
2,079,000✔
2014
    }
2,079,000✔
2015
}
2,133,124✔
2016

2017

2018
std::size_t Service::Descriptor::write_some(const char* data, std::size_t size, std::error_code& ec) noexcept
2019
{
1,498,620✔
2020
    if (REALM_UNLIKELY(assume_write_would_block())) {
1,498,620✔
2021
        ec = error::resource_unavailable_try_again; // Failure
107,458✔
2022
        return 0;
107,458✔
2023
    }
107,458✔
2024
    for (;;) {
1,391,302✔
2025
        int flags = 0;
1,391,032✔
2026
#ifdef __linux__
717,468✔
2027
        // Prevent SIGPIPE when remote peer has closed the connection.
717,468✔
2028
        flags |= MSG_NOSIGNAL;
717,468✔
2029
#endif
717,468✔
2030
#ifdef _WIN32
2031
        ssize_t ret = ::send(m_fd, data, int(size), flags);
2032
        if (ret == SOCKET_ERROR) {
2033
            int err = WSAGetLastError();
2034
            // Retry on interruption by system signal
2035
            if (err == WSAEINTR)
2036
                continue;
2037
            set_write_ready(err != WSAEWOULDBLOCK);
2038
            ec = make_winsock_error_code(err); // Failure
2039
            return 0;
2040
        }
2041
#else
2042
        ssize_t ret = ::send(m_fd, data, size, flags);
1,391,032✔
2043
        if (ret == -1) {
1,391,032✔
2044
            int err = errno;
3,822✔
2045
            // Retry on interruption by system signal
422✔
2046
            if (err == EINTR)
3,822✔
2047
                continue;
422✔
2048
#if REALM_PLATFORM_APPLE
3,400✔
2049
            // The macOS kernel can generate an undocumented EPROTOTYPE in
            // certain cases where the peer has closed the connection (in
            // tcp_usr_send() in bsd/netinet/tcp_usrreq.c). See also
            // http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/.
2053
            if (REALM_UNLIKELY(err == EPROTOTYPE))
3,400✔
2054
                err = EPIPE;
2055
#endif
3,400✔
2056
            if (err == EWOULDBLOCK)
3,822✔
2057
                err = EAGAIN;
3,706✔
2058
            set_write_ready(err != EAGAIN);
3,822✔
2059
            ec = make_basic_system_error_code(err); // Failure
3,822✔
2060
            return 0;
3,822✔
2061
        }
3,822✔
2062
#endif
1,387,210✔
2063
        REALM_ASSERT(ret >= 0);
1,387,210✔
2064
        std::size_t n = std::size_t(ret);
1,387,210✔
2065
        REALM_ASSERT(n <= size);
1,387,210✔
2066
#if REALM_NETWORK_USE_EPOLL
2067
        // On Linux a partial write (n < size) on a nonblocking stream-mode
        // socket is guaranteed to only ever happen if a complete write would
        // have been impossible without blocking (i.e., without failing with
        // EAGAIN/EWOULDBLOCK).
        //
        // Furthermore, after a partial write, and when working with Linux
        // epoll in edge-triggered mode (EPOLLET), it is safe to suspend further
        // writing until a new write-readiness notification is received. This is
        // safe in the sense that writing is guaranteed to be resumed in a
        // timely fashion (without unnecessary blocking), and in a manner that
        // is free of race conditions.
        //
        // Note that without this extra "loss of write-readiness" trigger, it
        // would have been necessary for the caller to immediately follow up
        // with an (otherwise redundant) additional invocation of write_some()
        // just to detect the loss of write-readiness.
        //
        // FIXME: Will this scheme also work with Kqueue on FreeBSD and macOS?
        // In particular, do we know that a partial write (n < size) on a
        // nonblocking stream-mode socket is guaranteed to only ever happen if a
        // complete write would have been impossible without blocking? See
        // http://stackoverflow.com/q/40123626/1698548.
        set_write_ready(n == size);
2090
#else
2091
        set_write_ready(true);
1,387,210✔
2092
#endif
1,387,210✔
2093
        ec = std::error_code(); // Success
1,387,210✔
2094
        return n;
1,387,210✔
2095
    }
1,387,210✔
2096
}
1,391,162✔
2097

2098

2099
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
2100

2101
void Service::Descriptor::deregister_for_async() noexcept
2102
{
6,906✔
2103
    service_impl.io_reactor.deregister_desc(*this);
6,906✔
2104
}
6,906✔
2105

2106
#endif // REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
2107

2108

2109
void Service::Descriptor::set_nonblock_flag(bool value)
2110
{
11,898✔
2111
    ::set_nonblock_flag(m_fd, value); // Throws
11,898✔
2112
}
11,898✔
2113

2114

2115
void Service::Descriptor::add_initiated_oper(LendersIoOperPtr op, Want want)
2116
{
3,418,226✔
2117
    if (REALM_UNLIKELY(want == Want::nothing)) {
3,418,226✔
2118
        REALM_ASSERT(op->is_complete());
1,190,400✔
2119
        service_impl.add_completed_oper(std::move(op));
1,190,400✔
2120
        return;
1,190,400✔
2121
    }
1,190,400✔
2122
    REALM_ASSERT(!op->is_complete());
2,227,826✔
2123
    service_impl.io_reactor.add_oper(*this, std::move(op), want); // Throws
2,227,826✔
2124
}
2,227,826✔
2125

2126

2127
void Service::Descriptor::do_close() noexcept
2128
{
14,034✔
2129
    checked_close(m_fd);
14,034✔
2130
    m_fd = -1;
14,034✔
2131
}
14,034✔
2132

2133

2134
auto Service::Descriptor::do_release() noexcept -> native_handle_type
2135
{
×
2136
    native_handle_type fd = m_fd;
×
2137
    m_fd = -1;
×
2138
    return fd;
×
2139
}
×
2140

2141

2142
Service& Resolver::get_service() noexcept
2143
{
×
2144
    return m_service_impl.service;
×
2145
}
×
2146

2147

2148
Endpoint::List Resolver::resolve(const Query& query, std::error_code& ec)
2149
{
7,980✔
2150
    return Service::Impl::resolve(query, ec); // Throws
7,980✔
2151
}
7,980✔
2152

2153

2154
void Resolver::cancel() noexcept
2155
{
11,314✔
2156
    if (m_resolve_oper && m_resolve_oper->in_use() && !m_resolve_oper->is_canceled()) {
11,314✔
2157
        Service::ResolveOperBase& op = static_cast<Service::ResolveOperBase&>(*m_resolve_oper);
10✔
2158
        m_service_impl.cancel_resolve_oper(op);
10✔
2159
    }
10✔
2160
}
11,314✔
2161

2162

2163
void Resolver::initiate_oper(Service::LendersResolveOperPtr op)
2164
{
3,326✔
2165
    m_service_impl.add_resolve_oper(std::move(op)); // Throws
3,326✔
2166
}
3,326✔
2167

2168

2169
Service& SocketBase::get_service() noexcept
2170
{
480✔
2171
    return m_desc.service_impl.service;
480✔
2172
}
480✔
2173

2174

2175
void SocketBase::cancel() noexcept
2176
{
1,038,372✔
2177
    bool any_incomplete = false;
1,038,372✔
2178
    if (m_read_oper && m_read_oper->in_use() && !m_read_oper->is_canceled()) {
1,038,372✔
2179
        m_read_oper->cancel();
464,796✔
2180
        if (!m_read_oper->is_complete())
464,796✔
2181
            any_incomplete = true;
464,870✔
2182
    }
464,796✔
2183
    if (m_write_oper && m_write_oper->in_use() && !m_write_oper->is_canceled()) {
1,038,372✔
2184
        m_write_oper->cancel();
442,944✔
2185
        if (!m_write_oper->is_complete())
442,944✔
2186
            any_incomplete = true;
442,802✔
2187
    }
442,944✔
2188
    if (any_incomplete)
1,038,372✔
2189
        m_desc.service_impl.remove_canceled_ops(m_desc);
515,158✔
2190
}
1,038,372✔
2191

2192

2193
std::error_code SocketBase::bind(const Endpoint& ep, std::error_code& ec)
2194
{
8,218✔
2195
    if (!is_open()) {
8,218✔
2196
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
×
2197
            return ec;
×
2198
    }
8,218✔
2199

4,056✔
2200
    native_handle_type sock_fd = m_desc.native_handle();
8,218✔
2201
    socklen_t addr_len =
8,218✔
2202
        ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
7,872✔
2203

4,056✔
2204
    int ret = ::bind(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
8,218✔
2205
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
8,218✔
2206
        return ec;
4,056✔
2207
    ec = std::error_code(); // Success
8,218✔
2208
    return ec;
8,218✔
2209
}
8,218✔
2210

2211

2212
Endpoint SocketBase::local_endpoint(std::error_code& ec) const
2213
{
19,512✔
2214
    Endpoint ep;
19,512✔
2215
    union union_type {
19,512✔
2216
        Endpoint::sockaddr_union_type m_sockaddr_union;
19,512✔
2217
        char m_extra_byte[sizeof(Endpoint::sockaddr_union_type) + 1];
19,512✔
2218
    };
19,512✔
2219
    native_handle_type sock_fd = m_desc.native_handle();
19,512✔
2220
    union_type buffer;
19,512✔
2221
    struct sockaddr* addr = &buffer.m_sockaddr_union.m_base;
19,512✔
2222
    socklen_t addr_len = sizeof buffer;
19,512✔
2223
    int ret = ::getsockname(sock_fd, addr, &addr_len);
19,512✔
2224
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
19,512✔
2225
        return ep;
9,684✔
2226

9,684✔
2227
    socklen_t expected_addr_len =
19,512✔
2228
        m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
17,430✔
2229
    if (addr_len != expected_addr_len)
19,512✔
2230
        throw util::runtime_error("Unexpected local address length");
×
2231
    ep.m_protocol = m_protocol;
19,512✔
2232
    ep.m_sockaddr_union = buffer.m_sockaddr_union;
19,512✔
2233
    ec = std::error_code(); // Success
19,512✔
2234
#ifdef _WIN32
2235
    ep.m_sockaddr_union.m_ip_v4.sin_addr.s_addr = inet_addr("127.0.0.1");
2236
#endif
2237
    return ep;
19,512✔
2238
}
19,512✔
2239

2240

2241
std::error_code SocketBase::open(const StreamProtocol& prot, std::error_code& ec)
2242
{
11,816✔
2243
    if (REALM_UNLIKELY(is_open()))
11,816✔
2244
        throw util::runtime_error("Socket is already open");
5,884✔
2245
    int type = prot.m_socktype;
11,816✔
2246
#if HAVE_LINUX_SOCK_CLOEXEC
5,884✔
2247
    type |= SOCK_CLOEXEC;
5,884✔
2248
#endif
5,884✔
2249
    native_handle_type ret = ::socket(prot.m_family, type, prot.m_protocol);
11,816✔
2250
#ifdef _WIN32
2251
    if (REALM_UNLIKELY(ret == INVALID_SOCKET)) {
2252
        ec = make_winsock_error_code(WSAGetLastError());
2253
        return ec;
2254
    }
2255
#else
2256
    if (REALM_UNLIKELY(ret == -1)) {
11,816✔
2257
        ec = make_basic_system_error_code(errno);
×
2258
        return ec;
×
2259
    }
×
2260
#endif
11,816✔
2261

5,884✔
2262
    CloseGuard sock_fd{ret};
11,816✔
2263

5,884✔
2264
#if !HAVE_LINUX_SOCK_CLOEXEC
5,932✔
2265
    {
5,932✔
2266
        bool value = true;
5,932✔
2267
        if (REALM_UNLIKELY(set_cloexec_flag(sock_fd, value, ec)))
5,932✔
2268
            return ec;
2269
    }
5,932✔
2270
#endif
5,932✔
2271

5,884✔
2272
#if REALM_PLATFORM_APPLE
5,932✔
2273
    {
5,932✔
2274
        int optval = 1;
5,932✔
2275
        int ret = setsockopt(sock_fd, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof optval);
5,932✔
2276
        if (REALM_UNLIKELY(ret == -1)) {
5,932✔
2277
            ec = make_basic_system_error_code(errno);
2278
            return ec;
2279
        }
2280
    }
5,932✔
2281
#endif
5,932✔
2282

5,884✔
2283
    bool in_blocking_mode = true; // New sockets are in blocking mode by default
11,816✔
2284
    m_desc.assign(sock_fd.release(), in_blocking_mode);
11,816✔
2285
    m_protocol = prot;
11,816✔
2286
    ec = std::error_code(); // Success
11,816✔
2287
    return ec;
11,816✔
2288
}
11,816✔
2289

2290

2291
std::error_code SocketBase::do_assign(const StreamProtocol& prot, native_handle_type sock_fd, std::error_code& ec)
2292
{
4✔
2293
    if (REALM_UNLIKELY(is_open()))
4✔
2294
        throw util::runtime_error("Socket is already open");
2✔
2295

2✔
2296
    // We need to know whether the specified socket is in blocking or in
2✔
2297
    // nonblocking mode. Rather than reading the current mode, we set it to
2✔
2298
    // blocking mode (disable nonblocking mode), and initialize
2✔
2299
    // `m_in_blocking_mode` to true.
2✔
2300
    {
4✔
2301
        bool value = false;
4✔
2302
        if (::set_nonblock_flag(sock_fd, value, ec))
4✔
2303
            return ec;
×
2304
    }
4✔
2305

2✔
2306
    bool in_blocking_mode = true; // New sockets are in blocking mode by default
4✔
2307
    m_desc.assign(sock_fd, in_blocking_mode);
4✔
2308
    m_protocol = prot;
4✔
2309
    ec = std::error_code(); // Success
4✔
2310
    return ec;
4✔
2311
}
4✔
2312

2313

2314
void SocketBase::get_option(opt_enum opt, void* value_data, std::size_t& value_size, std::error_code& ec) const
2315
{
8✔
2316
    int level = 0;
8✔
2317
    int option_name = 0;
8✔
2318
    map_option(opt, level, option_name);
8✔
2319

4✔
2320
    native_handle_type sock_fd = m_desc.native_handle();
8✔
2321
    socklen_t option_len = socklen_t(value_size);
8✔
2322
    int ret = ::getsockopt(sock_fd, level, option_name, static_cast<char*>(value_data), &option_len);
8✔
2323
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
8✔
2324
        return;
4✔
2325
    value_size = std::size_t(option_len);
8✔
2326
    ec = std::error_code(); // Success
8✔
2327
}
8✔
2328

2329

2330
void SocketBase::set_option(opt_enum opt, const void* value_data, std::size_t value_size, std::error_code& ec)
2331
{
9,888✔
2332
    int level = 0;
9,888✔
2333
    int option_name = 0;
9,888✔
2334
    map_option(opt, level, option_name);
9,888✔
2335

4,992✔
2336
    native_handle_type sock_fd = m_desc.native_handle();
9,888✔
2337
    int ret = ::setsockopt(sock_fd, level, option_name, static_cast<const char*>(value_data), socklen_t(value_size));
9,888✔
2338
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
9,888✔
2339
        return;
4,992✔
2340
    ec = std::error_code(); // Success
9,888✔
2341
}
9,888✔
2342

2343

2344
void SocketBase::map_option(opt_enum opt, int& level, int& option_name) const
2345
{
9,896✔
2346
    switch (opt) {
9,896✔
2347
        case opt_ReuseAddr:
7,980✔
2348
            level = SOL_SOCKET;
7,980✔
2349
            option_name = SO_REUSEADDR;
7,980✔
2350
            return;
7,980✔
2351
        case opt_Linger:
✔
2352
            level = SOL_SOCKET;
×
2353
#if REALM_PLATFORM_APPLE
2354
            // By default, SO_LINGER on Darwin uses "ticks" instead of
2355
            // seconds for better accuracy, but we want to be cross-platform.
2356
            option_name = SO_LINGER_SEC;
2357
#else
2358
            option_name = SO_LINGER;
2359
#endif // REALM_PLATFORM_APPLE
2360
            return;
×
2361
        case opt_NoDelay:
1,916✔
2362
            level = IPPROTO_TCP;
1,916✔
2363
            option_name = TCP_NODELAY; // Specified by POSIX.1-2001
1,916✔
2364
            return;
1,916✔
2365
    }
×
2366
    REALM_ASSERT(false);
×
2367
}
×
2368

2369

2370
std::error_code Socket::connect(const Endpoint& ep, std::error_code& ec)
2371
{
68✔
2372
    REALM_ASSERT(!m_write_oper || !m_write_oper->in_use());
68!
2373

32✔
2374
    if (!is_open()) {
68✔
2375
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
64✔
2376
            return ec;
30✔
2377
    }
68✔
2378

32✔
2379
    m_desc.ensure_blocking_mode(); // Throws
68✔
2380

32✔
2381
    native_handle_type sock_fd = m_desc.native_handle();
68✔
2382
    socklen_t addr_len =
68✔
2383
        (ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type));
54✔
2384
    int ret = ::connect(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
68✔
2385
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
68✔
2386
        return ec;
36✔
2387
    ec = std::error_code(); // Success
64✔
2388
    return ec;
64✔
2389
}
64✔
2390

2391

2392
std::error_code Socket::shutdown(shutdown_type what, std::error_code& ec)
2393
{
56✔
2394
    native_handle_type sock_fd = m_desc.native_handle();
56✔
2395
    int how = what;
56✔
2396
    int ret = ::shutdown(sock_fd, how);
56✔
2397
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
56✔
2398
        return ec;
28✔
2399
    ec = std::error_code(); // Success
56✔
2400
    return ec;
56✔
2401
}
56✔
2402

2403

2404
bool Socket::initiate_async_connect(const Endpoint& ep, std::error_code& ec)
2405
{
3,524✔
2406
    if (!is_open()) {
3,524✔
2407
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
3,518✔
2408
            return true; // Failure
1,790✔
2409
    }
3,524✔
2410
    m_desc.ensure_nonblocking_mode(); // Throws
3,524✔
2411

1,792✔
2412
    // Initiate connect operation.
1,792✔
2413
    native_handle_type sock_fd = m_desc.native_handle();
3,524✔
2414
    socklen_t addr_len =
3,524✔
2415
        ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
3,186✔
2416
    int ret = ::connect(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
3,524✔
2417
    if (ret != -1) {
3,524✔
2418
        ec = std::error_code(); // Success
2✔
2419
        return true;            // Immediate completion.
2✔
2420
    }
2✔
2421

1,790✔
2422
    // EINPROGRESS (and on Windows, also WSAEWOULDBLOCK) indicates that the
1,790✔
2423
    // underlying connect operation was successfully initiated, but not
1,790✔
2424
    // immediately completed, and EALREADY indicates that an underlying connect
1,790✔
2425
    // operation was already initiated, and still not completed, presumably
1,790✔
2426
    // because a previous call to connect() or async_connect() failed, or was
1,790✔
2427
    // canceled.
1,790✔
2428

1,790✔
2429
#ifdef _WIN32
2430
    int err = WSAGetLastError();
2431
    if (err != WSAEWOULDBLOCK) {
2432
        ec = make_winsock_error_code(err);
2433
        return true; // Failure
2434
    }
2435
#else
2436
    int err = errno;
3,522✔
2437
    if (REALM_UNLIKELY(err != EINPROGRESS && err != EALREADY)) {
3,522✔
2438
        ec = make_basic_system_error_code(err);
×
2439
        return true; // Failure
×
2440
    }
×
2441
#endif
3,522✔
2442

1,790✔
2443
    return false; // Successful initiation, but no immediate completion.
3,522✔
2444
}
3,522✔
2445

2446

2447
std::error_code Socket::finalize_async_connect(std::error_code& ec) noexcept
2448
{
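    // Note: after a nonblocking connect() has signalled write-readiness, the
    // actual outcome of the connection attempt is retrieved with the SO_ERROR
    // socket option; a nonzero value is the errno of the failed connect.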
3,510✔
2449
    native_handle_type sock_fd = m_desc.native_handle();
3,510✔
2450
    int connect_errno = 0;
3,510✔
2451
    socklen_t connect_errno_size = sizeof connect_errno;
3,510✔
2452
    int ret =
3,510✔
2453
        ::getsockopt(sock_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&connect_errno), &connect_errno_size);
3,510✔
2454
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
3,510✔
2455
        return ec; // getsockopt() failed
1,784✔
2456
    if (REALM_UNLIKELY(connect_errno)) {
3,510✔
2457
        ec = make_basic_system_error_code(connect_errno);
×
2458
        return ec; // connect failed
×
2459
    }
×
2460
    return std::error_code(); // Success
3,510✔
2461
}
3,510✔
2462

2463

2464
std::error_code Acceptor::listen(int backlog, std::error_code& ec)
2465
{
8,218✔
2466
    native_handle_type sock_fd = m_desc.native_handle();
8,218✔
2467
    int ret = ::listen(sock_fd, backlog);
8,218✔
2468
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
8,218✔
2469
        return ec;
4,056✔
2470
    ec = std::error_code(); // Success
8,218✔
2471
    return ec;
8,218✔
2472
}
8,218✔
2473

2474

2475
Service& DeadlineTimer::get_service() noexcept
2476
{
×
2477
    return m_service_impl.service;
×
2478
}
×
2479

2480

2481
void DeadlineTimer::cancel() noexcept
2482
{
24,530✔
2483
    if (m_wait_oper && m_wait_oper->in_use() && !m_wait_oper->is_canceled()) {
24,530✔
2484
        m_wait_oper->cancel();
20,356✔
2485
        if (!m_wait_oper->is_complete()) {
20,356✔
2486
            using WaitOperBase = Service::WaitOperBase;
20,354✔
2487
            WaitOperBase& wait_operation = static_cast<WaitOperBase&>(*m_wait_oper);
20,354✔
2488
            m_service_impl.cancel_incomplete_wait_oper(wait_operation);
20,354✔
2489
        }
20,354✔
2490
    }
20,356✔
2491
}
24,530✔
2492

2493

2494
void DeadlineTimer::initiate_oper(Service::LendersWaitOperPtr op)
2495
{
568,064✔
2496
    m_service_impl.add_wait_oper(std::move(op)); // Throws
568,064✔
2497
}
568,064✔
2498

2499

2500
bool ReadAheadBuffer::read(char*& begin, char* end, int delim, std::error_code& ec) noexcept
2501
{
1,988,500✔
2502
    std::size_t in_avail = m_end - m_begin;
1,988,500✔
2503
    std::size_t out_avail = end - begin;
1,988,500✔
2504
    std::size_t n = std::min(in_avail, out_avail);
1,988,500✔
2505
    // If n is 0, the read is complete only if the caller requested 0 bytes
1,001,766✔
2506
    if (n == 0)
1,988,500✔
2507
        return out_avail == 0;
496,174✔
2508

754,500✔
2509
    bool delim_mode = (delim != std::char_traits<char>::eof());
1,492,326✔
2510
    char* i =
1,492,326✔
2511
        (!delim_mode ? m_begin + n : std::find(m_begin, m_begin + n, std::char_traits<char>::to_char_type(delim)));
1,470,332✔
2512
    begin = std::copy(m_begin, i, begin);
1,492,326✔
2513
    m_begin = i;
1,492,326✔
2514
    if (begin == end) {
1,492,326✔
2515
        if (delim_mode)
821,154✔
2516
            ec = MiscExtErrors::delim_not_found;
×
2517
    }
821,154✔
2518
    else {
671,172✔
2519
        if (m_begin == m_end)
671,172✔
2520
            return false;
627,048✔
2521
        REALM_ASSERT(delim_mode);
44,124✔
2522
        *begin++ = *m_begin++; // Transfer delimiter
44,124✔
2523
    }
44,124✔
2524
    return true;
1,177,198✔
2525
}
1,492,326✔
2526

2527

2528
namespace realm::sync::network {
2529

2530
std::string host_name()
2531
{
4✔
2532
    // POSIX allows for gethostname() to report success even if the buffer is
2✔
2533
    // too small to hold the name, and in that case POSIX requires that the
2✔
2534
    // buffer is filled, but not that it contains a final null-termination.
2✔
2535
    char small_stack_buffer[256];
4✔
2536
    int ret = ::gethostname(small_stack_buffer, sizeof small_stack_buffer);
4✔
2537
    if (ret != -1) {
4✔
2538
        // Check that a null-termination was included
2✔
2539
        char* end = small_stack_buffer + sizeof small_stack_buffer;
4✔
2540
        char* i = std::find(small_stack_buffer, end, 0);
4✔
2541
        if (i != end)
4✔
2542
            return std::string(small_stack_buffer, i);
4✔
2543
    }
×
2544
    constexpr std::size_t large_heap_buffer_size = 4096;
×
2545
    std::unique_ptr<char[]> large_heap_buffer(new char[large_heap_buffer_size]); // Throws
×
2546
    ret = ::gethostname(large_heap_buffer.get(), large_heap_buffer_size);
×
2547
    if (REALM_LIKELY(ret != -1)) {
×
2548
        // Check that a null-termination was included
2549
        char* end = large_heap_buffer.get() + large_heap_buffer_size;
×
2550
        char* i = std::find(large_heap_buffer.get(), end, 0);
×
2551
        if (i != end)
×
2552
            return std::string(large_heap_buffer.get(), i);
×
2553
    }
×
2554
    throw std::system_error(errno, std::system_category(), "gethostname() failed");
×
2555
}
×
2556

2557

2558
Address make_address(const char* c_str, std::error_code& ec) noexcept
2559
{
×
2560
    Address addr;
×
2561
    int ret = ::inet_pton(AF_INET6, c_str, &addr.m_union);
×
2562
    REALM_ASSERT(ret == 0 || ret == 1);
×
2563
    if (ret == 1) {
×
2564
        addr.m_is_ip_v6 = true;
×
2565
        ec = std::error_code(); // Success (IPv6)
×
2566
        return addr;
×
2567
    }
×
2568
    ret = ::inet_pton(AF_INET, c_str, &addr.m_union);
×
2569
    REALM_ASSERT(ret == 0 || ret == 1);
×
2570
    if (ret == 1) {
×
2571
        ec = std::error_code(); // Success (IPv4)
×
2572
        return addr;
×
2573
    }
×
2574
    ec = error::invalid_argument;
×
2575
    return Address();
×
2576

2577
    // FIXME: Currently, `addr.m_ip_v6_scope_id` is always set to zero. It needs
    // to be set based on a combined inspection of the original string
    // representation, and the parsed address. The following code is "borrowed"
    // from ASIO:
    /*
        *scope_id = 0;
        if (const char* if_name = strchr(src, '%'))
        {
          in6_addr_type* ipv6_address = static_cast<in6_addr_type*>(dest);
          bool is_link_local = ((ipv6_address->s6_addr[0] == 0xfe)
              && ((ipv6_address->s6_addr[1] & 0xc0) == 0x80));
          bool is_multicast_link_local = ((ipv6_address->s6_addr[0] == 0xff)
              && ((ipv6_address->s6_addr[1] & 0x0f) == 0x02));
          if (is_link_local || is_multicast_link_local)
            *scope_id = if_nametoindex(if_name + 1);
          if (*scope_id == 0)
            *scope_id = atoi(if_name + 1);
        }
    */
2596
}
×
2597

2598
class ResolveErrorCategory : public std::error_category {
2599
public:
2600
    const char* name() const noexcept final
2601
    {
×
2602
        return "realm.sync.network.resolve";
×
2603
    }
×
2604

2605
    std::string message(int value) const final
2606
    {
8✔
2607
        switch (ResolveErrors(value)) {
8✔
2608
            case ResolveErrors::host_not_found:
4✔
2609
                return "Host not found (authoritative)";
4✔
2610
            case ResolveErrors::host_not_found_try_again:
4✔
2611
                return "Host not found (non-authoritative)";
4✔
2612
            case ResolveErrors::no_data:
✔
2613
                return "The query is valid but does not have associated address data";
×
2614
            case ResolveErrors::no_recovery:
✔
2615
                return "A non-recoverable error occurred";
×
2616
            case ResolveErrors::service_not_found:
✔
2617
                return "The service is not supported for the given socket type";
×
2618
            case ResolveErrors::socket_type_not_supported:
✔
2619
                return "The socket type is not supported";
×
2620
        }
×
2621
        REALM_ASSERT(false);
×
2622
        return {};
×
2623
    }
×
2624
};
2625

2626
const std::error_category& resolve_error_category() noexcept
2627
{
6✔
2628
    static const ResolveErrorCategory resolve_error_category;
6✔
2629
    return resolve_error_category;
6✔
2630
}
6✔
2631

2632
std::error_code make_error_code(ResolveErrors err)
2633
{
6✔
2634
    return std::error_code(int(err), resolve_error_category());
6✔
2635
}
6✔
2636

2637
} // namespace realm::sync::network