• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2309

10 May 2024 05:50PM UTC coverage: 90.809% (+0.009%) from 90.8%
2309

push

Evergreen

danieltabacaru
Add back ability to format Objective-C code

102086 of 181070 branches covered (56.38%)

214553 of 236269 relevant lines covered (90.81%)

5697616.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.84
/test/test_thread.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2016 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "testsettings.hpp"
20
#ifdef TEST_THREAD
21

22
#include <condition_variable>
23
#include <cstring>
24
#include <algorithm>
25
#include <queue>
26
#include <functional>
27
#include <mutex>
28
#include <thread>
29
#include <atomic>
30

31
#ifndef _WIN32
32
#include <unistd.h>
33
#include <sys/time.h>
34
#include <realm/utilities.hpp> // gettimeofday()
35
#endif
36

37
#include <realm/db_options.hpp>
38
#include <realm/utilities.hpp>
39
#include <realm/util/features.h>
40
#include <realm/util/thread.hpp>
41
#include <realm/util/interprocess_condvar.hpp>
42
#include <realm/util/interprocess_mutex.hpp>
43

44
#include <iostream>
45
#include "test.hpp"
46

47
using namespace realm;
48
using namespace realm::util;
49

50

51
// Test independence and thread-safety
52
// -----------------------------------
53
//
54
// All tests must be thread safe and independent of each other. This
55
// is required because it allows for both shuffling of the execution
56
// order and for parallelized testing.
57
//
58
// In particular, avoid using std::rand() since it is not guaranteed
59
// to be thread safe. Instead use the API offered in
60
// `test/util/random.hpp`.
61
//
62
// All files created in tests must use the TEST_PATH macro (or one of
63
// its friends) to obtain a suitable file system path. See
64
// `test/util/test_path.hpp`.
65
//
66
//
67
// Debugging and the ONLY() macro
68
// ------------------------------
69
//
70
// A simple way of disabling all tests except one called `Foo`, is to
71
// replace TEST(Foo) with ONLY(Foo) and then recompile and rerun the
72
// test suite. Note that you can also use filtering by setting the
73
// environment variable `UNITTEST_FILTER`. See `README.md` for more on
74
// this.
75
//
76
// Another way to debug a particular test, is to copy that test into
77
// `experiments/testcase.cpp` and then run `sh build.sh
78
// check-testcase` (or one of its friends) from the command line.
79

80

81
namespace {
82

83
// Bump the integer pointed to by `i` by one (used as a thread entry point).
void increment(int* i)
{
    *i += 1;
}
87

88
struct Shared {
89
    Mutex m_mutex;
90
    int m_value;
91

92
    // 10000 takes less than 0.1 sec
93
    void increment_10000_times()
94
    {
20✔
95
        for (int i = 0; i < 10000; ++i) {
199,327✔
96
            LockGuard lock(m_mutex);
199,307✔
97
            ++m_value;
199,307✔
98
        }
199,307✔
99
    }
20✔
100

101
    void increment_10000_times2()
102
    {
20✔
103
        for (int i = 0; i < 10000; ++i) {
199,097✔
104
            LockGuard lock(m_mutex);
199,077✔
105
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
106
            // could assemble into 'inc [addr]' which has very tiny gap
107
            double f = m_value;
199,077✔
108
            f += 1.;
199,077✔
109
            m_value = int(f);
199,077✔
110
        }
199,077✔
111
    }
20✔
112
};
113

114
struct SharedWithEmulated {
115
    InterprocessMutex m_mutex;
116
    InterprocessMutex::SharedPart m_shared_part;
117
    int m_value;
118

119
    SharedWithEmulated(std::string name)
120
    {
2✔
121
        m_mutex.set_shared_part(m_shared_part, name, "0");
2✔
122
    }
2✔
123
    ~SharedWithEmulated()
124
    {
2✔
125
        m_mutex.release_shared_part();
2✔
126
    }
2✔
127

128
    // 10000 takes less than 0.1 sec
129
    void increment_10000_times()
130
    {
20✔
131
        for (int i = 0; i < 10000; ++i) {
197,587✔
132
            std::lock_guard<InterprocessMutex> lock(m_mutex);
197,567✔
133
            ++m_value;
197,567✔
134
        }
197,567✔
135
    }
20✔
136

137
    void increment_10000_times2()
138
    {
×
139
        for (int i = 0; i < 10000; ++i) {
×
140
            std::lock_guard<InterprocessMutex> lock(m_mutex);
×
141
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
×
142
            // could assemble into 'inc [addr]' which has very tiny gap
×
143
            double f = m_value;
×
144
            f += 1.;
×
145
            m_value = int(f);
×
146
        }
×
147
    }
×
148
};
149

150
struct Robust {
151
    RobustMutex m_mutex;
152
    bool m_recover_called;
153

154
    void simulate_death()
155
    {
×
156
        m_mutex.lock(std::bind(&Robust::recover, this));
×
157
        // Do not unlock
×
158
    }
×
159

160
    void simulate_death_during_recovery()
161
    {
×
162
        bool no_thread_has_died = m_mutex.low_level_lock();
×
163
        if (!no_thread_has_died)
×
164
            m_recover_called = true;
×
165
        // Do not unlock
×
166
    }
×
167

168
    void recover()
169
    {
×
170
        m_recover_called = true;
×
171
    }
×
172

173
    void recover_throw()
174
    {
×
175
        m_recover_called = true;
×
176
        throw RobustMutex::NotRecoverable();
×
177
    }
×
178
};
179

180

181
class QueueMonitor {
182
public:
183
    QueueMonitor()
184
        : m_closed(false)
1✔
185
    {
2✔
186
    }
2✔
187

188
    bool get(int& value)
189
    {
63,862✔
190
        LockGuard lock(m_mutex);
63,862✔
191
        for (;;) {
95,838✔
192
            if (!m_queue.empty())
95,838✔
193
                break;
64,000✔
194
            if (m_closed)
31,838✔
195
                return false;
64✔
196
            m_nonempty_or_closed.wait(lock); // Wait for producer
31,774✔
197
        }
31,774✔
198
        bool was_full = m_queue.size() == max_queue_size;
63,798✔
199
        value = m_queue.front();
63,798✔
200
        m_queue.pop();
63,798✔
201
        if (was_full)
63,798✔
202
            m_nonfull.notify_all(); // Resume a waiting producer
10,283✔
203
        return true;
63,798✔
204
    }
63,862✔
205

206
    void put(int value)
207
    {
63,802✔
208
        LockGuard lock(m_mutex);
63,802✔
209
        while (m_queue.size() == max_queue_size)
91,846✔
210
            m_nonfull.wait(lock); // Wait for consumer
28,044✔
211
        bool was_empty = m_queue.empty();
63,802✔
212
        m_queue.push(value);
63,802✔
213
        if (was_empty)
63,802✔
214
            m_nonempty_or_closed.notify_all(); // Resume a waiting consumer
11,566✔
215
    }
63,802✔
216

217
    void close()
218
    {
2✔
219
        LockGuard lock(m_mutex);
2✔
220
        m_closed = true;
2✔
221
        m_nonempty_or_closed.notify_all(); // Resume all waiting consumers
2✔
222
    }
2✔
223

224
private:
225
    Mutex m_mutex;
226
    CondVar m_nonempty_or_closed, m_nonfull;
227
    std::queue<int> m_queue;
228
    bool m_closed;
229

230
    static const unsigned max_queue_size = 8;
231
};
232

233
// Push `value` onto the queue 1000 times.
void producer_thread(QueueMonitor* queue, int value)
{
    for (int n = 0; n != 1000; ++n)
        queue->put(value);
}
239

240
// Pop values until the queue is closed and drained, tallying how many times
// each producer's value was seen in `consumed_counts` (indexed by value).
void consumer_thread(QueueMonitor* queue, int* consumed_counts)
{
    int value = 0;
    while (queue->get(value))
        ++consumed_counts[value];
}
250

251

252
} // anonymous namespace
253

254

255
// A thread constructed with a function is joinable until joined, and its
// function runs to completion before join() returns.
TEST(Thread_Join)
{
    int counter = 0;
    Thread thread(std::bind(&increment, &counter));
    CHECK(thread.joinable());
    thread.join();
    CHECK(!thread.joinable());
    CHECK_EQUAL(1, counter);
}
264

265

266
// A default-constructed thread is not joinable; start() makes it joinable
// and runs the function, join() completes it.
TEST(Thread_Start)
{
    int counter = 0;
    Thread thread;
    CHECK(!thread.joinable());
    thread.start(std::bind(&increment, &counter));
    CHECK(thread.joinable());
    thread.join();
    CHECK(!thread.joinable());
    CHECK_EQUAL(1, counter);
}
277

278

279
// A mutex can be locked and unlocked repeatedly from the same thread.
TEST(Thread_MutexLock)
{
    Mutex mutex;
    {
        LockGuard lock(mutex);
    }
    {
        LockGuard lock(mutex);
    }
}
289

290
#ifdef REALM_HAVE_PTHREAD_PROCESS_SHARED
291
// A process-shared mutex supports repeated lock/unlock cycles too.
TEST(Thread_ProcessSharedMutex)
{
    Mutex mutex((Mutex::process_shared_tag()));
    {
        LockGuard lock(mutex);
    }
    {
        LockGuard lock(mutex);
    }
}
301
#endif
302

303
// Ten threads each perform 10000 locked increments; no increments may be
// lost if the mutex provides mutual exclusion.
TEST(Thread_CriticalSection)
{
    Shared shared;
    shared.m_value = 0;
    Thread threads[10];
    for (int i = 0; i != 10; ++i)
        threads[i].start(std::bind(&Shared::increment_10000_times, &shared));
    for (int i = 0; i != 10; ++i)
        threads[i].join();
    CHECK_EQUAL(100000, shared.m_value); // 10 threads x 10000 increments
}
314

315

316
// Same critical-section test as Thread_CriticalSection, but through the
// emulated interprocess mutex.
TEST(Thread_EmulatedMutex_CriticalSection)
{
    TEST_PATH(path);
    SharedWithEmulated shared(path);
    shared.m_value = 0;
    Thread threads[10];
    for (int i = 0; i != 10; ++i)
        threads[i].start(std::bind(&SharedWithEmulated::increment_10000_times, &shared));
    for (int i = 0; i != 10; ++i)
        threads[i].join();
    CHECK_EQUAL(100000, shared.m_value); // 10 threads x 10000 increments
}
328

329

330
// Like Thread_CriticalSection, but uses the widened critical section so a
// broken mutex is far more likely to produce a lost update.
TEST(Thread_CriticalSection2)
{
    Shared shared;
    shared.m_value = 0;
    Thread threads[10];
    for (int i = 0; i != 10; ++i)
        threads[i].start(std::bind(&Shared::increment_10000_times2, &shared));
    for (int i = 0; i != 10; ++i)
        threads[i].join();
    CHECK_EQUAL(100000, shared.m_value); // 10 threads x 10000 increments
}
341

342

343
// Todo. Not supported on Windows in particular? Keywords: winbug
344
TEST_IF(Thread_RobustMutex, TEST_THREAD_ROBUSTNESS)
{
    // Bail out when robust mutexes are unavailable on this platform;
    // proceeding would most likely dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    Robust robust;

    // Plain lock/unlock cycles must not invoke the recovery callback
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();

    // Simulate a death; the next lock must run recovery
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death, &robust));
        thread.join();
    }
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // One more round of recovery
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death, &robust));
        thread.join();
    }
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // Simulate a case where recovery fails or is impossible
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death, &robust));
        thread.join();
    }
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover_throw, &robust)), RobustMutex::NotRecoverable);
    CHECK(robust.m_recover_called);

    // Once marked not-recoverable, each further lock attempt must throw
    // without invoking the recovery callback again
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover, &robust)), RobustMutex::NotRecoverable);
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover, &robust)), RobustMutex::NotRecoverable);
    CHECK(!robust.m_recover_called);
}
406

407

408
TEST_IF(Thread_DeathDuringRecovery, TEST_THREAD_ROBUSTNESS)
{
    // Bail out when robust mutexes are unavailable on this platform;
    // proceeding would most likely dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    // This test checks that death during recovery causes a robust mutex to
    // stay in the 'inconsistent' state.

    Robust robust;

    // Bring the mutex into the 'inconsistent' state
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death, &robust));
        thread.join();
    }
    CHECK(!robust.m_recover_called);

    // Die while recovering
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death_during_recovery, &robust));
        thread.join();
    }
    CHECK(robust.m_recover_called);

    // The mutex is still in the 'inconsistent' state if another attempt at
    // locking it calls the recovery function
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // Now that the mutex is fully recovered, a regular lock/unlock round
    // must not trigger recovery
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();

    // Try a double death during recovery
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death, &robust));
        thread.join();
    }
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death_during_recovery, &robust));
        thread.join();
    }
    CHECK(robust.m_recover_called);
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death_during_recovery, &robust));
        thread.join();
    }
    CHECK(robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();
}
478

479

480
// Stress the QueueMonitor with 32 producers and 32 consumers, then verify
// that every produced value was consumed exactly once.
TEST(Thread_CondVar)
{
    QueueMonitor queue;
    const int num_producers = 32;
    const int num_consumers = 32;
    Thread producers[num_producers], consumers[num_consumers];
    // consumed_counts[consumer][producer]: per-pair tally of consumed values
    int consumed_counts[num_consumers][num_producers];
    memset(consumed_counts, 0, sizeof consumed_counts);

    for (int i = 0; i != num_producers; ++i)
        producers[i].start(std::bind(&producer_thread, &queue, i));
    for (int i = 0; i != num_consumers; ++i)
        consumers[i].start(std::bind(&consumer_thread, &queue, &consumed_counts[i][0]));
    for (int i = 0; i != num_producers; ++i)
        producers[i].join();
    queue.close(); // Stop consumers when queue is empty
    for (int i = 0; i != num_consumers; ++i)
        consumers[i].join();

    // Each producer pushed exactly 1000 values; none may be lost or duplicated
    for (int i = 0; i != num_producers; ++i) {
        int total = 0;
        for (int j = 0; j != num_consumers; ++j)
            total += consumed_counts[j][i];
        CHECK_EQUAL(1000, total);
    }
}
506

507
// try_lock semantics on Mutex: a second try_lock on the owning thread throws,
// and try_lock from another thread fails while held, succeeds after release.
TEST(Thread_MutexTryLock)
{
    Thread worker;
    Mutex base_mutex;
    std::unique_lock<Mutex> outer(base_mutex, std::defer_lock);

    std::condition_variable cond;
    std::mutex cond_mutex;

    // Same-thread try_lock: second attempt must fail loudly
    CHECK(outer.try_lock());
    CHECK(outer.owns_lock());
    CHECK_THROW(static_cast<void>(outer.try_lock()), std::system_error); // already locked: Resource deadlock avoided
    outer.unlock();

    bool ready = false;
    auto async_body = [&]() {
        std::unique_lock<Mutex> inner(base_mutex, std::defer_lock);
        CHECK(!inner.owns_lock());
        CHECK(!inner.try_lock()); // main thread still holds it
        {
            std::lock_guard<std::mutex> guard(cond_mutex);
            ready = true;
        }
        cond.notify_one();
        // Spin until the main thread releases the mutex
        while (!inner.try_lock()) {
            millisleep(1);
        }
        CHECK(inner.owns_lock());
        inner.unlock();
    };

    // Check basic locking across threads.
    CHECK(!outer.owns_lock());
    CHECK(outer.try_lock());
    CHECK(outer.owns_lock());
    worker.start(async_body);
    {
        std::unique_lock<std::mutex> guard(cond_mutex);
        cond.wait(guard, [&] {
            return ready;
        });
    }
    outer.unlock();
    worker.join();
}
553

554
// try_lock semantics on RobustMutex, including recovery after a thread exits
// while still holding the lock.
TEST(Thread_RobustMutexTryLock)
{
    // Bail out when robust mutexes are unavailable on this platform;
    // proceeding would most likely dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    Thread thread;
    RobustMutex m;
    int times_recover_function_was_called = 0;

    auto recover_function = [&]() {
        ++times_recover_function_was_called;
    };
    // Same-thread try_lock: second attempt fails; no recovery expected
    CHECK(m.try_lock(recover_function));
    CHECK(!m.try_lock(recover_function));
    m.unlock();
    CHECK(times_recover_function_was_called == 0);

    bool init_done = false;
    std::mutex control_mutex;
    std::condition_variable control_cv;

    auto do_async = [&]() {
        CHECK(!m.try_lock(recover_function)); // main thread holds it
        {
            std::lock_guard<std::mutex> guard(control_mutex);
            init_done = true;
        }
        control_cv.notify_one();
        while (!m.try_lock(recover_function)) {
            millisleep(1);
        }
        // exit the thread with the lock held to check robustness
    };

    // Check basic locking across threads.
    CHECK(m.try_lock(recover_function));
    thread.start(do_async);
    {
        std::unique_lock<std::mutex> lock(control_mutex);
        control_cv.wait(lock, [&] {
            return init_done;
        });
    }
    m.unlock();
    thread.join();
    CHECK(times_recover_function_was_called == 0);
    // at this point the thread that obtained the mutex is dead with the lock,
    // so this try_lock must succeed and run the recovery function once
    CHECK(m.try_lock(recover_function));
    CHECK(times_recover_function_was_called == 1);
    m.unlock();
}
608

609
#ifndef _WIN32 // FIXME: trylock is not supported by the win32-pthread lib on Windows. No need to fix this
610
               // because we are going to switch to native API soon and discard win32-pthread entirely
611
// try_lock semantics on InterprocessMutex, same-thread and cross-thread.
NONCONCURRENT_TEST(Thread_InterprocessMutexTryLock)
{
    Thread worker;
    InterprocessMutex::SharedPart mutex_part;

    InterprocessMutex m;
    TEST_PATH(path);
    std::string mutex_file_name = "Test_Thread_InterprocessMutexTryLock";
    m.set_shared_part(mutex_part, path, mutex_file_name);

    // Same-thread try_lock: second attempt must fail, not deadlock
    CHECK(m.try_lock());
    CHECK(!m.try_lock()); // already locked but shouldn't deadlock
    m.unlock();

    bool ready = false;
    std::condition_variable cond;
    std::mutex cond_mutex;
    auto async_body = [&]() {
        // Attach a second mutex object to the same shared part
        InterprocessMutex m2;
        m2.set_shared_part(mutex_part, path, mutex_file_name);

        CHECK(!m2.try_lock()); // main thread holds the lock
        {
            std::lock_guard<std::mutex> guard(cond_mutex);
            ready = true;
        }
        cond.notify_one();
        while (!m2.try_lock()) {
            millisleep(1);
        }
        m2.unlock();
    };

    // Check basic locking across threads.
    CHECK(m.try_lock());
    worker.start(async_body);
    {
        std::unique_lock<std::mutex> guard(cond_mutex);
        cond.wait(guard, [&] {
            return ready;
        });
    }
    m.unlock();
    worker.join();
    m.release_shared_part();
}
658

659
#endif
660

661
// Detect and flag trivial implementations of condvars.
662
namespace {
663

664
void signaller(int* signals, InterprocessMutex* mutex, InterprocessCondVar* cv)
665
{
2✔
666
    millisleep(200);
2✔
667
    {
2✔
668
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
669
        *signals = 1;
2✔
670
        // wakeup any waiters
671
        cv->notify_all();
2✔
672
    }
2✔
673
    // exit scope to allow waiters to get lock
674
    millisleep(200);
2✔
675
    {
2✔
676
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
677
        *signals = 2;
2✔
678
        // wakeup any waiters, 2nd time
679
        cv->notify_all();
2✔
680
    }
2✔
681
    millisleep(200);
2✔
682
    {
2✔
683
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
684
        *signals = 3;
2✔
685
        // wakeup any waiters, 2nd time
686
        cv->notify_all();
2✔
687
    }
2✔
688
    millisleep(200);
2✔
689
    {
2✔
690
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
691
        *signals = 4;
2✔
692
    }
2✔
693
}
2✔
694

695
void wakeup_signaller(int* signal_state, InterprocessMutex* mutex, InterprocessCondVar* cv)
696
{
2✔
697
    millisleep(1000);
2✔
698
    *signal_state = 2;
2✔
699
    std::lock_guard<InterprocessMutex> l(*mutex);
2✔
700
    cv->notify_all();
2✔
701
}
2✔
702

703

704
void waiter(InterprocessMutex* mutex, InterprocessCondVar* cv, std::mutex* control_mutex,
705
            std::condition_variable* control_cv, size_t* num_threads_holding_lock)
706
{
20✔
707
    std::lock_guard<InterprocessMutex> l(*mutex);
20✔
708

709
    {
20✔
710
        std::lock_guard<std::mutex> guard(*control_mutex);
20✔
711
        *num_threads_holding_lock = (*num_threads_holding_lock) + 1;
20✔
712
    }
20✔
713
    control_cv->notify_one();
20✔
714

715
    cv->wait(*mutex, nullptr);
20✔
716
}
20✔
717
} // namespace
718

719
// Verify, that a wait on a condition variable actually waits
720
// - this test relies on assumptions about scheduling, which
721
//   may not hold on a heavily loaded system.
722
NONCONCURRENT_TEST(Thread_CondvarWaits)
{
    int signals = 0;
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarWaits_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarWaits_CondVar", default_options.temp_dir);
    changed.init_shared_part(condvar_part);
    Thread signal_thread;
    signals = 0;
    signal_thread.start(std::bind(signaller, &signals, &mutex, &changed));
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        // Each wait must actually block until the signaller advances the
        // state; a trivial (non-waiting) condvar would see stale values here
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signals, 1);
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signals, 2);
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signals, 3);
    }
    signal_thread.join();
    changed.release_shared_part();
    mutex.release_shared_part();
}
750

751
// Verify that a condition variable looses its signal if no one
752
// is waiting on it
753
NONCONCURRENT_TEST(Thread_CondvarIsStateless)
{
    int signal_state = 0;
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;

    // Must have names because default_options.temp_dir is empty string on Windows
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarIsStateless_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarIsStateless_CondVar", default_options.temp_dir);
    Thread signal_thread;
    signal_state = 1;
    // Send a burst of signals while nobody is waiting; all must be lost
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        for (int i = 0; i != 10; ++i)
            changed.notify_all();
    }
    // Spawn a thread which will later do one more signal in order to wake us up.
    signal_thread.start(std::bind(wakeup_signaller, &signal_state, &mutex, &changed));
    // If the earlier signals had (wrongly) been remembered, this wait would
    // return immediately with signal_state still 1.
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signal_state, 2);
    }
    signal_thread.join();
    changed.release_shared_part();
    mutex.release_shared_part();
}
789

790

791
// this test hangs, if timeout doesn't work.
792
NONCONCURRENT_TEST(Thread_CondvarTimeout)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarTimeout_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);

    // Build an absolute deadline 100 msec from now
    struct timespec time_limit;
    timeval tv;
    gettimeofday(&tv, nullptr);
    time_limit.tv_sec = tv.tv_sec;
    time_limit.tv_nsec = tv.tv_usec * 1000;
    time_limit.tv_nsec += 100000000;        // 100 msec wait
    if (time_limit.tv_nsec >= 1000000000) { // normalize nsec overflow
        time_limit.tv_nsec -= 1000000000;
        time_limit.tv_sec += 1;
    }

    // Nobody ever signals, so each wait must return by timeout; the whole
    // test hangs if the timeout mechanism is broken.
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        for (int i = 0; i != 5; ++i)
            changed.wait(mutex, &time_limit);
    }
    changed.release_shared_part();
    mutex.release_shared_part();
}
821

822

823
// test that notify_all will wake up all waiting threads, if there
824
// are many waiters:
825
NONCONCURRENT_TEST(Thread_CondvarNotifyAllWakeup)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarNotifyAllWakeup_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarNotifyAllWakeup_CondVar", default_options.temp_dir);

    size_t num_threads_holding_lock = 0;
    std::mutex control_mutex;
    std::condition_variable control_cv;

    const size_t num_waiters = 10;
    Thread waiters[num_waiters];
    for (size_t i = 0; i != num_waiters; ++i) {
        waiters[i].start(std::bind(waiter, &mutex, &changed, &control_mutex, &control_cv, &num_threads_holding_lock));
    }
    // Allow every waiter to start and block on the InterprocessCondVar
    {
        std::unique_lock<std::mutex> unique_lock(control_mutex);
        control_cv.wait(unique_lock, [&] {
            return num_threads_holding_lock == num_waiters;
        });
    }

    // A single notify_all() must wake all waiters — otherwise the joins
    // below hang
    mutex.lock();
    changed.notify_all();
    mutex.unlock();

    for (size_t i = 0; i != num_waiters; ++i) {
        waiters[i].join();
    }
    changed.release_shared_part();
    mutex.release_shared_part();
}
864

865

866
// Test that the unlock+wait operation of wait() takes part atomically, i.e. that there is no time
867
// gap between them where another thread could invoke signal() which could go undetected by the wait.
868
// This test takes more than 3 days with valgrind.
869
TEST_IF(Thread_CondvarAtomicWaitUnlock, !running_with_valgrind && TEST_DURATION > 0)
{
    SHARED_GROUP_TEST_PATH(path);

    const int iter = 10000;

    // It's nice to have many threads to trigger preemption (see notes inside the t1 thread)
    const int thread_pair_count = 2; // std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    for (int tpc = 0; tpc < thread_pair_count; tpc++) {

        threads.push_back(std::thread([&]() {
            InterprocessMutex mutex;
            InterprocessMutex::SharedPart mutex_part;
            InterprocessCondVar condvar;
            InterprocessCondVar::SharedPart condvar_part;
            DBOptions default_options;

            // Give each thread pair its own shared-part names via the thread id
            std::stringstream ss;
            ss << std::this_thread::get_id();
            std::string id = ss.str();

            mutex.set_shared_part(mutex_part, path, "mutex" + id);
            condvar.set_shared_part(condvar_part, path, "sema" + id, default_options.temp_dir);
            InterprocessCondVar::init_shared_part(condvar_part);

            std::atomic<bool> signal(false);

            std::thread t1([&]() {
                for (int i = 0; i < iter; i++) {
                    mutex.lock();
                    signal = true;

                    // A gap in wait() could be very tight, so we need a way to preempt it between two
                    // instructions. The problem is that we have so many/frequent operating-system wait
                    // calls in this test that they might be invoked closer together than a thread time
                    // slice, so preemption would never occur. So we create some busy work that will
                    // sometimes bring the current time slice close to its end.

                    // Wait between 0 and the number of clocks in 100 ms on a 3 GHz machine (100 ms is
                    // the default Linux time slice)
                    uint64_t clocks_to_wait = fastrand(3ULL * 1000000000ULL / 1000000ULL * 100ULL);

                    // This loop can wait a lot more than 100 ms because each iteration takes many more
                    // clocks than just 1. That's intentional and will cover other OSes with bigger time
                    // slices.
                    volatile int sum = 0; // must be volatile, else it compiles into no-op
                    for (uint64_t t = 0; t < clocks_to_wait; t++) {
                        sum = sum + 1;
                    }

                    condvar.wait(mutex, nullptr);
                    mutex.unlock();
                }
            });

            // This thread calls notify_all() exactly one time after the other thread has invoked wait()
            // and has released the mutex. If wait() misses the notify_all() then there is a bug, which
            // will reveal itself by both threads hanging infinitely.
            std::thread t2([&]() {
                for (int i = 0; i < iter; i++) {
                    while (!signal) {
                    }
                    signal = false;
                    mutex.lock();
                    condvar.notify_all();
                    mutex.unlock();
                }
            });

            t1.join();
            t2.join();
        }));
    }

    for (int i = 0; i < thread_pair_count; i++) {
        threads[i].join();
    }
}
946

947
// An InterprocessCondVar created on one thread must be safely destructible
// on a different thread.
NONCONCURRENT_TEST(Thread_Condvar_CreateDestroyDifferentThreads)
{
    auto cv = std::make_unique<InterprocessCondVar>();
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    cv->set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);
    // Destroy on another thread
    std::thread([&] {
        cv.reset();
    }).join();
}
959

960
#ifdef _WIN32
961
// Shared-part names derived from Windows paths containing backslashes must
// be accepted by both the interprocess mutex and condvar.
TEST(Thread_Win32InterprocessBackslashes)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar condvar;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    DBOptions default_options;

    mutex.set_shared_part(mutex_part, "Path\\With\\Slashes", "my_mutex");
    condvar.set_shared_part(condvar_part, "Path\\With\\Slashes", "my_condvar", default_options.temp_dir);
}
973
#endif
974

975
#endif // TEST_THREAD
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc