• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_312964

19 Feb 2025 07:31PM UTC coverage: 90.814% (-0.3%) from 91.119%
github_pull_request_312964

Pull #8071

Evergreen

web-flow
Bump serialize-javascript and mocha

Bumps [serialize-javascript](https://github.com/yahoo/serialize-javascript) to 6.0.2 and updates ancestor dependency [mocha](https://github.com/mochajs/mocha). These dependencies need to be updated together.


Updates `serialize-javascript` from 6.0.0 to 6.0.2
- [Release notes](https://github.com/yahoo/serialize-javascript/releases)
- [Commits](https://github.com/yahoo/serialize-javascript/compare/v6.0.0...v6.0.2)

Updates `mocha` from 10.2.0 to 10.8.2
- [Release notes](https://github.com/mochajs/mocha/releases)
- [Changelog](https://github.com/mochajs/mocha/blob/main/CHANGELOG.md)
- [Commits](https://github.com/mochajs/mocha/compare/v10.2.0...v10.8.2)

---
updated-dependencies:
- dependency-name: serialize-javascript
  dependency-type: indirect
- dependency-name: mocha
  dependency-type: direct:development
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #8071: Bump serialize-javascript and mocha

96552 of 179126 branches covered (53.9%)

212672 of 234185 relevant lines covered (90.81%)

3115802.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.79
/test/test_thread.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2016 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "testsettings.hpp"
20
#ifdef TEST_THREAD
21

22
#include <condition_variable>
23
#include <cstring>
24
#include <algorithm>
25
#include <queue>
26
#include <functional>
27
#include <mutex>
28
#include <thread>
29
#include <atomic>
30

31
#ifndef _WIN32
32
#include <unistd.h>
33
#include <sys/time.h>
34
#include <realm/utilities.hpp> // gettimeofday()
35
#endif
36

37
#include <realm/db_options.hpp>
38
#include <realm/utilities.hpp>
39
#include <realm/util/features.h>
40
#include <realm/util/thread.hpp>
41
#include <realm/util/interprocess_condvar.hpp>
42
#include <realm/util/interprocess_mutex.hpp>
43

44
#include <iostream>
45
#include "test.hpp"
46

47
using namespace realm;
48
using namespace realm::util;
49

50

51
// Test independence and thread-safety
52
// -----------------------------------
53
//
54
// All tests must be thread safe and independent of each other. This
55
// is required because it allows for both shuffling of the execution
56
// order and for parallelized testing.
57
//
58
// In particular, avoid using std::rand() since it is not guaranteed
59
// to be thread safe. Instead use the API offered in
60
// `test/util/random.hpp`.
61
//
62
// All files created in tests must use the TEST_PATH macro (or one of
63
// its friends) to obtain a suitable file system path. See
64
// `test/util/test_path.hpp`.
65
//
66
//
67
// Debugging and the ONLY() macro
68
// ------------------------------
69
//
70
// A simple way of disabling all tests except one called `Foo`, is to
71
// replace TEST(Foo) with ONLY(Foo) and then recompile and rerun the
72
// test suite. Note that you can also use filtering by setting the
73
// environment variable `UNITTEST_FILTER`. See `README.md` for more on
74
// this.
75
//
76
// Another way to debug a particular test, is to copy that test into
77
// `experiments/testcase.cpp` and then run `sh build.sh
78
// check-testcase` (or one of its friends) from the command line.
79

80

81
namespace {
82

83
struct Shared {
84
    Mutex m_mutex;
85
    int m_value;
86

87
    // 10000 takes less than 0.1 sec
88
    void increment_10000_times()
89
    {
10✔
90
        for (int i = 0; i < 10000; ++i) {
97,850✔
91
            LockGuard lock(m_mutex);
97,840✔
92
            ++m_value;
97,840✔
93
        }
97,840✔
94
    }
10✔
95

96
    void increment_10000_times2()
97
    {
10✔
98
        for (int i = 0; i < 10000; ++i) {
100,010✔
99
            LockGuard lock(m_mutex);
100,000✔
100
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
101
            // could assemble into 'inc [addr]' which has very tiny gap
102
            double f = m_value;
100,000✔
103
            f += 1.;
100,000✔
104
            m_value = int(f);
100,000✔
105
        }
100,000✔
106
    }
10✔
107
};
108

109
struct SharedWithEmulated {
110
    InterprocessMutex m_mutex;
111
    InterprocessMutex::SharedPart m_shared_part;
112
    int m_value;
113

114
    SharedWithEmulated(std::string name)
115
    {
1✔
116
        m_mutex.set_shared_part(m_shared_part, name, "0");
1✔
117
    }
1✔
118
    ~SharedWithEmulated()
119
    {
1✔
120
        m_mutex.release_shared_part();
1✔
121
    }
1✔
122

123
    // 10000 takes less than 0.1 sec
124
    void increment_10000_times()
125
    {
10✔
126
        for (int i = 0; i < 10000; ++i) {
97,513✔
127
            std::lock_guard<InterprocessMutex> lock(m_mutex);
97,503✔
128
            ++m_value;
97,503✔
129
        }
97,503✔
130
    }
10✔
131

132
    void increment_10000_times2()
133
    {
×
134
        for (int i = 0; i < 10000; ++i) {
×
135
            std::lock_guard<InterprocessMutex> lock(m_mutex);
×
136
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
×
137
            // could assemble into 'inc [addr]' which has very tiny gap
×
138
            double f = m_value;
×
139
            f += 1.;
×
140
            m_value = int(f);
×
141
        }
×
142
    }
×
143
};
144

145
// Fixture for robust-mutex tests: worker threads deliberately terminate while
// holding m_mutex, and m_recover_called records whether a recovery callback
// (or the low-level death notification) was observed.
struct Robust {
    RobustMutex m_mutex;
    bool m_recover_called;

    // Lock the mutex and exit the thread without unlocking, putting the mutex
    // into the 'inconsistent' (owner died) state.
    void simulate_death()
    {
        m_mutex.lock(std::bind(&Robust::recover, this));
        // Do not unlock
    }

    // Acquire via low_level_lock(), which reports whether a previous holder
    // died; record that fact, then again exit without unlocking — i.e. the
    // thread dies in the middle of recovery.
    void simulate_death_during_recovery()
    {
        bool no_thread_has_died = m_mutex.low_level_lock();
        if (!no_thread_has_died)
            m_recover_called = true;
        // Do not unlock
    }

    // Recovery callback that merely records that it was invoked.
    void recover()
    {
        m_recover_called = true;
    }

    // Recovery callback that fails, which marks the mutex as permanently
    // not recoverable.
    void recover_throw()
    {
        m_recover_called = true;
        throw RobustMutex::NotRecoverable();
    }
};
174

175

176
class QueueMonitor {
177
public:
178
    QueueMonitor()
179
        : m_closed(false)
1✔
180
    {
1✔
181
    }
1✔
182

183
    bool get(int& value)
184
    {
31,919✔
185
        LockGuard lock(m_mutex);
31,919✔
186
        for (;;) {
38,182✔
187
            if (!m_queue.empty())
38,182✔
188
                break;
32,000✔
189
            if (m_closed)
6,182✔
190
                return false;
32✔
191
            m_nonempty_or_closed.wait(lock); // Wait for producer
6,150✔
192
        }
6,150✔
193
        bool was_full = m_queue.size() == max_queue_size;
31,887✔
194
        value = m_queue.front();
31,887✔
195
        m_queue.pop();
31,887✔
196
        if (was_full)
31,887✔
197
            m_nonfull.notify_all(); // Resume a waiting producer
5,569✔
198
        return true;
31,887✔
199
    }
31,919✔
200

201
    void put(int value)
202
    {
31,887✔
203
        LockGuard lock(m_mutex);
31,887✔
204
        while (m_queue.size() == max_queue_size)
38,440✔
205
            m_nonfull.wait(lock); // Wait for consumer
6,553✔
206
        bool was_empty = m_queue.empty();
31,887✔
207
        m_queue.push(value);
31,887✔
208
        if (was_empty)
31,887✔
209
            m_nonempty_or_closed.notify_all(); // Resume a waiting consumer
5,386✔
210
    }
31,887✔
211

212
    void close()
213
    {
1✔
214
        LockGuard lock(m_mutex);
1✔
215
        m_closed = true;
1✔
216
        m_nonempty_or_closed.notify_all(); // Resume all waiting consumers
1✔
217
    }
1✔
218

219
private:
220
    Mutex m_mutex;
221
    CondVar m_nonempty_or_closed, m_nonfull;
222
    std::queue<int> m_queue;
223
    bool m_closed;
224

225
    static const unsigned max_queue_size = 8;
226
};
227

228
// Push `value` onto the queue 1000 times.
void producer_thread(QueueMonitor* queue, int value)
{
    int remaining = 1000;
    while (remaining-- > 0)
        queue->put(value);
}
234

235
// Drain the queue until it is closed, tallying how many times each
// producer value was seen (consumed_counts is indexed by producer id).
void consumer_thread(QueueMonitor* queue, int* consumed_counts)
{
    int value = 0;
    while (queue->get(value))
        ++consumed_counts[value];
}
32✔
245

246

247
} // anonymous namespace
248

249

250
TEST(Thread_MutexLock)
{
    // Sanity check: a Mutex can be locked and unlocked repeatedly via
    // RAII LockGuard scopes without deadlocking.
    Mutex mutex;
    {
        LockGuard lock(mutex);
    }
    {
        LockGuard lock(mutex);
    }
}
260

261
#ifdef REALM_HAVE_PTHREAD_PROCESS_SHARED
262
TEST(Thread_ProcessSharedMutex)
263
{
264
    Mutex mutex((Mutex::process_shared_tag()));
265
    {
266
        LockGuard lock(mutex);
267
    }
268
    {
269
        LockGuard lock(mutex);
270
    }
271
}
272
#endif
273

274
TEST(Thread_CriticalSection)
{
    // Ten threads each add 10000 to the shared counter; the mutex-guarded
    // increments must not lose a single update.
    Shared shared;
    shared.m_value = 0;
    constexpr int num_workers = 10;
    std::thread workers[num_workers];
    for (auto& worker : workers)
        worker = std::thread(&Shared::increment_10000_times, &shared);
    for (auto& worker : workers)
        worker.join();
    CHECK_EQUAL(100000, shared.m_value);
}
285

286

287
TEST(Thread_EmulatedMutex_CriticalSection)
{
    // As Thread_CriticalSection, but exercising the emulated
    // InterprocessMutex instead of the plain Mutex.
    TEST_PATH(path);
    SharedWithEmulated shared(path);
    shared.m_value = 0;
    constexpr int num_workers = 10;
    std::thread workers[num_workers];
    for (auto& worker : workers)
        worker = std::thread(&SharedWithEmulated::increment_10000_times, &shared);
    for (auto& worker : workers)
        worker.join();
    CHECK_EQUAL(100000, shared.m_value);
}
299

300

301
TEST(Thread_CriticalSection2)
{
    // Variant of Thread_CriticalSection using the int->double->int increment,
    // which widens the window in which lost updates would show up.
    Shared shared;
    shared.m_value = 0;
    constexpr int num_workers = 10;
    std::thread workers[num_workers];
    for (auto& worker : workers)
        worker = std::thread(&Shared::increment_10000_times2, &shared);
    for (auto& worker : workers)
        worker.join();
    CHECK_EQUAL(100000, shared.m_value);
}
312

313

314
// Todo. Not supported on Windows in particular? Keywords: winbug
TEST_IF(Thread_RobustMutex, TEST_THREAD_ROBUSTNESS)
{
    // Abort if robust mutexes are not supported on the current
    // platform. Otherwise we would probably get into a dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    Robust robust;

    // Check that lock/unlock cycle works and does not involve recovery
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();

    // Check recovery by simulating a death: the spawned thread locks the
    // mutex and exits without unlocking, so the next lock() must invoke the
    // recovery callback.
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death, &robust).join();
    CHECK(!robust.m_recover_called); // death alone does not trigger recovery
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // One more round of recovery
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death, &robust).join();
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // Simulate a case where recovery fails or is impossible: the recovery
    // callback throws NotRecoverable, leaving the mutex unusable.
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death, &robust).join();
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover_throw, &robust)), RobustMutex::NotRecoverable);
    CHECK(robust.m_recover_called);

    // Check that successive attempts at locking will throw — and without
    // invoking the recovery callback again.
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover, &robust)), RobustMutex::NotRecoverable);
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover, &robust)), RobustMutex::NotRecoverable);
    CHECK(!robust.m_recover_called);
}
368

369

370
TEST_IF(Thread_DeathDuringRecovery, TEST_THREAD_ROBUSTNESS)
{
    // Abort if robust mutexes are not supported on the current
    // platform. Otherwise we would probably get into a dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    // This test checks that death during recovery causes a robust
    // mutex to stay in the 'inconsistent' state.

    Robust robust;

    // Bring the mutex into the 'inconsistent' state (owner dies while
    // holding the lock).
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death, &robust).join();
    CHECK(!robust.m_recover_called);

    // Die while recovering: the thread observes the prior death via
    // low_level_lock() and then itself exits without unlocking.
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death_during_recovery, &robust).join();
    CHECK(robust.m_recover_called);

    // The mutex is still in the 'inconsistent' state if another
    // attempt at locking it calls the recovery function
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // Now that the mutex is fully recovered, we should be able to
    // carry out a regular round of lock/unlock
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();

    // Try a double death during recovery
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death, &robust).join();
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death_during_recovery, &robust).join();
    CHECK(robust.m_recover_called);
    robust.m_recover_called = false;
    std::thread(&Robust::simulate_death_during_recovery, &robust).join();
    CHECK(robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();
}
425

426

427
TEST(Thread_CondVar)
{
    // 32 producers each push their own id 1000 times; 32 consumers drain the
    // queue and tally what they saw. Afterwards every producer id must have
    // been consumed exactly 1000 times in total across all consumers.
    QueueMonitor queue;
    const int num_producers = 32;
    const int num_consumers = 32;
    std::thread producers[num_producers], consumers[num_consumers];
    int consumed_counts[num_consumers][num_producers];
    memset(consumed_counts, 0, sizeof consumed_counts);

    for (int id = 0; id < num_producers; ++id)
        producers[id] = std::thread(&producer_thread, &queue, id);
    for (int c = 0; c < num_consumers; ++c)
        consumers[c] = std::thread(&consumer_thread, &queue, &consumed_counts[c][0]);
    for (auto& producer : producers)
        producer.join();
    queue.close(); // Stop consumers when queue is empty
    for (auto& consumer : consumers)
        consumer.join();

    for (int id = 0; id < num_producers; ++id) {
        int total = 0;
        for (int c = 0; c < num_consumers; ++c)
            total += consumed_counts[c][id];
        CHECK_EQUAL(1000, total);
    }
}
453

454
TEST(Thread_MutexTryLock)
{
    Mutex base_mutex;
    std::unique_lock<Mutex> m(base_mutex, std::defer_lock);

    // Control channel used so the spawned thread's first try_lock provably
    // happens while the main thread holds the mutex.
    std::condition_variable cv;
    std::mutex cv_lock;

    // basic same thread try_lock
    CHECK(m.try_lock());
    CHECK(m.owns_lock());
    CHECK_THROW(static_cast<void>(m.try_lock()), std::system_error); // already locked: Resource deadlock avoided
    m.unlock();

    bool init_done = false;
    auto do_async = [&]() {
        std::unique_lock<Mutex> mutex2(base_mutex, std::defer_lock);
        CHECK(!mutex2.owns_lock());
        CHECK(!mutex2.try_lock()); // main thread still holds the mutex here
        {
            std::lock_guard<std::mutex> guard(cv_lock);
            init_done = true;
        }
        cv.notify_one();
        // spin until the main thread releases the mutex
        while (!mutex2.try_lock()) {
            millisleep(1);
        }
        CHECK(mutex2.owns_lock());
        mutex2.unlock();
    };

    // Check basic locking across threads.
    CHECK(!m.owns_lock());
    CHECK(m.try_lock());
    CHECK(m.owns_lock());
    std::thread thread(do_async);
    {
        // wait until the spawned thread has observed its failed try_lock
        std::unique_lock<std::mutex> guard(cv_lock);
        cv.wait(guard, [&] {
            return init_done;
        });
    }
    m.unlock();
    thread.join();
}
499

500
TEST(Thread_RobustMutexTryLock)
{
    // Abort if robust mutexes are not supported on the current
    // platform. Otherwise we would probably get into a dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    RobustMutex m;
    int times_recover_function_was_called = 0;

    auto recover_function = [&]() {
        ++times_recover_function_was_called;
    };
    // basic same thread try_lock
    CHECK(m.try_lock(recover_function));
    CHECK(!m.try_lock(recover_function));
    m.unlock();
    CHECK(times_recover_function_was_called == 0);

    bool init_done = false;
    std::mutex control_mutex;
    std::condition_variable control_cv;

    auto do_async = [&]() {
        CHECK(!m.try_lock(recover_function)); // main thread holds the lock here
        {
            std::lock_guard<std::mutex> guard(control_mutex);
            init_done = true;
        }
        control_cv.notify_one();
        // spin until the main thread releases the mutex
        while (!m.try_lock(recover_function)) {
            millisleep(1);
        }
        // exit the thread with the lock held to check robustness
    };

    // Check basic locking across threads.
    CHECK(m.try_lock(recover_function));
    std::thread thread(do_async);
    {
        // wait until the spawned thread has observed its failed try_lock
        std::unique_lock<std::mutex> lock(control_mutex);
        control_cv.wait(lock, [&] {
            return init_done;
        });
    }
    m.unlock();
    thread.join();
    CHECK(times_recover_function_was_called == 0);
    // at this point the thread that obtained the mutex is dead with the lock
    // held, so this try_lock must succeed and invoke recovery exactly once
    CHECK(m.try_lock(recover_function));
    CHECK(times_recover_function_was_called == 1);
    m.unlock();
}
553

554
#ifndef _WIN32 // FIXME: trylock is not supported by the win32-pthread lib on Windows. No need to fix this
               // because we are going to switch to native API soon and discard win32-pthread entirely
NONCONCURRENT_TEST(Thread_InterprocessMutexTryLock)
{
    InterprocessMutex::SharedPart mutex_part;

    InterprocessMutex m;
    TEST_PATH(path);
    std::string mutex_file_name = "Test_Thread_InterprocessMutexTryLock";
    m.set_shared_part(mutex_part, path, mutex_file_name);

    // basic same thread try_lock
    CHECK(m.try_lock());
    CHECK(!m.try_lock()); // already locked but shouldn't deadlock
    m.unlock();

    bool init_done = false;
    std::condition_variable cv;
    std::mutex cv_mutex;
    auto do_async = [&]() {
        // A second InterprocessMutex bound to the same shared part: locking
        // it contends with `m` even though it is a distinct object.
        InterprocessMutex m2;
        m2.set_shared_part(mutex_part, path, mutex_file_name);

        CHECK(!m2.try_lock()); // main thread holds the lock here
        {
            std::lock_guard<std::mutex> guard(cv_mutex);
            init_done = true;
        }
        cv.notify_one();
        // spin until the main thread releases the mutex
        while (!m2.try_lock()) {
            millisleep(1);
        }
        m2.unlock();
    };

    // Check basic locking across threads.
    CHECK(m.try_lock());
    std::thread thread(do_async);
    {
        // wait until the spawned thread has observed its failed try_lock
        std::unique_lock<std::mutex> ul(cv_mutex);
        cv.wait(ul, [&] {
            return init_done;
        });
    }
    m.unlock();
    thread.join();
    m.release_shared_part();
}

#endif
604

605
// Detect and flag trivial implementations of condvars.
606
namespace {
607

608
// Advance *signals through the states 1..4 under the mutex, notifying the
// condvar after each of the first three updates so a waiter can observe each
// state in turn. The sleeps give the waiter time to re-acquire the lock
// between rounds; the final state (4) is deliberately not signalled.
void signaller(int* signals, InterprocessMutex* mutex, InterprocessCondVar* cv)
{
    millisleep(200);
    {
        std::lock_guard<InterprocessMutex> l(*mutex);
        *signals = 1;
        // wakeup any waiters
        cv->notify_all();
    }
    // exit scope to allow waiters to get lock
    millisleep(200);
    {
        std::lock_guard<InterprocessMutex> l(*mutex);
        *signals = 2;
        // wakeup any waiters, 2nd time
        cv->notify_all();
    }
    millisleep(200);
    {
        std::lock_guard<InterprocessMutex> l(*mutex);
        *signals = 3;
        // wakeup any waiters, 3rd time
        cv->notify_all();
    }
    millisleep(200);
    {
        std::lock_guard<InterprocessMutex> l(*mutex);
        // final state change with no notification
        *signals = 4;
    }
}
638

639
void wakeup_signaller(int* signal_state, InterprocessMutex* mutex, InterprocessCondVar* cv)
640
{
1✔
641
    millisleep(1000);
1✔
642
    *signal_state = 2;
1✔
643
    std::lock_guard<InterprocessMutex> l(*mutex);
1✔
644
    cv->notify_all();
1✔
645
}
1✔
646

647

648
// Take the shared InterprocessMutex, report via the control mutex/condvar
// that this thread now holds it, then block on the InterprocessCondVar.
// All waiter threads can reach the wait concurrently because the condvar's
// wait releases the InterprocessMutex while blocked (the notify-all test
// depends on all of them getting here).
void waiter(InterprocessMutex* mutex, InterprocessCondVar* cv, std::mutex* control_mutex,
            std::condition_variable* control_cv, size_t* num_threads_holding_lock)
{
    std::lock_guard<InterprocessMutex> l(*mutex);

    {
        std::lock_guard<std::mutex> guard(*control_mutex);
        *num_threads_holding_lock = (*num_threads_holding_lock) + 1;
    }
    control_cv->notify_one();

    cv->wait(*mutex, nullptr); // nullptr: no timeout
}
661
} // namespace
662

663
// Verify, that a wait on a condition variable actually waits
// - this test relies on assumptions about scheduling, which
//   may not hold on a heavily loaded system.
NONCONCURRENT_TEST(Thread_CondvarWaits)
{
    int signals = 0;
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarWaits_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarWaits_CondVar", default_options.temp_dir);
    changed.init_shared_part(condvar_part);
    signals = 0;
    // The signaller advances `signals` to 1, 2, 3, notifying after each
    // update; if wait() returned without actually waiting, the CHECKs below
    // would observe a stale value.
    std::thread signal_thread(signaller, &signals, &mutex, &changed);
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signals, 1);
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signals, 2);
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signals, 3);
    }
    signal_thread.join();
    changed.release_shared_part();
    mutex.release_shared_part();
}
693

694
// Verify that a condition variable looses its signal if no one
695
// is waiting on it
696
NONCONCURRENT_TEST(Thread_CondvarIsStateless)
697
{
1✔
698
    int signal_state = 0;
1✔
699
    InterprocessMutex mutex;
1✔
700
    InterprocessMutex::SharedPart mutex_part;
1✔
701
    InterprocessCondVar changed;
1✔
702
    InterprocessCondVar::SharedPart condvar_part;
1✔
703
    InterprocessCondVar::init_shared_part(condvar_part);
1✔
704
    TEST_PATH(path);
1✔
705
    DBOptions default_options;
1✔
706

707
    // Must have names because default_options.temp_dir is empty string on Windows
708
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarIsStateless_Mutex");
1✔
709
    changed.set_shared_part(condvar_part, path, "Thread_CondvarIsStateless_CondVar", default_options.temp_dir);
1✔
710
    signal_state = 1;
1✔
711
    // send some signals:
712
    {
1✔
713
        std::lock_guard<InterprocessMutex> l(mutex);
1✔
714
        for (int i = 0; i < 10; ++i)
11✔
715
            changed.notify_all();
10✔
716
    }
1✔
717
    // spawn a thread which will later do one more signal in order
718
    // to wake us up.
719
    std::thread signal_thread(wakeup_signaller, &signal_state, &mutex, &changed);
1✔
720
    // Wait for a signal - the signals sent above should be lost, so
721
    // that this wait will actually wait for the thread to signal.
722
    {
1✔
723
        std::lock_guard<InterprocessMutex> l(mutex);
1✔
724
        changed.wait(mutex, 0);
1✔
725
        CHECK_EQUAL(signal_state, 2);
1✔
726
    }
1✔
727
    signal_thread.join();
1✔
728
    changed.release_shared_part();
1✔
729
    mutex.release_shared_part();
1✔
730
}
1✔
731

732

733
// this test hangs, if timeout doesn't work.
NONCONCURRENT_TEST(Thread_CondvarTimeout)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarTimeout_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);
    // Build an absolute deadline 100 ms from now for the timed wait,
    // normalizing tv_nsec into [0, 1e9).
    struct timespec time_limit;
    timeval tv;
    gettimeofday(&tv, nullptr);
    time_limit.tv_sec = tv.tv_sec;
    time_limit.tv_nsec = tv.tv_usec * 1000;
    time_limit.tv_nsec += 100000000;        // 100 msec wait
    if (time_limit.tv_nsec >= 1000000000) { // overflow
        time_limit.tv_nsec -= 1000000000;
        time_limit.tv_sec += 1;
    }
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        // No one ever notifies `changed`, so each wait must return via the
        // timeout; if timeouts are broken the test hangs here.
        for (int i = 0; i < 5; ++i)
            changed.wait(mutex, &time_limit);
    }
    changed.release_shared_part();
    mutex.release_shared_part();
}
763

764

765
// test that notify_all will wake up all waiting threads, if there
766
// are many waiters:
767
NONCONCURRENT_TEST(Thread_CondvarNotifyAllWakeup)
768
{
1✔
769
    InterprocessMutex mutex;
1✔
770
    InterprocessMutex::SharedPart mutex_part;
1✔
771
    InterprocessCondVar changed;
1✔
772
    InterprocessCondVar::SharedPart condvar_part;
1✔
773
    InterprocessCondVar::init_shared_part(condvar_part);
1✔
774
    TEST_PATH(path);
1✔
775
    DBOptions default_options;
1✔
776
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarNotifyAllWakeup_Mutex");
1✔
777
    changed.set_shared_part(condvar_part, path, "Thread_CondvarNotifyAllWakeup_CondVar", default_options.temp_dir);
1✔
778

779
    size_t num_threads_holding_lock = 0;
1✔
780
    std::mutex control_mutex;
1✔
781
    std::condition_variable control_cv;
1✔
782

783
    const size_t num_waiters = 10;
1✔
784
    std::thread waiters[num_waiters];
1✔
785
    for (size_t i = 0; i < num_waiters; ++i) {
11✔
786
        waiters[i] = std::thread(waiter, &mutex, &changed, &control_mutex, &control_cv, &num_threads_holding_lock);
10✔
787
    }
10✔
788
    {
1✔
789
        // allow all waiters to start and obtain the InterprocessCondVar
790
        std::unique_lock<std::mutex> unique_lock(control_mutex);
1✔
791
        control_cv.wait(unique_lock, [&] {
2✔
792
            return num_threads_holding_lock == num_waiters;
2✔
793
        });
2✔
794
    }
1✔
795

796
    mutex.lock();
1✔
797
    changed.notify_all();
1✔
798
    mutex.unlock();
1✔
799

800
    for (size_t i = 0; i < num_waiters; ++i) {
11✔
801
        waiters[i].join();
10✔
802
    }
10✔
803
    changed.release_shared_part();
1✔
804
    mutex.release_shared_part();
1✔
805
}
1✔
806

807

808
// Test that the unlock+wait operation of wait() takes part atomically, i.e. that there is no time
// gap between them where another thread could invoke signal() which could go undetected by the wait.
// This test takes more than 3 days with valgrind.
TEST_IF(Thread_CondvarAtomicWaitUnlock, !running_with_valgrind && TEST_DURATION > 0)
{
    SHARED_GROUP_TEST_PATH(path);

    const int iter = 10000;

    // It's nice to have many threads to trigger preemption (see notes inside the t1 thread)
    const int thread_pair_count = 2; // std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    for (int tpc = 0; tpc < thread_pair_count; tpc++) {

        threads.push_back(std::thread([&]() {
            InterprocessMutex mutex;
            InterprocessMutex::SharedPart mutex_part;
            InterprocessCondVar condvar;
            InterprocessCondVar::SharedPart condvar_part;
            DBOptions default_options;

            // Use the thread id so each pair gets its own mutex/condvar names.
            std::stringstream ss;
            ss << std::this_thread::get_id();
            std::string id = ss.str();

            mutex.set_shared_part(mutex_part, path, "mutex" + id);
            condvar.set_shared_part(condvar_part, path, "sema" + id, default_options.temp_dir);
            InterprocessCondVar::init_shared_part(condvar_part);

            std::atomic<bool> signal(false);

            std::thread t1([&]() {
                for (int i = 0; i < iter; i++) {
                    mutex.lock();
                    signal = true;

                    // A gap in wait() could be very tight, so we need a way to preempt it between two instructions.
                    // Problem is that we have so many/frequent operating system wait calls in this that they might
                    // be invoked closer than a thread time slice, so preemption would never occur. So we create
                    // some work that will sometimes bring the current time slice close to its end.

                    // Wait between 0 and number of clocks on 100 ms on a on 3 GHz machine (100 ms is Linux default
                    // time slice)
                    uint64_t clocks_to_wait = fastrand(3ULL * 1000000000ULL / 1000000ULL * 100ULL);

                    // This loop can wait a lot more than 100 ms because each iteration takes many more clocks than
                    // just 1. That's intentional and will cover other OS'es with bigger time slices.
                    volatile int sum = 0; // must be volatile, else it compiles into no-op
                    for (uint64_t t = 0; t < clocks_to_wait; t++) {
                        sum = sum + 1;
                    }

                    condvar.wait(mutex, nullptr);
                    mutex.unlock();
                }
            });

            // This thread calls notify_all() exactly one time after the other thread has invoked wait() and has
            // released the mutex. If wait() misses the notify_all() then there is a bug, which will reveal itself
            // by both threads hanging infinitely.
            std::thread t2([&]() {
                for (int i = 0; i < iter; i++) {
                    while (!signal) {
                    }
                    signal = false;
                    mutex.lock();
                    condvar.notify_all();
                    mutex.unlock();
                }
            });

            t1.join();
            t2.join();
        }));
    }

    for (int i = 0; i < thread_pair_count; i++) {
        threads[i].join();
    }
}
888

889
NONCONCURRENT_TEST(Thread_Condvar_CreateDestroyDifferentThreads)
{
    // Regression check: an InterprocessCondVar set up on one thread must be
    // safely destructible on a different thread.
    auto cv = std::make_unique<InterprocessCondVar>();
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    cv->set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);
    std::thread([&] {
        cv.reset(); // destroy on the spawned thread
    }).join();
}
901

902
#ifdef _WIN32
903
TEST(Thread_Win32InterprocessBackslashes)
904
{
905
    InterprocessMutex mutex;
906
    InterprocessMutex::SharedPart mutex_part;
907
    InterprocessCondVar condvar;
908
    InterprocessCondVar::SharedPart condvar_part;
909
    InterprocessCondVar::init_shared_part(condvar_part);
910
    DBOptions default_options;
911

912
    mutex.set_shared_part(mutex_part, "Path\\With\\Slashes", "my_mutex");
913
    condvar.set_shared_part(condvar_part, "Path\\With\\Slashes", "my_condvar", default_options.temp_dir);
914
}
915
#endif
916

917
#endif // TEST_THREAD
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc