• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2468

02 Jul 2024 07:51PM UTC coverage: 90.983% (+0.009%) from 90.974%
2468

push

Evergreen

web-flow
[RCORE-2146] CAPI Remove `is_fatal` flag flip (#7751)

102314 of 180446 branches covered (56.7%)

0 of 1 new or added line in 1 file covered. (0.0%)

61 existing lines in 16 files now uncovered.

215154 of 236478 relevant lines covered (90.98%)

5853171.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.56
/test/test_util_network.cpp
1
#include <algorithm>
2
#include <atomic>
3
#include <stdexcept>
4
#include <sstream>
5
#include <memory>
6
#include <thread>
7

8
#include <realm/status.hpp>
9
#include <realm/util/future.hpp>
10
#include <realm/util/memory_stream.hpp>
11
#include <realm/sync/network/network.hpp>
12
#include <realm/sync/trigger.hpp>
13

14
#include "test.hpp"
15
#include "util/semaphore.hpp"
16

17
using namespace realm;
18
using namespace realm::util;
19
using namespace realm::test_util;
20

21

22
// Test independence and thread-safety
23
// -----------------------------------
24
//
25
// All tests must be thread safe and independent of each other. This
26
// is required because it allows for both shuffling of the execution
27
// order and for parallelized testing.
28
//
29
// In particular, avoid using std::rand() since it is not guaranteed
30
// to be thread safe. Instead use the API offered in
31
// `test/util/random.hpp`.
32
//
33
// All files created in tests must use the TEST_PATH macro (or one of
34
// its friends) to obtain a suitable file system path. See
35
// `test/util/test_path.hpp`.
36
//
37
//
38
// Debugging and the ONLY() macro
39
// ------------------------------
40
//
41
// A simple way of disabling all tests except one called `Foo`, is to
42
// replace TEST(Foo) with ONLY(Foo) and then recompile and rerun the
43
// test suite. Note that you can also use filtering by setting the
44
// environment varible `UNITTEST_FILTER`. See `README.md` for more on
45
// this.
46
//
47
// Another way to debug a particular test, is to copy that test into
48
// `experiments/testcase.cpp` and then run `sh build.sh
49
// check-testcase` (or one of its friends) from the command line.
50

51
using namespace realm::sync;
52

53
namespace {
54

55
network::Endpoint bind_acceptor(network::Acceptor& acceptor)
56
{
80✔
57
    network::Endpoint ep; // Wildcard
80✔
58
    acceptor.open(ep.protocol());
80✔
59
    acceptor.bind(ep);
80✔
60
    ep = acceptor.local_endpoint(); // Get actual bound endpoint
80✔
61
    acceptor.listen();
80✔
62
    return ep;
80✔
63
}
80✔
64

65
void connect_sockets(network::Socket& socket_1, network::Socket& socket_2)
66
{
48✔
67
    network::Service& service_1 = socket_1.get_service();
48✔
68
    network::Service& service_2 = socket_2.get_service();
48✔
69
    network::Acceptor acceptor{service_1};
48✔
70
    network::Endpoint ep = bind_acceptor(acceptor);
48✔
71
    bool accept_occurred = false, connect_occurred = false;
48✔
72
    auto accept_handler = [&](std::error_code ec) {
48✔
73
        REALM_ASSERT(!ec);
48✔
74
        accept_occurred = true;
48✔
75
    };
48✔
76
    auto connect_handler = [&](std::error_code ec) {
48✔
77
        REALM_ASSERT(!ec);
48✔
78
        connect_occurred = true;
48✔
79
    };
48✔
80
    acceptor.async_accept(socket_1, std::move(accept_handler));
48✔
81
    socket_2.async_connect(ep, std::move(connect_handler));
48✔
82
    if (&service_1 == &service_2) {
48✔
83
        service_1.run();
44✔
84
    }
44✔
85
    else {
4✔
86
        std::thread thread{[&] {
4✔
87
            service_1.run();
4✔
88
        }};
4✔
89
        service_2.run();
4✔
90
        thread.join();
4✔
91
    }
4✔
92
    REALM_ASSERT(accept_occurred);
48✔
93
    REALM_ASSERT(connect_occurred);
48✔
94
    socket_1.set_option(network::SocketBase::no_delay(true));
48✔
95
    socket_2.set_option(network::SocketBase::no_delay(true));
48✔
96
}
48✔
97

98
void connect_socket(network::Socket& socket, std::string port)
99
{
4✔
100
    network::Service& service = socket.get_service();
4✔
101
    network::Resolver resolver{service};
4✔
102
    network::Resolver::Query query("localhost", port);
4✔
103
    network::Endpoint::List endpoints = resolver.resolve(query);
4✔
104

105
    auto i = endpoints.begin();
4✔
106
    auto end = endpoints.end();
4✔
107
    for (;;) {
6✔
108
        std::error_code ec;
6✔
109
        socket.connect(*i, ec);
6✔
110
        if (!ec)
6✔
111
            break;
4✔
112
        socket.close();
2✔
113
        if (++i == end)
2✔
114
            throw std::runtime_error("Failed to connect to localhost:" + port);
×
115
    }
2✔
116
}
4✔
117

118

119
TEST(Network_Hostname)
120
{
2✔
121
    // Just check that we call call network::host_name()
122
    network::host_name();
2✔
123
}
2✔
124

125

126
TEST(Network_PostOperation)
127
{
2✔
128
    network::Service service;
2✔
129
    int var_1 = 381, var_2 = 743;
2✔
130
    service.post([&](Status status) {
2✔
131
        CHECK(status.is_ok());
2✔
132
        var_1 = 824;
2✔
133
    });
2✔
134
    service.post([&](Status) {
2✔
135
        var_2 = 216;
2✔
136
    });
2✔
137
    CHECK_EQUAL(var_1, 381);
2✔
138
    CHECK_EQUAL(var_2, 743);
2✔
139
    service.run();
2✔
140
    CHECK_EQUAL(var_1, 824);
2✔
141
    CHECK_EQUAL(var_2, 216);
2✔
142
    service.post([&](Status) {
2✔
143
        var_2 = 191;
2✔
144
    });
2✔
145
    service.post([&](Status) {
2✔
146
        var_1 = 476;
2✔
147
    });
2✔
148
    CHECK_EQUAL(var_1, 824);
2✔
149
    CHECK_EQUAL(var_2, 216);
2✔
150
    service.run();
2✔
151
    CHECK_EQUAL(var_1, 476);
2✔
152
    CHECK_EQUAL(var_2, 191);
2✔
153
}
2✔
154

155

156
TEST(Network_RunUntilStopped)
157
{
2✔
158
    network::Service service;
2✔
159
    auto post_to_service = [&](util::UniqueFunction<void()> func = {}) {
8✔
160
        auto [promise, future] = util::make_promise_future<void>();
8✔
161
        service.post([promise = std::move(promise), func = std::move(func)](Status s) mutable {
8✔
162
            if (!s.is_ok()) {
8✔
163
                promise.set_error(s);
×
164
                return;
×
165
            }
×
166
            if (func) {
8✔
167
                func();
2✔
168
            }
2✔
169
            promise.emplace_value();
8✔
170
        });
8✔
171
        return std::move(future);
8✔
172
    };
8✔
173

174
    auto before_run = post_to_service();
2✔
175

176
    auto [thread_stopped_promise, thread_stopped_future] = util::make_promise_future<void>();
2✔
177
    std::thread thread([&service, promise = std::move(thread_stopped_promise)]() mutable {
2✔
178
        service.run_until_stopped();
2✔
179
        promise.emplace_value();
2✔
180
    });
2✔
181

182
    before_run.get();
2✔
183
    CHECK_NOT(thread_stopped_future.is_ready());
2✔
184
    // Post while it's running. This should get fulfilled immediately.
185
    post_to_service().get();
2✔
186
    CHECK_NOT(thread_stopped_future.is_ready());
2✔
187

188
    util::Optional<util::Future<void>> timer_ran_future;
2✔
189
    network::DeadlineTimer timer(service);
2✔
190
    post_to_service([&] {
2✔
191
        auto [promise, future] = util::make_promise_future<void>();
2✔
192
        timer.async_wait(std::chrono::milliseconds{250}, [promise = std::move(promise)](Status s) mutable {
2✔
193
            if (!s.is_ok()) {
2✔
194
                promise.set_error(s);
×
195
                return;
×
196
            }
×
197
            promise.emplace_value();
2✔
198
        });
2✔
199
        timer_ran_future = std::move(future);
2✔
200
    }).get();
2✔
201

202
    CHECK_NOT(thread_stopped_future.is_ready());
2✔
203
    CHECK(timer_ran_future);
2✔
204
    timer_ran_future->get();
2✔
205

206
    service.stop();
2✔
207
    thread.join();
2✔
208

209
    thread_stopped_future.get();
2✔
210

211
    auto after_stop = post_to_service();
2✔
212
    CHECK_NOT(after_stop.is_ready());
2✔
213

214
    service.reset();
2✔
215
    service.run();
2✔
216
    after_stop.get();
2✔
217
}
2✔
218

219
TEST(Network_EventLoopStopAndReset_1)
220
{
2✔
221
    network::Service service;
2✔
222

223
    // Prestop
224
    int var = 381;
2✔
225
    service.stop();
2✔
226
    service.post([&](Status) {
2✔
227
        var = 824;
2✔
228
    });
2✔
229
    service.run(); // Must return immediately
2✔
230
    CHECK_EQUAL(var, 381);
2✔
231
    service.run(); // Must still return immediately
2✔
232
    CHECK_EQUAL(var, 381);
2✔
233

234
    // Reset
235
    service.reset();
2✔
236
    service.post([&](Status) {
2✔
237
        var = 824;
2✔
238
    });
2✔
239
    CHECK_EQUAL(var, 381);
2✔
240
    service.run();
2✔
241
    CHECK_EQUAL(var, 824);
2✔
242
    service.post([&](Status) {
2✔
243
        var = 476;
2✔
244
    });
2✔
245
    CHECK_EQUAL(var, 824);
2✔
246
    service.run();
2✔
247
    CHECK_EQUAL(var, 476);
2✔
248
}
2✔
249

250

251
TEST(Network_EventLoopStopAndReset_2)
252
{
2✔
253
    // Introduce a blocking operation that will keep the event loop running
254
    network::Service service;
2✔
255
    network::Acceptor acceptor{service};
2✔
256
    bind_acceptor(acceptor);
2✔
257
    network::Socket socket{service};
2✔
258
    acceptor.async_accept(socket, [](std::error_code) {});
2✔
259

260
    // Start event loop execution in the background
261
    ThreadWrapper thread_1;
2✔
262
    thread_1.start([&] {
2✔
263
        service.run();
2✔
264
    });
2✔
265

266
    // Check that the event loop is actually running
267
    BowlOfStonesSemaphore bowl_1; // Empty
2✔
268
    service.post([&](Status) {
2✔
269
        bowl_1.add_stone();
2✔
270
    });
2✔
271
    bowl_1.get_stone(); // Block until the stone is added
2✔
272

273
    // Stop the event loop
274
    service.stop();
2✔
275
    CHECK_NOT(thread_1.join());
2✔
276

277
    // Check that the event loop remains in the stopped state
278
    int var = 381;
2✔
279
    service.post([&](Status) {
2✔
280
        var = 824;
2✔
281
    });
2✔
282
    CHECK_EQUAL(var, 381);
2✔
283
    service.run(); // Still stopped, so run() must return immediately
2✔
284
    CHECK_EQUAL(var, 381);
2✔
285

286
    // Put the event loop back into the unstopped state, and restart it in the
287
    // background
288
    service.reset();
2✔
289
    ThreadWrapper thread_2;
2✔
290
    thread_2.start([&] {
2✔
291
        service.run();
2✔
292
    });
2✔
293

294
    // Check that the event loop is actually running
295
    BowlOfStonesSemaphore bowl_2; // Empty
2✔
296
    service.post([&](Status) {
2✔
297
        bowl_2.add_stone();
2✔
298
    });
2✔
299
    bowl_2.get_stone(); // Block until the stone is added
2✔
300

301
    // Stop the event loop by canceling the blocking operation
302
    service.post([&](Status) {
2✔
303
        acceptor.cancel();
2✔
304
    });
2✔
305
    CHECK_NOT(thread_2.join());
2✔
306

307
    CHECK_EQUAL(var, 824);
2✔
308
}
2✔
309

310

311
TEST(Network_GetSetSocketOption)
312
{
2✔
313
    network::Service service;
2✔
314
    network::Socket socket{service};
2✔
315
    socket.open(network::StreamProtocol::ip_v4());
2✔
316
    network::Socket::reuse_address opt_reuse_addr;
2✔
317
    socket.get_option(opt_reuse_addr);
2✔
318
    CHECK_NOT(opt_reuse_addr.value());
2✔
319
    socket.set_option(network::Socket::reuse_address(true));
2✔
320
    socket.get_option(opt_reuse_addr);
2✔
321
    CHECK(opt_reuse_addr.value());
2✔
322
}
2✔
323

324

325
TEST(Network_AsyncConnectAndAsyncAccept)
326
{
2✔
327
    network::Service service;
2✔
328
    network::Acceptor acceptor{service};
2✔
329
    network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
330
    network::Socket socket_1{service}, socket_2{service};
2✔
331
    bool connected = false;
2✔
332
    auto connect_handler = [&](std::error_code ec) {
2✔
333
        if (ec)
2✔
334
            throw std::system_error(ec);
×
335
        connected = true;
2✔
336
        log("connected");
2✔
337
    };
2✔
338
    bool accepted = false;
2✔
339
    auto accept_handler = [&](std::error_code ec) {
2✔
340
        if (ec)
2✔
341
            throw std::system_error(ec);
×
342
        accepted = true;
2✔
343
        log("accepted");
2✔
344
    };
2✔
345
    socket_1.async_connect(listening_endpoint, std::move(connect_handler));
2✔
346
    acceptor.async_accept(socket_2, std::move(accept_handler));
2✔
347
    service.run();
2✔
348
    CHECK(connected);
2✔
349
    CHECK(accepted);
2✔
350
}
2✔
351

352

353
TEST(Network_ReadWrite)
354
{
2✔
355
    network::Service service_1;
2✔
356
    network::Acceptor acceptor{service_1};
2✔
357
    network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
358

359
    char data[] = {'X', 'F', 'M'};
2✔
360

361
    auto reader = [&] {
2✔
362
        network::Socket socket_1{service_1};
2✔
363
        acceptor.accept(socket_1);
2✔
364
        socket_1.set_option(network::SocketBase::no_delay(true));
2✔
365
        network::ReadAheadBuffer rab;
2✔
366
        char buffer[sizeof data];
2✔
367
        size_t n = socket_1.read(buffer, sizeof data, rab);
2✔
368
        if (CHECK_EQUAL(sizeof data, n))
2✔
369
            CHECK(std::equal(buffer, buffer + n, data));
2✔
370
        std::error_code ec;
2✔
371
        n = socket_1.read(buffer, 1, rab, ec);
2✔
372
        CHECK_EQUAL(0, n);
2✔
373
        CHECK(ec == MiscExtErrors::end_of_input);
2✔
374
    };
2✔
375
    ThreadWrapper thread;
2✔
376
    thread.start(reader);
2✔
377

378
    network::Service service_2;
2✔
379
    network::Socket socket_2{service_2};
2✔
380
    socket_2.connect(listening_endpoint);
2✔
381
    socket_2.set_option(network::SocketBase::no_delay(true));
2✔
382
    socket_2.write(data, sizeof data);
2✔
383
    socket_2.close();
2✔
384

385
    CHECK_NOT(thread.join());
2✔
386
}
2✔
387

388

389
TEST(Network_ReadWriteNativeHandle)
390
{
2✔
391
    network::Service service_1;
2✔
392
    network::Acceptor acceptor{service_1};
2✔
393
    network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
394

395
    char data[] = {'X', 'F', 'M'};
2✔
396

397
    auto reader = [&] {
2✔
398
        network::Socket socket_1{service_1};
2✔
399
        acceptor.accept(socket_1);
2✔
400
        socket_1.set_option(network::SocketBase::no_delay(true));
2✔
401
        network::ReadAheadBuffer rab;
2✔
402
        char buffer[sizeof data];
2✔
403
        size_t n = socket_1.read(buffer, sizeof data, rab);
2✔
404
        if (CHECK_EQUAL(sizeof data, n))
2✔
405
            CHECK(std::equal(buffer, buffer + n, data));
2✔
406
        std::error_code ec;
2✔
407
        n = socket_1.read(buffer, 1, rab, ec);
2✔
408
        CHECK_EQUAL(0, n);
2✔
409
        CHECK(ec == MiscExtErrors::end_of_input);
2✔
410
    };
2✔
411
    ThreadWrapper thread;
2✔
412
    thread.start(reader);
2✔
413

414
    network::Service service_2;
2✔
415

416
    // Connect with plain POSIX APIs.
417
    int family = listening_endpoint.protocol().family();
2✔
418
    int protocol = listening_endpoint.protocol().protocol();
2✔
419
    using native_handle_type = network::SocketBase::native_handle_type;
2✔
420
    native_handle_type sockfd = ::socket(family, SOCK_STREAM, protocol);
2✔
421
    CHECK_GREATER(sockfd, 0);
2✔
422

423
    int endpoint_size = listening_endpoint.protocol().is_ip_v4() ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
2✔
424
    int ret = ::connect(sockfd, listening_endpoint.data(), endpoint_size);
2✔
425
    CHECK_EQUAL(ret, 0);
2✔
426

427
    network::Socket socket_2{service_2, listening_endpoint.protocol(), sockfd};
2✔
428
    socket_2.write(data, sizeof data);
2✔
429
    socket_2.close();
2✔
430

431
    CHECK_NOT(thread.join());
2✔
432
}
2✔
433

434

435
TEST(Network_ReadWriteLargeAmount)
436
{
2✔
437
    network::Service service_1;
2✔
438
    network::Acceptor acceptor{service_1};
2✔
439
    network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
440

441
    size_t num_bytes_per_chunk = 1048576L / 2;
2✔
442
    std::unique_ptr<char[]> chunk(new char[num_bytes_per_chunk]);
2✔
443
    for (size_t i = 0; i < num_bytes_per_chunk; ++i)
1,048,578✔
444
        chunk[i] = char(i % 128);
1,048,576✔
445
    int num_chunks = 128;
2✔
446

447
    auto reader = [&] {
2✔
448
        network::Socket socket_1{service_1};
2✔
449
        acceptor.accept(socket_1);
2✔
450
        socket_1.set_option(network::SocketBase::no_delay(true));
2✔
451
        network::ReadAheadBuffer rab;
2✔
452
        size_t buffer_size = 8191; // Prime
2✔
453
        std::unique_ptr<char[]> buffer(new char[buffer_size]);
2✔
454
        size_t offset_in_chunk = 0;
2✔
455
        int chunk_index = 0;
2✔
456
        for (;;) {
16,388✔
457
            std::error_code ec;
16,388✔
458
            size_t n = socket_1.read(buffer.get(), buffer_size, rab, ec);
16,388✔
459
            bool equal = true;
16,388✔
460
            for (size_t i = 0; i < n; ++i) {
134,234,116✔
461
                if (chunk[offset_in_chunk] != buffer[i]) {
134,217,728✔
462
                    equal = false;
×
463
                    break;
×
464
                }
×
465
                if (++offset_in_chunk == num_bytes_per_chunk) {
134,217,728✔
466
                    offset_in_chunk = 0;
256✔
467
                    ++chunk_index;
256✔
468
                }
256✔
469
            }
134,217,728✔
470
            CHECK(equal);
16,388✔
471
            if (ec == MiscExtErrors::end_of_input)
16,388✔
472
                break;
2✔
473
            CHECK_NOT(ec);
16,386✔
474
        }
16,386✔
475
        CHECK_EQUAL(0, offset_in_chunk);
2✔
476
        CHECK_EQUAL(num_chunks, chunk_index);
2✔
477
    };
2✔
478
    ThreadWrapper thread;
2✔
479
    thread.start(reader);
2✔
480

481
    network::Service service_2;
2✔
482
    network::Socket socket_2{service_2};
2✔
483
    socket_2.connect(listening_endpoint);
2✔
484
    socket_2.set_option(network::SocketBase::no_delay(true));
2✔
485
    for (int i = 0; i < num_chunks; ++i)
258✔
486
        socket_2.write(chunk.get(), num_bytes_per_chunk);
256✔
487
    socket_2.close();
2✔
488

489
    CHECK_NOT(thread.join());
2✔
490
}
2✔
491

492

493
TEST(Network_AsyncReadWriteLargeAmount)
494
{
2✔
495
    network::Service service_1;
2✔
496
    network::Acceptor acceptor{service_1};
2✔
497
    network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
498

499
    size_t num_bytes_per_chunk = 1048576 / 2;
2✔
500
    std::unique_ptr<char[]> chunk(new char[num_bytes_per_chunk]);
2✔
501
    for (size_t i = 0; i < num_bytes_per_chunk; ++i)
1,048,578✔
502
        chunk[i] = char(i % 128);
1,048,576✔
503
    int num_chunks = 128;
2✔
504

505
    auto reader = [&] {
2✔
506
        network::Socket socket_1{service_1};
2✔
507
        acceptor.accept(socket_1);
2✔
508
        socket_1.set_option(network::SocketBase::no_delay(true));
2✔
509
        network::ReadAheadBuffer rab;
2✔
510
        size_t buffer_size = 8191; // Prime
2✔
511
        std::unique_ptr<char[]> buffer(new char[buffer_size]);
2✔
512
        size_t offset_in_chunk = 0;
2✔
513
        int chunk_index = 0;
2✔
514
        realm::util::UniqueFunction<void()> read_chunk = [&] {
16,388✔
515
            auto handler = [&](std::error_code ec, size_t n) {
16,388✔
516
                bool equal = true;
16,388✔
517
                for (size_t i = 0; i < n; ++i) {
134,234,116✔
518
                    if (buffer[i] != chunk[offset_in_chunk]) {
134,217,728✔
519
                        equal = false;
×
520
                        break;
×
521
                    }
×
522
                    if (++offset_in_chunk == num_bytes_per_chunk) {
134,217,728✔
523
                        offset_in_chunk = 0;
256✔
524
                        ++chunk_index;
256✔
525
                    }
256✔
526
                }
134,217,728✔
527
                CHECK(equal);
16,388✔
528
                if (ec == MiscExtErrors::end_of_input)
16,388✔
529
                    return;
2✔
530
                CHECK_NOT(ec);
16,386✔
531
                read_chunk();
16,386✔
532
            };
16,386✔
533
            socket_1.async_read(buffer.get(), buffer_size, rab, handler);
16,388✔
534
        };
16,388✔
535
        read_chunk();
2✔
536
        service_1.run();
2✔
537
        CHECK_EQUAL(0, offset_in_chunk);
2✔
538
        CHECK_EQUAL(num_chunks, chunk_index);
2✔
539
    };
2✔
540
    ThreadWrapper thread;
2✔
541
    thread.start(reader);
2✔
542

543
    network::Service service_2;
2✔
544
    network::Socket socket_2{service_2};
2✔
545
    socket_2.connect(listening_endpoint);
2✔
546
    socket_2.set_option(network::SocketBase::no_delay(true));
2✔
547
    std::function<void(int)> write_chunk = [&](int i) {
256✔
548
        auto handler = [this, i, num_chunks, num_bytes_per_chunk, write_chunk](std::error_code ec, size_t n) {
256✔
549
            if (CHECK_NOT(ec)) {
256✔
550
                CHECK_EQUAL(num_bytes_per_chunk, n);
256✔
551
                if (i + 1 == num_chunks)
256✔
552
                    return;
2✔
553
                write_chunk(i + 1);
254✔
554
            }
254✔
555
        };
256✔
556
        socket_2.async_write(chunk.get(), num_bytes_per_chunk, handler);
256✔
557
    };
256✔
558
    write_chunk(0);
2✔
559
    service_2.run();
2✔
560
    socket_2.close();
2✔
561

562
    CHECK_NOT(thread.join());
2✔
563
}
2✔
564

565

566
TEST(Network_SocketAndAcceptorOpen)
567
{
2✔
568
    network::Service service_1;
2✔
569
    network::Acceptor acceptor{service_1};
2✔
570
    network::Resolver resolver{service_1};
2✔
571
    network::Resolver::Query query("localhost", "",
2✔
572
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
2✔
573
    network::Endpoint::List endpoints = resolver.resolve(query);
2✔
574
    {
2✔
575
        auto i = endpoints.begin();
2✔
576
        auto end = endpoints.end();
2✔
577
        for (;;) {
2✔
578
            std::error_code ec;
2✔
579
            acceptor.open(i->protocol(), ec);
2✔
580
            if (!ec) {
2✔
581
                acceptor.bind(*i, ec);
2✔
582
                if (!ec)
2✔
583
                    break;
2✔
584
                acceptor.close();
×
585
            }
×
586
            if (++i == end)
×
587
                throw std::runtime_error("Failed to bind to localhost:*");
×
588
        }
×
589
    }
2✔
590
    network::Endpoint listening_endpoint = acceptor.local_endpoint();
2✔
591
    acceptor.listen();
2✔
592
    network::Socket socket_1{service_1};
2✔
593
    ThreadWrapper thread;
2✔
594
    thread.start([&] {
2✔
595
        acceptor.accept(socket_1);
2✔
596
    });
2✔
597

598
    network::Service service_2;
2✔
599
    network::Socket socket_2{service_2};
2✔
600
    socket_2.open(listening_endpoint.protocol());
2✔
601
    socket_2.connect(listening_endpoint);
2✔
602

603
    thread.join();
2✔
604
}
2✔
605

606

607
TEST(Network_CancelAsyncAccept)
608
{
2✔
609
    network::Service service;
2✔
610
    network::Acceptor acceptor{service};
2✔
611
    bind_acceptor(acceptor);
2✔
612
    network::Socket socket{service};
2✔
613

614
    bool accept_was_canceled = false;
2✔
615
    auto handler = [&](std::error_code ec) {
4✔
616
        if (ec == error::operation_aborted)
4✔
617
            accept_was_canceled = true;
4✔
618
    };
4✔
619
    acceptor.async_accept(socket, handler);
2✔
620
    acceptor.cancel();
2✔
621
    service.run();
2✔
622
    CHECK(accept_was_canceled);
2✔
623

624
    accept_was_canceled = false;
2✔
625
    acceptor.async_accept(socket, handler);
2✔
626
    acceptor.close();
2✔
627
    service.run();
2✔
628
    CHECK(accept_was_canceled);
2✔
629
}
2✔
630

631

632
TEST(Network_CancelAsyncConnect)
633
{
2✔
634
    network::Service service;
2✔
635
    network::Acceptor acceptor{service};
2✔
636
    network::Endpoint ep = bind_acceptor(acceptor);
2✔
637
    network::Socket socket{service};
2✔
638

639
    bool connect_was_canceled = false;
2✔
640
    auto handler = [&](std::error_code ec) {
4✔
641
        if (ec == error::operation_aborted)
4✔
642
            connect_was_canceled = true;
4✔
643
    };
4✔
644
    socket.async_connect(ep, std::move(handler));
2✔
645
    socket.cancel();
2✔
646
    service.run();
2✔
647
    CHECK(connect_was_canceled);
2✔
648

649
    connect_was_canceled = false;
2✔
650
    socket.async_connect(ep, std::move(handler));
2✔
651
    socket.close();
2✔
652
    service.run();
2✔
653
    CHECK(connect_was_canceled);
2✔
654
}
2✔
655

656

657
TEST(Network_CancelAsyncReadWrite)
658
{
2✔
659
    network::Service service;
2✔
660
    network::Acceptor acceptor{service};
2✔
661
    bind_acceptor(acceptor);
2✔
662
    network::Socket socket_1{service};
2✔
663
    bool was_accepted = false;
2✔
664
    auto accept_handler = [&](std::error_code ec) {
2✔
665
        if (!ec)
2✔
666
            was_accepted = true;
2✔
667
    };
2✔
668
    acceptor.async_accept(socket_1, accept_handler);
2✔
669
    network::Socket socket_2{service};
2✔
670
    socket_2.connect(acceptor.local_endpoint());
2✔
671
    socket_2.set_option(network::SocketBase::no_delay(true));
2✔
672
    service.run();
2✔
673
    CHECK(was_accepted);
2✔
674
    socket_1.set_option(network::SocketBase::no_delay(true));
2✔
675
    const size_t size = 1;
2✔
676
    char data[size] = {'a'};
2✔
677
    bool write_was_canceled = false;
2✔
678
    auto write_handler = [&](std::error_code ec, size_t) {
2✔
679
        if (ec == error::operation_aborted)
2✔
680
            write_was_canceled = true;
2✔
681
    };
2✔
682
    socket_2.async_write(data, size, write_handler);
2✔
683
    network::ReadAheadBuffer rab;
2✔
684
    char buffer[size];
2✔
685
    bool read_was_canceled = false;
2✔
686
    auto read_handler = [&](std::error_code ec, size_t) {
2✔
687
        if (ec == error::operation_aborted)
2✔
688
            read_was_canceled = true;
2✔
689
    };
2✔
690
    socket_2.async_read(buffer, size, rab, read_handler);
2✔
691
    socket_2.close();
2✔
692
    service.run();
2✔
693
    CHECK(read_was_canceled);
2✔
694
    CHECK(write_was_canceled);
2✔
695
}
2✔
696

697
TEST(Network_CancelEmptyRead)
698
{
2✔
699
    // Make sure that an immediately completable read operation is still
700
    // cancelable
701

702
    network::Service service;
2✔
703
    network::Socket socket_1{service}, socket_2{service};
2✔
704
    connect_sockets(socket_1, socket_2);
2✔
705
    network::ReadAheadBuffer rab;
2✔
706
    const size_t size = 1;
2✔
707
    char data[size] = {'a'};
2✔
708
    bool write_was_canceled = false;
2✔
709
    auto write_handler = [&](std::error_code ec, size_t) {
2✔
710
        if (ec == error::operation_aborted)
2✔
711
            write_was_canceled = true;
2✔
712
    };
2✔
713
    socket_2.async_write(data, size, write_handler);
2✔
714
    char buffer[size];
2✔
715
    bool read_was_canceled = false;
2✔
716
    auto read_handler = [&](std::error_code ec, size_t) {
2✔
717
        if (ec == error::operation_aborted)
2✔
718
            read_was_canceled = true;
2✔
719
    };
2✔
720
    socket_2.async_read(buffer, 0, rab, read_handler);
2✔
721
    socket_2.close();
2✔
722
    service.run();
2✔
723
    CHECK(read_was_canceled);
2✔
724
    CHECK(write_was_canceled);
2✔
725
}
2✔
726

727
TEST(Network_CancelEmptyWrite)
728
{
2✔
729
    // Make sure that an immediately completable write operation is still
730
    // cancelable
731

732
    network::Service service;
2✔
733
    network::Socket socket_1{service}, socket_2{service};
2✔
734
    connect_sockets(socket_1, socket_2);
2✔
735
    network::ReadAheadBuffer rab;
2✔
736
    char buffer[1];
2✔
737
    bool read_was_canceled = false;
2✔
738
    auto read_handler = [&](std::error_code ec, size_t) {
2✔
739
        if (ec == error::operation_aborted)
2✔
740
            read_was_canceled = true;
2✔
741
    };
2✔
742
    socket_2.async_read(buffer, 1, rab, read_handler);
2✔
743
    char data[1] = {'a'};
2✔
744
    bool write_was_canceled = false;
2✔
745
    auto write_handler = [&](std::error_code ec, size_t) {
2✔
746
        if (ec == error::operation_aborted)
2✔
747
            write_was_canceled = true;
2✔
748
    };
2✔
749
    socket_2.async_write(data, 0, write_handler);
2✔
750
    socket_2.close();
2✔
751
    service.run();
2✔
752
    CHECK(read_was_canceled);
2✔
753
    CHECK(write_was_canceled);
2✔
754
}
2✔
755

756

757
TEST(Network_CancelReadByDestroy)
758
{
2✔
759
    // Check that canceled read operations never try to access socket, stream,
760
    // or input buffer objects, even if they were partially completed.
761

762
    const int num_connections = 16;
2✔
763
    network::Service service;
2✔
764
    std::unique_ptr<std::unique_ptr<network::Socket>[]> write_sockets;
2✔
765
    std::unique_ptr<std::unique_ptr<network::Socket>[]> read_sockets;
2✔
766
    std::unique_ptr<std::unique_ptr<network::ReadAheadBuffer>[]> read_ahead_buffers;
2✔
767
    write_sockets.reset(new std::unique_ptr<network::Socket>[num_connections]);
2✔
768
    read_sockets.reset(new std::unique_ptr<network::Socket>[num_connections]);
2✔
769
    read_ahead_buffers.reset(new std::unique_ptr<network::ReadAheadBuffer>[num_connections]);
2✔
770
    char output_buffer[2] = {'x', '\n'};
2✔
771
    std::unique_ptr<char[][2]> input_buffers(new char[num_connections][2]);
2✔
772
    for (int i = 0; i < num_connections; ++i) {
34✔
773
        write_sockets[i].reset(new network::Socket{service});
32✔
774
        read_sockets[i].reset(new network::Socket{service});
32✔
775
        connect_sockets(*write_sockets[i], *read_sockets[i]);
32✔
776
        read_ahead_buffers[i].reset(new network::ReadAheadBuffer);
32✔
777
    }
32✔
778
    for (int i = 0; i < num_connections; ++i) {
34✔
779
        auto read_handler = [&](std::error_code ec, size_t n) {
32✔
780
            CHECK(n == 0 || n == 1 || n == 2);
32✔
781
            if (n == 2) {
32✔
782
                CHECK_NOT(ec);
2✔
783
                for (int j = 0; j < num_connections; ++j)
34✔
784
                    read_sockets[j]->cancel();
32✔
785
                read_ahead_buffers.reset(); // Destroy all input streams
2✔
786
                read_sockets.reset();       // Destroy all read sockets
2✔
787
                input_buffers.reset();      // Destroy all input buffers
2✔
788
                return;
2✔
789
            }
2✔
790
            CHECK_EQUAL(error::operation_aborted, ec);
30✔
791
        };
30✔
792
        read_sockets[i]->async_read_until(input_buffers[i], 2, '\n', *read_ahead_buffers[i], read_handler);
32✔
793
        auto write_handler = [&](std::error_code ec, size_t) {
32✔
794
            CHECK_NOT(ec);
32✔
795
        };
32✔
796
        int n = (i == num_connections / 2 ? 2 : 1);
32✔
797
        write_sockets[i]->async_write(output_buffer, n, write_handler);
32✔
798
    }
32✔
799
    service.run();
2✔
800
}
2✔
801

802

803
TEST(Network_AcceptorMixedAsyncSync)
804
{
2✔
805
    network::Service service;
2✔
806
    network::Acceptor acceptor{service};
2✔
807
    bind_acceptor(acceptor);
2✔
808
    network::Endpoint ep = acceptor.local_endpoint();
2✔
809
    auto connect = [ep] {
6✔
810
        network::Service connect_service;
6✔
811
        network::Socket socket{connect_service};
6✔
812
        socket.connect(ep);
6✔
813
    };
6✔
814

815
    // Synchronous accept -> stay on blocking mode
816
    {
2✔
817
        ThreadWrapper thread;
2✔
818
        thread.start(connect);
2✔
819
        network::Socket socket{service};
2✔
820
        acceptor.accept(socket);
2✔
821
        CHECK_NOT(thread.join());
2✔
822
    }
2✔
823

824
    // Asynchronous accept -> switch to nonblocking mode
825
    {
2✔
826
        ThreadWrapper thread;
2✔
827
        thread.start(connect);
2✔
828
        network::Socket socket{service};
2✔
829
        bool was_accepted = false;
2✔
830
        auto accept_handler = [&](std::error_code ec) {
2✔
831
            if (!ec)
2✔
832
                was_accepted = true;
2✔
833
        };
2✔
834
        acceptor.async_accept(socket, accept_handler);
2✔
835
        service.run();
2✔
836
        CHECK(was_accepted);
2✔
837
        CHECK_NOT(thread.join());
2✔
838
    }
2✔
839

840
    // Synchronous accept -> switch back to blocking mode
841
    {
2✔
842
        ThreadWrapper thread;
2✔
843
        thread.start(connect);
2✔
844
        network::Socket socket{service};
2✔
845
        acceptor.accept(socket);
2✔
846
        CHECK_NOT(thread.join());
2✔
847
    }
2✔
848
}
2✔
849

850

851
TEST(Network_SocketMixedAsyncSync)
852
{
2✔
853
    network::Service acceptor_service;
2✔
854
    network::Acceptor acceptor{acceptor_service};
2✔
855
    bind_acceptor(acceptor);
2✔
856
    network::Endpoint ep = acceptor.local_endpoint();
2✔
857
    auto accept_and_echo = [&] {
4✔
858
        network::Socket socket{acceptor_service};
4✔
859
        acceptor.accept(socket);
4✔
860
        socket.set_option(network::SocketBase::no_delay(true));
4✔
861
        network::ReadAheadBuffer rab;
4✔
862
        size_t buffer_size = 1024;
4✔
863
        std::unique_ptr<char[]> buffer(new char[buffer_size]);
4✔
864
        size_t size = socket.read_until(buffer.get(), buffer_size, '\n', rab);
4✔
865
        socket.write(buffer.get(), size);
4✔
866
    };
4✔
867

868
    {
2✔
869
        ThreadWrapper thread;
2✔
870
        thread.start(accept_and_echo);
2✔
871
        network::Service service;
2✔
872

873
        // Synchronous connect -> stay in blocking mode
874
        network::Socket socket{service};
2✔
875
        socket.connect(ep);
2✔
876
        socket.set_option(network::SocketBase::no_delay(true));
2✔
877
        network::ReadAheadBuffer rab;
2✔
878

879
        // Asynchronous write -> switch to nonblocking mode
880
        const char* message = "Calabi-Yau\n";
2✔
881
        bool was_written = false;
2✔
882
        auto write_handler = [&](std::error_code ec, size_t) {
2✔
883
            if (!ec)
2✔
884
                was_written = true;
2✔
885
        };
2✔
886
        socket.async_write(message, strlen(message), write_handler);
2✔
887
        service.run();
2✔
888
        CHECK(was_written);
2✔
889

890
        // Synchronous read -> switch back to blocking mode
891
        size_t buffer_size = 1024;
2✔
892
        std::unique_ptr<char[]> buffer(new char[buffer_size]);
2✔
893
        std::error_code ec;
2✔
894
        size_t size = socket.read(buffer.get(), buffer_size, rab, ec);
2✔
895
        if (CHECK_EQUAL(ec, MiscExtErrors::end_of_input)) {
2✔
896
            if (CHECK_EQUAL(size, strlen(message)))
2✔
897
                CHECK(std::equal(buffer.get(), buffer.get() + size, message));
2✔
898
        }
2✔
899

900
        CHECK_NOT(thread.join());
2✔
901
    }
2✔
902

903
    {
2✔
904
        ThreadWrapper thread;
2✔
905
        thread.start(accept_and_echo);
2✔
906
        network::Service service;
2✔
907

908
        // Asynchronous connect -> switch to nonblocking mode
909
        network::Socket socket{service};
2✔
910
        bool is_connected = false;
2✔
911
        auto connect_handler = [&](std::error_code ec) {
2✔
912
            if (!ec)
2✔
913
                is_connected = true;
2✔
914
        };
2✔
915
        socket.async_connect(ep, std::move(connect_handler));
2✔
916
        service.run();
2✔
917
        CHECK(is_connected);
2✔
918
        network::ReadAheadBuffer rab;
2✔
919

920
        // Synchronous write -> switch back to blocking mode
921
        const char* message = "The Verlinde Algebra And The Cohomology Of The Grassmannian\n";
2✔
922
        socket.write(message, strlen(message));
2✔
923

924
        // Asynchronous read -> swich once again to nonblocking mode
925
        size_t buffer_size = 1024;
2✔
926
        std::unique_ptr<char[]> buffer(new char[buffer_size]);
2✔
927
        auto read_handler = [&](std::error_code ec, size_t size) {
2✔
928
            if (CHECK_EQUAL(ec, MiscExtErrors::end_of_input)) {
2✔
929
                if (CHECK_EQUAL(size, strlen(message)))
2✔
930
                    CHECK(std::equal(buffer.get(), buffer.get() + size, message));
2✔
931
            }
2✔
932
        };
2✔
933
        socket.async_read(buffer.get(), buffer_size, rab, read_handler);
2✔
934
        service.run();
2✔
935

936
        CHECK_NOT(thread.join());
2✔
937
    }
2✔
938
}
2✔
939

940

941
TEST(Network_SocketShutdown)
942
{
2✔
943
    network::Service service;
2✔
944
    network::Socket socket_1{service}, socket_2{service};
2✔
945
    connect_sockets(socket_1, socket_2);
2✔
946
    network::ReadAheadBuffer read_ahead_buffer;
2✔
947

948
    bool end_of_input_seen = false;
2✔
949
    auto handler = [&](std::error_code ec, size_t) {
2✔
950
        if (ec == MiscExtErrors::end_of_input)
2✔
951
            end_of_input_seen = true;
2✔
952
    };
2✔
953
    char ch;
2✔
954
    socket_2.async_read(&ch, 1, read_ahead_buffer, std::move(handler));
2✔
955
    socket_1.shutdown(network::Socket::shutdown_send);
2✔
956
    service.run();
2✔
957
    CHECK(end_of_input_seen);
2✔
958
}
2✔
959

960

961
TEST(Network_DeadlineTimer)
962
{
2✔
963
    network::Service service;
2✔
964
    network::DeadlineTimer timer{service};
2✔
965

966
    // Check that the completion handler is executed
967
    bool completed = false;
2✔
968
    bool canceled = false;
2✔
969
    const auto wait_handler = [&](Status status) {
6✔
970
        if (status.is_ok())
6✔
971
            completed = true;
2✔
972
        if (status == ErrorCodes::OperationAborted)
6✔
973
            canceled = true;
4✔
974
    };
6✔
975
    timer.async_wait(std::chrono::seconds(0), wait_handler);
2✔
976
    CHECK(!completed);
2✔
977
    CHECK(!canceled);
2✔
978
    service.run();
2✔
979
    CHECK(completed);
2✔
980
    CHECK(!canceled);
2✔
981
    completed = false;
2✔
982

983
    // Check that an immediately completed wait operation can be canceled
984
    timer.async_wait(std::chrono::seconds(0), wait_handler);
2✔
985
    CHECK(!completed);
2✔
986
    CHECK(!canceled);
2✔
987
    timer.cancel();
2✔
988
    CHECK(!completed);
2✔
989
    CHECK(!canceled);
2✔
990
    service.run();
2✔
991
    CHECK(!completed);
2✔
992
    CHECK(canceled);
2✔
993
    canceled = false;
2✔
994

995
    // Check that a long running wait operation can be canceled
996
    timer.async_wait(std::chrono::hours(10000), wait_handler);
2✔
997
    CHECK(!completed);
2✔
998
    CHECK(!canceled);
2✔
999
    timer.cancel();
2✔
1000
    CHECK(!completed);
2✔
1001
    CHECK(!canceled);
2✔
1002
    service.run();
2✔
1003
    CHECK(!completed);
2✔
1004
    CHECK(canceled);
2✔
1005
}
2✔
1006

1007

1008
/*
1009
TEST(Network_DeadlineTimer_Special)
1010
{
1011
    network::Service service;
1012
    network::DeadlineTimer timer_1{service};
1013
    network::DeadlineTimer timer_2{service};
1014
    network::DeadlineTimer timer_3{service};
1015
    network::DeadlineTimer timer_4{service};
1016
    network::DeadlineTimer timer_5{service};
1017
    network::DeadlineTimer timer_6{service};
1018
    timer_1.async_wait(std::chrono::seconds(3), [](Status) { std::cerr << "*3*\n";   });
1019
    timer_2.async_wait(std::chrono::seconds(2), [](Status) { std::cerr << "*2*\n";   });
1020
    timer_3.async_wait(std::chrono::seconds(3), [](Status) { std::cerr << "*3-2*\n"; });
1021
    timer_4.async_wait(std::chrono::seconds(2), [](Status) { std::cerr << "*2-2*\n"; });
1022
    timer_5.async_wait(std::chrono::seconds(1), [](Status) { std::cerr << "*1*\n";   });
1023
    timer_6.async_wait(std::chrono::seconds(2), [](Status) { std::cerr << "*2-3*\n"; });
1024
    service.run();
1025
}
1026
*/
1027

1028

1029
TEST(Network_ThrowFromHandlers)
1030
{
2✔
1031
    // Check that exceptions can propagate correctly out from any type of
1032
    // completion handler
1033
    network::Service service;
2✔
1034
    struct TestException1 {};
2✔
1035
    service.post([](Status) {
2✔
1036
        throw TestException1();
2✔
1037
    });
2✔
1038
    CHECK_THROW(service.run(), TestException1);
2✔
1039

1040
    {
2✔
1041
        network::Acceptor acceptor{service};
2✔
1042
        network::Endpoint ep = bind_acceptor(acceptor);
2✔
1043
        network::Socket socket_1{service};
2✔
1044
        struct TestException2 {};
2✔
1045
        acceptor.async_accept(socket_1, [](std::error_code) {
2✔
1046
            throw TestException2();
2✔
1047
        });
2✔
1048
        network::Socket socket_2{service};
2✔
1049
        socket_2.async_connect(ep, [](std::error_code) {});
2✔
1050
        CHECK_THROW(service.run(), TestException2);
2✔
1051
    }
2✔
1052
    {
2✔
1053
        network::Acceptor acceptor{service};
2✔
1054
        network::Endpoint ep = bind_acceptor(acceptor);
2✔
1055
        network::Socket socket_1{service};
2✔
1056
        acceptor.async_accept(socket_1, [](std::error_code) {});
2✔
1057
        network::Socket socket_2{service};
2✔
1058
        struct TestException3 {};
2✔
1059
        socket_2.async_connect(ep, [](std::error_code) {
2✔
1060
            throw TestException3();
2✔
1061
        });
2✔
1062
        CHECK_THROW(service.run(), TestException3);
2✔
1063
    }
2✔
1064
    {
2✔
1065
        network::Socket socket_1{service}, socket_2{service};
2✔
1066
        connect_sockets(socket_1, socket_2);
2✔
1067
        network::ReadAheadBuffer rab;
2✔
1068
        char ch_1;
2✔
1069
        struct TestException4 {};
2✔
1070
        socket_1.async_read(&ch_1, 1, rab, [](std::error_code, size_t) {
2✔
1071
            throw TestException4();
2✔
1072
        });
2✔
1073
        char ch_2 = 0;
2✔
1074
        socket_2.async_write(&ch_2, 1, [](std::error_code, size_t) {});
2✔
1075
        CHECK_THROW(service.run(), TestException4);
2✔
1076
    }
2✔
1077
    {
2✔
1078
        network::Socket socket_1{service}, socket_2{service};
2✔
1079
        connect_sockets(socket_1, socket_2);
2✔
1080
        network::ReadAheadBuffer rab;
2✔
1081
        char ch_1;
2✔
1082
        socket_1.async_read(&ch_1, 1, rab, [](std::error_code, size_t) {});
2✔
1083
        char ch_2 = 0;
2✔
1084
        struct TestException5 {};
2✔
1085
        socket_2.async_write(&ch_2, 1, [](std::error_code, size_t) {
2✔
1086
            throw TestException5();
2✔
1087
        });
2✔
1088
        CHECK_THROW(service.run(), TestException5);
2✔
1089
    }
2✔
1090
    {
2✔
1091
        network::DeadlineTimer timer{service};
2✔
1092
        struct TestException6 {};
2✔
1093
        timer.async_wait(std::chrono::seconds(0), [](Status) {
2✔
1094
            throw TestException6();
2✔
1095
        });
2✔
1096
        CHECK_THROW(service.run(), TestException6);
2✔
1097
    }
2✔
1098
}
2✔
1099

1100

1101
TEST(Network_HandlerDealloc)
1102
{
2✔
1103
    // Check that dynamically allocated handlers are properly freed when the
1104
    // service object is destroyed.
1105
    {
2✔
1106
        // m_post_handlers
1107
        network::Service service;
2✔
1108
        service.post([](Status) {});
2✔
1109
    }
2✔
1110
    {
2✔
1111
        // m_imm_handlers
1112
        network::Service service;
2✔
1113
        // By adding two post handlers that throw, one is going to be left
1114
        // behind in `m_imm_handlers`
1115
        service.post([&](Status) {
2✔
1116
            throw std::runtime_error("");
2✔
1117
        });
2✔
1118
        service.post([&](Status) {
2✔
1119
            throw std::runtime_error("");
×
1120
        });
×
1121
        CHECK_THROW(service.run(), std::runtime_error);
2✔
1122
    }
2✔
1123
    {
2✔
1124
        // m_poll_handlers
1125
        network::Service service;
2✔
1126
        network::Acceptor acceptor{service};
2✔
1127
        acceptor.open(network::StreamProtocol::ip_v4());
2✔
1128
        network::Socket socket{service};
2✔
1129
        // This leaves behind a read handler in m_poll_handlers
1130
        acceptor.async_accept(socket, [&](std::error_code) {});
2✔
1131
    }
2✔
1132
    {
2✔
1133
        // m_cancel_handlers
1134
        network::Service service;
2✔
1135
        network::Acceptor acceptor{service};
2✔
1136
        acceptor.open(network::StreamProtocol::ip_v4());
2✔
1137
        network::Socket socket{service};
2✔
1138
        acceptor.async_accept(socket, [&](std::error_code) {});
2✔
1139
        // This leaves behind a read handler in m_cancel_handlers
1140
        acceptor.close();
2✔
1141
    }
2✔
1142
    {
2✔
1143
        // m_poll_handlers
1144
        network::Service service_1;
2✔
1145
        network::Acceptor acceptor{service_1};
2✔
1146
        network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
1147
        network::Socket socket_1{service_1};
2✔
1148
        ThreadWrapper thread;
2✔
1149
        thread.start([&] {
2✔
1150
            acceptor.accept(socket_1);
2✔
1151
        });
2✔
1152
        network::Service service_2;
2✔
1153
        network::Socket socket_2{service_2};
2✔
1154
        socket_2.connect(listening_endpoint);
2✔
1155
        socket_2.set_option(network::SocketBase::no_delay(true));
2✔
1156
        thread.join();
2✔
1157
        socket_1.set_option(network::SocketBase::no_delay(true));
2✔
1158
        network::ReadAheadBuffer rab;
2✔
1159
        char buffer[1];
2✔
1160
        char data[] = {'X', 'F', 'M'};
2✔
1161
        // This leaves behind both a read and a write handler in m_poll_handlers
1162
        socket_1.async_read(buffer, sizeof buffer, rab, [](std::error_code, size_t) {});
2✔
1163
        socket_1.async_write(data, sizeof data, [](std::error_code, size_t) {});
2✔
1164
    }
2✔
1165
}
2✔
1166

1167

1168
namespace {
1169

1170
template <int size>
1171
struct PostReallocHandler {
1172
    PostReallocHandler(int& v)
1173
        : var(v)
9✔
1174
    {
18✔
1175
    }
18✔
1176
    void operator()(Status)
1177
    {
18✔
1178
        var = size;
18✔
1179
    }
18✔
1180
    int& var;
1181
    char strut[size];
1182
};
1183

1184
} // unnamed namespace
1185

1186
TEST(Network_PostRealloc)
1187
{
2✔
1188
    // Use progressively larger post handlers to check that memory reallocation
1189
    // works
1190

1191
    network::Service service;
2✔
1192
    int var = 0;
2✔
1193
    for (int i = 0; i < 3; ++i) {
8✔
1194
        service.post(PostReallocHandler<10>(var));
6✔
1195
        service.run();
6✔
1196
        CHECK_EQUAL(10, var);
6✔
1197
        service.post(PostReallocHandler<100>(var));
6✔
1198
        service.run();
6✔
1199
        CHECK_EQUAL(100, var);
6✔
1200
        service.post(PostReallocHandler<1000>(var));
6✔
1201
        service.run();
6✔
1202
        CHECK_EQUAL(1000, var);
6✔
1203
    }
6✔
1204
}
2✔
1205

1206

1207
namespace {
1208

1209
struct AsyncReadWriteRealloc {
1210
    network::Service service;
1211
    network::Socket read_socket{service}, write_socket{service};
1212
    network::ReadAheadBuffer rab;
1213
    char read_buffer[3];
1214
    char write_buffer[3] = {'0', '1', '2'};
1215
    Random random{random_int<unsigned long>()}; // Seed from slow global generator
1216

1217
    const size_t num_bytes_to_write = 65536;
1218
    size_t num_bytes_written = 0;
1219
    size_t num_bytes_read = 0;
1220

1221
    template <int size>
1222
    struct WriteHandler {
1223
        WriteHandler(AsyncReadWriteRealloc& s)
1224
            : state(s)
43,287✔
1225
        {
86,848✔
1226
        }
86,848✔
1227
        void operator()(std::error_code ec, size_t n)
1228
        {
86,848✔
1229
            if (ec)
86,848✔
1230
                throw std::system_error(ec);
×
1231
            state.num_bytes_written += n;
86,848✔
1232
            state.initiate_write();
86,848✔
1233
        }
86,848✔
1234
        AsyncReadWriteRealloc& state;
1235
        char strut[size];
1236
    };
1237

1238
    void initiate_write()
1239
    {
86,850✔
1240
        if (num_bytes_written >= num_bytes_to_write) {
86,850✔
1241
            write_socket.close();
2✔
1242
            return;
2✔
1243
        }
2✔
1244
        int v = random.draw_int_max(3);
86,848✔
1245
        size_t n = std::min(size_t(v), size_t(num_bytes_to_write - num_bytes_written));
86,848✔
1246
        switch (v) {
86,848✔
1247
            case 0:
21,407✔
1248
                write_socket.async_write(write_buffer, n, WriteHandler<1>(*this));
21,407✔
1249
                return;
21,407✔
1250
            case 1:
21,727✔
1251
                write_socket.async_write(write_buffer, n, WriteHandler<10>(*this));
21,727✔
1252
                return;
21,727✔
1253
            case 2:
21,797✔
1254
                write_socket.async_write(write_buffer, n, WriteHandler<100>(*this));
21,797✔
1255
                return;
21,797✔
1256
            case 3:
21,917✔
1257
                write_socket.async_write(write_buffer, n, WriteHandler<1000>(*this));
21,917✔
1258
                return;
21,917✔
1259
        }
86,848✔
1260
        REALM_ASSERT(false);
×
1261
    }
×
1262

1263
    template <int size>
1264
    struct ReadHandler {
1265
        ReadHandler(AsyncReadWriteRealloc& s)
1266
            : state(s)
43,632✔
1267
        {
87,125✔
1268
        }
87,125✔
1269
        void operator()(std::error_code ec, size_t n)
1270
        {
87,125✔
1271
            if (ec && ec != MiscExtErrors::end_of_input)
87,125!
1272
                throw std::system_error(ec);
×
1273
            state.num_bytes_read += n;
87,125✔
1274
            if (ec != MiscExtErrors::end_of_input)
87,125✔
1275
                state.initiate_read();
87,123✔
1276
        }
87,125✔
1277
        AsyncReadWriteRealloc& state;
1278
        char strut[size];
1279
    };
1280

1281
    void initiate_read()
1282
    {
87,125✔
1283
        int v = random.draw_int_max(3);
87,125✔
1284
        size_t n = size_t(v);
87,125✔
1285
        switch (v) {
87,125✔
1286
            case 0:
21,673✔
1287
                read_socket.async_read(read_buffer, n, rab, ReadHandler<1>(*this));
21,673✔
1288
                return;
21,673✔
1289
            case 1:
21,736✔
1290
                read_socket.async_read(read_buffer, n, rab, ReadHandler<10>(*this));
21,736✔
1291
                return;
21,736✔
1292
            case 2:
21,810✔
1293
                read_socket.async_read(read_buffer, n, rab, ReadHandler<100>(*this));
21,810✔
1294
                return;
21,810✔
1295
            case 3:
21,906✔
1296
                read_socket.async_read(read_buffer, n, rab, ReadHandler<1000>(*this));
21,906✔
1297
                return;
21,906✔
1298
        }
87,125✔
1299
        REALM_ASSERT(false);
×
1300
    }
×
1301
};
1302

1303
} // unnamed namespace
1304

1305
TEST(Network_AsyncReadWriteRealloc)
1306
{
2✔
1307
    // Use progressively larger completion handlers to check that memory
1308
    // reallocation works
1309

1310
    AsyncReadWriteRealloc state;
2✔
1311
    connect_sockets(state.read_socket, state.write_socket);
2✔
1312
    state.initiate_read();
2✔
1313
    state.initiate_write();
2✔
1314
    state.service.run();
2✔
1315
    CHECK_EQUAL(state.num_bytes_to_write, state.num_bytes_written);
2✔
1316
    CHECK_EQUAL(state.num_bytes_written, state.num_bytes_read);
2✔
1317
}
2✔
1318

1319

1320
namespace {
1321

1322
char echo_body[] = {'\xC1', '\x2C', '\xEF', '\x48', '\x8C', '\xCD', '\x41', '\xFA', '\x12', '\xF9', '\xF4',
1323
                    '\x72', '\xDF', '\x92', '\x8E', '\x68', '\xAB', '\x8F', '\x6B', '\xDF', '\x80', '\x26',
1324
                    '\xD1', '\x60', '\x21', '\x91', '\x20', '\xC8', '\x94', '\x0C', '\xDB', '\x07', '\xB0',
1325
                    '\x1C', '\x3A', '\xDA', '\x5E', '\x9B', '\x62', '\xDE', '\x30', '\xA3', '\x7E', '\xED',
1326
                    '\xB4', '\x30', '\xD7', '\x43', '\x3F', '\xDE', '\xF2', '\x6D', '\x9A', '\x1D', '\xAE',
1327
                    '\xF4', '\xD5', '\xFB', '\xAC', '\xE8', '\x67', '\x37', '\xFD', '\xF3'};
1328

1329
void sync_server(network::Acceptor& acceptor, unit_test::TestContext& test_context)
1330
{
2✔
1331
    network::Service& service = acceptor.get_service();
2✔
1332
    network::Socket socket{service};
2✔
1333
    network::Endpoint endpoint;
2✔
1334
    acceptor.accept(socket, endpoint);
2✔
1335
    socket.set_option(network::SocketBase::no_delay(true));
2✔
1336

1337
    network::ReadAheadBuffer rab;
2✔
1338
    const size_t max_header_size = 32;
2✔
1339
    char header_buffer[max_header_size];
2✔
1340
    size_t n = socket.read_until(header_buffer, max_header_size, '\n', rab);
2✔
1341
    if (!CHECK_GREATER(n, 0))
2✔
1342
        return;
×
1343
    if (!CHECK_LESS_EQUAL(n, max_header_size))
2✔
1344
        return;
×
1345
    if (!CHECK_EQUAL(header_buffer[n - 1], '\n'))
2✔
1346
        return;
×
1347
    MemoryInputStream in;
2✔
1348
    in.set_buffer(header_buffer, header_buffer + (n - 1));
2✔
1349
    in.unsetf(std::ios_base::skipws);
2✔
1350
    std::string message_type;
2✔
1351
    in >> message_type;
2✔
1352
    if (!CHECK_EQUAL(message_type, "echo"))
2✔
1353
        return;
×
1354
    char sp;
2✔
1355
    size_t body_size;
2✔
1356
    in >> sp >> body_size;
2✔
1357
    if (!CHECK(in) || !CHECK(in.eof()) || !CHECK_EQUAL(sp, ' '))
2✔
1358
        return;
×
1359
    std::unique_ptr<char[]> body_buffer(new char[body_size]);
2✔
1360
    size_t m = socket.read(body_buffer.get(), body_size, rab);
2✔
1361
    if (!CHECK_EQUAL(m, body_size))
2✔
1362
        return;
×
1363
    MemoryOutputStream out;
2✔
1364
    out.set_buffer(header_buffer, header_buffer + max_header_size);
2✔
1365
    out << "was " << body_size << '\n';
2✔
1366
    socket.write(header_buffer, out.size());
2✔
1367
    socket.write(body_buffer.get(), body_size);
2✔
1368
}
2✔
1369

1370

1371
void sync_client(unsigned short listen_port, unit_test::TestContext& test_context)
1372
{
2✔
1373
    network::Service service;
2✔
1374
    network::Socket socket{service};
2✔
1375
    {
2✔
1376
        std::ostringstream out;
2✔
1377
        out << listen_port;
2✔
1378
        std::string listen_port_2 = out.str();
2✔
1379
        connect_socket(socket, listen_port_2);
2✔
1380
    }
2✔
1381
    socket.set_option(network::SocketBase::no_delay(true));
2✔
1382

1383
    const size_t max_header_size = 32;
2✔
1384
    char header_buffer[max_header_size];
2✔
1385
    MemoryOutputStream out;
2✔
1386
    out.set_buffer(header_buffer, header_buffer + max_header_size);
2✔
1387
    out << "echo " << sizeof echo_body << '\n';
2✔
1388
    socket.write(header_buffer, out.size());
2✔
1389
    socket.write(echo_body, sizeof echo_body);
2✔
1390

1391
    network::ReadAheadBuffer rab;
2✔
1392
    size_t n = socket.read_until(header_buffer, max_header_size, '\n', rab);
2✔
1393
    if (!CHECK_GREATER(n, 0))
2✔
1394
        return;
×
1395
    if (!CHECK_LESS_EQUAL(n, max_header_size))
2✔
1396
        return;
×
1397
    if (!CHECK_EQUAL(header_buffer[n - 1], '\n'))
2✔
1398
        return;
×
1399
    MemoryInputStream in;
2✔
1400
    in.set_buffer(header_buffer, header_buffer + (n - 1));
2✔
1401
    in.unsetf(std::ios_base::skipws);
2✔
1402
    std::string message_type;
2✔
1403
    in >> message_type;
2✔
1404
    if (!CHECK_EQUAL(message_type, "was"))
2✔
1405
        return;
×
1406
    char sp;
2✔
1407
    size_t echo_size;
2✔
1408
    in >> sp >> echo_size;
2✔
1409
    if (!CHECK(in) || !CHECK(in.eof()) || !CHECK_EQUAL(sp, ' '))
2✔
1410
        return;
×
1411
    std::unique_ptr<char[]> echo_buffer(new char[echo_size]);
2✔
1412
    size_t m = socket.read(echo_buffer.get(), echo_size, rab);
2✔
1413
    if (!CHECK_EQUAL(m, echo_size))
2✔
1414
        return;
×
1415
    if (!CHECK_EQUAL(echo_size, sizeof echo_body))
2✔
1416
        return;
×
1417
    CHECK(std::equal(echo_body, echo_body + sizeof echo_body, echo_buffer.get()));
2✔
1418
}
2✔
1419

1420
} // anonymous namespace
1421

1422

1423
TEST(Network_Sync)
1424
{
2✔
1425
    network::Service service;
2✔
1426
    network::Acceptor acceptor{service};
2✔
1427
    network::Endpoint listen_endpoint = bind_acceptor(acceptor);
2✔
1428
    network::Endpoint::port_type listen_port = listen_endpoint.port();
2✔
1429

1430
    ThreadWrapper server_thread, client_thread;
2✔
1431
    server_thread.start([&] {
2✔
1432
        sync_server(acceptor, test_context);
2✔
1433
    });
2✔
1434
    client_thread.start([&] {
2✔
1435
        sync_client(listen_port, test_context);
2✔
1436
    });
2✔
1437
    client_thread.join();
2✔
1438
    server_thread.join();
2✔
1439
}
2✔
1440

1441

1442
namespace {
1443

1444
class async_server {
1445
public:
1446
    async_server(unit_test::TestContext& test_context)
1447
        : m_acceptor{m_service}
1✔
1448
        , m_socket{m_service}
1✔
1449
        , m_test_context{test_context}
1✔
1450
    {
2✔
1451
    }
2✔
1452

1453
    unsigned short init()
1454
    {
2✔
1455
        network::Endpoint listen_endpoint = bind_acceptor(m_acceptor);
2✔
1456
        network::Endpoint::port_type listen_port = listen_endpoint.port();
2✔
1457
        return listen_port;
2✔
1458
    }
2✔
1459

1460
    void run()
1461
    {
2✔
1462
        auto handler = [this](std::error_code ec) {
2✔
1463
            handle_accept(ec);
2✔
1464
        };
2✔
1465
        network::Endpoint endpoint;
2✔
1466
        m_acceptor.async_accept(m_socket, endpoint, handler);
2✔
1467
        m_service.run();
2✔
1468
    }
2✔
1469

1470
private:
1471
    network::Service m_service;
1472
    network::Acceptor m_acceptor;
1473
    network::Socket m_socket;
1474
    network::ReadAheadBuffer m_read_ahead_buffer;
1475
    static const size_t s_max_header_size = 32;
1476
    char m_header_buffer[s_max_header_size];
1477
    size_t m_body_size;
1478
    std::unique_ptr<char[]> m_body_buffer;
1479
    unit_test::TestContext& m_test_context;
1480

1481
    void handle_accept(std::error_code ec)
1482
    {
2✔
1483
        if (ec)
2✔
1484
            throw std::system_error(ec);
×
1485
        m_socket.set_option(network::SocketBase::no_delay(true));
2✔
1486
        auto handler = [this](std::error_code handler_ec, size_t handler_n) {
2✔
1487
            handle_read_header(handler_ec, handler_n);
2✔
1488
        };
2✔
1489
        m_socket.async_read_until(m_header_buffer, s_max_header_size, '\n', m_read_ahead_buffer, handler);
2✔
1490
    }
2✔
1491

1492
    void handle_read_header(std::error_code ec, size_t n)
1493
    {
2✔
1494
        if (ec)
2✔
1495
            throw std::system_error(ec);
×
1496
        unit_test::TestContext& test_context = m_test_context;
2✔
1497
        if (!CHECK_GREATER(n, 0))
2✔
1498
            return;
×
1499
        if (!CHECK_LESS_EQUAL(n, s_max_header_size + 0))
2✔
1500
            return;
×
1501
        if (!CHECK_EQUAL(m_header_buffer[n - 1], '\n'))
2✔
1502
            return;
×
1503
        MemoryInputStream in;
2✔
1504
        in.set_buffer(m_header_buffer, m_header_buffer + (n - 1));
2✔
1505
        in.unsetf(std::ios_base::skipws);
2✔
1506
        std::string message_type;
2✔
1507
        in >> message_type;
2✔
1508
        if (!CHECK_EQUAL(message_type, "echo"))
2✔
1509
            return;
×
1510
        char sp;
2✔
1511
        in >> sp >> m_body_size;
2✔
1512
        if (!CHECK(in) || !CHECK(in.eof()) || !CHECK_EQUAL(sp, ' '))
2✔
1513
            return;
×
1514
        auto handler = [this](std::error_code handler_ec, size_t handler_n) {
2✔
1515
            handle_read_body(handler_ec, handler_n);
2✔
1516
        };
2✔
1517
        m_body_buffer.reset(new char[m_body_size]);
2✔
1518
        m_socket.async_read(m_body_buffer.get(), m_body_size, m_read_ahead_buffer, handler);
2✔
1519
    }
2✔
1520

1521
    void handle_read_body(std::error_code ec, size_t n)
1522
    {
2✔
1523
        if (ec)
2✔
1524
            throw std::system_error(ec);
×
1525
        unit_test::TestContext& test_context = m_test_context;
2✔
1526
        if (!CHECK_EQUAL(n, m_body_size))
2✔
1527
            return;
×
1528
        MemoryOutputStream out;
2✔
1529
        out.set_buffer(m_header_buffer, m_header_buffer + s_max_header_size);
2✔
1530
        out << "was " << m_body_size << '\n';
2✔
1531
        auto handler = [this](std::error_code handler_ec, size_t) {
2✔
1532
            handle_write_header(handler_ec);
2✔
1533
        };
2✔
1534
        m_socket.async_write(m_header_buffer, out.size(), handler);
2✔
1535
    }
2✔
1536

1537
    void handle_write_header(std::error_code ec)
1538
    {
2✔
1539
        if (ec)
2✔
1540
            throw std::system_error(ec);
×
1541
        auto handler = [this](std::error_code handler_ec, size_t) {
2✔
1542
            handle_write_body(handler_ec);
2✔
1543
        };
2✔
1544
        m_socket.async_write(m_body_buffer.get(), m_body_size, handler);
2✔
1545
    }
2✔
1546

1547
    void handle_write_body(std::error_code ec)
1548
    {
2✔
1549
        if (ec)
2✔
1550
            throw std::system_error(ec);
×
1551
        auto handler = [this](std::error_code handler_ec, size_t) {
2✔
1552
            handle_read_header_2(handler_ec);
2✔
1553
        };
2✔
1554
        m_socket.async_read_until(m_header_buffer, s_max_header_size, '\n', m_read_ahead_buffer, handler);
2✔
1555
    }
2✔
1556

1557
    void handle_read_header_2(std::error_code ec)
1558
    {
2✔
1559
        if (ec && ec != MiscExtErrors::end_of_input)
2✔
1560
            throw std::system_error(ec);
×
1561
        unit_test::TestContext& test_context = m_test_context;
2✔
1562
        CHECK(ec == MiscExtErrors::end_of_input);
2✔
1563
    }
2✔
1564
};
1565

1566

1567
class async_client {
1568
public:
1569
    async_client(unsigned short listen_port, unit_test::TestContext& test_context)
1570
        : m_listen_port(listen_port)
1✔
1571
        , m_socket(m_service)
1✔
1572
        , m_test_context(test_context)
1✔
1573
    {
2✔
1574
    }
2✔
1575

1576
    void run()
1577
    {
2✔
1578
        std::string service;
2✔
1579
        {
2✔
1580
            std::ostringstream out;
2✔
1581
            out << m_listen_port;
2✔
1582
            service = out.str();
2✔
1583
        }
2✔
1584
        connect_socket(m_socket, service);
2✔
1585
        m_socket.set_option(network::SocketBase::no_delay(true));
2✔
1586

1587
        MemoryOutputStream out;
2✔
1588
        out.set_buffer(m_header_buffer, m_header_buffer + s_max_header_size);
2✔
1589
        out << "echo " << sizeof echo_body << '\n';
2✔
1590
        auto handler = [this](std::error_code ec, size_t) {
2✔
1591
            handle_write_header(ec);
2✔
1592
        };
2✔
1593
        m_socket.async_write(m_header_buffer, out.size(), handler);
2✔
1594

1595
        m_service.run();
2✔
1596

1597
        m_socket.close();
2✔
1598
    }
2✔
1599

1600
private:
1601
    unsigned short m_listen_port;
1602
    network::Service m_service;
1603
    network::Socket m_socket;
1604
    network::ReadAheadBuffer m_read_ahead_buffer;
1605
    static const size_t s_max_header_size = 32;
1606
    char m_header_buffer[s_max_header_size];
1607
    size_t m_body_size;
1608
    std::unique_ptr<char[]> m_body_buffer;
1609
    unit_test::TestContext& m_test_context;
1610

1611
    void handle_write_header(std::error_code ec)
1612
    {
2✔
1613
        if (ec)
2✔
1614
            throw std::system_error(ec);
×
1615
        auto handler = [this](std::error_code handler_ec, size_t) {
2✔
1616
            handle_write_body(handler_ec);
2✔
1617
        };
2✔
1618
        m_socket.async_write(echo_body, sizeof echo_body, handler);
2✔
1619
    }
2✔
1620

1621
    void handle_write_body(std::error_code ec)
1622
    {
2✔
1623
        if (ec)
2✔
1624
            throw std::system_error(ec);
×
1625
        auto handler = [this](std::error_code handler_ec, size_t handler_n) {
2✔
1626
            handle_read_header(handler_ec, handler_n);
2✔
1627
        };
2✔
1628
        m_socket.async_read_until(m_header_buffer, s_max_header_size, '\n', m_read_ahead_buffer, handler);
2✔
1629
    }
2✔
1630

1631
    void handle_read_header(std::error_code ec, size_t n)
1632
    {
2✔
1633
        if (ec)
2✔
1634
            throw std::system_error(ec);
×
1635
        unit_test::TestContext& test_context = m_test_context;
2✔
1636
        if (!CHECK_GREATER(n, 0))
2✔
1637
            return;
×
1638
        if (!CHECK_LESS_EQUAL(n, s_max_header_size + 0))
2✔
1639
            return;
×
1640
        if (!CHECK_EQUAL(m_header_buffer[n - 1], '\n'))
2✔
1641
            return;
×
1642
        MemoryInputStream in;
2✔
1643
        in.set_buffer(m_header_buffer, m_header_buffer + (n - 1));
2✔
1644
        in.unsetf(std::ios_base::skipws);
2✔
1645
        std::string message_type;
2✔
1646
        in >> message_type;
2✔
1647
        if (!CHECK_EQUAL(message_type, "was"))
2✔
1648
            return;
×
1649
        char sp;
2✔
1650
        in >> sp >> m_body_size;
2✔
1651
        if (!CHECK(in) || !CHECK(in.eof()) || !CHECK_EQUAL(sp, ' '))
2✔
1652
            return;
×
1653
        auto handler = [this](std::error_code handler_ec, size_t handler_n) {
2✔
1654
            handle_read_body(handler_ec, handler_n);
2✔
1655
        };
2✔
1656
        m_body_buffer.reset(new char[m_body_size]);
2✔
1657
        m_socket.async_read(m_body_buffer.get(), m_body_size, m_read_ahead_buffer, handler);
2✔
1658
    }
2✔
1659

1660
    void handle_read_body(std::error_code ec, size_t n)
1661
    {
2✔
1662
        if (ec)
2✔
1663
            throw std::system_error(ec);
×
1664
        unit_test::TestContext& test_context = m_test_context;
2✔
1665
        if (!CHECK_EQUAL(n, m_body_size))
2✔
1666
            return;
×
1667
        if (!CHECK_EQUAL(m_body_size, sizeof echo_body))
2✔
1668
            return;
×
1669
        CHECK(std::equal(echo_body, echo_body + sizeof echo_body, m_body_buffer.get()));
2✔
1670
    }
2✔
1671
};
1672

1673
} // anonymous namespace
1674

1675

1676
TEST(Network_Async)
1677
{
2✔
1678
    async_server server(test_context);
2✔
1679
    unsigned short listen_port = server.init();
2✔
1680
    async_client client(listen_port, test_context);
2✔
1681

1682
    ThreadWrapper server_thread, client_thread;
2✔
1683
    server_thread.start([&] {
2✔
1684
        server.run();
2✔
1685
    });
2✔
1686
    client_thread.start([&] {
2✔
1687
        client.run();
2✔
1688
    });
2✔
1689
    CHECK_NOT(client_thread.join());
2✔
1690
    CHECK_NOT(server_thread.join());
2✔
1691
}
2✔
1692

1693

1694
TEST(Network_HeavyAsyncPost)
1695
{
2✔
1696
    network::Service service;
2✔
1697
    network::DeadlineTimer dummy_timer{service};
2✔
1698
    dummy_timer.async_wait(std::chrono::hours(10000), [](Status) {});
2✔
1699

1700
    ThreadWrapper looper_thread;
2✔
1701
    looper_thread.start([&] {
2✔
1702
        service.run();
2✔
1703
    });
2✔
1704

1705
    std::vector<std::pair<int, long>> entries;
2✔
1706
    const long num_iterations = 10000L;
2✔
1707
    auto func = [&](int thread_index) {
16✔
1708
        for (long i = 0; i < num_iterations; ++i)
159,782✔
1709
            service.post([&entries, thread_index, i](Status) {
160,000✔
1710
                entries.emplace_back(thread_index, i);
160,000✔
1711
            });
160,000✔
1712
    };
16✔
1713

1714
    const int num_threads = 8;
2✔
1715
    std::unique_ptr<ThreadWrapper[]> threads(new ThreadWrapper[num_threads]);
2✔
1716
    for (int i = 0; i < num_threads; ++i)
18✔
1717
        threads[i].start([&func, i] {
16✔
1718
            func(i);
16✔
1719
        });
16✔
1720
    for (int i = 0; i < num_threads; ++i)
18✔
1721
        CHECK_NOT(threads[i].join());
16✔
1722

1723
    service.post([&](Status) {
2✔
1724
        dummy_timer.cancel();
2✔
1725
    });
2✔
1726
    CHECK_NOT(looper_thread.join());
2✔
1727

1728
    // Check that every post operation ran exactly once
1729
    using longlong = long long;
2✔
1730
    if (CHECK_EQUAL(num_threads * longlong(num_iterations), entries.size())) {
2✔
1731
        bool every_post_operation_ran_exactly_once = true;
2✔
1732
        std::sort(entries.begin(), entries.end());
2✔
1733
        auto i = entries.begin();
2✔
1734
        for (int i_1 = 0; i_1 < num_threads; ++i_1) {
18✔
1735
            for (long i_2 = 0; i_2 < num_iterations; ++i_2) {
160,016✔
1736
                int thread_index = i->first;
160,000✔
1737
                long iteration_index = i->second;
160,000✔
1738
                if (i_1 != thread_index || i_2 != iteration_index) {
160,000✔
1739
                    every_post_operation_ran_exactly_once = false;
×
1740
                    break;
×
1741
                }
×
1742
                ++i;
160,000✔
1743
            }
160,000✔
1744
        }
16✔
1745
        CHECK(every_post_operation_ran_exactly_once);
2✔
1746
    }
2✔
1747
}
2✔
1748

1749

1750
TEST(Network_RepeatedCancelAndRestartRead)
1751
{
2✔
1752
    Random random{random_int<unsigned long>()}; // Seed from slow global generator
2✔
1753
    for (int i = 0; i < 1; ++i) {
4✔
1754
        network::Service service_1, service_2;
2✔
1755
        network::Socket socket_1{service_1}, socket_2{service_2};
2✔
1756
        connect_sockets(socket_1, socket_2);
2✔
1757
        network::ReadAheadBuffer rab;
2✔
1758

1759
        const size_t read_buffer_size = 1024;
2✔
1760
        char read_buffer[read_buffer_size];
2✔
1761
        size_t num_bytes_read = 0;
2✔
1762
        bool end_of_input_seen = false;
2✔
1763
        realm::util::UniqueFunction<void()> initiate_read = [&] {
132,735✔
1764
            auto handler = [&](std::error_code ec, size_t n) {
132,735✔
1765
                num_bytes_read += n;
132,735✔
1766
                if (ec == MiscExtErrors::end_of_input) {
132,735✔
1767
                    end_of_input_seen = true;
2✔
1768
                    return;
2✔
1769
                }
2✔
1770
                CHECK(!ec || ec == error::operation_aborted);
132,733✔
1771
                initiate_read();
132,733✔
1772
            };
132,733✔
1773
            socket_2.async_read(read_buffer, read_buffer_size, rab, handler);
132,735✔
1774
        };
132,735✔
1775
        initiate_read();
2✔
1776

1777
        auto thread_func = [&] {
2✔
1778
            try {
2✔
1779
                service_2.run();
2✔
1780
            }
2✔
1781
            catch (...) {
2✔
1782
                socket_2.close();
×
1783
                throw;
×
1784
            }
×
1785
        };
2✔
1786
        ThreadWrapper thread;
2✔
1787
        thread.start(thread_func);
2✔
1788

1789
        const size_t write_buffer_size = 1024;
2✔
1790
        const char write_buffer[write_buffer_size] = {'\0'};
2✔
1791
        size_t num_bytes_to_write = 0x4000000; // 64 MiB
2✔
1792
        size_t num_bytes_written = 0;
2✔
1793
        while (num_bytes_written < num_bytes_to_write) {
262,084✔
1794
            size_t n =
262,082✔
1795
                std::min(random.draw_int<size_t>(1, write_buffer_size), num_bytes_to_write - num_bytes_written);
262,082✔
1796
            socket_1.write(write_buffer, n);
262,082✔
1797
            num_bytes_written += n;
262,082✔
1798
            service_2.post([&](Status) {
262,082✔
1799
                socket_2.cancel();
262,082✔
1800
            });
262,082✔
1801
        }
262,082✔
1802
        socket_1.close();
2✔
1803

1804
        CHECK_NOT(thread.join());
2✔
1805
        CHECK_EQUAL(num_bytes_written, num_bytes_read);
2✔
1806
    }
2✔
1807
}
2✔
1808

1809

1810
TEST(Network_StressTest)
1811
{
2✔
1812
    network::Service service_1, service_2;
2✔
1813
    network::Socket socket_1{service_1}, socket_2{service_2};
2✔
1814
    connect_sockets(socket_1, socket_2);
2✔
1815
    constexpr size_t original_size = 0x100000; // 1MiB
2✔
1816
    std::unique_ptr<char[]> original_1, original_2;
2✔
1817
    original_1.reset(new char[original_size]);
2✔
1818
    original_2.reset(new char[original_size]);
2✔
1819
    {
2✔
1820
        std::mt19937_64 prng{std::random_device()()};
2✔
1821
        std::uniform_int_distribution<int> dist(std::numeric_limits<char>::min(), std::numeric_limits<char>::max());
2✔
1822
        log("Initializing...");
2✔
1823
        for (size_t i = 0; i < original_size; ++i)
2,097,154✔
1824
            original_1[i] = dist(prng);
2,097,152✔
1825
        for (size_t i = 0; i < original_size; ++i)
2,097,154✔
1826
            original_2[i] = dist(prng);
2,097,152✔
1827
        log("Initialized");
2✔
1828
    }
2✔
1829

1830
    struct Stats {
2✔
1831
        std::uint_fast64_t num_cancellations = 0;
2✔
1832
        std::uint_fast64_t num_reads = 0, num_canceled_reads = 0;
2✔
1833
        std::uint_fast64_t num_writes = 0, num_canceled_writes = 0;
2✔
1834
    };
2✔
1835

1836
#ifdef _WIN32 // slow
1837
    constexpr int num_cycles = 16;
1838
#else
1839
    constexpr int num_cycles = 512;
2✔
1840
#endif
2✔
1841
    auto thread = [&](int id, network::Socket& socket, const char* read_original, const char* write_original,
2✔
1842
                      Stats& stats) {
4✔
1843
        std::unique_ptr<char[]> read_buffer{new char[original_size]};
4✔
1844
        std::mt19937_64 prng{std::random_device()()};
4✔
1845
        std::uniform_int_distribution<size_t> read_write_size_dist(1, 32 * 1024);
4✔
1846
        std::uniform_int_distribution<int> delayed_read_write_dist(0, 49);
4✔
1847
        network::Service& service = socket.get_service();
4✔
1848
        network::DeadlineTimer cancellation_timer{service};
4✔
1849
        network::DeadlineTimer read_timer{service};
4✔
1850
        network::DeadlineTimer write_timer{service};
4✔
1851
        std::uint_fast64_t microseconds_per_cancellation = 10;
4✔
1852
        bool progress = false;
4✔
1853
        bool read_done = false, write_done = false;
4✔
1854
        realm::util::UniqueFunction<void()> shedule_cancellation = [&] {
198,865✔
1855
            if (progress) {
198,865✔
1856
                microseconds_per_cancellation /= 2;
134,630✔
1857
                progress = false;
134,630✔
1858
            }
134,630✔
1859
            else {
64,235✔
1860
                microseconds_per_cancellation *= 2;
64,235✔
1861
            }
64,235✔
1862
            if (microseconds_per_cancellation < 10)
198,865✔
1863
                microseconds_per_cancellation = 10;
69,906✔
1864
            cancellation_timer.async_wait(std::chrono::microseconds(microseconds_per_cancellation),
198,865✔
1865
                                          [&](Status status) {
198,865✔
1866
                                              REALM_ASSERT(status.is_ok() || status == ErrorCodes::OperationAborted);
198,682✔
1867
                                              if (status == ErrorCodes::OperationAborted)
198,682✔
1868
                                                  return;
4✔
1869
                                              if (read_done && write_done)
198,678!
1870
                                                  return;
×
1871
                                              socket.cancel();
198,678✔
1872
                                              ++stats.num_cancellations;
198,678✔
1873
                                              shedule_cancellation();
198,678✔
1874
                                          });
198,678✔
1875
        };
198,865✔
1876
        shedule_cancellation();
4✔
1877
        char* read_begin = read_buffer.get();
4✔
1878
        char* read_end = read_buffer.get() + original_size;
4✔
1879
        int num_read_cycles = 0;
4✔
1880
        realm::util::UniqueFunction<void()> read = [&] {
307,220✔
1881
            if (read_begin == read_end) {
307,220✔
1882
                //                log("<R%1>", id);
1883
                CHECK(std::equal(read_original, read_original + original_size, read_buffer.get()));
2,048✔
1884
                ++num_read_cycles;
2,048✔
1885
                if (num_read_cycles == num_cycles) {
2,048✔
1886
                    log("End of read %1", id);
4✔
1887
                    read_done = true;
4✔
1888
                    if (write_done)
4✔
1889
                        cancellation_timer.cancel();
4✔
1890
                    return;
4✔
1891
                }
4✔
1892
                read_begin = read_buffer.get();
2,044✔
1893
                read_end = read_buffer.get() + original_size;
2,044✔
1894
            }
2,044✔
1895
            auto handler = [&](std::error_code ec, size_t n) {
307,528✔
1896
                REALM_ASSERT(!ec || ec == error::operation_aborted);
307,528✔
1897
                ++stats.num_reads;
307,528✔
1898
                if (ec == error::operation_aborted) {
307,528✔
1899
                    ++stats.num_canceled_reads;
173,791✔
1900
                }
173,791✔
1901
                else {
133,737✔
1902
                    read_begin += n;
133,737✔
1903
                    progress = true;
133,737✔
1904
                }
133,737✔
1905
                if (delayed_read_write_dist(prng) == 0) {
307,528✔
1906
                    read_timer.async_wait(std::chrono::microseconds(100), [&](Status status) {
6,147✔
1907
                        REALM_ASSERT(status.is_ok());
6,147✔
1908
                        read();
6,147✔
1909
                    });
6,147✔
1910
                }
6,147✔
1911
                else {
301,381✔
1912
                    read();
301,381✔
1913
                }
301,381✔
1914
            };
307,528✔
1915
            char* buffer = read_begin;
307,216✔
1916
            size_t size = read_write_size_dist(prng);
307,216✔
1917
            size_t max_size = read_end - read_begin;
307,216✔
1918
            if (size > max_size)
307,216✔
1919
                size = max_size;
4,764✔
1920
            socket.async_read_some(buffer, size, std::move(handler));
307,216✔
1921
        };
307,216✔
1922
        read();
4✔
1923
        const char* write_begin = write_original;
4✔
1924
        const char* write_end = write_original + original_size;
4✔
1925
        int num_write_cycles = 0;
4✔
1926
        realm::util::UniqueFunction<void()> write = [&] {
305,341✔
1927
            if (write_begin == write_end) {
305,341✔
1928
                //                log("<W%1>", id);
1929
                ++num_write_cycles;
2,048✔
1930
                if (num_write_cycles == num_cycles) {
2,048✔
1931
                    log("End of write %1", id);
4✔
1932
                    write_done = true;
4✔
1933
                    if (read_done)
4✔
UNCOV
1934
                        cancellation_timer.cancel();
×
1935
                    socket.shutdown(network::Socket::shutdown_send);
4✔
1936
                    log("Properly shut down %1", id);
4✔
1937
                    return;
4✔
1938
                }
4✔
1939
                write_begin = write_original;
2,044✔
1940
                write_end = write_original + original_size;
2,044✔
1941
            }
2,044✔
1942
            auto handler = [&](std::error_code ec, size_t n) {
306,007✔
1943
                REALM_ASSERT(!ec || ec == error::operation_aborted);
305,997✔
1944
                ++stats.num_writes;
305,997✔
1945
                if (ec == error::operation_aborted) {
305,997✔
1946
                    ++stats.num_canceled_writes;
173,832✔
1947
                }
173,832✔
1948
                else {
132,165✔
1949
                    write_begin += n;
132,165✔
1950
                    progress = true;
132,165✔
1951
                }
132,165✔
1952
                if (delayed_read_write_dist(prng) == 0) {
305,997✔
1953
                    write_timer.async_wait(std::chrono::microseconds(100), [&](Status status) {
6,048✔
1954
                        REALM_ASSERT(status.is_ok());
6,048✔
1955
                        write();
6,048✔
1956
                    });
6,048✔
1957
                }
6,047✔
1958
                else {
299,950✔
1959
                    write();
299,950✔
1960
                }
299,950✔
1961
            };
305,997✔
1962
            const char* data = write_begin;
305,337✔
1963
            size_t size = read_write_size_dist(prng);
305,337✔
1964
            size_t max_size = write_end - write_begin;
305,337✔
1965
            if (size > max_size)
305,337✔
1966
                size = max_size;
4,822✔
1967
            socket.async_write_some(data, size, std::move(handler));
305,337✔
1968
        };
305,337✔
1969
        write();
4✔
1970
        service.run();
4✔
1971
    };
4✔
1972

1973
    Stats stats_1, stats_2;
2✔
1974
    std::thread thread_1{[&] {
2✔
1975
        thread(1, socket_1, original_1.get(), original_2.get(), stats_1);
2✔
1976
    }};
2✔
1977
    std::thread thread_2{[&] {
2✔
1978
        thread(2, socket_2, original_2.get(), original_1.get(), stats_2);
2✔
1979
    }};
2✔
1980
    thread_1.join();
2✔
1981
    thread_2.join();
2✔
1982

1983
    char ch;
2✔
1984
    CHECK_SYSTEM_ERROR(socket_1.read_some(&ch, 1), MiscExtErrors::end_of_input);
2✔
1985
    CHECK_SYSTEM_ERROR(socket_2.read_some(&ch, 1), MiscExtErrors::end_of_input);
2✔
1986

1987
    log("Cancellations: %1, %2", stats_1.num_cancellations, stats_2.num_cancellations);
2✔
1988
    log("Reads:  %1 (%2 canceled), %3 (%4 canceled)", stats_1.num_reads, stats_1.num_canceled_reads,
2✔
1989
        stats_2.num_reads, stats_2.num_canceled_reads);
2✔
1990
    log("Writes: %1 (%2 canceled), %3 (%4 canceled)", stats_1.num_writes, stats_1.num_canceled_writes,
2✔
1991
        stats_2.num_writes, stats_2.num_canceled_writes);
2✔
1992
}
2✔
1993

1994

1995
TEST(Sync_Trigger_Basics)
1996
{
2✔
1997
    network::Service service;
2✔
1998

1999
    // Check that triggering works
2000
    bool was_triggered = false;
2✔
2001
    auto func = [&](realm::Status) {
4✔
2002
        was_triggered = true;
4✔
2003
    };
4✔
2004
    Trigger<network::Service> trigger(&service, std::move(func));
2✔
2005
    trigger.trigger();
2✔
2006
    service.run();
2✔
2007
    CHECK(was_triggered);
2✔
2008

2009
    // Check that the function is not called without triggering
2010
    was_triggered = false;
2✔
2011
    service.run();
2✔
2012
    CHECK_NOT(was_triggered);
2✔
2013

2014
    // Check double-triggering
2015
    was_triggered = false;
2✔
2016
    trigger.trigger();
2✔
2017
    trigger.trigger();
2✔
2018
    service.run();
2✔
2019
    CHECK(was_triggered);
2✔
2020

2021
    // Check that retriggering from triggered function works
2022
    realm::util::UniqueFunction<void()> func_2;
2✔
2023
    Trigger<network::Service> trigger_2(&service, [&](realm::Status) {
4✔
2024
        func_2();
4✔
2025
    });
4✔
2026
    was_triggered = false;
2✔
2027
    bool was_triggered_twice = false;
2✔
2028
    func_2 = [&] {
4✔
2029
        if (was_triggered) {
4✔
2030
            was_triggered_twice = true;
2✔
2031
        }
2✔
2032
        else {
2✔
2033
            was_triggered = true;
2✔
2034
            trigger_2.trigger();
2✔
2035
        }
2✔
2036
    };
4✔
2037
    trigger_2.trigger();
2✔
2038
    service.run();
2✔
2039
    CHECK(was_triggered_twice);
2✔
2040

2041
    // Check that the function is not called after destruction of the Trigger
2042
    // object
2043
    was_triggered = false;
2✔
2044
    {
2✔
2045
        auto func_3 = [&](realm::Status) {
2✔
2046
            was_triggered = true;
×
2047
        };
×
2048
        Trigger<network::Service> trigger_3(&service, std::move(func_3));
2✔
2049
        trigger_3.trigger();
2✔
2050
    }
2✔
2051
    service.run();
2✔
2052
    CHECK_NOT(was_triggered);
2✔
2053

2054
    // Check that two functions can be triggered in an overlapping fashion
2055
    bool was_triggered_4 = false;
2✔
2056
    bool was_triggered_5 = false;
2✔
2057
    auto func_4 = [&](realm::Status) {
2✔
2058
        was_triggered_4 = true;
2✔
2059
    };
2✔
2060
    auto func_5 = [&](realm::Status) {
2✔
2061
        was_triggered_5 = true;
2✔
2062
    };
2✔
2063
    Trigger<network::Service> trigger_4(&service, std::move(func_4));
2✔
2064
    Trigger<network::Service> trigger_5(&service, std::move(func_5));
2✔
2065
    trigger_4.trigger();
2✔
2066
    trigger_5.trigger();
2✔
2067
    service.run();
2✔
2068
    CHECK(was_triggered_4);
2✔
2069
    CHECK(was_triggered_5);
2✔
2070
}
2✔
2071

2072

2073
TEST(Sync_Trigger_ThreadSafety)
2074
{
2✔
2075
    network::Service service;
2✔
2076
    network::DeadlineTimer keep_alive{service};
2✔
2077
    keep_alive.async_wait(std::chrono::hours(10000), [](Status) {});
2✔
2078
    long n_1 = 0, n_2 = 0;
2✔
2079
    std::atomic<bool> flag{false};
2✔
2080
    auto func = [&](realm::Status) {
7,109✔
2081
        ++n_1;
7,109✔
2082
        if (flag)
7,109✔
2083
            ++n_2;
2✔
2084
    };
7,109✔
2085
    Trigger<network::Service> trigger(&service, std::move(func));
2✔
2086
    ThreadWrapper thread;
2✔
2087
    thread.start([&] {
2✔
2088
        service.run();
2✔
2089
    });
2✔
2090
    long m = 1000000;
2✔
2091
    for (long i = 0; i < m; ++i)
2,000,002✔
2092
        trigger.trigger();
2,000,000✔
2093
    flag = true;
2✔
2094
    trigger.trigger();
2✔
2095
    service.post([&](Status) {
2✔
2096
        keep_alive.cancel();
2✔
2097
    });
2✔
2098
    CHECK_NOT(thread.join());
2✔
2099
    CHECK_GREATER_EQUAL(n_1, 1);
2✔
2100
    CHECK_LESS_EQUAL(n_1, m + 1);
2✔
2101
    CHECK_GREATER_EQUAL(n_2, 1);
2✔
2102
    CHECK_LESS_EQUAL(n_2, 2);
2✔
2103
}
2✔
2104

2105

2106
TEST(Network_AsyncResolve_Basics)
2107
{
2✔
2108
    network::Service service;
2✔
2109
    network::Resolver resolver{service};
2✔
2110
    network::Resolver::Query query("localhost", "");
2✔
2111
    bool was_called = false;
2✔
2112
    auto handler = [&](std::error_code ec, network::Endpoint::List endpoints) {
2✔
2113
        CHECK_NOT(ec);
2✔
2114
        CHECK_GREATER(endpoints.size(), 0);
2✔
2115
        was_called = true;
2✔
2116
    };
2✔
2117
    resolver.async_resolve(query, std::move(handler));
2✔
2118
    service.run();
2✔
2119
    CHECK(was_called);
2✔
2120
}
2✔
2121

2122

2123
TEST(Network_AsyncResolve_Cancellation)
2124
{
2✔
2125
    network::Service service;
2✔
2126
    network::Resolver resolver{service};
2✔
2127
    network::Resolver::Query query("localhost", "");
2✔
2128
    bool was_called = false;
2✔
2129
    auto handler = [&](std::error_code ec, network::Endpoint::List) {
2✔
2130
        CHECK_EQUAL(error::operation_aborted, ec);
2✔
2131
        was_called = true;
2✔
2132
    };
2✔
2133
    resolver.async_resolve(query, std::move(handler));
2✔
2134
    resolver.cancel();
2✔
2135
    service.run();
2✔
2136
    CHECK(was_called);
2✔
2137
}
2✔
2138

2139
} // unnamed namespace
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc