• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / daniel.tabacaru_937

27 Sep 2024 06:53AM UTC coverage: 91.124% (+0.02%) from 91.109%
daniel.tabacaru_937

Pull #7983

Evergreen

danieltabacaru
Small refactoring
Pull Request #7983: RCORE-2126 Clear incomplete bootstraps when the connection is established

102826 of 181492 branches covered (56.66%)

49 of 50 new or added lines in 4 files covered. (98.0%)

67 existing lines in 16 files now uncovered.

217244 of 238404 relevant lines covered (91.12%)

5968864.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.14
/test/test_thread.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2016 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "testsettings.hpp"
20
#ifdef TEST_THREAD
21

22
#include <condition_variable>
23
#include <cstring>
24
#include <algorithm>
25
#include <queue>
26
#include <functional>
27
#include <mutex>
28
#include <thread>
29
#include <atomic>
30

31
#ifndef _WIN32
32
#include <unistd.h>
33
#include <sys/time.h>
34
#include <realm/utilities.hpp> // gettimeofday()
35
#endif
36

37
#include <realm/db_options.hpp>
38
#include <realm/utilities.hpp>
39
#include <realm/util/features.h>
40
#include <realm/util/thread.hpp>
41
#include <realm/util/interprocess_condvar.hpp>
42
#include <realm/util/interprocess_mutex.hpp>
43

44
#include <iostream>
45
#include "test.hpp"
46

47
using namespace realm;
48
using namespace realm::util;
49

50

51
// Test independence and thread-safety
52
// -----------------------------------
53
//
54
// All tests must be thread safe and independent of each other. This
55
// is required because it allows for both shuffling of the execution
56
// order and for parallelized testing.
57
//
58
// In particular, avoid using std::rand() since it is not guaranteed
59
// to be thread safe. Instead use the API offered in
60
// `test/util/random.hpp`.
61
//
62
// All files created in tests must use the TEST_PATH macro (or one of
63
// its friends) to obtain a suitable file system path. See
64
// `test/util/test_path.hpp`.
65
//
66
//
67
// Debugging and the ONLY() macro
68
// ------------------------------
69
//
70
// A simple way of disabling all tests except one called `Foo`, is to
71
// replace TEST(Foo) with ONLY(Foo) and then recompile and rerun the
72
// test suite. Note that you can also use filtering by setting the
73
// environment variable `UNITTEST_FILTER`. See `README.md` for more on
74
// this.
75
//
76
// Another way to debug a particular test, is to copy that test into
77
// `experiments/testcase.cpp` and then run `sh build.sh
78
// check-testcase` (or one of its friends) from the command line.
79

80

81
namespace {
82

83
struct Shared {
84
    Mutex m_mutex;
85
    int m_value;
86

87
    // 10000 takes less than 0.1 sec
88
    void increment_10000_times()
89
    {
20✔
90
        for (int i = 0; i < 10000; ++i) {
199,828✔
91
            LockGuard lock(m_mutex);
199,808✔
92
            ++m_value;
199,808✔
93
        }
199,808✔
94
    }
20✔
95

96
    void increment_10000_times2()
97
    {
20✔
98
        for (int i = 0; i < 10000; ++i) {
199,240✔
99
            LockGuard lock(m_mutex);
199,220✔
100
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
101
            // could assemble into 'inc [addr]' which has very tiny gap
102
            double f = m_value;
199,220✔
103
            f += 1.;
199,220✔
104
            m_value = int(f);
199,220✔
105
        }
199,220✔
106
    }
20✔
107
};
108

109
struct SharedWithEmulated {
110
    InterprocessMutex m_mutex;
111
    InterprocessMutex::SharedPart m_shared_part;
112
    int m_value;
113

114
    SharedWithEmulated(std::string name)
115
    {
2✔
116
        m_mutex.set_shared_part(m_shared_part, name, "0");
2✔
117
    }
2✔
118
    ~SharedWithEmulated()
119
    {
2✔
120
        m_mutex.release_shared_part();
2✔
121
    }
2✔
122

123
    // 10000 takes less than 0.1 sec
124
    void increment_10000_times()
125
    {
20✔
126
        for (int i = 0; i < 10000; ++i) {
199,065✔
127
            std::lock_guard<InterprocessMutex> lock(m_mutex);
199,045✔
128
            ++m_value;
199,045✔
129
        }
199,045✔
130
    }
20✔
131

132
    void increment_10000_times2()
133
    {
×
134
        for (int i = 0; i < 10000; ++i) {
×
135
            std::lock_guard<InterprocessMutex> lock(m_mutex);
×
136
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
×
137
            // could assemble into 'inc [addr]' which has very tiny gap
×
138
            double f = m_value;
×
139
            f += 1.;
×
140
            m_value = int(f);
×
141
        }
×
142
    }
×
143
};
144

145
struct Robust {
146
    RobustMutex m_mutex;
147
    bool m_recover_called;
148

149
    void simulate_death()
150
    {
×
151
        m_mutex.lock(std::bind(&Robust::recover, this));
×
152
        // Do not unlock
×
153
    }
×
154

155
    void simulate_death_during_recovery()
156
    {
×
157
        bool no_thread_has_died = m_mutex.low_level_lock();
×
158
        if (!no_thread_has_died)
×
159
            m_recover_called = true;
×
160
        // Do not unlock
×
161
    }
×
162

163
    void recover()
164
    {
×
165
        m_recover_called = true;
×
166
    }
×
167

168
    void recover_throw()
169
    {
×
170
        m_recover_called = true;
×
171
        throw RobustMutex::NotRecoverable();
×
172
    }
×
173
};
174

175

176
class QueueMonitor {
177
public:
178
    QueueMonitor()
179
        : m_closed(false)
1✔
180
    {
2✔
181
    }
2✔
182

183
    bool get(int& value)
184
    {
63,920✔
185
        LockGuard lock(m_mutex);
63,920✔
186
        for (;;) {
93,459✔
187
            if (!m_queue.empty())
93,459✔
188
                break;
64,000✔
189
            if (m_closed)
29,459✔
190
                return false;
64✔
191
            m_nonempty_or_closed.wait(lock); // Wait for producer
29,395✔
192
        }
29,395✔
193
        bool was_full = m_queue.size() == max_queue_size;
63,856✔
194
        value = m_queue.front();
63,856✔
195
        m_queue.pop();
63,856✔
196
        if (was_full)
63,856✔
197
            m_nonfull.notify_all(); // Resume a waiting producer
10,624✔
198
        return true;
63,856✔
199
    }
63,920✔
200

201
    void put(int value)
202
    {
63,877✔
203
        LockGuard lock(m_mutex);
63,877✔
204
        while (m_queue.size() == max_queue_size)
91,185✔
205
            m_nonfull.wait(lock); // Wait for consumer
27,308✔
206
        bool was_empty = m_queue.empty();
63,877✔
207
        m_queue.push(value);
63,877✔
208
        if (was_empty)
63,877✔
209
            m_nonempty_or_closed.notify_all(); // Resume a waiting consumer
11,210✔
210
    }
63,877✔
211

212
    void close()
213
    {
2✔
214
        LockGuard lock(m_mutex);
2✔
215
        m_closed = true;
2✔
216
        m_nonempty_or_closed.notify_all(); // Resume all waiting consumers
2✔
217
    }
2✔
218

219
private:
220
    Mutex m_mutex;
221
    CondVar m_nonempty_or_closed, m_nonfull;
222
    std::queue<int> m_queue;
223
    bool m_closed;
224

225
    static const unsigned max_queue_size = 8;
226
};
227

228
// Thread body: puts `value` into `queue` 1000 times.
void producer_thread(QueueMonitor* queue, int value)
{
    int remaining = 1000;
    while (remaining-- > 0)
        queue->put(value);
}
64✔
234

235
// Thread body: takes values from `queue` until get() reports that it is
// closed, counting each value in consumed_counts[value].
void consumer_thread(QueueMonitor* queue, int* consumed_counts)
{
    int item = 0;
    while (queue->get(item))
        ++consumed_counts[item];
}
64✔
245

246

247
} // anonymous namespace
248

249

250
// Takes and drops a LockGuard on the same Mutex twice in a row.
TEST(Thread_MutexLock)
{
    Mutex mutex;
    for (int round = 0; round < 2; ++round) {
        LockGuard lock(mutex);
    }
}
2✔
260

261
#ifdef REALM_HAVE_PTHREAD_PROCESS_SHARED
262
// Same as Thread_MutexLock, but for a Mutex constructed with
// Mutex::process_shared_tag.
TEST(Thread_ProcessSharedMutex)
{
    // The extra parentheses keep this from parsing as a function declaration.
    Mutex mutex((Mutex::process_shared_tag()));
    for (int round = 0; round < 2; ++round) {
        LockGuard lock(mutex);
    }
}
272
#endif
273

274
// Ten threads each add 10000 to the shared counter under the mutex; the total
// must be exactly 100000.
TEST(Thread_CriticalSection)
{
    Shared shared;
    shared.m_value = 0;
    std::thread workers[10];
    for (std::thread& worker : workers)
        worker = std::thread(&Shared::increment_10000_times, &shared);
    for (std::thread& worker : workers)
        worker.join();
    CHECK_EQUAL(100000, shared.m_value);
}
2✔
285

286

287
// Ten threads each add 10000 to a counter guarded by an InterprocessMutex;
// the total must be exactly 100000.
TEST(Thread_EmulatedMutex_CriticalSection)
{
    TEST_PATH(path);
    SharedWithEmulated shared(path);
    shared.m_value = 0;
    std::thread workers[10];
    for (std::thread& worker : workers)
        worker = std::thread(&SharedWithEmulated::increment_10000_times, &shared);
    for (std::thread& worker : workers)
        worker.join();
    CHECK_EQUAL(100000, shared.m_value);
}
2✔
299

300

301
// Like Thread_CriticalSection, but uses the increment variant that spreads
// the read-modify-write over several statements.
TEST(Thread_CriticalSection2)
{
    Shared shared;
    shared.m_value = 0;
    std::thread workers[10];
    for (std::thread& worker : workers)
        worker = std::thread(&Shared::increment_10000_times2, &shared);
    for (std::thread& worker : workers)
        worker.join();
    CHECK_EQUAL(100000, shared.m_value);
}
2✔
312

313

314
// Todo. Not supported on Windows in particular? Keywords: winbug
315
// Exercises RobustMutex::lock(): plain lock/unlock, recovery after a thread
// exits while holding the lock, and the permanently failed state after the
// recovery callback throws.
TEST_IF(Thread_RobustMutex, TEST_THREAD_ROBUSTNESS)
{
    // Abort if robust mutexes are not supported on the current
    // platform. Otherwise we would probably get into a dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    Robust robust;

    const auto on_recover = std::bind(&Robust::recover, &robust);
    const auto on_recover_throw = std::bind(&Robust::recover_throw, &robust);
    // Runs simulate_death() on a thread that exits with the mutex still held.
    const auto die_holding_lock = [&robust] {
        std::thread(&Robust::simulate_death, &robust).join();
    };

    // Check that lock/unlock cycle works and does not involve recovery
    for (int round = 0; round < 2; ++round) {
        robust.m_recover_called = false;
        robust.m_mutex.lock(on_recover);
        CHECK(!robust.m_recover_called);
        robust.m_mutex.unlock();
    }

    // Check recovery by simulating a death, then one more round of recovery
    for (int round = 0; round < 2; ++round) {
        robust.m_recover_called = false;
        die_holding_lock();
        CHECK(!robust.m_recover_called);
        robust.m_recover_called = false;
        robust.m_mutex.lock(on_recover);
        CHECK(robust.m_recover_called);
        robust.m_mutex.unlock();
    }

    // Simulate a case where recovery fails or is impossible
    robust.m_recover_called = false;
    die_holding_lock();
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(on_recover_throw), RobustMutex::NotRecoverable);
    CHECK(robust.m_recover_called);

    // Check that successive attempts at locking will throw
    for (int attempt = 0; attempt < 2; ++attempt) {
        robust.m_recover_called = false;
        CHECK_THROW(robust.m_mutex.lock(on_recover), RobustMutex::NotRecoverable);
        CHECK(!robust.m_recover_called);
    }
}
×
368

369

370
// This test checks that death during recovery causes a robust
// mutex to stay in the 'inconsistent' state.
TEST_IF(Thread_DeathDuringRecovery, TEST_THREAD_ROBUSTNESS)
{
    // Abort if robust mutexes are not supported on the current
    // platform. Otherwise we would probably get into a dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    Robust robust;

    const auto on_recover = std::bind(&Robust::recover, &robust);
    // Each helper runs the named Robust member on a thread that exits with
    // the mutex still held.
    const auto die_holding_lock = [&robust] {
        std::thread(&Robust::simulate_death, &robust).join();
    };
    const auto die_while_recovering = [&robust] {
        std::thread(&Robust::simulate_death_during_recovery, &robust).join();
    };

    // Bring the mutex into the 'inconsistent' state
    robust.m_recover_called = false;
    die_holding_lock();
    CHECK(!robust.m_recover_called);

    // Die while recovering
    robust.m_recover_called = false;
    die_while_recovering();
    CHECK(robust.m_recover_called);

    // The mutex is still in the 'inconsistent' state if another
    // attempt at locking it calls the recovery function
    robust.m_recover_called = false;
    robust.m_mutex.lock(on_recover);
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // Now that the mutex is fully recovered, we should be able to
    // carry out a regular round of lock/unlock
    robust.m_recover_called = false;
    robust.m_mutex.lock(on_recover);
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();

    // Try a double death during recovery
    robust.m_recover_called = false;
    die_holding_lock();
    CHECK(!robust.m_recover_called);
    for (int death = 0; death < 2; ++death) {
        robust.m_recover_called = false;
        die_while_recovering();
        CHECK(robust.m_recover_called);
    }
    robust.m_recover_called = false;
    robust.m_mutex.lock(on_recover);
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();
    robust.m_recover_called = false;
    robust.m_mutex.lock(on_recover);
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();
}
×
425

426

427
// Runs 32 producers and 32 consumers against one QueueMonitor and checks that
// every one of the 1000 values put by each producer was consumed exactly once.
TEST(Thread_CondVar)
{
    QueueMonitor queue;
    const int num_producers = 32;
    const int num_consumers = 32;
    std::thread producers[num_producers], consumers[num_consumers];
    // consumed_counts[c][p] is the number of values from producer p taken by
    // consumer c. Each consumer writes only to its own row. The empty
    // initializer zeroes the whole table.
    int consumed_counts[num_consumers][num_producers] = {};

    for (int i = 0; i < num_producers; ++i)
        producers[i] = std::thread(&producer_thread, &queue, i);
    for (int i = 0; i < num_consumers; ++i)
        consumers[i] = std::thread(&consumer_thread, &queue, &consumed_counts[i][0]);
    for (std::thread& producer : producers)
        producer.join();
    queue.close(); // Stop consumers when queue is empty
    for (std::thread& consumer : consumers)
        consumer.join();

    for (int i = 0; i < num_producers; ++i) {
        int n = 0;
        for (int j = 0; j < num_consumers; ++j)
            n += consumed_counts[j][i];
        CHECK_EQUAL(1000, n);
    }
}
2✔
453

454
// Checks std::unique_lock<Mutex>::try_lock() on the owning thread and against
// a second thread that contends for the same Mutex.
TEST(Thread_MutexTryLock)
{
    Mutex base_mutex;
    std::unique_lock<Mutex> owner_lock(base_mutex, std::defer_lock);

    // Handshake used by the worker to report that its first try_lock is done.
    std::condition_variable handshake_cv;
    std::mutex handshake_mutex;

    // basic same thread try_lock
    CHECK(owner_lock.try_lock());
    CHECK(owner_lock.owns_lock());
    // already locked: Resource deadlock avoided
    CHECK_THROW(static_cast<void>(owner_lock.try_lock()), std::system_error);
    owner_lock.unlock();

    bool worker_ready = false;
    auto worker_body = [&]() {
        std::unique_lock<Mutex> contender(base_mutex, std::defer_lock);
        CHECK(!contender.owns_lock());
        CHECK(!contender.try_lock());
        {
            std::lock_guard<std::mutex> guard(handshake_mutex);
            worker_ready = true;
        }
        handshake_cv.notify_one();
        // Poll until the main thread has released the mutex.
        for (;;) {
            if (contender.try_lock())
                break;
            millisleep(1);
        }
        CHECK(contender.owns_lock());
        contender.unlock();
    };

    // Check basic locking across threads.
    CHECK(!owner_lock.owns_lock());
    CHECK(owner_lock.try_lock());
    CHECK(owner_lock.owns_lock());
    std::thread worker(worker_body);
    {
        std::unique_lock<std::mutex> guard(handshake_mutex);
        while (!worker_ready)
            handshake_cv.wait(guard);
    }
    owner_lock.unlock();
    worker.join();
}
2✔
499

500
// Checks RobustMutex::try_lock() on the owning thread, against a contending
// thread, and after that thread has exited while still holding the lock.
TEST(Thread_RobustMutexTryLock)
{
    // Abort if robust mutexes are not supported on the current
    // platform. Otherwise we would probably get into a dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    RobustMutex m;
    int recover_calls = 0;

    auto count_recovery = [&]() {
        ++recover_calls;
    };
    // basic same thread try_lock
    CHECK(m.try_lock(count_recovery));
    CHECK(!m.try_lock(count_recovery));
    m.unlock();
    CHECK(recover_calls == 0);

    // Handshake used by the worker to report that its first try_lock is done.
    bool worker_ready = false;
    std::mutex control_mutex;
    std::condition_variable control_cv;

    auto worker_body = [&]() {
        CHECK(!m.try_lock(count_recovery));
        {
            std::lock_guard<std::mutex> guard(control_mutex);
            worker_ready = true;
        }
        control_cv.notify_one();
        for (;;) {
            if (m.try_lock(count_recovery))
                break;
            millisleep(1);
        }
        // exit the thread with the lock held to check robustness
    };

    // Check basic locking across threads.
    CHECK(m.try_lock(count_recovery));
    std::thread worker(worker_body);
    {
        std::unique_lock<std::mutex> lock(control_mutex);
        while (!worker_ready)
            control_cv.wait(lock);
    }
    m.unlock();
    worker.join();
    CHECK(recover_calls == 0);
    // at this point the thread that obtained the mutex is dead with the lock
    CHECK(m.try_lock(count_recovery));
    CHECK(recover_calls == 1);
    m.unlock();
}
×
553

554
#ifndef _WIN32 // FIXME: trylock is not supported by the win32-pthread lib on Windows. No need to fix this
555
               // because we are going to switch to native API soon and discard win32-pthread entirely
556
// Checks InterprocessMutex::try_lock() on the owning thread and against a
// second InterprocessMutex object attached to the same shared part.
NONCONCURRENT_TEST(Thread_InterprocessMutexTryLock)
{
    InterprocessMutex::SharedPart mutex_part;

    InterprocessMutex m;
    TEST_PATH(path);
    std::string mutex_file_name = "Test_Thread_InterprocessMutexTryLock";
    m.set_shared_part(mutex_part, path, mutex_file_name);

    // basic same thread try_lock
    CHECK(m.try_lock());
    CHECK(!m.try_lock()); // already locked but shouldn't deadlock
    m.unlock();

    // Handshake used by the worker to report that its first try_lock is done.
    bool worker_ready = false;
    std::condition_variable handshake_cv;
    std::mutex handshake_mutex;
    auto worker_body = [&]() {
        InterprocessMutex contender;
        contender.set_shared_part(mutex_part, path, mutex_file_name);

        CHECK(!contender.try_lock());
        {
            std::lock_guard<std::mutex> guard(handshake_mutex);
            worker_ready = true;
        }
        handshake_cv.notify_one();
        // Poll until the main thread has released the mutex.
        for (;;) {
            if (contender.try_lock())
                break;
            millisleep(1);
        }
        contender.unlock();
    };

    // Check basic locking across threads.
    CHECK(m.try_lock());
    std::thread worker(worker_body);
    {
        std::unique_lock<std::mutex> ul(handshake_mutex);
        while (!worker_ready)
            handshake_cv.wait(ul);
    }
    m.unlock();
    worker.join();
    m.release_shared_part();
}
2✔
602

603
#endif
604

605
// Detect and flag trivial implementations of condvars.
606
namespace {
607

608
void signaller(int* signals, InterprocessMutex* mutex, InterprocessCondVar* cv)
609
{
2✔
610
    millisleep(200);
2✔
611
    {
2✔
612
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
613
        *signals = 1;
2✔
614
        // wakeup any waiters
615
        cv->notify_all();
2✔
616
    }
2✔
617
    // exit scope to allow waiters to get lock
618
    millisleep(200);
2✔
619
    {
2✔
620
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
621
        *signals = 2;
2✔
622
        // wakeup any waiters, 2nd time
623
        cv->notify_all();
2✔
624
    }
2✔
625
    millisleep(200);
2✔
626
    {
2✔
627
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
628
        *signals = 3;
2✔
629
        // wakeup any waiters, 2nd time
630
        cv->notify_all();
2✔
631
    }
2✔
632
    millisleep(200);
2✔
633
    {
2✔
634
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
635
        *signals = 4;
2✔
636
    }
2✔
637
}
2✔
638

639
void wakeup_signaller(int* signal_state, InterprocessMutex* mutex, InterprocessCondVar* cv)
640
{
2✔
641
    millisleep(1000);
2✔
642
    *signal_state = 2;
2✔
643
    std::lock_guard<InterprocessMutex> l(*mutex);
2✔
644
    cv->notify_all();
2✔
645
}
2✔
646

647

648
void waiter(InterprocessMutex* mutex, InterprocessCondVar* cv, std::mutex* control_mutex,
649
            std::condition_variable* control_cv, size_t* num_threads_holding_lock)
650
{
20✔
651
    std::lock_guard<InterprocessMutex> l(*mutex);
20✔
652

653
    {
20✔
654
        std::lock_guard<std::mutex> guard(*control_mutex);
20✔
655
        *num_threads_holding_lock = (*num_threads_holding_lock) + 1;
20✔
656
    }
20✔
657
    control_cv->notify_one();
20✔
658

659
    cv->wait(*mutex, nullptr);
20✔
660
}
20✔
661
} // namespace
662

663
// Verify, that a wait on a condition variable actually waits
664
// - this test relies on assumptions about scheduling, which
665
//   may not hold on a heavily loaded system.
666
// Waits three times on the condition variable and checks, after each wake-up,
// that the signaller thread has advanced `signals` by exactly one step.
NONCONCURRENT_TEST(Thread_CondvarWaits)
{
    int signals = 0;
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarWaits_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarWaits_CondVar", default_options.temp_dir);
    InterprocessCondVar::init_shared_part(condvar_part);
    std::thread signal_thread(signaller, &signals, &mutex, &changed);
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        for (int expected = 1; expected <= 3; ++expected) {
            changed.wait(mutex, nullptr);
            CHECK_EQUAL(signals, expected);
        }
    }
    signal_thread.join();
    changed.release_shared_part();
    mutex.release_shared_part();
}
2✔
693

694
// Verify that a condition variable loses its signal if no one
695
// is waiting on it
696
// Sends ten notifications while nobody is waiting, then waits. The wait must
// block until wakeup_signaller() notifies, at which point signal_state is 2.
NONCONCURRENT_TEST(Thread_CondvarIsStateless)
{
    int signal_state = 0;
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;

    // Must have names because default_options.temp_dir is empty string on Windows
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarIsStateless_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarIsStateless_CondVar", default_options.temp_dir);
    signal_state = 1;
    // send some signals:
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        for (int i = 0; i < 10; ++i)
            changed.notify_all();
    }
    // spawn a thread which will later do one more signal in order
    // to wake us up.
    std::thread signal_thread(wakeup_signaller, &signal_state, &mutex, &changed);
    // Wait for a signal - the signals sent above should be lost, so
    // that this wait will actually wait for the thread to signal.
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        // nullptr (no time limit), spelled the same way as the other wait() calls in this file.
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signal_state, 2);
    }
    signal_thread.join();
    changed.release_shared_part();
    mutex.release_shared_part();
}
2✔
731

732

733
// this test hangs, if timeout doesn't work.
734
// Waits five times on a condition variable that nobody signals, each time
// with the same time limit set 100 ms after the start of the test.
NONCONCURRENT_TEST(Thread_CondvarTimeout)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarTimeout_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);

    // Time limit = current time + 100 msec, expressed as a timespec.
    timeval now;
    gettimeofday(&now, nullptr);
    struct timespec time_limit;
    time_limit.tv_sec = now.tv_sec;
    time_limit.tv_nsec = now.tv_usec * 1000 + 100000000; // 100 msec wait
    if (time_limit.tv_nsec >= 1000000000) {              // overflow
        // Carry the surplus nanoseconds into the seconds field.
        time_limit.tv_sec += 1;
        time_limit.tv_nsec -= 1000000000;
    }
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        int waits_left = 5;
        while (waits_left-- > 0)
            changed.wait(mutex, &time_limit);
    }
    changed.release_shared_part();
    mutex.release_shared_part();
}
2✔
763

764

765
// test that notify_all will wake up all waiting threads, if there
766
// are many waiters:
767
// Starts ten waiter threads, waits until each has reported in, then issues a
// single notify_all() and joins them. If any waiter is not woken, the join
// never returns.
NONCONCURRENT_TEST(Thread_CondvarNotifyAllWakeup)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarNotifyAllWakeup_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarNotifyAllWakeup_CondVar", default_options.temp_dir);

    size_t num_threads_holding_lock = 0;
    std::mutex control_mutex;
    std::condition_variable control_cv;

    const size_t num_waiters = 10;
    std::thread waiters[num_waiters];
    for (std::thread& t : waiters) {
        t = std::thread(waiter, &mutex, &changed, &control_mutex, &control_cv, &num_threads_holding_lock);
    }
    {
        // allow all waiters to start and obtain the InterprocessCondVar
        std::unique_lock<std::mutex> control_lock(control_mutex);
        control_cv.wait(control_lock, [&] {
            return num_threads_holding_lock == num_waiters;
        });
    }

    {
        // Scoped guard instead of manual lock()/unlock(), so the mutex is
        // released on every path out of this block.
        std::lock_guard<InterprocessMutex> l(mutex);
        changed.notify_all();
    }

    for (std::thread& t : waiters) {
        t.join();
    }
    changed.release_shared_part();
    mutex.release_shared_part();
}
2✔
806

807

808
// Test that the unlock+wait operation of wait() takes part atomically, i.e. that there is no time
809
// gap between them where another thread could invoke signal() which could go undetected by the wait.
810
// This test takes more than 3 days with valgrind.
811
// Test that the unlock+wait operation of wait() takes part atomically, i.e. that there is no time
// gap between them where another thread could invoke signal() which could go undetected by the wait.
// Each outer thread owns its own mutex/condvar pair and runs a waiting thread (t1) against a
// notifying thread (t2) for `iter` rounds.
TEST_IF(Thread_CondvarAtomicWaitUnlock, !running_with_valgrind && TEST_DURATION > 0)
{
    SHARED_GROUP_TEST_PATH(path);

    const int iter = 10000;

    // It's nice to have many threads to trigger preemption (see notes inside the t1 thread)
    const int thread_pair_count = 2; // std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    threads.reserve(thread_pair_count);
    for (int tpc = 0; tpc < thread_pair_count; tpc++) {

        // Construct the thread in place rather than pushing a temporary.
        threads.emplace_back([&]() {
            InterprocessMutex mutex;
            InterprocessMutex::SharedPart mutex_part;
            InterprocessCondVar condvar;
            InterprocessCondVar::SharedPart condvar_part;
            DBOptions default_options;

            // The thread id is appended to the names so that each pair gets its own mutex and condvar.
            std::stringstream ss;
            ss << std::this_thread::get_id();
            std::string id = ss.str();

            mutex.set_shared_part(mutex_part, path, "mutex" + id);
            condvar.set_shared_part(condvar_part, path, "sema" + id, default_options.temp_dir);
            InterprocessCondVar::init_shared_part(condvar_part);

            std::atomic<bool> signal(false);

            std::thread t1([&]() {
                for (int i = 0; i < iter; i++) {
                    // Held from here until the end of the iteration, i.e. across the wait() below.
                    std::lock_guard<InterprocessMutex> l(mutex);
                    signal = true;

                    // A gap in wait() could be very tight, so we need a way to preempt it between two
                    // instructions. Problem is that we have so many/frequent operating system wait calls in
                    // this that they might be invoked closer than a thread time slice, so preemption would
                    // never occur. So we create some work that sometimes will bring the current time slice
                    // close to its end.

                    // Wait between 0 and the number of clocks in 100 ms on a 3 GHz machine (100 ms is the
                    // Linux default time slice)
                    uint64_t clocks_to_wait = fastrand(3ULL * 1000000000ULL / 1000000ULL * 100ULL);

                    // This loop can wait a lot more than 100 ms because each iteration takes many more clocks
                    // than just 1. That's intentional and will cover other OS'es with bigger time slices.
                    volatile int sum = 0; // must be volatile, else it compiles into no-op
                    for (uint64_t t = 0; t < clocks_to_wait; t++) {
                        sum = sum + 1;
                    }

                    condvar.wait(mutex, nullptr);
                }
            });

            // This thread calls notify_all() exactly one time after the other thread has invoked wait() and has
            // released the mutex. If wait() misses the notify_all() then there is a bug, which will reveal itself
            // by both threads hanging infinitely.
            std::thread t2([&]() {
                for (int i = 0; i < iter; i++) {
                    // Spin until t1 has announced that it is about to wait.
                    while (!signal) {
                    }
                    signal = false;
                    std::lock_guard<InterprocessMutex> l(mutex);
                    condvar.notify_all();
                }
            });

            t1.join();
            t2.join();
        });
    }

    for (std::thread& thread : threads) {
        thread.join();
    }
}
×
888

889
// Creates and attaches an InterprocessCondVar on the test thread, then
// destroys it on a different thread.
NONCONCURRENT_TEST(Thread_Condvar_CreateDestroyDifferentThreads)
{
    auto cv = std::make_unique<InterprocessCondVar>();
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    // NOTE(review): this condvar name is the one Thread_CondvarTimeout also uses - confirm that is intended.
    cv->set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);
    std::thread destroyer([&] {
        cv.reset();
    });
    destroyer.join();
}
2✔
901

902
#ifdef _WIN32
903
// Attaches a mutex and a condition variable using a path that contains
// backslashes; the test passes if neither call fails.
TEST(Thread_Win32InterprocessBackslashes)
{
    const std::string dir_with_backslashes = "Path\\With\\Slashes";

    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar condvar;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    DBOptions default_options;

    mutex.set_shared_part(mutex_part, dir_with_backslashes, "my_mutex");
    condvar.set_shared_part(condvar_part, dir_with_backslashes, "my_condvar", default_options.temp_dir);
}
915
#endif
916

917
#endif // TEST_THREAD
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc