• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_279261

11 Oct 2023 02:16PM UTC coverage: 91.624% (+0.06%) from 91.563%
github_pull_request_279261

Pull #6763

Evergreen

finnschiermer
Merge branch 'master' of github.com:realm/realm-core into fsa/enhance-freelist-check
Pull Request #6763: add freelist verification at more points during commit

94332 of 173512 branches covered (0.0%)

124 of 124 new or added lines in 2 files covered. (100.0%)

29 existing lines in 10 files now uncovered.

230660 of 251746 relevant lines covered (91.62%)

6863226.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/test/test_thread.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2016 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "testsettings.hpp"
20
#ifdef TEST_THREAD
21

22
#include <condition_variable>
23
#include <cstring>
24
#include <algorithm>
25
#include <queue>
26
#include <functional>
27
#include <mutex>
28
#include <thread>
29
#include <atomic>
30

31
#ifndef _WIN32
32
#include <unistd.h>
33
#include <sys/time.h>
34
#include <realm/utilities.hpp> // gettimeofday()
35
#endif
36

37
#include <realm/db_options.hpp>
38
#include <realm/utilities.hpp>
39
#include <realm/util/features.h>
40
#include <realm/util/thread.hpp>
41
#include <realm/util/interprocess_condvar.hpp>
42
#include <realm/util/interprocess_mutex.hpp>
43

44
#include <iostream>
45
#include "test.hpp"
46

47
using namespace realm;
48
using namespace realm::util;
49

50

51
// Test independence and thread-safety
52
// -----------------------------------
53
//
54
// All tests must be thread safe and independent of each other. This
55
// is required because it allows for both shuffling of the execution
56
// order and for parallelized testing.
57
//
58
// In particular, avoid using std::rand() since it is not guaranteed
59
// to be thread safe. Instead use the API offered in
60
// `test/util/random.hpp`.
61
//
62
// All files created in tests must use the TEST_PATH macro (or one of
63
// its friends) to obtain a suitable file system path. See
64
// `test/util/test_path.hpp`.
65
//
66
//
67
// Debugging and the ONLY() macro
68
// ------------------------------
69
//
70
// A simple way of disabling all tests except one called `Foo`, is to
71
// replace TEST(Foo) with ONLY(Foo) and then recompile and rerun the
72
// test suite. Note that you can also use filtering by setting the
73
// environment varible `UNITTEST_FILTER`. See `README.md` for more on
74
// this.
75
//
76
// Another way to debug a particular test, is to copy that test into
77
// `experiments/testcase.cpp` and then run `sh build.sh
78
// check-testcase` (or one of its friends) from the command line.
79

80

81
namespace {
82

83
void increment(int* i)
84
{
4✔
85
    ++*i;
4✔
86
}
4✔
87

88
struct Shared {
89
    Mutex m_mutex;
90
    int m_value;
91

92
    // 10000 takes less than 0.1 sec
93
    void increment_10000_times()
94
    {
20✔
95
        for (int i = 0; i < 10000; ++i) {
199,218✔
96
            LockGuard lock(m_mutex);
199,198✔
97
            ++m_value;
199,198✔
98
        }
199,198✔
99
    }
20✔
100

101
    void increment_10000_times2()
102
    {
20✔
103
        for (int i = 0; i < 10000; ++i) {
198,862✔
104
            LockGuard lock(m_mutex);
198,842✔
105
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
99,521✔
106
            // could assemble into 'inc [addr]' which has very tiny gap
99,521✔
107
            double f = m_value;
198,842✔
108
            f += 1.;
198,842✔
109
            m_value = int(f);
198,842✔
110
        }
198,842✔
111
    }
20✔
112
};
113

114
struct SharedWithEmulated {
115
    InterprocessMutex m_mutex;
116
    InterprocessMutex::SharedPart m_shared_part;
117
    int m_value;
118

119
    SharedWithEmulated(std::string name)
120
    {
2✔
121
        m_mutex.set_shared_part(m_shared_part, name, "0");
2✔
122
    }
2✔
123
    ~SharedWithEmulated()
124
    {
2✔
125
        m_mutex.release_shared_part();
2✔
126
    }
2✔
127

128
    // 10000 takes less than 0.1 sec
129
    void increment_10000_times()
130
    {
20✔
131
        for (int i = 0; i < 10000; ++i) {
199,704✔
132
            std::lock_guard<InterprocessMutex> lock(m_mutex);
199,684✔
133
            ++m_value;
199,684✔
134
        }
199,684✔
135
    }
20✔
136

137
    void increment_10000_times2()
138
    {
×
139
        for (int i = 0; i < 10000; ++i) {
×
140
            std::lock_guard<InterprocessMutex> lock(m_mutex);
×
141
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
×
142
            // could assemble into 'inc [addr]' which has very tiny gap
×
143
            double f = m_value;
×
144
            f += 1.;
×
145
            m_value = int(f);
×
146
        }
×
147
    }
×
148
};
149

150
struct Robust {
151
    RobustMutex m_mutex;
152
    bool m_recover_called;
153

154
    void simulate_death()
155
    {
×
156
        m_mutex.lock(std::bind(&Robust::recover, this));
×
157
        // Do not unlock
×
158
    }
×
159

160
    void simulate_death_during_recovery()
161
    {
×
162
        bool no_thread_has_died = m_mutex.low_level_lock();
×
163
        if (!no_thread_has_died)
×
164
            m_recover_called = true;
×
165
        // Do not unlock
×
166
    }
×
167

168
    void recover()
169
    {
×
170
        m_recover_called = true;
×
171
    }
×
172

173
    void recover_throw()
174
    {
×
175
        m_recover_called = true;
×
176
        throw RobustMutex::NotRecoverable();
×
177
    }
×
178
};
179

180

181
class QueueMonitor {
182
public:
183
    QueueMonitor()
184
        : m_closed(false)
185
    {
2✔
186
    }
2✔
187

188
    bool get(int& value)
189
    {
63,596✔
190
        LockGuard lock(m_mutex);
63,596✔
191
        for (;;) {
155,467✔
192
            if (!m_queue.empty())
155,467✔
193
                break;
64,000✔
194
            if (m_closed)
91,467✔
195
                return false;
64✔
196
            m_nonempty_or_closed.wait(lock); // Wait for producer
91,403✔
197
        }
91,403✔
198
        bool was_full = m_queue.size() == max_queue_size;
63,564✔
199
        value = m_queue.front();
63,532✔
200
        m_queue.pop();
63,532✔
201
        if (was_full)
63,532✔
202
            m_nonfull.notify_all(); // Resume a waiting producer
9,140✔
203
        return true;
63,532✔
204
    }
63,596✔
205

206
    void put(int value)
207
    {
63,611✔
208
        LockGuard lock(m_mutex);
63,611✔
209
        while (m_queue.size() == max_queue_size)
122,974✔
210
            m_nonfull.wait(lock); // Wait for consumer
59,363✔
211
        bool was_empty = m_queue.empty();
63,611✔
212
        m_queue.push(value);
63,611✔
213
        if (was_empty)
63,611✔
214
            m_nonempty_or_closed.notify_all(); // Resume a waiting consumer
11,870✔
215
    }
63,611✔
216

217
    void close()
218
    {
2✔
219
        LockGuard lock(m_mutex);
2✔
220
        m_closed = true;
2✔
221
        m_nonempty_or_closed.notify_all(); // Resume all waiting consumers
2✔
222
    }
2✔
223

224
private:
225
    Mutex m_mutex;
226
    CondVar m_nonempty_or_closed, m_nonfull;
227
    std::queue<int> m_queue;
228
    bool m_closed;
229

230
    static const unsigned max_queue_size = 8;
231
};
232

233
void producer_thread(QueueMonitor* queue, int value)
234
{
61✔
235
    for (int i = 0; i < 1000; ++i) {
63,717✔
236
        queue->put(value);
63,656✔
237
    }
63,656✔
238
}
61✔
239

240
void consumer_thread(QueueMonitor* queue, int* consumed_counts)
241
{
64✔
242
    for (;;) {
63,659✔
243
        int value = 0;
63,659✔
244
        bool closed = !queue->get(value);
63,659✔
245
        if (closed)
63,659✔
246
            return;
64✔
247
        ++consumed_counts[value];
63,595✔
248
    }
63,595✔
249
}
64✔
250

251

252
} // anonymous namespace
253

254

255
TEST(Thread_Join)
256
{
2✔
257
    int i = 0;
2✔
258
    Thread thread(std::bind(&increment, &i));
2✔
259
    CHECK(thread.joinable());
2✔
260
    thread.join();
2✔
261
    CHECK(!thread.joinable());
2✔
262
    CHECK_EQUAL(1, i);
2✔
263
}
2✔
264

265

266
TEST(Thread_Start)
267
{
2✔
268
    int i = 0;
2✔
269
    Thread thread;
2✔
270
    CHECK(!thread.joinable());
2✔
271
    thread.start(std::bind(&increment, &i));
2✔
272
    CHECK(thread.joinable());
2✔
273
    thread.join();
2✔
274
    CHECK(!thread.joinable());
2✔
275
    CHECK_EQUAL(1, i);
2✔
276
}
2✔
277

278

279
TEST(Thread_MutexLock)
280
{
2✔
281
    Mutex mutex;
2✔
282
    {
2✔
283
        LockGuard lock(mutex);
2✔
284
    }
2✔
285
    {
2✔
286
        LockGuard lock(mutex);
2✔
287
    }
2✔
288
}
2✔
289

290
#ifdef REALM_HAVE_PTHREAD_PROCESS_SHARED
291
TEST(Thread_ProcessSharedMutex)
292
{
293
    Mutex mutex((Mutex::process_shared_tag()));
294
    {
295
        LockGuard lock(mutex);
296
    }
297
    {
298
        LockGuard lock(mutex);
299
    }
300
}
301
#endif
302

303
TEST(Thread_CriticalSection)
304
{
2✔
305
    Shared shared;
2✔
306
    shared.m_value = 0;
2✔
307
    Thread threads[10];
2✔
308
    for (int i = 0; i < 10; ++i)
22✔
309
        threads[i].start(std::bind(&Shared::increment_10000_times, &shared));
20✔
310
    for (int i = 0; i < 10; ++i)
22✔
311
        threads[i].join();
20✔
312
    CHECK_EQUAL(100000, shared.m_value);
2✔
313
}
2✔
314

315

316
TEST(Thread_EmulatedMutex_CriticalSection)
317
{
2✔
318
    TEST_PATH(path);
2✔
319
    SharedWithEmulated shared(path);
2✔
320
    shared.m_value = 0;
2✔
321
    Thread threads[10];
2✔
322
    for (int i = 0; i < 10; ++i)
22✔
323
        threads[i].start(std::bind(&SharedWithEmulated::increment_10000_times, &shared));
20✔
324
    for (int i = 0; i < 10; ++i)
22✔
325
        threads[i].join();
20✔
326
    CHECK_EQUAL(100000, shared.m_value);
2✔
327
}
2✔
328

329

330
TEST(Thread_CriticalSection2)
331
{
2✔
332
    Shared shared;
2✔
333
    shared.m_value = 0;
2✔
334
    Thread threads[10];
2✔
335
    for (int i = 0; i < 10; ++i)
22✔
336
        threads[i].start(std::bind(&Shared::increment_10000_times2, &shared));
20✔
337
    for (int i = 0; i < 10; ++i)
22✔
338
        threads[i].join();
20✔
339
    CHECK_EQUAL(100000, shared.m_value);
2✔
340
}
2✔
341

342

343
// Todo. Not supported on Windows in particular? Keywords: winbug
344
TEST_IF(Thread_RobustMutex, TEST_THREAD_ROBUSTNESS)
345
{
×
346
    // Abort if robust mutexes are not supported on the current
347
    // platform. Otherwise we would probably get into a dead-lock.
348
    if (!RobustMutex::is_robust_on_this_platform)
×
349
        return;
×
350

351
    Robust robust;
×
352

353
    // Check that lock/unlock cycle works and does not involve recovery
354
    robust.m_recover_called = false;
×
355
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
×
356
    CHECK(!robust.m_recover_called);
×
357
    robust.m_mutex.unlock();
×
358
    robust.m_recover_called = false;
×
359
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
×
360
    CHECK(!robust.m_recover_called);
×
361
    robust.m_mutex.unlock();
×
362

363
    // Check recovery by simulating a death
364
    robust.m_recover_called = false;
×
365
    {
×
366
        Thread thread(std::bind(&Robust::simulate_death, &robust));
×
367
        thread.join();
×
368
    }
×
369
    CHECK(!robust.m_recover_called);
×
370
    robust.m_recover_called = false;
×
371
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
×
372
    CHECK(robust.m_recover_called);
×
373
    robust.m_mutex.unlock();
×
374

375
    // One more round of recovery
376
    robust.m_recover_called = false;
×
377
    {
×
378
        Thread thread(std::bind(&Robust::simulate_death, &robust));
×
379
        thread.join();
×
380
    }
×
381
    CHECK(!robust.m_recover_called);
×
382
    robust.m_recover_called = false;
×
383
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
×
384
    CHECK(robust.m_recover_called);
×
385
    robust.m_mutex.unlock();
×
386

387
    // Simulate a case where recovery fails or is impossible
388
    robust.m_recover_called = false;
×
389
    {
×
390
        Thread thread(std::bind(&Robust::simulate_death, &robust));
×
391
        thread.join();
×
392
    }
×
393
    CHECK(!robust.m_recover_called);
×
394
    robust.m_recover_called = false;
×
395
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover_throw, &robust)), RobustMutex::NotRecoverable);
×
396
    CHECK(robust.m_recover_called);
×
397

398
    // Check that successive attempts at locking will throw
399
    robust.m_recover_called = false;
×
400
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover, &robust)), RobustMutex::NotRecoverable);
×
401
    CHECK(!robust.m_recover_called);
×
402
    robust.m_recover_called = false;
×
403
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover, &robust)), RobustMutex::NotRecoverable);
×
404
    CHECK(!robust.m_recover_called);
×
405
}
×
406

407

408
TEST_IF(Thread_DeathDuringRecovery, TEST_THREAD_ROBUSTNESS)
409
{
×
410
    // Abort if robust mutexes are not supported on the current
411
    // platform. Otherwise we would probably get into a dead-lock.
412
    if (!RobustMutex::is_robust_on_this_platform)
×
413
        return;
×
414

415
    // This test checks that death during recovery causes a robust
416
    // mutex to stay in the 'inconsistent' state.
417

418
    Robust robust;
×
419

420
    // Bring the mutex into the 'inconsistent' state
421
    robust.m_recover_called = false;
×
422
    {
×
423
        Thread thread(std::bind(&Robust::simulate_death, &robust));
×
424
        thread.join();
×
425
    }
×
426
    CHECK(!robust.m_recover_called);
×
427

428
    // Die while recovering
429
    robust.m_recover_called = false;
×
430
    {
×
431
        Thread thread(std::bind(&Robust::simulate_death_during_recovery, &robust));
×
432
        thread.join();
×
433
    }
×
434
    CHECK(robust.m_recover_called);
×
435

436
    // The mutex is still in the 'inconsistent' state if another
437
    // attempt at locking it calls the recovery function
438
    robust.m_recover_called = false;
×
439
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
×
440
    CHECK(robust.m_recover_called);
×
441
    robust.m_mutex.unlock();
×
442

443
    // Now that the mutex is fully recovered, we should be able to
444
    // carry out a regular round of lock/unlock
445
    robust.m_recover_called = false;
×
446
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
×
447
    CHECK(!robust.m_recover_called);
×
448
    robust.m_mutex.unlock();
×
449

450
    // Try a double death during recovery
451
    robust.m_recover_called = false;
×
452
    {
×
453
        Thread thread(std::bind(&Robust::simulate_death, &robust));
×
454
        thread.join();
×
455
    }
×
456
    CHECK(!robust.m_recover_called);
×
457
    robust.m_recover_called = false;
×
458
    {
×
459
        Thread thread(std::bind(&Robust::simulate_death_during_recovery, &robust));
×
460
        thread.join();
×
461
    }
×
462
    CHECK(robust.m_recover_called);
×
463
    robust.m_recover_called = false;
×
464
    {
×
465
        Thread thread(std::bind(&Robust::simulate_death_during_recovery, &robust));
×
466
        thread.join();
×
467
    }
×
468
    CHECK(robust.m_recover_called);
×
469
    robust.m_recover_called = false;
×
470
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
×
471
    CHECK(robust.m_recover_called);
×
472
    robust.m_mutex.unlock();
×
473
    robust.m_recover_called = false;
×
474
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
×
475
    CHECK(!robust.m_recover_called);
×
476
    robust.m_mutex.unlock();
×
477
}
×
478

479

480
TEST(Thread_CondVar)
481
{
2✔
482
    QueueMonitor queue;
2✔
483
    const int num_producers = 32;
2✔
484
    const int num_consumers = 32;
2✔
485
    Thread producers[num_producers], consumers[num_consumers];
2✔
486
    int consumed_counts[num_consumers][num_producers];
2✔
487
    memset(consumed_counts, 0, sizeof consumed_counts);
2✔
488

1✔
489
    for (int i = 0; i < num_producers; ++i)
66✔
490
        producers[i].start(std::bind(&producer_thread, &queue, i));
64✔
491
    for (int i = 0; i < num_consumers; ++i)
66✔
492
        consumers[i].start(std::bind(&consumer_thread, &queue, &consumed_counts[i][0]));
64✔
493
    for (int i = 0; i < num_producers; ++i)
66✔
494
        producers[i].join();
64✔
495
    queue.close(); // Stop consumers when queue is empty
2✔
496
    for (int i = 0; i < num_consumers; ++i)
66✔
497
        consumers[i].join();
64✔
498

1✔
499
    for (int i = 0; i < num_producers; ++i) {
66✔
500
        int n = 0;
64✔
501
        for (int j = 0; j < num_consumers; ++j)
2,112✔
502
            n += consumed_counts[j][i];
2,048✔
503
        CHECK_EQUAL(1000, n);
64✔
504
    }
64✔
505
}
2✔
506

507
TEST(Thread_MutexTryLock)
508
{
2✔
509
    Thread thread;
2✔
510
    Mutex base_mutex;
2✔
511
    std::unique_lock<Mutex> m(base_mutex, std::defer_lock);
2✔
512

1✔
513
    std::condition_variable cv;
2✔
514
    std::mutex cv_lock;
2✔
515

1✔
516
    // basic same thread try_lock
1✔
517
    CHECK(m.try_lock());
2✔
518
    CHECK(m.owns_lock());
2✔
519
    CHECK_THROW(static_cast<void>(m.try_lock()), std::system_error); // already locked: Resource deadlock avoided
2✔
520
    m.unlock();
2✔
521

1✔
522
    bool init_done = false;
2✔
523
    auto do_async = [&]() {
2✔
524
        std::unique_lock<Mutex> mutex2(base_mutex, std::defer_lock);
2✔
525
        CHECK(!mutex2.owns_lock());
2✔
526
        CHECK(!mutex2.try_lock());
2✔
527
        {
2✔
528
            std::lock_guard<std::mutex> guard(cv_lock);
2✔
529
            init_done = true;
2✔
530
        }
2✔
531
        cv.notify_one();
2✔
532
        while(!mutex2.try_lock()) { millisleep(1); }
4✔
533
        CHECK(mutex2.owns_lock());
2✔
534
        mutex2.unlock();
2✔
535
    };
2✔
536

1✔
537
    // Check basic locking across threads.
1✔
538
    CHECK(!m.owns_lock());
2✔
539
    CHECK(m.try_lock());
2✔
540
    CHECK(m.owns_lock());
2✔
541
    thread.start(do_async);
2✔
542
    {
2✔
543
        std::unique_lock<std::mutex> guard(cv_lock);
2✔
544
        cv.wait(guard, [&]{return init_done;});
4✔
545
    }
2✔
546
    m.unlock();
2✔
547
    thread.join();
2✔
548
}
2✔
549

550
TEST(Thread_RobustMutexTryLock)
551
{
2✔
552
    // Abort if robust mutexes are not supported on the current
1✔
553
    // platform. Otherwise we would probably get into a dead-lock.
1✔
554
    if (!RobustMutex::is_robust_on_this_platform)
2✔
555
        return;
2✔
556

557
    Thread thread;
×
558
    RobustMutex m;
×
559
    int times_recover_function_was_called = 0;
×
560

561
    auto recover_function = [&]() {
×
562
        ++times_recover_function_was_called;
×
563
    };
×
564
    // basic same thread try_lock
565
    CHECK(m.try_lock(recover_function));
×
566
    CHECK(!m.try_lock(recover_function));
×
567
    m.unlock();
×
568
    CHECK(times_recover_function_was_called == 0);
×
569

570
    bool init_done = false;
×
571
    std::mutex control_mutex;
×
572
    std::condition_variable control_cv;
×
573

574
    auto do_async = [&]() {
×
575
        CHECK(!m.try_lock(recover_function));
×
576
        {
×
577
            std::lock_guard<std::mutex> guard(control_mutex);
×
578
            init_done = true;
×
579
        }
×
580
        control_cv.notify_one();
×
581
        while(!m.try_lock(recover_function)) { millisleep(1); }
×
582
        // exit the thread with the lock held to check robustness
583
    };
×
584

585
    // Check basic locking across threads.
586
    CHECK(m.try_lock(recover_function));
×
587
    thread.start(do_async);
×
588
    {
×
589
        std::unique_lock<std::mutex> lock(control_mutex);
×
590
        control_cv.wait(lock, [&]{ return init_done; });
×
591
    }
×
592
    m.unlock();
×
593
    thread.join();
×
594
    CHECK(times_recover_function_was_called == 0);
×
595
    // at this point the thread that obtained the mutex is dead with the lock
596
    CHECK(m.try_lock(recover_function));
×
597
    CHECK(times_recover_function_was_called == 1);
×
598
    m.unlock();
×
599
}
×
600

601
#ifndef _WIN32 // FIXME: trylock is not supported by the win32-pthread lib on Windows. No need to fix this
602
               // because we are going to switch to native API soon and discard win32-pthread entirely
603
NONCONCURRENT_TEST(Thread_InterprocessMutexTryLock)
604
{
2✔
605
    Thread thread;
2✔
606
    InterprocessMutex::SharedPart mutex_part;
2✔
607

1✔
608
    InterprocessMutex m;
2✔
609
    TEST_PATH(path);
2✔
610
    std::string mutex_file_name = "Test_Thread_InterprocessMutexTryLock";
2✔
611
    m.set_shared_part(mutex_part, path, mutex_file_name);
2✔
612

1✔
613
    // basic same thread try_lock
1✔
614
    CHECK(m.try_lock());
2✔
615
    CHECK(!m.try_lock()); // already locked but shouldn't deadlock
2✔
616
    m.unlock();
2✔
617

1✔
618
    bool init_done = false;
2✔
619
    std::condition_variable cv;
2✔
620
    std::mutex cv_mutex;
2✔
621
    auto do_async = [&]() {
2✔
622
        InterprocessMutex m2;
2✔
623
        m2.set_shared_part(mutex_part, path, mutex_file_name);
2✔
624

1✔
625
        CHECK(!m2.try_lock());
2✔
626
        {
2✔
627
            std::lock_guard<std::mutex> guard(cv_mutex);
2✔
628
            init_done = true;
2✔
629
        }
2✔
630
        cv.notify_one();
2✔
631
        while(!m2.try_lock()) { millisleep(1); }
4✔
632
        m2.unlock();
2✔
633
    };
2✔
634

1✔
635
    // Check basic locking across threads.
1✔
636
    CHECK(m.try_lock());
2✔
637
    thread.start(do_async);
2✔
638
    {
2✔
639
        std::unique_lock<std::mutex> ul(cv_mutex);
2✔
640
        cv.wait(ul, [&]{return init_done;});
4✔
641
    }
2✔
642
    m.unlock();
2✔
643
    thread.join();
2✔
644
    m.release_shared_part();
2✔
645
}
2✔
646

647
#endif
648

649
// Detect and flag trivial implementations of condvars.
650
namespace {
651

652
void signaller(int* signals, InterprocessMutex* mutex, InterprocessCondVar* cv)
653
{
2✔
654
    millisleep(200);
2✔
655
    {
2✔
656
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
657
        *signals = 1;
2✔
658
        // wakeup any waiters
1✔
659
        cv->notify_all();
2✔
660
    }
2✔
661
    // exit scope to allow waiters to get lock
1✔
662
    millisleep(200);
2✔
663
    {
2✔
664
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
665
        *signals = 2;
2✔
666
        // wakeup any waiters, 2nd time
1✔
667
        cv->notify_all();
2✔
668
    }
2✔
669
    millisleep(200);
2✔
670
    {
2✔
671
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
672
        *signals = 3;
2✔
673
        // wakeup any waiters, 2nd time
1✔
674
        cv->notify_all();
2✔
675
    }
2✔
676
    millisleep(200);
2✔
677
    {
2✔
678
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
679
        *signals = 4;
2✔
680
    }
2✔
681
}
2✔
682

683
void wakeup_signaller(int* signal_state, InterprocessMutex* mutex, InterprocessCondVar* cv)
684
{
2✔
685
    millisleep(1000);
2✔
686
    *signal_state = 2;
2✔
687
    std::lock_guard<InterprocessMutex> l(*mutex);
2✔
688
    cv->notify_all();
2✔
689
}
2✔
690

691

692

693
void waiter(InterprocessMutex* mutex, InterprocessCondVar* cv, std::mutex* control_mutex,
694
            std::condition_variable* control_cv, size_t* num_threads_holding_lock)
695
{
20✔
696
    std::lock_guard<InterprocessMutex> l(*mutex);
20✔
697

10✔
698
    {
20✔
699
        std::lock_guard<std::mutex> guard(*control_mutex);
20✔
700
        *num_threads_holding_lock = (*num_threads_holding_lock) + 1;
20✔
701
    }
20✔
702
    control_cv->notify_one();
20✔
703

10✔
704
    cv->wait(*mutex, nullptr);
20✔
705
}
20✔
706
}
707

708
// Verify, that a wait on a condition variable actually waits
709
// - this test relies on assumptions about scheduling, which
710
//   may not hold on a heavily loaded system.
711
NONCONCURRENT_TEST(Thread_CondvarWaits)
712
{
2✔
713
    int signals = 0;
2✔
714
    InterprocessMutex mutex;
2✔
715
    InterprocessMutex::SharedPart mutex_part;
2✔
716
    InterprocessCondVar changed;
2✔
717
    InterprocessCondVar::SharedPart condvar_part;
2✔
718
    TEST_PATH(path);
2✔
719
    DBOptions default_options;
2✔
720
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarWaits_Mutex");
2✔
721
    changed.set_shared_part(condvar_part, path, "Thread_CondvarWaits_CondVar", default_options.temp_dir);
2✔
722
    changed.init_shared_part(condvar_part);
2✔
723
    Thread signal_thread;
2✔
724
    signals = 0;
2✔
725
    signal_thread.start(std::bind(signaller, &signals, &mutex, &changed));
2✔
726
    {
2✔
727
        std::lock_guard<InterprocessMutex> l(mutex);
2✔
728
        changed.wait(mutex, nullptr);
2✔
729
        CHECK_EQUAL(signals, 1);
2✔
730
        changed.wait(mutex, nullptr);
2✔
731
        CHECK_EQUAL(signals, 2);
2✔
732
        changed.wait(mutex, nullptr);
2✔
733
        CHECK_EQUAL(signals, 3);
2✔
734
    }
2✔
735
    signal_thread.join();
2✔
736
    changed.release_shared_part();
2✔
737
    mutex.release_shared_part();
2✔
738
}
2✔
739

740
// Verify that a condition variable looses its signal if no one
741
// is waiting on it
742
NONCONCURRENT_TEST(Thread_CondvarIsStateless)
743
{
2✔
744
    int signal_state = 0;
2✔
745
    InterprocessMutex mutex;
2✔
746
    InterprocessMutex::SharedPart mutex_part;
2✔
747
    InterprocessCondVar changed;
2✔
748
    InterprocessCondVar::SharedPart condvar_part;
2✔
749
    InterprocessCondVar::init_shared_part(condvar_part);
2✔
750
    TEST_PATH(path);
2✔
751
    DBOptions default_options;
2✔
752

1✔
753
    // Must have names because default_options.temp_dir is empty string on Windows
1✔
754
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarIsStateless_Mutex");
2✔
755
    changed.set_shared_part(condvar_part, path, "Thread_CondvarIsStateless_CondVar", default_options.temp_dir);
2✔
756
    Thread signal_thread;
2✔
757
    signal_state = 1;
2✔
758
    // send some signals:
1✔
759
    {
2✔
760
        std::lock_guard<InterprocessMutex> l(mutex);
2✔
761
        for (int i = 0; i < 10; ++i)
22✔
762
            changed.notify_all();
20✔
763
    }
2✔
764
    // spawn a thread which will later do one more signal in order
1✔
765
    // to wake us up.
1✔
766
    signal_thread.start(std::bind(wakeup_signaller, &signal_state, &mutex, &changed));
2✔
767
    // Wait for a signal - the signals sent above should be lost, so
1✔
768
    // that this wait will actually wait for the thread to signal.
1✔
769
    {
2✔
770
        std::lock_guard<InterprocessMutex> l(mutex);
2✔
771
        changed.wait(mutex, 0);
2✔
772
        CHECK_EQUAL(signal_state, 2);
2✔
773
    }
2✔
774
    signal_thread.join();
2✔
775
    changed.release_shared_part();
2✔
776
    mutex.release_shared_part();
2✔
777
}
2✔
778

779

780
// this test hangs, if timeout doesn't work.
781
NONCONCURRENT_TEST(Thread_CondvarTimeout)
782
{
2✔
783
    InterprocessMutex mutex;
2✔
784
    InterprocessMutex::SharedPart mutex_part;
2✔
785
    InterprocessCondVar changed;
2✔
786
    InterprocessCondVar::SharedPart condvar_part;
2✔
787
    InterprocessCondVar::init_shared_part(condvar_part);
2✔
788
    TEST_PATH(path);
2✔
789
    DBOptions default_options;
2✔
790
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarTimeout_Mutex");
2✔
791
    changed.set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);
2✔
792
    struct timespec time_limit;
2✔
793
    timeval tv;
2✔
794
    gettimeofday(&tv, nullptr);
2✔
795
    time_limit.tv_sec = tv.tv_sec;
2✔
796
    time_limit.tv_nsec = tv.tv_usec * 1000;
2✔
797
    time_limit.tv_nsec += 100000000;        // 100 msec wait
2✔
798
    if (time_limit.tv_nsec >= 1000000000) { // overflow
2✔
UNCOV
799
        time_limit.tv_nsec -= 1000000000;
×
UNCOV
800
        time_limit.tv_sec += 1;
×
UNCOV
801
    }
×
802
    {
2✔
803
        std::lock_guard<InterprocessMutex> l(mutex);
2✔
804
        for (int i = 0; i < 5; ++i)
12✔
805
            changed.wait(mutex, &time_limit);
10✔
806
    }
2✔
807
    changed.release_shared_part();
2✔
808
    mutex.release_shared_part();
2✔
809
}
2✔
810

811

812
// test that notify_all will wake up all waiting threads, if there
813
// are many waiters:
814
NONCONCURRENT_TEST(Thread_CondvarNotifyAllWakeup)
815
{
2✔
816
    InterprocessMutex mutex;
2✔
817
    InterprocessMutex::SharedPart mutex_part;
2✔
818
    InterprocessCondVar changed;
2✔
819
    InterprocessCondVar::SharedPart condvar_part;
2✔
820
    InterprocessCondVar::init_shared_part(condvar_part);
2✔
821
    TEST_PATH(path);
2✔
822
    DBOptions default_options;
2✔
823
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarNotifyAllWakeup_Mutex");
2✔
824
    changed.set_shared_part(condvar_part, path, "Thread_CondvarNotifyAllWakeup_CondVar", default_options.temp_dir);
2✔
825

1✔
826
    size_t num_threads_holding_lock = 0;
2✔
827
    std::mutex control_mutex;
2✔
828
    std::condition_variable control_cv;
2✔
829

1✔
830
    const size_t num_waiters = 10;
2✔
831
    Thread waiters[num_waiters];
2✔
832
    for (size_t i = 0; i < num_waiters; ++i) {
22✔
833
        waiters[i].start(std::bind(waiter, &mutex, &changed, &control_mutex, &control_cv, &num_threads_holding_lock));
20✔
834
    }
20✔
835
    {
2✔
836
        // allow all waiters to start and obtain the InterprocessCondVar
1✔
837
        std::unique_lock<std::mutex> unique_lock(control_mutex);
2✔
838
        control_cv.wait(unique_lock, [&]{ return num_threads_holding_lock == num_waiters; });
5✔
839
    }
2✔
840

1✔
841
    mutex.lock();
2✔
842
    changed.notify_all();
2✔
843
    mutex.unlock();
2✔
844

1✔
845
    for (size_t i = 0; i < num_waiters; ++i) {
22✔
846
        waiters[i].join();
20✔
847
    }
20✔
848
    changed.release_shared_part();
2✔
849
    mutex.release_shared_part();
2✔
850
}
2✔
851

852

853
// Test that the unlock+wait operation of wait() takes part atomically, i.e. that there is no time
854
// gap between them where another thread could invoke signal() which could go undetected by the wait.
855
// This test takes more than 3 days with valgrind.
856
TEST_IF(Thread_CondvarAtomicWaitUnlock, !running_with_valgrind && TEST_DURATION > 0)
857
{
×
858
    SHARED_GROUP_TEST_PATH(path);
×
859

860
    const int iter = 10000;
×
861

862
    // It's nice to have many threads to trigger preemption (see notes inside the t1 thread)
863
    const int thread_pair_count = 2; // std::thread::hardware_concurrency();
×
864
    std::vector<std::thread> threads;
×
865
    for (int tpc = 0; tpc < thread_pair_count; tpc++) {
×
866

867
        threads.push_back(std::thread([&]() {
×
868
            InterprocessMutex mutex;
×
869
            InterprocessMutex::SharedPart mutex_part;
×
870
            InterprocessCondVar condvar;
×
871
            InterprocessCondVar::SharedPart condvar_part;
×
872
            DBOptions default_options;
×
873

874
            std::stringstream ss;
×
875
            ss << std::this_thread::get_id();
×
876
            std::string id = ss.str();
×
877

878
            mutex.set_shared_part(mutex_part, path, "mutex" + id);
×
879
            condvar.set_shared_part(condvar_part, path, "sema" + id, default_options.temp_dir);
×
880
            InterprocessCondVar::init_shared_part(condvar_part);
×
881

882
            std::atomic<bool> signal(false);
×
883

884
            std::thread t1([&]() {
×
885
                for (int i = 0; i < iter; i++) {
×
886
                    mutex.lock();
×
887
                    signal = true;
×
888

889
                    // A gap in wait() could be very tight, so we need a way to preemt it between two instructions.
890
                    // Problem is that we have so many/frequent operating system wait calls in this that they might 
891
                    // be invoked closer than a thread time slice, so preemption would never occur. So we create 
892
                    // some work that some times willsome times bring the current time slice close to its end.
893

894
                    // Wait between 0 and number of clocks on 100 ms on a on 3 GHz machine (100 ms is Linux default
895
                    // time slice)
896
                    uint64_t clocks_to_wait = fastrand(3ULL * 1000000000ULL / 1000000ULL * 100ULL); 
×
897

898
                    // This loop can wait alot more than 100 ms because each iteration takes many more clocks than 
899
                    // just 1. That's intentional and will cover other OS'es with bigger time slices.
900
                    volatile int sum = 0; // must be volatile, else it compiles into no-op
×
901
                    for (uint64_t t = 0; t < clocks_to_wait; t++) {
×
902
                        sum = sum + 1;
×
903
                    }
×
904

905
                    condvar.wait(mutex, nullptr);
×
906
                    mutex.unlock();
×
907
                }
×
908
            });
×
909

910
            // This thread calls notify_all() exactly one time after the other thread has invoked wait() and has
911
            // released the mutex. If wait() misses the notify_all() then there is a bug, which will reveal itself
912
            // by both threads hanging infinitely.
913
            std::thread t2([&]() {
×
914
                for (int i = 0; i < iter; i++) {
×
915
                    while (!signal) {
×
916
                    }
×
917
                    signal = false;
×
918
                    mutex.lock();
×
919
                    condvar.notify_all();
×
920
                    mutex.unlock();
×
921
                }
×
922
            });
×
923
            
924
            t1.join();
×
925
            t2.join();
×
926
        }));
×
927
    }
×
928

929
    for (int i = 0; i < thread_pair_count; i++) {
×
930
        threads[i].join();
×
931
    }
×
932
}
×
933

934
NONCONCURRENT_TEST(Thread_Condvar_CreateDestroyDifferentThreads)
935
{
2✔
936
    auto cv = std::make_unique<InterprocessCondVar>();
2✔
937
    InterprocessCondVar::SharedPart condvar_part;
2✔
938
    InterprocessCondVar::init_shared_part(condvar_part);
2✔
939
    TEST_PATH(path);
2✔
940
    DBOptions default_options;
2✔
941
    cv->set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);
2✔
942
    std::thread([&] {
2✔
943
        cv.reset();
2✔
944
    }).join();
2✔
945
}
2✔
946

947
#ifdef _WIN32
948
TEST(Thread_Win32InterprocessBackslashes)
949
{
950
    InterprocessMutex mutex;
951
    InterprocessMutex::SharedPart mutex_part;
952
    InterprocessCondVar condvar;
953
    InterprocessCondVar::SharedPart condvar_part;
954
    InterprocessCondVar::init_shared_part(condvar_part);
955
    DBOptions default_options;
956

957
    mutex.set_shared_part(mutex_part, "Path\\With\\Slashes", "my_mutex");
958
    condvar.set_shared_part(condvar_part, "Path\\With\\Slashes", "my_condvar", default_options.temp_dir);
959
}
960
#endif
961

962
#endif // TEST_THREAD
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc