• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_281922

31 Oct 2023 09:13AM UTC coverage: 90.445% (-0.08%) from 90.528%
github_pull_request_281922

Pull #7039

Evergreen

jedelbo
Merge branch 'next-major' into je/global-key
Pull Request #7039: Remove ability to synchronize objects without primary key

95324 of 175822 branches covered (0.0%)

101 of 105 new or added lines in 13 files covered. (96.19%)

238 existing lines in 19 files now uncovered.

232657 of 257235 relevant lines covered (90.45%)

6351359.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.2
/test/test_util_network.cpp
1
#include <algorithm>
2
#include <atomic>
3
#include <stdexcept>
4
#include <sstream>
5
#include <memory>
6
#include <thread>
7

8
#include <realm/status.hpp>
9
#include <realm/util/future.hpp>
10
#include <realm/util/memory_stream.hpp>
11
#include <realm/sync/network/network.hpp>
12
#include <realm/sync/trigger.hpp>
13

14
#include "test.hpp"
15
#include "util/semaphore.hpp"
16

17
using namespace realm;
18
using namespace realm::util;
19
using namespace realm::test_util;
20

21

22
// Test independence and thread-safety
23
// -----------------------------------
24
//
25
// All tests must be thread safe and independent of each other. This
26
// is required because it allows for both shuffling of the execution
27
// order and for parallelized testing.
28
//
29
// In particular, avoid using std::rand() since it is not guaranteed
30
// to be thread safe. Instead use the API offered in
31
// `test/util/random.hpp`.
32
//
33
// All files created in tests must use the TEST_PATH macro (or one of
34
// its friends) to obtain a suitable file system path. See
35
// `test/util/test_path.hpp`.
36
//
37
//
38
// Debugging and the ONLY() macro
39
// ------------------------------
40
//
41
// A simple way of disabling all tests except one called `Foo`, is to
42
// replace TEST(Foo) with ONLY(Foo) and then recompile and rerun the
43
// test suite. Note that you can also use filtering by setting the
44
// environment varible `UNITTEST_FILTER`. See `README.md` for more on
45
// this.
46
//
47
// Another way to debug a particular test, is to copy that test into
48
// `experiments/testcase.cpp` and then run `sh build.sh
49
// check-testcase` (or one of its friends) from the command line.
50

51
using namespace realm::sync;
52

53
namespace {
54

55
network::Endpoint bind_acceptor(network::Acceptor& acceptor)
56
{
80✔
57
    network::Endpoint ep; // Wildcard
80✔
58
    acceptor.open(ep.protocol());
80✔
59
    acceptor.bind(ep);
80✔
60
    ep = acceptor.local_endpoint(); // Get actual bound endpoint
80✔
61
    acceptor.listen();
80✔
62
    return ep;
80✔
63
}
80✔
64

65
void connect_sockets(network::Socket& socket_1, network::Socket& socket_2)
66
{
48✔
67
    network::Service& service_1 = socket_1.get_service();
48✔
68
    network::Service& service_2 = socket_2.get_service();
48✔
69
    network::Acceptor acceptor{service_1};
48✔
70
    network::Endpoint ep = bind_acceptor(acceptor);
48✔
71
    bool accept_occurred = false, connect_occurred = false;
48✔
72
    auto accept_handler = [&](std::error_code ec) {
48✔
73
        REALM_ASSERT(!ec);
48✔
74
        accept_occurred = true;
48✔
75
    };
48✔
76
    auto connect_handler = [&](std::error_code ec) {
48✔
77
        REALM_ASSERT(!ec);
48✔
78
        connect_occurred = true;
48✔
79
    };
48✔
80
    acceptor.async_accept(socket_1, std::move(accept_handler));
48✔
81
    socket_2.async_connect(ep, std::move(connect_handler));
48✔
82
    if (&service_1 == &service_2) {
48✔
83
        service_1.run();
44✔
84
    }
44✔
85
    else {
4✔
86
        std::thread thread{[&] {
4✔
87
            service_1.run();
4✔
88
        }};
4✔
89
        service_2.run();
4✔
90
        thread.join();
4✔
91
    }
4✔
92
    REALM_ASSERT(accept_occurred);
48✔
93
    REALM_ASSERT(connect_occurred);
48✔
94
    socket_1.set_option(network::SocketBase::no_delay(true));
48✔
95
    socket_2.set_option(network::SocketBase::no_delay(true));
48✔
96
}
48✔
97

98
void connect_socket(network::Socket& socket, std::string port)
99
{
4✔
100
    network::Service& service = socket.get_service();
4✔
101
    network::Resolver resolver{service};
4✔
102
    network::Resolver::Query query("localhost", port);
4✔
103
    network::Endpoint::List endpoints = resolver.resolve(query);
4✔
104

2✔
105
    auto i = endpoints.begin();
4✔
106
    auto end = endpoints.end();
4✔
107
    for (;;) {
4✔
108
        std::error_code ec;
4✔
109
        socket.connect(*i, ec);
4✔
110
        if (!ec)
4✔
111
            break;
4✔
UNCOV
112
        socket.close();
×
UNCOV
113
        if (++i == end)
×
114
            throw std::runtime_error("Failed to connect to localhost:" + port);
×
UNCOV
115
    }
×
116
}
4✔
117

118

119
TEST(Network_Hostname)
120
{
2✔
121
    // Just check that we call call network::host_name()
1✔
122
    network::host_name();
2✔
123
}
2✔
124

125

126
TEST(Network_PostOperation)
127
{
2✔
128
    network::Service service;
2✔
129
    int var_1 = 381, var_2 = 743;
2✔
130
    service.post([&](Status status) {
2✔
131
        CHECK(status.is_ok());
2✔
132
        var_1 = 824;
2✔
133
    });
2✔
134
    service.post([&](Status) {
2✔
135
        var_2 = 216;
2✔
136
    });
2✔
137
    CHECK_EQUAL(var_1, 381);
2✔
138
    CHECK_EQUAL(var_2, 743);
2✔
139
    service.run();
2✔
140
    CHECK_EQUAL(var_1, 824);
2✔
141
    CHECK_EQUAL(var_2, 216);
2✔
142
    service.post([&](Status) {
2✔
143
        var_2 = 191;
2✔
144
    });
2✔
145
    service.post([&](Status) {
2✔
146
        var_1 = 476;
2✔
147
    });
2✔
148
    CHECK_EQUAL(var_1, 824);
2✔
149
    CHECK_EQUAL(var_2, 216);
2✔
150
    service.run();
2✔
151
    CHECK_EQUAL(var_1, 476);
2✔
152
    CHECK_EQUAL(var_2, 191);
2✔
153
}
2✔
154

155

156
TEST(Network_RunUntilStopped)
157
{
2✔
158
    network::Service service;
2✔
159
    auto post_to_service = [&](util::UniqueFunction<void()> func = {}) {
8✔
160
        auto [promise, future] = util::make_promise_future<void>();
8✔
161
        service.post([promise = std::move(promise), func = std::move(func)](Status s) mutable {
8✔
162
            if (!s.is_ok()) {
8✔
163
                promise.set_error(s);
×
164
                return;
×
165
            }
×
166
            if (func) {
8✔
167
                func();
2✔
168
            }
2✔
169
            promise.emplace_value();
8✔
170
        });
8✔
171
        return std::move(future);
8✔
172
    };
8✔
173

1✔
174
    auto before_run = post_to_service();
2✔
175

1✔
176
    auto [thread_stopped_promise, thread_stopped_future] = util::make_promise_future<void>();
2✔
177
    std::thread thread([&service, promise = std::move(thread_stopped_promise)]() mutable {
2✔
178
        service.run_until_stopped();
2✔
179
        promise.emplace_value();
2✔
180
    });
2✔
181

1✔
182
    before_run.get();
2✔
183
    CHECK_NOT(thread_stopped_future.is_ready());
2✔
184
    // Post while it's running. This should get fulfilled immediately.
1✔
185
    post_to_service().get();
2✔
186
    CHECK_NOT(thread_stopped_future.is_ready());
2✔
187

1✔
188
    util::Optional<util::Future<void>> timer_ran_future;
2✔
189
    network::DeadlineTimer timer(service);
2✔
190
    post_to_service([&] {
2✔
191
        auto [promise, future] = util::make_promise_future<void>();
2✔
192
        timer.async_wait(std::chrono::milliseconds{250}, [promise = std::move(promise)](Status s) mutable {
2✔
193
            if (!s.is_ok()) {
2✔
194
                promise.set_error(s);
×
195
                return;
×
196
            }
×
197
            promise.emplace_value();
2✔
198
        });
2✔
199
        timer_ran_future = std::move(future);
2✔
200
    }).get();
2✔
201

1✔
202
    CHECK_NOT(thread_stopped_future.is_ready());
2✔
203
    CHECK(timer_ran_future);
2✔
204
    timer_ran_future->get();
2✔
205

1✔
206
    service.stop();
2✔
207
    thread.join();
2✔
208

1✔
209
    thread_stopped_future.get();
2✔
210

1✔
211
    auto after_stop = post_to_service();
2✔
212
    CHECK_NOT(after_stop.is_ready());
2✔
213

1✔
214
    service.reset();
2✔
215
    service.run();
2✔
216
    after_stop.get();
2✔
217
}
2✔
218

219
TEST(Network_EventLoopStopAndReset_1)
220
{
2✔
221
    network::Service service;
2✔
222

1✔
223
    // Prestop
1✔
224
    int var = 381;
2✔
225
    service.stop();
2✔
226
    service.post([&](Status) {
2✔
227
        var = 824;
2✔
228
    });
2✔
229
    service.run(); // Must return immediately
2✔
230
    CHECK_EQUAL(var, 381);
2✔
231
    service.run(); // Must still return immediately
2✔
232
    CHECK_EQUAL(var, 381);
2✔
233

1✔
234
    // Reset
1✔
235
    service.reset();
2✔
236
    service.post([&](Status) {
2✔
237
        var = 824;
2✔
238
    });
2✔
239
    CHECK_EQUAL(var, 381);
2✔
240
    service.run();
2✔
241
    CHECK_EQUAL(var, 824);
2✔
242
    service.post([&](Status) {
2✔
243
        var = 476;
2✔
244
    });
2✔
245
    CHECK_EQUAL(var, 824);
2✔
246
    service.run();
2✔
247
    CHECK_EQUAL(var, 476);
2✔
248
}
2✔
249

250

251
TEST(Network_EventLoopStopAndReset_2)
252
{
2✔
253
    // Introduce a blocking operation that will keep the event loop running
1✔
254
    network::Service service;
2✔
255
    network::Acceptor acceptor{service};
2✔
256
    bind_acceptor(acceptor);
2✔
257
    network::Socket socket{service};
2✔
258
    acceptor.async_accept(socket, [](std::error_code) {});
2✔
259

1✔
260
    // Start event loop execution in the background
1✔
261
    ThreadWrapper thread_1;
2✔
262
    thread_1.start([&] {
2✔
263
        service.run();
2✔
264
    });
2✔
265

1✔
266
    // Check that the event loop is actually running
1✔
267
    BowlOfStonesSemaphore bowl_1; // Empty
2✔
268
    service.post([&](Status) {
2✔
269
        bowl_1.add_stone();
2✔
270
    });
2✔
271
    bowl_1.get_stone(); // Block until the stone is added
2✔
272

1✔
273
    // Stop the event loop
1✔
274
    service.stop();
2✔
275
    CHECK_NOT(thread_1.join());
2✔
276

1✔
277
    // Check that the event loop remains in the stopped state
1✔
278
    int var = 381;
2✔
279
    service.post([&](Status) {
2✔
280
        var = 824;
2✔
281
    });
2✔
282
    CHECK_EQUAL(var, 381);
2✔
283
    service.run(); // Still stopped, so run() must return immediately
2✔
284
    CHECK_EQUAL(var, 381);
2✔
285

1✔
286
    // Put the event loop back into the unstopped state, and restart it in the
1✔
287
    // background
1✔
288
    service.reset();
2✔
289
    ThreadWrapper thread_2;
2✔
290
    thread_2.start([&] {
2✔
291
        service.run();
2✔
292
    });
2✔
293

1✔
294
    // Check that the event loop is actually running
1✔
295
    BowlOfStonesSemaphore bowl_2; // Empty
2✔
296
    service.post([&](Status) {
2✔
297
        bowl_2.add_stone();
2✔
298
    });
2✔
299
    bowl_2.get_stone(); // Block until the stone is added
2✔
300

1✔
301
    // Stop the event loop by canceling the blocking operation
1✔
302
    service.post([&](Status) {
2✔
303
        acceptor.cancel();
2✔
304
    });
2✔
305
    CHECK_NOT(thread_2.join());
2✔
306

1✔
307
    CHECK_EQUAL(var, 824);
2✔
308
}
2✔
309

310

311
TEST(Network_GetSetSocketOption)
312
{
2✔
313
    network::Service service;
2✔
314
    network::Socket socket{service};
2✔
315
    socket.open(network::StreamProtocol::ip_v4());
2✔
316
    network::Socket::reuse_address opt_reuse_addr;
2✔
317
    socket.get_option(opt_reuse_addr);
2✔
318
    CHECK_NOT(opt_reuse_addr.value());
2✔
319
    socket.set_option(network::Socket::reuse_address(true));
2✔
320
    socket.get_option(opt_reuse_addr);
2✔
321
    CHECK(opt_reuse_addr.value());
2✔
322
}
2✔
323

324

325
TEST(Network_AsyncConnectAndAsyncAccept)
326
{
2✔
327
    network::Service service;
2✔
328
    network::Acceptor acceptor{service};
2✔
329
    network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
330
    network::Socket socket_1{service}, socket_2{service};
2✔
331
    bool connected = false;
2✔
332
    auto connect_handler = [&](std::error_code ec) {
2✔
333
        if (ec)
2✔
334
            throw std::system_error(ec);
×
335
        connected = true;
2✔
336
        log("connected");
2✔
337
    };
2✔
338
    bool accepted = false;
2✔
339
    auto accept_handler = [&](std::error_code ec) {
2✔
340
        if (ec)
2✔
341
            throw std::system_error(ec);
×
342
        accepted = true;
2✔
343
        log("accepted");
2✔
344
    };
2✔
345
    socket_1.async_connect(listening_endpoint, std::move(connect_handler));
2✔
346
    acceptor.async_accept(socket_2, std::move(accept_handler));
2✔
347
    service.run();
2✔
348
    CHECK(connected);
2✔
349
    CHECK(accepted);
2✔
350
}
2✔
351

352

353
TEST(Network_ReadWrite)
354
{
2✔
355
    network::Service service_1;
2✔
356
    network::Acceptor acceptor{service_1};
2✔
357
    network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
358

1✔
359
    char data[] = {'X', 'F', 'M'};
2✔
360

1✔
361
    auto reader = [&] {
2✔
362
        network::Socket socket_1{service_1};
2✔
363
        acceptor.accept(socket_1);
2✔
364
        socket_1.set_option(network::SocketBase::no_delay(true));
2✔
365
        network::ReadAheadBuffer rab;
2✔
366
        char buffer[sizeof data];
2✔
367
        size_t n = socket_1.read(buffer, sizeof data, rab);
2✔
368
        if (CHECK_EQUAL(sizeof data, n))
2✔
369
            CHECK(std::equal(buffer, buffer + n, data));
2✔
370
        std::error_code ec;
2✔
371
        n = socket_1.read(buffer, 1, rab, ec);
2✔
372
        CHECK_EQUAL(0, n);
2✔
373
        CHECK(ec == MiscExtErrors::end_of_input);
2✔
374
    };
2✔
375
    ThreadWrapper thread;
2✔
376
    thread.start(reader);
2✔
377

1✔
378
    network::Service service_2;
2✔
379
    network::Socket socket_2{service_2};
2✔
380
    socket_2.connect(listening_endpoint);
2✔
381
    socket_2.set_option(network::SocketBase::no_delay(true));
2✔
382
    socket_2.write(data, sizeof data);
2✔
383
    socket_2.close();
2✔
384

1✔
385
    CHECK_NOT(thread.join());
2✔
386
}
2✔
387

388

389
TEST(Network_ReadWriteNativeHandle)
390
{
2✔
391
    network::Service service_1;
2✔
392
    network::Acceptor acceptor{service_1};
2✔
393
    network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
394

1✔
395
    char data[] = {'X', 'F', 'M'};
2✔
396

1✔
397
    auto reader = [&] {
2✔
398
        network::Socket socket_1{service_1};
2✔
399
        acceptor.accept(socket_1);
2✔
400
        socket_1.set_option(network::SocketBase::no_delay(true));
2✔
401
        network::ReadAheadBuffer rab;
2✔
402
        char buffer[sizeof data];
2✔
403
        size_t n = socket_1.read(buffer, sizeof data, rab);
2✔
404
        if (CHECK_EQUAL(sizeof data, n))
2✔
405
            CHECK(std::equal(buffer, buffer + n, data));
2✔
406
        std::error_code ec;
2✔
407
        n = socket_1.read(buffer, 1, rab, ec);
2✔
408
        CHECK_EQUAL(0, n);
2✔
409
        CHECK(ec == MiscExtErrors::end_of_input);
2✔
410
    };
2✔
411
    ThreadWrapper thread;
2✔
412
    thread.start(reader);
2✔
413

1✔
414
    network::Service service_2;
2✔
415

1✔
416
    // Connect with plain POSIX APIs.
1✔
417
    int family = listening_endpoint.protocol().family();
2✔
418
    int protocol = listening_endpoint.protocol().protocol();
2✔
419
    using native_handle_type = network::SocketBase::native_handle_type;
2✔
420
    native_handle_type sockfd = ::socket(family, SOCK_STREAM, protocol);
2✔
421
    CHECK_GREATER(sockfd, 0);
2✔
422

1✔
423
    int endpoint_size = listening_endpoint.protocol().is_ip_v4() ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
2✔
424
    int ret = ::connect(sockfd, listening_endpoint.data(), endpoint_size);
2✔
425
    CHECK_EQUAL(ret, 0);
2✔
426

1✔
427
    network::Socket socket_2{service_2, listening_endpoint.protocol(), sockfd};
2✔
428
    socket_2.write(data, sizeof data);
2✔
429
    socket_2.close();
2✔
430

1✔
431
    CHECK_NOT(thread.join());
2✔
432
}
2✔
433

434

435
TEST(Network_ReadWriteLargeAmount)
436
{
2✔
437
    network::Service service_1;
2✔
438
    network::Acceptor acceptor{service_1};
2✔
439
    network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
440

1✔
441
    size_t num_bytes_per_chunk = 1048576L / 2;
2✔
442
    std::unique_ptr<char[]> chunk(new char[num_bytes_per_chunk]);
2✔
443
    for (size_t i = 0; i < num_bytes_per_chunk; ++i)
1,048,578✔
444
        chunk[i] = char(i % 128);
1,048,576✔
445
    int num_chunks = 128;
2✔
446

1✔
447
    auto reader = [&] {
2✔
448
        network::Socket socket_1{service_1};
2✔
449
        acceptor.accept(socket_1);
2✔
450
        socket_1.set_option(network::SocketBase::no_delay(true));
2✔
451
        network::ReadAheadBuffer rab;
2✔
452
        size_t buffer_size = 8191; // Prime
2✔
453
        std::unique_ptr<char[]> buffer(new char[buffer_size]);
2✔
454
        size_t offset_in_chunk = 0;
2✔
455
        int chunk_index = 0;
2✔
456
        for (;;) {
16,388✔
457
            std::error_code ec;
16,388✔
458
            size_t n = socket_1.read(buffer.get(), buffer_size, rab, ec);
16,388✔
459
            bool equal = true;
16,388✔
460
            for (size_t i = 0; i < n; ++i) {
134,234,116✔
461
                if (chunk[offset_in_chunk] != buffer[i]) {
134,217,728✔
462
                    equal = false;
×
463
                    break;
×
464
                }
×
465
                if (++offset_in_chunk == num_bytes_per_chunk) {
134,217,728✔
466
                    offset_in_chunk = 0;
256✔
467
                    ++chunk_index;
256✔
468
                }
256✔
469
            }
134,217,728✔
470
            CHECK(equal);
16,388✔
471
            if (ec == MiscExtErrors::end_of_input)
16,388✔
472
                break;
2✔
473
            CHECK_NOT(ec);
16,386✔
474
        }
16,386✔
475
        CHECK_EQUAL(0, offset_in_chunk);
2✔
476
        CHECK_EQUAL(num_chunks, chunk_index);
2✔
477
    };
2✔
478
    ThreadWrapper thread;
2✔
479
    thread.start(reader);
2✔
480

1✔
481
    network::Service service_2;
2✔
482
    network::Socket socket_2{service_2};
2✔
483
    socket_2.connect(listening_endpoint);
2✔
484
    socket_2.set_option(network::SocketBase::no_delay(true));
2✔
485
    for (int i = 0; i < num_chunks; ++i)
258✔
486
        socket_2.write(chunk.get(), num_bytes_per_chunk);
256✔
487
    socket_2.close();
2✔
488

1✔
489
    CHECK_NOT(thread.join());
2✔
490
}
2✔
491

492

493
TEST(Network_AsyncReadWriteLargeAmount)
494
{
2✔
495
    network::Service service_1;
2✔
496
    network::Acceptor acceptor{service_1};
2✔
497
    network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
498

1✔
499
    size_t num_bytes_per_chunk = 1048576 / 2;
2✔
500
    std::unique_ptr<char[]> chunk(new char[num_bytes_per_chunk]);
2✔
501
    for (size_t i = 0; i < num_bytes_per_chunk; ++i)
1,048,578✔
502
        chunk[i] = char(i % 128);
1,048,576✔
503
    int num_chunks = 128;
2✔
504

1✔
505
    auto reader = [&] {
2✔
506
        network::Socket socket_1{service_1};
2✔
507
        acceptor.accept(socket_1);
2✔
508
        socket_1.set_option(network::SocketBase::no_delay(true));
2✔
509
        network::ReadAheadBuffer rab;
2✔
510
        size_t buffer_size = 8191; // Prime
2✔
511
        std::unique_ptr<char[]> buffer(new char[buffer_size]);
2✔
512
        size_t offset_in_chunk = 0;
2✔
513
        int chunk_index = 0;
2✔
514
        realm::util::UniqueFunction<void()> read_chunk = [&] {
16,388✔
515
            auto handler = [&](std::error_code ec, size_t n) {
16,388✔
516
                bool equal = true;
16,388✔
517
                for (size_t i = 0; i < n; ++i) {
134,234,116✔
518
                    if (buffer[i] != chunk[offset_in_chunk]) {
134,217,728✔
519
                        equal = false;
×
520
                        break;
×
521
                    }
×
522
                    if (++offset_in_chunk == num_bytes_per_chunk) {
134,217,728✔
523
                        offset_in_chunk = 0;
256✔
524
                        ++chunk_index;
256✔
525
                    }
256✔
526
                }
134,217,728✔
527
                CHECK(equal);
16,388✔
528
                if (ec == MiscExtErrors::end_of_input)
16,388✔
529
                    return;
2✔
530
                CHECK_NOT(ec);
16,386✔
531
                read_chunk();
16,386✔
532
            };
16,386✔
533
            socket_1.async_read(buffer.get(), buffer_size, rab, handler);
16,388✔
534
        };
16,388✔
535
        read_chunk();
2✔
536
        service_1.run();
2✔
537
        CHECK_EQUAL(0, offset_in_chunk);
2✔
538
        CHECK_EQUAL(num_chunks, chunk_index);
2✔
539
    };
2✔
540
    ThreadWrapper thread;
2✔
541
    thread.start(reader);
2✔
542

1✔
543
    network::Service service_2;
2✔
544
    network::Socket socket_2{service_2};
2✔
545
    socket_2.connect(listening_endpoint);
2✔
546
    socket_2.set_option(network::SocketBase::no_delay(true));
2✔
547
    std::function<void(int)> write_chunk = [&](int i) {
256✔
548
        auto handler = [this, i, num_chunks, num_bytes_per_chunk, write_chunk](std::error_code ec, size_t n) {
256✔
549
            if (CHECK_NOT(ec)) {
256✔
550
                CHECK_EQUAL(num_bytes_per_chunk, n);
256✔
551
                if (i + 1 == num_chunks)
256✔
552
                    return;
2✔
553
                write_chunk(i + 1);
254✔
554
            }
254✔
555
        };
256✔
556
        socket_2.async_write(chunk.get(), num_bytes_per_chunk, handler);
256✔
557
    };
256✔
558
    write_chunk(0);
2✔
559
    service_2.run();
2✔
560
    socket_2.close();
2✔
561

1✔
562
    CHECK_NOT(thread.join());
2✔
563
}
2✔
564

565

566
TEST(Network_SocketAndAcceptorOpen)
567
{
2✔
568
    network::Service service_1;
2✔
569
    network::Acceptor acceptor{service_1};
2✔
570
    network::Resolver resolver{service_1};
2✔
571
    network::Resolver::Query query("localhost", "",
2✔
572
                                   network::Resolver::Query::passive | network::Resolver::Query::address_configured);
2✔
573
    network::Endpoint::List endpoints = resolver.resolve(query);
2✔
574
    {
2✔
575
        auto i = endpoints.begin();
2✔
576
        auto end = endpoints.end();
2✔
577
        for (;;) {
2✔
578
            std::error_code ec;
2✔
579
            acceptor.open(i->protocol(), ec);
2✔
580
            if (!ec) {
2✔
581
                acceptor.bind(*i, ec);
2✔
582
                if (!ec)
2✔
583
                    break;
2✔
584
                acceptor.close();
×
585
            }
×
586
            if (++i == end)
1!
587
                throw std::runtime_error("Failed to bind to localhost:*");
×
588
        }
×
589
    }
2✔
590
    network::Endpoint listening_endpoint = acceptor.local_endpoint();
2✔
591
    acceptor.listen();
2✔
592
    network::Socket socket_1{service_1};
2✔
593
    ThreadWrapper thread;
2✔
594
    thread.start([&] {
2✔
595
        acceptor.accept(socket_1);
2✔
596
    });
2✔
597

1✔
598
    network::Service service_2;
2✔
599
    network::Socket socket_2{service_2};
2✔
600
    socket_2.open(listening_endpoint.protocol());
2✔
601
    socket_2.connect(listening_endpoint);
2✔
602

1✔
603
    thread.join();
2✔
604
}
2✔
605

606

607
TEST(Network_CancelAsyncAccept)
608
{
2✔
609
    network::Service service;
2✔
610
    network::Acceptor acceptor{service};
2✔
611
    bind_acceptor(acceptor);
2✔
612
    network::Socket socket{service};
2✔
613

1✔
614
    bool accept_was_canceled = false;
2✔
615
    auto handler = [&](std::error_code ec) {
4✔
616
        if (ec == error::operation_aborted)
4✔
617
            accept_was_canceled = true;
4✔
618
    };
4✔
619
    acceptor.async_accept(socket, handler);
2✔
620
    acceptor.cancel();
2✔
621
    service.run();
2✔
622
    CHECK(accept_was_canceled);
2✔
623

1✔
624
    accept_was_canceled = false;
2✔
625
    acceptor.async_accept(socket, handler);
2✔
626
    acceptor.close();
2✔
627
    service.run();
2✔
628
    CHECK(accept_was_canceled);
2✔
629
}
2✔
630

631

632
TEST(Network_CancelAsyncConnect)
633
{
2✔
634
    network::Service service;
2✔
635
    network::Acceptor acceptor{service};
2✔
636
    network::Endpoint ep = bind_acceptor(acceptor);
2✔
637
    network::Socket socket{service};
2✔
638

1✔
639
    bool connect_was_canceled = false;
2✔
640
    auto handler = [&](std::error_code ec) {
4✔
641
        if (ec == error::operation_aborted)
4✔
642
            connect_was_canceled = true;
4✔
643
    };
4✔
644
    socket.async_connect(ep, std::move(handler));
2✔
645
    socket.cancel();
2✔
646
    service.run();
2✔
647
    CHECK(connect_was_canceled);
2✔
648

1✔
649
    connect_was_canceled = false;
2✔
650
    socket.async_connect(ep, std::move(handler));
2✔
651
    socket.close();
2✔
652
    service.run();
2✔
653
    CHECK(connect_was_canceled);
2✔
654
}
2✔
655

656

657
TEST(Network_CancelAsyncReadWrite)
658
{
2✔
659
    network::Service service;
2✔
660
    network::Acceptor acceptor{service};
2✔
661
    bind_acceptor(acceptor);
2✔
662
    network::Socket socket_1{service};
2✔
663
    bool was_accepted = false;
2✔
664
    auto accept_handler = [&](std::error_code ec) {
2✔
665
        if (!ec)
2✔
666
            was_accepted = true;
2✔
667
    };
2✔
668
    acceptor.async_accept(socket_1, accept_handler);
2✔
669
    network::Socket socket_2{service};
2✔
670
    socket_2.connect(acceptor.local_endpoint());
2✔
671
    socket_2.set_option(network::SocketBase::no_delay(true));
2✔
672
    service.run();
2✔
673
    CHECK(was_accepted);
2✔
674
    socket_1.set_option(network::SocketBase::no_delay(true));
2✔
675
    const size_t size = 1;
2✔
676
    char data[size] = {'a'};
2✔
677
    bool write_was_canceled = false;
2✔
678
    auto write_handler = [&](std::error_code ec, size_t) {
2✔
679
        if (ec == error::operation_aborted)
2✔
680
            write_was_canceled = true;
2✔
681
    };
2✔
682
    socket_2.async_write(data, size, write_handler);
2✔
683
    network::ReadAheadBuffer rab;
2✔
684
    char buffer[size];
2✔
685
    bool read_was_canceled = false;
2✔
686
    auto read_handler = [&](std::error_code ec, size_t) {
2✔
687
        if (ec == error::operation_aborted)
2✔
688
            read_was_canceled = true;
2✔
689
    };
2✔
690
    socket_2.async_read(buffer, size, rab, read_handler);
2✔
691
    socket_2.close();
2✔
692
    service.run();
2✔
693
    CHECK(read_was_canceled);
2✔
694
    CHECK(write_was_canceled);
2✔
695
}
2✔
696

697
TEST(Network_CancelEmptyRead)
698
{
2✔
699
    // Make sure that an immediately completable read operation is still
1✔
700
    // cancelable
1✔
701

1✔
702
    network::Service service;
2✔
703
    network::Socket socket_1{service}, socket_2{service};
2✔
704
    connect_sockets(socket_1, socket_2);
2✔
705
    network::ReadAheadBuffer rab;
2✔
706
    const size_t size = 1;
2✔
707
    char data[size] = {'a'};
2✔
708
    bool write_was_canceled = false;
2✔
709
    auto write_handler = [&](std::error_code ec, size_t) {
2✔
710
        if (ec == error::operation_aborted)
2✔
711
            write_was_canceled = true;
2✔
712
    };
2✔
713
    socket_2.async_write(data, size, write_handler);
2✔
714
    char buffer[size];
2✔
715
    bool read_was_canceled = false;
2✔
716
    auto read_handler = [&](std::error_code ec, size_t) {
2✔
717
        if (ec == error::operation_aborted)
2✔
718
            read_was_canceled = true;
2✔
719
    };
2✔
720
    socket_2.async_read(buffer, 0, rab, read_handler);
2✔
721
    socket_2.close();
2✔
722
    service.run();
2✔
723
    CHECK(read_was_canceled);
2✔
724
    CHECK(write_was_canceled);
2✔
725
}
2✔
726

727
TEST(Network_CancelEmptyWrite)
728
{
2✔
729
    // Make sure that an immediately completable write operation is still
1✔
730
    // cancelable
1✔
731

1✔
732
    network::Service service;
2✔
733
    network::Socket socket_1{service}, socket_2{service};
2✔
734
    connect_sockets(socket_1, socket_2);
2✔
735
    network::ReadAheadBuffer rab;
2✔
736
    char buffer[1];
2✔
737
    bool read_was_canceled = false;
2✔
738
    auto read_handler = [&](std::error_code ec, size_t) {
2✔
739
        if (ec == error::operation_aborted)
2✔
740
            read_was_canceled = true;
2✔
741
    };
2✔
742
    socket_2.async_read(buffer, 1, rab, read_handler);
2✔
743
    char data[1] = {'a'};
2✔
744
    bool write_was_canceled = false;
2✔
745
    auto write_handler = [&](std::error_code ec, size_t) {
2✔
746
        if (ec == error::operation_aborted)
2✔
747
            write_was_canceled = true;
2✔
748
    };
2✔
749
    socket_2.async_write(data, 0, write_handler);
2✔
750
    socket_2.close();
2✔
751
    service.run();
2✔
752
    CHECK(read_was_canceled);
2✔
753
    CHECK(write_was_canceled);
2✔
754
}
2✔
755

756

757
TEST(Network_CancelReadByDestroy)
758
{
2✔
759
    // Check that canceled read operations never try to access socket, stream,
1✔
760
    // or input buffer objects, even if they were partially completed.
1✔
761

1✔
762
    const int num_connections = 16;
2✔
763
    network::Service service;
2✔
764
    std::unique_ptr<std::unique_ptr<network::Socket>[]> write_sockets;
2✔
765
    std::unique_ptr<std::unique_ptr<network::Socket>[]> read_sockets;
2✔
766
    std::unique_ptr<std::unique_ptr<network::ReadAheadBuffer>[]> read_ahead_buffers;
2✔
767
    write_sockets.reset(new std::unique_ptr<network::Socket>[num_connections]);
2✔
768
    read_sockets.reset(new std::unique_ptr<network::Socket>[num_connections]);
2✔
769
    read_ahead_buffers.reset(new std::unique_ptr<network::ReadAheadBuffer>[num_connections]);
2✔
770
    char output_buffer[2] = {'x', '\n'};
2✔
771
    std::unique_ptr<char[][2]> input_buffers(new char[num_connections][2]);
2✔
772
    for (int i = 0; i < num_connections; ++i) {
34✔
773
        write_sockets[i].reset(new network::Socket{service});
32✔
774
        read_sockets[i].reset(new network::Socket{service});
32✔
775
        connect_sockets(*write_sockets[i], *read_sockets[i]);
32✔
776
        read_ahead_buffers[i].reset(new network::ReadAheadBuffer);
32✔
777
    }
32✔
778
    for (int i = 0; i < num_connections; ++i) {
34✔
779
        auto read_handler = [&](std::error_code ec, size_t n) {
32✔
780
            CHECK(n == 0 || n == 1 || n == 2);
32✔
781
            if (n == 2) {
32✔
782
                CHECK_NOT(ec);
2✔
783
                for (int j = 0; j < num_connections; ++j)
34✔
784
                    read_sockets[j]->cancel();
32✔
785
                read_ahead_buffers.reset(); // Destroy all input streams
2✔
786
                read_sockets.reset();       // Destroy all read sockets
2✔
787
                input_buffers.reset();      // Destroy all input buffers
2✔
788
                return;
2✔
789
            }
2✔
790
            CHECK_EQUAL(error::operation_aborted, ec);
30✔
791
        };
30✔
792
        read_sockets[i]->async_read_until(input_buffers[i], 2, '\n', *read_ahead_buffers[i], read_handler);
32✔
793
        auto write_handler = [&](std::error_code ec, size_t) {
32✔
794
            CHECK_NOT(ec);
32✔
795
        };
32✔
796
        int n = (i == num_connections / 2 ? 2 : 1);
31✔
797
        write_sockets[i]->async_write(output_buffer, n, write_handler);
32✔
798
    }
32✔
799
    service.run();
2✔
800
}
2✔
801

802

803
TEST(Network_AcceptorMixedAsyncSync)
804
{
2✔
805
    network::Service service;
2✔
806
    network::Acceptor acceptor{service};
2✔
807
    bind_acceptor(acceptor);
2✔
808
    network::Endpoint ep = acceptor.local_endpoint();
2✔
809
    auto connect = [ep] {
6✔
810
        network::Service connect_service;
6✔
811
        network::Socket socket{connect_service};
6✔
812
        socket.connect(ep);
6✔
813
    };
6✔
814

1✔
815
    // Synchronous accept -> stay on blocking mode
1✔
816
    {
2✔
817
        ThreadWrapper thread;
2✔
818
        thread.start(connect);
2✔
819
        network::Socket socket{service};
2✔
820
        acceptor.accept(socket);
2✔
821
        CHECK_NOT(thread.join());
2✔
822
    }
2✔
823

1✔
824
    // Asynchronous accept -> switch to nonblocking mode
1✔
825
    {
2✔
826
        ThreadWrapper thread;
2✔
827
        thread.start(connect);
2✔
828
        network::Socket socket{service};
2✔
829
        bool was_accepted = false;
2✔
830
        auto accept_handler = [&](std::error_code ec) {
2✔
831
            if (!ec)
2✔
832
                was_accepted = true;
2✔
833
        };
2✔
834
        acceptor.async_accept(socket, accept_handler);
2✔
835
        service.run();
2✔
836
        CHECK(was_accepted);
2✔
837
        CHECK_NOT(thread.join());
2✔
838
    }
2✔
839

1✔
840
    // Synchronous accept -> switch back to blocking mode
1✔
841
    {
2✔
842
        ThreadWrapper thread;
2✔
843
        thread.start(connect);
2✔
844
        network::Socket socket{service};
2✔
845
        acceptor.accept(socket);
2✔
846
        CHECK_NOT(thread.join());
2✔
847
    }
2✔
848
}
2✔
849

850

851
TEST(Network_SocketMixedAsyncSync)
852
{
2✔
853
    network::Service acceptor_service;
2✔
854
    network::Acceptor acceptor{acceptor_service};
2✔
855
    bind_acceptor(acceptor);
2✔
856
    network::Endpoint ep = acceptor.local_endpoint();
2✔
857
    auto accept_and_echo = [&] {
4✔
858
        network::Socket socket{acceptor_service};
4✔
859
        acceptor.accept(socket);
4✔
860
        socket.set_option(network::SocketBase::no_delay(true));
4✔
861
        network::ReadAheadBuffer rab;
4✔
862
        size_t buffer_size = 1024;
4✔
863
        std::unique_ptr<char[]> buffer(new char[buffer_size]);
4✔
864
        size_t size = socket.read_until(buffer.get(), buffer_size, '\n', rab);
4✔
865
        socket.write(buffer.get(), size);
4✔
866
    };
4✔
867

1✔
868
    {
2✔
869
        ThreadWrapper thread;
2✔
870
        thread.start(accept_and_echo);
2✔
871
        network::Service service;
2✔
872

1✔
873
        // Synchronous connect -> stay in blocking mode
1✔
874
        network::Socket socket{service};
2✔
875
        socket.connect(ep);
2✔
876
        socket.set_option(network::SocketBase::no_delay(true));
2✔
877
        network::ReadAheadBuffer rab;
2✔
878

1✔
879
        // Asynchronous write -> switch to nonblocking mode
1✔
880
        const char* message = "Calabi-Yau\n";
2✔
881
        bool was_written = false;
2✔
882
        auto write_handler = [&](std::error_code ec, size_t) {
2✔
883
            if (!ec)
2✔
884
                was_written = true;
2✔
885
        };
2✔
886
        socket.async_write(message, strlen(message), write_handler);
2✔
887
        service.run();
2✔
888
        CHECK(was_written);
2✔
889

1✔
890
        // Synchronous read -> switch back to blocking mode
1✔
891
        size_t buffer_size = 1024;
2✔
892
        std::unique_ptr<char[]> buffer(new char[buffer_size]);
2✔
893
        std::error_code ec;
2✔
894
        size_t size = socket.read(buffer.get(), buffer_size, rab, ec);
2✔
895
        if (CHECK_EQUAL(ec, MiscExtErrors::end_of_input)) {
2✔
896
            if (CHECK_EQUAL(size, strlen(message)))
2✔
897
                CHECK(std::equal(buffer.get(), buffer.get() + size, message));
2✔
898
        }
2✔
899

1✔
900
        CHECK_NOT(thread.join());
2✔
901
    }
2✔
902

1✔
903
    {
2✔
904
        ThreadWrapper thread;
2✔
905
        thread.start(accept_and_echo);
2✔
906
        network::Service service;
2✔
907

1✔
908
        // Asynchronous connect -> switch to nonblocking mode
1✔
909
        network::Socket socket{service};
2✔
910
        bool is_connected = false;
2✔
911
        auto connect_handler = [&](std::error_code ec) {
2✔
912
            if (!ec)
2✔
913
                is_connected = true;
2✔
914
        };
2✔
915
        socket.async_connect(ep, std::move(connect_handler));
2✔
916
        service.run();
2✔
917
        CHECK(is_connected);
2✔
918
        network::ReadAheadBuffer rab;
2✔
919

1✔
920
        // Synchronous write -> switch back to blocking mode
1✔
921
        const char* message = "The Verlinde Algebra And The Cohomology Of The Grassmannian\n";
2✔
922
        socket.write(message, strlen(message));
2✔
923

1✔
924
        // Asynchronous read -> swich once again to nonblocking mode
1✔
925
        size_t buffer_size = 1024;
2✔
926
        std::unique_ptr<char[]> buffer(new char[buffer_size]);
2✔
927
        auto read_handler = [&](std::error_code ec, size_t size) {
2✔
928
            if (CHECK_EQUAL(ec, MiscExtErrors::end_of_input)) {
2✔
929
                if (CHECK_EQUAL(size, strlen(message)))
2✔
930
                    CHECK(std::equal(buffer.get(), buffer.get() + size, message));
2✔
931
            }
2✔
932
        };
2✔
933
        socket.async_read(buffer.get(), buffer_size, rab, read_handler);
2✔
934
        service.run();
2✔
935

1✔
936
        CHECK_NOT(thread.join());
2✔
937
    }
2✔
938
}
2✔
939

940

941
TEST(Network_SocketShutdown)
942
{
2✔
943
    network::Service service;
2✔
944
    network::Socket socket_1{service}, socket_2{service};
2✔
945
    connect_sockets(socket_1, socket_2);
2✔
946
    network::ReadAheadBuffer read_ahead_buffer;
2✔
947

1✔
948
    bool end_of_input_seen = false;
2✔
949
    auto handler = [&](std::error_code ec, size_t) {
2✔
950
        if (ec == MiscExtErrors::end_of_input)
2✔
951
            end_of_input_seen = true;
2✔
952
    };
2✔
953
    char ch;
2✔
954
    socket_2.async_read(&ch, 1, read_ahead_buffer, std::move(handler));
2✔
955
    socket_1.shutdown(network::Socket::shutdown_send);
2✔
956
    service.run();
2✔
957
    CHECK(end_of_input_seen);
2✔
958
}
2✔
959

960

961
TEST(Network_DeadlineTimer)
962
{
2✔
963
    network::Service service;
2✔
964
    network::DeadlineTimer timer{service};
2✔
965

1✔
966
    // Check that the completion handler is executed
1✔
967
    bool completed = false;
2✔
968
    bool canceled = false;
2✔
969
    const auto wait_handler = [&](Status status) {
6✔
970
        if (status.is_ok())
6✔
971
            completed = true;
2✔
972
        if (status == ErrorCodes::OperationAborted)
6✔
973
            canceled = true;
4✔
974
    };
6✔
975
    timer.async_wait(std::chrono::seconds(0), wait_handler);
2✔
976
    CHECK(!completed);
2✔
977
    CHECK(!canceled);
2✔
978
    service.run();
2✔
979
    CHECK(completed);
2✔
980
    CHECK(!canceled);
2✔
981
    completed = false;
2✔
982

1✔
983
    // Check that an immediately completed wait operation can be canceled
1✔
984
    timer.async_wait(std::chrono::seconds(0), wait_handler);
2✔
985
    CHECK(!completed);
2✔
986
    CHECK(!canceled);
2✔
987
    timer.cancel();
2✔
988
    CHECK(!completed);
2✔
989
    CHECK(!canceled);
2✔
990
    service.run();
2✔
991
    CHECK(!completed);
2✔
992
    CHECK(canceled);
2✔
993
    canceled = false;
2✔
994

1✔
995
    // Check that a long running wait operation can be canceled
1✔
996
    timer.async_wait(std::chrono::hours(10000), wait_handler);
2✔
997
    CHECK(!completed);
2✔
998
    CHECK(!canceled);
2✔
999
    timer.cancel();
2✔
1000
    CHECK(!completed);
2✔
1001
    CHECK(!canceled);
2✔
1002
    service.run();
2✔
1003
    CHECK(!completed);
2✔
1004
    CHECK(canceled);
2✔
1005
}
2✔
1006

1007

1008
/*
1009
TEST(Network_DeadlineTimer_Special)
1010
{
1011
    network::Service service;
1012
    network::DeadlineTimer timer_1{service};
1013
    network::DeadlineTimer timer_2{service};
1014
    network::DeadlineTimer timer_3{service};
1015
    network::DeadlineTimer timer_4{service};
1016
    network::DeadlineTimer timer_5{service};
1017
    network::DeadlineTimer timer_6{service};
1018
    timer_1.async_wait(std::chrono::seconds(3), [](Status) { std::cerr << "*3*\n";   });
1019
    timer_2.async_wait(std::chrono::seconds(2), [](Status) { std::cerr << "*2*\n";   });
1020
    timer_3.async_wait(std::chrono::seconds(3), [](Status) { std::cerr << "*3-2*\n"; });
1021
    timer_4.async_wait(std::chrono::seconds(2), [](Status) { std::cerr << "*2-2*\n"; });
1022
    timer_5.async_wait(std::chrono::seconds(1), [](Status) { std::cerr << "*1*\n";   });
1023
    timer_6.async_wait(std::chrono::seconds(2), [](Status) { std::cerr << "*2-3*\n"; });
1024
    service.run();
1025
}
1026
*/
1027

1028

1029
TEST(Network_ThrowFromHandlers)
1030
{
2✔
1031
    // Check that exceptions can propagate correctly out from any type of
1✔
1032
    // completion handler
1✔
1033
    network::Service service;
2✔
1034
    struct TestException1 {
2✔
1035
    };
2✔
1036
    service.post([](Status) {
2✔
1037
        throw TestException1();
2✔
1038
    });
2✔
1039
    CHECK_THROW(service.run(), TestException1);
2✔
1040

1✔
1041
    {
2✔
1042
        network::Acceptor acceptor{service};
2✔
1043
        network::Endpoint ep = bind_acceptor(acceptor);
2✔
1044
        network::Socket socket_1{service};
2✔
1045
        struct TestException2 {
2✔
1046
        };
2✔
1047
        acceptor.async_accept(socket_1, [](std::error_code) {
2✔
1048
            throw TestException2();
2✔
1049
        });
2✔
1050
        network::Socket socket_2{service};
2✔
1051
        socket_2.async_connect(ep, [](std::error_code) {});
2✔
1052
        CHECK_THROW(service.run(), TestException2);
2✔
1053
    }
2✔
1054
    {
2✔
1055
        network::Acceptor acceptor{service};
2✔
1056
        network::Endpoint ep = bind_acceptor(acceptor);
2✔
1057
        network::Socket socket_1{service};
2✔
1058
        acceptor.async_accept(socket_1, [](std::error_code) {});
2✔
1059
        network::Socket socket_2{service};
2✔
1060
        struct TestException3 {
2✔
1061
        };
2✔
1062
        socket_2.async_connect(ep, [](std::error_code) {
2✔
1063
            throw TestException3();
2✔
1064
        });
2✔
1065
        CHECK_THROW(service.run(), TestException3);
2✔
1066
    }
2✔
1067
    {
2✔
1068
        network::Socket socket_1{service}, socket_2{service};
2✔
1069
        connect_sockets(socket_1, socket_2);
2✔
1070
        network::ReadAheadBuffer rab;
2✔
1071
        char ch_1;
2✔
1072
        struct TestException4 {
2✔
1073
        };
2✔
1074
        socket_1.async_read(&ch_1, 1, rab, [](std::error_code, size_t) {
2✔
1075
            throw TestException4();
2✔
1076
        });
2✔
1077
        char ch_2 = 0;
2✔
1078
        socket_2.async_write(&ch_2, 1, [](std::error_code, size_t) {});
2✔
1079
        CHECK_THROW(service.run(), TestException4);
2✔
1080
    }
2✔
1081
    {
2✔
1082
        network::Socket socket_1{service}, socket_2{service};
2✔
1083
        connect_sockets(socket_1, socket_2);
2✔
1084
        network::ReadAheadBuffer rab;
2✔
1085
        char ch_1;
2✔
1086
        socket_1.async_read(&ch_1, 1, rab, [](std::error_code, size_t) {});
2✔
1087
        char ch_2 = 0;
2✔
1088
        struct TestException5 {
2✔
1089
        };
2✔
1090
        socket_2.async_write(&ch_2, 1, [](std::error_code, size_t) {
2✔
1091
            throw TestException5();
2✔
1092
        });
2✔
1093
        CHECK_THROW(service.run(), TestException5);
2✔
1094
    }
2✔
1095
    {
2✔
1096
        network::DeadlineTimer timer{service};
2✔
1097
        struct TestException6 {
2✔
1098
        };
2✔
1099
        timer.async_wait(std::chrono::seconds(0), [](Status) {
2✔
1100
            throw TestException6();
2✔
1101
        });
2✔
1102
        CHECK_THROW(service.run(), TestException6);
2✔
1103
    }
2✔
1104
}
2✔
1105

1106

1107
TEST(Network_HandlerDealloc)
1108
{
2✔
1109
    // Check that dynamically allocated handlers are properly freed when the
1✔
1110
    // service object is destroyed.
1✔
1111
    {
2✔
1112
        // m_post_handlers
1✔
1113
        network::Service service;
2✔
1114
        service.post([](Status) {});
1✔
1115
    }
2✔
1116
    {
2✔
1117
        // m_imm_handlers
1✔
1118
        network::Service service;
2✔
1119
        // By adding two post handlers that throw, one is going to be left
1✔
1120
        // behind in `m_imm_handlers`
1✔
1121
        service.post([&](Status) {
2✔
1122
            throw std::runtime_error("");
2✔
1123
        });
2✔
1124
        service.post([&](Status) {
1✔
1125
            throw std::runtime_error("");
×
1126
        });
×
1127
        CHECK_THROW(service.run(), std::runtime_error);
2✔
1128
    }
2✔
1129
    {
2✔
1130
        // m_poll_handlers
1✔
1131
        network::Service service;
2✔
1132
        network::Acceptor acceptor{service};
2✔
1133
        acceptor.open(network::StreamProtocol::ip_v4());
2✔
1134
        network::Socket socket{service};
2✔
1135
        // This leaves behind a read handler in m_poll_handlers
1✔
1136
        acceptor.async_accept(socket, [&](std::error_code) {});
1✔
1137
    }
2✔
1138
    {
2✔
1139
        // m_cancel_handlers
1✔
1140
        network::Service service;
2✔
1141
        network::Acceptor acceptor{service};
2✔
1142
        acceptor.open(network::StreamProtocol::ip_v4());
2✔
1143
        network::Socket socket{service};
2✔
1144
        acceptor.async_accept(socket, [&](std::error_code) {});
1✔
1145
        // This leaves behind a read handler in m_cancel_handlers
1✔
1146
        acceptor.close();
2✔
1147
    }
2✔
1148
    {
2✔
1149
        // m_poll_handlers
1✔
1150
        network::Service service_1;
2✔
1151
        network::Acceptor acceptor{service_1};
2✔
1152
        network::Endpoint listening_endpoint = bind_acceptor(acceptor);
2✔
1153
        network::Socket socket_1{service_1};
2✔
1154
        ThreadWrapper thread;
2✔
1155
        thread.start([&] {
2✔
1156
            acceptor.accept(socket_1);
2✔
1157
        });
2✔
1158
        network::Service service_2;
2✔
1159
        network::Socket socket_2{service_2};
2✔
1160
        socket_2.connect(listening_endpoint);
2✔
1161
        socket_2.set_option(network::SocketBase::no_delay(true));
2✔
1162
        thread.join();
2✔
1163
        socket_1.set_option(network::SocketBase::no_delay(true));
2✔
1164
        network::ReadAheadBuffer rab;
2✔
1165
        char buffer[1];
2✔
1166
        char data[] = {'X', 'F', 'M'};
2✔
1167
        // This leaves behind both a read and a write handler in m_poll_handlers
1✔
1168
        socket_1.async_read(buffer, sizeof buffer, rab, [](std::error_code, size_t) {});
1✔
1169
        socket_1.async_write(data, sizeof data, [](std::error_code, size_t) {});
1✔
1170
    }
2✔
1171
}
2✔
1172

1173

1174
namespace {
1175

1176
template <int size>
1177
struct PostReallocHandler {
1178
    PostReallocHandler(int& v)
1179
        : var(v)
1180
    {
18✔
1181
    }
18✔
1182
    void operator()(Status)
1183
    {
18✔
1184
        var = size;
18✔
1185
    }
18✔
1186
    int& var;
1187
    char strut[size];
1188
};
1189

1190
} // unnamed namespace
1191

1192
TEST(Network_PostRealloc)
1193
{
2✔
1194
    // Use progressively larger post handlers to check that memory reallocation
1✔
1195
    // works
1✔
1196

1✔
1197
    network::Service service;
2✔
1198
    int var = 0;
2✔
1199
    for (int i = 0; i < 3; ++i) {
8✔
1200
        service.post(PostReallocHandler<10>(var));
6✔
1201
        service.run();
6✔
1202
        CHECK_EQUAL(10, var);
6✔
1203
        service.post(PostReallocHandler<100>(var));
6✔
1204
        service.run();
6✔
1205
        CHECK_EQUAL(100, var);
6✔
1206
        service.post(PostReallocHandler<1000>(var));
6✔
1207
        service.run();
6✔
1208
        CHECK_EQUAL(1000, var);
6✔
1209
    }
6✔
1210
}
2✔
1211

1212

1213
namespace {
1214

1215
struct AsyncReadWriteRealloc {
1216
    network::Service service;
1217
    network::Socket read_socket{service}, write_socket{service};
1218
    network::ReadAheadBuffer rab;
1219
    char read_buffer[3];
1220
    char write_buffer[3] = {'0', '1', '2'};
1221
    Random random{random_int<unsigned long>()}; // Seed from slow global generator
1222

1223
    const size_t num_bytes_to_write = 65536;
1224
    size_t num_bytes_written = 0;
1225
    size_t num_bytes_read = 0;
1226

1227
    template <int size>
1228
    struct WriteHandler {
1229
        WriteHandler(AsyncReadWriteRealloc& s)
1230
            : state(s)
1231
        {
87,464✔
1232
        }
87,464✔
1233
        void operator()(std::error_code ec, size_t n)
1234
        {
87,464✔
1235
            if (ec)
87,464✔
1236
                throw std::system_error(ec);
×
1237
            state.num_bytes_written += n;
87,464✔
1238
            state.initiate_write();
87,464✔
1239
        }
87,464✔
1240
        AsyncReadWriteRealloc& state;
1241
        char strut[size];
1242
    };
1243

1244
    void initiate_write()
1245
    {
87,466✔
1246
        if (num_bytes_written >= num_bytes_to_write) {
87,466✔
1247
            write_socket.close();
2✔
1248
            return;
2✔
1249
        }
2✔
1250
        int v = random.draw_int_max(3);
87,464✔
1251
        size_t n = std::min(size_t(v), size_t(num_bytes_to_write - num_bytes_written));
87,464✔
1252
        switch (v) {
87,464✔
1253
            case 0:
22,039✔
1254
                write_socket.async_write(write_buffer, n, WriteHandler<1>(*this));
22,039✔
1255
                return;
22,039✔
1256
            case 1:
21,654✔
1257
                write_socket.async_write(write_buffer, n, WriteHandler<10>(*this));
21,654✔
1258
                return;
21,654✔
1259
            case 2:
21,894✔
1260
                write_socket.async_write(write_buffer, n, WriteHandler<100>(*this));
21,894✔
1261
                return;
21,894✔
1262
            case 3:
21,877✔
1263
                write_socket.async_write(write_buffer, n, WriteHandler<1000>(*this));
21,877✔
1264
                return;
21,877✔
1265
        }
×
1266
        REALM_ASSERT(false);
×
1267
    }
×
1268

1269
    template <int size>
1270
    struct ReadHandler {
1271
        ReadHandler(AsyncReadWriteRealloc& s)
1272
            : state(s)
1273
        {
87,493✔
1274
        }
87,493✔
1275
        void operator()(std::error_code ec, size_t n)
1276
        {
87,493✔
1277
            if (ec && ec != MiscExtErrors::end_of_input)
87,493!
1278
                throw std::system_error(ec);
×
1279
            state.num_bytes_read += n;
87,493✔
1280
            if (ec != MiscExtErrors::end_of_input)
87,493✔
1281
                state.initiate_read();
87,491✔
1282
        }
87,493✔
1283
        AsyncReadWriteRealloc& state;
1284
        char strut[size];
1285
    };
1286

1287
    void initiate_read()
1288
    {
87,493✔
1289
        int v = random.draw_int_max(3);
87,493✔
1290
        size_t n = size_t(v);
87,493✔
1291
        switch (v) {
87,493✔
1292
            case 0:
21,933✔
1293
                read_socket.async_read(read_buffer, n, rab, ReadHandler<1>(*this));
21,933✔
1294
                return;
21,933✔
1295
            case 1:
21,964✔
1296
                read_socket.async_read(read_buffer, n, rab, ReadHandler<10>(*this));
21,964✔
1297
                return;
21,964✔
1298
            case 2:
21,676✔
1299
                read_socket.async_read(read_buffer, n, rab, ReadHandler<100>(*this));
21,676✔
1300
                return;
21,676✔
1301
            case 3:
21,920✔
1302
                read_socket.async_read(read_buffer, n, rab, ReadHandler<1000>(*this));
21,920✔
1303
                return;
21,920✔
1304
        }
×
1305
        REALM_ASSERT(false);
×
1306
    }
×
1307
};
1308

1309
} // unnamed namespace
1310

1311
TEST(Network_AsyncReadWriteRealloc)
1312
{
2✔
1313
    // Use progressively larger completion handlers to check that memory
1✔
1314
    // reallocation works
1✔
1315

1✔
1316
    AsyncReadWriteRealloc state;
2✔
1317
    connect_sockets(state.read_socket, state.write_socket);
2✔
1318
    state.initiate_read();
2✔
1319
    state.initiate_write();
2✔
1320
    state.service.run();
2✔
1321
    CHECK_EQUAL(state.num_bytes_to_write, state.num_bytes_written);
2✔
1322
    CHECK_EQUAL(state.num_bytes_written, state.num_bytes_read);
2✔
1323
}
2✔
1324

1325

1326
namespace {
1327

1328
char echo_body[] = {'\xC1', '\x2C', '\xEF', '\x48', '\x8C', '\xCD', '\x41', '\xFA', '\x12', '\xF9', '\xF4',
1329
                    '\x72', '\xDF', '\x92', '\x8E', '\x68', '\xAB', '\x8F', '\x6B', '\xDF', '\x80', '\x26',
1330
                    '\xD1', '\x60', '\x21', '\x91', '\x20', '\xC8', '\x94', '\x0C', '\xDB', '\x07', '\xB0',
1331
                    '\x1C', '\x3A', '\xDA', '\x5E', '\x9B', '\x62', '\xDE', '\x30', '\xA3', '\x7E', '\xED',
1332
                    '\xB4', '\x30', '\xD7', '\x43', '\x3F', '\xDE', '\xF2', '\x6D', '\x9A', '\x1D', '\xAE',
1333
                    '\xF4', '\xD5', '\xFB', '\xAC', '\xE8', '\x67', '\x37', '\xFD', '\xF3'};
1334

1335
void sync_server(network::Acceptor& acceptor, unit_test::TestContext& test_context)
1336
{
2✔
1337
    network::Service& service = acceptor.get_service();
2✔
1338
    network::Socket socket{service};
2✔
1339
    network::Endpoint endpoint;
2✔
1340
    acceptor.accept(socket, endpoint);
2✔
1341
    socket.set_option(network::SocketBase::no_delay(true));
2✔
1342

1✔
1343
    network::ReadAheadBuffer rab;
2✔
1344
    const size_t max_header_size = 32;
2✔
1345
    char header_buffer[max_header_size];
2✔
1346
    size_t n = socket.read_until(header_buffer, max_header_size, '\n', rab);
2✔
1347
    if (!CHECK_GREATER(n, 0))
2✔
1348
        return;
1✔
1349
    if (!CHECK_LESS_EQUAL(n, max_header_size))
2✔
1350
        return;
1✔
1351
    if (!CHECK_EQUAL(header_buffer[n - 1], '\n'))
2✔
1352
        return;
1✔
1353
    MemoryInputStream in;
2✔
1354
    in.set_buffer(header_buffer, header_buffer + (n - 1));
2✔
1355
    in.unsetf(std::ios_base::skipws);
2✔
1356
    std::string message_type;
2✔
1357
    in >> message_type;
2✔
1358
    if (!CHECK_EQUAL(message_type, "echo"))
2✔
1359
        return;
1✔
1360
    char sp;
2✔
1361
    size_t body_size;
2✔
1362
    in >> sp >> body_size;
2✔
1363
    if (!CHECK(in) || !CHECK(in.eof()) || !CHECK_EQUAL(sp, ' '))
2✔
1364
        return;
1✔
1365
    std::unique_ptr<char[]> body_buffer(new char[body_size]);
2✔
1366
    size_t m = socket.read(body_buffer.get(), body_size, rab);
2✔
1367
    if (!CHECK_EQUAL(m, body_size))
2✔
1368
        return;
1✔
1369
    MemoryOutputStream out;
2✔
1370
    out.set_buffer(header_buffer, header_buffer + max_header_size);
2✔
1371
    out << "was " << body_size << '\n';
2✔
1372
    socket.write(header_buffer, out.size());
2✔
1373
    socket.write(body_buffer.get(), body_size);
2✔
1374
}
2✔
1375

1376

1377
void sync_client(unsigned short listen_port, unit_test::TestContext& test_context)
1378
{
2✔
1379
    network::Service service;
2✔
1380
    network::Socket socket{service};
2✔
1381
    {
2✔
1382
        std::ostringstream out;
2✔
1383
        out << listen_port;
2✔
1384
        std::string listen_port_2 = out.str();
2✔
1385
        connect_socket(socket, listen_port_2);
2✔
1386
    }
2✔
1387
    socket.set_option(network::SocketBase::no_delay(true));
2✔
1388

1✔
1389
    const size_t max_header_size = 32;
2✔
1390
    char header_buffer[max_header_size];
2✔
1391
    MemoryOutputStream out;
2✔
1392
    out.set_buffer(header_buffer, header_buffer + max_header_size);
2✔
1393
    out << "echo " << sizeof echo_body << '\n';
2✔
1394
    socket.write(header_buffer, out.size());
2✔
1395
    socket.write(echo_body, sizeof echo_body);
2✔
1396

1✔
1397
    network::ReadAheadBuffer rab;
2✔
1398
    size_t n = socket.read_until(header_buffer, max_header_size, '\n', rab);
2✔
1399
    if (!CHECK_GREATER(n, 0))
2✔
1400
        return;
1✔
1401
    if (!CHECK_LESS_EQUAL(n, max_header_size))
2✔
1402
        return;
1✔
1403
    if (!CHECK_EQUAL(header_buffer[n - 1], '\n'))
2✔
1404
        return;
1✔
1405
    MemoryInputStream in;
2✔
1406
    in.set_buffer(header_buffer, header_buffer + (n - 1));
2✔
1407
    in.unsetf(std::ios_base::skipws);
2✔
1408
    std::string message_type;
2✔
1409
    in >> message_type;
2✔
1410
    if (!CHECK_EQUAL(message_type, "was"))
2✔
1411
        return;
1✔
1412
    char sp;
2✔
1413
    size_t echo_size;
2✔
1414
    in >> sp >> echo_size;
2✔
1415
    if (!CHECK(in) || !CHECK(in.eof()) || !CHECK_EQUAL(sp, ' '))
2✔
1416
        return;
1✔
1417
    std::unique_ptr<char[]> echo_buffer(new char[echo_size]);
2✔
1418
    size_t m = socket.read(echo_buffer.get(), echo_size, rab);
2✔
1419
    if (!CHECK_EQUAL(m, echo_size))
2✔
1420
        return;
1✔
1421
    if (!CHECK_EQUAL(echo_size, sizeof echo_body))
2✔
1422
        return;
1✔
1423
    CHECK(std::equal(echo_body, echo_body + sizeof echo_body, echo_buffer.get()));
2✔
1424
}
2✔
1425

1426
} // anonymous namespace
1427

1428

1429
TEST(Network_Sync)
1430
{
2✔
1431
    network::Service service;
2✔
1432
    network::Acceptor acceptor{service};
2✔
1433
    network::Endpoint listen_endpoint = bind_acceptor(acceptor);
2✔
1434
    network::Endpoint::port_type listen_port = listen_endpoint.port();
2✔
1435

1✔
1436
    ThreadWrapper server_thread, client_thread;
2✔
1437
    server_thread.start([&] {
2✔
1438
        sync_server(acceptor, test_context);
2✔
1439
    });
2✔
1440
    client_thread.start([&] {
2✔
1441
        sync_client(listen_port, test_context);
2✔
1442
    });
2✔
1443
    client_thread.join();
2✔
1444
    server_thread.join();
2✔
1445
}
2✔
1446

1447

1448
namespace {
1449

1450
class async_server {
1451
public:
1452
    async_server(unit_test::TestContext& test_context)
1453
        : m_acceptor{m_service}
1454
        , m_socket{m_service}
1455
        , m_test_context{test_context}
1456
    {
2✔
1457
    }
2✔
1458

1459
    unsigned short init()
1460
    {
2✔
1461
        network::Endpoint listen_endpoint = bind_acceptor(m_acceptor);
2✔
1462
        network::Endpoint::port_type listen_port = listen_endpoint.port();
2✔
1463
        return listen_port;
2✔
1464
    }
2✔
1465

1466
    void run()
1467
    {
2✔
1468
        auto handler = [this](std::error_code ec) {
2✔
1469
            handle_accept(ec);
2✔
1470
        };
2✔
1471
        network::Endpoint endpoint;
2✔
1472
        m_acceptor.async_accept(m_socket, endpoint, handler);
2✔
1473
        m_service.run();
2✔
1474
    }
2✔
1475

1476
private:
1477
    network::Service m_service;
1478
    network::Acceptor m_acceptor;
1479
    network::Socket m_socket;
1480
    network::ReadAheadBuffer m_read_ahead_buffer;
1481
    static const size_t s_max_header_size = 32;
1482
    char m_header_buffer[s_max_header_size];
1483
    size_t m_body_size;
1484
    std::unique_ptr<char[]> m_body_buffer;
1485
    unit_test::TestContext& m_test_context;
1486

1487
    void handle_accept(std::error_code ec)
1488
    {
2✔
1489
        if (ec)
2✔
1490
            throw std::system_error(ec);
×
1491
        m_socket.set_option(network::SocketBase::no_delay(true));
2✔
1492
        auto handler = [this](std::error_code handler_ec, size_t handler_n) {
2✔
1493
            handle_read_header(handler_ec, handler_n);
2✔
1494
        };
2✔
1495
        m_socket.async_read_until(m_header_buffer, s_max_header_size, '\n', m_read_ahead_buffer, handler);
2✔
1496
    }
2✔
1497

1498
    void handle_read_header(std::error_code ec, size_t n)
1499
    {
2✔
1500
        if (ec)
2✔
1501
            throw std::system_error(ec);
×
1502
        unit_test::TestContext& test_context = m_test_context;
2✔
1503
        if (!CHECK_GREATER(n, 0))
2✔
1504
            return;
1✔
1505
        if (!CHECK_LESS_EQUAL(n, s_max_header_size + 0))
2✔
1506
            return;
1✔
1507
        if (!CHECK_EQUAL(m_header_buffer[n - 1], '\n'))
2✔
1508
            return;
1✔
1509
        MemoryInputStream in;
2✔
1510
        in.set_buffer(m_header_buffer, m_header_buffer + (n - 1));
2✔
1511
        in.unsetf(std::ios_base::skipws);
2✔
1512
        std::string message_type;
2✔
1513
        in >> message_type;
2✔
1514
        if (!CHECK_EQUAL(message_type, "echo"))
2✔
1515
            return;
1✔
1516
        char sp;
2✔
1517
        in >> sp >> m_body_size;
2✔
1518
        if (!CHECK(in) || !CHECK(in.eof()) || !CHECK_EQUAL(sp, ' '))
2✔
1519
            return;
1✔
1520
        auto handler = [this](std::error_code handler_ec, size_t handler_n) {
2✔
1521
            handle_read_body(handler_ec, handler_n);
2✔
1522
        };
2✔
1523
        m_body_buffer.reset(new char[m_body_size]);
2✔
1524
        m_socket.async_read(m_body_buffer.get(), m_body_size, m_read_ahead_buffer, handler);
2✔
1525
    }
2✔
1526

1527
    void handle_read_body(std::error_code ec, size_t n)
1528
    {
2✔
1529
        if (ec)
2✔
1530
            throw std::system_error(ec);
×
1531
        unit_test::TestContext& test_context = m_test_context;
2✔
1532
        if (!CHECK_EQUAL(n, m_body_size))
2✔
1533
            return;
1✔
1534
        MemoryOutputStream out;
2✔
1535
        out.set_buffer(m_header_buffer, m_header_buffer + s_max_header_size);
2✔
1536
        out << "was " << m_body_size << '\n';
2✔
1537
        auto handler = [this](std::error_code handler_ec, size_t) {
2✔
1538
            handle_write_header(handler_ec);
2✔
1539
        };
2✔
1540
        m_socket.async_write(m_header_buffer, out.size(), handler);
2✔
1541
    }
2✔
1542

1543
    void handle_write_header(std::error_code ec)
1544
    {
2✔
1545
        if (ec)
2✔
1546
            throw std::system_error(ec);
×
1547
        auto handler = [this](std::error_code handler_ec, size_t) {
2✔
1548
            handle_write_body(handler_ec);
2✔
1549
        };
2✔
1550
        m_socket.async_write(m_body_buffer.get(), m_body_size, handler);
2✔
1551
    }
2✔
1552

1553
    void handle_write_body(std::error_code ec)
1554
    {
2✔
1555
        if (ec)
2✔
1556
            throw std::system_error(ec);
×
1557
        auto handler = [this](std::error_code handler_ec, size_t) {
2✔
1558
            handle_read_header_2(handler_ec);
2✔
1559
        };
2✔
1560
        m_socket.async_read_until(m_header_buffer, s_max_header_size, '\n', m_read_ahead_buffer, handler);
2✔
1561
    }
2✔
1562

1563
    void handle_read_header_2(std::error_code ec)
1564
    {
2✔
1565
        if (ec && ec != MiscExtErrors::end_of_input)
2✔
1566
            throw std::system_error(ec);
×
1567
        unit_test::TestContext& test_context = m_test_context;
2✔
1568
        CHECK(ec == MiscExtErrors::end_of_input);
2✔
1569
    }
2✔
1570
};
1571

1572

1573
class async_client {
1574
public:
1575
    async_client(unsigned short listen_port, unit_test::TestContext& test_context)
1576
        : m_listen_port(listen_port)
1577
        , m_socket(m_service)
1578
        , m_test_context(test_context)
1579
    {
2✔
1580
    }
2✔
1581

1582
    void run()
1583
    {
2✔
1584
        std::string service;
2✔
1585
        {
2✔
1586
            std::ostringstream out;
2✔
1587
            out << m_listen_port;
2✔
1588
            service = out.str();
2✔
1589
        }
2✔
1590
        connect_socket(m_socket, service);
2✔
1591
        m_socket.set_option(network::SocketBase::no_delay(true));
2✔
1592

1✔
1593
        MemoryOutputStream out;
2✔
1594
        out.set_buffer(m_header_buffer, m_header_buffer + s_max_header_size);
2✔
1595
        out << "echo " << sizeof echo_body << '\n';
2✔
1596
        auto handler = [this](std::error_code ec, size_t) {
2✔
1597
            handle_write_header(ec);
2✔
1598
        };
2✔
1599
        m_socket.async_write(m_header_buffer, out.size(), handler);
2✔
1600

1✔
1601
        m_service.run();
2✔
1602

1✔
1603
        m_socket.close();
2✔
1604
    }
2✔
1605

1606
private:
1607
    unsigned short m_listen_port;
1608
    network::Service m_service;
1609
    network::Socket m_socket;
1610
    network::ReadAheadBuffer m_read_ahead_buffer;
1611
    static const size_t s_max_header_size = 32;
1612
    char m_header_buffer[s_max_header_size];
1613
    size_t m_body_size;
1614
    std::unique_ptr<char[]> m_body_buffer;
1615
    unit_test::TestContext& m_test_context;
1616

1617
    void handle_write_header(std::error_code ec)
1618
    {
2✔
1619
        if (ec)
2✔
1620
            throw std::system_error(ec);
×
1621
        auto handler = [this](std::error_code handler_ec, size_t) {
2✔
1622
            handle_write_body(handler_ec);
2✔
1623
        };
2✔
1624
        m_socket.async_write(echo_body, sizeof echo_body, handler);
2✔
1625
    }
2✔
1626

1627
    void handle_write_body(std::error_code ec)
1628
    {
2✔
1629
        if (ec)
2✔
1630
            throw std::system_error(ec);
×
1631
        auto handler = [this](std::error_code handler_ec, size_t handler_n) {
2✔
1632
            handle_read_header(handler_ec, handler_n);
2✔
1633
        };
2✔
1634
        m_socket.async_read_until(m_header_buffer, s_max_header_size, '\n', m_read_ahead_buffer, handler);
2✔
1635
    }
2✔
1636

1637
    void handle_read_header(std::error_code ec, size_t n)
1638
    {
2✔
1639
        if (ec)
2✔
1640
            throw std::system_error(ec);
×
1641
        unit_test::TestContext& test_context = m_test_context;
2✔
1642
        if (!CHECK_GREATER(n, 0))
2✔
1643
            return;
1✔
1644
        if (!CHECK_LESS_EQUAL(n, s_max_header_size + 0))
2✔
1645
            return;
1✔
1646
        if (!CHECK_EQUAL(m_header_buffer[n - 1], '\n'))
2✔
1647
            return;
1✔
1648
        MemoryInputStream in;
2✔
1649
        in.set_buffer(m_header_buffer, m_header_buffer + (n - 1));
2✔
1650
        in.unsetf(std::ios_base::skipws);
2✔
1651
        std::string message_type;
2✔
1652
        in >> message_type;
2✔
1653
        if (!CHECK_EQUAL(message_type, "was"))
2✔
1654
            return;
1✔
1655
        char sp;
2✔
1656
        in >> sp >> m_body_size;
2✔
1657
        if (!CHECK(in) || !CHECK(in.eof()) || !CHECK_EQUAL(sp, ' '))
2✔
1658
            return;
1✔
1659
        auto handler = [this](std::error_code handler_ec, size_t handler_n) {
2✔
1660
            handle_read_body(handler_ec, handler_n);
2✔
1661
        };
2✔
1662
        m_body_buffer.reset(new char[m_body_size]);
2✔
1663
        m_socket.async_read(m_body_buffer.get(), m_body_size, m_read_ahead_buffer, handler);
2✔
1664
    }
2✔
1665

1666
    void handle_read_body(std::error_code ec, size_t n)
1667
    {
2✔
1668
        if (ec)
2✔
1669
            throw std::system_error(ec);
×
1670
        unit_test::TestContext& test_context = m_test_context;
2✔
1671
        if (!CHECK_EQUAL(n, m_body_size))
2✔
1672
            return;
1✔
1673
        if (!CHECK_EQUAL(m_body_size, sizeof echo_body))
2✔
1674
            return;
1✔
1675
        CHECK(std::equal(echo_body, echo_body + sizeof echo_body, m_body_buffer.get()));
2✔
1676
    }
2✔
1677
};
1678

1679
} // anonymous namespace
1680

1681

1682
TEST(Network_Async)
1683
{
2✔
1684
    async_server server(test_context);
2✔
1685
    unsigned short listen_port = server.init();
2✔
1686
    async_client client(listen_port, test_context);
2✔
1687

1✔
1688
    ThreadWrapper server_thread, client_thread;
2✔
1689
    server_thread.start([&] {
2✔
1690
        server.run();
2✔
1691
    });
2✔
1692
    client_thread.start([&] {
2✔
1693
        client.run();
2✔
1694
    });
2✔
1695
    CHECK_NOT(client_thread.join());
2✔
1696
    CHECK_NOT(server_thread.join());
2✔
1697
}
2✔
1698

1699

1700
TEST(Network_HeavyAsyncPost)
1701
{
2✔
1702
    network::Service service;
2✔
1703
    network::DeadlineTimer dummy_timer{service};
2✔
1704
    dummy_timer.async_wait(std::chrono::hours(10000), [](Status) {});
2✔
1705

1✔
1706
    ThreadWrapper looper_thread;
2✔
1707
    looper_thread.start([&] {
2✔
1708
        service.run();
2✔
1709
    });
2✔
1710

1✔
1711
    std::vector<std::pair<int, long>> entries;
2✔
1712
    const long num_iterations = 10000L;
2✔
1713
    auto func = [&](int thread_index) {
16✔
1714
        for (long i = 0; i < num_iterations; ++i)
159,886✔
1715
            service.post([&entries, thread_index, i](Status) {
160,000✔
1716
                entries.emplace_back(thread_index, i);
160,000✔
1717
            });
160,000✔
1718
    };
16✔
1719

1✔
1720
    const int num_threads = 8;
2✔
1721
    std::unique_ptr<ThreadWrapper[]> threads(new ThreadWrapper[num_threads]);
2✔
1722
    for (int i = 0; i < num_threads; ++i)
18✔
1723
        threads[i].start([&func, i] {
16✔
1724
            func(i);
16✔
1725
        });
16✔
1726
    for (int i = 0; i < num_threads; ++i)
18✔
1727
        CHECK_NOT(threads[i].join());
16✔
1728

1✔
1729
    service.post([&](Status) {
2✔
1730
        dummy_timer.cancel();
2✔
1731
    });
2✔
1732
    CHECK_NOT(looper_thread.join());
2✔
1733

1✔
1734
    // Check that every post operation ran exactly once
1✔
1735
    using longlong = long long;
2✔
1736
    if (CHECK_EQUAL(num_threads * longlong(num_iterations), entries.size())) {
2✔
1737
        bool every_post_operation_ran_exactly_once = true;
2✔
1738
        std::sort(entries.begin(), entries.end());
2✔
1739
        auto i = entries.begin();
2✔
1740
        for (int i_1 = 0; i_1 < num_threads; ++i_1) {
18✔
1741
            for (long i_2 = 0; i_2 < num_iterations; ++i_2) {
160,016✔
1742
                int thread_index = i->first;
160,000✔
1743
                long iteration_index = i->second;
160,000✔
1744
                if (i_1 != thread_index || i_2 != iteration_index) {
160,000✔
1745
                    every_post_operation_ran_exactly_once = false;
×
1746
                    break;
×
1747
                }
×
1748
                ++i;
160,000✔
1749
            }
160,000✔
1750
        }
16✔
1751
        CHECK(every_post_operation_ran_exactly_once);
2✔
1752
    }
2✔
1753
}
2✔
1754

1755

1756
TEST(Network_RepeatedCancelAndRestartRead)
1757
{
2✔
1758
    Random random{random_int<unsigned long>()}; // Seed from slow global generator
2✔
1759
    for (int i = 0; i < 1; ++i) {
4✔
1760
        network::Service service_1, service_2;
2✔
1761
        network::Socket socket_1{service_1}, socket_2{service_2};
2✔
1762
        connect_sockets(socket_1, socket_2);
2✔
1763
        network::ReadAheadBuffer rab;
2✔
1764

1✔
1765
        const size_t read_buffer_size = 1024;
2✔
1766
        char read_buffer[read_buffer_size];
2✔
1767
        size_t num_bytes_read = 0;
2✔
1768
        bool end_of_input_seen = false;
2✔
1769
        realm::util::UniqueFunction<void()> initiate_read = [&] {
136,624✔
1770
            auto handler = [&](std::error_code ec, size_t n) {
136,624✔
1771
                num_bytes_read += n;
136,624✔
1772
                if (ec == MiscExtErrors::end_of_input) {
136,624✔
1773
                    end_of_input_seen = true;
2✔
1774
                    return;
2✔
1775
                }
2✔
1776
                CHECK(!ec || ec == error::operation_aborted);
136,622✔
1777
                initiate_read();
136,622✔
1778
            };
136,622✔
1779
            socket_2.async_read(read_buffer, read_buffer_size, rab, handler);
136,624✔
1780
        };
136,624✔
1781
        initiate_read();
2✔
1782

1✔
1783
        auto thread_func = [&] {
2✔
1784
            try {
2✔
1785
                service_2.run();
2✔
1786
            }
2✔
1787
            catch (...) {
1✔
1788
                socket_2.close();
×
1789
                throw;
×
1790
            }
×
1791
        };
2✔
1792
        ThreadWrapper thread;
2✔
1793
        thread.start(thread_func);
2✔
1794

1✔
1795
        const size_t write_buffer_size = 1024;
2✔
1796
        const char write_buffer[write_buffer_size] = {'\0'};
2✔
1797
        size_t num_bytes_to_write = 0x4000000; // 64 MiB
2✔
1798
        size_t num_bytes_written = 0;
2✔
1799
        while (num_bytes_written < num_bytes_to_write) {
262,206✔
1800
            size_t n =
262,204✔
1801
                std::min(random.draw_int<size_t>(1, write_buffer_size), num_bytes_to_write - num_bytes_written);
262,204✔
1802
            socket_1.write(write_buffer, n);
262,204✔
1803
            num_bytes_written += n;
262,204✔
1804
            service_2.post([&](Status) {
262,204✔
1805
                socket_2.cancel();
262,204✔
1806
            });
262,204✔
1807
        }
262,204✔
1808
        socket_1.close();
2✔
1809

1✔
1810
        CHECK_NOT(thread.join());
2✔
1811
        CHECK_EQUAL(num_bytes_written, num_bytes_read);
2✔
1812
    }
2✔
1813
}
2✔
1814

1815

1816
TEST(Network_StressTest)
1817
{
2✔
1818
    network::Service service_1, service_2;
2✔
1819
    network::Socket socket_1{service_1}, socket_2{service_2};
2✔
1820
    connect_sockets(socket_1, socket_2);
2✔
1821
    constexpr size_t original_size = 0x100000; // 1MiB
2✔
1822
    std::unique_ptr<char[]> original_1, original_2;
2✔
1823
    original_1.reset(new char[original_size]);
2✔
1824
    original_2.reset(new char[original_size]);
2✔
1825
    {
2✔
1826
        std::mt19937_64 prng{std::random_device()()};
2✔
1827
        std::uniform_int_distribution<int> dist(std::numeric_limits<char>::min(), std::numeric_limits<char>::max());
2✔
1828
        log("Initializing...");
2✔
1829
        for (size_t i = 0; i < original_size; ++i)
2,097,154✔
1830
            original_1[i] = dist(prng);
2,097,152✔
1831
        for (size_t i = 0; i < original_size; ++i)
2,097,154✔
1832
            original_2[i] = dist(prng);
2,097,152✔
1833
        log("Initialized");
2✔
1834
    }
2✔
1835

1✔
1836
    struct Stats {
2✔
1837
        std::uint_fast64_t num_cancellations = 0;
2✔
1838
        std::uint_fast64_t num_reads = 0, num_canceled_reads = 0;
2✔
1839
        std::uint_fast64_t num_writes = 0, num_canceled_writes = 0;
2✔
1840
    };
2✔
1841

1✔
1842
#ifdef _WIN32 // slow
1843
    constexpr int num_cycles = 16;
1844
#else
1845
    constexpr int num_cycles = 512;
2✔
1846
#endif
2✔
1847
    auto thread = [&](int id, network::Socket& socket, const char* read_original, const char* write_original,
2✔
1848
                      Stats& stats) {
4✔
1849
        std::unique_ptr<char[]> read_buffer{new char[original_size]};
4✔
1850
        std::mt19937_64 prng{std::random_device()()};
4✔
1851
        std::uniform_int_distribution<size_t> read_write_size_dist(1, 32 * 1024);
4✔
1852
        std::uniform_int_distribution<int> delayed_read_write_dist(0, 49);
4✔
1853
        network::Service& service = socket.get_service();
4✔
1854
        network::DeadlineTimer cancellation_timer{service};
4✔
1855
        network::DeadlineTimer read_timer{service};
4✔
1856
        network::DeadlineTimer write_timer{service};
4✔
1857
        std::uint_fast64_t microseconds_per_cancellation = 10;
4✔
1858
        bool progress = false;
4✔
1859
        bool read_done = false, write_done = false;
4✔
1860
        realm::util::UniqueFunction<void()> shedule_cancellation = [&] {
233,284✔
1861
            if (progress) {
233,284✔
1862
                microseconds_per_cancellation /= 2;
130,707✔
1863
                progress = false;
130,707✔
1864
            }
130,707✔
1865
            else {
102,577✔
1866
                microseconds_per_cancellation *= 2;
102,577✔
1867
            }
102,577✔
1868
            if (microseconds_per_cancellation < 10)
233,284✔
1869
                microseconds_per_cancellation = 10;
28,008✔
1870
            cancellation_timer.async_wait(std::chrono::microseconds(microseconds_per_cancellation),
233,284✔
1871
                                          [&](Status status) {
233,299✔
1872
                                              REALM_ASSERT(status.is_ok() || status == ErrorCodes::OperationAborted);
232,932✔
1873
                                              if (status == ErrorCodes::OperationAborted)
232,932✔
1874
                                                  return;
4✔
1875
                                              if (read_done && write_done)
232,928✔
1876
                                                  return;
×
1877
                                              socket.cancel();
232,928✔
1878
                                              ++stats.num_cancellations;
232,928✔
1879
                                              shedule_cancellation();
232,928✔
1880
                                          });
232,928✔
1881
        };
233,284✔
1882
        shedule_cancellation();
4✔
1883
        char* read_begin = read_buffer.get();
4✔
1884
        char* read_end = read_buffer.get() + original_size;
4✔
1885
        int num_read_cycles = 0;
4✔
1886
        realm::util::UniqueFunction<void()> read = [&] {
347,969✔
1887
            if (read_begin == read_end) {
347,969✔
1888
                //                log("<R%1>", id);
1,024✔
1889
                CHECK(std::equal(read_original, read_original + original_size, read_buffer.get()));
2,048✔
1890
                ++num_read_cycles;
2,048✔
1891
                if (num_read_cycles == num_cycles) {
2,048✔
1892
                    log("End of read %1", id);
4✔
1893
                    read_done = true;
4✔
1894
                    if (write_done)
4✔
1895
                        cancellation_timer.cancel();
2✔
1896
                    return;
4✔
1897
                }
4✔
1898
                read_begin = read_buffer.get();
2,044✔
1899
                read_end = read_buffer.get() + original_size;
2,044✔
1900
            }
2,044✔
1901
            auto handler = [&](std::error_code ec, size_t n) {
347,967✔
1902
                REALM_ASSERT(!ec || ec == error::operation_aborted);
347,220✔
1903
                ++stats.num_reads;
347,220✔
1904
                if (ec == error::operation_aborted) {
347,220✔
1905
                    ++stats.num_canceled_reads;
211,619✔
1906
                }
211,619✔
1907
                else {
135,601✔
1908
                    read_begin += n;
135,601✔
1909
                    progress = true;
135,601✔
1910
                }
135,601✔
1911
                if (delayed_read_write_dist(prng) == 0) {
347,220✔
1912
                    read_timer.async_wait(std::chrono::microseconds(100), [&](Status status) {
7,045✔
1913
                        REALM_ASSERT(status.is_ok());
7,045✔
1914
                        read();
7,045✔
1915
                    });
7,045✔
1916
                }
7,045✔
1917
                else {
340,175✔
1918
                    read();
340,175✔
1919
                }
340,175✔
1920
            };
347,220✔
1921
            char* buffer = read_begin;
347,965✔
1922
            size_t size = read_write_size_dist(prng);
347,965✔
1923
            size_t max_size = read_end - read_begin;
347,965✔
1924
            if (size > max_size)
347,965✔
1925
                size = max_size;
5,436✔
1926
            socket.async_read_some(buffer, size, std::move(handler));
347,965✔
1927
        };
347,965✔
1928
        read();
4✔
1929
        const char* write_begin = write_original;
4✔
1930
        const char* write_end = write_original + original_size;
4✔
1931
        int num_write_cycles = 0;
4✔
1932
        realm::util::UniqueFunction<void()> write = [&] {
344,109✔
1933
            if (write_begin == write_end) {
344,109✔
1934
                //                log("<W%1>", id);
1,024✔
1935
                ++num_write_cycles;
2,048✔
1936
                if (num_write_cycles == num_cycles) {
2,048✔
1937
                    log("End of write %1", id);
4✔
1938
                    write_done = true;
4✔
1939
                    if (read_done)
4✔
1940
                        cancellation_timer.cancel();
2✔
1941
                    socket.shutdown(network::Socket::shutdown_send);
4✔
1942
                    log("Properly shut down %1", id);
4✔
1943
                    return;
4✔
1944
                }
4✔
1945
                write_begin = write_original;
2,044✔
1946
                write_end = write_original + original_size;
2,044✔
1947
            }
2,044✔
1948
            auto handler = [&](std::error_code ec, size_t n) {
344,107✔
1949
                REALM_ASSERT(!ec || ec == error::operation_aborted);
343,406✔
1950
                ++stats.num_writes;
343,406✔
1951
                if (ec == error::operation_aborted) {
343,406✔
1952
                    ++stats.num_canceled_writes;
210,252✔
1953
                }
210,252✔
1954
                else {
133,154✔
1955
                    write_begin += n;
133,154✔
1956
                    progress = true;
133,154✔
1957
                }
133,154✔
1958
                if (delayed_read_write_dist(prng) == 0) {
343,406✔
1959
                    write_timer.async_wait(std::chrono::microseconds(100), [&](Status status) {
6,900✔
1960
                        REALM_ASSERT(status.is_ok());
6,899✔
1961
                        write();
6,899✔
1962
                    });
6,899✔
1963
                }
6,900✔
1964
                else {
336,506✔
1965
                    write();
336,506✔
1966
                }
336,506✔
1967
            };
343,406✔
1968
            const char* data = write_begin;
344,105✔
1969
            size_t size = read_write_size_dist(prng);
344,105✔
1970
            size_t max_size = write_end - write_begin;
344,105✔
1971
            if (size > max_size)
344,105✔
1972
                size = max_size;
5,258✔
1973
            socket.async_write_some(data, size, std::move(handler));
344,105✔
1974
        };
344,105✔
1975
        write();
4✔
1976
        service.run();
4✔
1977
    };
4✔
1978

1✔
1979
    Stats stats_1, stats_2;
2✔
1980
    std::thread thread_1{[&] {
2✔
1981
        thread(1, socket_1, original_1.get(), original_2.get(), stats_1);
2✔
1982
    }};
2✔
1983
    std::thread thread_2{[&] {
2✔
1984
        thread(2, socket_2, original_2.get(), original_1.get(), stats_2);
2✔
1985
    }};
2✔
1986
    thread_1.join();
2✔
1987
    thread_2.join();
2✔
1988

1✔
1989
    char ch;
2✔
1990
    CHECK_SYSTEM_ERROR(socket_1.read_some(&ch, 1), MiscExtErrors::end_of_input);
2✔
1991
    CHECK_SYSTEM_ERROR(socket_2.read_some(&ch, 1), MiscExtErrors::end_of_input);
2✔
1992

1✔
1993
    log("Cancellations: %1, %2", stats_1.num_cancellations, stats_2.num_cancellations);
2✔
1994
    log("Reads:  %1 (%2 canceled), %3 (%4 canceled)", stats_1.num_reads, stats_1.num_canceled_reads,
2✔
1995
        stats_2.num_reads, stats_2.num_canceled_reads);
2✔
1996
    log("Writes: %1 (%2 canceled), %3 (%4 canceled)", stats_1.num_writes, stats_1.num_canceled_writes,
2✔
1997
        stats_2.num_writes, stats_2.num_canceled_writes);
2✔
1998
}
2✔
1999

2000

2001
TEST(Sync_Trigger_Basics)
2002
{
2✔
2003
    network::Service service;
2✔
2004

1✔
2005
    // Check that triggering works
1✔
2006
    bool was_triggered = false;
2✔
2007
    auto func = [&](realm::Status) {
4✔
2008
        was_triggered = true;
4✔
2009
    };
4✔
2010
    Trigger<network::Service> trigger(&service, std::move(func));
2✔
2011
    trigger.trigger();
2✔
2012
    service.run();
2✔
2013
    CHECK(was_triggered);
2✔
2014

1✔
2015
    // Check that the function is not called without triggering
1✔
2016
    was_triggered = false;
2✔
2017
    service.run();
2✔
2018
    CHECK_NOT(was_triggered);
2✔
2019

1✔
2020
    // Check double-triggering
1✔
2021
    was_triggered = false;
2✔
2022
    trigger.trigger();
2✔
2023
    trigger.trigger();
2✔
2024
    service.run();
2✔
2025
    CHECK(was_triggered);
2✔
2026

1✔
2027
    // Check that retriggering from triggered function works
1✔
2028
    realm::util::UniqueFunction<void()> func_2;
2✔
2029
    Trigger<network::Service> trigger_2(&service, [&](realm::Status) {
4✔
2030
        func_2();
4✔
2031
    });
4✔
2032
    was_triggered = false;
2✔
2033
    bool was_triggered_twice = false;
2✔
2034
    func_2 = [&] {
4✔
2035
        if (was_triggered) {
4✔
2036
            was_triggered_twice = true;
2✔
2037
        }
2✔
2038
        else {
2✔
2039
            was_triggered = true;
2✔
2040
            trigger_2.trigger();
2✔
2041
        }
2✔
2042
    };
4✔
2043
    trigger_2.trigger();
2✔
2044
    service.run();
2✔
2045
    CHECK(was_triggered_twice);
2✔
2046

1✔
2047
    // Check that the function is not called after destruction of the Trigger
1✔
2048
    // object
1✔
2049
    was_triggered = false;
2✔
2050
    {
2✔
2051
        auto func_3 = [&](realm::Status) {
1✔
2052
            was_triggered = true;
×
2053
        };
×
2054
        Trigger<network::Service> trigger_3(&service, std::move(func_3));
2✔
2055
        trigger_3.trigger();
2✔
2056
    }
2✔
2057
    service.run();
2✔
2058
    CHECK_NOT(was_triggered);
2✔
2059

1✔
2060
    // Check that two functions can be triggered in an overlapping fashion
1✔
2061
    bool was_triggered_4 = false;
2✔
2062
    bool was_triggered_5 = false;
2✔
2063
    auto func_4 = [&](realm::Status) {
2✔
2064
        was_triggered_4 = true;
2✔
2065
    };
2✔
2066
    auto func_5 = [&](realm::Status) {
2✔
2067
        was_triggered_5 = true;
2✔
2068
    };
2✔
2069
    Trigger<network::Service> trigger_4(&service, std::move(func_4));
2✔
2070
    Trigger<network::Service> trigger_5(&service, std::move(func_5));
2✔
2071
    trigger_4.trigger();
2✔
2072
    trigger_5.trigger();
2✔
2073
    service.run();
2✔
2074
    CHECK(was_triggered_4);
2✔
2075
    CHECK(was_triggered_5);
2✔
2076
}
2✔
2077

2078

2079
TEST(Sync_Trigger_ThreadSafety)
2080
{
2✔
2081
    network::Service service;
2✔
2082
    network::DeadlineTimer keep_alive{service};
2✔
2083
    keep_alive.async_wait(std::chrono::hours(10000), [](Status) {});
2✔
2084
    long n_1 = 0, n_2 = 0;
2✔
2085
    std::atomic<bool> flag{false};
2✔
2086
    auto func = [&](realm::Status) {
19,389✔
2087
        ++n_1;
19,389✔
2088
        if (flag)
19,389✔
2089
            ++n_2;
2✔
2090
    };
19,389✔
2091
    Trigger<network::Service> trigger(&service, std::move(func));
2✔
2092
    ThreadWrapper thread;
2✔
2093
    thread.start([&] {
2✔
2094
        service.run();
2✔
2095
    });
2✔
2096
    long m = 1000000;
2✔
2097
    for (long i = 0; i < m; ++i)
2,000,002✔
2098
        trigger.trigger();
2,000,000✔
2099
    flag = true;
2✔
2100
    trigger.trigger();
2✔
2101
    service.post([&](Status) {
2✔
2102
        keep_alive.cancel();
2✔
2103
    });
2✔
2104
    CHECK_NOT(thread.join());
2✔
2105
    CHECK_GREATER_EQUAL(n_1, 1);
2✔
2106
    CHECK_LESS_EQUAL(n_1, m + 1);
2✔
2107
    CHECK_GREATER_EQUAL(n_2, 1);
2✔
2108
    CHECK_LESS_EQUAL(n_2, 2);
2✔
2109
}
2✔
2110

2111

2112
TEST(Network_AsyncResolve_Basics)
2113
{
2✔
2114
    network::Service service;
2✔
2115
    network::Resolver resolver{service};
2✔
2116
    network::Resolver::Query query("localhost", "");
2✔
2117
    bool was_called = false;
2✔
2118
    auto handler = [&](std::error_code ec, network::Endpoint::List endpoints) {
2✔
2119
        CHECK_NOT(ec);
2✔
2120
        CHECK_GREATER(endpoints.size(), 0);
2✔
2121
        was_called = true;
2✔
2122
    };
2✔
2123
    resolver.async_resolve(query, std::move(handler));
2✔
2124
    service.run();
2✔
2125
    CHECK(was_called);
2✔
2126
}
2✔
2127

2128

2129
TEST(Network_AsyncResolve_Cancellation)
2130
{
2✔
2131
    network::Service service;
2✔
2132
    network::Resolver resolver{service};
2✔
2133
    network::Resolver::Query query("localhost", "");
2✔
2134
    bool was_called = false;
2✔
2135
    auto handler = [&](std::error_code ec, network::Endpoint::List) {
2✔
2136
        CHECK_EQUAL(error::operation_aborted, ec);
2✔
2137
        was_called = true;
2✔
2138
    };
2✔
2139
    resolver.async_resolve(query, std::move(handler));
2✔
2140
    resolver.cancel();
2✔
2141
    service.run();
2✔
2142
    CHECK(was_called);
2✔
2143
}
2✔
2144

2145
} // unnamed namespace
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc