
realm / realm-core / thomas.goyne_478

02 Aug 2024 05:19PM UTC coverage: 91.089% (-0.01%) from 91.1%

Pull #7944

Evergreen

tgoyne
Only track pending client resets done by the same core version

If the previous attempt at performing a client reset was done with a different
core version, we should retry the client reset, as the new version may have
fixed a bug that made the previous attempt fail (or may be a downgrade to a
version from before the bug was introduced). This also simplifies the tracking,
as it means we don't need to be able to read trackers created by different
versions.

This also means that we can freely change the schema of the table, which this
change takes advantage of to drop the unused primary key and to make the error
column required, as we never actually stored null and the code reading it would
have crashed if it had encountered a null error.
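
The idea can be summarised with a small, hypothetical C++ sketch; the PendingReset struct and usable_pending_reset helper below are illustrative only and are not realm-core's actual types or API:

#include <optional>
#include <string>

// Hypothetical stand-in for the tracker described above; real field names may differ.
struct PendingReset {
    std::string core_version; // core version that performed the previous attempt
    std::string error;        // now required: null was never actually stored
};

// Only honour a pending reset recorded by the same core version. A tracker
// written by any other core version is treated as absent, so the client
// reset is retried.
inline std::optional<PendingReset> usable_pending_reset(std::optional<PendingReset> tracked,
                                                        const std::string& current_core_version)
{
    if (tracked && tracked->core_version == current_core_version)
        return tracked;
    return std::nullopt;
}
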
Pull Request #7944: Only track pending client resets done by the same core version

102704 of 181534 branches covered (56.58%)

138 of 153 new or added lines in 10 files covered. (90.2%)

85 existing lines in 16 files now uncovered.

216717 of 237917 relevant lines covered (91.09%)

5947762.1 hits per line

Source File

87.95%
/src/realm/sync/network/network.cpp
1

2
#define _WINSOCK_DEPRECATED_NO_WARNINGS
3

4
#include <algorithm>
5
#include <cerrno>
6
#include <condition_variable>
7
#include <limits>
8
#include <mutex>
9
#include <stdexcept>
10
#include <thread>
11
#include <vector>
12

13
#include <fcntl.h>
14

15
#ifndef _WIN32
16
#include <netinet/tcp.h>
17
#include <unistd.h>
18
#include <poll.h>
19
#include <realm/util/to_string.hpp>
20
#endif
21

22
#include <realm/util/features.h>
23
#include <realm/util/optional.hpp>
24
#include <realm/util/misc_errors.hpp>
25
#include <realm/util/priority_queue.hpp>
26
#include <realm/sync/network/network.hpp>
27

28
#if defined _GNU_SOURCE && !REALM_ANDROID
29
#define HAVE_LINUX_PIPE2 1
30
#else
31
#define HAVE_LINUX_PIPE2 0
32
#endif
33

34
// Note: Linux specific accept4() is not available on Android.
35
#if defined _GNU_SOURCE && defined SOCK_NONBLOCK && defined SOCK_CLOEXEC && !REALM_ANDROID
36
#define HAVE_LINUX_ACCEPT4 1
37
#else
38
#define HAVE_LINUX_ACCEPT4 0
39
#endif
40

41
#if defined _GNU_SOURCE && defined SOCK_CLOEXEC
42
#define HAVE_LINUX_SOCK_CLOEXEC 1
43
#else
44
#define HAVE_LINUX_SOCK_CLOEXEC 0
45
#endif
46

47
#ifndef _WIN32
48

49
#if REALM_NETWORK_USE_EPOLL
50
#include <linux/version.h>
51
#include <sys/epoll.h>
52
#elif REALM_HAVE_KQUEUE
53
#include <sys/types.h>
54
#include <sys/event.h>
55
#include <sys/time.h>
56
#else
57
#include <poll.h>
58
#endif
59

60
#endif
61

62
// On Linux kernels earlier than 2.6.37, epoll can't handle timeout values
63
// bigger than (LONG_MAX - 999ULL)/HZ.  HZ in the wild can be as big as 1000,
64
// and LONG_MAX can be as small as (2**31)-1, so the largest number of
65
// milliseconds we can be sure to support on those early kernels is 2147482.
66
#if REALM_NETWORK_USE_EPOLL
67
#if LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 37)
68
#define EPOLL_LARGE_TIMEOUT_BUG 1
69
#endif
70
#endif
71

72
using namespace realm::util;
73
using namespace realm::sync::network;
74

75

76
namespace {
77

78
using native_handle_type = SocketBase::native_handle_type;
79

80
#ifdef _WIN32
81

82
// This Winsock initialization call is required prior to any other Winsock API call
83
// made by the process. It is OK if a process calls it multiple times.
84
struct ProcessInitialization {
85
    ProcessInitialization()
86
    {
87
        WSADATA wsaData;
88
        int i = WSAStartup(MAKEWORD(2, 2), &wsaData);
89
        if (i != 0) {
90
            throw std::system_error(i, std::system_category(), "WSAStartup() Winsock initialization failed");
91
        }
92
    }
93

94
    ~ProcessInitialization()
95
    {
96
        // Must be called 1 time for each call to WSAStartup() that has taken place
97
        WSACleanup();
98
    }
99
};
100

101
ProcessInitialization g_process_initialization;
102

103
std::error_code make_winsock_error_code(int error_code)
104
{
105
    switch (error_code) {
106
        case WSAEAFNOSUPPORT:
107
            return make_basic_system_error_code(EAFNOSUPPORT);
108
        case WSAEINVAL:
109
            return make_basic_system_error_code(EINVAL);
110
        case WSAECANCELLED:
111
            return make_basic_system_error_code(ECANCELED);
112
        case WSAECONNABORTED:
113
            return make_basic_system_error_code(ECONNABORTED);
114
        case WSAECONNRESET:
115
            return make_basic_system_error_code(ECONNRESET);
116
        case WSAEWOULDBLOCK:
117
            return make_basic_system_error_code(EAGAIN);
118
    }
119

120
    // Microsoft's STL can map win32 (and winsock!) error codes to known (posix-compatible) errc ones.
121
    auto ec = std::system_category().default_error_condition(error_code);
122
    if (ec.category() == std::generic_category())
123
        return make_basic_system_error_code(ec.value());
124
    return std::error_code(ec.value(), ec.category());
125
}
126

127
#endif // defined _WIN32
128

129
inline bool check_socket_error(int ret, std::error_code& ec)
130
{
16,528✔
131
#ifdef _WIN32
132
    if (REALM_UNLIKELY(ret == SOCKET_ERROR)) {
133
        ec = make_winsock_error_code(WSAGetLastError());
134
        return true;
135
    }
136
#else
137
    if (REALM_UNLIKELY(ret == -1)) {
16,528✔
138
        ec = make_basic_system_error_code(errno);
4✔
139
        return true;
4✔
140
    }
4✔
141
#endif
16,524✔
142
    return false;
16,524✔
143
}
16,528✔
144

145
// Set file status flag O_NONBLOCK if `value` is true, otherwise clear it.
146
//
147
// Note that these flags are set at the file description level, and are therefore
148
// shared between duplicated descriptors (dup()).
149
//
150
// `ec` untouched on success.
151
std::error_code set_nonblock_flag(native_handle_type fd, bool value, std::error_code& ec) noexcept
152
{
5,514✔
153
#ifdef _WIN32
154
    u_long flags = value ? 1 : 0;
155
    int r = ioctlsocket(fd, FIONBIO, &flags);
156
    if (r == SOCKET_ERROR) {
157
        ec = make_winsock_error_code(WSAGetLastError());
158
        return ec;
159
    }
160
#else
161
    int flags = ::fcntl(fd, F_GETFL, 0);
5,514✔
162
    if (REALM_UNLIKELY(flags == -1)) {
5,514✔
163
        ec = make_basic_system_error_code(errno);
×
164
        return ec;
×
165
    }
×
166
    flags &= ~O_NONBLOCK;
5,514✔
167
    flags |= (value ? O_NONBLOCK : 0);
5,514✔
168
    int ret = ::fcntl(fd, F_SETFL, flags);
5,514✔
169
    if (REALM_UNLIKELY(ret == -1)) {
5,514✔
170
        ec = make_basic_system_error_code(errno);
×
171
        return ec;
×
172
    }
×
173
#endif
5,514✔
174

175
    return std::error_code(); // Success
5,514✔
176
}
5,514✔
177

178
// Set file status flag O_NONBLOCK. See set_nonblock_flag(int, bool,
179
// std::error_code&) for details. Throws std::system_error on failure.
180
void set_nonblock_flag(native_handle_type fd, bool value = true)
181
{
5,510✔
182
    std::error_code ec;
5,510✔
183
    if (set_nonblock_flag(fd, value, ec))
5,510✔
184
        throw std::system_error(ec);
×
185
}
5,510✔
186

187
// Set file descriptor flag FD_CLOEXEC if `value` is true, otherwise clear it.
188
//
189
// Note that this method of setting FD_CLOEXEC is subject to a race condition if
190
// another thread calls any of the exec functions concurrently. For that reason,
191
// this function should only be used when there is no better alternative. For
192
// example, Linux generally offers ways to set this flag atomically with the
193
// creation of a new file descriptor.
194
//
195
// `ec` untouched on success.
196
std::error_code set_cloexec_flag(native_handle_type fd, bool value, std::error_code& ec) noexcept
197
{
15,894✔
198
#ifndef _WIN32
15,894✔
199
    int flags = ::fcntl(fd, F_GETFD, 0);
15,894✔
200
    if (REALM_UNLIKELY(flags == -1)) {
15,894✔
201
        ec = make_basic_system_error_code(errno);
×
202
        return ec;
×
203
    }
×
204
    flags &= ~FD_CLOEXEC;
15,894✔
205
    flags |= (value ? FD_CLOEXEC : 0);
15,894✔
206
    int ret = ::fcntl(fd, F_SETFD, flags);
15,894✔
207
    if (REALM_UNLIKELY(ret == -1)) {
15,894✔
208
        ec = make_basic_system_error_code(errno);
×
209
        return ec;
×
210
    }
×
211
#endif
15,894✔
212
    return std::error_code(); // Success
15,894✔
213
}
15,894✔
214

215
// Set file descriptor flag FD_CLOEXEC. See set_cloexec_flag(int, bool,
216
// std::error_code&) for details. Throws std::system_error on failure.
217
REALM_UNUSED inline void set_cloexec_flag(native_handle_type fd, bool value = true)
218
{
11,758✔
219
    std::error_code ec;
11,758✔
220
    if (set_cloexec_flag(fd, value, ec))
11,758✔
221
        throw std::system_error(ec);
×
222
}
11,758✔
223

224

225
inline void checked_close(native_handle_type fd) noexcept
226
{
37,208✔
227
#ifdef _WIN32
228
    int status = closesocket(fd);
229
    if (status == -1) {
230
        BOOL b = CloseHandle((HANDLE)fd);
231
        REALM_ASSERT(b || GetLastError() != ERROR_INVALID_HANDLE);
232
    }
233
#else
234
    int ret = ::close(fd);
37,208✔
235
    // We can accept various errors from close(), but they must be ignored as
236
    // the file descriptor is closed in any case (not necessarily according to
237
    // POSIX, but we shall assume it anyway). `EBADF`, however, would indicate
238
    // an implementation bug, so we don't want to ignore that.
239
    REALM_ASSERT(ret != -1 || errno != EBADF);
37,208!
240
#endif
37,208✔
241
}
37,208✔
242

243

244
class CloseGuard {
245
public:
246
    CloseGuard() noexcept {}
27,008✔
247
    explicit CloseGuard(native_handle_type fd) noexcept
248
        : m_fd{fd}
2,552✔
249
    {
11,310✔
250
        REALM_ASSERT(fd != -1);
11,310✔
251
    }
11,310✔
252
    CloseGuard(CloseGuard&& cg) noexcept
253
        : m_fd{cg.release()}
254
    {
×
255
    }
×
256
    ~CloseGuard() noexcept
257
    {
38,318✔
258
        if (m_fd != -1)
38,318✔
259
            checked_close(m_fd);
28,920✔
260
    }
38,318✔
261
    void reset(native_handle_type fd) noexcept
262
    {
25,894✔
263
        REALM_ASSERT(fd != -1);
25,894✔
264
        if (m_fd != -1)
25,894✔
265
            checked_close(m_fd);
×
266
        m_fd = fd;
25,894✔
267
    }
25,894✔
268
    operator native_handle_type() const noexcept
269
    {
1,891,682✔
270
        return m_fd;
1,891,682✔
271
    }
1,891,682✔
272
    native_handle_type release() noexcept
273
    {
8,284✔
274
        native_handle_type fd = m_fd;
8,284✔
275
        m_fd = -1;
8,284✔
276
        return fd;
8,284✔
277
    }
8,284✔
278

279
private:
280
    native_handle_type m_fd = -1;
281
};
282

283

284
#ifndef _WIN32
285

286
class WakeupPipe {
287
public:
288
    WakeupPipe()
289
    {
11,520✔
290
        int fildes[2];
11,520✔
291
#if HAVE_LINUX_PIPE2
5,640✔
292
        int flags = O_CLOEXEC;
5,640✔
293
        int ret = ::pipe2(fildes, flags);
5,640✔
294
#else
295
        int ret = ::pipe(fildes);
5,880✔
296
#endif
5,880✔
297
        if (REALM_UNLIKELY(ret == -1)) {
11,520✔
298
            std::error_code ec = make_basic_system_error_code(errno);
×
299
            throw std::system_error(ec);
×
300
        }
×
301
        m_read_fd.reset(fildes[0]);
11,520✔
302
        m_write_fd.reset(fildes[1]);
11,520✔
303
#if !HAVE_LINUX_PIPE2
5,880✔
304
        set_cloexec_flag(m_read_fd);  // Throws
5,880✔
305
        set_cloexec_flag(m_write_fd); // Throws
5,880✔
306
#endif
5,880✔
307
    }
11,520✔
308

309
    // Thread-safe.
310
    int wait_fd() const noexcept
311
    {
168,382✔
312
        return m_read_fd;
168,382✔
313
    }
168,382✔
314

315
    // Cause the wait descriptor (wait_fd()) to become readable within a short
316
    // amount of time.
317
    //
318
    // Thread-safe.
319
    void signal() noexcept
320
    {
1,211,432✔
321
        std::lock_guard lock{m_mutex};
1,211,432✔
322
        if (!m_signaled) {
1,211,432✔
323
            char c = 0;
275,464✔
324
            ssize_t ret = ::write(m_write_fd, &c, 1);
275,464✔
325
            REALM_ASSERT_RELEASE(ret == 1);
275,464✔
326
            m_signaled = true;
275,464✔
327
        }
275,464✔
328
    }
1,211,432✔
329

330
    // Must be called after the wait descriptor (wait_fd()) becomes readable.
331
    //
332
    // Thread-safe.
333
    void acknowledge_signal() noexcept
334
    {
274,928✔
335
        std::lock_guard lock{m_mutex};
274,928✔
336
        if (m_signaled) {
274,936✔
337
            char c;
274,936✔
338
            ssize_t ret = ::read(m_read_fd, &c, 1);
274,936✔
339
            REALM_ASSERT_RELEASE(ret == 1);
274,936✔
340
            m_signaled = false;
274,936✔
341
        }
274,936✔
342
    }
274,928✔
343

344
private:
345
    CloseGuard m_read_fd, m_write_fd;
346
    std::mutex m_mutex;
347
    bool m_signaled = false; // Protected by `m_mutex`.
348
};
349

350
#else // defined _WIN32
351

352
class WakeupPipe {
353
public:
354
    SOCKET wait_fd() const noexcept
355
    {
356
        return INVALID_SOCKET;
357
    }
358

359
    void signal() noexcept
360
    {
361
        m_signal_count++;
362
    }
363

364
    bool is_signaled() const noexcept
365
    {
366
        return m_signal_count > 0;
367
    }
368

369
    void acknowledge_signal() noexcept
370
    {
371
        m_signal_count--;
372
    }
373

374
private:
375
    std::atomic<uint32_t> m_signal_count = 0;
376
};
377

378
#endif // defined _WIN32
379

380

381
std::error_code translate_addrinfo_error(int err) noexcept
382
{
10✔
383
    switch (err) {
10✔
384
        case EAI_AGAIN:
2✔
385
            return ResolveErrors::host_not_found_try_again;
2✔
386
        case EAI_BADFLAGS:
✔
387
            return error::invalid_argument;
×
388
        case EAI_FAIL:
✔
389
            return ResolveErrors::no_recovery;
×
390
        case EAI_FAMILY:
✔
391
            return error::address_family_not_supported;
×
392
        case EAI_MEMORY:
✔
393
            return error::no_memory;
×
394
        case EAI_NONAME:
8✔
395
#if defined(EAI_ADDRFAMILY)
8✔
396
        case EAI_ADDRFAMILY:
8✔
397
#endif
8✔
398
#if defined(EAI_NODATA) && (EAI_NODATA != EAI_NONAME)
8✔
399
        case EAI_NODATA:
8✔
400
#endif
8✔
401
            return ResolveErrors::host_not_found;
8✔
402
        case EAI_SERVICE:
✔
403
            return ResolveErrors::service_not_found;
×
404
        case EAI_SOCKTYPE:
✔
405
            return ResolveErrors::socket_type_not_supported;
×
406
        default:
✔
407
            return error::unknown;
×
408
    }
10✔
409
}
10✔
410

411

412
struct GetaddrinfoResultOwner {
413
    struct addrinfo* ptr;
414
    GetaddrinfoResultOwner(struct addrinfo* p)
415
        : ptr{p}
2,294✔
416
    {
4,906✔
417
    }
4,906✔
418
    ~GetaddrinfoResultOwner() noexcept
419
    {
4,906✔
420
        if (ptr)
4,906✔
421
            freeaddrinfo(ptr);
4,906✔
422
    }
4,906✔
423
};
424

425
} // unnamed namespace
426

427

428
class Service::IoReactor {
429
public:
430
    IoReactor();
431
    ~IoReactor() noexcept;
432

433
    // Add an initiated I/O operation that did not complete immediately.
434
    void add_oper(Descriptor&, LendersIoOperPtr, Want);
435
    void remove_canceled_ops(Descriptor&, OperQueue<AsyncOper>& completed_ops) noexcept;
436

437
    bool wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
438
                          OperQueue<AsyncOper>& completed_ops);
439

440
    // The reactor is considered empty when no operations are currently managed
441
    // by it. An operation is managed by a reactor if it was added through
442
    // add_oper() and not yet passed out through `completed_ops` of
443
    // wait_and_advance().
444
    bool empty() const noexcept;
445

446
    // Cause wait_and_advance() to return within a short amount of time.
447
    //
448
    // Thread-safe.
449
    void interrupt() noexcept;
450

451
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
452
    void register_desc(Descriptor&);
453
    void deregister_desc(Descriptor&) noexcept;
454
#endif
455

456
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
457
    clock::duration get_and_reset_sleep_time() noexcept;
458
#endif
459

460
private:
461
#if REALM_NETWORK_USE_EPOLL
462

463
    static constexpr int s_epoll_event_buffer_size = 256;
464
    const std::unique_ptr<epoll_event[]> m_epoll_event_buffer;
465
    const CloseGuard m_epoll_fd;
466

467
    static std::unique_ptr<epoll_event[]> make_epoll_event_buffer();
468
    static CloseGuard make_epoll_fd();
469

470
#elif REALM_HAVE_KQUEUE // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
471

472
    static constexpr int s_kevent_buffer_size = 256;
473
    const std::unique_ptr<struct kevent[]> m_kevent_buffer;
474
    const CloseGuard m_kqueue_fd;
475

476
    static std::unique_ptr<struct kevent[]> make_kevent_buffer();
477
    static CloseGuard make_kqueue_fd();
478

479
#endif // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
480

481
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
482

483
    OperQueue<IoOper> m_active_ops;
484

485
    // If there are already active operations, just activate as many additional
486
    // operations as can be done without blocking. Otherwise, block until at
487
    // least one operation can be activated or the timeout is reached. Then, if
488
    // the timeout was not reached, activate as many additional operations as
489
    // can be done without any further blocking.
490
    //
491
    // May occasionally return with no active operations and before the timeout
492
    // has been reached, but this can be assumed to happen rarely enough that it
493
    // will never amount to a performance problem.
494
    //
495
    // Argument `now` is unused if `timeout.time_since_epoch() <= 0`.
496
    //
497
    // Returns true if, and only if a wakeup pipe signal was
498
    // received. Operations may already have been activated in this case.
499
    bool wait_and_activate(clock::time_point timeout, clock::time_point now);
500

501
    void advance_active_ops(OperQueue<AsyncOper>& completed_ops) noexcept;
502

503
#else // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
504

505
    struct OperSlot {
506
        std::size_t pollfd_slot_ndx = 0; // Zero when slot is unused
507
        OperQueue<IoOper> read_ops, write_ops;
508
    };
509

510
    std::vector<OperSlot> m_operations; // Indexed by file descriptor
511

512
    // First entry in `m_pollfd_slots` is always the read end of the wakeup
513
    // pipe. There is then an additional entry for each entry in `m_operations`
514
    // where `pollfd_slot_ndx` is nonzero. All entries always have `pollfd::fd`
515
    // >= 0.
516
    //
517
    // INVARIANT: m_pollfd_slots.size() == 1 + N, where N is the number of
518
    // entries in m_operations where pollfd_slot_ndx is nonzero.
519
    std::vector<pollfd> m_pollfd_slots;
520

521
    void discard_pollfd_slot_by_move_last_over(OperSlot&) noexcept;
522

523
#endif // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
524

525
    std::size_t m_num_operations = 0;
526
    WakeupPipe m_wakeup_pipe;
527

528
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
529
    clock::duration m_sleep_time = clock::duration::zero();
530
#endif
531
};
532

533

534
inline bool Service::IoReactor::empty() const noexcept
535
{
1,692,454✔
536
    return (m_num_operations == 0);
1,692,454✔
537
}
1,692,454✔
538

539

540
inline void Service::IoReactor::interrupt() noexcept
541
{
1,210,696✔
542
    m_wakeup_pipe.signal();
1,210,696✔
543
}
1,210,696✔
544

545

546
#if REALM_NETWORK_USE_EPOLL
547

548
inline Service::IoReactor::IoReactor()
549
    : m_epoll_event_buffer{make_epoll_event_buffer()} // Throws
550
    , m_epoll_fd{make_epoll_fd()}                     // Throws
551
    , m_wakeup_pipe{}                                 // Throws
552
{
553
    epoll_event event = epoll_event(); // Clear
554
    event.events = EPOLLIN;
555
    event.data.ptr = nullptr;
556
    int ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_wakeup_pipe.wait_fd(), &event);
557
    if (REALM_UNLIKELY(ret == -1)) {
558
        std::error_code ec = make_basic_system_error_code(errno);
559
        throw std::system_error(ec);
560
    }
561
}
562

563

564
inline Service::IoReactor::~IoReactor() noexcept {}
565

566

567
inline void Service::IoReactor::register_desc(Descriptor& desc)
568
{
569
    epoll_event event = epoll_event();                        // Clear
570
    event.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; // Enable edge triggering
571
    event.data.ptr = &desc;
572
    int ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, desc.m_fd, &event);
573
    if (REALM_UNLIKELY(ret == -1)) {
574
        std::error_code ec = make_basic_system_error_code(errno);
575
        throw std::system_error(ec);
576
    }
577
}
578

579

580
inline void Service::IoReactor::deregister_desc(Descriptor& desc) noexcept
581
{
582
    epoll_event event = epoll_event(); // Clear
583
    int ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, desc.m_fd, &event);
584
    REALM_ASSERT(ret != -1);
585
}
586

587

588
inline std::unique_ptr<epoll_event[]> Service::IoReactor::make_epoll_event_buffer()
589
{
590
    return std::make_unique<epoll_event[]>(s_epoll_event_buffer_size); // Throws
591
}
592

593

594
inline CloseGuard Service::IoReactor::make_epoll_fd()
595
{
596
    int flags = 0;
597
    flags |= EPOLL_CLOEXEC;
598
    int ret = epoll_create1(flags);
599
    if (REALM_UNLIKELY(ret == -1)) {
600
        std::error_code ec = make_basic_system_error_code(errno);
601
        throw std::system_error(ec);
602
    }
603
    int epoll_fd = ret;
604
    return CloseGuard{epoll_fd};
605
}
606

607

608
bool Service::IoReactor::wait_and_activate(clock::time_point timeout, clock::time_point now)
609
{
610
    int max_wait_millis = 0;
611
    bool allow_blocking_wait = m_active_ops.empty();
612
    if (allow_blocking_wait) {
613
        if (timeout.time_since_epoch().count() <= 0) {
614
            max_wait_millis = -1; // Allow indefinite blocking
615
        }
616
        else if (now < timeout) {
617
            auto diff = timeout - now;
618
            int max_int_millis = std::numeric_limits<int>::max();
619
            // 17592186044415 is the largest value (45-bit signed integer)
620
            // guaranteed to be supported by std::chrono::milliseconds. In the
621
            // worst case, `int` is a 16-bit integer, meaning that we can only
622
            // wait about 30 seconds at a time. In the best case
623
            // (17592186044415) we can wait more than 500 years at a time. In
624
            // the typical case (`int` has 32 bits), we can wait 24 days at a
625
            // time.
626
            long long max_chrono_millis = 17592186044415;
627
            if (max_chrono_millis < max_int_millis)
628
                max_int_millis = int(max_chrono_millis);
629
#if EPOLL_LARGE_TIMEOUT_BUG
630
            long max_safe_millis = 2147482; // Circa 35 minutes
631
            if (max_safe_millis < max_int_millis)
632
                max_int_millis = int(max_safe_millis);
633
#endif
634
            if (diff > std::chrono::milliseconds(max_int_millis)) {
635
                max_wait_millis = max_int_millis;
636
            }
637
            else {
638
                // Overflow is impossible here, due to the preceding check
639
                auto diff_millis = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
640
                // The conversion to milliseconds will round down if the tick
641
                // period of `diff` is less than a millisecond, which it usually
642
                // is. This is a problem, because it can lead to premature
643
                // wakeups, which in turn could cause extraneous iterations in
644
                // the event loop. This is especially problematic when a small
645
                // `diff` is rounded down to zero milliseconds, because that can
646
                // easily produce a "busy wait" condition for up to a
647
                // millisecond every time this happens. Obviously, the solution
648
                // is to round up, instead of down.
649
                if (diff_millis < diff) {
650
                    // Note that the following increment cannot overflow,
651
                    // because diff_millis < diff <= max_int_millis <=
652
                    // std::numeric_limits<int>::max().
653
                    ++diff_millis;
654
                }
655
                max_wait_millis = int(diff_millis.count());
656
            }
657
        }
658
    }
659
    for (int i = 0; i < 2; ++i) {
660
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
661
        clock::time_point sleep_start_time = clock::now();
662
#endif
663
        int ret = epoll_wait(m_epoll_fd, m_epoll_event_buffer.get(), s_epoll_event_buffer_size, max_wait_millis);
664
        if (REALM_UNLIKELY(ret == -1)) {
665
            int err = errno;
666
            if (err == EINTR)
667
                return false; // Infrequent premature return is ok
668
            std::error_code ec = make_basic_system_error_code(err);
669
            throw std::system_error(ec);
670
        }
671
        REALM_ASSERT(ret >= 0);
672
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
673
        m_sleep_time += clock::now() - sleep_start_time;
674
#endif
675
        int n = ret;
676
        bool got_wakeup_pipe_signal = false;
677
        for (int j = 0; j < n; ++j) {
678
            const epoll_event& event = m_epoll_event_buffer[j];
679
            bool is_wakeup_pipe_signal = !event.data.ptr;
680
            if (REALM_UNLIKELY(is_wakeup_pipe_signal)) {
681
                m_wakeup_pipe.acknowledge_signal();
682
                got_wakeup_pipe_signal = true;
683
                continue;
684
            }
685
            Descriptor& desc = *static_cast<Descriptor*>(event.data.ptr);
686
            if ((event.events & (EPOLLIN | EPOLLHUP | EPOLLERR)) != 0) {
687
                if (!desc.m_read_ready) {
688
                    desc.m_read_ready = true;
689
                    m_active_ops.push_back(desc.m_suspended_read_ops);
690
                }
691
            }
692
            if ((event.events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) != 0) {
693
                if (!desc.m_write_ready) {
694
                    desc.m_write_ready = true;
695
                    m_active_ops.push_back(desc.m_suspended_write_ops);
696
                }
697
            }
698
            if ((event.events & EPOLLRDHUP) != 0)
699
                desc.m_imminent_end_of_input = true;
700
        }
701
        if (got_wakeup_pipe_signal)
702
            return true;
703
        if (n < s_epoll_event_buffer_size)
704
            break;
705
        max_wait_millis = 0;
706
    }
707
    return false;
708
}
709

710

711
#elif REALM_HAVE_KQUEUE // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
712

713

714
inline Service::IoReactor::IoReactor()
715
    : m_kevent_buffer{make_kevent_buffer()} // Throws
716
    , m_kqueue_fd{make_kqueue_fd()}         // Throws
717
    , m_wakeup_pipe{}                       // Throws
718
{
5,880✔
719
    struct kevent event;
5,880✔
720
    EV_SET(&event, m_wakeup_pipe.wait_fd(), EVFILT_READ, EV_ADD, 0, 0, nullptr);
5,880✔
721
    int ret = ::kevent(m_kqueue_fd, &event, 1, nullptr, 0, nullptr);
5,880✔
722
    if (REALM_UNLIKELY(ret == -1)) {
5,880✔
723
        std::error_code ec = make_basic_system_error_code(errno);
724
        throw std::system_error(ec);
725
    }
726
}
5,880✔
727

728

729
inline Service::IoReactor::~IoReactor() noexcept {}
5,880✔
730

731

732
inline void Service::IoReactor::register_desc(Descriptor& desc)
733
{
4,044✔
734
    struct kevent events[2];
4,044✔
735
    // EV_CLEAR enables edge-triggered behavior
736
    EV_SET(&events[0], desc.m_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &desc);
4,044✔
737
    EV_SET(&events[1], desc.m_fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, &desc);
4,044✔
738
    int ret = ::kevent(m_kqueue_fd, events, 2, nullptr, 0, nullptr);
4,044✔
739
    if (REALM_UNLIKELY(ret == -1)) {
4,044✔
740
        std::error_code ec = make_basic_system_error_code(errno);
741
        throw std::system_error(ec);
742
    }
743
}
4,044✔
744

745

746
inline void Service::IoReactor::deregister_desc(Descriptor& desc) noexcept
747
{
4,044✔
748
    struct kevent events[2];
4,044✔
749
    EV_SET(&events[0], desc.m_fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
4,044✔
750
    EV_SET(&events[1], desc.m_fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
4,044✔
751
    int ret = ::kevent(m_kqueue_fd, events, 2, nullptr, 0, nullptr);
4,044✔
752
    REALM_ASSERT(ret != -1);
4,044✔
753
}
4,044✔
754

755

756
inline std::unique_ptr<struct kevent[]> Service::IoReactor::make_kevent_buffer()
757
{
5,880✔
758
    return std::make_unique<struct kevent[]>(s_kevent_buffer_size); // Throws
5,880✔
759
}
5,880✔
760

761

762
inline CloseGuard Service::IoReactor::make_kqueue_fd()
763
{
5,880✔
764
    int ret = ::kqueue();
5,880✔
765
    if (REALM_UNLIKELY(ret == -1)) {
5,880✔
766
        std::error_code ec = make_basic_system_error_code(errno);
767
        throw std::system_error(ec);
768
    }
769
    int epoll_fd = ret;
5,880✔
770
    return CloseGuard{epoll_fd};
5,880✔
771
}
5,880✔
772

773

774
bool Service::IoReactor::wait_and_activate(clock::time_point timeout, clock::time_point now)
775
{
1,139,240✔
776
    timespec max_wait_time{}; // Clear to zero
1,139,240✔
777
    bool allow_blocking_wait = m_active_ops.empty();
1,139,240✔
778
    if (allow_blocking_wait) {
1,139,240✔
779
        // Note that ::kevent() will silently clamp `max_wait_time` to 24 hours
780
        // (86400 seconds), but that is ok, because the caller is prepared for
781
        // premature return as long as it happens infrequently enough to not
782
        // pose a performance problem.
783
        constexpr std::time_t max_wait_seconds = 86400;
168,206✔
784
        if (timeout.time_since_epoch().count() <= 0) {
168,206✔
785
            max_wait_time.tv_sec = max_wait_seconds;
20,958✔
786
        }
20,958✔
787
        else if (now < timeout) {
147,258✔
788
            auto diff = timeout - now;
147,258✔
789
            auto secs = std::chrono::duration_cast<std::chrono::seconds>(diff);
147,258✔
790
            auto nsecs = std::chrono::duration_cast<std::chrono::nanoseconds>(diff - secs);
147,258✔
791
            auto secs_2 = std::min(secs.count(), std::chrono::seconds::rep(max_wait_seconds));
147,258✔
792
            max_wait_time.tv_sec = std::time_t(secs_2);
147,258✔
793
            max_wait_time.tv_nsec = long(nsecs.count());
147,258✔
794
        }
147,258✔
795
    }
168,206✔
796
    for (int i = 0; i < 4; ++i) {
2,147,483,647✔
797
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
798
        clock::time_point sleep_start_time = clock::now();
799
#endif
800
        int ret = ::kevent(m_kqueue_fd, nullptr, 0, m_kevent_buffer.get(), s_kevent_buffer_size, &max_wait_time);
1,139,478✔
801
        if (REALM_UNLIKELY(ret == -1)) {
1,139,478✔
802
            int err = errno;
803
            if (err == EINTR)
×
804
                return false; // Infrequent premature return is ok
805
            std::error_code ec = make_basic_system_error_code(err);
806
            throw std::system_error(ec);
807
        }
808
        REALM_ASSERT(ret >= 0);
1,139,478✔
809
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
810
        m_sleep_time += clock::now() - sleep_start_time;
811
#endif
812
        int n = ret;
1,139,478✔
813
        bool got_wakeup_pipe_signal = false;
1,139,478✔
814
        for (int j = 0; j < n; ++j) {
1,733,996✔
815
            const struct kevent& event = m_kevent_buffer[j];
594,518✔
816
            bool is_wakeup_pipe_signal = !event.udata;
594,518✔
817
            if (REALM_UNLIKELY(is_wakeup_pipe_signal)) {
594,518✔
818
                REALM_ASSERT(m_wakeup_pipe.wait_fd() == int(event.ident));
156,862✔
819
                m_wakeup_pipe.acknowledge_signal();
156,862✔
820
                got_wakeup_pipe_signal = true;
156,862✔
821
                continue;
156,862✔
822
            }
156,862✔
823
            Descriptor& desc = *static_cast<Descriptor*>(event.udata);
437,656✔
824
            REALM_ASSERT(desc.m_fd == int(event.ident));
437,656✔
825
            if (event.filter == EVFILT_READ) {
437,656✔
826
                if (!desc.m_read_ready) {
222,580✔
827
                    desc.m_read_ready = true;
60,720✔
828
                    m_active_ops.push_back(desc.m_suspended_read_ops);
60,720✔
829
                }
60,720✔
830
                if ((event.flags & EV_EOF) != 0)
222,580✔
831
                    desc.m_imminent_end_of_input = true;
764✔
832
            }
222,580✔
833
            if (event.filter == EVFILT_WRITE) {
437,656✔
834
                if (!desc.m_write_ready) {
215,056✔
835
                    desc.m_write_ready = true;
8,038✔
836
                    m_active_ops.push_back(desc.m_suspended_write_ops);
8,038✔
837
                }
8,038✔
838
            }
215,056✔
839
        }
437,656✔
840
        if (got_wakeup_pipe_signal)
1,139,478✔
841
            return true;
156,862✔
842
        if (n < s_kevent_buffer_size)
982,616✔
843
            break;
982,712✔
844
        // Clear to zero to disable blocking for any additional opportunistic
845
        // event extractions.
846
        max_wait_time = timespec{};
2,147,483,647✔
847
    }
2,147,483,647✔
848
    return false;
982,378✔
849
}
1,139,240✔
850

851
#endif // !REALM_NETWORK_USE_EPOLL && REALM_HAVE_KQUEUE
852

853

854
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
855

856
void Service::IoReactor::add_oper(Descriptor& desc, LendersIoOperPtr op, Want want)
857
{
1,119,204✔
858
    if (REALM_UNLIKELY(!desc.m_is_registered)) {
1,119,204✔
859
        register_desc(desc); // Throws
4,044✔
860
        desc.m_is_registered = true;
4,044✔
861
    }
4,044✔
862

863
    switch (want) {
1,119,204✔
864
        case Want::read:
618,628✔
865
            if (REALM_UNLIKELY(desc.m_read_ready))
618,628✔
866
                goto active;
610,790✔
867
            desc.m_suspended_read_ops.push_back(std::move(op));
7,838✔
868
            goto proceed;
7,838✔
869
        case Want::write:
501,150✔
870
            if (REALM_UNLIKELY(desc.m_write_ready))
501,150✔
871
                goto active;
496,054✔
872
            desc.m_suspended_write_ops.push_back(std::move(op));
5,096✔
873
            goto proceed;
5,096✔
874
        case Want::nothing:
2✔
875
            break;
876
    }
1,119,204✔
877
    REALM_ASSERT(false);
878

879
active:
1,106,272✔
880
    m_active_ops.push_back(std::move(op));
1,106,272✔
881

882
proceed:
1,118,656✔
883
    ++m_num_operations;
1,118,656✔
884
}
1,118,656✔
885

886

887
void Service::IoReactor::remove_canceled_ops(Descriptor& desc, OperQueue<AsyncOper>& completed_ops) noexcept
888
{
240,430✔
889
    // Note: Canceled operations that are currently active (in m_active_ops)
890
    // will be removed later by advance_active_ops().
891

892
    while (LendersIoOperPtr op = desc.m_suspended_read_ops.pop_front()) {
246,314✔
893
        completed_ops.push_back(std::move(op));
5,884✔
894
        --m_num_operations;
5,884✔
895
    }
5,884✔
896
    while (LendersIoOperPtr op = desc.m_suspended_write_ops.pop_front()) {
243,354✔
897
        completed_ops.push_back(std::move(op));
2,924✔
898
        --m_num_operations;
2,924✔
899
    }
2,924✔
900
}
240,430✔
901

902

903
bool Service::IoReactor::wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
904
                                          OperQueue<AsyncOper>& completed_ops)
905
{
904,186✔
906
    clock::time_point now_2 = now;
904,186✔
907
    for (;;) {
1,139,248✔
908
        bool wakeup_pipe_signal = wait_and_activate(timeout, now_2); // Throws
1,139,248✔
909
        if (REALM_UNLIKELY(wakeup_pipe_signal)) {
1,139,248✔
910
            interrupted = true;
156,860✔
911
            return false;
156,860✔
912
        }
156,860✔
913
        advance_active_ops(completed_ops);
982,388✔
914
        if (!completed_ops.empty())
982,388✔
915
            return true;
741,316✔
916
        if (timeout.time_since_epoch().count() > 0) {
241,072✔
917
            now_2 = clock::now();
125,458✔
918
            bool timed_out = (now_2 >= timeout);
125,458✔
919
            if (timed_out)
125,458✔
920
                return false;
6,294✔
921
        }
125,458✔
922
    }
241,072✔
923
}
904,186✔
924

925

926
void Service::IoReactor::advance_active_ops(OperQueue<AsyncOper>& completed_ops) noexcept
927
{
982,662✔
928
    OperQueue<IoOper> new_active_ops;
982,662✔
929
    while (LendersIoOperPtr op = m_active_ops.pop_front()) {
2,325,856✔
930
        if (op->is_canceled()) {
1,342,444✔
931
            completed_ops.push_back(std::move(op));
420,996✔
932
            --m_num_operations;
420,996✔
933
            continue;
420,996✔
934
        }
420,996✔
935
        Want want = op->advance();
921,448✔
936
        switch (want) {
921,448✔
937
            case Want::nothing:
689,834✔
938
                REALM_ASSERT(op->is_complete());
689,834✔
939
                completed_ops.push_back(std::move(op));
689,834✔
940
                --m_num_operations;
689,834✔
941
                continue;
689,834✔
942
            case Want::read: {
222,428✔
943
                Descriptor& desc = op->descriptor();
222,428✔
944
                if (REALM_UNLIKELY(desc.m_read_ready))
222,428✔
945
                    goto still_active;
163,728✔
946
                desc.m_suspended_read_ops.push_back(std::move(op));
58,700✔
947
                continue;
58,700✔
948
            }
222,428✔
949
            case Want::write: {
9,936✔
950
                Descriptor& desc = op->descriptor();
9,936✔
951
                if (REALM_UNLIKELY(desc.m_write_ready))
9,936✔
952
                    goto still_active;
4,412✔
953
                desc.m_suspended_write_ops.push_back(std::move(op));
5,524✔
954
                continue;
5,524✔
955
            }
9,936✔
956
        }
921,448✔
957
        REALM_ASSERT(false);
958

959
    still_active:
168,140✔
960
        new_active_ops.push_back(std::move(op));
168,140✔
961
    }
168,140✔
962
    m_active_ops.push_back(new_active_ops);
983,412✔
963
}
983,412✔
964

965

966
#else // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
967

968

969
inline Service::IoReactor::IoReactor()
970
    : m_wakeup_pipe{} // Throws
5,640✔
971
{
5,640✔
972
    pollfd slot = pollfd(); // Cleared slot
5,640✔
973
    slot.fd = m_wakeup_pipe.wait_fd();
5,640✔
974
    slot.events = POLLRDNORM;
5,640✔
975
    m_pollfd_slots.emplace_back(slot); // Throws
5,640✔
976
}
5,640✔
977

978

979
inline Service::IoReactor::~IoReactor() noexcept
980
{
5,640✔
981
#if REALM_ASSERTIONS_ENABLED
5,640✔
982
    std::size_t n = 0;
5,640✔
983
    for (std::size_t i = 0; i < m_operations.size(); ++i) {
139,376✔
984
        OperSlot& oper_slot = m_operations[i];
133,736✔
985
        while (oper_slot.read_ops.pop_front())
133,736✔
986
            ++n;
987
        while (oper_slot.write_ops.pop_front())
133,736✔
988
            ++n;
989
    }
133,736✔
990
    REALM_ASSERT(n == m_num_operations);
5,640✔
991
#endif
5,640✔
992
}
5,640✔
993

994

995
void Service::IoReactor::add_oper(Descriptor& desc, LendersIoOperPtr op, Want want)
996
{
920,052✔
997
    native_handle_type fd = desc.m_fd;
920,052✔
998

999
    // Make sure there are enough slots in m_operations
1000
    {
920,052✔
1001
        std::size_t n = std::size_t(fd) + 1; // FIXME: Check for arithmetic overflow
920,052✔
1002
        if (m_operations.size() < n)
920,052✔
1003
            m_operations.resize(n); // Throws
2,448✔
1004
    }
920,052✔
1005

1006
    // Allocate a pollfd_slot unless we already have one
1007
    OperSlot& oper_slot = m_operations[fd];
920,052✔
1008
    if (oper_slot.pollfd_slot_ndx == 0) {
920,052✔
1009
        pollfd pollfd_slot = pollfd(); // Cleared slot
635,764✔
1010
        pollfd_slot.fd = fd;
635,764✔
1011
        std::size_t pollfd_slot_ndx = m_pollfd_slots.size();
635,764✔
1012
        REALM_ASSERT(pollfd_slot_ndx > 0);
635,764✔
1013
        m_pollfd_slots.emplace_back(pollfd_slot); // Throws
635,764✔
1014
        oper_slot.pollfd_slot_ndx = pollfd_slot_ndx;
635,764✔
1015
    }
635,764✔
1016

1017
    pollfd& pollfd_slot = m_pollfd_slots[oper_slot.pollfd_slot_ndx];
920,052✔
1018
    REALM_ASSERT(pollfd_slot.fd == fd);
920,052✔
1019
    REALM_ASSERT(((pollfd_slot.events & POLLRDNORM) != 0) == !oper_slot.read_ops.empty());
920,052✔
1020
    REALM_ASSERT(((pollfd_slot.events & POLLWRNORM) != 0) == !oper_slot.write_ops.empty());
920,052✔
1021
    REALM_ASSERT((pollfd_slot.events & ~(POLLRDNORM | POLLWRNORM)) == 0);
920,052✔
1022
    switch (want) {
920,052✔
1023
        case Want::nothing:
2✔
1024
            break;
1025
        case Want::read:
517,054✔
1026
            pollfd_slot.events |= POLLRDNORM;
517,054✔
1027
            oper_slot.read_ops.push_back(std::move(op));
517,054✔
1028
            goto finish;
517,054✔
1029
        case Want::write:
404,214✔
1030
            pollfd_slot.events |= POLLWRNORM;
404,214✔
1031
            oper_slot.write_ops.push_back(std::move(op));
404,214✔
1032
            goto finish;
404,214✔
1033
    }
920,052✔
1034
    REALM_ASSERT(false);
1035
    return;
1036

1037
finish:
917,908✔
1038
    ++m_num_operations;
917,908✔
1039
}
917,908✔
1040

1041

1042
void Service::IoReactor::remove_canceled_ops(Descriptor& desc, OperQueue<AsyncOper>& completed_ops) noexcept
1043
{
152,176✔
1044
    native_handle_type fd = desc.m_fd;
152,176✔
1045
    REALM_ASSERT(fd >= 0);
152,176✔
1046
    REALM_ASSERT(std::size_t(fd) < m_operations.size());
152,176✔
1047
    OperSlot& oper_slot = m_operations[fd];
152,176✔
1048
    REALM_ASSERT(oper_slot.pollfd_slot_ndx > 0);
152,176✔
1049
    REALM_ASSERT(!oper_slot.read_ops.empty() || !oper_slot.write_ops.empty());
152,176✔
1050
    pollfd& pollfd_slot = m_pollfd_slots[oper_slot.pollfd_slot_ndx];
152,176✔
1051
    REALM_ASSERT(pollfd_slot.fd == fd);
152,176✔
1052
    while (LendersIoOperPtr op = oper_slot.read_ops.pop_front()) {
282,376✔
1053
        completed_ops.push_back(std::move(op));
130,200✔
1054
        --m_num_operations;
130,200✔
1055
    }
130,200✔
1056
    while (LendersIoOperPtr op = oper_slot.write_ops.pop_front()) {
276,720✔
1057
        completed_ops.push_back(std::move(op));
124,544✔
1058
        --m_num_operations;
124,544✔
1059
    }
124,544✔
1060
    discard_pollfd_slot_by_move_last_over(oper_slot);
152,176✔
1061
}
152,176✔
1062

1063

1064
bool Service::IoReactor::wait_and_advance(clock::time_point timeout, clock::time_point now, bool& interrupted,
1065
                                          OperQueue<AsyncOper>& completed_ops)
1066
{
787,726✔
1067
#ifdef _WIN32
1068
    using nfds_type = std::size_t;
1069
#else
1070
    using nfds_type = nfds_t;
787,726✔
1071
#endif
787,726✔
1072
    clock::time_point now_2 = now;
787,726✔
1073
    std::size_t num_ready_descriptors = 0;
787,726✔
1074
    {
787,726✔
1075
        // std::vector guarantees contiguous storage
1076
        pollfd* fds = &m_pollfd_slots.front();
787,726✔
1077
        nfds_type nfds = nfds_type(m_pollfd_slots.size());
787,726✔
1078
        for (;;) {
787,726✔
1079
            int max_wait_millis = -1; // Wait indefinitely
787,676✔
1080
            if (timeout.time_since_epoch().count() > 0) {
787,676✔
1081
                if (now_2 >= timeout)
437,570✔
1082
                    return false; // No operations completed
1083
                auto diff = timeout - now_2;
437,570✔
1084
                int max_int_millis = std::numeric_limits<int>::max();
437,570✔
1085
                // 17592186044415 is the largest value (45-bit signed integer)
1086
                // guaranteed to be supported by std::chrono::milliseconds. In
1087
                // the worst case, `int` is a 16-bit integer, meaning that we
1088
                // can only wait about 30 seconds at a time. In the best case
1089
                // (17592186044415) we can wait more than 500 years at a
1090
                // time. In the typical case (`int` has 32 bits), we can wait 24
1091
                // days at a time.
1092
                long long max_chrono_millis = 17592186044415;
437,570✔
1093
                if (max_int_millis > max_chrono_millis)
437,570✔
1094
                    max_int_millis = int(max_chrono_millis);
1095
                if (diff > std::chrono::milliseconds(max_int_millis)) {
437,570✔
1096
                    max_wait_millis = max_int_millis;
3,246✔
1097
                }
3,246✔
1098
                else {
434,324✔
1099
                    // Overflow is impossible here, due to the preceding check
1100
                    auto diff_millis = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
434,324✔
1101
                    // The conversion to milliseconds will round down if the
1102
                    // tick period of `diff` is less than a millisecond, which
1103
                    // it usually is. This is a problem, because it can lead to
1104
                    // premature wakeups, which in turn could cause extranous
1105
                    // iterations in the event loop. This is especially
1106
                    // problematic when a small `diff` is rounded down to zero
1107
                    // milliseconds, because that can easily produce a "busy
1108
                    // wait" condition for up to a millisecond every time this
1109
                    // happens. Obviously, the solution is to round up, instead
1110
                    // of down.
1111
                    if (diff_millis < diff) {
434,324✔
1112
                        // Note that the following increment cannot overflow,
1113
                        // because diff_millis < diff <= max_int_millis <=
1114
                        // std::numeric_limits<int>::max().
1115
                        ++diff_millis;
434,108✔
1116
                    }
434,108✔
1117
                    max_wait_millis = int(diff_millis.count());
434,324✔
1118
                }
434,324✔
1119
            }
437,570✔
1120

1121
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1122
            clock::time_point sleep_start_time = clock::now();
1123
#endif
1124

1125
#ifdef _WIN32
1126
            max_wait_millis = 1000;
1127

1128
            // Windows does not have a single API call to wait for pipes and
1129
            // sockets with a timeout. So we repeatedly poll them individually
1130
            // in a loop until max_wait_millis has elapsed or an event happend.
1131
            //
1132
            // FIXME: Maybe switch to Windows IOCP instead.
1133

1134
            // Following variable is the poll time for the sockets in
1135
            // miliseconds. Adjust it to find a balance between CPU usage and
1136
            // response time:
1137
            constexpr INT socket_poll_timeout = 10;
1138

1139
            for (size_t t = 0; t < m_pollfd_slots.size(); t++)
1140
                m_pollfd_slots[t].revents = 0;
1141

1142
            using namespace std::chrono;
1143
            auto started = steady_clock::now();
1144
            int ret = 0;
1145

1146
            for (;;) {
1147
                if (m_pollfd_slots.size() > 1) {
1148
                    // Poll all network sockets
1149
                    ret = WSAPoll(LPWSAPOLLFD(&m_pollfd_slots[1]), ULONG(m_pollfd_slots.size() - 1),
1150
                                  socket_poll_timeout);
1151
                    REALM_ASSERT(ret != SOCKET_ERROR);
1152
                }
1153

1154
                if (m_wakeup_pipe.is_signaled()) {
1155
                    m_pollfd_slots[0].revents = POLLIN;
1156
                    ret++;
1157
                }
1158

1159
                if (ret != 0 ||
1160
                    (duration_cast<milliseconds>(steady_clock::now() - started).count() >= max_wait_millis)) {
1161
                    break;
1162
                }
1163

1164
                // If we don't have any sockets to poll for (m_pollfd_slots is less than 2) and no one signals
1165
                // the wakeup pipe, we'd be stuck busy waiting for either condition to become true.
1166
                std::this_thread::sleep_for(std::chrono::milliseconds(socket_poll_timeout));
1167
            }
1168

1169
#else // !defined _WIN32
1170
            int ret = ::poll(fds, nfds, max_wait_millis);
787,676✔
1171
#endif
787,676✔
1172
            bool interrupted_2 = false;
787,676✔
1173
            if (REALM_UNLIKELY(ret == -1)) {
787,676✔
1174
#ifndef _WIN32
1175
                int err = errno;
1176
                if (REALM_UNLIKELY(err != EINTR)) {
×
1177
                    std::error_code ec = make_basic_system_error_code(err);
1178
                    throw std::system_error(ec);
1179
                }
1180
#endif
1181
                interrupted_2 = true;
1182
            }
1183

1184
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1185
            m_sleep_time += clock::now() - sleep_start_time;
1186
#endif
1187

1188
            if (REALM_LIKELY(!interrupted_2)) {
788,030✔
1189
                REALM_ASSERT(ret >= 0);
788,030✔
1190
                num_ready_descriptors = ret;
788,030✔
1191
                break;
788,030✔
1192
            }
788,030✔
1193

1194
            // Retry on interruption by system signal
1195
            if (timeout.time_since_epoch().count() > 0)
2,147,483,647✔
1196
                now_2 = clock::now();
1197
        }
2,147,483,647✔
1198
    }
787,726✔
1199

1200
    if (num_ready_descriptors == 0)
787,726✔
1201
        return false; // No operations completed
5,158✔
1202

1203
    // Check wake-up descriptor
1204
    if (m_pollfd_slots[0].revents != 0) {
782,568✔
1205
        REALM_ASSERT((m_pollfd_slots[0].revents & POLLNVAL) == 0);
118,070✔
1206
        m_wakeup_pipe.acknowledge_signal();
118,070✔
1207
        interrupted = true;
118,070✔
1208
        return false;
118,070✔
1209
    }
118,070✔
1210

1211
    std::size_t orig_num_operations = m_num_operations;
664,498✔
1212
    std::size_t num_pollfd_slots = m_pollfd_slots.size();
664,498✔
1213
    std::size_t pollfd_slot_ndx = 1;
664,498✔
1214
    while (pollfd_slot_ndx < num_pollfd_slots && num_ready_descriptors > 0) {
1,456,608✔
1215
        pollfd& pollfd_slot = m_pollfd_slots[pollfd_slot_ndx];
792,110✔
1216
        REALM_ASSERT(pollfd_slot.fd >= 0);
792,110✔
1217
        if (REALM_LIKELY(pollfd_slot.revents == 0)) {
792,110✔
1218
            ++pollfd_slot_ndx;
71,226✔
1219
            continue;
71,226✔
1220
        }
71,226✔
1221
        --num_ready_descriptors;
720,884✔
1222

1223
        REALM_ASSERT((pollfd_slot.revents & POLLNVAL) == 0);
720,884✔
1224

1225
        // Treat errors like read and/or write-readiness
1226
        if ((pollfd_slot.revents & (POLLHUP | POLLERR)) != 0) {
720,884✔
1227
            REALM_ASSERT((pollfd_slot.events & (POLLRDNORM | POLLWRNORM)) != 0);
284✔
1228
            if ((pollfd_slot.events & POLLRDNORM) != 0)
284✔
1229
                pollfd_slot.revents |= POLLRDNORM;
284✔
1230
            if ((pollfd_slot.events & POLLWRNORM) != 0)
284✔
1231
                pollfd_slot.revents |= POLLWRNORM;
14✔
1232
        }
284✔
1233

1234
        OperSlot& oper_slot = m_operations[pollfd_slot.fd];
720,884✔
1235
        REALM_ASSERT(oper_slot.pollfd_slot_ndx == pollfd_slot_ndx);
720,884✔
1236

1237
        OperQueue<IoOper> new_read_ops, new_write_ops;
720,884✔
1238
        auto advance_ops = [&](OperQueue<IoOper>& ops) noexcept {
828,658✔
1239
            while (LendersIoOperPtr op = ops.pop_front()) {
1,657,536✔
1240
                Want want = op->advance();
828,506✔
1241
                switch (want) {
828,506✔
1242
                    case Want::nothing:
668,102✔
1243
                        REALM_ASSERT(op->is_complete());
668,102✔
1244
                        completed_ops.push_back(std::move(op));
668,102✔
1245
                        --m_num_operations;
668,102✔
1246
                        continue;
668,102✔
1247
                    case Want::read:
160,638✔
1248
                        new_read_ops.push_back(std::move(op));
160,638✔
1249
                        continue;
160,638✔
1250
                    case Want::write:
138✔
1251
                        new_write_ops.push_back(std::move(op));
138✔
1252
                        continue;
138✔
1253
                }
828,506✔
1254
                REALM_ASSERT(false);
1255
            }
1256
        };
828,658✔
1257

1258
        // Check read-readiness
1259
        if ((pollfd_slot.revents & POLLRDNORM) != 0) {
720,884✔
1260
            REALM_ASSERT(!oper_slot.read_ops.empty());
548,308✔
1261
            advance_ops(oper_slot.read_ops);
548,308✔
1262
            pollfd_slot.events &= ~POLLRDNORM;
548,308✔
1263
        }
548,308✔
1264

1265
        // Check write-readiness
1266
        if ((pollfd_slot.revents & POLLWRNORM) != 0) {
720,884✔
1267
            REALM_ASSERT(!oper_slot.write_ops.empty());
280,590✔
1268
            advance_ops(oper_slot.write_ops);
280,590✔
1269
            pollfd_slot.events &= ~POLLWRNORM;
280,590✔
1270
        }
280,590✔
1271

1272
        if (!new_read_ops.empty()) {
720,884✔
1273
            oper_slot.read_ops.push_back(new_read_ops);
160,628✔
1274
            pollfd_slot.events |= POLLRDNORM;
160,628✔
1275
        }
160,628✔
1276

1277
        if (!new_write_ops.empty()) {
720,884✔
1278
            oper_slot.write_ops.push_back(new_write_ops);
138✔
1279
            pollfd_slot.events |= POLLWRNORM;
138✔
1280
        }
138✔
1281

1282
        if (pollfd_slot.events == 0) {
720,884✔
1283
            discard_pollfd_slot_by_move_last_over(oper_slot);
484,238✔
1284
            --num_pollfd_slots;
484,238✔
1285
        }
484,238✔
1286
        else {
236,646✔
1287
            ++pollfd_slot_ndx;
236,646✔
1288
        }
236,646✔
1289
    }
720,884✔
1290

1291
    REALM_ASSERT(num_ready_descriptors == 0);
664,498✔
1292

1293
    bool any_operations_completed = (m_num_operations < orig_num_operations);
664,498✔
1294
    return any_operations_completed;
664,498✔
1295
}
782,568✔
1296

1297

1298
void Service::IoReactor::discard_pollfd_slot_by_move_last_over(OperSlot& oper_slot) noexcept
1299
{
636,102✔
1300
    std::size_t pollfd_slot_ndx = oper_slot.pollfd_slot_ndx;
636,102✔
1301
    oper_slot.pollfd_slot_ndx = 0; // Mark unused
636,102✔
1302
    if (pollfd_slot_ndx < m_pollfd_slots.size() - 1) {
636,102✔
1303
        pollfd& last_pollfd_slot = m_pollfd_slots.back();
59,024✔
1304
        m_operations[last_pollfd_slot.fd].pollfd_slot_ndx = pollfd_slot_ndx;
59,024✔
1305
        m_pollfd_slots[pollfd_slot_ndx] = last_pollfd_slot;
59,024✔
1306
    }
59,024✔
1307
    m_pollfd_slots.pop_back();
636,102✔
1308
}
636,102✔
1309

1310
#endif // !(REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE)
1311

1312

1313
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1314

1315
auto Service::IoReactor::get_and_reset_sleep_time() noexcept -> clock::duration
1316
{
1317
    clock::duration sleep_time = m_sleep_time;
1318
    m_sleep_time = clock::duration::zero();
1319
    return sleep_time;
1320
}
1321

1322
#endif // REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1323

1324

1325
class Service::Impl {
1326
public:
1327
    Service& service;
1328
    IoReactor io_reactor;
1329

1330
    Impl(Service& s)
1331
        : service{s}
5,640✔
1332
        , io_reactor{} // Throws
5,640✔
1333
    {
11,520✔
1334
    }
11,520✔
1335

1336
    ~Impl()
1337
    {
11,520✔
1338
        bool resolver_thread_started = m_resolver_thread.joinable();
11,520✔
1339
        if (resolver_thread_started) {
11,520✔
1340
            {
2,002✔
1341
                std::lock_guard lock{m_mutex};
2,002✔
1342
                m_stop_resolver_thread = true;
2,002✔
1343
                m_resolver_cond.notify_all();
2,002✔
1344
            }
2,002✔
1345
            m_resolver_thread.join();
2,002✔
1346
        }
2,002✔
1347

1348
        // Avoid calls to recycle_post_oper() after destruction has begun.
1349
        m_completed_operations.clear();
11,520✔
1350
    }
11,520✔
1351

1352
    void report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler> handler)
1353
    {
×
1354
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1355
        m_event_loop_metrics_timer.emplace(service);
1356
        m_event_loop_metrics_timer->async_wait(
1357
            std::chrono::seconds{30}, [this, handler = std::move(handler)](Status status) {
1358
                REALM_ASSERT(status.is_ok());
1359
                clock::time_point now = clock::now();
1360
                clock::duration elapsed_time = now - m_event_loop_metrics_start_time;
1361
                clock::duration sleep_time = io_reactor.get_and_reset_sleep_time();
1362
                clock::duration nonsleep_time = elapsed_time - sleep_time;
1363
                double saturation = double(nonsleep_time.count()) / double(elapsed_time.count());
1364
                clock::duration internal_exec_time = nonsleep_time - m_handler_exec_time;
1365
                internal_exec_time += now - m_handler_exec_start_time;
1366
                double inefficiency = double(internal_exec_time.count()) / double(elapsed_time.count());
1367
                m_event_loop_metrics_start_time = now;
1368
                m_handler_exec_start_time = now;
1369
                m_handler_exec_time = clock::duration::zero();
1370
                handler(saturation, inefficiency);             // Throws
1371
                report_event_loop_metrics(std::move(handler)); // Throws
1372
            });                                                // Throws
1373
#else
1374
        static_cast<void>(handler);
×
1375
#endif
×
1376
    }
×
1377

1378
    void run()
1379
    {
1,720✔
1380
        run_impl(true);
1,720✔
1381
    }
1,720✔
1382

1383
    void run_until_stopped()
1384
    {
9,972✔
1385
        run_impl(false);
9,972✔
1386
    }
9,972✔
1387

1388
    void stop() noexcept
1389
    {
11,188✔
1390
        {
11,188✔
1391
            std::lock_guard lock{m_mutex};
11,188✔
1392
            if (m_stopped)
11,188✔
1393
                return;
×
1394
            m_stopped = true;
11,188✔
1395
        }
11,188✔
1396
        io_reactor.interrupt();
×
1397
    }
11,188✔
1398

1399
    void reset() noexcept
1400
    {
12✔
1401
        std::lock_guard lock{m_mutex};
12✔
1402
        m_stopped = false;
12✔
1403
    }
12✔
1404

1405
    static Endpoint::List resolve(const Resolver::Query&, std::error_code&);
1406

1407
    void add_resolve_oper(LendersResolveOperPtr op)
1408
    {
3,708✔
1409
        {
3,708✔
1410
            std::lock_guard lock{m_mutex};
3,708✔
1411
            m_resolve_operations.push_back(std::move(op)); // Throws
3,708✔
1412
            m_resolver_cond.notify_all();
3,708✔
1413
        }
3,708✔
1414
        bool resolver_thread_started = m_resolver_thread.joinable();
3,708✔
1415
        if (resolver_thread_started)
3,708✔
1416
            return;
1,706✔
1417
        auto func = [this]() noexcept {
2,002✔
1418
            resolver_thread();
2,002✔
1419
        };
2,002✔
1420
        m_resolver_thread = std::thread{std::move(func)};
2,002✔
1421
    }
2,002✔
1422

1423
    void add_wait_oper(LendersWaitOperPtr op)
1424
    {
447,144✔
1425
        m_wait_operations.push(std::move(op)); // Throws
447,144✔
1426
    }
447,144✔
1427

1428
    void post(PostOperConstr constr, std::size_t size, void* cookie)
1429
    {
1,196,242✔
1430
        {
1,196,242✔
1431
            std::lock_guard lock{m_mutex};
1,196,242✔
1432
            std::unique_ptr<char[]> mem;
1,196,242✔
1433
            if (m_post_oper && m_post_oper->m_size >= size) {
1,196,242✔
1434
                // Reuse old memory
1435
                AsyncOper* op = m_post_oper.release();
418,148✔
1436
                REALM_ASSERT(dynamic_cast<UnusedOper*>(op));
418,148✔
1437
                static_cast<UnusedOper*>(op)->UnusedOper::~UnusedOper(); // Static dispatch
418,148✔
1438
                mem.reset(static_cast<char*>(static_cast<void*>(op)));
418,148✔
1439
            }
418,148✔
1440
            else {
778,094✔
1441
                // Allocate new memory
1442
                mem.reset(new char[size]); // Throws
778,094✔
1443
            }
778,094✔
1444

1445
            std::unique_ptr<PostOperBase, LendersOperDeleter> op;
1,196,242✔
1446
            op.reset((*constr)(mem.get(), size, *this, cookie)); // Throws
1,196,242✔
1447
            mem.release();
1,196,242✔
1448
            m_completed_operations_2.push_back(std::move(op));
1,196,242✔
1449
        }
1,196,242✔
1450
        io_reactor.interrupt();
1,196,242✔
1451
    }
1,196,242✔
1452

1453
    void recycle_post_oper(PostOperBase* op) noexcept
1454
    {
1,197,494✔
1455
        std::size_t size = op->m_size;
1,197,494✔
1456
        op->~PostOperBase();                           // Dynamic dispatch
1,197,494✔
1457
        OwnersOperPtr op_2(new (op) UnusedOper(size)); // Does not throw
1,197,494✔
1458

1459
        // Keep the larger memory chunk (`op_2` or m_post_oper)
1460
        {
1,197,494✔
1461
            std::lock_guard lock{m_mutex};
1,197,494✔
1462
            if (!m_post_oper || m_post_oper->m_size < size)
1,197,494✔
1463
                swap(op_2, m_post_oper);
434,220✔
1464
        }
1,197,494✔
1465
    }
1,197,494✔
1466

1467
    void trigger_exec(TriggerExecOperBase& op) noexcept
1468
    {
×
1469
        {
×
1470
            std::lock_guard lock{m_mutex};
×
1471
            if (op.m_in_use)
×
1472
                return;
×
1473
            op.m_in_use = true;
×
1474
            bind_ptr<TriggerExecOperBase> op_2{&op}; // Increment use count
×
1475
            LendersOperPtr op_3{op_2.release()};
×
1476
            m_completed_operations_2.push_back(std::move(op_3));
×
1477
        }
×
1478
        io_reactor.interrupt();
×
1479
    }
×
1480

1481
    void reset_trigger_exec(TriggerExecOperBase& op) noexcept
1482
    {
×
1483
        std::lock_guard lock{m_mutex};
×
1484
        op.m_in_use = false;
×
1485
    }
×
1486

1487
    void add_completed_oper(LendersOperPtr op) noexcept
1488
    {
1,221,456✔
1489
        m_completed_operations.push_back(std::move(op));
1,221,456✔
1490
    }
1,221,456✔
1491

1492
    void remove_canceled_ops(Descriptor& desc) noexcept
1493
    {
392,626✔
1494
        io_reactor.remove_canceled_ops(desc, m_completed_operations);
392,626✔
1495
    }
392,626✔
1496

1497
    void cancel_resolve_oper(ResolveOperBase& op) noexcept
1498
    {
10✔
1499
        std::lock_guard lock{m_mutex};
10✔
1500
        op.cancel();
10✔
1501
    }
10✔
1502

1503
    void cancel_incomplete_wait_oper(WaitOperBase& op) noexcept
1504
    {
15,034✔
1505
        auto p = std::equal_range(m_wait_operations.begin(), m_wait_operations.end(), op.m_expiration_time,
15,034✔
1506
                                  WaitOperCompare{});
15,034✔
1507
        auto pred = [&op](const LendersWaitOperPtr& op_2) {
15,036✔
1508
            return &*op_2 == &op;
15,036✔
1509
        };
15,036✔
1510
        auto i = std::find_if(p.first, p.second, pred);
15,034✔
1511
        REALM_ASSERT(i != p.second);
15,034✔
1512
        m_completed_operations.push_back(m_wait_operations.erase(i));
15,034✔
1513
    }
15,034✔
1514

1515
private:
1516
    OperQueue<AsyncOper> m_completed_operations; // Completed, canceled, and post operations
1517

1518
    struct WaitOperCompare {
1519
        bool operator()(const LendersWaitOperPtr& a, clock::time_point b)
1520
        {
22,912✔
1521
            return a->m_expiration_time > b;
22,912✔
1522
        }
22,912✔
1523
        bool operator()(clock::time_point a, const LendersWaitOperPtr& b)
1524
        {
19,582✔
1525
            return a > b->m_expiration_time;
19,582✔
1526
        }
19,582✔
1527
        bool operator()(const LendersWaitOperPtr& a, const LendersWaitOperPtr& b)
1528
        {
132,336✔
1529
            return a->m_expiration_time > b->m_expiration_time;
132,336✔
1530
        }
132,336✔
1531
    };
1532

1533
    using WaitQueue = util::PriorityQueue<LendersWaitOperPtr, std::vector<LendersWaitOperPtr>, WaitOperCompare>;
1534
    WaitQueue m_wait_operations;
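    // The greater-than comparators above order the queue so that top() is
    // always the wait operation with the earliest expiration time; this is
    // what process_timers() and wait_and_process_io() below rely on. A
    // minimal sketch (make_wait_oper() is a hypothetical helper):
    /*
        WaitQueue q;
        q.push(make_wait_oper(now + std::chrono::seconds{5}));
        q.push(make_wait_oper(now + std::chrono::seconds{1}));
        q.top()->m_expiration_time; // == now + 1s, i.e. the earliest deadline
    */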
1535

1536
    std::mutex m_mutex;
1537
    OwnersOperPtr m_post_oper;                       // Protected by `m_mutex`
1538
    OperQueue<ResolveOperBase> m_resolve_operations; // Protected by `m_mutex`
1539
    OperQueue<AsyncOper> m_completed_operations_2;   // Protected by `m_mutex`
1540
    bool m_stopped = false;                          // Protected by `m_mutex`
1541
    bool m_stop_resolver_thread = false;             // Protected by `m_mutex`
1542
    bool m_resolve_in_progress = false;              // Protected by `m_mutex`
1543
    std::condition_variable m_resolver_cond;         // Protected by `m_mutex`
1544

1545
    std::thread m_resolver_thread;
1546

1547
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1548
    util::Optional<DeadlineTimer> m_event_loop_metrics_timer;
1549
    clock::time_point m_event_loop_metrics_start_time = clock::now();
1550
    clock::time_point m_handler_exec_start_time;
1551
    clock::duration m_handler_exec_time = clock::duration::zero();
1552
#endif
1553
    void run_impl(bool return_when_idle)
1554
    {
11,690✔
1555
        bool no_incomplete_resolve_operations;
11,690✔
1556

1557
    on_handlers_executed_or_interrupted : {
2,290,196✔
1558
        std::lock_guard lock{m_mutex};
2,290,196✔
1559
        if (m_stopped)
2,290,196✔
1560
            return;
11,140✔
1561
        // Note: Order of post operations must be preserved.
1562
        m_completed_operations.push_back(m_completed_operations_2);
2,279,056✔
1563
        no_incomplete_resolve_operations = (!m_resolve_in_progress && m_resolve_operations.empty());
2,281,738✔
1564

1565
        if (m_completed_operations.empty())
2,279,056✔
1566
            goto on_time_progressed;
1,942,372✔
1567
    }
2,279,056✔
1568

1569
    on_operations_completed : {
2,005,450✔
1570
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1571
        m_handler_exec_start_time = clock::now();
1572
#endif
1573
        while (LendersOperPtr op = m_completed_operations.pop_front())
6,897,216✔
1574
            execute(op); // Throws
4,891,766✔
1575
#ifdef REALM_UTIL_NETWORK_EVENT_LOOP_METRICS
1576
        m_handler_exec_time += clock::now() - m_handler_exec_start_time;
1577
#endif
1578
        goto on_handlers_executed_or_interrupted;
2,005,450✔
1579
    }
2,279,056✔
1580

1581
    on_time_progressed : {
2,106,536✔
1582
        clock::time_point now = clock::now();
2,106,536✔
1583
        if (process_timers(now))
2,106,536✔
1584
            goto on_operations_completed;
413,268✔
1585

1586
        bool no_incomplete_operations =
1,693,268✔
1587
            (io_reactor.empty() && m_wait_operations.empty() && no_incomplete_resolve_operations);
1,693,268✔
1588
        if (no_incomplete_operations && return_when_idle) {
1,693,268✔
1589
            // We can only get to this point when there are no completion
1590
            // handlers ready to execute. It happens either because of a
1591
            // fall-through from on_operations_completed, or because of a
1592
            // jump to on_time_progressed, but that only happens if no
1593
            // completion handlers became ready during
1594
            // wait_and_process_io().
1595
            //
1596
            // We can also only get to this point when there are no
1597
            // asynchronous operations in progress (due to the preceding
1598
            // if-condition).
1599
            //
1600
            // It is possible that another thread has added new post
1601
            // operations since we checked, but there is really no point in
1602
            // rechecking that, as it is always possible, even after a
1603
            // recheck, that new post handlers get added after we decide to
1604
            // return, but before we actually do return. Also, it would
1605
            // offer no additional guarantees to the application.
1606
            return; // Out of work
518✔
1607
        }
518✔
1608

1609
        // Blocking wait for I/O
1610
        bool interrupted = false;
1,692,750✔
1611
        if (wait_and_process_io(now, interrupted)) // Throws
1,692,750✔
1612
            goto on_operations_completed;
1,253,528✔
1613
        if (interrupted)
439,222✔
1614
            goto on_handlers_executed_or_interrupted;
274,926✔
1615
        goto on_time_progressed;
164,296✔
1616
    }
439,222✔
1617
    }
439,222✔
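    // Summary of the control flow in run_impl() above (labels as used in the
    // code): completion handlers are executed until none are ready, due
    // timers are then expired, and finally the reactor blocks on I/O; an
    // interrupt (from stop(), post() or trigger_exec()) sends control back to
    // the top of the loop.
    /*
        on_handlers_executed_or_interrupted:
            if stopped -> return
            collect completed/post operations; if none -> on_time_progressed
        on_operations_completed:
            execute all completed handlers -> on_handlers_executed_or_interrupted
        on_time_progressed:
            expire due timers; if any completed -> on_operations_completed
            if idle and return_when_idle -> return
            wait_and_process_io():
                I/O completions -> on_operations_completed
                interrupted     -> on_handlers_executed_or_interrupted
                timeout         -> on_time_progressed
    */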
1618
    bool process_timers(clock::time_point now)
1619
    {
2,105,700✔
1620
        bool any_operations_completed = false;
2,105,700✔
1621
        for (;;) {
2,536,470✔
1622
            if (m_wait_operations.empty())
2,536,470✔
1623
                break;
932,134✔
1624
            auto& op = m_wait_operations.top();
1,604,336✔
1625
            if (now < op->m_expiration_time)
1,604,336✔
1626
                break;
1,173,888✔
1627
            op->complete();
430,448✔
1628
            m_completed_operations.push_back(m_wait_operations.pop_top());
430,448✔
1629
            any_operations_completed = true;
430,448✔
1630
        }
430,448✔
1631
        return any_operations_completed;
2,105,700✔
1632
    }
2,105,700✔
1633

1634
    bool wait_and_process_io(clock::time_point now, bool& interrupted)
1635
    {
1,692,004✔
1636
        clock::time_point timeout;
1,692,004✔
1637
        if (!m_wait_operations.empty())
1,692,004✔
1638
            timeout = m_wait_operations.top()->m_expiration_time;
1,074,478✔
1639
        bool operations_completed = io_reactor.wait_and_advance(timeout, now, interrupted,
1,692,004✔
1640
                                                                m_completed_operations); // Throws
1,692,004✔
1641
        return operations_completed;
1,692,004✔
1642
    }
1,692,004✔
1643

1644
    static void execute(LendersOperPtr& lenders_ptr)
1645
    {
4,890,730✔
1646
        lenders_ptr.release()->recycle_and_execute(); // Throws
4,890,730✔
1647
    }
4,890,730✔
1648

1649
    void resolver_thread() noexcept
1650
    {
2,002✔
1651
        LendersResolveOperPtr op;
2,002✔
1652
        for (;;) {
5,710✔
1653
            {
5,710✔
1654
                std::unique_lock lock{m_mutex};
5,710✔
1655
                if (op) {
5,710✔
1656
                    m_completed_operations_2.push_back(std::move(op));
3,708✔
1657
                    io_reactor.interrupt();
3,708✔
1658
                }
3,708✔
1659
                m_resolve_in_progress = false;
5,710✔
1660
                while (m_resolve_operations.empty() && !m_stop_resolver_thread)
9,410✔
1661
                    m_resolver_cond.wait(lock);
3,700✔
1662
                if (m_stop_resolver_thread)
5,710✔
1663
                    return;
2,002✔
1664
                op = m_resolve_operations.pop_front();
3,708✔
1665
                m_resolve_in_progress = true;
3,708✔
1666
                if (op->is_canceled())
3,708✔
1667
                    continue;
4✔
1668
            }
3,708✔
1669
            try {
3,704✔
1670
                op->m_endpoints = resolve(op->m_query, op->m_error_code); // Throws only std::bad_alloc
3,704✔
1671
            }
3,704✔
1672
            catch (std::bad_alloc&) {
3,704✔
1673
                op->m_error_code = make_basic_system_error_code(ENOMEM);
×
1674
            }
×
1675
            op->complete();
3,704✔
1676
        }
3,704✔
1677
    }
2,002✔
1678
};
1679

1680

1681
// This function promises to only ever throw std::bad_alloc.
1682
Endpoint::List Service::Impl::resolve(const Resolver::Query& query, std::error_code& ec)
1683
{
4,916✔
1684
    Endpoint::List list;
4,916✔
1685

1686
    using addrinfo_type = struct addrinfo;
4,916✔
1687
    addrinfo_type hints = addrinfo_type(); // Clear
4,916✔
1688
    hints.ai_flags = query.m_flags;
4,916✔
1689
    hints.ai_family = query.m_protocol.m_family;
4,916✔
1690
    hints.ai_socktype = query.m_protocol.m_socktype;
4,916✔
1691
    hints.ai_protocol = query.m_protocol.m_protocol;
4,916✔
1692

1693
    const char* query_host = query.m_host.empty() ? 0 : query.m_host.c_str();
4,916✔
1694
    const char* query_service = query.m_service.empty() ? 0 : query.m_service.c_str();
4,916✔
1695
    struct addrinfo* first = nullptr;
4,916✔
1696
    int ret = ::getaddrinfo(query_host, query_service, &hints, &first);
4,916✔
1697
    if (REALM_UNLIKELY(ret != 0)) {
4,916✔
1698
#ifdef EAI_SYSTEM
10✔
1699
        if (ret == EAI_SYSTEM) {
10✔
1700
            if (errno != 0) {
×
1701
                ec = make_basic_system_error_code(errno);
×
1702
            }
×
1703
            else {
×
1704
                ec = error::unknown;
×
1705
            }
×
1706
            return list;
×
1707
        }
×
1708
#endif
10✔
1709
        ec = translate_addrinfo_error(ret);
10✔
1710
        return list;
10✔
1711
    }
10✔
1712

1713
    GetaddrinfoResultOwner gro(first);
4,906✔
1714

1715
    // Count number of IPv4/IPv6 endpoints
1716
    std::size_t num_endpoints = 0;
4,906✔
1717
    {
4,906✔
1718
        struct addrinfo* curr = first;
4,906✔
1719
        while (curr) {
11,064✔
1720
            bool ip_v4 = curr->ai_family == AF_INET;
6,158✔
1721
            bool ip_v6 = curr->ai_family == AF_INET6;
6,158✔
1722
            if (ip_v4 || ip_v6)
6,158✔
1723
                ++num_endpoints;
6,158✔
1724
            curr = curr->ai_next;
6,158✔
1725
        }
6,158✔
1726
    }
4,906✔
1727
    REALM_ASSERT(num_endpoints >= 1);
4,906✔
1728

1729
    // Copy the IPv4/IPv6 endpoints
1730
    list.m_endpoints.set_size(num_endpoints); // Throws
4,906✔
1731
    struct addrinfo* curr = first;
4,906✔
1732
    std::size_t endpoint_ndx = 0;
4,906✔
1733
    while (curr) {
11,064✔
1734
        bool ip_v4 = curr->ai_family == AF_INET;
6,158✔
1735
        bool ip_v6 = curr->ai_family == AF_INET6;
6,158✔
1736
        if (ip_v4 || ip_v6) {
6,158✔
1737
            REALM_ASSERT((ip_v4 && curr->ai_addrlen == sizeof(Endpoint::sockaddr_ip_v4_type)) ||
6,158✔
1738
                         (ip_v6 && curr->ai_addrlen == sizeof(Endpoint::sockaddr_ip_v6_type)));
6,158✔
1739
            Endpoint& ep = list.m_endpoints[endpoint_ndx];
6,158✔
1740
            ep.m_protocol.m_family = curr->ai_family;
6,158✔
1741
            ep.m_protocol.m_socktype = curr->ai_socktype;
6,158✔
1742
            ep.m_protocol.m_protocol = curr->ai_protocol;
6,158✔
1743
            if (ip_v4) {
6,158✔
1744
                ep.m_sockaddr_union.m_ip_v4 = reinterpret_cast<Endpoint::sockaddr_ip_v4_type&>(*curr->ai_addr);
4,906✔
1745
            }
4,906✔
1746
            else {
1,252✔
1747
                ep.m_sockaddr_union.m_ip_v6 = reinterpret_cast<Endpoint::sockaddr_ip_v6_type&>(*curr->ai_addr);
1,252✔
1748
            }
1,252✔
1749
            ++endpoint_ndx;
6,158✔
1750
        }
6,158✔
1751
        curr = curr->ai_next;
6,158✔
1752
    }
6,158✔
1753

1754
    ec = std::error_code(); // Success
4,906✔
1755
    return list;
4,906✔
1756
}
4,916✔
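// The function above is a thin wrapper over POSIX getaddrinfo(). A minimal
// standalone sketch of the underlying call (plain POSIX, independent of the
// types in this file):
/*
    struct addrinfo hints = {};
    hints.ai_family = AF_UNSPEC;      // accept IPv4 and IPv6
    hints.ai_socktype = SOCK_STREAM;
    struct addrinfo* res = nullptr;
    int ret = ::getaddrinfo("example.com", "443", &hints, &res);
    if (ret != 0) {
        std::fprintf(stderr, "%s\n", ::gai_strerror(ret));
    }
    else {
        for (struct addrinfo* p = res; p; p = p->ai_next) {
            // each entry describes one candidate endpoint (AF_INET or AF_INET6)
        }
        ::freeaddrinfo(res);
    }
*/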
1757

1758

1759
Service::Service()
1760
    : m_impl{std::make_unique<Impl>(*this)} // Throws
5,640✔
1761
{
11,520✔
1762
}
11,520✔
1763

1764

1765
Service::~Service() noexcept {}
11,520✔
1766

1767

1768
void Service::run()
1769
{
1,720✔
1770
    m_impl->run(); // Throws
1,720✔
1771
}
1,720✔
1772

1773

1774
void Service::run_until_stopped()
1775
{
9,972✔
1776
    m_impl->run_until_stopped();
9,972✔
1777
}
9,972✔
1778

1779

1780
void Service::stop() noexcept
1781
{
11,188✔
1782
    m_impl->stop();
11,188✔
1783
}
11,188✔
1784

1785

1786
void Service::reset() noexcept
1787
{
12✔
1788
    m_impl->reset();
12✔
1789
}
12✔
1790

1791

1792
void Service::report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler> handler)
1793
{
×
1794
    m_impl->report_event_loop_metrics(std::move(handler)); // Throws
×
1795
}
×
1796

1797

1798
void Service::do_post(PostOperConstr constr, std::size_t size, void* cookie)
1799
{
1,197,026✔
1800
    m_impl->post(constr, size, cookie); // Throws
1,197,026✔
1801
}
1,197,026✔
1802

1803

1804
void Service::recycle_post_oper(Impl& impl, PostOperBase* op) noexcept
1805
{
1,197,418✔
1806
    impl.recycle_post_oper(op);
1,197,418✔
1807
}
1,197,418✔
1808

1809

1810
void Service::trigger_exec(Impl& impl, TriggerExecOperBase& op) noexcept
1811
{
×
1812
    impl.trigger_exec(op);
×
1813
}
×
1814

1815

1816
void Service::reset_trigger_exec(Impl& impl, TriggerExecOperBase& op) noexcept
1817
{
×
1818
    impl.reset_trigger_exec(op);
×
1819
}
×
1820

1821

1822
void Service::Descriptor::accept(Descriptor& desc, StreamProtocol protocol, Endpoint* ep,
1823
                                 std::error_code& ec) noexcept
1824
{
3,968✔
1825
    REALM_ASSERT(is_open());
3,968✔
1826

1827
    union union_type {
3,968✔
1828
        Endpoint::sockaddr_union_type m_sockaddr_union;
3,968✔
1829
        char m_extra_byte[sizeof(Endpoint::sockaddr_union_type) + 1];
3,968✔
1830
    };
3,968✔
1831
    union_type buffer;
3,968✔
1832
    struct sockaddr* addr = &buffer.m_sockaddr_union.m_base;
3,968✔
1833
    socklen_t addr_len = sizeof buffer;
3,968✔
1834
    CloseGuard new_sock_fd;
3,968✔
1835
    for (;;) {
3,968✔
1836
#if HAVE_LINUX_ACCEPT4
1,598✔
1837
        // On Linux (HAVE_LINUX_ACCEPT4), make the accepted socket inherit the
1838
        // O_NONBLOCK status flag from the accepting socket to avoid an extra
1839
        // call to fcntl(). Note, it is deemed most likely that the accepted
1840
        // socket is going to be used in nonblocking mode when, and only when, the
1841
        // accepting socket is used in nonblocking mode. Other platforms are
1842
        // handled below.
1843
        int flags = SOCK_CLOEXEC;
1,598✔
1844
        if (!in_blocking_mode())
1,598✔
1845
            flags |= SOCK_NONBLOCK;
1,576✔
1846
        native_handle_type ret = ::accept4(m_fd, addr, &addr_len, flags);
1,598✔
1847
#else
1848
        native_handle_type ret = ::accept(m_fd, addr, &addr_len);
2,370✔
1849
#endif
2,370✔
1850
#ifdef _WIN32
1851
        if (ret == INVALID_SOCKET) {
1852
            int err = WSAGetLastError();
1853
            if (err == WSAEINTR)
1854
                continue; // Retry on interruption by system signal
1855
            set_read_ready(err != WSAEWOULDBLOCK);
1856
            ec = make_winsock_error_code(err); // Failure
1857
            return;
1858
        }
1859
#else
1860
        if (REALM_UNLIKELY(ret == -1)) {
3,968✔
1861
            int err = errno;
1,114✔
1862
            if (err == EINTR)
1,114✔
1863
                continue; // Retry on interruption by system signal
×
1864
            if (err == EWOULDBLOCK)
1,114✔
1865
                err = EAGAIN;
1,114✔
1866
            set_read_ready(err != EAGAIN);
1,114✔
1867
            ec = make_basic_system_error_code(err); // Failure
1,114✔
1868
            return;
1,114✔
1869
        }
1,114✔
1870
#endif
2,854✔
1871
        new_sock_fd.reset(ret);
2,854✔
1872

1873
#if REALM_PLATFORM_APPLE
1,256✔
1874
        int optval = 1;
1,256✔
1875
        ret = ::setsockopt(new_sock_fd, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof optval);
1,256✔
1876
        if (REALM_UNLIKELY(ret == -1)) {
1,256✔
1877
            // setsockopt() reports EINVAL if the other side disconnected while
1878
            // the connection was waiting in the listen queue.
1879
            int err = errno;
1880
            if (err == EINVAL) {
×
1881
                continue;
1882
            }
1883
            ec = make_basic_system_error_code(err);
1884
            return;
1885
        }
1886
#endif
1,256✔
1887

1888
        set_read_ready(true);
2,854✔
1889
        break;
2,854✔
1890
    }
2,854✔
1891
    socklen_t expected_addr_len =
2,854✔
1892
        protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
2,854✔
1893
    if (REALM_UNLIKELY(addr_len != expected_addr_len))
2,854✔
1894
        REALM_TERMINATE("Unexpected peer address length");
1895

1896
#if !HAVE_LINUX_ACCEPT4
1,256✔
1897
    {
1,256✔
1898
        bool value = true;
1,256✔
1899
        if (REALM_UNLIKELY(set_cloexec_flag(new_sock_fd, value, ec)))
1,256✔
1900
            return;
1901
    }
1,256✔
1902
#endif
1,256✔
1903

1904
    // On some platforms (such as Mac OS X), the accepted socket automatically
1905
    // inherits file status flags from the accepting socket, but on other
1906
    // systems, this is not the case. In the case of Linux (HAVE_LINUX_ACCEPT4),
1907
    // the inheriting behaviour is obtained by using the Linux specific
1908
    // accept4() system call.
1909
    //
1910
    // For other platforms, we need to be sure that m_in_blocking_mode for the
1911
    // new socket is initialized to reflect the actual state of O_NONBLOCK on
1912
    // the new socket.
1913
    //
1914
    // Note: This implementation currently never modifies status flags other
1915
    // than O_NONBLOCK, so we only need to consider that flag.
1916

1917
#if !REALM_PLATFORM_APPLE && !HAVE_LINUX_ACCEPT4
1918
    // Make the accepted socket inherit the state of O_NONBLOCK from the
1919
    // accepting socket.
1920
    {
1921
        bool value = !m_in_blocking_mode;
1922
        if (::set_nonblock_flag(new_sock_fd, value, ec))
1923
            return;
1924
    }
1925
#endif
1926

1927
    desc.assign(new_sock_fd.release(), m_in_blocking_mode);
2,854✔
1928
    desc.set_write_ready(true);
2,854✔
1929
    if (ep) {
2,854✔
1930
        ep->m_protocol = protocol;
2,078✔
1931
        ep->m_sockaddr_union = buffer.m_sockaddr_union;
2,078✔
1932
    }
2,078✔
1933
    ec = std::error_code(); // Success
2,854✔
1934
}
2,854✔
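// The ::set_nonblock_flag() helper used above is not shown in this excerpt.
// A minimal sketch of what such a helper typically does on POSIX systems
// (assumed semantics, for illustration only):
/*
    std::error_code set_nonblock_flag(int fd, bool value, std::error_code& ec)
    {
        int flags = ::fcntl(fd, F_GETFL, 0);
        if (flags == -1)
            return ec = make_basic_system_error_code(errno);
        flags = value ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
        if (::fcntl(fd, F_SETFL, flags) == -1)
            return ec = make_basic_system_error_code(errno);
        return ec = std::error_code();
    }
*/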
1935

1936

1937
std::size_t Service::Descriptor::read_some(char* buffer, std::size_t size, std::error_code& ec) noexcept
1938
{
2,177,536✔
1939
    if (REALM_UNLIKELY(assume_read_would_block())) {
2,177,536✔
1940
        ec = error::resource_unavailable_try_again; // Failure
134✔
1941
        return 0;
134✔
1942
    }
134✔
1943
    for (;;) {
2,177,772✔
1944
        int flags = 0;
2,177,740✔
1945
#ifdef _WIN32
1946
        ssize_t ret = ::recv(m_fd, buffer, int(size), flags);
1947
        if (ret == SOCKET_ERROR) {
1948
            int err = WSAGetLastError();
1949
            // Retry on interruption by system signal
1950
            if (err == WSAEINTR)
1951
                continue;
1952
            set_read_ready(err != WSAEWOULDBLOCK);
1953
            ec = make_winsock_error_code(err); // Failure
1954
            return 0;
1955
        }
1956
#else
1957
        ssize_t ret = ::recv(m_fd, buffer, size, flags);
2,177,740✔
1958
        if (ret == -1) {
2,177,740✔
1959
            int err = errno;
58,288✔
1960
            // Retry on interruption by system signal
1961
            if (err == EINTR)
58,288✔
1962
                continue;
×
1963
            if (err == EWOULDBLOCK)
58,288✔
1964
                err = EAGAIN;
58,014✔
1965
            set_read_ready(err != EAGAIN);
58,288✔
1966
            ec = make_basic_system_error_code(err); // Failure
58,288✔
1967
            return 0;
58,288✔
1968
        }
58,288✔
1969
#endif
2,119,452✔
1970
        if (REALM_UNLIKELY(ret == 0)) {
2,119,452✔
1971
            set_read_ready(true);
1,044✔
1972
            ec = MiscExtErrors::end_of_input;
1,044✔
1973
            return 0;
1,044✔
1974
        }
1,044✔
1975
        REALM_ASSERT(ret > 0);
2,118,408✔
1976
        std::size_t n = std::size_t(ret);
2,118,408✔
1977
        REALM_ASSERT(n <= size);
2,118,408✔
1978
#if REALM_NETWORK_USE_EPOLL
1979
        // On Linux a partial read (n < size) on a nonblocking stream-mode
1980
        // socket is guaranteed to only ever happen if a complete read would
1981
        // have been impossible without blocking (i.e., without failing with
1982
        // EAGAIN/EWOULDBLOCK), or if the end of input from the remote peer was
1983
        // detected by the Linux kernel.
1984
        //
1985
        // Furthermore, after a partial read, and when working with Linux epoll
1986
        // in edge-triggered mode (EPOLLET), it is safe to suspend further
1987
        // reading until a new read-readiness notification is received, provided
1988
        // that we registered interest in EPOLLRDHUP events, and an EPOLLRDHUP
1989
        // event was not received prior to the partial read. This is safe in the
1990
        // sense that reading is guaranteed to be resumed in a timely fashion
1991
        // (without unnecessary blocking), and in a manner that is free of race
1992
        // conditions. Note in particular that if a read was partial because the
1993
        // kernel had detected the end of input prior to that read, but the
1994
        // EPOLLRDHUP event was not received prior to that read, then reading
1995
        // will still be resumed immediately by the pending EPOLLRDHUP event.
1996
        //
1997
        // Note that without this extra "loss of read-readiness" trigger, it
1998
        // would have been necessary for the caller to immediately follow up
1999
        // with an (otherwise redundant) additional invocation of read_some()
2000
        // just to detect the loss of read-readiness.
2001
        //
2002
        // FIXME: Will this scheme also work with Kqueue on FreeBSD and macOS?
2003
        // In particular, do we know that a partial read (n < size) on a
2004
        // nonblocking stream-mode socket is guaranteed to only ever happen if a
2005
        // complete read would have been impossible without blocking, or if the
2006
        // end of input from the remote peer was detected by the FreeBSD and/or
2007
        // macOS kernel? See http://stackoverflow.com/q/40123626/1698548.
2008
        set_read_ready(n == size || m_imminent_end_of_input);
2009
#else
2010
        set_read_ready(true);
2,118,408✔
2011
#endif
2,118,408✔
2012
        ec = std::error_code(); // Success
2,118,408✔
2013
        return n;
2,118,408✔
2014
    }
2,119,452✔
2015
}
2,177,402✔
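// Illustrative caller-side pattern for read_some() above (buf, consume() and
// handle_error() are hypothetical): a nonblocking reader can drain the socket
// until the call reports "would block", then wait for the next readiness
// notification.
/*
    char buf[4096];
    std::error_code ec;
    for (;;) {
        std::size_t n = desc.read_some(buf, sizeof buf, ec);
        if (ec == error::resource_unavailable_try_again)
            break;                              // nothing more to read right now
        if (ec == MiscExtErrors::end_of_input)
            break;                              // peer closed the connection
        if (ec)
            return handle_error(ec);
        consume(buf, n);
    }
*/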
2016

2017

2018
std::size_t Service::Descriptor::write_some(const char* data, std::size_t size, std::error_code& ec) noexcept
2019
{
1,500,346✔
2020
    if (REALM_UNLIKELY(assume_write_would_block())) {
1,500,346✔
2021
        ec = error::resource_unavailable_try_again; // Failure
90,814✔
2022
        return 0;
90,814✔
2023
    }
90,814✔
2024
    for (;;) {
1,409,794✔
2025
        int flags = 0;
1,409,786✔
2026
#ifdef __linux__
717,886✔
2027
        // Prevent SIGPIPE when remote peer has closed the connection.
2028
        flags |= MSG_NOSIGNAL;
717,886✔
2029
#endif
717,886✔
2030
#ifdef _WIN32
2031
        ssize_t ret = ::send(m_fd, data, int(size), flags);
2032
        if (ret == SOCKET_ERROR) {
2033
            int err = WSAGetLastError();
2034
            // Retry on interruption by system signal
2035
            if (err == WSAEINTR)
2036
                continue;
2037
            set_write_ready(err != WSAEWOULDBLOCK);
2038
            ec = make_winsock_error_code(err); // Failure
2039
            return 0;
2040
        }
2041
#else
2042
        ssize_t ret = ::send(m_fd, data, size, flags);
1,409,786✔
2043
        if (ret == -1) {
1,409,786✔
2044
            int err = errno;
6,534✔
2045
            // Retry on interruption by system signal
2046
            if (err == EINTR)
6,534✔
2047
                continue;
×
2048
#if REALM_PLATFORM_APPLE
6,040✔
2049
            // The macOS kernel can generate an undocumented EPROTOTYPE in
2050
            // certain cases where the peer has closed the connection (in
2051
            // tcp_usr_send() in bsd/netinet/tcp_usrreq.c). See also
2052
            // http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/.
2053
            if (REALM_UNLIKELY(err == EPROTOTYPE))
6,040✔
2054
                err = EPIPE;
2055
#endif
6,040✔
2056
            if (err == EWOULDBLOCK)
6,534✔
2057
                err = EAGAIN;
6,438✔
2058
            set_write_ready(err != EAGAIN);
6,534✔
2059
            ec = make_basic_system_error_code(err); // Failure
6,534✔
2060
            return 0;
6,534✔
2061
        }
6,534✔
2062
#endif
1,403,252✔
2063
        REALM_ASSERT(ret >= 0);
1,403,252✔
2064
        std::size_t n = std::size_t(ret);
1,403,252✔
2065
        REALM_ASSERT(n <= size);
1,403,252✔
2066
#if REALM_NETWORK_USE_EPOLL
2067
        // On Linux a partial write (n < size) on a nonblocking stream-mode
2068
        // socket is guaranteed to only ever happen if a complete write would
2069
        // have been impossible without blocking (i.e., without failing with
2070
        // EAGAIN/EWOULDBLOCK).
2071
        //
2072
        // Furthermore, after a partial write, and when working with Linux
2073
        // epoll in edge-triggered mode (EPOLLET), it is safe to suspend further
2074
        // writing until a new write-readiness notification is received. This is
2075
        // safe in the sense that writing is guaranteed to be resumed in a
2076
        // timely fashion (without unnecessary blocking), and in a manner that
2077
        // is free of race conditions.
2078
        //
2079
        // Note that without this extra "loss of write-readiness" trigger, it
2080
        // would have been necessary for the caller to immediately follow up
2081
        // with an (otherwise redundant) additional invocation of write_some()
2082
        // just to detect the loss of write-readiness.
2083
        //
2084
        // FIXME: Will this scheme also work with Kqueue on FreeBSD and macOS?
2085
        // In particular, do we know that a partial write (n < size) on a
2086
        // nonblocking stream-mode socket is guaranteed to only ever happen if a
2087
        // complete write would have been impossible without blocking? See
2088
        // http://stackoverflow.com/q/40123626/1698548.
2089
        set_write_ready(n == size);
2090
#else
2091
        set_write_ready(true);
1,403,252✔
2092
#endif
1,403,252✔
2093
        ec = std::error_code(); // Success
1,403,252✔
2094
        return n;
1,403,252✔
2095
    }
1,409,786✔
2096
}
1,409,532✔
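// Note on SIGPIPE suppression in write_some() above: on Linux it is handled
// per call with MSG_NOSIGNAL, whereas on Apple platforms the SO_NOSIGPIPE
// option is set once on the socket instead (see Descriptor::accept() above
// and SocketBase::open() below), so no per-call flag is needed there.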
2097

2098

2099
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
2100

2101
void Service::Descriptor::deregister_for_async() noexcept
2102
{
4,044✔
2103
    service_impl.io_reactor.deregister_desc(*this);
4,044✔
2104
}
4,044✔
2105

2106
#endif // REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
2107

2108

2109
void Service::Descriptor::set_nonblock_flag(bool value)
2110
{
5,510✔
2111
    ::set_nonblock_flag(m_fd, value); // Throws
5,510✔
2112
}
5,510✔
2113

2114

2115
void Service::Descriptor::add_initiated_oper(LendersIoOperPtr op, Want want)
2116
{
3,260,140✔
2117
    if (REALM_UNLIKELY(want == Want::nothing)) {
3,260,140✔
2118
        REALM_ASSERT(op->is_complete());
1,221,136✔
2119
        service_impl.add_completed_oper(std::move(op));
1,221,136✔
2120
        return;
1,221,136✔
2121
    }
1,221,136✔
2122
    REALM_ASSERT(!op->is_complete());
2,039,004✔
2123
    service_impl.io_reactor.add_oper(*this, std::move(op), want); // Throws
2,039,004✔
2124
}
2,039,004✔
2125

2126

2127
void Service::Descriptor::do_close() noexcept
2128
{
8,288✔
2129
    checked_close(m_fd);
8,288✔
2130
    m_fd = -1;
8,288✔
2131
}
8,288✔
2132

2133

2134
auto Service::Descriptor::do_release() noexcept -> native_handle_type
2135
{
×
2136
    native_handle_type fd = m_fd;
×
2137
    m_fd = -1;
×
2138
    return fd;
×
2139
}
×
2140

2141

2142
Service& Resolver::get_service() noexcept
2143
{
×
2144
    return m_service_impl.service;
×
2145
}
×
2146

2147

2148
Endpoint::List Resolver::resolve(const Query& query, std::error_code& ec)
2149
{
1,212✔
2150
    return Service::Impl::resolve(query, ec); // Throws
1,212✔
2151
}
1,212✔
2152

2153

2154
void Resolver::cancel() noexcept
2155
{
4,924✔
2156
    if (m_resolve_oper && m_resolve_oper->in_use() && !m_resolve_oper->is_canceled()) {
4,924✔
2157
        Service::ResolveOperBase& op = static_cast<Service::ResolveOperBase&>(*m_resolve_oper);
10✔
2158
        m_service_impl.cancel_resolve_oper(op);
10✔
2159
    }
10✔
2160
}
4,924✔
2161

2162

2163
void Resolver::initiate_oper(Service::LendersResolveOperPtr op)
2164
{
3,708✔
2165
    m_service_impl.add_resolve_oper(std::move(op)); // Throws
3,708✔
2166
}
3,708✔
2167

2168

2169
Service& SocketBase::get_service() noexcept
2170
{
480✔
2171
    return m_desc.service_impl.service;
480✔
2172
}
480✔
2173

2174

2175
void SocketBase::cancel() noexcept
2176
{
920,426✔
2177
    bool any_incomplete = false;
920,426✔
2178
    if (m_read_oper && m_read_oper->in_use() && !m_read_oper->is_canceled()) {
920,426✔
2179
        m_read_oper->cancel();
347,610✔
2180
        if (!m_read_oper->is_complete())
347,610✔
2181
            any_incomplete = true;
347,514✔
2182
    }
347,610✔
2183
    if (m_write_oper && m_write_oper->in_use() && !m_write_oper->is_canceled()) {
920,426✔
2184
        m_write_oper->cancel();
337,266✔
2185
        if (!m_write_oper->is_complete())
337,266✔
2186
            any_incomplete = true;
337,040✔
2187
    }
337,266✔
2188
    if (any_incomplete)
920,426✔
2189
        m_desc.service_impl.remove_canceled_ops(m_desc);
392,280✔
2190
}
920,426✔
2191

2192

2193
std::error_code SocketBase::bind(const Endpoint& ep, std::error_code& ec)
2194
{
1,452✔
2195
    if (!is_open()) {
1,452✔
2196
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
×
2197
            return ec;
×
2198
    }
×
2199

2200
    native_handle_type sock_fd = m_desc.native_handle();
1,452✔
2201
    socklen_t addr_len =
1,452✔
2202
        ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
1,452✔
2203

2204
    int ret = ::bind(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
1,452✔
2205
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
1,452✔
2206
        return ec;
×
2207
    ec = std::error_code(); // Success
1,452✔
2208
    return ec;
1,452✔
2209
}
1,452✔
2210

2211

2212
Endpoint SocketBase::local_endpoint(std::error_code& ec) const
2213
{
6,408✔
2214
    Endpoint ep;
6,408✔
2215
    union union_type {
6,408✔
2216
        Endpoint::sockaddr_union_type m_sockaddr_union;
6,408✔
2217
        char m_extra_byte[sizeof(Endpoint::sockaddr_union_type) + 1];
6,408✔
2218
    };
6,408✔
2219
    native_handle_type sock_fd = m_desc.native_handle();
6,408✔
2220
    union_type buffer;
6,408✔
2221
    struct sockaddr* addr = &buffer.m_sockaddr_union.m_base;
6,408✔
2222
    socklen_t addr_len = sizeof buffer;
6,408✔
2223
    int ret = ::getsockname(sock_fd, addr, &addr_len);
6,408✔
2224
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
6,408✔
2225
        return ep;
×
2226

2227
    socklen_t expected_addr_len =
6,408✔
2228
        m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
6,408✔
2229
    if (addr_len != expected_addr_len)
6,408✔
2230
        throw util::runtime_error("Unexpected local address length");
×
2231
    ep.m_protocol = m_protocol;
6,408✔
2232
    ep.m_sockaddr_union = buffer.m_sockaddr_union;
6,408✔
2233
    ec = std::error_code(); // Success
6,408✔
2234
#ifdef _WIN32
2235
    ep.m_sockaddr_union.m_ip_v4.sin_addr.s_addr = inet_addr("127.0.0.1");
2236
#endif
2237
    return ep;
6,408✔
2238
}
6,408✔
2239

2240

2241
std::error_code SocketBase::open(const StreamProtocol& prot, std::error_code& ec)
2242
{
5,430✔
2243
    if (REALM_UNLIKELY(is_open()))
5,430✔
2244
        throw util::runtime_error("Socket is already open");
×
2245
    int type = prot.m_socktype;
5,430✔
2246
#if HAVE_LINUX_SOCK_CLOEXEC
2,552✔
2247
    type |= SOCK_CLOEXEC;
2,552✔
2248
#endif
2,552✔
2249
    native_handle_type ret = ::socket(prot.m_family, type, prot.m_protocol);
5,430✔
2250
#ifdef _WIN32
2251
    if (REALM_UNLIKELY(ret == INVALID_SOCKET)) {
2252
        ec = make_winsock_error_code(WSAGetLastError());
2253
        return ec;
2254
    }
2255
#else
2256
    if (REALM_UNLIKELY(ret == -1)) {
5,430✔
2257
        ec = make_basic_system_error_code(errno);
×
2258
        return ec;
×
2259
    }
×
2260
#endif
5,430✔
2261

2262
    CloseGuard sock_fd{ret};
5,430✔
2263

2264
#if !HAVE_LINUX_SOCK_CLOEXEC
2,878✔
2265
    {
2,878✔
2266
        bool value = true;
2,878✔
2267
        if (REALM_UNLIKELY(set_cloexec_flag(sock_fd, value, ec)))
2,878✔
2268
            return ec;
2269
    }
2,878✔
2270
#endif
2,878✔
2271

2272
#if REALM_PLATFORM_APPLE
2,878✔
2273
    {
2,878✔
2274
        int optval = 1;
2,878✔
2275
        int ret = setsockopt(sock_fd, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof optval);
2,878✔
2276
        if (REALM_UNLIKELY(ret == -1)) {
2,878✔
2277
            ec = make_basic_system_error_code(errno);
2278
            return ec;
2279
        }
2280
    }
2,878✔
2281
#endif
2,878✔
2282

2283
    bool in_blocking_mode = true; // New sockets are in blocking mode by default
5,430✔
2284
    m_desc.assign(sock_fd.release(), in_blocking_mode);
5,430✔
2285
    m_protocol = prot;
5,430✔
2286
    ec = std::error_code(); // Success
5,430✔
2287
    return ec;
5,430✔
2288
}
5,430✔
2289

2290

2291
std::error_code SocketBase::do_assign(const StreamProtocol& prot, native_handle_type sock_fd, std::error_code& ec)
2292
{
4✔
2293
    if (REALM_UNLIKELY(is_open()))
4✔
2294
        throw util::runtime_error("Socket is already open");
×
2295

2296
    // We need to know whether the specified socket is in blocking or in
2297
    // nonblocking mode. Rather than reading the current mode, we set it to
2298
    // blocking mode (disable nonblocking mode), and initialize
2299
    // `m_in_blocking_mode` to true.
2300
    {
4✔
2301
        bool value = false;
4✔
2302
        if (::set_nonblock_flag(sock_fd, value, ec))
4✔
2303
            return ec;
×
2304
    }
4✔
2305

2306
    bool in_blocking_mode = true; // New sockets are in blocking mode by default
4✔
2307
    m_desc.assign(sock_fd, in_blocking_mode);
4✔
2308
    m_protocol = prot;
4✔
2309
    ec = std::error_code(); // Success
4✔
2310
    return ec;
4✔
2311
}
4✔
2312

2313

2314
void SocketBase::get_option(opt_enum opt, void* value_data, std::size_t& value_size, std::error_code& ec) const
2315
{
8✔
2316
    int level = 0;
8✔
2317
    int option_name = 0;
8✔
2318
    map_option(opt, level, option_name);
8✔
2319

2320
    native_handle_type sock_fd = m_desc.native_handle();
8✔
2321
    socklen_t option_len = socklen_t(value_size);
8✔
2322
    int ret = ::getsockopt(sock_fd, level, option_name, static_cast<char*>(value_data), &option_len);
8✔
2323
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
8✔
2324
        return;
×
2325
    value_size = std::size_t(option_len);
8✔
2326
    ec = std::error_code(); // Success
8✔
2327
}
8✔
2328

2329

2330
void SocketBase::set_option(opt_enum opt, const void* value_data, std::size_t value_size, std::error_code& ec)
2331
{
3,194✔
2332
    int level = 0;
3,194✔
2333
    int option_name = 0;
3,194✔
2334
    map_option(opt, level, option_name);
3,194✔
2335

2336
    native_handle_type sock_fd = m_desc.native_handle();
3,194✔
2337
    int ret = ::setsockopt(sock_fd, level, option_name, static_cast<const char*>(value_data), socklen_t(value_size));
3,194✔
2338
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
3,194✔
2339
        return;
×
2340
    ec = std::error_code(); // Success
3,194✔
2341
}
3,194✔
2342

2343

2344
void SocketBase::map_option(opt_enum opt, int& level, int& option_name) const
2345
{
3,202✔
2346
    switch (opt) {
3,202✔
2347
        case opt_ReuseAddr:
1,212✔
2348
            level = SOL_SOCKET;
1,212✔
2349
            option_name = SO_REUSEADDR;
1,212✔
2350
            return;
1,212✔
2351
        case opt_Linger:
✔
2352
            level = SOL_SOCKET;
×
2353
#if REALM_PLATFORM_APPLE
2354
            // By default, SO_LINGER on Darwin uses "ticks" instead of
2355
            // seconds for better accuracy, but we want to be cross-platform.
2356
            option_name = SO_LINGER_SEC;
2357
#else
2358
            option_name = SO_LINGER;
2359
#endif // REALM_PLATFORM_APPLE
2360
            return;
×
2361
        case opt_NoDelay:
1,990✔
2362
            level = IPPROTO_TCP;
1,990✔
2363
            option_name = TCP_NODELAY; // Specified by POSIX.1-2001
1,990✔
2364
            return;
1,990✔
2365
    }
3,202✔
2366
    REALM_ASSERT(false);
×
2367
}
×
2368

2369

2370
std::error_code Socket::connect(const Endpoint& ep, std::error_code& ec)
2371
{
68✔
2372
    REALM_ASSERT(!m_write_oper || !m_write_oper->in_use());
68✔
2373

2374
    if (!is_open()) {
68✔
2375
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
64✔
2376
            return ec;
×
2377
    }
64✔
2378

2379
    m_desc.ensure_blocking_mode(); // Throws
68✔
2380

2381
    native_handle_type sock_fd = m_desc.native_handle();
68✔
2382
    socklen_t addr_len =
68✔
2383
        (ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type));
68✔
2384
    int ret = ::connect(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
68✔
2385
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
68✔
2386
        return ec;
4✔
2387
    ec = std::error_code(); // Success
64✔
2388
    return ec;
64✔
2389
}
68✔
2390

2391

2392
std::error_code Socket::shutdown(shutdown_type what, std::error_code& ec)
2393
{
52✔
2394
    native_handle_type sock_fd = m_desc.native_handle();
52✔
2395
    int how = what;
52✔
2396
    int ret = ::shutdown(sock_fd, how);
52✔
2397
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
52✔
2398
        return ec;
×
2399
    ec = std::error_code(); // Success
52✔
2400
    return ec;
52✔
2401
}
52✔
2402

2403

2404
bool Socket::initiate_async_connect(const Endpoint& ep, std::error_code& ec)
2405
{
3,902✔
2406
    if (!is_open()) {
3,902✔
2407
        if (REALM_UNLIKELY(open(ep.protocol(), ec)))
3,898✔
2408
            return true; // Failure
×
2409
    }
3,898✔
2410
    m_desc.ensure_nonblocking_mode(); // Throws
3,902✔
2411

2412
    // Initiate connect operation.
2413
    native_handle_type sock_fd = m_desc.native_handle();
3,902✔
2414
    socklen_t addr_len =
3,902✔
2415
        ep.m_protocol.is_ip_v4() ? sizeof(Endpoint::sockaddr_ip_v4_type) : sizeof(Endpoint::sockaddr_ip_v6_type);
3,902✔
2416
    int ret = ::connect(sock_fd, &ep.m_sockaddr_union.m_base, addr_len);
3,902✔
2417
    if (ret != -1) {
3,902✔
2418
        ec = std::error_code(); // Success
2✔
2419
        return true;            // Immediate completion.
2✔
2420
    }
2✔
2421

2422
    // EINPROGRESS (and on Windows, also WSAEWOULDBLOCK) indicates that the
2423
    // underlying connect operation was successfully initiated, but not
2424
    // immediately completed, and EALREADY indicates that an underlying connect
2425
    // operation was already initiated, and still not completed, presumably
2426
    // because a previous call to connect() or async_connect() failed, or was
2427
    // canceled.
2428

2429
#ifdef _WIN32
2430
    int err = WSAGetLastError();
2431
    if (err != WSAEWOULDBLOCK) {
2432
        ec = make_winsock_error_code(err);
2433
        return true; // Failure
2434
    }
2435
#else
2436
    int err = errno;
3,900✔
2437
    if (REALM_UNLIKELY(err != EINPROGRESS && err != EALREADY)) {
3,900✔
UNCOV
2438
        ec = make_basic_system_error_code(err);
×
UNCOV
2439
        return true; // Failure
×
UNCOV
2440
    }
×
2441
#endif
3,900✔
2442

2443
    return false; // Successful initiation, but no immediate completion.
3,900✔
2444
}
3,900✔
2445

2446

2447
std::error_code Socket::finalize_async_connect(std::error_code& ec) noexcept
2448
{
3,894✔
2449
    native_handle_type sock_fd = m_desc.native_handle();
3,894✔
2450
    int connect_errno = 0;
3,894✔
2451
    socklen_t connect_errno_size = sizeof connect_errno;
3,894✔
2452
    int ret =
3,894✔
2453
        ::getsockopt(sock_fd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&connect_errno), &connect_errno_size);
3,894✔
2454
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
3,894✔
2455
        return ec; // getsockopt() failed
×
2456
    if (REALM_UNLIKELY(connect_errno)) {
3,894✔
2457
        ec = make_basic_system_error_code(connect_errno);
2✔
2458
        return ec; // connect failed
2✔
2459
    }
2✔
2460
    return std::error_code(); // Success
3,892✔
2461
}
3,894✔
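// initiate_async_connect() and finalize_async_connect() together implement
// the standard nonblocking connect idiom. A minimal plain-POSIX sketch of
// that idiom (illustration only, not the exact code path used here):
/*
    int ret = ::connect(fd, addr, addr_len);      // fd already has O_NONBLOCK set
    if (ret == -1 && errno == EINPROGRESS) {
        struct pollfd pfd = {fd, POLLOUT, 0};
        ::poll(&pfd, 1, -1);                      // wait until the socket is writable
        int err = 0;
        socklen_t len = sizeof err;
        ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
        // err == 0 means the connection was established; otherwise err holds
        // the errno-style reason the connect failed.
    }
*/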
2462

2463

2464
std::error_code Acceptor::listen(int backlog, std::error_code& ec)
2465
{
1,452✔
2466
    native_handle_type sock_fd = m_desc.native_handle();
1,452✔
2467
    int ret = ::listen(sock_fd, backlog);
1,452✔
2468
    if (REALM_UNLIKELY(check_socket_error(ret, ec)))
1,452✔
2469
        return ec;
×
2470
    ec = std::error_code(); // Success
1,452✔
2471
    return ec;
1,452✔
2472
}
1,452✔
2473

2474

2475
Service& DeadlineTimer::get_service() noexcept
2476
{
×
2477
    return m_service_impl.service;
×
2478
}
×
2479

2480

2481
void DeadlineTimer::cancel() noexcept
2482
{
19,436✔
2483
    if (m_wait_oper && m_wait_oper->in_use() && !m_wait_oper->is_canceled()) {
19,436✔
2484
        m_wait_oper->cancel();
15,034✔
2485
        if (!m_wait_oper->is_complete()) {
15,034✔
2486
            using WaitOperBase = Service::WaitOperBase;
15,034✔
2487
            WaitOperBase& wait_operation = static_cast<WaitOperBase&>(*m_wait_oper);
15,034✔
2488
            m_service_impl.cancel_incomplete_wait_oper(wait_operation);
15,034✔
2489
        }
15,034✔
2490
    }
15,034✔
2491
}
19,436✔
2492

2493

2494
void DeadlineTimer::initiate_oper(Service::LendersWaitOperPtr op)
2495
{
447,118✔
2496
    m_service_impl.add_wait_oper(std::move(op)); // Throws
447,118✔
2497
}
447,118✔
2498

2499

2500
bool ReadAheadBuffer::read(char*& begin, char* end, int delim, std::error_code& ec) noexcept
2501
{
2,056,386✔
2502
    std::size_t in_avail = m_end - m_begin;
2,056,386✔
2503
    std::size_t out_avail = end - begin;
2,056,386✔
2504
    std::size_t n = std::min(in_avail, out_avail);
2,056,386✔
2505
    // If n is 0, return whether or not the read expects 0 bytes for the completed response
2506
    if (n == 0)
2,056,386✔
2507
        return out_avail == 0;
498,766✔
2508

2509
    bool delim_mode = (delim != std::char_traits<char>::eof());
1,557,620✔
2510
    char* i =
1,557,620✔
2511
        (!delim_mode ? m_begin + n : std::find(m_begin, m_begin + n, std::char_traits<char>::to_char_type(delim)));
1,557,620✔
2512
    begin = std::copy(m_begin, i, begin);
1,557,620✔
2513
    m_begin = i;
1,557,620✔
2514
    if (begin == end) {
1,557,620✔
2515
        if (delim_mode)
846,962✔
2516
            ec = MiscExtErrors::delim_not_found;
×
2517
    }
846,962✔
2518
    else {
710,658✔
2519
        if (m_begin == m_end)
710,658✔
2520
            return false;
657,666✔
2521
        REALM_ASSERT(delim_mode);
52,992✔
2522
        *begin++ = *m_begin++; // Transfer delimiter
52,992✔
2523
    }
52,992✔
2524
    return true;
899,954✔
2525
}
1,557,620✔
2526

2527

2528
namespace realm::sync::network {
2529

2530
std::string host_name()
2531
{
4✔
2532
    // POSIX allows for gethostname() to report success even if the buffer is
2533
    // too small to hold the name, and in that case POSIX requires that the
2534
    // buffer is filled, but not that it contains a final null-termination.
2535
    char small_stack_buffer[256];
4✔
2536
    int ret = ::gethostname(small_stack_buffer, sizeof small_stack_buffer);
4✔
2537
    if (ret != -1) {
4✔
2538
        // Check that a null-termination was included
2539
        char* end = small_stack_buffer + sizeof small_stack_buffer;
4✔
2540
        char* i = std::find(small_stack_buffer, end, 0);
4✔
2541
        if (i != end)
4✔
2542
            return std::string(small_stack_buffer, i);
4✔
2543
    }
4✔
2544
    constexpr std::size_t large_heap_buffer_size = 4096;
×
2545
    std::unique_ptr<char[]> large_heap_buffer(new char[large_heap_buffer_size]); // Throws
×
2546
    ret = ::gethostname(large_heap_buffer.get(), large_heap_buffer_size);
×
2547
    if (REALM_LIKELY(ret != -1)) {
×
2548
        // Check that a null-termination was included
2549
        char* end = large_heap_buffer.get() + large_heap_buffer_size;
×
2550
        char* i = std::find(large_heap_buffer.get(), end, 0);
×
2551
        if (i != end)
×
2552
            return std::string(large_heap_buffer.get(), i);
×
2553
    }
×
2554
    throw std::system_error(errno, std::system_category(), "gethostname() failed");
×
2555
}
×
2556

2557

2558
Address make_address(const char* c_str, std::error_code& ec) noexcept
2559
{
×
2560
    Address addr;
×
2561
    int ret = ::inet_pton(AF_INET6, c_str, &addr.m_union);
×
2562
    REALM_ASSERT(ret == 0 || ret == 1);
×
2563
    if (ret == 1) {
×
2564
        addr.m_is_ip_v6 = true;
×
2565
        ec = std::error_code(); // Success (IPv6)
×
2566
        return addr;
×
2567
    }
×
2568
    ret = ::inet_pton(AF_INET, c_str, &addr.m_union);
×
2569
    REALM_ASSERT(ret == 0 || ret == 1);
×
2570
    if (ret == 1) {
×
2571
        ec = std::error_code(); // Success (IPv4)
×
2572
        return addr;
×
2573
    }
×
2574
    ec = error::invalid_argument;
×
2575
    return Address();
×
2576

2577
    // FIXME: Currently, `addr.m_ip_v6_scope_id` is always set to zero. It needs
2578
    // to be set based on a combined inspection of the original string
2579
    // representation, and the parsed address. The following code is "borrowed"
2580
    // from ASIO:
2581
    /*
2582
        *scope_id = 0;
2583
        if (const char* if_name = strchr(src, '%'))
2584
        {
2585
          in6_addr_type* ipv6_address = static_cast<in6_addr_type*>(dest);
2586
          bool is_link_local = ((ipv6_address->s6_addr[0] == 0xfe)
2587
              && ((ipv6_address->s6_addr[1] & 0xc0) == 0x80));
2588
          bool is_multicast_link_local = ((ipv6_address->s6_addr[0] == 0xff)
2589
              && ((ipv6_address->s6_addr[1] & 0x0f) == 0x02));
2590
          if (is_link_local || is_multicast_link_local)
2591
            *scope_id = if_nametoindex(if_name + 1);
2592
          if (*scope_id == 0)
2593
            *scope_id = atoi(if_name + 1);
2594
        }
2595
    */
2596
}
×
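// Usage sketch for make_address() above (illustration only):
/*
    std::error_code ec;
    Address addr = make_address("::1", ec); // IPv6 is tried first, then IPv4
    if (ec) {
        // ec == error::invalid_argument: the string is not a valid IP literal
    }
*/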
2597

2598
class ResolveErrorCategory : public std::error_category {
2599
public:
2600
    const char* name() const noexcept final
2601
    {
×
2602
        return "realm.sync.network.resolve";
×
2603
    }
×
2604

2605
    std::string message(int value) const final
2606
    {
8✔
2607
        switch (ResolveErrors(value)) {
8✔
2608
            case ResolveErrors::host_not_found:
4✔
2609
                return "Host not found (authoritative)";
4✔
2610
            case ResolveErrors::host_not_found_try_again:
4✔
2611
                return "Host not found (non-authoritative)";
4✔
2612
            case ResolveErrors::no_data:
✔
2613
                return "The query is valid but does not have associated address data";
×
2614
            case ResolveErrors::no_recovery:
✔
2615
                return "A non-recoverable error occurred";
×
2616
            case ResolveErrors::service_not_found:
✔
2617
                return "The service is not supported for the given socket type";
×
2618
            case ResolveErrors::socket_type_not_supported:
✔
2619
                return "The socket type is not supported";
×
2620
        }
8✔
2621
        REALM_ASSERT(false);
×
2622
        return {};
×
2623
    }
8✔
2624
};
2625

2626
const std::error_category& resolve_error_category() noexcept
2627
{
10✔
2628
    static const ResolveErrorCategory resolve_error_category;
10✔
2629
    return resolve_error_category;
10✔
2630
}
10✔
2631

2632
std::error_code make_error_code(ResolveErrors err)
2633
{
10✔
2634
    return std::error_code(int(err), resolve_error_category());
10✔
2635
}
10✔
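// The category above makes ResolveErrors usable as ordinary std::error_code
// values. A small sketch (illustration only):
/*
    std::error_code ec = make_error_code(ResolveErrors::host_not_found);
    ec.category().name(); // "realm.sync.network.resolve"
    ec.message();         // "Host not found (authoritative)"
*/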
2636

2637
} // namespace realm::sync::network