• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2444

27 Jun 2024 05:43PM UTC coverage: 90.951% (+0.02%) from 90.934%
2444

push

Evergreen

web-flow
Fix compacting a Realm file using the existing encryption key (#7844)

`compact()` stored the existing encryption key in a `std::string`, which
expects a nul-terminated string, not a fixed-size buffer. If the key contained
any nul bytes then the key would be truncated and garbage data would be used as
the encryption key instead, and if it didn't then arbitrarily large additional
amounts of memory would also be copied into the buffer. The tests happened to
always use a nul-terminated string as the key and so worked by coincidence.

102140 of 180396 branches covered (56.62%)

52 of 52 new or added lines in 5 files covered. (100.0%)

53 existing lines in 13 files now uncovered.

214756 of 236124 relevant lines covered (90.95%)

5736951.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.14
/test/test_thread.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2016 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "testsettings.hpp"
20
#ifdef TEST_THREAD
21

22
#include <condition_variable>
23
#include <cstring>
24
#include <algorithm>
25
#include <queue>
26
#include <functional>
27
#include <mutex>
28
#include <thread>
29
#include <atomic>
30

31
#ifndef _WIN32
32
#include <unistd.h>
33
#include <sys/time.h>
34
#include <realm/utilities.hpp> // gettimeofday()
35
#endif
36

37
#include <realm/db_options.hpp>
38
#include <realm/utilities.hpp>
39
#include <realm/util/features.h>
40
#include <realm/util/thread.hpp>
41
#include <realm/util/interprocess_condvar.hpp>
42
#include <realm/util/interprocess_mutex.hpp>
43

44
#include <iostream>
45
#include "test.hpp"
46

47
using namespace realm;
48
using namespace realm::util;
49

50

51
// Test independence and thread-safety
52
// -----------------------------------
53
//
54
// All tests must be thread safe and independent of each other. This
55
// is required because it allows for both shuffling of the execution
56
// order and for parallelized testing.
57
//
58
// In particular, avoid using std::rand() since it is not guaranteed
59
// to be thread safe. Instead use the API offered in
60
// `test/util/random.hpp`.
61
//
62
// All files created in tests must use the TEST_PATH macro (or one of
63
// its friends) to obtain a suitable file system path. See
64
// `test/util/test_path.hpp`.
65
//
66
//
67
// Debugging and the ONLY() macro
68
// ------------------------------
69
//
70
// A simple way of disabling all tests except one called `Foo`, is to
71
// replace TEST(Foo) with ONLY(Foo) and then recompile and rerun the
72
// test suite. Note that you can also use filtering by setting the
73
// environment variable `UNITTEST_FILTER`. See `README.md` for more on
74
// this.
75
//
76
// Another way to debug a particular test, is to copy that test into
77
// `experiments/testcase.cpp` and then run `sh build.sh
78
// check-testcase` (or one of its friends) from the command line.
79

80

81
namespace {
82

83
struct Shared {
84
    Mutex m_mutex;
85
    int m_value;
86

87
    // 10000 takes less than 0.1 sec
88
    void increment_10000_times()
89
    {
20✔
90
        for (int i = 0; i < 10000; ++i) {
199,452✔
91
            LockGuard lock(m_mutex);
199,432✔
92
            ++m_value;
199,432✔
93
        }
199,432✔
94
    }
20✔
95

96
    void increment_10000_times2()
97
    {
20✔
98
        for (int i = 0; i < 10000; ++i) {
199,740✔
99
            LockGuard lock(m_mutex);
199,720✔
100
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
101
            // could assemble into 'inc [addr]' which has very tiny gap
102
            double f = m_value;
199,720✔
103
            f += 1.;
199,720✔
104
            m_value = int(f);
199,720✔
105
        }
199,720✔
106
    }
20✔
107
};
108

109
struct SharedWithEmulated {
110
    InterprocessMutex m_mutex;
111
    InterprocessMutex::SharedPart m_shared_part;
112
    int m_value;
113

114
    SharedWithEmulated(std::string name)
115
    {
2✔
116
        m_mutex.set_shared_part(m_shared_part, name, "0");
2✔
117
    }
2✔
118
    ~SharedWithEmulated()
119
    {
2✔
120
        m_mutex.release_shared_part();
2✔
121
    }
2✔
122

123
    // 10000 takes less than 0.1 sec
124
    void increment_10000_times()
125
    {
20✔
126
        for (int i = 0; i < 10000; ++i) {
199,873✔
127
            std::lock_guard<InterprocessMutex> lock(m_mutex);
199,853✔
128
            ++m_value;
199,853✔
129
        }
199,853✔
130
    }
20✔
131

132
    void increment_10000_times2()
133
    {
×
134
        for (int i = 0; i < 10000; ++i) {
×
135
            std::lock_guard<InterprocessMutex> lock(m_mutex);
×
136
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
×
137
            // could assemble into 'inc [addr]' which has very tiny gap
×
138
            double f = m_value;
×
139
            f += 1.;
×
140
            m_value = int(f);
×
141
        }
×
142
    }
×
143
};
144

145
struct Robust {
146
    RobustMutex m_mutex;
147
    bool m_recover_called;
148

149
    void simulate_death()
150
    {
×
151
        m_mutex.lock(std::bind(&Robust::recover, this));
×
152
        // Do not unlock
×
153
    }
×
154

155
    void simulate_death_during_recovery()
156
    {
×
157
        bool no_thread_has_died = m_mutex.low_level_lock();
×
158
        if (!no_thread_has_died)
×
159
            m_recover_called = true;
×
160
        // Do not unlock
×
161
    }
×
162

163
    void recover()
164
    {
×
165
        m_recover_called = true;
×
166
    }
×
167

168
    void recover_throw()
169
    {
×
170
        m_recover_called = true;
×
171
        throw RobustMutex::NotRecoverable();
×
172
    }
×
173
};
174

175

176
class QueueMonitor {
177
public:
178
    QueueMonitor()
179
        : m_closed(false)
1✔
180
    {
2✔
181
    }
2✔
182

183
    bool get(int& value)
184
    {
63,923✔
185
        LockGuard lock(m_mutex);
63,923✔
186
        for (;;) {
84,215✔
187
            if (!m_queue.empty())
84,215✔
188
                break;
64,000✔
189
            if (m_closed)
20,215✔
190
                return false;
64✔
191
            m_nonempty_or_closed.wait(lock); // Wait for producer
20,151✔
192
        }
20,151✔
193
        bool was_full = m_queue.size() == max_queue_size;
63,859✔
194
        value = m_queue.front();
63,859✔
195
        m_queue.pop();
63,859✔
196
        if (was_full)
63,859✔
197
            m_nonfull.notify_all(); // Resume a waiting producer
10,988✔
198
        return true;
63,859✔
199
    }
63,923✔
200

201
    void put(int value)
202
    {
63,879✔
203
        LockGuard lock(m_mutex);
63,879✔
204
        while (m_queue.size() == max_queue_size)
80,654✔
205
            m_nonfull.wait(lock); // Wait for consumer
16,775✔
206
        bool was_empty = m_queue.empty();
63,879✔
207
        m_queue.push(value);
63,879✔
208
        if (was_empty)
63,879✔
209
            m_nonempty_or_closed.notify_all(); // Resume a waiting consumer
12,285✔
210
    }
63,879✔
211

212
    void close()
213
    {
2✔
214
        LockGuard lock(m_mutex);
2✔
215
        m_closed = true;
2✔
216
        m_nonempty_or_closed.notify_all(); // Resume all waiting consumers
2✔
217
    }
2✔
218

219
private:
220
    Mutex m_mutex;
221
    CondVar m_nonempty_or_closed, m_nonfull;
222
    std::queue<int> m_queue;
223
    bool m_closed;
224

225
    static const unsigned max_queue_size = 8;
226
};
227

228
// Push `value` onto the queue 1000 times.
void producer_thread(QueueMonitor* queue, int value)
{
    for (int n = 0; n != 1000; ++n)
        queue->put(value);
}
64✔
234

235
// Drain the queue until it is closed, tallying how many items carrying each
// producer's value this consumer saw.
void consumer_thread(QueueMonitor* queue, int* consumed_counts)
{
    int value = 0;
    while (queue->get(value))
        ++consumed_counts[value];
}
64✔
245

246

247
} // anonymous namespace
248

249

250
TEST(Thread_MutexLock)
{
    // Two back-to-back lock/unlock cycles on the same mutex must succeed.
    Mutex mutex;
    {
        LockGuard guard(mutex);
    }
    {
        LockGuard guard(mutex);
    }
}
2✔
260

261
#ifdef REALM_HAVE_PTHREAD_PROCESS_SHARED
262
TEST(Thread_ProcessSharedMutex)
{
    // Same smoke test as Thread_MutexLock, but for a process-shared mutex.
    Mutex mutex((Mutex::process_shared_tag()));
    {
        LockGuard guard(mutex);
    }
    {
        LockGuard guard(mutex);
    }
}
272
#endif
273

274
TEST(Thread_CriticalSection)
{
    // 10 threads, 10000 guarded increments each: no increment may be lost.
    Shared shared;
    shared.m_value = 0;
    std::thread workers[10];
    for (auto& worker : workers)
        worker = std::thread(&Shared::increment_10000_times, &shared);
    for (auto& worker : workers)
        worker.join();
    CHECK_EQUAL(100000, shared.m_value);
}
2✔
285

286

287
TEST(Thread_EmulatedMutex_CriticalSection)
{
    // Same as Thread_CriticalSection, but through an InterprocessMutex.
    TEST_PATH(path);
    SharedWithEmulated shared(path);
    shared.m_value = 0;
    std::thread workers[10];
    for (auto& worker : workers)
        worker = std::thread(&SharedWithEmulated::increment_10000_times, &shared);
    for (auto& worker : workers)
        worker.join();
    CHECK_EQUAL(100000, shared.m_value);
}
2✔
299

300

301
TEST(Thread_CriticalSection2)
{
    // As Thread_CriticalSection, but using the increment variant with a
    // widened interference window.
    Shared shared;
    shared.m_value = 0;
    std::thread workers[10];
    for (auto& worker : workers)
        worker = std::thread(&Shared::increment_10000_times2, &shared);
    for (auto& worker : workers)
        worker.join();
    CHECK_EQUAL(100000, shared.m_value);
}
2✔
312

313

314
// Todo. Not supported on Windows in particular? Keywords: winbug
315
TEST_IF(Thread_RobustMutex, TEST_THREAD_ROBUSTNESS)
{
    // Bail out if robust mutexes are unsupported on this platform;
    // continuing would most likely dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    Robust robust;
    auto recover = [&robust] {
        robust.recover();
    };
    auto recover_throw = [&robust] {
        robust.recover_throw();
    };

    // Two plain lock/unlock cycles must not invoke recovery
    robust.m_recover_called = false;
    robust.m_mutex.lock(recover);
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();
    robust.m_recover_called = false;
    robust.m_mutex.lock(recover);
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();

    // A simulated death must trigger recovery on the next lock
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death, &robust).join();
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(recover);
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // One more round of recovery
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death, &robust).join();
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(recover);
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // Simulate a case where recovery fails or is impossible
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death, &robust).join();
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(recover_throw), RobustMutex::NotRecoverable);
    CHECK(robust.m_recover_called);

    // Every further attempt at locking must keep throwing, without calling
    // the recovery function again
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(recover), RobustMutex::NotRecoverable);
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(recover), RobustMutex::NotRecoverable);
    CHECK(!robust.m_recover_called);
}
×
368

369

370
TEST_IF(Thread_DeathDuringRecovery, TEST_THREAD_ROBUSTNESS)
{
    // Bail out if robust mutexes are unsupported on this platform;
    // continuing would most likely dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    // Verify that dying while recovering leaves a robust mutex in the
    // 'inconsistent' state.

    Robust robust;
    auto recover = [&robust] {
        robust.recover();
    };

    // Bring the mutex into the 'inconsistent' state
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death, &robust).join();
    CHECK(!robust.m_recover_called);

    // Die while recovering
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death_during_recovery, &robust).join();
    CHECK(robust.m_recover_called);

    // The mutex must still be 'inconsistent': the next attempt at locking
    // it has to call the recovery function
    robust.m_recover_called = false;
    robust.m_mutex.lock(recover);
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // Fully recovered now, so a regular lock/unlock cycle must not recover
    robust.m_recover_called = false;
    robust.m_mutex.lock(recover);
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();

    // Try a double death during recovery
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death, &robust).join();
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death_during_recovery, &robust).join();
    CHECK(robust.m_recover_called);
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death_during_recovery, &robust).join();
    CHECK(robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(recover);
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();
    robust.m_recover_called = false;
    robust.m_mutex.lock(recover);
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();
}
×
425

426

427
TEST(Thread_CondVar)
{
    // 32 producers each push 1000 tagged items; 32 consumers drain them.
    QueueMonitor queue;
    const int num_producers = 32;
    const int num_consumers = 32;
    std::thread producers[num_producers], consumers[num_consumers];
    int consumed_counts[num_consumers][num_producers];
    memset(consumed_counts, 0, sizeof consumed_counts);

    for (int i = 0; i != num_producers; ++i)
        producers[i] = std::thread(&producer_thread, &queue, i);
    for (int i = 0; i != num_consumers; ++i)
        consumers[i] = std::thread(&consumer_thread, &queue, &consumed_counts[i][0]);
    for (int i = 0; i != num_producers; ++i)
        producers[i].join();
    queue.close(); // Stop consumers when queue is empty
    for (int i = 0; i != num_consumers; ++i)
        consumers[i].join();

    // No matter which consumer picked an item up, exactly 1000 items per
    // producer must have arrived — none lost, none duplicated.
    for (int i = 0; i != num_producers; ++i) {
        int total = 0;
        for (int j = 0; j != num_consumers; ++j)
            total += consumed_counts[j][i];
        CHECK_EQUAL(1000, total);
    }
}
2✔
453

454
TEST(Thread_MutexTryLock)
{
    Mutex base_mutex;
    std::unique_lock<Mutex> lock(base_mutex, std::defer_lock);

    std::condition_variable cv;
    std::mutex cv_lock;

    // try_lock on the owning thread
    CHECK(lock.try_lock());
    CHECK(lock.owns_lock());
    CHECK_THROW(static_cast<void>(lock.try_lock()), std::system_error); // already locked: Resource deadlock avoided
    lock.unlock();

    bool init_done = false;
    auto do_async = [&]() {
        std::unique_lock<Mutex> other(base_mutex, std::defer_lock);
        CHECK(!other.owns_lock());
        CHECK(!other.try_lock());
        {
            std::lock_guard<std::mutex> guard(cv_lock);
            init_done = true;
        }
        cv.notify_one();
        while (!other.try_lock()) {
            millisleep(1);
        }
        CHECK(other.owns_lock());
        other.unlock();
    };

    // try_lock contention across threads: the worker spins until the main
    // thread releases the mutex.
    CHECK(!lock.owns_lock());
    CHECK(lock.try_lock());
    CHECK(lock.owns_lock());
    std::thread worker(do_async);
    {
        std::unique_lock<std::mutex> guard(cv_lock);
        cv.wait(guard, [&] {
            return init_done;
        });
    }
    lock.unlock();
    worker.join();
}
2✔
499

500
TEST(Thread_RobustMutexTryLock)
{
    // Bail out if robust mutexes are unsupported on this platform;
    // continuing would most likely dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    RobustMutex m;
    int times_recover_function_was_called = 0;

    auto recover_function = [&]() {
        ++times_recover_function_was_called;
    };
    // try_lock on the owning thread
    CHECK(m.try_lock(recover_function));
    CHECK(!m.try_lock(recover_function));
    m.unlock();
    CHECK(times_recover_function_was_called == 0);

    bool init_done = false;
    std::mutex control_mutex;
    std::condition_variable control_cv;

    auto do_async = [&]() {
        CHECK(!m.try_lock(recover_function));
        {
            std::lock_guard<std::mutex> guard(control_mutex);
            init_done = true;
        }
        control_cv.notify_one();
        while (!m.try_lock(recover_function)) {
            millisleep(1);
        }
        // exit the thread with the lock held to check robustness
    };

    // try_lock contention across threads
    CHECK(m.try_lock(recover_function));
    std::thread worker(do_async);
    {
        std::unique_lock<std::mutex> lock(control_mutex);
        control_cv.wait(lock, [&] {
            return init_done;
        });
    }
    m.unlock();
    worker.join();
    CHECK(times_recover_function_was_called == 0);
    // The worker died while holding the lock, so the next try_lock must
    // succeed and run the recovery function exactly once
    CHECK(m.try_lock(recover_function));
    CHECK(times_recover_function_was_called == 1);
    m.unlock();
}
×
553

554
#ifndef _WIN32 // FIXME: trylock is not supported by the win32-pthread lib on Windows. No need to fix this
555
               // because we are going to switch to native API soon and discard win32-pthread entirely
556
NONCONCURRENT_TEST(Thread_InterprocessMutexTryLock)
{
    InterprocessMutex::SharedPart mutex_part;

    InterprocessMutex m;
    TEST_PATH(path);
    std::string mutex_file_name = "Test_Thread_InterprocessMutexTryLock";
    m.set_shared_part(mutex_part, path, mutex_file_name);

    // try_lock on the owning thread
    CHECK(m.try_lock());
    CHECK(!m.try_lock()); // already locked but shouldn't deadlock
    m.unlock();

    bool init_done = false;
    std::condition_variable cv;
    std::mutex cv_mutex;
    auto do_async = [&]() {
        // A second InterprocessMutex bound to the same shared part
        InterprocessMutex m2;
        m2.set_shared_part(mutex_part, path, mutex_file_name);

        CHECK(!m2.try_lock());
        {
            std::lock_guard<std::mutex> guard(cv_mutex);
            init_done = true;
        }
        cv.notify_one();
        while (!m2.try_lock()) {
            millisleep(1);
        }
        m2.unlock();
    };

    // try_lock contention across threads
    CHECK(m.try_lock());
    std::thread worker(do_async);
    {
        std::unique_lock<std::mutex> guard(cv_mutex);
        cv.wait(guard, [&] {
            return init_done;
        });
    }
    m.unlock();
    worker.join();
    m.release_shared_part();
}
2✔
602

603
#endif
604

605
// Detect and flag trivial implementations of condvars.
606
namespace {
607

608
void signaller(int* signals, InterprocessMutex* mutex, InterprocessCondVar* cv)
609
{
2✔
610
    millisleep(200);
2✔
611
    {
2✔
612
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
613
        *signals = 1;
2✔
614
        // wakeup any waiters
615
        cv->notify_all();
2✔
616
    }
2✔
617
    // exit scope to allow waiters to get lock
618
    millisleep(200);
2✔
619
    {
2✔
620
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
621
        *signals = 2;
2✔
622
        // wakeup any waiters, 2nd time
623
        cv->notify_all();
2✔
624
    }
2✔
625
    millisleep(200);
2✔
626
    {
2✔
627
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
628
        *signals = 3;
2✔
629
        // wakeup any waiters, 2nd time
630
        cv->notify_all();
2✔
631
    }
2✔
632
    millisleep(200);
2✔
633
    {
2✔
634
        std::lock_guard<InterprocessMutex> l(*mutex);
2✔
635
        *signals = 4;
2✔
636
    }
2✔
637
}
2✔
638

639
void wakeup_signaller(int* signal_state, InterprocessMutex* mutex, InterprocessCondVar* cv)
640
{
2✔
641
    millisleep(1000);
2✔
642
    *signal_state = 2;
2✔
643
    std::lock_guard<InterprocessMutex> l(*mutex);
2✔
644
    cv->notify_all();
2✔
645
}
2✔
646

647

648
void waiter(InterprocessMutex* mutex, InterprocessCondVar* cv, std::mutex* control_mutex,
649
            std::condition_variable* control_cv, size_t* num_threads_holding_lock)
650
{
20✔
651
    std::lock_guard<InterprocessMutex> l(*mutex);
20✔
652

653
    {
20✔
654
        std::lock_guard<std::mutex> guard(*control_mutex);
20✔
655
        *num_threads_holding_lock = (*num_threads_holding_lock) + 1;
20✔
656
    }
20✔
657
    control_cv->notify_one();
20✔
658

659
    cv->wait(*mutex, nullptr);
20✔
660
}
20✔
661
} // namespace
662

663
// Verify that a wait on a condition variable actually waits
664
// - this test relies on assumptions about scheduling, which
665
//   may not hold on a heavily loaded system.
666
NONCONCURRENT_TEST(Thread_CondvarWaits)
{
    int signals = 0;
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarWaits_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarWaits_CondVar", default_options.temp_dir);
    changed.init_shared_part(condvar_part);
    signals = 0;
    std::thread signal_thread(signaller, &signals, &mutex, &changed);
    {
        // Each wait must observe exactly one more signal step; a trivial
        // (non-waiting) condvar implementation would see stale values.
        std::lock_guard<InterprocessMutex> guard(mutex);
        for (int expected = 1; expected <= 3; ++expected) {
            changed.wait(mutex, nullptr);
            CHECK_EQUAL(signals, expected);
        }
    }
    signal_thread.join();
    changed.release_shared_part();
    mutex.release_shared_part();
}
2✔
693

694
// Verify that a condition variable loses its signal if no one
695
// is waiting on it
696
NONCONCURRENT_TEST(Thread_CondvarIsStateless)
{
    int signal_state = 0;
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;

    // Must have names because default_options.temp_dir is empty string on Windows
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarIsStateless_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarIsStateless_CondVar", default_options.temp_dir);
    signal_state = 1;
    // Fire a burst of signals while nobody is waiting:
    {
        std::lock_guard<InterprocessMutex> guard(mutex);
        for (int n = 0; n != 10; ++n)
            changed.notify_all();
    }
    // Spawn a thread that will fire one more signal later, to wake us up.
    std::thread signal_thread(wakeup_signaller, &signal_state, &mutex, &changed);
    // All the signals sent above should be lost, so this wait must actually
    // block until the thread's delayed signal arrives.
    {
        std::lock_guard<InterprocessMutex> guard(mutex);
        changed.wait(mutex, 0);
        CHECK_EQUAL(signal_state, 2);
    }
    signal_thread.join();
    changed.release_shared_part();
    mutex.release_shared_part();
}
2✔
731

732

733
// this test hangs, if timeout doesn't work.
734
NONCONCURRENT_TEST(Thread_CondvarTimeout)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarTimeout_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);

    // Build an absolute deadline 100 msec from now.
    timeval now;
    gettimeofday(&now, nullptr);
    struct timespec time_limit;
    time_limit.tv_sec = now.tv_sec;
    time_limit.tv_nsec = now.tv_usec * 1000;
    time_limit.tv_nsec += 100000000;        // 100 msec wait
    if (time_limit.tv_nsec >= 1000000000) { // carry overflow into seconds
        time_limit.tv_nsec -= 1000000000;
        time_limit.tv_sec += 1;
    }
    {
        // Nobody ever signals, so each wait must return via the timeout;
        // the test hangs if timeouts are broken.
        std::lock_guard<InterprocessMutex> guard(mutex);
        for (int round = 0; round != 5; ++round)
            changed.wait(mutex, &time_limit);
    }
    changed.release_shared_part();
    mutex.release_shared_part();
}
2✔
763

764

765
// test that notify_all will wake up all waiting threads, if there
766
// are many waiters:
767
NONCONCURRENT_TEST(Thread_CondvarNotifyAllWakeup)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarNotifyAllWakeup_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarNotifyAllWakeup_CondVar", default_options.temp_dir);

    size_t num_threads_holding_lock = 0;
    std::mutex control_mutex;
    std::condition_variable control_cv;

    const size_t num_waiters = 10;
    std::thread waiters[num_waiters];
    for (auto& w : waiters)
        w = std::thread(waiter, &mutex, &changed, &control_mutex, &control_cv, &num_threads_holding_lock);
    {
        // Let every waiter start up and block on the InterprocessCondVar
        std::unique_lock<std::mutex> unique_lock(control_mutex);
        control_cv.wait(unique_lock, [&] {
            return num_threads_holding_lock == num_waiters;
        });
    }

    // One notify_all must release every waiter; join() hangs otherwise.
    mutex.lock();
    changed.notify_all();
    mutex.unlock();

    for (auto& w : waiters)
        w.join();
    changed.release_shared_part();
    mutex.release_shared_part();
}
2✔
806

807

808
// Test that the unlock+wait operation of wait() takes part atomically, i.e. that there is no time
809
// gap between them where another thread could invoke signal() which could go undetected by the wait.
810
// This test takes more than 3 days with valgrind.
811
TEST_IF(Thread_CondvarAtomicWaitUnlock, !running_with_valgrind && TEST_DURATION > 0)
{
    SHARED_GROUP_TEST_PATH(path);

    const int iter = 10000;

    // Many thread pairs help trigger preemption (see notes inside the waiting thread)
    const int thread_pair_count = 2; // std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    for (int pair = 0; pair < thread_pair_count; pair++) {

        threads.push_back(std::thread([&]() {
            InterprocessMutex mutex;
            InterprocessMutex::SharedPart mutex_part;
            InterprocessCondVar condvar;
            InterprocessCondVar::SharedPart condvar_part;
            DBOptions default_options;

            // Use the thread id to give each pair its own mutex/condvar names
            std::stringstream ss;
            ss << std::this_thread::get_id();
            std::string id = ss.str();

            mutex.set_shared_part(mutex_part, path, "mutex" + id);
            condvar.set_shared_part(condvar_part, path, "sema" + id, default_options.temp_dir);
            InterprocessCondVar::init_shared_part(condvar_part);

            std::atomic<bool> signal(false);

            std::thread waiting_thread([&]() {
                for (int i = 0; i < iter; i++) {
                    mutex.lock();
                    signal = true;

                    // Any gap in wait() could be very tight, so we need a way to preempt it between
                    // two instructions. The trouble is that this test makes so many frequent operating
                    // system wait calls that they may land closer together than a thread time slice,
                    // in which case preemption would never hit the gap. So burn a random amount of CPU
                    // to sometimes push the current time slice close to its end.

                    // Wait between 0 and the number of clocks in 100 ms on a 3 GHz machine (100 ms is
                    // the Linux default time slice)
                    uint64_t clocks_to_wait = fastrand(3ULL * 1000000000ULL / 1000000ULL * 100ULL);

                    // Each iteration costs far more than a single clock; that is intentional and also
                    // covers OS'es with bigger time slices.
                    volatile int sum = 0; // must be volatile, else it compiles into no-op
                    for (uint64_t t = 0; t < clocks_to_wait; t++) {
                        sum = sum + 1;
                    }

                    condvar.wait(mutex, nullptr);
                    mutex.unlock();
                }
            });

            // Calls notify_all() exactly once each time the other thread has invoked wait() and
            // released the mutex. If wait() ever misses the notify_all() there is a bug, which will
            // reveal itself by both threads hanging infinitely.
            std::thread notifying_thread([&]() {
                for (int i = 0; i < iter; i++) {
                    while (!signal) {
                    }
                    signal = false;
                    mutex.lock();
                    condvar.notify_all();
                    mutex.unlock();
                }
            });

            waiting_thread.join();
            notifying_thread.join();
        }));
    }

    for (int i = 0; i < thread_pair_count; i++) {
        threads[i].join();
    }
}
×
888

889
NONCONCURRENT_TEST(Thread_Condvar_CreateDestroyDifferentThreads)
{
    // A condvar created on one thread must be destroyable on another.
    auto cv = std::make_unique<InterprocessCondVar>();
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    cv->set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);
    std::thread([&] {
        cv.reset();
    }).join();
}
2✔
901

902
#ifdef _WIN32
903
TEST(Thread_Win32InterprocessBackslashes)
{
    // Paths containing backslashes must be accepted as shared-part names.
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar condvar;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    DBOptions default_options;

    mutex.set_shared_part(mutex_part, "Path\\With\\Slashes", "my_mutex");
    condvar.set_shared_part(condvar_part, "Path\\With\\Slashes", "my_condvar", default_options.temp_dir);
}
915
#endif
916

917
#endif // TEST_THREAD
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc