• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1853

20 Nov 2023 07:46PM UTC coverage: 91.683% (-0.02%) from 91.699%
1853

push

Evergreen

web-flow
Fix client reset cycle detection for PBS recovery errors (#7149)

Tracking that a client reset was in progress was done in the same write
transaction as the recovery operation, so if recovery failed the tracking was
rolled back too. This worked for FLX because that codepath committed before
beginning recovery.

92262 of 169120 branches covered (54.55%)

31 of 31 new or added lines in 3 files covered. (100.0%)

143 existing lines in 15 files now uncovered.

231277 of 252257 relevant lines covered (91.68%)

6495482.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/test/test_thread.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2016 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "testsettings.hpp"
20
#ifdef TEST_THREAD
21

22
#include <condition_variable>
23
#include <cstring>
24
#include <algorithm>
25
#include <queue>
26
#include <functional>
27
#include <mutex>
28
#include <thread>
29
#include <atomic>
30

31
#ifndef _WIN32
32
#include <unistd.h>
33
#include <sys/time.h>
34
#include <realm/utilities.hpp> // gettimeofday()
35
#endif
36

37
#include <realm/db_options.hpp>
38
#include <realm/utilities.hpp>
39
#include <realm/util/features.h>
40
#include <realm/util/thread.hpp>
41
#include <realm/util/interprocess_condvar.hpp>
42
#include <realm/util/interprocess_mutex.hpp>
43

44
#include <iostream>
45
#include "test.hpp"
46

47
using namespace realm;
48
using namespace realm::util;
49

50

51
// Test independence and thread-safety
52
// -----------------------------------
53
//
54
// All tests must be thread safe and independent of each other. This
55
// is required because it allows for both shuffling of the execution
56
// order and for parallelized testing.
57
//
58
// In particular, avoid using std::rand() since it is not guaranteed
59
// to be thread safe. Instead use the API offered in
60
// `test/util/random.hpp`.
61
//
62
// All files created in tests must use the TEST_PATH macro (or one of
63
// its friends) to obtain a suitable file system path. See
64
// `test/util/test_path.hpp`.
65
//
66
//
67
// Debugging and the ONLY() macro
68
// ------------------------------
69
//
70
// A simple way of disabling all tests except one called `Foo`, is to
71
// replace TEST(Foo) with ONLY(Foo) and then recompile and rerun the
72
// test suite. Note that you can also use filtering by setting the
73
// environment varible `UNITTEST_FILTER`. See `README.md` for more on
74
// this.
75
//
76
// Another way to debug a particular test, is to copy that test into
77
// `experiments/testcase.cpp` and then run `sh build.sh
78
// check-testcase` (or one of its friends) from the command line.
79

80

81
namespace {
82

83
// Bump the pointed-to counter by one (used as a minimal thread entry point).
void increment(int* i)
{
    *i += 1;
}
87

88
struct Shared {
89
    Mutex m_mutex;
90
    int m_value;
91

92
    // 10000 takes less than 0.1 sec
93
    void increment_10000_times()
94
    {
20✔
95
        for (int i = 0; i < 10000; ++i) {
199,683✔
96
            LockGuard lock(m_mutex);
199,663✔
97
            ++m_value;
199,663✔
98
        }
199,663✔
99
    }
20✔
100

101
    void increment_10000_times2()
102
    {
20✔
103
        for (int i = 0; i < 10000; ++i) {
197,703✔
104
            LockGuard lock(m_mutex);
197,683✔
105
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
97,705✔
106
            // could assemble into 'inc [addr]' which has very tiny gap
97,705✔
107
            double f = m_value;
197,683✔
108
            f += 1.;
197,683✔
109
            m_value = int(f);
197,683✔
110
        }
197,683✔
111
    }
20✔
112
};
113

114
struct SharedWithEmulated {
115
    InterprocessMutex m_mutex;
116
    InterprocessMutex::SharedPart m_shared_part;
117
    int m_value;
118

119
    SharedWithEmulated(std::string name)
120
    {
2✔
121
        m_mutex.set_shared_part(m_shared_part, name, "0");
2✔
122
    }
2✔
123
    ~SharedWithEmulated()
124
    {
2✔
125
        m_mutex.release_shared_part();
2✔
126
    }
2✔
127

128
    // 10000 takes less than 0.1 sec
129
    void increment_10000_times()
130
    {
20✔
131
        for (int i = 0; i < 10000; ++i) {
198,819✔
132
            std::lock_guard<InterprocessMutex> lock(m_mutex);
198,799✔
133
            ++m_value;
198,799✔
134
        }
198,799✔
135
    }
20✔
136

137
    void increment_10000_times2()
138
    {
×
139
        for (int i = 0; i < 10000; ++i) {
×
140
            std::lock_guard<InterprocessMutex> lock(m_mutex);
×
141
            // Create a time window where thread interference can take place. Problem with ++m_value is that it
×
142
            // could assemble into 'inc [addr]' which has very tiny gap
×
143
            double f = m_value;
×
144
            f += 1.;
×
145
            m_value = int(f);
×
146
        }
×
147
    }
×
148
};
149

150
struct Robust {
151
    RobustMutex m_mutex;
152
    bool m_recover_called;
153

154
    void simulate_death()
155
    {
×
156
        m_mutex.lock(std::bind(&Robust::recover, this));
×
157
        // Do not unlock
×
158
    }
×
159

160
    void simulate_death_during_recovery()
161
    {
×
162
        bool no_thread_has_died = m_mutex.low_level_lock();
×
163
        if (!no_thread_has_died)
×
164
            m_recover_called = true;
×
165
        // Do not unlock
×
166
    }
×
167

168
    void recover()
169
    {
×
170
        m_recover_called = true;
×
171
    }
×
172

173
    void recover_throw()
174
    {
×
175
        m_recover_called = true;
×
176
        throw RobustMutex::NotRecoverable();
×
177
    }
×
178
};
179

180

181
class QueueMonitor {
182
public:
183
    QueueMonitor()
184
        : m_closed(false)
185
    {
2✔
186
    }
2✔
187

188
    bool get(int& value)
189
    {
63,463✔
190
        LockGuard lock(m_mutex);
63,463✔
191
        for (;;) {
135,847✔
192
            if (!m_queue.empty())
135,847✔
193
                break;
64,000✔
194
            if (m_closed)
71,847✔
195
                return false;
64✔
196
            m_nonempty_or_closed.wait(lock); // Wait for producer
71,783✔
197
        }
71,783✔
198
        bool was_full = m_queue.size() == max_queue_size;
63,431✔
199
        value = m_queue.front();
63,399✔
200
        m_queue.pop();
63,399✔
201
        if (was_full)
63,399✔
202
            m_nonfull.notify_all(); // Resume a waiting producer
8,961✔
203
        return true;
63,399✔
204
    }
63,463✔
205

206
    void put(int value)
207
    {
63,474✔
208
        LockGuard lock(m_mutex);
63,474✔
209
        while (m_queue.size() == max_queue_size)
108,843✔
210
            m_nonfull.wait(lock); // Wait for consumer
45,369✔
211
        bool was_empty = m_queue.empty();
63,474✔
212
        m_queue.push(value);
63,474✔
213
        if (was_empty)
63,474✔
214
            m_nonempty_or_closed.notify_all(); // Resume a waiting consumer
10,511✔
215
    }
63,474✔
216

217
    void close()
218
    {
2✔
219
        LockGuard lock(m_mutex);
2✔
220
        m_closed = true;
2✔
221
        m_nonempty_or_closed.notify_all(); // Resume all waiting consumers
2✔
222
    }
2✔
223

224
private:
225
    Mutex m_mutex;
226
    CondVar m_nonempty_or_closed, m_nonfull;
227
    std::queue<int> m_queue;
228
    bool m_closed;
229

230
    static const unsigned max_queue_size = 8;
231
};
232

233
// Push `value` into the queue 1000 times.
void producer_thread(QueueMonitor* queue, int value)
{
    for (int n = 0; n < 1000; ++n)
        queue->put(value);
}
239

240
// Drain values until the queue is closed, tallying how many of each produced
// value this consumer saw.
void consumer_thread(QueueMonitor* queue, int* consumed_counts)
{
    int value = 0;
    while (queue->get(value))
        ++consumed_counts[value];
}
250

251

252
} // anonymous namespace
253

254

255
// A constructed Thread is joinable until joined, and its work is visible
// after join().
TEST(Thread_Join)
{
    int counter = 0;
    Thread thread([&counter] {
        increment(&counter);
    });
    CHECK(thread.joinable());
    thread.join();
    CHECK(!thread.joinable());
    CHECK_EQUAL(1, counter);
}
264

265

266
// A default-constructed Thread is not joinable until start() is called, and
// joins cleanly afterwards.
TEST(Thread_Start)
{
    int counter = 0;
    Thread thread;
    CHECK(!thread.joinable());
    thread.start([&counter] {
        increment(&counter);
    });
    CHECK(thread.joinable());
    thread.join();
    CHECK(!thread.joinable());
    CHECK_EQUAL(1, counter);
}
277

278

279
// A mutex must support two successive lock/unlock cycles.
TEST(Thread_MutexLock)
{
    Mutex mutex;
    {
        LockGuard guard(mutex);
    }
    {
        LockGuard guard(mutex);
    }
}
289

290
#ifdef REALM_HAVE_PTHREAD_PROCESS_SHARED
291
// A process-shared mutex must support two successive lock/unlock cycles.
TEST(Thread_ProcessSharedMutex)
{
    Mutex mutex((Mutex::process_shared_tag()));
    {
        LockGuard guard(mutex);
    }
    {
        LockGuard guard(mutex);
    }
}
301
#endif
302

303
// Ten threads each add 10000 under the mutex; no increment may be lost.
TEST(Thread_CriticalSection)
{
    Shared shared;
    shared.m_value = 0;
    Thread threads[10];
    for (Thread& t : threads)
        t.start([&shared] {
            shared.increment_10000_times();
        });
    for (Thread& t : threads)
        t.join();
    CHECK_EQUAL(100000, shared.m_value);
}
314

315

316
// Same as Thread_CriticalSection, but through an InterprocessMutex.
TEST(Thread_EmulatedMutex_CriticalSection)
{
    TEST_PATH(path);
    SharedWithEmulated shared(path);
    shared.m_value = 0;
    Thread threads[10];
    for (Thread& t : threads)
        t.start([&shared] {
            shared.increment_10000_times();
        });
    for (Thread& t : threads)
        t.join();
    CHECK_EQUAL(100000, shared.m_value);
}
328

329

330
// Ten threads each add 10000 via the slow (wide-race-window) incrementer; no
// increment may be lost.
TEST(Thread_CriticalSection2)
{
    Shared shared;
    shared.m_value = 0;
    Thread threads[10];
    for (Thread& t : threads)
        t.start([&shared] {
            shared.increment_10000_times2();
        });
    for (Thread& t : threads)
        t.join();
    CHECK_EQUAL(100000, shared.m_value);
}
341

342

343
// Todo. Not supported on Windows in particular? Keywords: winbug
344
// Exercises RobustMutex recovery: a thread dying while holding the lock must
// trigger the recovery callback on the next lock(), and a recovery callback
// that throws NotRecoverable must leave the mutex permanently unusable.
TEST_IF(Thread_RobustMutex, TEST_THREAD_ROBUSTNESS)
{
    // Abort if robust mutexes are not supported on the current
    // platform. Otherwise we would probably get into a dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    Robust robust;

    // Check that lock/unlock cycle works and does not involve recovery
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();

    // Check recovery by simulating a death: the thread exits holding the
    // lock, and the recovery callback must run on our next lock().
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death, &robust));
        thread.join();
    }
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // One more round of recovery, proving the mutex is fully usable again
    // after a successful recovery.
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death, &robust));
        thread.join();
    }
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // Simulate a case where recovery fails or is impossible: the recovery
    // callback throws NotRecoverable out of lock().
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death, &robust));
        thread.join();
    }
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover_throw, &robust)), RobustMutex::NotRecoverable);
    CHECK(robust.m_recover_called);

    // Check that successive attempts at locking will throw without invoking
    // the recovery callback again.
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover, &robust)), RobustMutex::NotRecoverable);
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    CHECK_THROW(robust.m_mutex.lock(std::bind(&Robust::recover, &robust)), RobustMutex::NotRecoverable);
    CHECK(!robust.m_recover_called);
}
406

407

408
TEST_IF(Thread_DeathDuringRecovery, TEST_THREAD_ROBUSTNESS)
{
    // Abort if robust mutexes are not supported on the current
    // platform. Otherwise we would probably get into a dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    // This test checks that death during recovery causes a robust
    // mutex to stay in the 'inconsistent' state.

    Robust robust;

    // Bring the mutex into the 'inconsistent' state (owner exits while
    // holding the lock).
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death, &robust));
        thread.join();
    }
    CHECK(!robust.m_recover_called);

    // Die while recovering
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death_during_recovery, &robust));
        thread.join();
    }
    CHECK(robust.m_recover_called);

    // The mutex is still in the 'inconsistent' state if another
    // attempt at locking it calls the recovery function
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();

    // Now that the mutex is fully recovered, we should be able to
    // carry out a regular round of lock/unlock
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();

    // Try a double death during recovery: two successive threads die while
    // recovering before a clean lock finally succeeds.
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death, &robust));
        thread.join();
    }
    CHECK(!robust.m_recover_called);
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death_during_recovery, &robust));
        thread.join();
    }
    CHECK(robust.m_recover_called);
    robust.m_recover_called = false;
    {
        Thread thread(std::bind(&Robust::simulate_death_during_recovery, &robust));
        thread.join();
    }
    CHECK(robust.m_recover_called);
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(robust.m_recover_called);
    robust.m_mutex.unlock();
    robust.m_recover_called = false;
    robust.m_mutex.lock(std::bind(&Robust::recover, &robust));
    CHECK(!robust.m_recover_called);
    robust.m_mutex.unlock();
}
478

479

480
// Stress Mutex/CondVar with 32 producers and 32 consumers pushing 1000
// values each through a bounded QueueMonitor; every produced value must be
// consumed exactly once.
TEST(Thread_CondVar)
{
    QueueMonitor queue;
    const int num_producers = 32;
    const int num_consumers = 32;
    Thread producers[num_producers], consumers[num_consumers];
    // consumed_counts[j][i] = how many times consumer j saw producer i's value
    int consumed_counts[num_consumers][num_producers];
    memset(consumed_counts, 0, sizeof consumed_counts);

    for (int i = 0; i < num_producers; ++i)
        producers[i].start(std::bind(&producer_thread, &queue, i));
    for (int i = 0; i < num_consumers; ++i)
        consumers[i].start(std::bind(&consumer_thread, &queue, &consumed_counts[i][0]));
    for (int i = 0; i < num_producers; ++i)
        producers[i].join();
    queue.close(); // Stop consumers when queue is empty
    for (int i = 0; i < num_consumers; ++i)
        consumers[i].join();

    // Each producer pushed its own index 1000 times; summing the
    // per-consumer tallies verifies nothing was lost or duplicated.
    for (int i = 0; i < num_producers; ++i) {
        int n = 0;
        for (int j = 0; j < num_consumers; ++j)
            n += consumed_counts[j][i];
        CHECK_EQUAL(1000, n);
    }
}
506

507
// try_lock on Mutex: same-thread re-lock must be rejected, and a second
// thread must fail to acquire while we hold the lock, then succeed once we
// release it.
TEST(Thread_MutexTryLock)
{
    Thread thread;
    Mutex base_mutex;
    std::unique_lock<Mutex> m(base_mutex, std::defer_lock);

    // Control-channel condvar used to sequence the two threads.
    std::condition_variable cv;
    std::mutex cv_lock;

    // basic same thread try_lock
    CHECK(m.try_lock());
    CHECK(m.owns_lock());
    CHECK_THROW(static_cast<void>(m.try_lock()), std::system_error); // already locked: Resource deadlock avoided
    m.unlock();

    bool init_done = false;
    auto do_async = [&]() {
        std::unique_lock<Mutex> mutex2(base_mutex, std::defer_lock);
        CHECK(!mutex2.owns_lock());
        // Main thread holds the lock at this point, so try_lock must fail.
        CHECK(!mutex2.try_lock());
        {
            std::lock_guard<std::mutex> guard(cv_lock);
            init_done = true;
        }
        cv.notify_one();
        // Spin until the main thread releases, then we must succeed.
        while(!mutex2.try_lock()) { millisleep(1); }
        CHECK(mutex2.owns_lock());
        mutex2.unlock();
    };

    // Check basic locking across threads.
    CHECK(!m.owns_lock());
    CHECK(m.try_lock());
    CHECK(m.owns_lock());
    thread.start(do_async);
    {
        // Wait until the helper thread has observed the failed try_lock.
        std::unique_lock<std::mutex> guard(cv_lock);
        cv.wait(guard, [&]{return init_done;});
    }
    m.unlock();
    thread.join();
}
549

550
// try_lock on RobustMutex: same semantics as Mutex try_lock, plus the
// recovery callback must fire exactly once when a lock-holding thread dies.
TEST(Thread_RobustMutexTryLock)
{
    // Abort if robust mutexes are not supported on the current
    // platform. Otherwise we would probably get into a dead-lock.
    if (!RobustMutex::is_robust_on_this_platform)
        return;

    Thread thread;
    RobustMutex m;
    int times_recover_function_was_called = 0;

    auto recover_function = [&]() {
        ++times_recover_function_was_called;
    };
    // basic same thread try_lock
    CHECK(m.try_lock(recover_function));
    CHECK(!m.try_lock(recover_function));
    m.unlock();
    CHECK(times_recover_function_was_called == 0);

    bool init_done = false;
    std::mutex control_mutex;
    std::condition_variable control_cv;

    auto do_async = [&]() {
        // Main thread holds the lock, so this must fail.
        CHECK(!m.try_lock(recover_function));
        {
            std::lock_guard<std::mutex> guard(control_mutex);
            init_done = true;
        }
        control_cv.notify_one();
        // Spin until the main thread releases the lock.
        while(!m.try_lock(recover_function)) { millisleep(1); }
        // exit the thread with the lock held to check robustness
    };

    // Check basic locking across threads.
    CHECK(m.try_lock(recover_function));
    thread.start(do_async);
    {
        // Wait until the helper thread has observed the failed try_lock.
        std::unique_lock<std::mutex> lock(control_mutex);
        control_cv.wait(lock, [&]{ return init_done; });
    }
    m.unlock();
    thread.join();
    CHECK(times_recover_function_was_called == 0);
    // at this point the thread that obtained the mutex is dead with the lock,
    // so the next try_lock must succeed AND invoke recovery exactly once
    CHECK(m.try_lock(recover_function));
    CHECK(times_recover_function_was_called == 1);
    m.unlock();
}
600

601
#ifndef _WIN32 // FIXME: trylock is not supported by the win32-pthread lib on Windows. No need to fix this
602
               // because we are going to switch to native API soon and discard win32-pthread entirely
603
// try_lock on InterprocessMutex: same-thread re-lock fails (without
// deadlocking), and a second handle to the same shared part contends
// correctly across threads.
NONCONCURRENT_TEST(Thread_InterprocessMutexTryLock)
{
    Thread thread;
    InterprocessMutex::SharedPart mutex_part;

    InterprocessMutex m;
    TEST_PATH(path);
    std::string mutex_file_name = "Test_Thread_InterprocessMutexTryLock";
    m.set_shared_part(mutex_part, path, mutex_file_name);

    // basic same thread try_lock
    CHECK(m.try_lock());
    CHECK(!m.try_lock()); // already locked but shouldn't deadlock
    m.unlock();

    bool init_done = false;
    std::condition_variable cv;
    std::mutex cv_mutex;
    auto do_async = [&]() {
        // Second handle bound to the same shared part as `m`.
        InterprocessMutex m2;
        m2.set_shared_part(mutex_part, path, mutex_file_name);

        // Main thread holds the lock, so this must fail.
        CHECK(!m2.try_lock());
        {
            std::lock_guard<std::mutex> guard(cv_mutex);
            init_done = true;
        }
        cv.notify_one();
        // Spin until the main thread releases the lock.
        while(!m2.try_lock()) { millisleep(1); }
        m2.unlock();
    };

    // Check basic locking across threads.
    CHECK(m.try_lock());
    thread.start(do_async);
    {
        // Wait until the helper thread has observed the failed try_lock.
        std::unique_lock<std::mutex> ul(cv_mutex);
        cv.wait(ul, [&]{return init_done;});
    }
    m.unlock();
    thread.join();
    m.release_shared_part();
}
646

647
#endif
648

649
// Detect and flag trivial implementations of condvars.
650
namespace {
651

652
// Advance *signals through states 1..4 roughly every 200 ms, notifying `cv`
// after each of the first three transitions (the final state is set without a
// notify). Each update is made under `mutex` so waiters observe consistent
// state.
void signaller(int* signals, InterprocessMutex* mutex, InterprocessCondVar* cv)
{
    millisleep(200);
    {
        std::lock_guard<InterprocessMutex> l(*mutex);
        *signals = 1;
        // wakeup any waiters
        cv->notify_all();
    }
    // exit scope to allow waiters to get lock
    millisleep(200);
    {
        std::lock_guard<InterprocessMutex> l(*mutex);
        *signals = 2;
        // wakeup any waiters, 2nd time
        cv->notify_all();
    }
    millisleep(200);
    {
        std::lock_guard<InterprocessMutex> l(*mutex);
        *signals = 3;
        // wakeup any waiters, 3rd time
        cv->notify_all();
    }
    millisleep(200);
    {
        std::lock_guard<InterprocessMutex> l(*mutex);
        *signals = 4;
        // final state — deliberately no notify
    }
}
682

683
void wakeup_signaller(int* signal_state, InterprocessMutex* mutex, InterprocessCondVar* cv)
684
{
2✔
685
    millisleep(1000);
2✔
686
    *signal_state = 2;
2✔
687
    std::lock_guard<InterprocessMutex> l(*mutex);
2✔
688
    cv->notify_all();
2✔
689
}
2✔
690

691

692

693
void waiter(InterprocessMutex* mutex, InterprocessCondVar* cv, std::mutex* control_mutex,
694
            std::condition_variable* control_cv, size_t* num_threads_holding_lock)
695
{
20✔
696
    std::lock_guard<InterprocessMutex> l(*mutex);
20✔
697

10✔
698
    {
20✔
699
        std::lock_guard<std::mutex> guard(*control_mutex);
20✔
700
        *num_threads_holding_lock = (*num_threads_holding_lock) + 1;
20✔
701
    }
20✔
702
    control_cv->notify_one();
20✔
703

10✔
704
    cv->wait(*mutex, nullptr);
20✔
705
}
20✔
706
}
707

708
// Verify, that a wait on a condition variable actually waits
709
// - this test relies on assumptions about scheduling, which
710
//   may not hold on a heavily loaded system.
711
// Verify, that a wait on a condition variable actually waits
// - this test relies on assumptions about scheduling, which
//   may not hold on a heavily loaded system.
NONCONCURRENT_TEST(Thread_CondvarWaits)
{
    int signals = 0;
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarWaits_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarWaits_CondVar", default_options.temp_dir);
    changed.init_shared_part(condvar_part);
    Thread signal_thread;
    signals = 0;
    // The signaller bumps `signals` to 1, 2, 3 (notifying each time) with
    // 200 ms pauses; each wait below must therefore observe the next state.
    signal_thread.start(std::bind(signaller, &signals, &mutex, &changed));
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signals, 1);
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signals, 2);
        changed.wait(mutex, nullptr);
        CHECK_EQUAL(signals, 3);
    }
    signal_thread.join();
    changed.release_shared_part();
    mutex.release_shared_part();
}
739

740
// Verify that a condition variable looses its signal if no one
741
// is waiting on it
742
// Verify that a condition variable looses its signal if no one
// is waiting on it
NONCONCURRENT_TEST(Thread_CondvarIsStateless)
{
    int signal_state = 0;
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;

    // Must have names because default_options.temp_dir is empty string on Windows
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarIsStateless_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarIsStateless_CondVar", default_options.temp_dir);
    Thread signal_thread;
    signal_state = 1;
    // send some signals while no one is waiting — these must all be lost:
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        for (int i = 0; i < 10; ++i)
            changed.notify_all();
    }
    // spawn a thread which will later do one more signal in order
    // to wake us up.
    signal_thread.start(std::bind(wakeup_signaller, &signal_state, &mutex, &changed));
    // Wait for a signal - the signals sent above should be lost, so
    // that this wait will actually wait for the thread to signal.
    // signal_state == 2 proves it was the wakeup_signaller that woke us.
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        changed.wait(mutex, 0);
        CHECK_EQUAL(signal_state, 2);
    }
    signal_thread.join();
    changed.release_shared_part();
    mutex.release_shared_part();
}
778

779

780
// this test hangs, if timeout doesn't work.
781
// this test hangs, if timeout doesn't work: no one ever signals `changed`,
// so each wait can only return via its 100 ms timeout.
NONCONCURRENT_TEST(Thread_CondvarTimeout)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarTimeout_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);
    // Build an absolute deadline 100 ms from now from gettimeofday(),
    // normalizing tv_nsec into [0, 1e9).
    struct timespec time_limit;
    timeval tv;
    gettimeofday(&tv, nullptr);
    time_limit.tv_sec = tv.tv_sec;
    time_limit.tv_nsec = tv.tv_usec * 1000;
    time_limit.tv_nsec += 100000000;        // 100 msec wait
    if (time_limit.tv_nsec >= 1000000000) { // overflow
        time_limit.tv_nsec -= 1000000000;
        time_limit.tv_sec += 1;
    }
    {
        std::lock_guard<InterprocessMutex> l(mutex);
        // Each wait should time out; the deadline is absolute, so after the
        // first timeout the remaining waits return immediately.
        for (int i = 0; i < 5; ++i)
            changed.wait(mutex, &time_limit);
    }
    changed.release_shared_part();
    mutex.release_shared_part();
}
810

811

812
// test that notify_all will wake up all waiting threads, if there
813
// are many waiters:
814
// test that notify_all will wake up all waiting threads, if there
// are many waiters:
NONCONCURRENT_TEST(Thread_CondvarNotifyAllWakeup)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar changed;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    mutex.set_shared_part(mutex_part, path, "Thread_CondvarNotifyAllWakeup_Mutex");
    changed.set_shared_part(condvar_part, path, "Thread_CondvarNotifyAllWakeup_CondVar", default_options.temp_dir);

    // Control channel used only to learn when all waiters are blocked.
    size_t num_threads_holding_lock = 0;
    std::mutex control_mutex;
    std::condition_variable control_cv;

    const size_t num_waiters = 10;
    Thread waiters[num_waiters];
    for (size_t i = 0; i < num_waiters; ++i) {
        waiters[i].start(std::bind(waiter, &mutex, &changed, &control_mutex, &control_cv, &num_threads_holding_lock));
    }
    {
        // allow all waiters to start and obtain the InterprocessCondVar
        std::unique_lock<std::mutex> unique_lock(control_mutex);
        control_cv.wait(unique_lock, [&]{ return num_threads_holding_lock == num_waiters; });
    }

    // A single notify_all must release every one of the 10 waiters; if any
    // stays blocked, the joins below hang and the test fails by timeout.
    mutex.lock();
    changed.notify_all();
    mutex.unlock();

    for (size_t i = 0; i < num_waiters; ++i) {
        waiters[i].join();
    }
    changed.release_shared_part();
    mutex.release_shared_part();
}
851

852

853
// Test that the unlock+wait operation of wait() takes part atomically, i.e. that there is no time
854
// gap between them where another thread could invoke signal() which could go undetected by the wait.
855
// This test takes more than 3 days with valgrind.
856
// Test that the unlock+wait operation of wait() takes part atomically, i.e. that there is no time
// gap between them where another thread could invoke signal() which could go undetected by the wait.
// This test takes more than 3 days with valgrind.
TEST_IF(Thread_CondvarAtomicWaitUnlock, !running_with_valgrind && TEST_DURATION > 0)
{
    SHARED_GROUP_TEST_PATH(path);

    const int iter = 10000;

    // It's nice to have many threads to trigger preemption (see notes inside the t1 thread)
    const int thread_pair_count = 2; // std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    for (int tpc = 0; tpc < thread_pair_count; tpc++) {

        threads.push_back(std::thread([&]() {
            InterprocessMutex mutex;
            InterprocessMutex::SharedPart mutex_part;
            InterprocessCondVar condvar;
            InterprocessCondVar::SharedPart condvar_part;
            DBOptions default_options;

            // Use the thread id to give each pair its own mutex/condvar names.
            std::stringstream ss;
            ss << std::this_thread::get_id();
            std::string id = ss.str();

            mutex.set_shared_part(mutex_part, path, "mutex" + id);
            condvar.set_shared_part(condvar_part, path, "sema" + id, default_options.temp_dir);
            InterprocessCondVar::init_shared_part(condvar_part);

            std::atomic<bool> signal(false);

            std::thread t1([&]() {
                for (int i = 0; i < iter; i++) {
                    mutex.lock();
                    signal = true;

                    // A gap in wait() could be very tight, so we need a way to preempt it between two instructions.
                    // Problem is that we have so many/frequent operating system wait calls in this that they might 
                    // be invoked closer than a thread time slice, so preemption would never occur. So we create 
                    // some work that will sometimes bring the current time slice close to its end.

                    // Wait between 0 and number of clocks on 100 ms on a on 3 GHz machine (100 ms is Linux default
                    // time slice)
                    uint64_t clocks_to_wait = fastrand(3ULL * 1000000000ULL / 1000000ULL * 100ULL); 

                    // This loop can wait a lot more than 100 ms because each iteration takes many more clocks than 
                    // just 1. That's intentional and will cover other OS'es with bigger time slices.
                    volatile int sum = 0; // must be volatile, else it compiles into no-op
                    for (uint64_t t = 0; t < clocks_to_wait; t++) {
                        sum = sum + 1;
                    }

                    condvar.wait(mutex, nullptr);
                    mutex.unlock();
                }
            });

            // This thread calls notify_all() exactly one time after the other thread has invoked wait() and has
            // released the mutex. If wait() misses the notify_all() then there is a bug, which will reveal itself
            // by both threads hanging infinitely.
            std::thread t2([&]() {
                for (int i = 0; i < iter; i++) {
                    while (!signal) {
                    }
                    signal = false;
                    mutex.lock();
                    condvar.notify_all();
                    mutex.unlock();
                }
            });
            
            t1.join();
            t2.join();
        }));
    }

    for (int i = 0; i < thread_pair_count; i++) {
        threads[i].join();
    }
}
933

934
// An InterprocessCondVar created and initialized on one thread must be safely
// destroyable on a different thread.
NONCONCURRENT_TEST(Thread_Condvar_CreateDestroyDifferentThreads)
{
    auto cv = std::make_unique<InterprocessCondVar>();
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    TEST_PATH(path);
    DBOptions default_options;
    cv->set_shared_part(condvar_part, path, "Thread_CondvarTimeout_CondVar", default_options.temp_dir);
    std::thread destroyer([&] {
        cv.reset();
    });
    destroyer.join();
}
946

947
#ifdef _WIN32
948
// Windows only: paths containing backslashes must be accepted when naming
// interprocess mutexes and condvars (the names are mangled into kernel
// object names internally).
TEST(Thread_Win32InterprocessBackslashes)
{
    InterprocessMutex mutex;
    InterprocessMutex::SharedPart mutex_part;
    InterprocessCondVar condvar;
    InterprocessCondVar::SharedPart condvar_part;
    InterprocessCondVar::init_shared_part(condvar_part);
    DBOptions default_options;

    mutex.set_shared_part(mutex_part, "Path\\With\\Slashes", "my_mutex");
    condvar.set_shared_part(condvar_part, "Path\\With\\Slashes", "my_condvar", default_options.temp_dir);
}
960
#endif
961

962
#endif // TEST_THREAD
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc