• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_301264

30 Jul 2024 07:11PM UTC coverage: 91.111% (+0.009%) from 91.102%
github_pull_request_301264

Pull #7936

Evergreen

web-flow
Add support for multi-process subscription state change notifications (#7862)

As with the other multi-process notifications, the core idea here is to
eliminate the in-memory state and produce notifications based entirely on the
current state of the Realm file.

SubscriptionStore::update_state() has been replaced with separate functions for
the specific legal state transitions, which also take a write transaction as a
parameter. These functions are called by PendingBootstrapStore inside the same
write transaction as the bootstrap updates which changed the subscription
state. This is a minor performance optimization (due to fewer writes), and it
also eliminates a brief window between the two writes where the Realm file was
in an inconsistent state.

There's a minor functional change here: previously old subscription sets were
superseded when the new one reached the Completed state, and now they are
superseded on AwaitingMark. This aligns it with when the new subscription set
becomes the one which is returned by get_active().
Pull Request #7936: Fix connection callback crashes when reloading with React Native

102800 of 181570 branches covered (56.62%)

216840 of 237996 relevant lines covered (91.11%)

5918493.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.19
/src/realm/db.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2016 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/transaction.hpp>
20

21
#include <algorithm>
22
#include <atomic>
23
#include <cerrno>
24
#include <fcntl.h>
25
#include <iostream>
26
#include <mutex>
27
#include <sstream>
28
#include <type_traits>
29
#include <random>
30
#include <deque>
31
#include <thread>
32
#include <chrono>
33
#include <condition_variable>
34

35
#include <realm/disable_sync_to_disk.hpp>
36
#include <realm/group_writer.hpp>
37
#include <realm/impl/simulated_failure.hpp>
38
#include <realm/replication.hpp>
39
#include <realm/util/errno.hpp>
40
#include <realm/util/features.h>
41
#include <realm/util/file_mapper.hpp>
42
#include <realm/util/safe_int_ops.hpp>
43
#include <realm/util/scope_exit.hpp>
44
#include <realm/util/thread.hpp>
45
#include <realm/util/to_string.hpp>
46

47
#ifndef _WIN32
48
#include <sys/wait.h>
49
#include <sys/time.h>
50
#include <unistd.h>
51
#else
52
#include <windows.h>
53
#include <process.h>
54
#endif
55

56
// #define REALM_ENABLE_LOGFILE
57

58

59
using namespace realm;
60
using namespace realm::util;
61
using Durability = DBOptions::Durability;
62

63
namespace {
64

65
// History of changes to the layout of the shared-info (".lock") file.
//
// value   change
// --------------------
//  4      Unknown
//  5      Introduction of SharedInfo::file_format_version and
//         SharedInfo::history_type.
//  6      Using new robust mutex emulation where applicable
//  7      Introducing `commit_in_critical_phase` and `sync_agent_present`, and
//         changing `daemon_started` and `daemon_ready` from 1-bit to 8-bit
//         fields.
//  8      Placing the commitlog history inside the Realm file.
//  9      Fair write transactions requires an additional condition variable,
//         `write_fairness`
// 10      Introducing SharedInfo::history_schema_version.
// 11      New impl of InterprocessCondVar on windows.
// 12      Change `number_of_versions` to an atomic rather than guarding it
//         with a lock.
// 13      New impl of VersionList and added mutex for it (former RingBuffer)
// 14      Added field for tracking ongoing encrypted writes
//
// This value is stored in SharedInfo::shared_info_version so that all session
// participants can verify that they agree on the memory layout of SharedInfo.
// It must be bumped whenever that layout (or the meaning of a stored value)
// changes.
const uint_fast16_t g_shared_info_version = 14;
84

85

86
// Tracks the snapshots ("versions") of the Realm file that may still be in
// use by some reader. The list lives inside the memory-mapped "lock-file",
// so it is shared between threads and processes, and updates must be ordered
// so that a crash at any point leaves a recoverable state (see the barrier
// comments in reserve() and try_allocate_entry()).
struct VersionList {
    // the VersionList is an array of ReadCount structures.
    // it is placed in the "lock-file" and accessed via memory mapping
    struct ReadCount {
        // Snapshot number of this entry; 0 marks the entry as free/inactive
        // (see is_active()/deactivate()).
        uint64_t version;
        // File size recorded for this snapshot (converted with to_ref() when
        // handed out in purge_versions()).
        uint64_t filesize;
        // Top ref recorded for this snapshot.
        uint64_t current_top;
        // Reference counts of readers pinning this entry. The distinction
        // between live/frozen/full readers is applied in purge_versions():
        // "full" pins keep *all* newer versions alive as well.
        uint32_t count_live;
        uint32_t count_frozen;
        uint32_t count_full;
        // An entry is in use exactly when its version is non-zero.
        bool is_active()
        {
            return version != 0;
        }
        // Mark the entry free and clear all reader counts.
        void deactivate()
        {
            version = 0;
            count_live = count_frozen = count_full = 0;
        }
        // Mark the entry in use for snapshot `v` (must be non-zero, since 0
        // means "free").
        void activate(uint64_t v)
        {
            version = v;
        }
    };

    // Grow the list to hold `size` entries (never shrinks). New entries are
    // deactivated *before* `entries` is published, so a concurrent reader can
    // never observe an uninitialized entry.
    void reserve(uint32_t size) noexcept
    {
        for (auto i = entries; i < size; ++i)
            data()[i].deactivate();
        if (size > entries) {
            // Fence preventing downward motion of above writes
            std::atomic_signal_fence(std::memory_order_release);
            entries = size;
        }
    }

    // Start out empty with the statically allocated initial capacity.
    VersionList() noexcept
    {
        newest = nil; // empty
        entries = 0;
        reserve(init_readers_size);
    }

    static size_t compute_required_space(uint_fast32_t num_entries) noexcept
    {
        // get space required for given number of entries beyond the initial count.
        // NB: this not the size of the VersionList, it is the size minus whatever was
        // the initial size.
        return sizeof(ReadCount) * (num_entries - init_readers_size);
    }

    // Current number of entries (active or not).
    unsigned int capacity() const noexcept
    {
        return entries;
    }

    ReadCount& get(uint_fast32_t idx) noexcept
    {
        return data()[idx];
    }

    // Entry at index `newest` — only meaningful while `newest` refers to a
    // valid entry (i.e. is not `nil`).
    ReadCount& get_newest() noexcept
    {
        return get(newest);
    }
    // Allocate and activate an entry for snapshot `version` with the given
    // top ref and file size, and publish it as the newest entry.
    // returns nullptr if all entries are in use
    ReadCount* try_allocate_entry(uint64_t top, uint64_t size, uint64_t version)
    {
        auto i = allocating.load();
        if (i == newest.load()) {
            // if newest != allocating we are recovering from a crash and MUST complete the earlier allocation
            // but if not, find lowest free entry by linear search.
            uint32_t k = 0;
            while (k < entries && data()[k].is_active()) {
                ++k;
            }
            if (k == entries)
                return nullptr;     // no free entries
            allocating.exchange(k); // barrier: prevent upward movement of instructions below
            i = k;
        }
        auto& rc = data()[i];
        REALM_ASSERT(rc.count_frozen == 0);
        REALM_ASSERT(rc.count_live == 0);
        REALM_ASSERT(rc.count_full == 0);
        rc.current_top = top;
        rc.filesize = size;
        rc.activate(version);
        newest.store(i); // barrier: prevent downward movement of instructions above
        return &rc;
    }

    // Index of `rc` within the entry array (plain pointer arithmetic).
    uint32_t index_of(const ReadCount& rc) noexcept
    {
        return (uint32_t)(&rc - data());
    }

    // Return an entry to the free pool.
    void free_entry(ReadCount* rc) noexcept
    {
        rc->current_top = rc->filesize = -1ULL; // easy to recognize in debugger
        rc->deactivate();
    }

    // This method resets the version list to an empty state, then allocates an entry.
    // Precondition: This should *only* be done if the caller has established that she
    // is the only thread/process that has access to the VersionList. It is currently
    // called from init_versioning(), which is called by DB::open() under the
    // condition that it is the session initiator and under guard by the control mutex,
    // thus ensuring the precondition. It is also called from compact() in a similar situation.
    // It is most likely not suited for any other use.
    ReadCount& init_versioning(uint64_t top, uint64_t filesize, uint64_t version) noexcept
    {
        newest = nil;
        allocating = 0;
        auto t_free = entries;
        entries = 0;
        reserve(t_free);
        return *try_allocate_entry(top, filesize, version);
    }

    // Free all entries that are no longer reachable, collect the reachable
    // versions into `top_refs`, report the oldest live version in
    // `oldest_live_v`, and set `any_new_unreachables` when an entry younger
    // than the oldest reachable version was just freed (this triggers
    // backdating in the caller).
    void purge_versions(uint64_t& oldest_live_v, TopRefMap& top_refs, bool& any_new_unreachables)
    {
        oldest_live_v = std::numeric_limits<uint64_t>::max();
        auto oldest_full_v = std::numeric_limits<uint64_t>::max();
        any_new_unreachables = false;
        // correct case where an earlier crash may have left the entry at 'allocating' partially initialized:
        const auto index_of_newest = newest.load();
        if (auto a = allocating.load(); a != index_of_newest) {
            data()[a].deactivate();
        }
        // determine fully locked versions - after one of those all versions are considered live.
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_full) {
                if (rc->version < oldest_full_v)
                    oldest_full_v = rc->version;
            }
        }
        // collect reachable versions and determine oldest live reachable version
        // (oldest reachable version is the first entry in the top_refs map, so no need to find it explicitly)
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_frozen || rc->count_live || rc->version >= oldest_full_v) {
                // entry is still reachable
                top_refs.emplace(rc->version, VersionInfo{to_ref(rc->current_top), to_ref(rc->filesize)});
            }
            if (rc->count_live || rc->version >= oldest_full_v) {
                if (rc->version < oldest_live_v)
                    oldest_live_v = rc->version;
            }
        }
        // we must have found at least one reachable version
        REALM_ASSERT(top_refs.size());
        // free unreachable entries and determine if we want to trigger backdating
        uint64_t oldest_v = top_refs.begin()->first;
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_frozen == 0 && rc->count_live == 0 && rc->version < oldest_full_v) {
                // entry is becoming unreachable.
                // if it is also younger than a reachable version, then set 'any_new_unreachables' to trigger
                // backdating
                if (rc->version > oldest_v) {
                    any_new_unreachables = true;
                }
                REALM_ASSERT(index_of(*rc) != index_of_newest);
                free_entry(rc);
            }
        }
        REALM_ASSERT(oldest_v != std::numeric_limits<uint64_t>::max());
        REALM_ASSERT(oldest_live_v != std::numeric_limits<uint64_t>::max());
    }

#if REALM_DEBUG
    // Debug helper: print every entry and its reader counts to stdout.
    void dump()
    {
        util::format(std::cout, "VersionList has %1 entries: \n", entries);
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            util::format(std::cout, "[%1]: version %2, live: %3, full: %4, frozen: %5\n", index_of(*rc), rc->version,
                         rc->count_live, rc->count_full, rc->count_frozen);
        }
    }
#endif // REALM_DEBUG

    // Sentinel index meaning "no entry" (used for `newest` when empty).
    constexpr static uint32_t nil = (uint32_t)-1;
    const static int init_readers_size = 32;
    uint32_t entries;
    std::atomic<uint32_t> allocating; // atomic for crash safety, not threading
    std::atomic<uint32_t> newest;     // atomic for crash safety, not threading

    // IMPORTANT: The actual data comprising the version list MUST BE PLACED LAST in
    // the VersionList structure, as the data area is extended at run time.
    // Similarly, the VersionList must be the final element of the SharedInfo structure.
    // IMPORTANT II:
    // To ensure proper alignment across all platforms, the SharedInfo structure
    // should NOT have a stricter alignment requirement than the ReadCount structure.
    ReadCount m_data[init_readers_size];

    // Silence UBSan errors about out-of-bounds reads on m_data by casting to a pointer
    ReadCount* data() noexcept
    {
        return m_data;
    }
    const ReadCount* data() const noexcept
    {
        return m_data;
    }
};
296

297
// Using lambda rather than function so that shared_ptr shared state doesn't need to hold a function pointer.
298
constexpr auto TransactionDeleter = [](Transaction* t) {
2,090,262✔
299
    t->close();
2,090,262✔
300
    delete t;
2,090,262✔
301
};
2,090,262✔
302

303
template <typename... Args>
304
TransactionRef make_transaction_ref(Args&&... args)
305
{
2,092,665✔
306
    return TransactionRef(new Transaction(std::forward<Args>(args)...), TransactionDeleter);
2,092,665✔
307
}
2,092,665✔
308

309
} // anonymous namespace
310

311
namespace realm {
312

313
/// The structure of the contents of the per session `.lock` file. Note that
/// this file is transient in that it is recreated/reinitialized at the
/// beginning of every session. A session is any sequence of temporally
/// overlapping openings of a particular Realm file via DB objects. For
/// example, if there are two DB objects, A and B, and the file is
/// first opened via A, then opened via B, then closed via A, and finally closed
/// via B, then the session stretches from the opening via A to the closing via
/// B.
///
/// IMPORTANT: Remember to bump `g_shared_info_version` if anything is changed
/// in the memory layout of this class, or if the meaning of any of the stored
/// values change.
///
/// Members `init_complete`, `shared_info_version`, `size_of_mutex`, and
/// `size_of_condvar` may only be modified only while holding an exclusive lock
/// on the file, and may be read only while holding a shared (or exclusive) lock
/// on the file. All other members (except for the VersionList which has its own mutex)
/// may be accessed only while holding a lock on `controlmutex`.
///
/// SharedInfo must be 8-byte aligned. On 32-bit Apple platforms, mutexes store their
/// alignment as part of the mutex state. We're copying the SharedInfo (including
/// embedded but always unlocked mutexes) and it must retain the same alignment
/// throughout.
struct alignas(8) DB::SharedInfo {
    /// Indicates that initialization of the lock file was completed
    /// successfully.
    ///
    /// CAUTION: This member must never move or change type, as that would
    /// compromise safety of the session initiation process.
    std::atomic<uint8_t> init_complete; // Offset 0

    /// The size in bytes of a mutex member of SharedInfo. This allows all
    /// session participants to be in agreement. Obviously, a size match is not
    /// enough to guarantee identical layout internally in the mutex object, but
    /// it is hoped that it will catch some (if not most) of the cases where
    /// there is a layout discrepancy internally in the mutex object.
    uint8_t size_of_mutex; // Offset 1

    /// Like size_of_mutex, but for condition variable members of SharedInfo.
    uint8_t size_of_condvar; // Offset 2

    /// Set during the critical phase of a commit, when the logs, the VersionList
    /// and the database may be out of sync with respect to each other. If a
    /// writer crashes during this phase, there is no safe way of continuing
    /// with further write transactions. When beginning a write transaction,
    /// this must be checked and an exception thrown if set.
    ///
    /// Note that std::atomic<uint8_t> is guaranteed to have standard layout.
    std::atomic<uint8_t> commit_in_critical_phase = {0}; // Offset 3

    /// The target Realm file format version for the current session. This
    /// allows all session participants to be in agreement. It can only differ
    /// from what is returned by Group::get_file_format_version() temporarily,
    /// and only during the Realm file opening process. If it differs, it means
    /// that the file format needs to be upgraded from its current format
    /// (Group::get_file_format_version()), the format specified by this member
    /// of SharedInfo.
    uint8_t file_format_version; // Offset 4

    /// Stores a value of type Replication::HistoryType. Must match across all
    /// session participants.
    int8_t history_type; // Offset 5

    /// The SharedInfo layout version. This allows all session participants to
    /// be in agreement. Must be bumped if the layout of the SharedInfo
    /// structure is changed. Note, however, that only the part that lies beyond
    /// SharedInfoUnchangingLayout can have its layout changed.
    ///
    /// CAUTION: This member must never move or change type, as that would
    /// compromise version agreement checking.
    uint16_t shared_info_version = g_shared_info_version; // Offset 6

    uint16_t durability;           // Offset 8
    uint16_t free_write_slots = 0; // Offset 10

    /// Number of participating shared groups
    uint32_t num_participants = 0; // Offset 12

    /// Latest version number. Guarded by the controlmutex (for lock-free
    /// access, use get_version_of_latest_snapshot() instead)
    uint64_t latest_version_number; // Offset 16

    /// Pid of process initiating the session, but only if that process runs
    /// with encryption enabled, zero otherwise. This was used to prevent
    /// multiprocess encryption until support for that was added.
    uint64_t session_initiator_pid = 0; // Offset 24

    std::atomic<uint64_t> number_of_versions; // Offset 32

    /// True (1) if there is a sync agent present (a session participant acting
    /// as sync client). It is an error to have a session with more than one
    /// sync agent. The purpose of this flag is to prevent that from ever
    /// happening. If the sync agent crashes and leaves the flag set, the
    /// session will need to be restarted (lock file reinitialized) before a new
    /// sync agent can be started.
    uint8_t sync_agent_present = 0; // Offset 40

    /// Set when a participant decides to start the daemon, cleared by the
    /// daemon when it decides to exit. Participants check during open() and
    /// start the daemon if running in async mode.
    uint8_t daemon_started = 0; // Offset 41

    /// Set by the daemon when it is ready to handle commits. Participants must
    /// wait during open() on 'daemon_becomes_ready' for this to become true.
    /// Cleared by the daemon when it decides to exit.
    uint8_t daemon_ready = 0; // Offset 42

    uint8_t filler_1; // Offset 43

    /// Stores a history schema version (as returned by
    /// Replication::get_history_schema_version()). Must match across all
    /// session participants.
    uint16_t history_schema_version; // Offset 44

    uint16_t filler_2; // Offset 46

    InterprocessMutex::SharedPart shared_writemutex; // Offset 48
    InterprocessMutex::SharedPart shared_controlmutex;
    InterprocessMutex::SharedPart shared_versionlist_mutex;
    InterprocessCondVar::SharedPart room_to_write;
    InterprocessCondVar::SharedPart work_to_do;
    InterprocessCondVar::SharedPart daemon_becomes_ready;
    InterprocessCondVar::SharedPart new_commit_available;
    InterprocessCondVar::SharedPart pick_next_writer;
    std::atomic<uint32_t> next_ticket;
    std::atomic<uint32_t> next_served = 0;
    std::atomic<uint64_t> writing_page_offset;
    std::atomic<uint64_t> write_counter;

    // IMPORTANT: The VersionList MUST be the last field in SharedInfo - see above.
    VersionList readers;

    SharedInfo(Durability, Replication::HistoryType, int history_schema_version);
    ~SharedInfo() noexcept {}

    /// Reset the VersionList to contain exactly one entry for the given
    /// top ref / file size / version. See VersionList::init_versioning() for
    /// the (strict) preconditions.
    void init_versioning(ref_type top_ref, size_t file_size, uint64_t initial_version)
    {
        // Create our first versioning entry:
        readers.init_versioning(top_ref, file_size, initial_version);
    }
};
454

455

456
// Initialize the shared part of the lock file. Durability, history type and
// history schema version are fixed at construction; the condition variables
// used for commit signalling are initialized here. The static_asserts below
// pin the memory layout that all session participants rely on.
DB::SharedInfo::SharedInfo(Durability dura, Replication::HistoryType ht, int hsv)
    : size_of_mutex(sizeof(shared_writemutex))
    , size_of_condvar(sizeof(room_to_write))
    , shared_writemutex()   // Throws
    , shared_controlmutex() // Throws
{
    durability = static_cast<uint16_t>(dura); // durability level is fixed from creation
    REALM_ASSERT(!util::int_cast_has_overflow<decltype(history_type)>(ht + 0));
    REALM_ASSERT(!util::int_cast_has_overflow<decltype(history_schema_version)>(hsv));
    history_type = ht;
    history_schema_version = static_cast<uint16_t>(hsv);
    InterprocessCondVar::init_shared_part(new_commit_available); // Throws
    InterprocessCondVar::init_shared_part(pick_next_writer);     // Throws
    next_ticket = 0;

// IMPORTANT: The offsets, types (, and meanings) of these members must
// never change, not even when the SharedInfo layout version is bumped. The
// eternal constancy of this part of the layout is what ensures that a
// joining session participant can reliably verify that the actual format is
// as expected.
#ifndef _WIN32
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
#endif
    static_assert(offsetof(SharedInfo, init_complete) == 0 && ATOMIC_BOOL_LOCK_FREE == 2 &&
                      std::is_same<decltype(init_complete), std::atomic<uint8_t>>::value &&
                      offsetof(SharedInfo, shared_info_version) == 6 &&
                      std::is_same<decltype(shared_info_version), uint16_t>::value,
                  "Forbidden change in SharedInfo layout");

    // Try to catch some of the memory layout changes that requires bumping of
    // the SharedInfo file format version (shared_info_version).
    static_assert(
        offsetof(SharedInfo, size_of_mutex) == 1 && std::is_same<decltype(size_of_mutex), uint8_t>::value &&
            offsetof(SharedInfo, size_of_condvar) == 2 && std::is_same<decltype(size_of_condvar), uint8_t>::value &&
            offsetof(SharedInfo, commit_in_critical_phase) == 3 &&
            std::is_same<decltype(commit_in_critical_phase), std::atomic<uint8_t>>::value &&
            offsetof(SharedInfo, file_format_version) == 4 &&
            std::is_same<decltype(file_format_version), uint8_t>::value && offsetof(SharedInfo, history_type) == 5 &&
            std::is_same<decltype(history_type), int8_t>::value && offsetof(SharedInfo, durability) == 8 &&
            std::is_same<decltype(durability), uint16_t>::value && offsetof(SharedInfo, free_write_slots) == 10 &&
            std::is_same<decltype(free_write_slots), uint16_t>::value &&
            offsetof(SharedInfo, num_participants) == 12 &&
            std::is_same<decltype(num_participants), uint32_t>::value &&
            offsetof(SharedInfo, latest_version_number) == 16 &&
            std::is_same<decltype(latest_version_number), uint64_t>::value &&
            offsetof(SharedInfo, session_initiator_pid) == 24 &&
            std::is_same<decltype(session_initiator_pid), uint64_t>::value &&
            offsetof(SharedInfo, number_of_versions) == 32 &&
            std::is_same<decltype(number_of_versions), std::atomic<uint64_t>>::value &&
            offsetof(SharedInfo, sync_agent_present) == 40 &&
            std::is_same<decltype(sync_agent_present), uint8_t>::value &&
            offsetof(SharedInfo, daemon_started) == 41 && std::is_same<decltype(daemon_started), uint8_t>::value &&
            offsetof(SharedInfo, daemon_ready) == 42 && std::is_same<decltype(daemon_ready), uint8_t>::value &&
            offsetof(SharedInfo, filler_1) == 43 && std::is_same<decltype(filler_1), uint8_t>::value &&
            offsetof(SharedInfo, history_schema_version) == 44 &&
            std::is_same<decltype(history_schema_version), uint16_t>::value && offsetof(SharedInfo, filler_2) == 46 &&
            std::is_same<decltype(filler_2), uint16_t>::value && offsetof(SharedInfo, shared_writemutex) == 48 &&
            std::is_same<decltype(shared_writemutex), InterprocessMutex::SharedPart>::value,
        "Caught layout change requiring SharedInfo file format bumping");
    static_assert(std::atomic<uint64_t>::is_always_lock_free);
#ifndef _WIN32
#pragma GCC diagnostic pop
#endif
}
521

522
// Manages the shared list of versions (snapshots) stored in the SharedInfo
// structure of the lock file, which is shared between processes. In addition
// to the cross-process state, each process keeps a local cache of reader
// counts (m_local_readers) so that the hot paths (grabbing/releasing a read
// lock on an already-known version) can avoid taking the interprocess mutex.
//
// Lock ordering: when both are needed, the interprocess m_mutex is acquired
// before m_info_mutex. m_local_readers_mutex and m_info_mutex guard the
// members annotated with GUARDED_BY below.
class DB::VersionManager {
public:
    VersionManager(util::InterprocessMutex& mutex)
        : m_mutex(mutex)
    {
    }
    virtual ~VersionManager() {}

    // Purge version entries via the shared VersionList, reporting the oldest
    // still-live version and whether any versions became unreachable.
    void cleanup_versions(uint64_t& oldest_live_version, TopRefMap& top_refs, bool& any_new_unreachables)
        REQUIRES(!m_info_mutex)
    {
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        ensure_reader_mapping();
        m_info->readers.purge_versions(oldest_live_version, top_refs, any_new_unreachables);
    }

    // Convenience wrapper returning only the version number of the latest snapshot.
    version_type get_newest_version() REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        return get_version_id_of_latest_snapshot().version;
    }

    // Returns {version, index} of the latest snapshot, consulting the local
    // cache first and only falling back to the shared VersionList on a miss.
    VersionID get_version_id_of_latest_snapshot() REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        {
            // First check the local cache. This is an unlocked read, so it may
            // race with adding a new version. If this happens we'll either see
            // a stale value (acceptable for a racing write on one thread and
            // a read on another), or a new value which is guaranteed to not
            // be an active index in the local cache.
            util::CheckedLockGuard lock(m_local_readers_mutex);
            util::CheckedLockGuard info_lock(m_info_mutex);
            auto index = m_info->readers.newest.load();
            if (index < m_local_readers.size()) {
                auto& r = m_local_readers[index];
                if (r.is_active()) {
                    return {r.version, index};
                }
            }
        }

        // Cache miss: read the authoritative entry under the interprocess lock.
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        auto index = m_info->readers.newest.load();
        ensure_reader_mapping(index);
        return {m_info->readers.get(index).version, index};
    }

    // Releases a previously grabbed read lock. The local per-process count is
    // decremented first; the shared count is only touched when the local count
    // for this lock type drops to zero.
    void release_read_lock(const ReadLockInfo& read_lock) REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        {
            util::CheckedLockGuard lock(m_local_readers_mutex);
            REALM_ASSERT(read_lock.m_reader_idx < m_local_readers.size());
            auto& r = m_local_readers[read_lock.m_reader_idx];
            auto& f = field_for_type(r, read_lock.m_type);
            REALM_ASSERT(f > 0);
            if (--f > 0)
                return;
            // All local counts gone: deactivate the cache entry (version 0
            // marks it inactive) so it can be reused for a different version.
            if (r.count_live == 0 && r.count_full == 0 && r.count_frozen == 0)
                r.version = 0;
        }

        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        // we should not need to call ensure_full_reader_mapping,
        // since releasing a read lock means it has been grabbed
        // earlier - and hence must reside in mapped memory:
        REALM_ASSERT(read_lock.m_reader_idx < m_local_max_entry);
        auto& r = m_info->readers.get(read_lock.m_reader_idx);
        REALM_ASSERT(read_lock.m_version == r.version);
        --field_for_type(r, read_lock.m_type);
    }

    // Grabs a read lock on the requested version (or the newest snapshot when
    // version_id is default-constructed). Fast path: bump a count in the local
    // cache. Slow path: pin the version in the shared VersionList under the
    // interprocess lock, then record it in the local cache.
    // Throws BadVersion if the requested version is unavailable or no longer pinned.
    ReadLockInfo grab_read_lock(ReadLockInfo::Type type, VersionID version_id = {})
        REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        ReadLockInfo read_lock;
        if (try_grab_local_read_lock(read_lock, type, version_id))
            return read_lock;

        {
            const bool pick_specific = version_id.version != VersionID().version;
            std::lock_guard lock(m_mutex);
            util::CheckedLockGuard info_lock(m_info_mutex);
            auto newest = m_info->readers.newest.load();
            REALM_ASSERT(newest != VersionList::nil);
            read_lock.m_reader_idx = pick_specific ? version_id.index : newest;
            ensure_reader_mapping((unsigned int)read_lock.m_reader_idx);
            bool picked_newest = read_lock.m_reader_idx == (unsigned)newest;
            auto& r = m_info->readers.get(read_lock.m_reader_idx);
            if (pick_specific && version_id.version != r.version)
                throw BadVersion(version_id.version);
            if (!picked_newest) {
                // An older entry is only valid while someone still pins it;
                // frozen locks may also piggyback on an existing frozen pin.
                if (type == ReadLockInfo::Frozen && r.count_frozen == 0 && r.count_live == 0)
                    throw BadVersion(version_id.version);
                if (type != ReadLockInfo::Frozen && r.count_live == 0)
                    throw BadVersion(version_id.version);
            }
            populate_read_lock(read_lock, r, type);
        }

        {
            // Mirror the grabbed lock into the local cache so subsequent grabs
            // of the same version can take the fast path.
            util::CheckedLockGuard local_lock(m_local_readers_mutex);
            grow_local_cache(read_lock.m_reader_idx + 1);
            auto& r2 = m_local_readers[read_lock.m_reader_idx];
            if (!r2.is_active()) {
                r2.version = read_lock.m_version;
                r2.filesize = read_lock.m_file_size;
                r2.current_top = read_lock.m_top_ref;
                r2.count_full = r2.count_live = r2.count_frozen = 0;
            }
            REALM_ASSERT_EX(field_for_type(r2, type) == 0, type, r2.count_full, r2.count_live, r2.count_frozen);
            field_for_type(r2, type) = 1;
        }

        return read_lock;
    }

    // Initializes the shared version bookkeeping with the initial top ref /
    // file size / version (delegates to SharedInfo).
    void init_versioning(ref_type top_ref, size_t file_size, uint64_t initial_version) REQUIRES(!m_info_mutex)
    {
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        m_info->init_versioning(top_ref, file_size, initial_version);
    }

    // Adds a new version entry, growing the shared VersionList (and, in the
    // file-backed subclass, the lock file) by 32 entries if it is full.
    void add_version(ref_type new_top_ref, size_t new_file_size, uint64_t new_version) REQUIRES(!m_info_mutex)
    {
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        ensure_reader_mapping();
        if (m_info->readers.try_allocate_entry(new_top_ref, new_file_size, new_version)) {
            return;
        }
        // allocation failed, expand VersionList (and lockfile) and retry
        auto entries = m_info->readers.capacity();
        auto new_entries = entries + 32;
        expand_version_list(new_entries);
        m_local_max_entry = new_entries;
        m_info->readers.reserve(new_entries);
        auto success = m_info->readers.try_allocate_entry(new_top_ref, new_file_size, new_version);
        REALM_ASSERT_EX(success, new_entries, new_version);
    }


private:
    // Ensures the local cache can hold at least new_size entries.
    void grow_local_cache(size_t new_size) REQUIRES(m_local_readers_mutex)
    {
        if (new_size > m_local_readers.size())
            m_local_readers.resize(new_size, VersionList::ReadCount{});
    }

    // Bumps the count for `type` on entry `r` and copies its snapshot data
    // (version, top ref, file size) into `read_lock`.
    void populate_read_lock(ReadLockInfo& read_lock, VersionList::ReadCount& r, ReadLockInfo::Type type)
    {
        ++field_for_type(r, type);
        read_lock.m_type = type;
        read_lock.m_version = r.version;
        read_lock.m_top_ref = static_cast<ref_type>(r.current_top);
        read_lock.m_file_size = static_cast<size_t>(r.filesize);
    }

    // Fast path for grab_read_lock(): succeeds only if the local cache already
    // holds an active entry for the wanted version with a nonzero count for
    // `type` (i.e. this process already pins that version with that type).
    bool try_grab_local_read_lock(ReadLockInfo& read_lock, ReadLockInfo::Type type, VersionID version_id)
        REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        const bool pick_specific = version_id.version != VersionID().version;
        auto index = version_id.index;
        if (!pick_specific) {
            util::CheckedLockGuard lock(m_info_mutex);
            index = m_info->readers.newest.load();
        }
        util::CheckedLockGuard local_lock(m_local_readers_mutex);
        if (index >= m_local_readers.size())
            return false;

        auto& r = m_local_readers[index];
        if (!r.is_active())
            return false;
        if (pick_specific && r.version != version_id.version)
            return false;
        if (field_for_type(r, type) == 0)
            return false;

        read_lock.m_reader_idx = index;
        populate_read_lock(read_lock, r, type);
        return true;
    }

    // Maps a lock type to the corresponding counter field of a ReadCount entry.
    static uint32_t& field_for_type(VersionList::ReadCount& r, ReadLockInfo::Type type)
    {
        switch (type) {
            case ReadLockInfo::Frozen:
                return r.count_frozen;
            case ReadLockInfo::Live:
                return r.count_live;
            case ReadLockInfo::Full:
                return r.count_full;
            default:
                REALM_UNREACHABLE(); // silence a warning
        }
    }

    // Records in SharedInfo that a write to the given page is about to happen.
    // The offset is stored +1 so that 0 can mean "no page marked".
    void mark_page_for_writing(uint64_t page_offset) REQUIRES(!m_info_mutex)
    {
        util::CheckedLockGuard info_lock(m_info_mutex);
        m_info->writing_page_offset = page_offset + 1;
        m_info->write_counter++;
    }
    // Clears the writing marker; the counter is bumped again so observers can
    // detect that a write happened even if they never saw the marker set.
    void clear_writing_marker() REQUIRES(!m_info_mutex)
    {
        util::CheckedLockGuard info_lock(m_info_mutex);
        m_info->write_counter++;
        m_info->writing_page_offset = 0;
    }
    // returns false if no page is marked.
    // if a page is marked, returns true and optionally the offset of the page marked for writing
    // in all cases returns optionally the write counter
    bool observe_writer(uint64_t* page_offset, uint64_t* write_counter) REQUIRES(!m_info_mutex)
    {
        util::CheckedLockGuard info_lock(m_info_mutex);
        if (write_counter) {
            *write_counter = m_info->write_counter;
        }
        uint64_t marked = m_info->writing_page_offset;
        if (marked && page_offset) {
            *page_offset = marked - 1;
        }
        return marked != 0;
    }

protected:
    // Interprocess mutex guarding the shared VersionList operations.
    util::InterprocessMutex& m_mutex;
    util::CheckedMutex m_local_readers_mutex;
    // Per-process cache of reader counts, indexed like the shared VersionList.
    std::vector<VersionList::ReadCount> m_local_readers GUARDED_BY(m_local_readers_mutex);

    util::CheckedMutex m_info_mutex;
    // Number of VersionList entries currently covered by this process's mapping.
    unsigned int m_local_max_entry GUARDED_BY(m_info_mutex) = 0;
    SharedInfo* m_info GUARDED_BY(m_info_mutex) = nullptr;

    // Makes sure entry `required` is inside the mapped region (default -1 ==
    // UINT_MAX forces a capacity re-check in the file-backed implementation).
    virtual void ensure_reader_mapping(unsigned int required = -1) REQUIRES(m_info_mutex) = 0;
    virtual void expand_version_list(unsigned new_entries) REQUIRES(m_info_mutex) = 0;
    friend class DB::EncryptionMarkerObserver;
};
763

764
// VersionManager backed by the on-disk lock file: the SharedInfo structure
// (including the VersionList) lives in a memory mapping of that file, which
// must be grown and remapped when the VersionList expands.
class DB::FileVersionManager final : public DB::VersionManager {
public:
    FileVersionManager(File& file, util::InterprocessMutex& mutex)
        : VersionManager(mutex)
        , m_file(file)
    {
        // Grow the mapping until it covers the full SharedInfo structure,
        // including the space required by the current VersionList capacity.
        size_t size = 0, required_size = sizeof(SharedInfo);
        while (size < required_size) {
            // Map the file without the lock held. This could result in the
            // mapping being too small and having to remap if the file is grown
            // concurrently, but if this is the case we should always see a bigger
            // size the next time.
            auto new_size = static_cast<size_t>(m_file.get_size());
            REALM_ASSERT(new_size > size);
            size = new_size;
            m_reader_map.remap(m_file, File::access_ReadWrite, size);
            m_info = m_reader_map.get_addr();

            // Re-read the required size under the lock; another process may
            // have expanded the VersionList since we measured the file.
            std::lock_guard lock(m_mutex);
            m_local_max_entry = m_info->readers.capacity();
            required_size = sizeof(SharedInfo) + m_info->readers.compute_required_space(m_local_max_entry);
            REALM_ASSERT(required_size >= size);
        }
    }

    // Grows the lock file to hold `new_entries` VersionList entries and remaps it.
    void expand_version_list(unsigned new_entries) override REQUIRES(m_info_mutex)
    {
        size_t new_info_size = sizeof(SharedInfo) + m_info->readers.compute_required_space(new_entries);
        m_file.prealloc(new_info_size);                                          // Throws
        m_reader_map.remap(m_file, util::File::access_ReadWrite, new_info_size); // Throws
        m_info = m_reader_map.get_addr();
    }

private:
    // Ensures entry `required` is covered by the current mapping, remapping if
    // another process expanded the VersionList. The default argument -1 wraps
    // to UINT_MAX, which always triggers the capacity re-check below.
    void ensure_reader_mapping(unsigned int required = -1) override REQUIRES(m_info_mutex)
    {
        using _impl::SimulatedFailure;
        SimulatedFailure::trigger(SimulatedFailure::shared_group__grow_reader_mapping); // Throws

        if (required < m_local_max_entry)
            return;

        auto new_max_entry = m_info->readers.capacity();
        if (new_max_entry > m_local_max_entry) {
            // handle mapping expansion if required
            size_t info_size = sizeof(DB::SharedInfo) + m_info->readers.compute_required_space(new_max_entry);
            m_reader_map.remap(m_file, util::File::access_ReadWrite, info_size); // Throws
            m_local_max_entry = new_max_entry;
            m_info = m_reader_map.get_addr();
        }
    }

    File& m_file;
    File::Map<DB::SharedInfo> m_reader_map;

    friend class DB::EncryptionMarkerObserver;
};
821

822
// adapter class for marking/observing encrypted writes
823
class DB::EncryptionMarkerObserver final : public util::WriteMarker, public util::WriteObserver {
824
public:
825
    EncryptionMarkerObserver(DB::VersionManager& vm)
826
        : vm(vm)
36,273✔
827
    {
73,923✔
828
    }
73,923✔
829
    bool no_concurrent_writer_seen() override
830
    {
101,448✔
831
        uint64_t tmp_write_count;
101,448✔
832
        auto page_may_have_been_written = vm.observe_writer(nullptr, &tmp_write_count);
101,448✔
833
        if (tmp_write_count != last_seen_count) {
101,448✔
834
            page_may_have_been_written = true;
42✔
835
            last_seen_count = tmp_write_count;
42✔
836
        }
42✔
837
        if (page_may_have_been_written) {
101,448✔
838
            calls_since_last_writer_observed = 0;
42✔
839
            return false;
42✔
840
        }
42✔
841
        ++calls_since_last_writer_observed;
101,406✔
842
        constexpr size_t max_calls = 5; // an arbitrary handful, > 1
101,406✔
843
        return calls_since_last_writer_observed >= max_calls;
101,406✔
844
    }
101,448✔
845
    void mark(uint64_t pos) override
846
    {
1,200✔
847
        vm.mark_page_for_writing(pos);
1,200✔
848
    }
1,200✔
849
    void unmark() override
850
    {
1,200✔
851
        vm.clear_writing_marker();
1,200✔
852
    }
1,200✔
853

854
private:
855
    DB::VersionManager& vm;
856
    uint64_t last_seen_count = 0;
857
    size_t calls_since_last_writer_observed = 0;
858
};
859

860
// VersionManager for in-memory Realms: the SharedInfo block is supplied by the
// caller and is never file-backed, so the mapping is fixed at construction and
// can neither be refreshed nor grown.
class DB::InMemoryVersionManager final : public DB::VersionManager {
public:
    InMemoryVersionManager(SharedInfo* info, util::InterprocessMutex& mutex)
        : VersionManager(mutex)
    {
        m_info = info;
        m_local_max_entry = info->readers.capacity();
    }

    void expand_version_list(unsigned) override
    {
        // The in-memory VersionList has a fixed capacity; growth is a bug.
        REALM_ASSERT(false);
    }

private:
    // No file mapping exists, so there is never anything to refresh.
    void ensure_reader_mapping(unsigned int) override {}
};
876

877
// Process-wide default directory for temporary files. With <filesystem>
// available, use the platform's temp directory; otherwise fall back to
// $TMPDIR, or the empty string when it is unset.
#if REALM_HAVE_STD_FILESYSTEM
std::string DBOptions::sys_tmp_dir = std::filesystem::temp_directory_path().string();
#else
std::string DBOptions::sys_tmp_dir = getenv("TMPDIR") ? getenv("TMPDIR") : "";
#endif
882

883
// NOTES ON CREATION AND DESTRUCTION OF SHARED MUTEXES:
884
//
885
// According to the 'process-sharing example' in the POSIX man page
886
// for pthread_mutexattr_init() other processes may continue to use a
887
// process-shared mutex after exit of the process that initialized
888
// it. Also, the example does not contain any call to
889
// pthread_mutex_destroy(), so apparently a process-shared mutex need
890
// not be destroyed at all, nor can it be that a process-shared mutex
891
// is associated with any resources that are local to the initializing
892
// process, because that would imply a leak.
893
//
894
// While it is not explicitly guaranteed in the man page, we shall
895
// assume that it is valid to initialize a process-shared mutex twice
896
// without an intervening call to pthread_mutex_destroy(). We need to
897
// be able to reinitialize a process-shared mutex if the first
898
// initializing process crashes and leaves the shared memory in an
899
// undefined state.
900

901
void DB::open(const std::string& path, const DBOptions& options)
902
{
73,686✔
903
    // Exception safety: Since do_open() is called from constructors, if it
904
    // throws, it must leave the file closed.
905
    using util::format;
73,686✔
906

907
    REALM_ASSERT(!is_attached());
73,686✔
908
    REALM_ASSERT(path.size());
73,686✔
909

910
    m_db_path = path;
73,686✔
911

912
    set_logger(options.logger);
73,686✔
913
    if (m_replication) {
73,686✔
914
        m_replication->set_logger(m_logger.get());
46,389✔
915
    }
46,389✔
916
    if (m_logger) {
73,686✔
917
        m_logger->log(util::Logger::Level::detail, "Open file: %1", path);
57,672✔
918
    }
57,672✔
919
    SlabAlloc& alloc = m_alloc;
73,686✔
920
    ref_type top_ref = 0;
73,686✔
921

922
    if (options.is_immutable) {
73,686✔
923
        SlabAlloc::Config cfg;
186✔
924
        cfg.read_only = true;
186✔
925
        cfg.no_create = true;
186✔
926
        cfg.encryption_key = options.encryption_key;
186✔
927
        top_ref = alloc.attach_file(path, cfg);
186✔
928
        SlabAlloc::DetachGuard dg(alloc);
186✔
929
        Group::read_only_version_check(alloc, top_ref, path);
186✔
930
        m_fake_read_lock_if_immutable = ReadLockInfo::make_fake(top_ref, m_alloc.get_baseline());
186✔
931
        dg.release();
186✔
932
        return;
186✔
933
    }
186✔
934
    std::string lockfile_path = get_core_file(path, CoreFileType::Lock);
73,500✔
935
    std::string coordination_dir = get_core_file(path, CoreFileType::Management);
73,500✔
936
    std::string lockfile_prefix = coordination_dir + "/access_control";
73,500✔
937
    m_alloc.set_read_only(false);
73,500✔
938

939
    Replication::HistoryType openers_hist_type = Replication::hist_None;
73,500✔
940
    int openers_hist_schema_version = 0;
73,500✔
941
    if (Replication* repl = get_replication()) {
73,500✔
942
        openers_hist_type = repl->get_history_type();
46,392✔
943
        openers_hist_schema_version = repl->get_history_schema_version();
46,392✔
944
    }
46,392✔
945

946
    int current_file_format_version;
73,500✔
947
    int target_file_format_version;
73,500✔
948
    int stored_hist_schema_version = -1; // Signals undetermined
73,500✔
949

950
    int retries_left = 10; // number of times to retry before throwing exceptions
73,500✔
951
    // in case there is something wrong with the .lock file... the retries allows
952
    // us to pick a new lockfile initializer in case the first one crashes without
953
    // completing the initialization
954
    std::default_random_engine random_gen;
73,500✔
955
    for (;;) {
182,040✔
956

957
        // if we're retrying, we first wait a random time
958
        if (retries_left < 10) {
182,040✔
959
            if (retries_left == 9) { // we seed it from a true random source if possible
240✔
960
                std::random_device r;
24✔
961
                random_gen.seed(r());
24✔
962
            }
24✔
963
            int max_delay = (10 - retries_left) * 10;
240✔
964
            int msecs = random_gen() % max_delay;
240✔
965
            millisleep(msecs);
240✔
966
        }
240✔
967

968
        m_file.open(lockfile_path, File::access_ReadWrite, File::create_Auto, 0); // Throws
182,040✔
969
        File::CloseGuard fcg(m_file);
182,040✔
970
        m_file.set_fifo_path(coordination_dir, "lock.fifo");
182,040✔
971

972
        if (m_file.try_rw_lock_exclusive()) { // Throws
182,040✔
973
            File::UnlockGuard ulg(m_file);
45,285✔
974

975
            // We're alone in the world, and it is Ok to initialize the
976
            // file. Start by truncating the file to zero to ensure that
977
            // the following resize will generate a file filled with zeroes.
978
            //
979
            // This will in particular set m_init_complete to 0.
980
            m_file.resize(0);
45,285✔
981
            m_file.prealloc(sizeof(SharedInfo));
45,285✔
982

983
            // We can crash anytime during this process. A crash prior to
984
            // the first resize could allow another thread which could not
985
            // get the exclusive lock because we hold it, and hence were
986
            // waiting for the shared lock instead, to observe and use an
987
            // old lock file.
988
            m_file_map.map(m_file, File::access_ReadWrite, sizeof(SharedInfo)); // Throws
45,285✔
989
            File::UnmapGuard fug(m_file_map);
45,285✔
990
            SharedInfo* info = m_file_map.get_addr();
45,285✔
991

992
            new (info) SharedInfo{options.durability, openers_hist_type, openers_hist_schema_version}; // Throws
45,285✔
993

994
            // Because init_complete is an std::atomic, it's guaranteed not to be observable by others
995
            // as being 1 before the entire SharedInfo header has been written.
996
            info->init_complete = 1;
45,285✔
997
        }
45,285✔
998

999
// We hold the shared lock from here until we close the file!
1000
#if REALM_PLATFORM_APPLE
74,469✔
1001
        // macOS has a bug which can cause a hang waiting to obtain a lock, even
1002
        // if the lock is already open in shared mode, so we work around it by
1003
        // busy waiting. This should occur only briefly during session initialization.
1004
        while (!m_file.try_rw_lock_shared()) {
136,677✔
1005
            sched_yield();
62,208✔
1006
        }
62,208✔
1007
#else
1008
        m_file.rw_lock_shared(); // Throws
107,571✔
1009
#endif
107,571✔
1010
        File::UnlockGuard ulg(m_file);
182,040✔
1011

1012
        // The coordination/management dir is created as a side effect of the lock
1013
        // operation above if needed for lock emulation. But it may also be needed
1014
        // for other purposes, so make sure it exists.
1015
        // in worst case there'll be a race on creating this directory.
1016
        // This should be safe but a waste of resources.
1017
        // Unfortunately it cannot be created at an earlier point, because
1018
        // it may then be deleted during the above lock_shared() operation.
1019
        try_make_dir(coordination_dir);
182,040✔
1020

1021
        // If the file is not completely initialized at this point in time, the
1022
// preceding initialization attempt must have failed. We know that an
1023
        // initialization process was in progress, because this thread (or
1024
        // process) failed to get an exclusive lock on the file. Because this
1025
        // thread (or process) currently has a shared lock on the file, we also
1026
        // know that the initialization process can no longer be in progress, so
1027
        // the initialization must either have completed or failed at this time.
1028

1029
        // The file is taken to be completely initialized if it is large enough
1030
        // to contain the `init_complete` field, and `init_complete` is true. If
1031
        // the file was not completely initialized, this thread must give up its
1032
        // shared lock, and retry to become the initializer. Eventually, one of
1033
        // two things must happen; either this thread, or another thread
1034
        // succeeds in completing the initialization, or this thread becomes the
1035
        // initializer, and fails the initialization. In either case, the retry
1036
        // loop will eventually terminate.
1037

1038
        // An empty file is (and was) never a successfully initialized file.
1039
        size_t info_size = sizeof(SharedInfo);
182,040✔
1040
        {
182,040✔
1041
            auto file_size = m_file.get_size();
182,040✔
1042
            if (util::int_less_than(file_size, info_size)) {
182,040✔
1043
                if (file_size == 0)
107,925✔
1044
                    continue; // Retry
79,173✔
1045
                info_size = size_t(file_size);
28,752✔
1046
            }
28,752✔
1047
        }
182,040✔
1048

1049
        // Map the initial section of the SharedInfo file that corresponds to
1050
        // the SharedInfo struct, or less if the file is smaller. We know that
1051
        // we have at least one byte, and that is enough to read the
1052
        // `init_complete` flag.
1053
        m_file_map.map(m_file, File::access_ReadWrite, info_size);
102,867✔
1054
        File::UnmapGuard fug_1(m_file_map);
102,867✔
1055
        SharedInfo* info = m_file_map.get_addr();
102,867✔
1056

1057
#ifndef _WIN32
102,867✔
1058
#pragma GCC diagnostic push
102,867✔
1059
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
102,867✔
1060
#endif
102,867✔
1061
        static_assert(offsetof(SharedInfo, init_complete) + sizeof SharedInfo::init_complete <= 1,
102,867✔
1062
                      "Unexpected position or size of SharedInfo::init_complete");
102,867✔
1063
#ifndef _WIN32
102,867✔
1064
#pragma GCC diagnostic pop
102,867✔
1065
#endif
102,867✔
1066
        if (info->init_complete == 0)
102,867✔
1067
            continue;
28,686✔
1068
        REALM_ASSERT(info->init_complete == 1);
74,181✔
1069

1070
        // At this time, we know that the file was completely initialized, but
1071
        // we still need to verify that is was initialized with the memory
1072
        // layout expected by this session participant. We could find that it is
1073
        // initializaed with a different memory layout if other concurrent
1074
        // session participants use different versions of the core library.
1075
        if (info_size < sizeof(SharedInfo)) {
74,181✔
1076
            if (retries_left) {
66✔
1077
                --retries_left;
60✔
1078
                continue;
60✔
1079
            }
60✔
1080
            throw IncompatibleLockFile(path, format("Architecture mismatch: SharedInfo size is %1 but should be %2.",
6✔
1081
                                                    info_size, sizeof(SharedInfo)));
6✔
1082
        }
66✔
1083
        if (info->shared_info_version != g_shared_info_version) {
74,115✔
1084
            if (retries_left) {
66✔
1085
                --retries_left;
60✔
1086
                continue;
60✔
1087
            }
60✔
1088
            throw IncompatibleLockFile(path, format("Version mismatch: SharedInfo version is %1 but should be %2.",
6✔
1089
                                                    info->shared_info_version, g_shared_info_version));
6✔
1090
        }
66✔
1091
        // Validate compatible sizes of mutex and condvar types. Sizes of all
1092
        // other fields are architecture independent, so if condvar and mutex
1093
        // sizes match, the entire struct matches. The offsets of
1094
        // `size_of_mutex` and `size_of_condvar` are known to be as expected due
1095
        // to the preceeding check in `shared_info_version`.
1096
        if (info->size_of_mutex != sizeof info->shared_controlmutex) {
74,049✔
1097
            if (retries_left) {
66✔
1098
                --retries_left;
60✔
1099
                continue;
60✔
1100
            }
60✔
1101
            throw IncompatibleLockFile(path, format("Architecture mismatch: Mutex size is %1 but should be %2.",
6✔
1102
                                                    info->size_of_mutex, sizeof(info->shared_controlmutex)));
6✔
1103
        }
66✔
1104

1105
        if (info->size_of_condvar != sizeof info->room_to_write) {
73,983✔
1106
            if (retries_left) {
66✔
1107
                --retries_left;
60✔
1108
                continue;
60✔
1109
            }
60✔
1110
            throw IncompatibleLockFile(
6✔
1111
                path, format("Architecture mismatch: Condition variable size is %1 but should be %2.",
6✔
1112
                             info->size_of_condvar, sizeof(info->room_to_write)));
6✔
1113
        }
66✔
1114
        m_writemutex.set_shared_part(info->shared_writemutex, lockfile_prefix, "write");
73,917✔
1115
        m_controlmutex.set_shared_part(info->shared_controlmutex, lockfile_prefix, "control");
73,917✔
1116
        m_versionlist_mutex.set_shared_part(info->shared_versionlist_mutex, lockfile_prefix, "versions");
73,917✔
1117

1118
        // even though fields match wrt alignment and size, there may still be incompatibilities
1119
        // between implementations, so lets ask one of the mutexes if it thinks it'll work.
1120
        if (!m_controlmutex.is_valid()) {
73,917✔
1121
            throw IncompatibleLockFile(
×
1122
                path, "Control mutex is invalid. This suggests that incompatible pthread libraries are in use.");
×
1123
        }
×
1124

1125
        // OK! lock file appears valid. We can now continue operations under the protection
1126
        // of the controlmutex. The controlmutex protects the following activities:
1127
        // - attachment of the database file
1128
        // - start of the async daemon
1129
        // - stop of the async daemon
1130
        // - restore of a backup, if desired
1131
        // - backup of the realm file in preparation of file format upgrade
1132
        // - DB beginning/ending a session
1133
        // - Waiting for and signalling database changes
1134
        {
73,917✔
1135
            std::lock_guard<InterprocessMutex> lock(m_controlmutex); // Throws
73,917✔
1136
            auto version_manager = std::make_unique<FileVersionManager>(m_file, m_versionlist_mutex);
73,917✔
1137

1138
            // proceed to initialize versioning and other metadata information related to
1139
            // the database. Also create the database if we're beginning a new session
1140
            bool begin_new_session = (info->num_participants == 0);
73,917✔
1141
            SlabAlloc::Config cfg;
73,917✔
1142
            cfg.session_initiator = begin_new_session;
73,917✔
1143
            cfg.is_shared = true;
73,917✔
1144
            cfg.read_only = false;
73,917✔
1145
            cfg.skip_validate = !begin_new_session;
73,917✔
1146
            cfg.disable_sync = options.durability == Durability::MemOnly || options.durability == Durability::Unsafe;
73,917✔
1147
            cfg.clear_file_on_error = options.clear_on_invalid_file;
73,917✔
1148

1149
            // only the session initiator is allowed to create the database, all other
1150
            // must assume that it already exists.
1151
            cfg.no_create = (begin_new_session ? options.no_create : true);
73,917✔
1152

1153
            // if we're opening a MemOnly file that isn't already opened by
1154
            // someone else then it's a file which should have been deleted on
1155
            // close previously, but wasn't (perhaps due to the process crashing)
1156
            cfg.clear_file = (options.durability == Durability::MemOnly && begin_new_session);
73,917✔
1157

1158
            cfg.encryption_key = options.encryption_key;
73,917✔
1159
            m_marker_observer = std::make_unique<EncryptionMarkerObserver>(*version_manager);
73,917✔
1160
            try {
73,917✔
1161
                top_ref = alloc.attach_file(path, cfg, m_marker_observer.get()); // Throws
73,917✔
1162
            }
73,917✔
1163
            catch (const SlabAlloc::Retry&) {
73,917✔
1164
                // On a SlabAlloc::Retry file mappings are already unmapped, no
1165
                // need to do more
1166
                continue;
×
1167
            }
×
1168

1169
            // Determine target file format version for session (upgrade
1170
            // required if greater than file format version of attached file).
1171
            current_file_format_version = alloc.get_committed_file_format_version();
73,821✔
1172
            target_file_format_version =
73,821✔
1173
                Group::get_target_file_format_version_for_session(current_file_format_version, openers_hist_type);
73,821✔
1174
            BackupHandler backup(path, options.accepted_versions, options.to_be_deleted);
73,821✔
1175
            if (backup.must_restore_from_backup(current_file_format_version)) {
73,821✔
1176
                // we need to unmap before any file ops that'll change the realm
1177
                // file:
1178
                // (only strictly needed for Windows)
1179
                alloc.detach();
12✔
1180
                backup.restore_from_backup();
12✔
1181
                // finally, retry with the restored file instead of the original
1182
                // one:
1183
                continue;
12✔
1184
            }
12✔
1185
            backup.cleanup_backups();
73,809✔
1186

1187
            // From here on, if we fail in any way, we must detach the
1188
            // allocator.
1189
            SlabAlloc::DetachGuard alloc_detach_guard(alloc);
73,809✔
1190

1191
            // Check validity of top array (to give more meaningful errors
1192
            // early)
1193
            if (top_ref) {
73,809✔
1194
                try {
29,328✔
1195
                    Array top{alloc};
29,328✔
1196
                    top.init_from_ref(top_ref);
29,328✔
1197
                    Group::validate_top_array(top, alloc);
29,328✔
1198
                }
29,328✔
1199
                catch (const InvalidDatabase& e) {
29,328✔
1200
                    if (e.get_path().empty()) {
×
1201
                        throw InvalidDatabase(e.what(), path);
×
1202
                    }
×
1203
                    throw;
×
1204
                }
×
1205
            }
29,328✔
1206
            if (options.backup_at_file_format_change) {
73,809✔
1207
                backup.backup_realm_if_needed(current_file_format_version, target_file_format_version);
73,800✔
1208
            }
73,800✔
1209
            using gf = _impl::GroupFriend;
73,809✔
1210
            bool file_format_ok;
73,809✔
1211
            // In shared mode (Realm file opened via a DB instance) this
1212
            // version of the core library is able to open Realms using file format
1213
            // versions listed below. Please see Group::get_file_format_version() for
1214
            // information about the individual file format versions.
1215
            if (current_file_format_version == 0) {
73,809✔
1216
                file_format_ok = (top_ref == 0);
44,475✔
1217
            }
44,475✔
1218
            else {
29,334✔
1219
                file_format_ok = backup.is_accepted_file_format(current_file_format_version);
29,334✔
1220
            }
29,334✔
1221

1222
            if (REALM_UNLIKELY(!file_format_ok)) {
73,809✔
1223
                throw UnsupportedFileFormatVersion(current_file_format_version);
12✔
1224
            }
12✔
1225

1226
            if (begin_new_session) {
73,797✔
1227
                // Determine version (snapshot number) and check history
1228
                // compatibility
1229
                version_type version = 0;
48,417✔
1230
                int stored_hist_type = 0;
48,417✔
1231
                gf::get_version_and_history_info(alloc, top_ref, version, stored_hist_type,
48,417✔
1232
                                                 stored_hist_schema_version);
48,417✔
1233
                bool good_history_type = false;
48,417✔
1234
                switch (openers_hist_type) {
48,417✔
1235
                    case Replication::hist_None:
6,834✔
1236
                        good_history_type = (stored_hist_type == Replication::hist_None);
6,834✔
1237
                        if (!good_history_type)
6,834✔
1238
                            throw IncompatibleHistories(
6✔
1239
                                util::format("Realm file at path '%1' has history type '%2', but is being opened "
6✔
1240
                                             "with replication disabled.",
6✔
1241
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1242
                                path);
6✔
1243
                        break;
6,828✔
1244
                    case Replication::hist_OutOfRealm:
6,828✔
1245
                        REALM_ASSERT(false); // No longer in use
×
1246
                        break;
×
1247
                    case Replication::hist_InRealm:
9,258✔
1248
                        good_history_type = (stored_hist_type == Replication::hist_InRealm ||
9,258✔
1249
                                             stored_hist_type == Replication::hist_None);
9,258✔
1250
                        if (!good_history_type)
9,258✔
1251
                            throw IncompatibleHistories(
6✔
1252
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
6✔
1253
                                             "local history mode.",
6✔
1254
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1255
                                path);
6✔
1256
                        break;
9,252✔
1257
                    case Replication::hist_SyncClient:
30,549✔
1258
                        good_history_type = ((stored_hist_type == Replication::hist_SyncClient) || (top_ref == 0));
30,549✔
1259
                        if (!good_history_type)
30,549✔
1260
                            throw IncompatibleHistories(
6✔
1261
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
6✔
1262
                                             "synchronized history mode.",
6✔
1263
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1264
                                path);
6✔
1265
                        break;
30,543✔
1266
                    case Replication::hist_SyncServer:
30,543✔
1267
                        good_history_type = ((stored_hist_type == Replication::hist_SyncServer) || (top_ref == 0));
1,776✔
1268
                        if (!good_history_type)
1,776✔
1269
                            throw IncompatibleHistories(
×
1270
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
×
1271
                                             "server history mode.",
×
1272
                                             path, Replication::history_type_name(stored_hist_type)),
×
1273
                                path);
×
1274
                        break;
1,776✔
1275
                }
48,417✔
1276

1277
                REALM_ASSERT(stored_hist_schema_version >= 0);
48,399✔
1278
                if (stored_hist_schema_version > openers_hist_schema_version)
48,399✔
1279
                    throw IncompatibleHistories(
×
1280
                        util::format("Unexpected future history schema version %1, current schema %2",
×
1281
                                     stored_hist_schema_version, openers_hist_schema_version),
×
1282
                        path);
×
1283
                bool need_hist_schema_upgrade =
48,399✔
1284
                    (stored_hist_schema_version < openers_hist_schema_version && top_ref != 0);
48,399✔
1285
                if (need_hist_schema_upgrade) {
48,399✔
1286
                    Replication* repl = get_replication();
258✔
1287
                    if (!repl->is_upgradable_history_schema(stored_hist_schema_version))
258✔
1288
                        throw IncompatibleHistories(util::format("Nonupgradable history schema %1, current schema %2",
×
1289
                                                                 stored_hist_schema_version,
×
1290
                                                                 openers_hist_schema_version),
×
1291
                                                    path);
×
1292
                }
258✔
1293

1294
                bool need_file_format_upgrade =
48,399✔
1295
                    current_file_format_version < target_file_format_version && top_ref != 0;
48,399✔
1296
                if (!options.allow_file_format_upgrade && (need_hist_schema_upgrade || need_file_format_upgrade)) {
48,399✔
1297
                    throw FileFormatUpgradeRequired(m_db_path);
6✔
1298
                }
6✔
1299

1300
                alloc.convert_from_streaming_form(top_ref);
48,393✔
1301
                try {
48,393✔
1302
                    bool file_changed_size = alloc.align_filesize_for_mmap(top_ref, cfg);
48,393✔
1303
                    if (file_changed_size) {
48,393✔
1304
                        // we need to re-establish proper mappings after file size change.
1305
                        // we do this simply by aborting and starting all over:
1306
                        continue;
423✔
1307
                    }
423✔
1308
                }
48,393✔
1309
                // something went wrong. Retry.
1310
                catch (SlabAlloc::Retry&) {
48,393✔
1311
                    continue;
×
1312
                }
×
1313
                if (options.encryption_key) {
47,970✔
1314
#ifdef _WIN32
1315
                    uint64_t pid = GetCurrentProcessId();
1316
#else
1317
                    static_assert(sizeof(pid_t) <= sizeof(uint64_t), "process identifiers too large");
228✔
1318
                    uint64_t pid = getpid();
228✔
1319
#endif
228✔
1320
                    info->session_initiator_pid = pid;
228✔
1321
                }
228✔
1322

1323
                info->file_format_version = uint_fast8_t(target_file_format_version);
47,970✔
1324

1325
                // Initially there is a single version in the file
1326
                info->number_of_versions = 1;
47,970✔
1327

1328
                info->latest_version_number = version;
47,970✔
1329
                alloc.init_mapping_management(version);
47,970✔
1330

1331
                size_t file_size = 24;
47,970✔
1332
                if (top_ref) {
47,970✔
1333
                    Array top(alloc);
7,368✔
1334
                    top.init_from_ref(top_ref);
7,368✔
1335
                    file_size = Group::get_logical_file_size(top);
7,368✔
1336
                }
7,368✔
1337
                version_manager->init_versioning(top_ref, file_size, version);
47,970✔
1338
            }
47,970✔
1339
            else { // Not the session initiator
25,380✔
1340
                // Durability setting must be consistent across a session. An
1341
                // inconsistency is a logic error, as the user is required to
1342
                // make sure that all possible concurrent session participants
1343
                // use the same durability setting for the same Realm file.
1344
                if (Durability(info->durability) != options.durability)
25,380✔
1345
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "Durability not consistent");
6✔
1346

1347
                // History type must be consistent across a session. An
1348
                // inconsistency is a logic error, as the user is required to
1349
                // make sure that all possible concurrent session participants
1350
                // use the same history type for the same Realm file.
1351
                if (info->history_type != openers_hist_type)
25,374✔
1352
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "History type not consistent");
6✔
1353

1354
                // History schema version must be consistent across a
1355
                // session. An inconsistency is a logic error, as the user is
1356
                // required to make sure that all possible concurrent session
1357
                // participants use the same history schema version for the same
1358
                // Realm file.
1359
                if (info->history_schema_version != openers_hist_schema_version)
25,368✔
1360
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "History schema version not consistent");
×
1361

1362
                // We need per session agreement among all participants on the
1363
                // target Realm file format. From a technical perspective, the
1364
                // best way to ensure that, would be to require a bumping of the
1365
                // SharedInfo file format version on any change that could lead
1366
                // to a different result from
1367
                // get_target_file_format_for_session() given the same current
1368
                // Realm file format version and the same history type, as that
1369
                // would prevent the outcome of the Realm opening process from
1370
                // depending on race conditions. However, for practical reasons,
1371
                // we shall instead simply check that there is agreement, and
1372
                // throw the same kind of exception, as would have been thrown
1373
                // with a bumped SharedInfo file format version, if there isn't.
1374
                if (info->file_format_version != target_file_format_version) {
25,368✔
1375
                    throw IncompatibleLockFile(path,
×
1376
                                               format("Version mismatch: File format version is %1 but should be %2.",
×
1377
                                                      info->file_format_version, target_file_format_version));
×
1378
                }
×
1379

1380
                // Even though this session participant is not the session initiator,
1381
                // it may be the one that has to perform the history schema upgrade.
1382
                // See upgrade_file_format(). However we cannot get the actual value
1383
                // at this point as the allocator is not synchronized with the file.
1384
                // The value will be read in a ReadTransaction later.
1385

1386
                // We need to setup the allocators version information, as it is needed
1387
                // to correctly age and later reclaim memory mappings.
1388
                version_type version = info->latest_version_number;
25,368✔
1389
                alloc.init_mapping_management(version);
25,368✔
1390
            }
25,368✔
1391

1392
            m_new_commit_available.set_shared_part(info->new_commit_available, lockfile_prefix, "new_commit",
73,338✔
1393
                                                   options.temp_dir);
73,338✔
1394
            m_pick_next_writer.set_shared_part(info->pick_next_writer, lockfile_prefix, "pick_writer",
73,338✔
1395
                                               options.temp_dir);
73,338✔
1396

1397
            // make our presence noted:
1398
            ++info->num_participants;
73,338✔
1399
            m_info = info;
73,338✔
1400

1401
            // Keep the mappings and file open:
1402
            m_version_manager = std::move(version_manager);
73,338✔
1403
            alloc_detach_guard.release();
73,338✔
1404
            fug_1.release(); // Do not unmap
73,338✔
1405
            fcg.release();   // Do not close
73,338✔
1406
        }
73,338✔
1407
        ulg.release(); // Do not release shared lock
×
1408
        break;
73,338✔
1409
    }
73,797✔
1410

1411
    if (m_logger) {
73,332✔
1412
        m_logger->log(util::Logger::Level::debug, "   Number of participants: %1", m_info->num_participants);
57,462✔
1413
        m_logger->log(util::Logger::Level::debug, "   Durability: %1", [&] {
57,462✔
1414
            switch (options.durability) {
57,462✔
1415
                case DBOptions::Durability::Full:
34,848✔
1416
                    return "Full";
34,848✔
1417
                case DBOptions::Durability::MemOnly:
22,614✔
1418
                    return "MemOnly";
22,614✔
1419
                case realm::DBOptions::Durability::Unsafe:
✔
1420
                    return "Unsafe";
×
1421
            }
57,462✔
1422
            return "";
×
1423
        }());
57,462✔
1424
        m_logger->log(util::Logger::Level::debug, "   EncryptionKey: %1", options.encryption_key ? "yes" : "no");
57,462✔
1425
        if (m_logger->would_log(util::Logger::Level::debug)) {
57,462✔
1426
            if (top_ref) {
48,036✔
1427
                Array top(alloc);
24,186✔
1428
                top.init_from_ref(top_ref);
24,186✔
1429
                auto file_size = Group::get_logical_file_size(top);
24,186✔
1430
                auto history_size = Group::get_history_size(top);
24,186✔
1431
                auto freee_space_size = Group::get_free_space_size(top);
24,186✔
1432
                m_logger->log(util::Logger::Level::debug, "   File size: %1", file_size);
24,186✔
1433
                m_logger->log(util::Logger::Level::debug, "   User data size: %1",
24,186✔
1434
                              file_size - (freee_space_size + history_size));
24,186✔
1435
                m_logger->log(util::Logger::Level::debug, "   Free space size: %1", freee_space_size);
24,186✔
1436
                m_logger->log(util::Logger::Level::debug, "   History size: %1", history_size);
24,186✔
1437
            }
24,186✔
1438
            else {
23,850✔
1439
                m_logger->log(util::Logger::Level::debug, "   Empty file");
23,850✔
1440
            }
23,850✔
1441
        }
48,036✔
1442
    }
57,462✔
1443

1444
    // Upgrade file format and/or history schema
1445
    try {
73,332✔
1446
        if (stored_hist_schema_version == -1) {
73,332✔
1447
            // current_hist_schema_version has not been read. Read it now
1448
            stored_hist_schema_version = start_read()->get_history_schema_version();
25,368✔
1449
        }
25,368✔
1450
        if (current_file_format_version == 0) {
73,332✔
1451
            // If the current file format is still undecided, no upgrade is
1452
            // necessary, but we still need to make the chosen file format
1453
            // visible to the rest of the core library by updating the value
1454
            // that will be subsequently returned by
1455
            // Group::get_file_format_version(). For this to work, all session
1456
            // participants must adopt the chosen target Realm file format when
1457
            // the stored file format version is zero regardless of the version
1458
            // of the core library used.
1459
            m_file_format_version = target_file_format_version;
44,463✔
1460
        }
44,463✔
1461
        else {
28,869✔
1462
            m_file_format_version = current_file_format_version;
28,869✔
1463
            upgrade_file_format(options.allow_file_format_upgrade, target_file_format_version,
28,869✔
1464
                                stored_hist_schema_version, openers_hist_schema_version); // Throws
28,869✔
1465
        }
28,869✔
1466
    }
73,332✔
1467
    catch (...) {
73,332✔
1468
        close();
6✔
1469
        throw;
6✔
1470
    }
6✔
1471
    m_alloc.set_read_only(true);
73,332✔
1472
}
73,332✔
1473

1474
// Attach this DB to an immutable in-memory Realm given as a raw buffer.
// A fake read lock pinned to the buffer's top ref is installed so that
// read transactions can be started, but the DB cannot be written to.
// If `take_ownership` is true, the allocator assumes responsibility
// for freeing the buffer when the DB is destroyed.
void DB::open(BinaryData buffer, bool take_ownership)
{
    const auto top_ref = m_alloc.attach_buffer(buffer.data(), buffer.size());
    m_fake_read_lock_if_immutable = ReadLockInfo::make_fake(top_ref, buffer.size());
    if (take_ownership)
        m_alloc.own_buffer();
}
1481

1482
// Open a Realm file with replication (history) enabled.
//
// Exception safety: open() is called from constructors, so if it
// throws it must leave the file closed.
void DB::open(Replication& repl, const std::string& file, const DBOptions& options)
{
    REALM_ASSERT(!is_attached());

    // Wire up the replication instance before opening the file, so the
    // session-initialization logic in open() can consult it.
    repl.initialize(*this); // Throws
    set_replication(&repl);

    open(file, options); // Throws
}
1495

1496
class DBLogger : public Logger {
1497
public:
1498
    DBLogger(const std::shared_ptr<Logger>& base_logger, unsigned hash) noexcept
1499
        : Logger(LogCategory::storage, *base_logger)
41,298✔
1500
        , m_hash(hash)
41,298✔
1501
        , m_base_logger_ptr(base_logger)
41,298✔
1502
    {
83,163✔
1503
    }
83,163✔
1504

1505
protected:
1506
    void do_log(const LogCategory& category, Level level, const std::string& message) final
1507
    {
1,547,334✔
1508
        std::ostringstream ostr;
1,547,334✔
1509
        auto id = std::this_thread::get_id();
1,547,334✔
1510
        ostr << "DB: " << m_hash << " Thread " << id << ": " << message;
1,547,334✔
1511
        Logger::do_log(*m_base_logger_ptr, category, level, ostr.str());
1,547,334✔
1512
    }
1,547,334✔
1513

1514
private:
1515
    unsigned m_hash;
1516
    std::shared_ptr<Logger> m_base_logger_ptr;
1517
};
1518

1519
// Install a logger for this DB. The logger is wrapped in a DBLogger so
// that every message is tagged with this DB's identity hash and the
// logging thread. Passing a null logger leaves the current logger
// untouched.
void DB::set_logger(const std::shared_ptr<util::Logger>& logger) noexcept
{
    if (!logger)
        return;
    m_logger = std::make_shared<DBLogger>(logger, m_log_id);
}
1524

1525
// Open a memory-only Realm (no backing file) with replication enabled.
// The session metadata that normally lives in the .lock file is kept in
// a heap-allocated SharedInfo owned by this DB, and the interprocess
// primitives are wired to it with an empty lockfile prefix, making them
// effectively process-local.
void DB::open(Replication& repl, const DBOptions& options)
{
    REALM_ASSERT(!is_attached());
    repl.initialize(*this); // Throws
    set_replication(&repl);

    m_alloc.init_in_memory_buffer();

    set_logger(options.logger);
    m_replication->set_logger(m_logger.get());
    if (m_logger)
        m_logger->detail("Open memory-only realm");

    auto history_type = repl.get_history_type();
    m_in_memory_info = std::make_unique<SharedInfo>(DBOptions::Durability::MemOnly, history_type,
                                                    repl.get_history_schema_version());
    SharedInfo* info = m_in_memory_info.get();

    // Hook the mutexes and condvars up to the in-memory SharedInfo.
    m_writemutex.set_shared_part(info->shared_writemutex, "", "write");
    m_controlmutex.set_shared_part(info->shared_controlmutex, "", "control");
    m_new_commit_available.set_shared_part(info->new_commit_available, "", "new_commit", options.temp_dir);
    m_pick_next_writer.set_shared_part(info->pick_next_writer, "", "pick_writer", options.temp_dir);
    m_versionlist_mutex.set_shared_part(info->shared_versionlist_mutex, "", "versions");

    // A fresh in-memory Realm always begins a new session, so this DB
    // alone decides the file format version for the session.
    auto target_file_format_version = uint_fast8_t(Group::get_target_file_format_version_for_session(0, history_type));
    info->file_format_version = target_file_format_version;
    info->number_of_versions = 1;
    info->latest_version_number = 1;
    info->init_versioning(0, m_alloc.get_baseline(), 1);
    ++info->num_participants;

    m_version_manager = std::make_unique<InMemoryVersionManager>(info, m_versionlist_mutex);

    m_file_format_version = target_file_format_version;

    m_info = info;
    m_alloc.set_read_only(true);
}
1562

1563
// Replace the Realm's history with a fresh one produced by `repl`.
// The old history is cleared inside a single write transaction and the
// current contents of the Realm are re-replicated through the new
// Replication instance. On failure, the previously installed
// replication instance is restored before the exception propagates, so
// the DB remains usable.
void DB::create_new_history(Replication& repl)
{
    Replication* prev_repl = get_replication();
    try {
        repl.initialize(*this);
        set_replication(&repl);

        auto tr = start_write();
        tr->clear_history();
        tr->replicate(tr.get(), repl);
        tr->commit();
    }
    catch (...) {
        // Roll back to the previous replication instance.
        set_replication(prev_repl);
        throw;
    }
}
1580

1581
void DB::create_new_history(std::unique_ptr<Replication> repl)
1582
{
36✔
1583
    create_new_history(*repl);
36✔
1584
    m_history = std::move(repl);
36✔
1585
}
36✔
1586

1587
// WARNING / FIXME: compact() should NOT be exposed publicly on Windows because it's not crash safe! It may
1588
// corrupt your database if something fails.
1589
// Tracked by https://github.com/realm/realm-core/issues/4111
1590

1591
// A note about lock ordering.
1592
// The local mutex, m_mutex, guards transaction start/stop and map/unmap of the lock file.
1593
// Except for compact(), open() and close(), it should only be held briefly.
1594
// The controlmutex guards operations which change the file size, session initialization
1595
// and session exit.
1596
// The writemutex guards the integrity of the (write) transaction data.
1597
// The controlmutex and writemutex reside in the .lock file and thus require
1598
// the mapping of the .lock file to work. A straightforward approach would be to lock
1599
// the m_mutex whenever the other mutexes are taken or released...but that would be too
1600
// bad for performance of transaction start/stop.
1601
//
1602
// The locks are to be taken in this order: writemutex->controlmutex->m_mutex
1603
//
1604
// The .lock file is mapped during DB::create() and unmapped by a call to DB::close().
1605
// Once unmapped, it is never mapped again. Hence any observer with a valid DBRef may
1606
// only see the transition from mapped->unmapped, never the opposite.
1607
//
1608
// Trying to create a transaction if the .lock file is unmapped will result in an assert.
1609
// Unmapping (during close()) while transactions are live, is not considered an error. There
1610
// is a potential race between unmapping during close() and any operation carried out by a live
1611
// transaction. The user must ensure that this race never happens if she uses DB::close().
1612
// Returns true on success; false when compaction had to back out because
// another DB instance is attached or another transaction is live.
// Throws if this DB is not attached, or if writing/renaming the file fails.
bool DB::compact(bool bump_version_number, std::optional<const char*> output_encryption_key)
    NO_THREAD_SAFETY_ANALYSIS // this would work except for a known limitation: "No alias analysis" where clang cannot
                              // tell that tr->db->m_mutex is the same thing as m_mutex
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::string tmp_path = m_db_path + ".tmp_compaction_space";

    // To enter compact, the DB object must already have been attached to a file,
    // since this happens in DB::create().

    // Verify that the lock file is still attached. There is no attempt to guard against
    // a race between close() and compact().
    if (is_attached() == false) {
        throw Exception(ErrorCodes::IllegalOperation, m_db_path + ": compact must be done on an open/attached DB");
    }
    auto info = m_info;
    Durability dura = Durability(info->durability);
    // Key selection: an explicitly supplied output key wins (including an
    // explicit nullptr meaning "write unencrypted"); otherwise, when the
    // current file is encrypted, reuse its key so the compacted file stays
    // readable with the same credentials.
    std::unique_ptr<char[]> key_buffer;
    const char* write_key = nullptr;
    if (output_encryption_key) {
        if (*output_encryption_key) {
            write_key = *output_encryption_key;
        }
    }
#if REALM_ENABLE_ENCRYPTION
    else if (auto encryption = m_alloc.get_file().get_encryption()) {
        key_buffer = std::make_unique<char[]>(64);
        memcpy(key_buffer.get(), encryption->get_key(), 64);
        write_key = key_buffer.get();
    }
#endif
    {
        std::unique_lock<InterprocessMutex> lock(m_controlmutex); // Throws
        auto t1 = std::chrono::steady_clock::now();

        // We must be the ONLY DB object attached if we're to do compaction
        if (info->num_participants > 1)
            return false;

        // Holding the controlmutex prevents any other DB from attaching to the file.

        // Using start_read here ensures that we have access to the latest entry
        // in the VersionList. We need to have access to that later to update top_ref and file_size.
        // This is also needed to attach the group (get the proper top pointer, etc)
        TransactionRef tr = start_read();
        auto file_size_before = tr->get_logical_file_size();

        // local lock blocking any transaction from starting (and stopping)
        CheckedLockGuard local_lock(m_mutex);

        // We should be the only transaction active - otherwise back out
        // (the read transaction started just above accounts for the count of 1)
        if (m_transaction_count != 1)
            return false;

        // group::write() will throw if the file already exists.
        // To prevent this, we have to remove the file (should it exist)
        // before calling group::write().
        File::try_remove(tmp_path);

        // Compact by writing a new file holding only live data, then renaming the new file
        // so it becomes the database file, replacing the old one in the process.
        try {
            File file;
            file.open(tmp_path, File::access_ReadWrite, File::create_Must, 0);
            int incr = bump_version_number ? 1 : 0;
            Group::DefaultTableWriter writer;
            tr->write(file, write_key, info->latest_version_number + incr, writer); // Throws
            // Data needs to be flushed to the disk before renaming.
            bool disable_sync = get_disable_sync_to_disk();
            if (!disable_sync && dura != Durability::Unsafe)
                file.sync(); // Throws
        }
        catch (...) {
            // If writing the compact version failed in any way, delete the partially written file to clean up disk
            // space. This is so that we don't fail with 100% disk space used when compacting on a mostly full disk.
            File::try_remove(tmp_path);
            throw;
        }
        // if we've written a file with a bumped version number, we need to update the lock file to match.
        if (bump_version_number) {
            ++info->latest_version_number;
        }
        // We need to release any shared mapping *before* releasing the control mutex.
        // When someone attaches to the new database file, they *must* *not* see and
        // reuse any existing memory mapping of the stale file.
        tr->close_read_with_lock();
        m_alloc.detach();

        util::File::move(tmp_path, m_db_path);

        // Re-attach to the freshly written file as if we were the session
        // initiator, and reset the versioning state to a single version.
        SlabAlloc::Config cfg;
        cfg.session_initiator = true;
        cfg.is_shared = true;
        cfg.read_only = false;
        cfg.skip_validate = false;
        cfg.no_create = true;
        cfg.clear_file = false;
        cfg.encryption_key = write_key;
        ref_type top_ref;
        top_ref = m_alloc.attach_file(m_db_path, cfg, m_marker_observer.get());
        m_alloc.convert_from_streaming_form(top_ref);
        m_alloc.init_mapping_management(info->latest_version_number);
        info->number_of_versions = 1;
        // A zero top_ref means an empty Realm; fall back to the header size.
        size_t logical_file_size = sizeof(SlabAlloc::Header);
        if (top_ref) {
            Array top(m_alloc);
            top.init_from_ref(top_ref);
            logical_file_size = Group::get_logical_file_size(top);
        }
        m_version_manager->init_versioning(top_ref, logical_file_size, info->latest_version_number);
        if (m_logger) {
            auto t2 = std::chrono::steady_clock::now();
            m_logger->log(util::Logger::Level::info, "DB compacted from: %1 to %2 in %3 us", file_size_before,
                          logical_file_size, std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
        }
    }
    return true;
}
1730

1731
void DB::write_copy(std::string_view path, const char* output_encryption_key)
1732
{
246✔
1733
    auto tr = start_read();
246✔
1734
    if (auto hist = tr->get_history()) {
246✔
1735
        if (!hist->no_pending_local_changes(tr->get_version())) {
246✔
1736
            throw Exception(ErrorCodes::IllegalOperation,
6✔
1737
                            "All client changes must be integrated in server before writing copy");
6✔
1738
        }
6✔
1739
    }
246✔
1740

1741
    class NoClientFileIdWriter : public Group::DefaultTableWriter {
240✔
1742
    public:
240✔
1743
        NoClientFileIdWriter()
240✔
1744
            : Group::DefaultTableWriter(true)
240✔
1745
        {
240✔
1746
        }
240✔
1747
        HistoryInfo write_history(_impl::OutputStream& out) override
240✔
1748
        {
240✔
1749
            auto hist = Group::DefaultTableWriter::write_history(out);
234✔
1750
            hist.sync_file_id = 0;
234✔
1751
            return hist;
234✔
1752
        }
234✔
1753
    } writer;
240✔
1754

1755
    File file;
240✔
1756
    file.open(path, File::access_ReadWrite, File::create_Must, 0);
240✔
1757
    file.resize(0);
240✔
1758

1759
    auto t1 = std::chrono::steady_clock::now();
240✔
1760
    tr->write(file, output_encryption_key, m_info->latest_version_number, writer);
240✔
1761
    if (m_logger) {
240✔
1762
        auto t2 = std::chrono::steady_clock::now();
60✔
1763
        m_logger->log(util::Logger::Level::info, "DB written to '%1' in %2 us", path,
60✔
1764
                      std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
60✔
1765
    }
60✔
1766
}
240✔
1767

1768
uint_fast64_t DB::get_number_of_versions()
1769
{
292,599✔
1770
    if (m_fake_read_lock_if_immutable)
292,599✔
1771
        return 1;
6✔
1772
    return m_info->number_of_versions;
292,593✔
1773
}
292,599✔
1774

1775
size_t DB::get_allocated_size() const
1776
{
6✔
1777
    return m_alloc.get_allocated_size();
6✔
1778
}
6✔
1779

1780
void DB::release_all_read_locks() noexcept
1781
{
98,841✔
1782
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
98,841✔
1783
    CheckedLockGuard local_lock(m_mutex); // mx on m_local_locks_held
98,841✔
1784
    for (auto& read_lock : m_local_locks_held) {
98,841✔
1785
        --m_transaction_count;
6✔
1786
        m_version_manager->release_read_lock(read_lock);
6✔
1787
    }
6✔
1788
    m_local_locks_held.clear();
98,841✔
1789
    REALM_ASSERT(m_transaction_count == 0);
98,841✔
1790
}
98,841✔
1791

1792
// Helper owning a background worker thread that acquires/releases the DB's
// write mutex and performs disk syncs on behalf of async commits. The worker
// thread is needed because InterprocessMutex may be thread-confined (must be
// unlocked on the thread that locked it); see is_thread_confined checks below.
// All state is guarded by m_mutex; m_cv_worker wakes the worker thread and
// m_cv_callers wakes threads waiting on the worker.
class DB::AsyncCommitHelper {
public:
    AsyncCommitHelper(DB* db)
        : m_db(db)
    {
    }
    ~AsyncCommitHelper()
    {
        {
            std::unique_lock lg(m_mutex);
            // Worker was never started (or already stopped): nothing to join.
            if (!m_running) {
                return;
            }
            m_running = false;
            m_cv_worker.notify_one();
        }
        // Join outside the lock so the worker can finish its loop.
        m_thread.join();
    }

    // Queue `fn` to be invoked (on the worker thread) once the write lock
    // has been acquired. Returns immediately.
    void begin_write(util::UniqueFunction<void()> fn)
    {
        std::unique_lock lg(m_mutex);
        start_thread();
        m_pending_writes.emplace_back(std::move(fn));
        m_cv_worker.notify_one();
    }

    // Acquire the write lock, blocking the calling thread until it is held.
    void blocking_begin_write()
    {
        std::unique_lock lg(m_mutex);

        // If we support unlocking InterprocessMutex from a different thread
        // than it was locked on, we can sometimes just begin the write on
        // the current thread. This requires that no one is currently waiting
        // for the worker thread to acquire the write lock, as we'll deadlock
        // if we try to async commit while the worker is waiting for the lock.
        bool can_lock_on_caller =
            !InterprocessMutex::is_thread_confined && (!m_owns_write_mutex && m_pending_writes.empty() &&
                                                       m_write_lock_claim_ticket == m_write_lock_claim_fulfilled);

        // If we support cross-thread unlocking and m_running is false,
        // can_lock_on_caller should always be true or we forgot to launch the thread
        REALM_ASSERT(can_lock_on_caller || m_running || InterprocessMutex::is_thread_confined);

        // If possible, just begin the write on the current thread
        if (can_lock_on_caller) {
            // m_waiting_for_write_mutex tells the worker not to also try to
            // acquire the lock while we block on it (released below).
            m_waiting_for_write_mutex = true;
            lg.unlock();
            m_db->do_begin_write();
            lg.lock();
            m_waiting_for_write_mutex = false;
            m_has_write_mutex = true;
            m_owns_write_mutex = false;
            return;
        }

        // Otherwise we have to ask the worker thread to acquire it and wait
        // for that
        start_thread();
        size_t ticket = ++m_write_lock_claim_ticket;
        m_cv_worker.notify_one();
        m_cv_callers.wait(lg, [this, ticket] {
            return ticket == m_write_lock_claim_fulfilled;
        });
    }

    // Release the write lock without waiting for the release to complete.
    void end_write()
    {
        std::unique_lock lg(m_mutex);
        REALM_ASSERT(m_has_write_mutex);
        REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined);

        // If we acquired the write lock on the worker thread, also release it
        // there even if our mutex supports unlocking cross-thread as it simplifies things.
        if (m_owns_write_mutex) {
            m_pending_mx_release = true;
            m_cv_worker.notify_one();
        }
        else {
            m_db->do_end_write();
            m_has_write_mutex = false;
        }
    }

    // Release the write lock, waiting until the release has actually
    // happened. Returns false (doing nothing) if the lock was not held.
    bool blocking_end_write()
    {
        std::unique_lock lg(m_mutex);
        if (!m_has_write_mutex) {
            return false;
        }
        REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined);

        // If we acquired the write lock on the worker thread, also release it
        // there even if our mutex supports unlocking cross-thread as it simplifies things.
        if (m_owns_write_mutex) {
            m_pending_mx_release = true;
            m_cv_worker.notify_one();
            m_cv_callers.wait(lg, [this] {
                return !m_pending_mx_release;
            });
        }
        else {
            m_db->do_end_write();
            m_has_write_mutex = false;

            // The worker thread may have ignored a request for the write mutex
            // while we were acquiring it, so we need to wake up the thread
            if (has_pending_write_requests()) {
                lg.unlock();
                m_cv_worker.notify_one();
            }
        }
        return true;
    }


    // Queue `fn` (the disk-sync work) to run on the worker thread; the write
    // lock is released afterwards (see main()). Only one sync may be pending.
    void sync_to_disk(util::UniqueFunction<void()> fn)
    {
        REALM_ASSERT(fn);
        std::unique_lock lg(m_mutex);
        REALM_ASSERT(!m_pending_sync);
        start_thread();
        m_pending_sync = std::move(fn);
        m_cv_worker.notify_one();
    }

private:
    DB* m_db;
    std::thread m_thread;                       // lazily started worker; see start_thread()
    std::mutex m_mutex;                         // guards every member below
    std::condition_variable m_cv_worker;        // signals the worker thread
    std::condition_variable m_cv_callers;       // signals threads waiting on the worker
    std::deque<util::UniqueFunction<void()>> m_pending_writes; // queued begin_write callbacks
    util::UniqueFunction<void()> m_pending_sync;               // at most one queued sync callback
    size_t m_write_lock_claim_ticket = 0;       // tickets handed out by blocking_begin_write()
    size_t m_write_lock_claim_fulfilled = 0;    // tickets served by the worker
    bool m_pending_mx_release = false;          // worker should release the write mutex
    bool m_running = false;                     // worker thread loop control
    bool m_has_write_mutex = false;             // write mutex currently held (by anyone)
    bool m_owns_write_mutex = false;            // write mutex was acquired on the worker thread
    bool m_waiting_for_write_mutex = false;     // a caller is blocked acquiring the mutex itself

    void main();

    // Start the worker thread on first use. Caller must hold m_mutex.
    void start_thread()
    {
        if (m_running) {
            return;
        }
        m_running = true;
        m_thread = std::thread([this]() {
            main();
        });
    }

    // True if any blocking or async write request is waiting to be served.
    // Caller must hold m_mutex.
    bool has_pending_write_requests()
    {
        return m_write_lock_claim_fulfilled < m_write_lock_claim_ticket || !m_pending_writes.empty();
    }
};
1952

1953
// Destruction closes the DB. Per the note below, close() does not throw
// when invoked from this destructor.
DB::~DB() noexcept
{
    close();
}
1957

1958
// Note: close() and close_internal() may be called from the DB::~DB().
1959
// in that case, they will not throw. Throwing can only happen if called
1960
// directly.
1961
void DB::close(bool allow_open_read_transactions)
{
    // make helper thread(s) terminate
    m_commit_helper.reset();

    // Immutable Realms have no lock file / shared session state, so teardown
    // is just detaching the allocator.
    if (m_fake_read_lock_if_immutable) {
        if (!is_attached())
            return;
        {
            CheckedLockGuard local_lock(m_mutex);
            // Open read transactions are an error unless the caller opted in.
            if (!allow_open_read_transactions && m_transaction_count)
                throw WrongTransactionState("Closing with open read transactions");
        }
        if (m_alloc.is_attached())
            m_alloc.detach();
        m_fake_read_lock_if_immutable.reset();
    }
    else {
        // Full teardown of a shared session; the control mutex is passed
        // deferred and taken inside close_internal() at the right point.
        close_internal(std::unique_lock<InterprocessMutex>(m_controlmutex, std::defer_lock),
                       allow_open_read_transactions);
    }
}
1983

1984
// Tear down this DB's participation in the shared session. `lock` is the
// (possibly not-yet-taken) control mutex; it is acquired before touching
// shared session state and released before local cleanup.
void DB::close_internal(std::unique_lock<InterprocessMutex> lock, bool allow_open_read_transactions)
{
    if (!is_attached())
        return;

    {
        CheckedLockGuard local_lock(m_mutex);
        // A write transaction must never be open at close time.
        if (m_write_transaction_open)
            throw WrongTransactionState("Closing with open write transactions");
        if (!allow_open_read_transactions && m_transaction_count)
            throw WrongTransactionState("Closing with open read transactions");
    }
    SharedInfo* info = m_info;
    {
        if (!lock.owns_lock())
            lock.lock();

        if (m_alloc.is_attached())
            m_alloc.detach();

        if (m_is_sync_agent) {
            REALM_ASSERT(info->sync_agent_present);
            info->sync_agent_present = 0; // Set to false
        }
        release_all_read_locks();
        // Deregister from the session; the last participant ends the session.
        --info->num_participants;
        bool end_of_session = info->num_participants == 0;
        // std::cerr << "closing" << std::endl;
        if (end_of_session) {

            // If the db file is just backing for a transient data structure,
            // we can delete it when done.
            if (Durability(info->durability) == Durability::MemOnly && !m_in_memory_info) {
                try {
                    util::File::remove(m_db_path.c_str());
                }
                catch (...) {
                } // ignored on purpose.
            }
        }
        lock.unlock();
    }
    {
        CheckedLockGuard local_lock(m_mutex);

        m_new_commit_available.close();
        m_pick_next_writer.close();

        if (m_in_memory_info) {
            m_in_memory_info.reset();
        }
        else {
            // On Windows it is important that we unmap before unlocking, else a SetEndOfFile() call from another
            // thread may interleave which is not permitted on Windows. It is permitted on *nix.
            m_file_map.unmap();
            m_version_manager.reset();
            m_file.rw_unlock();
            // info->~SharedInfo(); // DO NOT Call destructor
            m_file.close();
        }
        m_info = nullptr;
        if (m_logger)
            m_logger->log(util::Logger::Level::detail, "DB closed");
    }
}
2049

2050
bool DB::other_writers_waiting_for_lock() const
2051
{
63,855✔
2052
    SharedInfo* info = m_info;
63,855✔
2053

2054
    uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed);
63,855✔
2055
    uint32_t next_served = info->next_served.load(std::memory_order_relaxed);
63,855✔
2056
    // When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and
2057
    // 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock.
2058
    return next_ticket > next_served + 1;
63,855✔
2059
}
63,855✔
2060

2061
// Worker-thread loop: holds m_mutex except while performing the actual
// lock/sync/callback work. Each iteration either services pending work
// (sync callback, mutex release, lock acquisition + write callback) or
// sleeps on m_cv_worker until woken.
void DB::AsyncCommitHelper::main()
{
    std::unique_lock lg(m_mutex);
    while (m_running) {
#if 0 // Enable for testing purposes
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
#endif
        if (m_has_write_mutex) {
            if (auto cb = std::move(m_pending_sync)) {
                // Only one of sync_to_disk(), end_write(), or blocking_end_write()
                // should be called, so we should never have both a pending sync
                // and pending release.
                REALM_ASSERT(!m_pending_mx_release);
                lg.unlock();
                cb();
                cb = nullptr; // Release things captured by the callback before reacquiring the lock
                lg.lock();
                // A completed sync implies the write lock should now be released.
                m_pending_mx_release = true;
            }
            if (m_pending_mx_release) {
                REALM_ASSERT(!InterprocessMutex::is_thread_confined || m_owns_write_mutex);
                m_db->do_end_write();
                m_pending_mx_release = false;
                m_has_write_mutex = false;
                m_owns_write_mutex = false;

                // Wake any thread waiting in blocking_end_write().
                lg.unlock();
                m_cv_callers.notify_all();
                lg.lock();
                continue;
            }
        }
        else {
            REALM_ASSERT(!m_pending_sync && !m_pending_mx_release);

            // Acquire the write lock if anyone has requested it, but only if
            // another thread is not already waiting for it. If there's another
            // thread requesting and they get it while we're waiting, we'll
            // deadlock if they ask us to perform the sync.
            if (!m_waiting_for_write_mutex && has_pending_write_requests()) {
                lg.unlock();
                m_db->do_begin_write();
                lg.lock();

                REALM_ASSERT(!m_has_write_mutex);
                m_has_write_mutex = true;
                m_owns_write_mutex = true;

                // Synchronous transaction requests get priority over async
                if (m_write_lock_claim_fulfilled < m_write_lock_claim_ticket) {
                    ++m_write_lock_claim_fulfilled;
                    m_cv_callers.notify_all();
                    continue;
                }

                // No blocking claim outstanding: serve the oldest async write.
                REALM_ASSERT(!m_pending_writes.empty());
                auto callback = std::move(m_pending_writes.front());
                m_pending_writes.pop_front();
                lg.unlock();
                callback();
                // Release things captured by the callback before reacquiring the lock
                callback = nullptr;
                lg.lock();
                continue;
            }
        }
        m_cv_worker.wait(lg);
    }
    // Shutting down: if this thread still owns the write mutex, it must be
    // the one to release it (it may be thread-confined).
    if (m_has_write_mutex && m_owns_write_mutex) {
        m_db->do_end_write();
    }
}
2133

2134
// Hand `fn` to the commit helper; it runs once the write lock is acquired.
void DB::async_begin_write(util::UniqueFunction<void()> fn)
{
    REALM_ASSERT(m_commit_helper);
    auto& helper = *m_commit_helper;
    helper.begin_write(std::move(fn));
}
2139

2140
void DB::async_end_write()
2141
{
54✔
2142
    REALM_ASSERT(m_commit_helper);
54✔
2143
    m_commit_helper->end_write();
54✔
2144
}
54✔
2145

2146
// Queue `fn` on the commit helper's worker thread to perform the disk sync.
void DB::async_sync_to_disk(util::UniqueFunction<void()> fn)
{
    REALM_ASSERT(m_commit_helper);
    auto& helper = *m_commit_helper;
    helper.sync_to_disk(std::move(fn));
}
2151

2152
bool DB::has_changed(TransactionRef& tr)
{
    // An immutable Realm can never change.
    if (m_fake_read_lock_if_immutable)
        return false;
    return tr->m_read_lock.m_version != get_version_of_latest_snapshot();
}
2159

2160
// Block until a commit newer than tr's snapshot exists, or until waiting is
// disabled via wait_for_change_release(). Returns whether a newer version
// is now available.
bool DB::wait_for_change(TransactionRef& tr)
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::lock_guard<InterprocessMutex> guard(m_controlmutex);
    const auto snapshot_version = tr->m_read_lock.m_version;
    while (m_wait_for_change_enabled && snapshot_version == m_info->latest_version_number)
        m_new_commit_available.wait(m_controlmutex, 0);
    return snapshot_version != m_info->latest_version_number;
}
2169

2170

2171
void DB::wait_for_change_release()
2172
{
×
2173
    if (m_fake_read_lock_if_immutable)
×
2174
        return;
×
2175
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
×
2176
    m_wait_for_change_enabled = false;
×
2177
    m_new_commit_available.notify_all();
×
2178
}
×
2179

2180

2181
void DB::enable_wait_for_change()
2182
{
×
2183
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
×
2184
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
×
2185
    m_wait_for_change_enabled = true;
×
2186
}
×
2187

2188
bool DB::needs_file_format_upgrade(const std::string& file, Span<const char> encryption_key)
2189
{
54✔
2190
    SlabAlloc alloc;
54✔
2191
    SlabAlloc::Config cfg;
54✔
2192
    cfg.session_initiator = false;
54✔
2193
    cfg.read_only = true;
54✔
2194
    cfg.no_create = true;
54✔
2195
    if (!encryption_key.empty()) {
54✔
2196
        REALM_ASSERT(encryption_key.size() == 64);
6✔
2197
        cfg.encryption_key = encryption_key.data();
6✔
2198
    }
6✔
2199
    try {
54✔
2200
        alloc.attach_file(file, cfg);
54✔
2201
        if (auto current_file_format_version = alloc.get_committed_file_format_version()) {
54✔
2202
            auto target_file_format_version = Group::g_current_file_format_version;
42✔
2203
            return current_file_format_version < target_file_format_version;
42✔
2204
        }
42✔
2205
    }
54✔
2206
    catch (const FileAccessError& err) {
54✔
2207
        if (err.code() != ErrorCodes::FileNotFound) {
6✔
2208
            throw;
×
2209
        }
×
2210
    }
6✔
2211
    return false;
12✔
2212
}
54✔
2213

2214
// Upgrade the history schema and/or the file format to the given targets if
// the file is behind. Throws FileFormatUpgradeRequired when an upgrade is
// needed but `allow_file_format_upgrade` is false.
void DB::upgrade_file_format(bool allow_file_format_upgrade, int target_file_format_version,
                             int current_hist_schema_version, int target_hist_schema_version)
{
    // In a multithreaded scenario multiple threads may initially see a need to
    // upgrade (maybe_upgrade == true) even though only one thread is supposed to
    // perform the upgrade, but that is ok, because the condition is rechecked
    // in a fully reliable way inside a transaction.

    // First a non-threadsafe but fast check
    int current_file_format_version = m_file_format_version;
    REALM_ASSERT(current_file_format_version <= target_file_format_version);
    REALM_ASSERT(current_hist_schema_version <= target_hist_schema_version);
    bool maybe_upgrade_file_format = (current_file_format_version < target_file_format_version);
    bool maybe_upgrade_hist_schema = (current_hist_schema_version < target_hist_schema_version);
    bool maybe_upgrade = maybe_upgrade_file_format || maybe_upgrade_hist_schema;
    if (maybe_upgrade) {

#ifdef REALM_DEBUG
// This sleep() only exists in order to increase the quality of the
// TEST(Upgrade_Database_2_3_Writes_New_File_Format_new) unit test.
// The unit test creates multiple threads that all call
// upgrade_file_format() simultaneously. This sleep() then acts like
// a simple thread barrier that makes sure the threads meet here, to
// increase the likelyhood of detecting any potential race problems.
// See the unit test for details.
//
// NOTE: This sleep has been disabled because no problems have been found with
// this code in a long while, and it was dramatically slowing down a unit test
// in realm-sync.

// millisleep(200);
#endif

        // WriteTransaction wt(*this);
        auto wt = start_write();
        bool dirty = false;

        // We need to upgrade history first. We may need to access it during migration
        // when processing the !OID columns
        int current_hist_schema_version_2 = wt->get_history_schema_version();
        // The history must either still be using its initial schema or have
        // been upgraded already to the chosen target schema version via a
        // concurrent DB object.
        REALM_ASSERT(current_hist_schema_version_2 == current_hist_schema_version ||
                     current_hist_schema_version_2 == target_hist_schema_version);
        bool need_hist_schema_upgrade = (current_hist_schema_version_2 < target_hist_schema_version);
        if (need_hist_schema_upgrade) {
            if (!allow_file_format_upgrade)
                throw FileFormatUpgradeRequired(this->m_db_path);

            Replication* repl = get_replication();
            repl->upgrade_history_schema(current_hist_schema_version_2); // Throws
            wt->set_history_schema_version(target_hist_schema_version);  // Throws
            dirty = true;
        }

        // File format upgrade
        int current_file_format_version_2 = m_alloc.get_committed_file_format_version();
        // The file must either still be using its initial file_format or have
        // been upgraded already to the chosen target file format via a
        // concurrent DB object.
        REALM_ASSERT(current_file_format_version_2 == current_file_format_version ||
                     current_file_format_version_2 == target_file_format_version);
        bool need_file_format_upgrade = (current_file_format_version_2 < target_file_format_version);
        if (need_file_format_upgrade) {
            if (!allow_file_format_upgrade)
                throw FileFormatUpgradeRequired(this->m_db_path);
            wt->upgrade_file_format(target_file_format_version); // Throws
            // Note: The file format version stored in the Realm file will be
            // updated to the new file format version as part of the following
            // commit operation. This happens in GroupWriter::commit().
            if (m_upgrade_callback)
                m_upgrade_callback(current_file_format_version_2, target_file_format_version); // Throws
            dirty = true;
        }
        wt->set_file_format_version(target_file_format_version);
        m_file_format_version = target_file_format_version;

        // Only commit when something was actually upgraded; otherwise the
        // write transaction is simply abandoned (rolled back).
        if (dirty)
            wt->commit(); // Throws
    }
}
2296

2297
void DB::release_read_lock(ReadLockInfo& read_lock) noexcept
{
    // ignore if opened with immutable file (then we have no lockfile)
    if (m_fake_read_lock_if_immutable)
        return;
    CheckedLockGuard guard(m_mutex); // mx on m_local_locks_held
    do_release_read_lock(read_lock);
}
2305

2306
// this is called with m_mutex locked
// Removes `read_lock` from the set of locally held read locks and forwards
// the release to the version manager. If the lock is not found, the DB must
// already have been closed (close() releases all locks), which is tolerated.
void DB::do_release_read_lock(ReadLockInfo& read_lock) noexcept
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    bool found_match = false;
    // simple linear search and move-last-over if a match is found.
    // common case should have only a modest number of transactions in play..
    for (size_t j = 0; j < m_local_locks_held.size(); ++j) {
        if (m_local_locks_held[j].m_version == read_lock.m_version) {
            // move-last-over: O(1) removal, order of the vector is irrelevant
            m_local_locks_held[j] = m_local_locks_held.back();
            m_local_locks_held.pop_back();
            found_match = true;
            break;
        }
    }
    if (!found_match) {
        REALM_ASSERT(!is_attached());
        // it's OK, someone called close() and all locks where released
        return;
    }
    --m_transaction_count;
    // only tell the version manager once the local bookkeeping succeeded
    m_version_manager->release_read_lock(read_lock);
}
2329

2330

2331
// Acquire a read lock of the given type (Live/Frozen) on the requested
// version (or the latest, for a default VersionID), registering it in
// m_local_locks_held so close()/release can account for it.
DB::ReadLockInfo DB::grab_read_lock(ReadLockInfo::Type type, VersionID version_id)
{
    CheckedLockGuard lock(m_mutex); // mx on m_local_locks_held
    REALM_ASSERT_RELEASE(is_attached());
    auto read_lock = m_version_manager->grab_read_lock(type, version_id);

    m_local_locks_held.emplace_back(read_lock);
    ++m_transaction_count;
    // sanity: the top ref must lie inside the file covered by this snapshot
    REALM_ASSERT(read_lock.m_file_size > read_lock.m_top_ref);
    return read_lock;
}
2342

2343
// Forget a read lock locally WITHOUT releasing it in the version manager —
// the underlying lock is intentionally leaked (used when the lock's
// ownership has been transferred elsewhere). Silently a no-op if the lock
// is not found in m_local_locks_held.
void DB::leak_read_lock(ReadLockInfo& read_lock) noexcept
{
    CheckedLockGuard lock(m_mutex); // mx on m_local_locks_held
    // simple linear search and move-last-over if a match is found.
    // common case should have only a modest number of transactions in play..
    for (size_t j = 0; j < m_local_locks_held.size(); ++j) {
        if (m_local_locks_held[j].m_version == read_lock.m_version) {
            m_local_locks_held[j] = m_local_locks_held.back();
            m_local_locks_held.pop_back();
            --m_transaction_count;
            return;
        }
    }
}
2357

2358
bool DB::do_try_begin_write()
2359
{
84✔
2360
    // In the non-blocking case, we will only succeed if there is no contention for
2361
    // the write mutex. For this case we are trivially fair and can ignore the
2362
    // fairness machinery.
2363
    bool got_the_lock = m_writemutex.try_lock();
84✔
2364
    if (got_the_lock) {
84✔
2365
        finish_begin_write();
72✔
2366
    }
72✔
2367
    return got_the_lock;
84✔
2368
}
84✔
2369

2370
// Blocking acquisition of the write lock, using an interprocess ticketing
// scheme for fairness and a 500 ms timed wait as a fallback so that
// 'next_served' can never trail 'next_ticket' permanently.
void DB::do_begin_write()
{
    if (m_logger) {
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "acquire writemutex");
    }

    SharedInfo* info = m_info;

    // Get write lock - the write lock is held until do_end_write().
    //
    // We use a ticketing scheme to ensure fairness wrt performing write transactions.
    // (But cannot do that on Windows until we have interprocess condition variables there)
    uint32_t my_ticket = info->next_ticket.fetch_add(1, std::memory_order_relaxed);
    m_writemutex.lock(); // Throws

    // allow for comparison even after wrap around of ticket numbering:
    int32_t diff = int32_t(my_ticket - info->next_served.load(std::memory_order_relaxed));
    bool should_yield = diff > 0; // ticket is in the future
    // a) the above comparison is only guaranteed to be correct, if the distance
    //    between my_ticket and info->next_served is less than 2^30. This will
    //    be the case since the distance will be bounded by the number of threads
    //    and each thread cannot ever hold more than one ticket.
    // b) we could use 64 bit counters instead, but it is unclear if all platforms
    //    have support for interprocess atomics for 64 bit values.

    timespec time_limit; // only compute the time limit if we're going to use it:
    if (should_yield) {
        // This clock is not monotonic, so time can move backwards. This can lead
        // to a wrong time limit, but the only effect of a wrong time limit is that
        // we momentarily lose fairness, so we accept it.
        timeval tv;
        gettimeofday(&tv, nullptr);
        time_limit.tv_sec = tv.tv_sec;
        time_limit.tv_nsec = tv.tv_usec * 1000;
        time_limit.tv_nsec += 500000000;        // 500 msec wait
        if (time_limit.tv_nsec >= 1000000000) { // overflow
            time_limit.tv_nsec -= 1000000000;
            time_limit.tv_sec += 1;
        }
    }

    // Yield the mutex to earlier ticket holders until it is our turn or we
    // give up after the timeout.
    while (should_yield) {

        m_pick_next_writer.wait(m_writemutex, &time_limit);
        timeval tv;
        gettimeofday(&tv, nullptr);
        if (time_limit.tv_sec < tv.tv_sec ||
            (time_limit.tv_sec == tv.tv_sec && time_limit.tv_nsec < tv.tv_usec * 1000)) {
            // Timeout!
            break;
        }
        diff = int32_t(my_ticket - info->next_served);
        should_yield = diff > 0; // ticket is in the future, so yield to someone else
    }

    // we may get here because a) it's our turn, b) we timed out
    // we don't distinguish, satisfied that event b) should be rare.
    // In case b), we have to *make* it our turn. Failure to do so could leave us
    // with 'next_served' permanently trailing 'next_ticket'.
    //
    // In doing so, we may bypass other waiters, hence the condition for yielding
    // should take this situation into account by comparing with '>' instead of '!='
    info->next_served = my_ticket;
    finish_begin_write();
    if (m_logger) {
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "writemutex acquired");
    }
}
2438

2439
// Common tail of do_begin_write()/do_try_begin_write(): called with the
// write mutex held. Aborts (releasing the mutex) if another process crashed
// mid-commit, otherwise marks the write transaction open and makes the
// allocator writable.
void DB::finish_begin_write()
{
    if (m_info->commit_in_critical_phase) {
        // another process died inside low_level_commit's critical phase; the
        // VersionList may be corrupt, so no further writes are allowed.
        m_writemutex.unlock();
        throw RuntimeError(ErrorCodes::BrokenInvariant, "Crash of other process detected, session restart required");
    }


    {
        CheckedLockGuard local_lock(m_mutex);
        m_write_transaction_open = true;
    }
    m_alloc.set_read_only(false);
}
2453

2454
// Release the write lock: bump the fairness counter, mark the transaction
// closed, wake any writer waiting on its ticket, and unlock.
void DB::do_end_write() noexcept
{
    // let the next ticket holder in do_begin_write() proceed
    m_info->next_served.fetch_add(1, std::memory_order_relaxed);

    CheckedLockGuard local_lock(m_mutex);
    REALM_ASSERT(m_write_transaction_open);
    m_alloc.set_read_only(true);
    m_write_transaction_open = false;
    m_pick_next_writer.notify_all();
    m_writemutex.unlock();
    if (m_logger) {
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "writemutex released");
    }
}
2468

2469

2470
// Commit the given write transaction, producing the next snapshot version.
// Performs deferred table clears, runs the replication two-phase commit
// (prepare/finalize) around low_level_commit() when replication is active,
// and finally notifies registered commit listeners. Returns the new version.
Replication::version_type DB::do_commit(Transaction& transaction, bool commit_to_disk)
{
    version_type current_version;
    {
        current_version = m_version_manager->get_newest_version();
    }
    version_type new_version = current_version + 1;

    // Apply table clears that were deferred until commit time.
    if (!transaction.m_tables_to_clear.empty()) {
        for (auto table_key : transaction.m_tables_to_clear) {
            transaction.get_table_unchecked(table_key)->clear();
        }
        transaction.m_tables_to_clear.clear();
    }
    if (Replication* repl = get_replication()) {
        // If Replication::prepare_commit() fails, then the entire transaction
        // fails. The application then has the option of terminating the
        // transaction with a call to Transaction::Rollback(), which in turn
        // must call Replication::abort_transact().
        new_version = repl->prepare_commit(current_version);        // Throws
        low_level_commit(new_version, transaction, commit_to_disk); // Throws
        repl->finalize_commit();
    }
    else {
        low_level_commit(new_version, transaction); // Throws
    }

    {
        // Listeners are invoked while holding m_commit_listener_mutex so
        // registration/removal cannot race with notification.
        std::lock_guard lock(m_commit_listener_mutex);
        for (auto listener : m_commit_listeners) {
            listener->on_commit(new_version);
        }
    }

    return new_version;
}
2506

2507
// Returns the VersionID of the most recent snapshot. For immutable DBs there
// is no version manager, so the single fake read lock's version is reported.
VersionID DB::get_version_id_of_latest_snapshot()
{
    if (m_fake_read_lock_if_immutable)
        return {m_fake_read_lock_if_immutable->m_version, 0};
    return m_version_manager->get_version_id_of_latest_snapshot();
}
2513

2514

2515
// Convenience wrapper: the latest snapshot's version number, discarding the
// reader-slot index part of the VersionID.
DB::version_type DB::get_version_of_latest_snapshot()
{
    const VersionID latest = get_version_id_of_latest_snapshot();
    return latest.version;
}
2519

2520

2521
// The core commit step: writes the transaction's changes to the file and
// publishes `new_version` to other readers/writers. The statement order here
// is the crash-consistency protocol — in particular the
// commit_in_critical_phase flag brackets the only window in which a crash
// can corrupt the VersionList. Lock order: m_controlmutex is never taken
// while m_mutex is held.
void DB::low_level_commit(uint_fast64_t new_version, Transaction& transaction, bool commit_to_disk)
{
    SharedInfo* info = m_info;

    // Version of oldest snapshot currently (or recently) bound in a transaction
    // of the current session.
    uint64_t oldest_version = 0, oldest_live_version = 0;
    TopRefMap top_refs;
    bool any_new_unreachables;
    {
        CheckedLockGuard lock(m_mutex);
        m_version_manager->cleanup_versions(oldest_live_version, top_refs, any_new_unreachables);
        oldest_version = top_refs.begin()->first;
        // Allow for trimming of the history. Some types of histories do not
        // need store changesets prior to the oldest *live* bound snapshot.
        if (auto hist = transaction.get_history()) {
            hist->set_oldest_bound_version(oldest_live_version); // Throws
        }
        // Cleanup any stale mappings
        m_alloc.purge_old_mappings(oldest_version, new_version);
    }
    // save number of live versions for later:
    // (top_refs is std::moved into GroupWriter so we'll loose it in the call to set_versions below)
    auto live_versions = top_refs.size();
    // Do the actual commit
    REALM_ASSERT(oldest_version <= new_version);

    GroupWriter out(transaction, Durability(info->durability), m_marker_observer.get()); // Throws
    out.set_versions(new_version, top_refs, any_new_unreachables);
    out.prepare_evacuation();
    auto t1 = std::chrono::steady_clock::now();
    auto commit_size = m_alloc.get_commit_size();
    if (m_logger) {
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::debug, "Initiate commit version: %1",
                      new_version);
    }
    if (auto limit = out.get_evacuation_limit()) {
        // Get a work limit based on the size of the transaction we're about to commit
        // Add 4k to ensure progress on small commits
        size_t work_limit = commit_size / 2 + out.get_free_list_size() + 0x1000;
        transaction.cow_outliers(out.get_evacuation_progress(), limit, work_limit);
    }

    ref_type new_top_ref;
    // Recursively write all changed arrays to end of file
    {
        // protect against race with any other DB trying to attach to the file
        std::lock_guard<InterprocessMutex> lock(m_controlmutex); // Throws
        new_top_ref = out.write_group();                         // Throws
    }
    {
        // protect access to shared variables and m_reader_mapping from here
        CheckedLockGuard lock_guard(m_mutex);
        m_free_space = out.get_free_space_size();
        m_locked_space = out.get_locked_space_size();
        m_used_space = out.get_logical_size() - m_free_space;
        m_evac_stage.store(EvacStage(out.get_evacuation_stage()));
        out.sync_according_to_durability();
        if (Durability(info->durability) == Durability::Full || Durability(info->durability) == Durability::Unsafe) {
            if (commit_to_disk) {
                GroupCommitter cm(transaction, Durability(info->durability), m_marker_observer.get());
                cm.commit(new_top_ref);
            }
        }
        size_t new_file_size = out.get_logical_size();
        // We must reset the allocators free space tracking before communicating the new
        // version through the ring buffer. If not, a reader may start updating the allocators
        // mappings while the allocator is in dirty state.
        reset_free_space_tracking();
        // Add the new version. If this fails in any way, the VersionList may be corrupted.
        // This can lead to readers seing invalid data which is likely to cause them
        // to crash. Other writers *must* be prevented from writing any further updates
        // to the database. The flag "commit_in_critical_phase" is used to prevent such updates.
        info->commit_in_critical_phase = 1;
        {
            m_version_manager->add_version(new_top_ref, new_file_size, new_version);

            // REALM_ASSERT(m_alloc.matches_section_boundary(new_file_size));
            REALM_ASSERT(new_top_ref < new_file_size);
        }
        // At this point, the VersionList has been succesfully updated, and the next writer
        // can safely proceed once the writemutex has been lifted.
        info->commit_in_critical_phase = 0;
    }
    {
        // protect against concurrent updates to the .lock file.
        // must release m_mutex before this point to obey lock order
        std::lock_guard<InterprocessMutex> lock(m_controlmutex);

        info->number_of_versions = live_versions + 1;
        info->latest_version_number = new_version;

        // wake up anyone waiting for a new commit (e.g. wait_for_change())
        m_new_commit_available.notify_all();
    }
    auto t2 = std::chrono::steady_clock::now();
    if (m_logger) {
        std::string to_disk_str = commit_to_disk ? util::format(" ref %1", new_top_ref) : " (no commit to disk)";
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::debug, "Commit of size %1 done in %2 us%3",
                      commit_size, std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count(),
                      to_disk_str);
    }
}
2623

2624
#ifdef REALM_DEBUG
// Debug-only: pre-allocate disk space for the Realm file.
void DB::reserve(size_t size)
{
    REALM_ASSERT(is_attached());
    m_alloc.reserve_disk_space(size); // Throws
}
#endif
2631

2632
// Run `callback` while holding an exclusive lock on the Realm's .lock file,
// guaranteeing no other process has the Realm open. Returns true if the lock
// was obtained and the callback ran, false if another process holds the file.
bool DB::call_with_lock(const std::string& realm_path, CallbackWithLock&& callback)
{
    auto lockfile_path = get_core_file(realm_path, CoreFileType::Lock);

    File lockfile;
    lockfile.open(lockfile_path, File::access_ReadWrite, File::create_Auto, 0); // Throws
    File::CloseGuard fcg(lockfile); // ensures the fd is closed on every exit path
    lockfile.set_fifo_path(realm_path + ".management", "lock.fifo");
    if (lockfile.try_rw_lock_exclusive()) { // Throws
        callback(realm_path);
        return true;
    }
    return false;
}
2646

2647
std::string DB::get_core_file(const std::string& base_path, CoreFileType type)
2648
{
171,084✔
2649
    switch (type) {
171,084✔
2650
        case CoreFileType::Lock:
74,631✔
2651
            return base_path + ".lock";
74,631✔
2652
        case CoreFileType::Storage:
972✔
2653
            return base_path;
972✔
2654
        case CoreFileType::Management:
74,472✔
2655
            return base_path + ".management";
74,472✔
2656
        case CoreFileType::Note:
20,046✔
2657
            return base_path + ".note";
20,046✔
2658
        case CoreFileType::Log:
963✔
2659
            return base_path + ".log";
963✔
2660
    }
171,084✔
2661
    REALM_UNREACHABLE();
2662
}
×
2663

2664
void DB::delete_files(const std::string& base_path, bool* did_delete, bool delete_lockfile)
2665
{
966✔
2666
    if (File::try_remove(get_core_file(base_path, CoreFileType::Storage)) && did_delete) {
966✔
2667
        *did_delete = true;
105✔
2668
    }
105✔
2669

2670
    File::try_remove(get_core_file(base_path, CoreFileType::Note));
966✔
2671
    File::try_remove(get_core_file(base_path, CoreFileType::Log));
966✔
2672
    util::try_remove_dir_recursive(get_core_file(base_path, CoreFileType::Management));
966✔
2673

2674
    if (delete_lockfile) {
966✔
2675
        File::try_remove(get_core_file(base_path, CoreFileType::Lock));
921✔
2676
    }
921✔
2677
}
966✔
2678

2679
// Begin a live read transaction on the given version (latest by default).
// For immutable DBs the shared fake read lock is used; otherwise a real read
// lock is grabbed and its ownership transferred to the Transaction (the
// ReadLockGuard only covers the window until make_transaction_ref succeeds).
TransactionRef DB::start_read(VersionID version_id)
{
    if (!is_attached())
        throw StaleAccessor("Stale transaction");
    TransactionRef tr;
    if (m_fake_read_lock_if_immutable) {
        tr = make_transaction_ref(shared_from_this(), &m_alloc, *m_fake_read_lock_if_immutable, DB::transact_Reading);
    }
    else {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Live, version_id);
        ReadLockGuard g(*this, read_lock); // releases the lock if construction throws
        read_lock.check();
        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Reading);
        g.release(); // the Transaction now owns the read lock
    }
    tr->set_file_format_version(get_file_format_version());
    return tr;
}
2697

2698
// Begin a frozen (immutable snapshot) transaction on the given version.
// Mirrors start_read(), but grabs a Frozen read lock, which pins the
// snapshot without participating in live-version advancement.
TransactionRef DB::start_frozen(VersionID version_id)
{
    if (!is_attached())
        throw StaleAccessor("Stale transaction");
    TransactionRef tr;
    if (m_fake_read_lock_if_immutable) {
        tr = make_transaction_ref(shared_from_this(), &m_alloc, *m_fake_read_lock_if_immutable, DB::transact_Frozen);
    }
    else {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Frozen, version_id);
        ReadLockGuard g(*this, read_lock); // releases the lock if construction throws
        read_lock.check();
        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Frozen);
        g.release(); // the Transaction now owns the read lock
    }
    tr->set_file_format_version(get_file_format_version());
    return tr;
}
2716

2717
// Begin a write transaction. With nonblocking=true, returns a null
// TransactionRef instead of waiting when the write lock is contended.
// On any failure after the write lock is taken, the lock is released via
// end_write_on_correct_thread() before rethrowing.
TransactionRef DB::start_write(bool nonblocking)
{
    if (m_fake_read_lock_if_immutable) {
        REALM_ASSERT(false && "Can't write an immutable DB");
    }
    if (nonblocking) {
        bool success = do_try_begin_write();
        if (!success) {
            // contended: caller asked not to block
            return TransactionRef();
        }
    }
    else {
        do_begin_write();
    }
    {
        CheckedLockGuard local_lock(m_mutex);
        if (!is_attached()) {
            // DB was closed while we waited for the write lock; undo and bail
            local_lock.unlock();
            end_write_on_correct_thread();
            throw StaleAccessor("Stale transaction");
        }
        m_write_transaction_open = true;
    }
    TransactionRef tr;
    try {
        // a write transaction is based on a live read lock on the newest version
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Live, VersionID());
        ReadLockGuard g(*this, read_lock);
        read_lock.check();

        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Writing);
        tr->set_file_format_version(get_file_format_version());
        version_type current_version = read_lock.m_version;
        m_alloc.init_mapping_management(current_version);
        if (Replication* repl = get_replication()) {
            bool history_updated = false;
            repl->initiate_transact(*tr, current_version, history_updated); // Throws
        }
        g.release(); // the Transaction now owns the read lock
    }
    catch (...) {
        // release the write lock before propagating, or no writer could ever proceed
        end_write_on_correct_thread();
        throw;
    }

    return tr;
}
2763

2764
// Asynchronously request the write lock on behalf of `tr`, invoking
// `when_acquired` once it is granted. The completion handler captures the
// transaction via weak_ptr so a destroyed transaction simply drops the
// notification instead of being kept alive by the pending request.
void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
{
    {
        util::CheckedLockGuard lck(tr->m_async_mutex);
        REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle);
        tr->m_async_stage = Transaction::AsyncState::Requesting;
        tr->m_request_time_point = std::chrono::steady_clock::now();
        if (tr->db->m_logger) {
            tr->db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace,
                                  "Tr %1: Async request write lock", tr->m_log_id);
        }
    }
    std::weak_ptr<Transaction> weak_tr = tr;
    async_begin_write([weak_tr, cb = std::move(when_acquired)]() {
        if (auto tr = weak_tr.lock()) {
            util::CheckedLockGuard lck(tr->m_async_mutex);
            // If a synchronous transaction happened while we were pending
            // we may be in HasCommits
            if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
                tr->m_async_stage = Transaction::AsyncState::HasLock;
            }
            if (tr->db->m_logger) {
                auto t2 = std::chrono::steady_clock::now();
                tr->db->m_logger->log(
                    util::LogCategory::transaction, util::Logger::Level::trace, "Tr %1, Got write lock in %2 us",
                    tr->m_log_id,
                    std::chrono::duration_cast<std::chrono::microseconds>(t2 - tr->m_request_time_point).count());
            }
            if (tr->m_waiting_for_write_lock) {
                // someone is blocked in a synchronous wait for this lock; wake them
                tr->m_waiting_for_write_lock = false;
                tr->m_async_cv.notify_one();
            }
            else if (cb) {
                cb();
            }
            tr.reset(); // Release pointer while lock is held
        }
    });
}
2803

2804
// Private constructor — instances are created via the static create()
// factories (the Private tag prevents direct construction).
// Note: `options` is a const reference, so the former
// std::move(options.upgrade_callback) silently degraded to a copy
// (clang-tidy performance-move-const-arg); copy explicitly instead.
inline DB::DB(Private, const DBOptions& options)
    : m_upgrade_callback(options.upgrade_callback)
    , m_log_id(util::gen_log_id(this))
{
    if (options.enable_async_writes) {
        // helper that offloads commit-to-disk work to a background thread
        m_commit_helper = std::make_unique<AsyncCommitHelper>(this);
    }
}
2812

2813
// Factory: create a DB and attach it to the file at `file` (no replication).
DBRef DB::create(const std::string& file, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    auto db = std::make_shared<DB>(Private(), options);
    db->open(file, options);
    return db;
}
2819

2820
// Factory: create a DB using a caller-owned Replication instance.
DBRef DB::create(Replication& repl, const std::string& file, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    auto db = std::make_shared<DB>(Private(), options);
    db->open(repl, file, options);
    return db;
}
2826

2827
// Factory: create a DB taking ownership of the given Replication instance.
DBRef DB::create(std::unique_ptr<Replication> repl, const std::string& file,
                 const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    REALM_ASSERT(repl);
    auto db = std::make_shared<DB>(Private(), options);
    db->m_history = std::move(repl); // DB keeps the history alive
    db->open(*db->m_history, file, options);
    return db;
}
2836

2837
// Factory: create a DB owning the given Replication instance, with the file
// path taken from the replication/options (no explicit path argument).
DBRef DB::create(std::unique_ptr<Replication> repl, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    REALM_ASSERT(repl);
    auto db = std::make_shared<DB>(Private(), options);
    db->m_history = std::move(repl); // DB keeps the history alive
    db->open(*db->m_history, options);
    return db;
}
2845

2846
// Factory: create an in-memory DB, recording `in_memory_path` as its nominal
// path (no file of that name is opened).
DBRef DB::create_in_memory(std::unique_ptr<Replication> repl, const std::string& in_memory_path,
                           const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    auto result = create(std::move(repl), options);
    result->m_db_path = in_memory_path;
    return result;
}
2853

2854
// Factory: create an immutable DB backed by a caller-supplied buffer.
// When take_ownership is true the DB frees the buffer on destruction.
DBRef DB::create(BinaryData buffer, bool take_ownership) NO_THREAD_SAFETY_ANALYSIS
{
    DBOptions options;
    options.is_immutable = true;
    auto db = std::make_shared<DB>(Private(), options);
    db->open(buffer, take_ownership);
    return db;
}
2862

2863
// Try to become the session's sole sync agent. Returns false if another DB
// instance (possibly in another process) already holds the role. The claim
// is recorded both in the shared .lock-file state and locally.
bool DB::try_claim_sync_agent()
{
    REALM_ASSERT(is_attached());
    std::lock_guard lock(m_controlmutex);
    if (m_info->sync_agent_present)
        return false;
    m_info->sync_agent_present = 1; // Set to true
    m_is_sync_agent = true;
    return true;
}
2873

2874
void DB::claim_sync_agent()
2875
{
17,160✔
2876
    if (!try_claim_sync_agent())
17,160✔
2877
        throw MultipleSyncAgents{};
6✔
2878
}
17,160✔
2879

2880
// Give up the sync-agent role previously claimed by this instance.
// No-op when this instance is not the agent (other agents are untouched).
void DB::release_sync_agent()
{
    REALM_ASSERT(is_attached());
    std::lock_guard lock(m_controlmutex);
    if (!m_is_sync_agent)
        return;
    // local and shared flags must agree while we hold m_controlmutex
    REALM_ASSERT(m_info->sync_agent_present);
    m_info->sync_agent_present = 0;
    m_is_sync_agent = false;
}
2890

2891
void DB::do_begin_possibly_async_write()
2892
{
340,227✔
2893
    if (m_commit_helper) {
340,227✔
2894
        m_commit_helper->blocking_begin_write();
211,140✔
2895
    }
211,140✔
2896
    else {
129,087✔
2897
        do_begin_write();
129,087✔
2898
    }
129,087✔
2899
}
340,227✔
2900

2901
// Release the write lock from whatever thread is appropriate. When the async
// commit helper exists, blocking_end_write() is tried first (note the
// short-circuit: it has the side effect of handing the release to the helper
// thread and returns true when it did); only if it declines do we release
// directly on this thread.
void DB::end_write_on_correct_thread() noexcept
{
    //    m_local_write_mutex.unlock();
    if (!m_commit_helper || !m_commit_helper->blocking_end_write()) {
        do_end_write();
    }
}
2908

2909
// Register a listener to be notified (under m_commit_listener_mutex) after
// every successful commit. The caller retains ownership of the listener.
void DB::add_commit_listener(CommitListener* listener)
{
    const std::lock_guard guard(m_commit_listener_mutex);
    m_commit_listeners.emplace_back(listener);
}
2914

2915
// Unregister a commit listener. Uses the erase-remove idiom, so every
// registration of this listener is dropped.
void DB::remove_commit_listener(CommitListener* listener)
{
    const std::lock_guard guard(m_commit_listener_mutex);
    auto& listeners = m_commit_listeners;
    auto tail = std::remove(listeners.begin(), listeners.end(), listener);
    listeners.erase(tail, listeners.end());
}
2921

2922
// RAII guard that suppresses replication for the given transaction's DB.
// Captures the current Replication pointer and version so the destructor can
// restore them; clears both the DB-level hook and the transaction's history.
DisableReplication::DisableReplication(Transaction& t)
    : m_tr(t)
    , m_owner(t.get_db())
    , m_repl(m_owner->get_replication())
    , m_version(t.get_version())
{
    m_owner->set_replication(nullptr);
    t.m_history = nullptr;
}
2931

2932
// Restore the Replication instance saved by the constructor. If the
// transaction advanced to a different version while replication was
// disabled, its replication state must be re-initialized.
DisableReplication::~DisableReplication()
{
    m_owner->set_replication(m_repl);
    if (m_version != m_tr.get_version())
        m_tr.initialize_replication();
}
2938

2939
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc