• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1659

11 Sep 2023 10:20AM UTC coverage: 91.215% (-0.007%) from 91.222%
1659

push

Evergreen

GitHub
Throw an exception if File::unlock has failed + use same logic for all platforms, for detecting when we hold a lock on a file (#6926)

95832 of 175764 branches covered (0.0%)

13 of 15 new or added lines in 2 files covered. (86.67%)

92 existing lines in 18 files now uncovered.

233432 of 255915 relevant lines covered (91.21%)

6744298.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.53
/src/realm/db.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2016 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/transaction.hpp>
20

21
#include <algorithm>
22
#include <atomic>
23
#include <cerrno>
24
#include <fcntl.h>
25
#include <iostream>
26
#include <mutex>
27
#include <sstream>
28
#include <type_traits>
29
#include <random>
30
#include <deque>
31
#include <thread>
32
#include <chrono>
33
#include <condition_variable>
34

35
#include <realm/disable_sync_to_disk.hpp>
36
#include <realm/group_writer.hpp>
37
#include <realm/impl/simulated_failure.hpp>
38
#include <realm/replication.hpp>
39
#include <realm/util/errno.hpp>
40
#include <realm/util/features.h>
41
#include <realm/util/file_mapper.hpp>
42
#include <realm/util/safe_int_ops.hpp>
43
#include <realm/util/scope_exit.hpp>
44
#include <realm/util/thread.hpp>
45
#include <realm/util/to_string.hpp>
46

47
#ifndef _WIN32
48
#include <sys/wait.h>
49
#include <sys/time.h>
50
#include <unistd.h>
51
#else
52
#include <windows.h>
53
#include <process.h>
54
#endif
55

56
// #define REALM_ENABLE_LOGFILE
57

58

59
using namespace realm;
60
using namespace realm::metrics;
61
using namespace realm::util;
62
using Durability = DBOptions::Durability;
63

64
namespace {
65

66
// Layout version of the SharedInfo structure stored in the per-session
// `.lock` file. All session participants must agree on this value; bump it
// whenever the memory layout (beyond the eternally-fixed prefix) changes.
//
// value   change
// --------------------
//  4      Unknown
//  5      Introduction of SharedInfo::file_format_version and
//         SharedInfo::history_type.
//  6      Using new robust mutex emulation where applicable
//  7      Introducing `commit_in_critical_phase` and `sync_agent_present`, and
//         changing `daemon_started` and `daemon_ready` from 1-bit to 8-bit
//         fields.
//  8      Placing the commitlog history inside the Realm file.
//  9      Fair write transactions requires an additional condition variable,
//         `write_fairness`
// 10      Introducing SharedInfo::history_schema_version.
// 11      New impl of InterprocessCondVar on windows.
// 12      Change `number_of_versions` to an atomic rather than guarding it
//         with a lock.
// 13      New impl of VersionList and added mutex for it (former RingBuffer)
// 14      Added field for tracking ongoing encrypted writes
const uint_fast16_t g_shared_info_version = 14;

86

87
struct VersionList {
    // the VersionList is an array of ReadCount structures.
    // it is placed in the "lock-file" and accessed via memory mapping,
    // so it is shared between processes. The `allocating`/`newest` fields
    // form a crash-recovery protocol: statement ordering here is load-bearing.
    struct ReadCount {
        // version == 0 marks an unused (inactive) slot; see is_active()/deactivate().
        uint64_t version;
        uint64_t filesize;
        uint64_t current_top;
        // Reference counts for the three kinds of pins on this version.
        uint32_t count_live;
        uint32_t count_frozen;
        uint32_t count_full;
        bool is_active()
        {
            return version != 0;
        }
        void deactivate()
        {
            version = 0;
            count_live = count_frozen = count_full = 0;
        }
        // NOTE: activate() only sets the version; counts are expected to
        // already be zero (asserted in try_allocate_entry()).
        void activate(uint64_t v)
        {
            version = v;
        }
    };

    // Grow the entry count to `size`, deactivating the newly exposed slots
    // first. The fence ensures the deactivating writes are visible before
    // `entries` is bumped (crash safety, not thread synchronization).
    void reserve(uint32_t size) noexcept
    {
        for (auto i = entries; i < size; ++i)
            data()[i].deactivate();
        if (size > entries) {
            // Fence preventing downward motion of above writes
            std::atomic_signal_fence(std::memory_order_release);
            entries = size;
        }
    }

    VersionList() noexcept
    {
        newest = nil; // empty
        entries = 0;
        reserve(init_readers_size);
    }

    static size_t compute_required_space(uint_fast32_t num_entries) noexcept
    {
        // get space required for given number of entries beyond the initial count.
        // NB: this not the size of the VersionList, it is the size minus whatever was
        // the initial size.
        return sizeof(ReadCount) * (num_entries - init_readers_size);
    }

    unsigned int capacity() const noexcept
    {
        return entries;
    }

    ReadCount& get(uint_fast32_t idx) noexcept
    {
        return data()[idx];
    }

    // Precondition: at least one entry has been allocated (newest != nil).
    ReadCount& get_newest() noexcept
    {
        return get(newest);
    }
    // returns nullptr if all entries are in use
    ReadCount* try_allocate_entry(uint64_t top, uint64_t size, uint64_t version)
    {
        auto i = allocating.load();
        if (i == newest.load()) {
            // if newest != allocating we are recovering from a crash and MUST complete the earlier allocation
            // but if not, find lowest free entry by linear search.
            uint32_t k = 0;
            while (k < entries && data()[k].is_active()) {
                ++k;
            }
            if (k == entries)
                return nullptr;     // no free entries
            allocating.exchange(k); // barrier: prevent upward movement of instructions below
            i = k;
        }
        auto& rc = data()[i];
        REALM_ASSERT(rc.count_frozen == 0);
        REALM_ASSERT(rc.count_live == 0);
        REALM_ASSERT(rc.count_full == 0);
        rc.current_top = top;
        rc.filesize = size;
        rc.activate(version);
        newest.store(i); // barrier: prevent downward movement of instructions above
        return &rc;
    }

    // Index of `rc` within the entry array (pointer arithmetic on data()).
    uint32_t index_of(const ReadCount& rc) noexcept
    {
        return (uint32_t)(&rc - data());
    }

    void free_entry(ReadCount* rc) noexcept
    {
        rc->current_top = rc->filesize = -1ULL; // easy to recognize in debugger
        rc->deactivate();
    }

    // This method resets the version list to an empty state, then allocates an entry.
    // Precondition: This should *only* be done if the caller has established that she
    // is the only thread/process that has access to the VersionList. It is currently
    // called from init_versioning(), which is called by DB::open() under the
    // condition that it is the session initiator and under guard by the control mutex,
    // thus ensuring the precondition. It is also called from compact() in a similar situation.
    // It is most likely not suited for any other use.
    ReadCount& init_versioning(uint64_t top, uint64_t filesize, uint64_t version) noexcept
    {
        newest = nil;
        allocating = 0;
        auto t_free = entries;
        entries = 0;
        reserve(t_free);
        return *try_allocate_entry(top, filesize, version);
    }

    // Reclaim unreachable versions.
    // Out-params:
    //   oldest_live_v        - oldest version that is still live (pinned or
    //                          younger than a fully-locked version).
    //   top_refs             - map of all still-reachable versions to their
    //                          top ref / file size.
    //   any_new_unreachables - set if a freed entry was younger than a
    //                          reachable version (triggers backdating).
    void purge_versions(uint64_t& oldest_live_v, TopRefMap& top_refs, bool& any_new_unreachables)
    {
        oldest_live_v = std::numeric_limits<uint64_t>::max();
        auto oldest_full_v = std::numeric_limits<uint64_t>::max();
        any_new_unreachables = false;
        // correct case where an earlier crash may have left the entry at 'allocating' partially initialized:
        const auto index_of_newest = newest.load();
        if (auto a = allocating.load(); a != index_of_newest) {
            data()[a].deactivate();
        }
        // determine fully locked versions - after one of those all versions are considered live.
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_full) {
                if (rc->version < oldest_full_v)
                    oldest_full_v = rc->version;
            }
        }
        // collect reachable versions and determine oldest live reachable version
        // (oldest reachable version is the first entry in the top_refs map, so no need to find it explicitly)
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_frozen || rc->count_live || rc->version >= oldest_full_v) {
                // entry is still reachable
                top_refs.emplace(rc->version, VersionInfo{to_ref(rc->current_top), to_ref(rc->filesize)});
            }
            if (rc->count_live || rc->version >= oldest_full_v) {
                if (rc->version < oldest_live_v)
                    oldest_live_v = rc->version;
            }
        }
        // we must have found at least one reachable version
        REALM_ASSERT(top_refs.size());
        // free unreachable entries and determine if we want to trigger backdating
        uint64_t oldest_v = top_refs.begin()->first;
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_frozen == 0 && rc->count_live == 0 && rc->version < oldest_full_v) {
                // entry is becoming unreachable.
                // if it is also younger than a reachable version, then set 'any_new_unreachables' to trigger
                // backdating
                if (rc->version > oldest_v) {
                    any_new_unreachables = true;
                }
                REALM_ASSERT(index_of(*rc) != index_of_newest);
                free_entry(rc);
            }
        }
        REALM_ASSERT(oldest_v != std::numeric_limits<uint64_t>::max());
        REALM_ASSERT(oldest_live_v != std::numeric_limits<uint64_t>::max());
    }

#if REALM_DEBUG
    // Debug aid: print every entry with its version and pin counts.
    void dump()
    {
        util::format(std::cout, "VersionList has %1 entries: \n", entries);
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            util::format(std::cout, "[%1]: version %2, live: %3, full: %4, frozen: %5\n", index_of(*rc), rc->version,
                         rc->count_live, rc->count_full, rc->count_frozen);
        }
    }
#endif // REALM_DEBUG

    // Sentinel index meaning "no entry" (used for `newest` when empty).
    constexpr static uint32_t nil = (uint32_t)-1;
    const static int init_readers_size = 32;
    uint32_t entries;
    std::atomic<uint32_t> allocating; // atomic for crash safety, not threading
    std::atomic<uint32_t> newest;     // atomic for crash safety, not threading

    // IMPORTANT: The actual data comprising the version list MUST BE PLACED LAST in
    // the VersionList structure, as the data area is extended at run time.
    // Similarly, the VersionList must be the final element of the SharedInfo structure.
    // IMPORTANT II:
    // To ensure proper alignment across all platforms, the SharedInfo structure
    // should NOT have a stricter alignment requirement than the ReadCount structure.
    ReadCount m_data[init_readers_size];

    // Silence UBSan errors about out-of-bounds reads on m_data by casting to a pointer
    ReadCount* data() noexcept
    {
        return m_data;
    }
    const ReadCount* data() const noexcept
    {
        return m_data;
    }
};
297

298
// Using lambda rather than function so that shared_ptr shared state doesn't need to hold a function pointer.
299
constexpr auto TransactionDeleter = [](Transaction* t) {
2,837,568✔
300
    t->close();
2,837,568✔
301
    delete t;
2,837,568✔
302
};
2,837,568✔
303

304
template <typename... Args>
305
TransactionRef make_transaction_ref(Args&&... args)
306
{
2,848,338✔
307
    return TransactionRef(new Transaction(std::forward<Args>(args)...), TransactionDeleter);
2,848,338✔
308
}
2,848,338✔
309

310
} // anonymous namespace
311

312
namespace realm {
313

314
/// The structure of the contents of the per session `.lock` file. Note that
315
/// this file is transient in that it is recreated/reinitialized at the
316
/// beginning of every session. A session is any sequence of temporally
317
/// overlapping openings of a particular Realm file via DB objects. For
318
/// example, if there are two DB objects, A and B, and the file is
319
/// first opened via A, then opened via B, then closed via A, and finally closed
320
/// via B, then the session stretches from the opening via A to the closing via
321
/// B.
322
///
323
/// IMPORTANT: Remember to bump `g_shared_info_version` if anything is changed
324
/// in the memory layout of this class, or if the meaning of any of the stored
325
/// values change.
326
///
327
/// Members `init_complete`, `shared_info_version`, `size_of_mutex`, and
328
/// `size_of_condvar` may be modified only while holding an exclusive lock
329
/// on the file, and may be read only while holding a shared (or exclusive) lock
330
/// on the file. All other members (except for the VersionList which has its own mutex)
331
/// may be accessed only while holding a lock on `controlmutex`.
332
///
333
/// SharedInfo must be 8-byte aligned. On 32-bit Apple platforms, mutexes store their
334
/// alignment as part of the mutex state. We're copying the SharedInfo (including
335
/// embedded but always unlocked mutexes) and it must retain the same alignment
336
/// throughout.
337
struct alignas(8) DB::SharedInfo {
    /// Indicates that initialization of the lock file was completed
    /// successfully.
    ///
    /// CAUTION: This member must never move or change type, as that would
    /// compromise safety of the the session initiation process.
    std::atomic<uint8_t> init_complete; // Offset 0

    /// The size in bytes of a mutex member of SharedInfo. This allows all
    /// session participants to be in agreement. Obviously, a size match is not
    /// enough to guarantee identical layout internally in the mutex object, but
    /// it is hoped that it will catch some (if not most) of the cases where
    /// there is a layout discrepancy internally in the mutex object.
    uint8_t size_of_mutex; // Offset 1

    /// Like size_of_mutex, but for condition variable members of SharedInfo.
    uint8_t size_of_condvar; // Offset 2

    /// Set during the critical phase of a commit, when the logs, the VersionList
    /// and the database may be out of sync with respect to each other. If a
    /// writer crashes during this phase, there is no safe way of continuing
    /// with further write transactions. When beginning a write transaction,
    /// this must be checked and an exception thrown if set.
    ///
    /// Note that std::atomic<uint8_t> is guaranteed to have standard layout.
    std::atomic<uint8_t> commit_in_critical_phase = {0}; // Offset 3

    /// The target Realm file format version for the current session. This
    /// allows all session participants to be in agreement. It can only differ
    /// from what is returned by Group::get_file_format_version() temporarily,
    /// and only during the Realm file opening process. If it differs, it means
    /// that the file format needs to be upgraded from its current format
    /// (Group::get_file_format_version()), the format specified by this member
    /// of SharedInfo.
    uint8_t file_format_version; // Offset 4

    /// Stores a value of type Replication::HistoryType. Must match across all
    /// session participants.
    int8_t history_type; // Offset 5

    /// The SharedInfo layout version. This allows all session participants to
    /// be in agreement. Must be bumped if the layout of the SharedInfo
    /// structure is changed. Note, however, that only the part that lies beyond
    /// SharedInfoUnchangingLayout can have its layout changed.
    ///
    /// CAUTION: This member must never move or change type, as that would
    /// compromise version agreement checking.
    uint16_t shared_info_version = g_shared_info_version; // Offset 6

    /// Stores a value of type DBOptions::Durability; fixed at session start.
    uint16_t durability;           // Offset 8
    uint16_t free_write_slots = 0; // Offset 10

    /// Number of participating shared groups
    uint32_t num_participants = 0; // Offset 12

    /// Latest version number. Guarded by the controlmutex (for lock-free
    /// access, use get_version_of_latest_snapshot() instead)
    uint64_t latest_version_number; // Offset 16

    /// Pid of process initiating the session, but only if that process runs
    /// with encryption enabled, zero otherwise. This was used to prevent
    /// multiprocess encryption until support for that was added.
    uint64_t session_initiator_pid = 0; // Offset 24

    std::atomic<uint64_t> number_of_versions; // Offset 32

    /// True (1) if there is a sync agent present (a session participant acting
    /// as sync client). It is an error to have a session with more than one
    /// sync agent. The purpose of this flag is to prevent that from ever
    /// happening. If the sync agent crashes and leaves the flag set, the
    /// session will need to be restarted (lock file reinitialized) before a new
    /// sync agent can be started.
    uint8_t sync_agent_present = 0; // Offset 40

    /// Set when a participant decides to start the daemon, cleared by the
    /// daemon when it decides to exit. Participants check during open() and
    /// start the daemon if running in async mode.
    uint8_t daemon_started = 0; // Offset 41

    /// Set by the daemon when it is ready to handle commits. Participants must
    /// wait during open() on 'daemon_becomes_ready' for this to become true.
    /// Cleared by the daemon when it decides to exit.
    uint8_t daemon_ready = 0; // Offset 42

    /// Padding byte; keeps history_schema_version 2-byte aligned.
    uint8_t filler_1; // Offset 43

    /// Stores a history schema version (as returned by
    /// Replication::get_history_schema_version()). Must match across all
    /// session participants.
    uint16_t history_schema_version; // Offset 44

    uint16_t filler_2; // Offset 46

    // Interprocess synchronization primitives shared between all session
    // participants via the memory-mapped lock file.
    InterprocessMutex::SharedPart shared_writemutex; // Offset 48
    InterprocessMutex::SharedPart shared_controlmutex;
    InterprocessMutex::SharedPart shared_versionlist_mutex;
    InterprocessCondVar::SharedPart room_to_write;
    InterprocessCondVar::SharedPart work_to_do;
    InterprocessCondVar::SharedPart daemon_becomes_ready;
    InterprocessCondVar::SharedPart new_commit_available;
    InterprocessCondVar::SharedPart pick_next_writer;
    // Ticket-lock style counters used for write fairness.
    std::atomic<uint32_t> next_ticket;
    std::atomic<uint32_t> next_served = 0;
    // Tracking of an ongoing encrypted write (see layout version 14).
    std::atomic<uint64_t> writing_page_offset;
    std::atomic<uint64_t> write_counter;

    // IMPORTANT: The VersionList MUST be the last field in SharedInfo - see above.
    VersionList readers;

    SharedInfo(Durability, Replication::HistoryType, int history_schema_version);
    ~SharedInfo() noexcept {}

    void init_versioning(ref_type top_ref, size_t file_size, uint64_t initial_version)
    {
        // Create our first versioning entry:
        readers.init_versioning(top_ref, file_size, initial_version);
    }
};
455

456

457
// Initialize the shared (lock-file) state for a new session. The layout
// static_asserts below are the guard rail for interprocess compatibility:
// they pin the byte offsets that every session participant relies on.
DB::SharedInfo::SharedInfo(Durability dura, Replication::HistoryType ht, int hsv)
    : size_of_mutex(sizeof(shared_writemutex))
    , size_of_condvar(sizeof(room_to_write))
    , shared_writemutex()   // Throws
    , shared_controlmutex() // Throws
{
    durability = static_cast<uint16_t>(dura); // durability level is fixed from creation
    REALM_ASSERT(!util::int_cast_has_overflow<decltype(history_type)>(ht + 0));
    REALM_ASSERT(!util::int_cast_has_overflow<decltype(history_schema_version)>(hsv));
    history_type = ht;
    history_schema_version = static_cast<uint16_t>(hsv);
    InterprocessCondVar::init_shared_part(new_commit_available); // Throws
    InterprocessCondVar::init_shared_part(pick_next_writer);     // Throws
    next_ticket = 0;

// IMPORTANT: The offsets, types (, and meanings) of these members must
// never change, not even when the SharedInfo layout version is bumped. The
// eternal constancy of this part of the layout is what ensures that a
// joining session participant can reliably verify that the actual format is
// as expected.
#ifndef _WIN32
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
#endif
    static_assert(offsetof(SharedInfo, init_complete) == 0 && ATOMIC_BOOL_LOCK_FREE == 2 &&
                      std::is_same<decltype(init_complete), std::atomic<uint8_t>>::value &&
                      offsetof(SharedInfo, shared_info_version) == 6 &&
                      std::is_same<decltype(shared_info_version), uint16_t>::value,
                  "Forbidden change in SharedInfo layout");

    // Try to catch some of the memory layout changes that requires bumping of
    // the SharedInfo file format version (shared_info_version).
    static_assert(
        offsetof(SharedInfo, size_of_mutex) == 1 && std::is_same<decltype(size_of_mutex), uint8_t>::value &&
            offsetof(SharedInfo, size_of_condvar) == 2 && std::is_same<decltype(size_of_condvar), uint8_t>::value &&
            offsetof(SharedInfo, commit_in_critical_phase) == 3 &&
            std::is_same<decltype(commit_in_critical_phase), std::atomic<uint8_t>>::value &&
            offsetof(SharedInfo, file_format_version) == 4 &&
            std::is_same<decltype(file_format_version), uint8_t>::value && offsetof(SharedInfo, history_type) == 5 &&
            std::is_same<decltype(history_type), int8_t>::value && offsetof(SharedInfo, durability) == 8 &&
            std::is_same<decltype(durability), uint16_t>::value && offsetof(SharedInfo, free_write_slots) == 10 &&
            std::is_same<decltype(free_write_slots), uint16_t>::value &&
            offsetof(SharedInfo, num_participants) == 12 &&
            std::is_same<decltype(num_participants), uint32_t>::value &&
            offsetof(SharedInfo, latest_version_number) == 16 &&
            std::is_same<decltype(latest_version_number), uint64_t>::value &&
            offsetof(SharedInfo, session_initiator_pid) == 24 &&
            std::is_same<decltype(session_initiator_pid), uint64_t>::value &&
            offsetof(SharedInfo, number_of_versions) == 32 &&
            std::is_same<decltype(number_of_versions), std::atomic<uint64_t>>::value &&
            offsetof(SharedInfo, sync_agent_present) == 40 &&
            std::is_same<decltype(sync_agent_present), uint8_t>::value &&
            offsetof(SharedInfo, daemon_started) == 41 && std::is_same<decltype(daemon_started), uint8_t>::value &&
            offsetof(SharedInfo, daemon_ready) == 42 && std::is_same<decltype(daemon_ready), uint8_t>::value &&
            offsetof(SharedInfo, filler_1) == 43 && std::is_same<decltype(filler_1), uint8_t>::value &&
            offsetof(SharedInfo, history_schema_version) == 44 &&
            std::is_same<decltype(history_schema_version), uint16_t>::value && offsetof(SharedInfo, filler_2) == 46 &&
            std::is_same<decltype(filler_2), uint16_t>::value && offsetof(SharedInfo, shared_writemutex) == 48 &&
            std::is_same<decltype(shared_writemutex), InterprocessMutex::SharedPart>::value,
        "Caught layout change requiring SharedInfo file format bumping");
    // 64-bit atomics must be lock-free, since they live in shared memory.
    static_assert(std::atomic<uint64_t>::is_always_lock_free);
#ifndef _WIN32
#pragma GCC diagnostic pop
#endif
}
522

523
// Coordinates read-lock (snapshot) bookkeeping. The authoritative reader
// counts live in the cross-process SharedInfo structure (m_info); each
// process additionally keeps a local cache (m_local_readers) so that the
// common grab/release pattern can avoid touching shared state.
//
// Lock ordering visible in this class: the interprocess m_mutex is always
// acquired before m_info_mutex; m_local_readers_mutex guards only the
// local cache and is never held across acquisition of m_mutex.
class DB::VersionManager {
public:
    VersionManager(util::InterprocessMutex& mutex)
        : m_mutex(mutex)
    {
    }
    virtual ~VersionManager() {}

    // Purge versions with no remaining readers from the shared VersionList.
    // Reports the oldest still-live version, the top refs of surviving
    // versions, and whether any version became newly unreachable.
    void cleanup_versions(uint64_t& oldest_live_version, TopRefMap& top_refs, bool& any_new_unreachables)
        REQUIRES(!m_info_mutex)
    {
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        ensure_reader_mapping();
        m_info->readers.purge_versions(oldest_live_version, top_refs, any_new_unreachables);
    }

    // Convenience: version number (only) of the latest snapshot.
    version_type get_newest_version() REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        return get_version_id_of_latest_snapshot().version;
    }

    // Version and VersionList index of the latest snapshot. Fast path reads
    // the local cache; slow path consults the shared VersionList.
    VersionID get_version_id_of_latest_snapshot() REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        {
            // First check the local cache. This is an unlocked read, so it may
            // race with adding a new version. If this happens we'll either see
            // a stale value (acceptable for a racing write on one thread and
            // a read on another), or a new value which is guaranteed to not
            // be an active index in the local cache.
            util::CheckedLockGuard lock(m_local_readers_mutex);
            util::CheckedLockGuard info_lock(m_info_mutex);
            auto index = m_info->readers.newest.load();
            if (index < m_local_readers.size()) {
                auto& r = m_local_readers[index];
                if (r.is_active()) {
                    return {r.version, index};
                }
            }
        }

        // Slow path: take the interprocess lock and read the shared list.
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        auto index = m_info->readers.newest.load();
        ensure_reader_mapping(index);
        return {m_info->readers.get(index).version, index};
    }

    // Release one reference of the given type. The shared count is only
    // decremented when the last local reference of that type goes away.
    void release_read_lock(const ReadLockInfo& read_lock) REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        {
            util::CheckedLockGuard lock(m_local_readers_mutex);
            REALM_ASSERT(read_lock.m_reader_idx < m_local_readers.size());
            auto& r = m_local_readers[read_lock.m_reader_idx];
            auto& f = field_for_type(r, read_lock.m_type);
            REALM_ASSERT(f > 0);
            if (--f > 0)
                return;
            // Last local reference of any type: deactivate the cache slot.
            if (r.count_live == 0 && r.count_full == 0 && r.count_frozen == 0)
                r.version = 0;
        }

        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        // we should not need to call ensure_full_reader_mapping,
        // since releasing a read lock means it has been grabbed
        // earlier - and hence must reside in mapped memory:
        REALM_ASSERT(read_lock.m_reader_idx < m_local_max_entry);
        auto& r = m_info->readers.get(read_lock.m_reader_idx);
        REALM_ASSERT(read_lock.m_version == r.version);
        --field_for_type(r, read_lock.m_type);
    }

    // Acquire a read lock on a specific version (or the newest if version_id
    // is default-constructed). Throws BadVersion if the requested version is
    // gone or no longer has the required liveness for `type`.
    ReadLockInfo grab_read_lock(ReadLockInfo::Type type, VersionID version_id = {})
        REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        ReadLockInfo read_lock;
        // Fast path: bump an already-held local reference without the
        // interprocess lock.
        if (try_grab_local_read_lock(read_lock, type, version_id))
            return read_lock;

        {
            const bool pick_specific = version_id.version != VersionID().version;
            std::lock_guard lock(m_mutex);
            util::CheckedLockGuard info_lock(m_info_mutex);
            auto newest = m_info->readers.newest.load();
            REALM_ASSERT(newest != VersionList::nil);
            read_lock.m_reader_idx = pick_specific ? version_id.index : newest;
            ensure_reader_mapping((unsigned int)read_lock.m_reader_idx);
            bool picked_newest = read_lock.m_reader_idx == (unsigned)newest;
            auto& r = m_info->readers.get(read_lock.m_reader_idx);
            // The slot may have been reused for a different version.
            if (pick_specific && version_id.version != r.version)
                throw BadVersion(version_id.version);
            if (!picked_newest) {
                // A non-newest entry is only grabbable while someone still
                // holds it with sufficient liveness for the requested type.
                if (type == ReadLockInfo::Frozen && r.count_frozen == 0 && r.count_live == 0)
                    throw BadVersion(version_id.version);
                if (type != ReadLockInfo::Frozen && r.count_live == 0)
                    throw BadVersion(version_id.version);
            }
            populate_read_lock(read_lock, r, type);
        }

        {
            // Mirror the grab into the local cache so subsequent grabs of the
            // same version can take the fast path.
            util::CheckedLockGuard local_lock(m_local_readers_mutex);
            grow_local_cache(read_lock.m_reader_idx + 1);
            auto& r2 = m_local_readers[read_lock.m_reader_idx];
            if (!r2.is_active()) {
                r2.version = read_lock.m_version;
                r2.filesize = read_lock.m_file_size;
                r2.current_top = read_lock.m_top_ref;
                r2.count_full = r2.count_live = r2.count_frozen = 0;
            }
            REALM_ASSERT_EX(field_for_type(r2, type) == 0, type, r2.count_full, r2.count_live, r2.count_frozen);
            field_for_type(r2, type) = 1;
        }

        return read_lock;
    }

    // Initialize the shared VersionList with the first version.
    void init_versioning(ref_type top_ref, size_t file_size, uint64_t initial_version) REQUIRES(!m_info_mutex)
    {
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        m_info->init_versioning(top_ref, file_size, initial_version);
    }

    // Record a newly committed version, growing the VersionList (and, in the
    // file-backed case, the lockfile) if it is full.
    void add_version(ref_type new_top_ref, size_t new_file_size, uint64_t new_version) REQUIRES(!m_info_mutex)
    {
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        ensure_reader_mapping();
        if (m_info->readers.try_allocate_entry(new_top_ref, new_file_size, new_version)) {
            return;
        }
        // allocation failed, expand VersionList (and lockfile) and retry
        auto entries = m_info->readers.capacity();
        auto new_entries = entries + 32;
        expand_version_list(new_entries);
        m_local_max_entry = new_entries;
        m_info->readers.reserve(new_entries);
        auto success = m_info->readers.try_allocate_entry(new_top_ref, new_file_size, new_version);
        REALM_ASSERT_EX(success, new_entries, new_version);
    }


private:
    // Ensure the local cache has at least new_size slots.
    void grow_local_cache(size_t new_size) REQUIRES(m_local_readers_mutex)
    {
        if (new_size > m_local_readers.size())
            m_local_readers.resize(new_size, VersionList::ReadCount{});
    }

    // Bump the reader count of the matching type in `r` and copy the
    // version/top-ref/file-size into the outgoing ReadLockInfo.
    void populate_read_lock(ReadLockInfo& read_lock, VersionList::ReadCount& r, ReadLockInfo::Type type)
    {
        ++field_for_type(r, type);
        read_lock.m_type = type;
        read_lock.m_version = r.version;
        read_lock.m_top_ref = static_cast<ref_type>(r.current_top);
        read_lock.m_file_size = static_cast<size_t>(r.filesize);
    }

    // Fast path for grab_read_lock: succeed only if this process already holds
    // a reference of the same type to the requested version in its local cache.
    bool try_grab_local_read_lock(ReadLockInfo& read_lock, ReadLockInfo::Type type, VersionID version_id)
        REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        const bool pick_specific = version_id.version != VersionID().version;
        auto index = version_id.index;
        if (!pick_specific) {
            util::CheckedLockGuard lock(m_info_mutex);
            index = m_info->readers.newest.load();
        }
        util::CheckedLockGuard local_lock(m_local_readers_mutex);
        if (index >= m_local_readers.size())
            return false;

        auto& r = m_local_readers[index];
        if (!r.is_active())
            return false;
        if (pick_specific && r.version != version_id.version)
            return false;
        // A zero count means we hold no shared-state reference of this type,
        // so a purely local bump would not be backed by the shared count.
        if (field_for_type(r, type) == 0)
            return false;

        read_lock.m_reader_idx = index;
        populate_read_lock(read_lock, r, type);
        return true;
    }

    // Map a ReadLockInfo::Type to the corresponding counter in a ReadCount.
    static uint32_t& field_for_type(VersionList::ReadCount& r, ReadLockInfo::Type type)
    {
        switch (type) {
            case ReadLockInfo::Frozen:
                return r.count_frozen;
            case ReadLockInfo::Live:
                return r.count_live;
            case ReadLockInfo::Full:
                return r.count_full;
            default:
                REALM_UNREACHABLE(); // silence a warning
        }
    }

    // Publish "a write of the page at page_offset is in progress" in shared
    // state. Stored as offset + 1 so that 0 can mean "no page marked".
    void mark_page_for_writing(uint64_t page_offset) REQUIRES(!m_info_mutex)
    {
        util::CheckedLockGuard info_lock(m_info_mutex);
        m_info->writing_page_offset = page_offset + 1;
        m_info->write_counter++;
    }
    // Clear the writing marker (the counter is bumped again so observers can
    // detect that a write happened in between their observations).
    void clear_writing_marker() REQUIRES(!m_info_mutex)
    {
        util::CheckedLockGuard info_lock(m_info_mutex);
        m_info->write_counter++;
        m_info->writing_page_offset = 0;
    }
    // returns false if no page is marked.
    // if a page is marked, returns true and optionally the offset of the page marked for writing
    // in all cases returns optionally the write counter
    bool observe_writer(uint64_t* page_offset, uint64_t* write_counter) REQUIRES(!m_info_mutex)
    {
        util::CheckedLockGuard info_lock(m_info_mutex);
        if (write_counter) {
            *write_counter = m_info->write_counter;
        }
        uint64_t marked = m_info->writing_page_offset;
        if (marked && page_offset) {
            *page_offset = marked - 1;
        }
        return marked != 0;
    }

protected:
    // Interprocess mutex serializing access to the shared VersionList.
    util::InterprocessMutex& m_mutex;
    util::CheckedMutex m_local_readers_mutex;
    // Per-process cache of reader counts, indexed like the shared VersionList.
    std::vector<VersionList::ReadCount> m_local_readers GUARDED_BY(m_local_readers_mutex);

    util::CheckedMutex m_info_mutex;
    // Number of VersionList entries covered by this process's mapping.
    unsigned int m_local_max_entry GUARDED_BY(m_info_mutex) = 0;
    SharedInfo* m_info GUARDED_BY(m_info_mutex) = nullptr;

    // Subclass hooks: make sure at least `required` entries are accessible,
    // and grow the backing storage of the VersionList.
    virtual void ensure_reader_mapping(unsigned int required = -1) REQUIRES(m_info_mutex) = 0;
    virtual void expand_version_list(unsigned new_entries) REQUIRES(m_info_mutex) = 0;
    friend class DB::EncryptionMarkerObserver;
};
764

765
// VersionManager backed by the ".lock" file: SharedInfo (including the
// VersionList) is memory-mapped from the file so the bookkeeping is shared
// between all processes that have the realm open.
class DB::FileVersionManager final : public DB::VersionManager {
public:
    FileVersionManager(File& file, util::InterprocessMutex& mutex)
        : VersionManager(mutex)
        , m_file(file)
    {
        // Map enough of the file to cover the whole VersionList. Another
        // process may grow the file concurrently, so loop until the mapped
        // size is at least the space required by the current capacity.
        size_t size = 0, required_size = sizeof(SharedInfo);
        while (size < required_size) {
            // Map the file without the lock held. This could result in the
            // mapping being too small and having to remap if the file is grown
            // concurrently, but if this is the case we should always see a bigger
            // size the next time.
            auto new_size = static_cast<size_t>(m_file.get_size());
            REALM_ASSERT(new_size > size);
            size = new_size;
            m_reader_map.remap(m_file, File::access_ReadWrite, size, File::map_NoSync);
            m_info = m_reader_map.get_addr();

            std::lock_guard lock(m_mutex);
            m_local_max_entry = m_info->readers.capacity();
            required_size = sizeof(SharedInfo) + m_info->readers.compute_required_space(m_local_max_entry);
            REALM_ASSERT(required_size >= size);
        }
    }

    // Grow the lockfile so it can hold `new_entries` VersionList entries,
    // then remap it at the new size.
    void expand_version_list(unsigned new_entries) override REQUIRES(m_info_mutex)
    {
        size_t new_info_size = sizeof(SharedInfo) + m_info->readers.compute_required_space(new_entries);
        m_file.prealloc(new_info_size);                                          // Throws
        m_reader_map.remap(m_file, util::File::access_ReadWrite, new_info_size); // Throws
        m_info = m_reader_map.get_addr();
    }

private:
    // Make sure our mapping covers at least `required` entries; remap if
    // another process has expanded the VersionList beyond our current view.
    void ensure_reader_mapping(unsigned int required = -1) override REQUIRES(m_info_mutex)
    {
        using _impl::SimulatedFailure;
        SimulatedFailure::trigger(SimulatedFailure::shared_group__grow_reader_mapping); // Throws

        if (required < m_local_max_entry)
            return;

        auto new_max_entry = m_info->readers.capacity();
        if (new_max_entry > m_local_max_entry) {
            // handle mapping expansion if required
            size_t info_size = sizeof(DB::SharedInfo) + m_info->readers.compute_required_space(new_max_entry);
            m_reader_map.remap(m_file, util::File::access_ReadWrite, info_size); // Throws
            m_local_max_entry = new_max_entry;
            m_info = m_reader_map.get_addr();
        }
    }

    File& m_file;
    File::Map<DB::SharedInfo> m_reader_map;

    friend class DB::EncryptionMarkerObserver;
};
822

823
// adapter class for marking/observing encrypted writes
824
class DB::EncryptionMarkerObserver : public util::WriteMarker, public util::WriteObserver {
825
public:
826
    EncryptionMarkerObserver(DB::VersionManager& vm)
827
        : vm(vm)
828
    {
143,595✔
829
    }
143,595✔
830
    bool no_concurrent_writer_seen() override
831
    {
90✔
832
        uint64_t tmp_write_count;
90✔
833
        auto page_may_have_been_written = vm.observe_writer(nullptr, &tmp_write_count);
90✔
834
        if (tmp_write_count != last_seen_count) {
90✔
835
            page_may_have_been_written = true;
×
836
            last_seen_count = tmp_write_count;
×
837
        }
×
838
        if (page_may_have_been_written) {
90✔
839
            calls_since_last_writer_observed = 0;
×
840
            return false;
×
841
        }
×
842
        ++calls_since_last_writer_observed;
90✔
843
        constexpr size_t max_calls = 5; // an arbitrary handful, > 1
90✔
844
        return (calls_since_last_writer_observed >= max_calls);
90✔
845
    }
90✔
846
    void mark(uint64_t pos) override
847
    {
1,602✔
848
        vm.mark_page_for_writing(pos);
1,602✔
849
    }
1,602✔
850
    void unmark() override
851
    {
1,602✔
852
        vm.clear_writing_marker();
1,602✔
853
    }
1,602✔
854

855
private:
856
    DB::VersionManager& vm;
857
    uint64_t last_seen_count = 0;
858
    size_t calls_since_last_writer_observed = 0;
859
};
860

861
// VersionManager for in-memory realms: SharedInfo is supplied directly
// instead of being mapped from a lockfile, so no remapping ever happens.
class DB::InMemoryVersionManager final : public DB::VersionManager {
public:
    InMemoryVersionManager(SharedInfo* info, util::InterprocessMutex& mutex)
        : VersionManager(mutex)
    {
        m_info = info;
        m_local_max_entry = info->readers.capacity();
    }

    // The in-memory VersionList cannot grow; reaching this is a logic error.
    void expand_version_list(unsigned) override
    {
        REALM_ASSERT(false);
    }

private:
    // Nothing is memory-mapped, so there is never a mapping to extend.
    void ensure_reader_mapping(unsigned int) override {}
};
877

878
// Default directory for temporary core files: the platform temp directory
// when std::filesystem is available, otherwise $TMPDIR ("" if unset).
#if REALM_HAVE_STD_FILESYSTEM
std::string DBOptions::sys_tmp_dir = std::filesystem::temp_directory_path().string();
#else
std::string DBOptions::sys_tmp_dir = getenv("TMPDIR") ? getenv("TMPDIR") : "";
#endif
883

884
// NOTES ON CREATION AND DESTRUCTION OF SHARED MUTEXES:
885
//
886
// According to the 'process-sharing example' in the POSIX man page
887
// for pthread_mutexattr_init() other processes may continue to use a
888
// process-shared mutex after exit of the process that initialized
889
// it. Also, the example does not contain any call to
890
// pthread_mutex_destroy(), so apparently a process-shared mutex need
891
// not be destroyed at all, nor can it be that a process-shared mutex
892
// is associated with any resources that are local to the initializing
893
// process, because that would imply a leak.
894
//
895
// While it is not explicitly guaranteed in the man page, we shall
896
// assume that it is valid to initialize a process-shared mutex twice
897
// without an intervening call to pthread_mutex_destroy(). We need to
898
// be able to reinitialize a process-shared mutex if the first
899
// initializing process crashes and leaves the shared memory in an
900
// undefined state.
901

902
void DB::open(const std::string& path, bool no_create_file, const DBOptions& options)
903
{
143,484✔
904
    // Exception safety: Since do_open() is called from constructors, if it
70,515✔
905
    // throws, it must leave the file closed.
70,515✔
906
    using util::format;
143,484✔
907

70,515✔
908
    REALM_ASSERT(!is_attached());
143,484✔
909
    REALM_ASSERT(path.size());
143,484✔
910

70,515✔
911
    m_db_path = path;
143,484✔
912
    m_path_hash = StringData(path).hash() & 0xffff;
143,484✔
913
    set_logger(options.logger);
143,484✔
914
    if (m_replication) {
143,484✔
915
        m_replication->set_logger(m_logger.get());
118,641✔
916
    }
118,641✔
917
    if (m_logger)
143,484✔
918
        m_logger->log(util::Logger::Level::detail, "Open file: %1", path);
129,003✔
919
    SlabAlloc& alloc = m_alloc;
143,484✔
920
    if (options.is_immutable) {
143,484✔
921
        SlabAlloc::Config cfg;
180✔
922
        cfg.read_only = true;
180✔
923
        cfg.no_create = true;
180✔
924
        cfg.encryption_key = options.encryption_key;
180✔
925
        auto top_ref = alloc.attach_file(path, cfg);
180✔
926
        SlabAlloc::DetachGuard dg(alloc);
180✔
927
        Group::read_only_version_check(alloc, top_ref, path);
180✔
928
        m_fake_read_lock_if_immutable = ReadLockInfo::make_fake(top_ref, m_alloc.get_baseline());
180✔
929
        dg.release();
180✔
930
        return;
180✔
931
    }
180✔
932
    std::string lockfile_path = get_core_file(path, CoreFileType::Lock);
143,304✔
933
    std::string coordination_dir = get_core_file(path, CoreFileType::Management);
143,304✔
934
    std::string lockfile_prefix = coordination_dir + "/access_control";
143,304✔
935
    m_alloc.set_read_only(false);
143,304✔
936

70,425✔
937
    Replication::HistoryType openers_hist_type = Replication::hist_None;
143,304✔
938
    int openers_hist_schema_version = 0;
143,304✔
939
    if (Replication* repl = get_replication()) {
143,304✔
940
        openers_hist_type = repl->get_history_type();
118,641✔
941
        openers_hist_schema_version = repl->get_history_schema_version();
118,641✔
942
    }
118,641✔
943

70,425✔
944
    int current_file_format_version;
143,304✔
945
    int target_file_format_version;
143,304✔
946
    int stored_hist_schema_version = -1; // Signals undetermined
143,304✔
947

70,425✔
948
    int retries_left = 10; // number of times to retry before throwing exceptions
143,304✔
949
    // in case there is something wrong with the .lock file... the retries allows
70,425✔
950
    // us to pick a new lockfile initializer in case the first one crashes without
70,425✔
951
    // completing the initialization
70,425✔
952
    std::default_random_engine random_gen;
143,304✔
953
    for (;;) {
227,502✔
954

105,261✔
955
        // if we're retrying, we first wait a random time
105,261✔
956
        if (retries_left < 10) {
227,502✔
957
            if (retries_left == 9) { // we seed it from a true random source if possible
240✔
958
                std::random_device r;
24✔
959
                random_gen.seed(r());
24✔
960
            }
24✔
961
            int max_delay = (10 - retries_left) * 10;
240✔
962
            int msecs = random_gen() % max_delay;
240✔
963
            millisleep(msecs);
240✔
964
        }
240✔
965

105,261✔
966
        m_file.open(lockfile_path, File::access_ReadWrite, File::create_Auto, 0); // Throws
227,502✔
967
        File::CloseGuard fcg(m_file);
227,502✔
968
        m_file.set_fifo_path(coordination_dir, "lock.fifo");
227,502✔
969

105,261✔
970
        if (m_file.try_rw_lock_exclusive()) { // Throws
227,502✔
971
            File::UnlockGuard ulg(m_file);
117,468✔
972

57,642✔
973
            // We're alone in the world, and it is Ok to initialize the
57,642✔
974
            // file. Start by truncating the file to zero to ensure that
57,642✔
975
            // the following resize will generate a file filled with zeroes.
57,642✔
976
            //
57,642✔
977
            // This will in particular set m_init_complete to 0.
57,642✔
978
            m_file.resize(0);
117,468✔
979
            m_file.prealloc(sizeof(SharedInfo));
117,468✔
980

57,642✔
981
            // We can crash anytime during this process. A crash prior to
57,642✔
982
            // the first resize could allow another thread which could not
57,642✔
983
            // get the exclusive lock because we hold it, and hence were
57,642✔
984
            // waiting for the shared lock instead, to observe and use an
57,642✔
985
            // old lock file.
57,642✔
986
            m_file_map.map(m_file, File::access_ReadWrite, sizeof(SharedInfo), File::map_NoSync); // Throws
117,468✔
987
            File::UnmapGuard fug(m_file_map);
117,468✔
988
            SharedInfo* info = m_file_map.get_addr();
117,468✔
989

57,642✔
990
            new (info) SharedInfo{options.durability, openers_hist_type, openers_hist_schema_version}; // Throws
117,468✔
991

57,642✔
992
            // Because init_complete is an std::atomic, it's guaranteed not to be observable by others
57,642✔
993
            // as being 1 before the entire SharedInfo header has been written.
57,642✔
994
            info->init_complete = 1;
117,468✔
995
        }
117,468✔
996

105,261✔
997
// We hold the shared lock from here until we close the file!
105,261✔
998
#if REALM_PLATFORM_APPLE
122,241✔
999
        // macOS has a bug which can cause a hang waiting to obtain a lock, even
1000
        // if the lock is already open in shared mode, so we work around it by
1001
        // busy waiting. This should occur only briefly during session initialization.
1002
        while (!m_file.try_rw_lock_shared()) {
127,761✔
1003
            sched_yield();
5,520✔
1004
        }
5,520✔
1005
#else
1006
        m_file.rw_lock_shared(); // Throws
105,261✔
1007
#endif
105,261✔
1008
        File::UnlockGuard ulg(m_file);
227,502✔
1009

105,261✔
1010
        // The coordination/management dir is created as a side effect of the lock
105,261✔
1011
        // operation above if needed for lock emulation. But it may also be needed
105,261✔
1012
        // for other purposes, so make sure it exists.
105,261✔
1013
        // in worst case there'll be a race on creating this directory.
105,261✔
1014
        // This should be safe but a waste of resources.
105,261✔
1015
        // Unfortunately it cannot be created at an earlier point, because
105,261✔
1016
        // it may then be deleted during the above lock_shared() operation.
105,261✔
1017
        try_make_dir(coordination_dir);
227,502✔
1018

105,261✔
1019
        // If the file is not completely initialized at this point in time, the
105,261✔
1020
        // preceeding initialization attempt must have failed. We know that an
105,261✔
1021
        // initialization process was in progress, because this thread (or
105,261✔
1022
        // process) failed to get an exclusive lock on the file. Because this
105,261✔
1023
        // thread (or process) currently has a shared lock on the file, we also
105,261✔
1024
        // know that the initialization process can no longer be in progress, so
105,261✔
1025
        // the initialization must either have completed or failed at this time.
105,261✔
1026

105,261✔
1027
        // The file is taken to be completely initialized if it is large enough
105,261✔
1028
        // to contain the `init_complete` field, and `init_complete` is true. If
105,261✔
1029
        // the file was not completely initialized, this thread must give up its
105,261✔
1030
        // shared lock, and retry to become the initializer. Eventually, one of
105,261✔
1031
        // two things must happen; either this thread, or another thread
105,261✔
1032
        // succeeds in completing the initialization, or this thread becomes the
105,261✔
1033
        // initializer, and fails the initialization. In either case, the retry
105,261✔
1034
        // loop will eventually terminate.
105,261✔
1035

105,261✔
1036
        // An empty file is (and was) never a successfully initialized file.
105,261✔
1037
        size_t info_size = sizeof(SharedInfo);
227,502✔
1038
        {
227,502✔
1039
            auto file_size = m_file.get_size();
227,502✔
1040
            if (util::int_less_than(file_size, info_size)) {
227,502✔
1041
                if (file_size == 0)
83,700✔
1042
                    continue; // Retry
53,835✔
1043
                info_size = size_t(file_size);
29,865✔
1044
            }
29,865✔
1045
        }
227,502✔
1046

105,261✔
1047
        // Map the initial section of the SharedInfo file that corresponds to
105,261✔
1048
        // the SharedInfo struct, or less if the file is smaller. We know that
105,261✔
1049
        // we have at least one byte, and that is enough to read the
105,261✔
1050
        // `init_complete` flag.
105,261✔
1051
        m_file_map.map(m_file, File::access_ReadWrite, info_size, File::map_NoSync);
197,985✔
1052
        File::UnmapGuard fug_1(m_file_map);
173,667✔
1053
        SharedInfo* info = m_file_map.get_addr();
173,667✔
1054

80,943✔
1055
#ifndef _WIN32
173,667✔
1056
#pragma GCC diagnostic push
173,667✔
1057
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
173,667✔
1058
#endif
173,667✔
1059
        static_assert(offsetof(SharedInfo, init_complete) + sizeof SharedInfo::init_complete <= 1,
173,667✔
1060
                      "Unexpected position or size of SharedInfo::init_complete");
173,667✔
1061
#ifndef _WIN32
173,667✔
1062
#pragma GCC diagnostic pop
173,667✔
1063
#endif
173,667✔
1064
        if (info->init_complete == 0)
173,667✔
1065
            continue;
29,799✔
1066
        REALM_ASSERT(info->init_complete == 1);
143,868✔
1067

70,680✔
1068
        // At this time, we know that the file was completely initialized, but
70,680✔
1069
        // we still need to verify that it was initialized with the memory
70,680✔
1070
        // layout expected by this session participant. We could find that it is
70,680✔
1071
        // initialized with a different memory layout if other concurrent
70,680✔
1072
        // session participants use different versions of the core library.
70,680✔
1073
        if (info_size < sizeof(SharedInfo)) {
143,868✔
1074
            if (retries_left) {
66✔
1075
                --retries_left;
60✔
1076
                continue;
60✔
1077
            }
60✔
1078
            throw IncompatibleLockFile(path, format("Architecture mismatch: SharedInfo size is %1 but should be %2.",
6✔
1079
                                                    info_size, sizeof(SharedInfo)));
6✔
1080
        }
6✔
1081
        if (info->shared_info_version != g_shared_info_version) {
143,802✔
1082
            if (retries_left) {
66✔
1083
                --retries_left;
60✔
1084
                continue;
60✔
1085
            }
60✔
1086
            throw IncompatibleLockFile(path, format("Version mismatch: SharedInfo version is %1 but should be %2.",
6✔
1087
                                                    info->shared_info_version, g_shared_info_version));
6✔
1088
        }
6✔
1089
        // Validate compatible sizes of mutex and condvar types. Sizes of all
70,614✔
1090
        // other fields are architecture independent, so if condvar and mutex
70,614✔
1091
        // sizes match, the entire struct matches. The offsets of
70,614✔
1092
        // `size_of_mutex` and `size_of_condvar` are known to be as expected due
70,614✔
1093
        // to the preceding check in `shared_info_version`.
70,614✔
1094
        if (info->size_of_mutex != sizeof info->shared_controlmutex) {
143,736✔
1095
            if (retries_left) {
66✔
1096
                --retries_left;
60✔
1097
                continue;
60✔
1098
            }
60✔
1099
            throw IncompatibleLockFile(path, format("Architecture mismatch: Mutex size is %1 but should be %2.",
6✔
1100
                                                    info->size_of_mutex, sizeof(info->shared_controlmutex)));
6✔
1101
        }
6✔
1102

70,581✔
1103
        if (info->size_of_condvar != sizeof info->room_to_write) {
143,670✔
1104
            if (retries_left) {
66✔
1105
                --retries_left;
60✔
1106
                continue;
60✔
1107
            }
60✔
1108
            throw IncompatibleLockFile(
6✔
1109
                path, format("Architecture mismatch: Condition variable size is %1 but should be %2.",
6✔
1110
                             info->size_of_condvar, sizeof(info->room_to_write)));
6✔
1111
        }
6✔
1112
        m_writemutex.set_shared_part(info->shared_writemutex, lockfile_prefix, "write");
143,604✔
1113
        m_controlmutex.set_shared_part(info->shared_controlmutex, lockfile_prefix, "control");
143,604✔
1114
        m_versionlist_mutex.set_shared_part(info->shared_versionlist_mutex, lockfile_prefix, "versions");
143,604✔
1115

70,548✔
1116
        // even though fields match wrt alignment and size, there may still be incompatibilities
70,548✔
1117
        // between implementations, so lets ask one of the mutexes if it thinks it'll work.
70,548✔
1118
        if (!m_controlmutex.is_valid()) {
143,604✔
1119
            throw IncompatibleLockFile(
×
1120
                path, "Control mutex is invalid. This suggests that incompatible pthread libraries are in use.");
×
1121
        }
×
1122

70,548✔
1123
        // OK! lock file appears valid. We can now continue operations under the protection
70,548✔
1124
        // of the controlmutex. The controlmutex protects the following activities:
70,548✔
1125
        // - attachment of the database file
70,548✔
1126
        // - start of the async daemon
70,548✔
1127
        // - stop of the async daemon
70,548✔
1128
        // - restore of a backup, if desired
70,548✔
1129
        // - backup of the realm file in preparation of file format upgrade
70,548✔
1130
        // - DB beginning/ending a session
70,548✔
1131
        // - Waiting for and signalling database changes
70,548✔
1132
        {
143,604✔
1133
            std::lock_guard<InterprocessMutex> lock(m_controlmutex); // Throws
143,604✔
1134
            auto version_manager = std::make_unique<FileVersionManager>(m_file, m_versionlist_mutex);
143,604✔
1135

70,548✔
1136
            // proceed to initialize versioning and other metadata information related to
70,548✔
1137
            // the database. Also create the database if we're beginning a new session
70,548✔
1138
            bool begin_new_session = (info->num_participants == 0);
143,604✔
1139
            SlabAlloc::Config cfg;
143,604✔
1140
            cfg.session_initiator = begin_new_session;
143,604✔
1141
            cfg.is_shared = true;
143,604✔
1142
            cfg.read_only = false;
143,604✔
1143
            cfg.skip_validate = !begin_new_session;
143,604✔
1144
            cfg.disable_sync = options.durability == Durability::MemOnly || options.durability == Durability::Unsafe;
143,604✔
1145

70,548✔
1146
            // only the session initiator is allowed to create the database, all other
70,548✔
1147
            // must assume that it already exists.
70,548✔
1148
            cfg.no_create = (begin_new_session ? no_create_file : true);
131,745✔
1149

70,548✔
1150
            // if we're opening a MemOnly file that isn't already opened by
70,548✔
1151
            // someone else then it's a file which should have been deleted on
70,548✔
1152
            // close previously, but wasn't (perhaps due to the process crashing)
70,548✔
1153
            cfg.clear_file = (options.durability == Durability::MemOnly && begin_new_session);
143,604✔
1154

70,548✔
1155
            cfg.encryption_key = options.encryption_key;
143,604✔
1156
            ref_type top_ref;
143,604✔
1157
            m_marker_observer = std::make_unique<EncryptionMarkerObserver>(*version_manager);
143,604✔
1158
            try {
143,604✔
1159
                top_ref = alloc.attach_file(path, cfg, m_marker_observer.get()); // Throws
143,604✔
1160
            }
143,604✔
1161
            catch (const SlabAlloc::Retry&) {
70,548✔
1162
                // On a SlabAlloc::Retry file mappings are already unmapped, no
1163
                // need to do more
1164
                continue;
×
1165
            }
×
1166

70,503✔
1167
            // Determine target file format version for session (upgrade
70,503✔
1168
            // required if greater than file format version of attached file).
70,503✔
1169
            current_file_format_version = alloc.get_committed_file_format_version();
143,517✔
1170
            target_file_format_version =
143,517✔
1171
                Group::get_target_file_format_version_for_session(current_file_format_version, openers_hist_type);
143,517✔
1172
            BackupHandler backup(path, options.accepted_versions, options.to_be_deleted);
143,517✔
1173
            if (backup.must_restore_from_backup(current_file_format_version)) {
143,517✔
1174
                // we need to unmap before any file ops that'll change the realm
6✔
1175
                // file:
6✔
1176
                // (only strictly needed for Windows)
6✔
1177
                alloc.detach();
12✔
1178
                backup.restore_from_backup();
12✔
1179
                // finally, retry with the restored file instead of the original
6✔
1180
                // one:
6✔
1181
                continue;
12✔
1182
            }
12✔
1183
            backup.cleanup_backups();
143,505✔
1184

70,497✔
1185
            // From here on, if we fail in any way, we must detach the
70,497✔
1186
            // allocator.
70,497✔
1187
            SlabAlloc::DetachGuard alloc_detach_guard(alloc);
143,505✔
1188
            alloc.note_reader_start(this);
143,505✔
1189
            // must come after the alloc detach guard
70,497✔
1190
            auto handler = [this, &alloc]() noexcept {
143,505✔
1191
                alloc.note_reader_end(this);
143,505✔
1192
            };
143,505✔
1193
            auto reader_end_guard = make_scope_exit(handler);
143,505✔
1194

70,497✔
1195
            // Check validity of top array (to give more meaningful errors
70,497✔
1196
            // early)
70,497✔
1197
            if (top_ref) {
143,505✔
1198
                try {
93,117✔
1199
                    alloc.note_reader_start(this);
93,117✔
1200
                    auto reader_end_guard = make_scope_exit([&]() noexcept {
93,117✔
1201
                        alloc.note_reader_end(this);
93,117✔
1202
                    });
93,117✔
1203
                    Array top{alloc};
93,117✔
1204
                    top.init_from_ref(top_ref);
93,117✔
1205
                    Group::validate_top_array(top, alloc);
93,117✔
1206
                }
93,117✔
1207
                catch (const InvalidDatabase& e) {
45,972✔
1208
                    if (e.get_path().empty()) {
×
1209
                        throw InvalidDatabase(e.what(), path);
×
1210
                    }
×
1211
                    throw;
×
1212
                }
×
1213
            }
93,117✔
1214
            if (options.backup_at_file_format_change) {
143,505✔
1215
                backup.backup_realm_if_needed(current_file_format_version, target_file_format_version);
143,496✔
1216
            }
143,496✔
1217
            using gf = _impl::GroupFriend;
143,505✔
1218
            bool file_format_ok;
143,505✔
1219
            // In shared mode (Realm file opened via a DB instance) this
70,497✔
1220
            // version of the core library is able to open Realms using file format
70,497✔
1221
            // versions listed below. Please see Group::get_file_format_version() for
70,497✔
1222
            // information about the individual file format versions.
70,497✔
1223
            if (current_file_format_version == 0) {
143,505✔
1224
                file_format_ok = (top_ref == 0);
50,388✔
1225
            }
50,388✔
1226
            else {
93,117✔
1227
                file_format_ok = backup.is_accepted_file_format(current_file_format_version);
93,117✔
1228
            }
93,117✔
1229

70,497✔
1230
            if (REALM_UNLIKELY(!file_format_ok)) {
143,505✔
1231
                throw UnsupportedFileFormatVersion(current_file_format_version);
12✔
1232
            }
12✔
1233

70,491✔
1234
            if (begin_new_session) {
143,493✔
1235
                // Determine version (snapshot number) and check history
58,275✔
1236
                // compatibility
58,275✔
1237
                version_type version = 0;
119,430✔
1238
                int stored_hist_type = 0;
119,430✔
1239
                gf::get_version_and_history_info(alloc, top_ref, version, stored_hist_type,
119,430✔
1240
                                                 stored_hist_schema_version);
119,430✔
1241
                bool good_history_type = false;
119,430✔
1242
                switch (openers_hist_type) {
119,430✔
1243
                    case Replication::hist_None:
7,170✔
1244
                        good_history_type = (stored_hist_type == Replication::hist_None);
7,170✔
1245
                        if (!good_history_type)
7,170✔
1246
                            throw IncompatibleHistories(
6✔
1247
                                util::format("Realm file at path '%1' has history type '%2', but is being opened "
6✔
1248
                                             "with replication disabled.",
6✔
1249
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1250
                                path);
6✔
1251
                        break;
7,164✔
1252
                    case Replication::hist_OutOfRealm:
3,246✔
1253
                        REALM_ASSERT(false); // No longer in use
×
1254
                        break;
×
1255
                    case Replication::hist_InRealm:
84,411✔
1256
                        good_history_type = (stored_hist_type == Replication::hist_InRealm ||
84,411✔
1257
                                             stored_hist_type == Replication::hist_None);
50,808✔
1258
                        if (!good_history_type)
84,411✔
1259
                            throw IncompatibleHistories(
6✔
1260
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
6✔
1261
                                             "local history mode.",
6✔
1262
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1263
                                path);
6✔
1264
                        break;
84,405✔
1265
                    case Replication::hist_SyncClient:
54,897✔
1266
                        good_history_type = ((stored_hist_type == Replication::hist_SyncClient) || (top_ref == 0));
26,151✔
1267
                        if (!good_history_type)
26,151✔
1268
                            throw IncompatibleHistories(
6✔
1269
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
6✔
1270
                                             "synchronized history mode.",
6✔
1271
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1272
                                path);
6✔
1273
                        break;
26,145✔
1274
                    case Replication::hist_SyncServer:
13,803✔
1275
                        good_history_type = ((stored_hist_type == Replication::hist_SyncServer) || (top_ref == 0));
1,698✔
1276
                        if (!good_history_type)
1,698✔
1277
                            throw IncompatibleHistories(
×
1278
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
×
1279
                                             "server history mode.",
×
1280
                                             path, Replication::history_type_name(stored_hist_type)),
×
1281
                                path);
×
1282
                        break;
1,698✔
1283
                }
119,412✔
1284

58,266✔
1285
                REALM_ASSERT(stored_hist_schema_version >= 0);
119,412✔
1286
                if (stored_hist_schema_version > openers_hist_schema_version)
119,412✔
1287
                    throw IncompatibleHistories(
×
1288
                        util::format("Unexpected future history schema version %1, current schema %2",
×
1289
                                     stored_hist_schema_version, openers_hist_schema_version),
×
1290
                        path);
×
1291
                bool need_hist_schema_upgrade =
119,412✔
1292
                    (stored_hist_schema_version < openers_hist_schema_version && top_ref != 0);
119,412✔
1293
                if (need_hist_schema_upgrade) {
119,412✔
1294
                    Replication* repl = get_replication();
15✔
1295
                    if (!repl->is_upgradable_history_schema(stored_hist_schema_version))
15✔
1296
                        throw IncompatibleHistories(util::format("Nonupgradable history schema %1, current schema %2",
×
1297
                                                                 stored_hist_schema_version,
×
1298
                                                                 openers_hist_schema_version),
×
1299
                                                    path);
×
1300
                }
119,412✔
1301

58,266✔
1302
                bool need_file_format_upgrade =
119,412✔
1303
                    current_file_format_version < target_file_format_version && top_ref != 0;
119,412✔
1304
                if (!options.allow_file_format_upgrade && (need_hist_schema_upgrade || need_file_format_upgrade)) {
119,412✔
1305
                    throw FileFormatUpgradeRequired(m_db_path);
6✔
1306
                }
6✔
1307

58,263✔
1308
                alloc.convert_from_streaming_form(top_ref);
119,406✔
1309
                try {
119,406✔
1310
                    bool file_changed_size = alloc.align_filesize_for_mmap(top_ref, cfg);
119,406✔
1311
                    if (file_changed_size) {
119,406✔
1312
                        // we need to re-establish proper mappings after file size change.
129✔
1313
                        // we do this simply by aborting and starting all over:
129✔
1314
                        continue;
315✔
1315
                    }
315✔
1316
                }
×
1317
                // something went wrong. Retry.
1318
                catch (SlabAlloc::Retry&) {
×
1319
                    continue;
×
1320
                }
×
1321
                if (options.encryption_key) {
119,091✔
1322
#ifdef _WIN32
1323
                    uint64_t pid = GetCurrentProcessId();
1324
#else
1325
                    static_assert(sizeof(pid_t) <= sizeof(uint64_t), "process identifiers too large");
207✔
1326
                    uint64_t pid = getpid();
207✔
1327
#endif
207✔
1328
                    info->session_initiator_pid = pid;
207✔
1329
                }
207✔
1330

58,134✔
1331
                info->file_format_version = uint_fast8_t(target_file_format_version);
119,091✔
1332

58,134✔
1333
                // Initially there is a single version in the file
58,134✔
1334
                info->number_of_versions = 1;
119,091✔
1335

58,134✔
1336
                info->latest_version_number = version;
119,091✔
1337
                alloc.init_mapping_management(version);
119,091✔
1338

58,134✔
1339
                size_t file_size = 24;
119,091✔
1340
                if (top_ref) {
119,091✔
1341
                    Array top(alloc);
70,197✔
1342
                    top.init_from_ref(top_ref);
70,197✔
1343
                    file_size = Group::get_logical_file_size(top);
70,197✔
1344
                }
70,197✔
1345
                version_manager->init_versioning(top_ref, file_size, version);
119,091✔
1346
            }
119,091✔
1347
            else { // Not the session initiator
24,063✔
1348
                // Durability setting must be consistent across a session. An
12,216✔
1349
                // inconsistency is a logic error, as the user is required to
12,216✔
1350
                // make sure that all possible concurrent session participants
12,216✔
1351
                // use the same durability setting for the same Realm file.
12,216✔
1352
                if (Durability(info->durability) != options.durability)
24,063✔
1353
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "Durability not consistent");
6✔
1354

12,213✔
1355
                // History type must be consistent across a session. An
12,213✔
1356
                // inconsistency is a logic error, as the user is required to
12,213✔
1357
                // make sure that all possible concurrent session participants
12,213✔
1358
                // use the same history type for the same Realm file.
12,213✔
1359
                if (info->history_type != openers_hist_type)
24,057✔
1360
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "History type not consistent");
6✔
1361

12,210✔
1362
                // History schema version must be consistent across a
12,210✔
1363
                // session. An inconsistency is a logic error, as the user is
12,210✔
1364
                // required to make sure that all possible concurrent session
12,210✔
1365
                // participants use the same history schema version for the same
12,210✔
1366
                // Realm file.
12,210✔
1367
                if (info->history_schema_version != openers_hist_schema_version)
24,051✔
1368
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "History schema version not consistent");
×
1369

12,210✔
1370
                // We need per session agreement among all participants on the
12,210✔
1371
                // target Realm file format. From a technical perspective, the
12,210✔
1372
                // best way to ensure that, would be to require a bumping of the
12,210✔
1373
                // SharedInfo file format version on any change that could lead
12,210✔
1374
                // to a different result from
12,210✔
1375
                // get_target_file_format_for_session() given the same current
12,210✔
1376
                // Realm file format version and the same history type, as that
12,210✔
1377
                // would prevent the outcome of the Realm opening process from
12,210✔
1378
                // depending on race conditions. However, for practical reasons,
12,210✔
1379
                // we shall instead simply check that there is agreement, and
12,210✔
1380
                // throw the same kind of exception, as would have been thrown
12,210✔
1381
                // with a bumped SharedInfo file format version, if there isn't.
12,210✔
1382
                if (info->file_format_version != target_file_format_version) {
24,051✔
1383
                    throw IncompatibleLockFile(path,
×
1384
                                               format("Version mismatch: File format version is %1 but should be %2.",
×
1385
                                                      info->file_format_version, target_file_format_version));
×
1386
                }
×
1387

12,210✔
1388
                // Even though this session participant is not the session initiator,
12,210✔
1389
                // it may be the one that has to perform the history schema upgrade.
12,210✔
1390
                // See upgrade_file_format(). However we cannot get the actual value
12,210✔
1391
                // at this point as the allocator is not synchronized with the file.
12,210✔
1392
                // The value will be read in a ReadTransaction later.
12,210✔
1393

12,210✔
1394
                // We need to setup the allocators version information, as it is needed
12,210✔
1395
                // to correctly age and later reclaim memory mappings.
12,210✔
1396
                version_type version = info->latest_version_number;
24,051✔
1397
                alloc.init_mapping_management(version);
24,051✔
1398
            }
24,051✔
1399

70,491✔
1400
            m_new_commit_available.set_shared_part(info->new_commit_available, lockfile_prefix, "new_commit",
143,289✔
1401
                                                   options.temp_dir);
143,142✔
1402
            m_pick_next_writer.set_shared_part(info->pick_next_writer, lockfile_prefix, "pick_writer",
143,142✔
1403
                                               options.temp_dir);
143,142✔
1404

70,344✔
1405
            // make our presence noted:
70,344✔
1406
            ++info->num_participants;
143,142✔
1407
            m_info = info;
143,142✔
1408

70,344✔
1409
            // Keep the mappings and file open:
70,344✔
1410
            m_version_manager = std::move(version_manager);
143,142✔
1411
            alloc_detach_guard.release();
143,142✔
1412
            fug_1.release(); // Do not unmap
143,142✔
1413
            fcg.release();   // Do not close
143,142✔
1414
        }
143,142✔
1415
        ulg.release(); // Do not release shared lock
143,142✔
1416
        break;
143,142✔
1417
    }
143,493✔
1418

70,425✔
1419
    // Upgrade file format and/or history schema
70,425✔
1420
    try {
143,226✔
1421
        if (stored_hist_schema_version == -1) {
143,145✔
1422
            // current_hist_schema_version has not been read. Read it now
12,210✔
1423
            stored_hist_schema_version = start_read()->get_history_schema_version();
24,051✔
1424
        }
24,051✔
1425
        if (current_file_format_version == 0) {
143,145✔
1426
            // If the current file format is still undecided, no upgrade is
24,519✔
1427
            // necessary, but we still need to make the chosen file format
24,519✔
1428
            // visible to the rest of the core library by updating the value
24,519✔
1429
            // that will be subsequently returned by
24,519✔
1430
            // Group::get_file_format_version(). For this to work, all session
24,519✔
1431
            // participants must adopt the chosen target Realm file format when
24,519✔
1432
            // the stored file format version is zero regardless of the version
24,519✔
1433
            // of the core library used.
24,519✔
1434
            m_file_format_version = target_file_format_version;
50,376✔
1435
        }
50,376✔
1436
        else {
92,769✔
1437
            m_file_format_version = current_file_format_version;
92,769✔
1438
            upgrade_file_format(options.allow_file_format_upgrade, target_file_format_version,
92,769✔
1439
                                stored_hist_schema_version, openers_hist_schema_version); // Throws
92,769✔
1440
        }
92,769✔
1441
    }
143,145✔
1442
    catch (...) {
70,347✔
1443
        close();
6✔
1444
        throw;
6✔
1445
    }
6✔
1446
#if REALM_METRICS
143,136✔
1447
    if (options.enable_metrics) {
143,136✔
1448
        m_metrics = std::make_shared<Metrics>(options.metrics_buffer_size);
96✔
1449
    }
96✔
1450
#endif // REALM_METRICS
143,136✔
1451
    m_alloc.set_read_only(true);
143,136✔
1452
}
143,136✔
1453

1454
// Open an immutable DB backed by an in-memory buffer. No lock file is
// involved: a fake read lock is installed so readers can proceed directly.
// If `take_ownership` is true, the allocator assumes responsibility for
// freeing the buffer.
void DB::open(BinaryData buffer, bool take_ownership)
{
    const auto top_ref = m_alloc.attach_buffer(buffer.data(), buffer.size());
    m_fake_read_lock_if_immutable = ReadLockInfo::make_fake(top_ref, buffer.size());
    if (take_ownership) {
        m_alloc.own_buffer();
    }
}
6✔
1461

1462
// Open a file-backed DB with replication enabled. The replication object is
// initialized and registered before delegating to the plain file open.
//
// Exception safety: open() is invoked from constructors, so any throw must
// leave the file closed — the delegated open(file, ...) upholds that.
void DB::open(Replication& repl, const std::string& file, const DBOptions& options)
{
    REALM_ASSERT(!is_attached());

    repl.initialize(*this); // Throws
    set_replication(&repl);

    const bool no_create = false;
    open(file, no_create, options); // Throws
}
118,644✔
1476
class DBLogger : public Logger {
1477
public:
1478
    DBLogger(const std::shared_ptr<Logger>& base_logger, size_t hash) noexcept
1479
        : Logger(base_logger)
1480
        , m_hash(hash)
1481
    {
129,003✔
1482
    }
129,003✔
1483

1484
protected:
1485
    void do_log(Level level, const std::string& message) final
1486
    {
1,098,135✔
1487
        std::ostringstream ostr;
1,098,135✔
1488
        auto id = std::this_thread::get_id();
1,098,135✔
1489
        ostr << "DB: " << m_hash << " Thread " << id << ": ";
1,098,135✔
1490
        Logger::do_log(*m_base_logger_ptr, level, ostr.str() + message);
1,098,135✔
1491
    }
1,098,135✔
1492

1493
private:
1494
    size_t m_hash;
1495
};
1496

1497
// Install a logger for this DB. The caller-provided logger is wrapped in a
// DBLogger so that every message carries this DB's path hash and the logging
// thread's id. A null logger is ignored and leaves the current one in place.
void DB::set_logger(const std::shared_ptr<util::Logger>& logger) noexcept
{
    if (!logger)
        return;
    m_logger = std::make_shared<DBLogger>(logger, m_path_hash);
}
143,481✔
1502

1503
// Open a purely in-memory DB (no backing file, no .lock file). The SharedInfo
// struct that would normally live in the lock file is heap-allocated instead
// (m_in_memory_info), and this process is always the sole session initiator.
void DB::open(Replication& repl, const DBOptions options)
{
    REALM_ASSERT(!is_attached());
    repl.initialize(*this); // Throws
    set_replication(&repl);

    // Create the allocator's in-memory buffer in place of a Realm file.
    m_alloc.init_in_memory_buffer();

    auto hist_type = repl.get_history_type();
    // In-memory DBs are always MemOnly durability; build the session metadata
    // that file-backed sessions would read from the mapped lock file.
    m_in_memory_info =
        std::make_unique<SharedInfo>(DBOptions::Durability::MemOnly, hist_type, repl.get_history_schema_version());
    SharedInfo* info = m_in_memory_info.get();
    // Empty prefix: there is no lock-file path to derive names from.
    m_writemutex.set_shared_part(info->shared_writemutex, "", "write");
    m_controlmutex.set_shared_part(info->shared_controlmutex, "", "control");
    m_new_commit_available.set_shared_part(info->new_commit_available, "", "new_commit", options.temp_dir);
    m_pick_next_writer.set_shared_part(info->pick_next_writer, "", "pick_writer", options.temp_dir);
    m_versionlist_mutex.set_shared_part(info->shared_versionlist_mutex, "", "versions");

    // A fresh session (current file format 0) always adopts the target format
    // chosen for this history type.
    auto target_file_format_version = uint_fast8_t(Group::get_target_file_format_version_for_session(0, hist_type));
    info->file_format_version = target_file_format_version;
    // A brand new DB starts with a single version/snapshot.
    info->number_of_versions = 1;
    info->latest_version_number = 1;
    info->init_versioning(0, m_alloc.get_baseline(), 1);
    // Make our presence noted (this DB joins the session).
    ++info->num_participants;

    m_version_manager = std::make_unique<InMemoryVersionManager>(info, m_versionlist_mutex);

    m_file_format_version = target_file_format_version;

#if REALM_METRICS
    if (options.enable_metrics) {
        m_metrics = std::make_shared<Metrics>(options.metrics_buffer_size);
    }
#endif // REALM_METRICS
    m_info = info;
    m_alloc.set_read_only(true);
}
25,260✔
1540

1541
// Replace this DB's history with a fresh one produced by `repl`: the current
// history is cleared inside a write transaction and the existing contents are
// re-replicated through the new Replication instance. On any failure the
// previously installed Replication is restored before rethrowing, leaving the
// DB unchanged.
void DB::create_new_history(Replication& repl)
{
    Replication* const previous_repl = get_replication();
    try {
        repl.initialize(*this);
        set_replication(&repl);

        auto tr = start_write();
        tr->clear_history();
        tr->replicate(tr.get(), repl);
        tr->commit();
    }
    catch (...) {
        // Roll back to the previous replication instance so a failed attempt
        // has no lasting effect.
        set_replication(previous_repl);
        throw;
    }
}
36✔
1558

1559
void DB::create_new_history(std::unique_ptr<Replication> repl)
1560
{
36✔
1561
    create_new_history(*repl);
36✔
1562
    m_history = std::move(repl);
36✔
1563
}
36✔
1564

1565
// WARNING / FIXME: compact() should NOT be exposed publicly on Windows because it's not crash safe! It may
1566
// corrupt your database if something fails.
1567
// Tracked by https://github.com/realm/realm-core/issues/4111
1568

1569
// A note about lock ordering.
1570
// The local mutex, m_mutex, guards transaction start/stop and map/unmap of the lock file.
1571
// Except for compact(), open() and close(), it should only be held briefly.
1572
// The controlmutex guards operations which change the file size, session initialization
1573
// and session exit.
1574
// The writemutex guards the integrity of the (write) transaction data.
1575
// The controlmutex and writemutex reside in the .lock file and thus require
1576
// the mapping of the .lock file to work. A straightforward approach would be to lock
1577
// the m_mutex whenever the other mutexes are taken or released...but that would be too
1578
// bad for performance of transaction start/stop.
1579
//
1580
// The locks are to be taken in this order: writemutex->controlmutex->m_mutex
1581
//
1582
// The .lock file is mapped during DB::create() and unmapped by a call to DB::close().
1583
// Once unmapped, it is never mapped again. Hence any observer with a valid DBRef may
1584
// only see the transition from mapped->unmapped, never the opposite.
1585
//
1586
// Trying to create a transaction if the .lock file is unmapped will result in an assert.
1587
// Unmapping (during close()) while transactions are live, is not considered an error. There
1588
// is a potential race between unmapping during close() and any operation carried out by a live
1589
// transaction. The user must ensure that this race never happens if she uses DB::close().
1590
// Compact the database by writing a fresh file containing only live data and
// atomically moving it over the old one. Returns false (without compacting)
// if other DB objects or other transactions are active; returns true on success.
// Requires exclusive access: holds m_controlmutex for the whole operation and
// m_mutex while swapping the file mapping. See the lock-ordering note above.
bool DB::compact(bool bump_version_number, util::Optional<const char*> output_encryption_key)
    NO_THREAD_SAFETY_ANALYSIS // this would work except for a known limitation: "No alias analysis" where clang cannot
                              // tell that tr->db->m_mutex is the same thing as m_mutex
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::string tmp_path = m_db_path + ".tmp_compaction_space";

    // To enter compact, the DB object must already have been attached to a file,
    // since this happens in DB::create().

    // Verify that the lock file is still attached. There is no attempt to guard against
    // a race between close() and compact().
    if (is_attached() == false) {
        throw Exception(ErrorCodes::IllegalOperation, m_db_path + ": compact must be done on an open/attached DB");
    }
    auto info = m_info;
    Durability dura = Durability(info->durability);
    // If no explicit output key was given, re-encrypt with the current key (if any).
    const char* write_key = bool(output_encryption_key) ? *output_encryption_key : get_encryption_key();
    {
        std::unique_lock<InterprocessMutex> lock(m_controlmutex); // Throws
        auto t1 = std::chrono::steady_clock::now();

        // We must be the ONLY DB object attached if we're to do compaction
        if (info->num_participants > 1)
            return false;

        // Holding the controlmutex prevents any other DB from attaching to the file.

        // Using start_read here ensures that we have access to the latest entry
        // in the VersionList. We need to have access to that later to update top_ref and file_size.
        // This is also needed to attach the group (get the proper top pointer, etc)
        TransactionRef tr = start_read();
        auto file_size_before = tr->get_logical_file_size();

        // local lock blocking any transaction from starting (and stopping)
        CheckedLockGuard local_lock(m_mutex);

        // We should be the only transaction active - otherwise back out
        // (count of 1 is our own read transaction started just above).
        if (m_transaction_count != 1)
            return false;

        // group::write() will throw if the file already exists.
        // To prevent this, we have to remove the file (should it exist)
        // before calling group::write().
        File::try_remove(tmp_path);

        // Compact by writing a new file holding only live data, then renaming the new file
        // so it becomes the database file, replacing the old one in the process.
        try {
            File file;
            file.open(tmp_path, File::access_ReadWrite, File::create_Must, 0);
            int incr = bump_version_number ? 1 : 0;
            Group::DefaultTableWriter writer;
            tr->write(file, write_key, info->latest_version_number + incr, writer); // Throws
            // Data needs to be flushed to the disk before renaming.
            bool disable_sync = get_disable_sync_to_disk();
            if (!disable_sync && dura != Durability::Unsafe)
                file.sync(); // Throws
        }
        catch (...) {
            // If writing the compact version failed in any way, delete the partially written file to clean up disk
            // space. This is so that we don't fail with 100% disk space used when compacting on a mostly full disk.
            if (File::exists(tmp_path)) {
                File::remove(tmp_path);
            }
            throw;
        }
        // if we've written a file with a bumped version number, we need to update the lock file to match.
        if (bump_version_number) {
            ++info->latest_version_number;
        }
        // We need to release any shared mapping *before* releasing the control mutex.
        // When someone attaches to the new database file, they *must* *not* see and
        // reuse any existing memory mapping of the stale file.
        tr->close_read_with_lock();
        m_alloc.detach();

        // Atomically replace the database file with the compacted copy.
        util::File::move(tmp_path, m_db_path);

        // Re-attach to the new file as if starting a fresh session.
        SlabAlloc::Config cfg;
        cfg.session_initiator = true;
        cfg.is_shared = true;
        cfg.read_only = false;
        cfg.skip_validate = false;
        cfg.no_create = true;
        cfg.clear_file = false;
        cfg.encryption_key = write_key;
        ref_type top_ref;
        top_ref = m_alloc.attach_file(m_db_path, cfg, m_marker_observer.get());
        m_alloc.convert_from_streaming_form(top_ref);
        m_alloc.init_mapping_management(info->latest_version_number);
        // Compaction collapses history: exactly one version remains.
        info->number_of_versions = 1;
        size_t logical_file_size = sizeof(SlabAlloc::Header);
        if (top_ref) {
            Array top(m_alloc);
            top.init_from_ref(top_ref);
            logical_file_size = Group::get_logical_file_size(top);
        }
        m_version_manager->init_versioning(top_ref, logical_file_size, info->latest_version_number);
        if (m_logger) {
            auto t2 = std::chrono::steady_clock::now();
            m_logger->log(util::Logger::Level::info, "DB compacted from: %1 to %2 in %3 us", file_size_before,
                          logical_file_size, std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
        }
    }
    return true;
}
1697

1698
// Write a copy of the current snapshot to `path`, optionally re-encrypted with
// `output_encryption_key`. The copy's sync client-file-id is reset to 0 so it
// can be used as a fresh client realm. Throws IllegalOperation if there are
// local changes not yet integrated by the server, or if `path` already exists
// (File::create_Must).
void DB::write_copy(StringData path, const char* output_encryption_key)
{
    auto tr = start_read();
    if (auto hist = tr->get_history()) {
        // Refuse to copy while un-uploaded local changes exist: the copy would
        // diverge from the server's view of this client.
        if (!hist->no_pending_local_changes(tr->get_version())) {
            throw Exception(ErrorCodes::IllegalOperation,
                            "All client changes must be integrated in server before writing copy");
        }
    }

    // Writer that clears the sync file-id in the copied history, so the copy
    // is not mistaken for this client when opened.
    class NoClientFileIdWriter : public Group::DefaultTableWriter {
    public:
        NoClientFileIdWriter()
            : Group::DefaultTableWriter(true)
        {
        }
        HistoryInfo write_history(_impl::OutputStream& out) override
        {
            auto hist = Group::DefaultTableWriter::write_history(out);
            hist.sync_file_id = 0;
            return hist;
        }
    } writer;

    File file;
    file.open(path, File::access_ReadWrite, File::create_Must, 0);
    file.resize(0);

    auto t1 = std::chrono::steady_clock::now();
    tr->write(file, output_encryption_key, m_info->latest_version_number, writer);
    if (m_logger) {
        auto t2 = std::chrono::steady_clock::now();
        m_logger->log(util::Logger::Level::info, "DB written to '%1' in %2 us", path,
                      std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
    }
}
1734

1735
uint_fast64_t DB::get_number_of_versions()
1736
{
381,804✔
1737
    if (m_fake_read_lock_if_immutable)
381,804✔
1738
        return 1;
6✔
1739
    return m_info->number_of_versions;
381,798✔
1740
}
381,798✔
1741

1742
size_t DB::get_allocated_size() const
1743
{
6✔
1744
    return m_alloc.get_allocated_size();
6✔
1745
}
6✔
1746

1747
// Destructor: closes the DB. Per the note above close(), the close path does
// not throw when invoked from the destructor.
DB::~DB() noexcept
{
    close();
}
1751

1752
// Release every read lock still held locally by this DB object, returning the
// transaction count to zero. Used during close(). Must not be called on an
// immutable (fake-lock) DB.
void DB::release_all_read_locks() noexcept
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    CheckedLockGuard local_lock(m_mutex); // mx on m_local_locks_held
    for (auto& read_lock : m_local_locks_held) {
        --m_transaction_count;
        m_version_manager->release_read_lock(read_lock);
    }
    m_local_locks_held.clear();
    // Every live transaction must have held exactly one entry in the list.
    REALM_ASSERT(m_transaction_count == 0);
}
1763

1764
// Note: close() and close_internal() may be called from the DB::~DB().
1765
// in that case, they will not throw. Throwing can only happen if called
1766
// directly.
1767
// Close the DB. Throws WrongTransactionState if transactions are still open
// (unless allow_open_read_transactions permits read transactions). For
// immutable DBs there is no lockfile/session to tear down, so only the
// allocator and the fake read lock are released.
void DB::close(bool allow_open_read_transactions)
{
    // make helper thread(s) terminate
    m_commit_helper.reset();

    if (m_fake_read_lock_if_immutable) {
        if (!is_attached())
            return; // already closed
        {
            CheckedLockGuard local_lock(m_mutex);
            if (!allow_open_read_transactions && m_transaction_count)
                throw WrongTransactionState("Closing with open read transactions");
        }
        if (m_alloc.is_attached())
            m_alloc.detach();
        m_fake_read_lock_if_immutable.reset();
    }
    else {
        // Normal (mutable) path: full session teardown under the control mutex.
        close_internal(std::unique_lock<InterprocessMutex>(m_controlmutex, std::defer_lock),
                       allow_open_read_transactions);
    }
}
1789

1790
// Full teardown of a mutable DB: leaves the session (decrementing the
// participant count in the shared info), releases read locks, and unmaps and
// closes the lock file. `lock` is the control mutex, possibly not yet locked
// (defer_lock); it is acquired here if needed. Throws WrongTransactionState
// if transactions are still open, unless read transactions are allowed.
void DB::close_internal(std::unique_lock<InterprocessMutex> lock, bool allow_open_read_transactions)
{
    if (!is_attached())
        return; // already closed

    {
        CheckedLockGuard local_lock(m_mutex);
        if (m_write_transaction_open)
            throw WrongTransactionState("Closing with open write transactions");
        if (!allow_open_read_transactions && m_transaction_count)
            throw WrongTransactionState("Closing with open read transactions");
    }
    SharedInfo* info = m_info;
    {
        if (!lock.owns_lock())
            lock.lock();

        if (m_alloc.is_attached())
            m_alloc.detach();

        if (m_is_sync_agent) {
            REALM_ASSERT(info->sync_agent_present);
            info->sync_agent_present = 0; // Set to false
        }
        release_all_read_locks();
        --info->num_participants;
        bool end_of_session = info->num_participants == 0;
        // std::cerr << "closing" << std::endl;
        if (end_of_session) {

            // If the db file is just backing for a transient data structure,
            // we can delete it when done.
            if (Durability(info->durability) == Durability::MemOnly && !m_in_memory_info) {
                try {
                    util::File::remove(m_db_path.c_str());
                }
                catch (...) {
                } // ignored on purpose.
            }
        }
        lock.unlock();
    }
    {
        CheckedLockGuard local_lock(m_mutex);

        m_new_commit_available.close();
        m_pick_next_writer.close();

        if (m_in_memory_info) {
            // Purely in-memory DB: no file/lockfile resources to release.
            m_in_memory_info.reset();
        }
        else {
            // On Windows it is important that we unmap before unlocking, else a SetEndOfFile() call from another
            // thread may interleave which is not permitted on Windows. It is permitted on *nix.
            m_file_map.unmap();
            m_version_manager.reset();
            m_file.rw_unlock();
            // info->~SharedInfo(); // DO NOT Call destructor
            m_file.close();
        }
        m_info = nullptr;
        if (m_logger)
            m_logger->log(util::Logger::Level::detail, "DB closed");
    }
}
1855

1856
bool DB::other_writers_waiting_for_lock() const
1857
{
64,872✔
1858
    SharedInfo* info = m_info;
64,872✔
1859

33,690✔
1860
    uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed);
64,872✔
1861
    uint32_t next_served = info->next_served.load(std::memory_order_relaxed);
64,872✔
1862
    // When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and
33,690✔
1863
    // 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock.
33,690✔
1864
    return next_ticket > next_served + 1;
64,872✔
1865
}
64,872✔
1866

1867
// Helper owning a lazily-started worker thread that acquires/releases the
// write lock and performs disk syncs on behalf of async transactions. All
// state is guarded by m_mutex; m_cv_worker wakes the worker, m_cv_callers
// wakes threads blocked in blocking_begin_write()/blocking_end_write().
class DB::AsyncCommitHelper {
public:
    AsyncCommitHelper(DB* db)
        : m_db(db)
    {
    }
    ~AsyncCommitHelper()
    {
        {
            std::unique_lock lg(m_mutex);
            if (!m_running) {
                // Worker never started; nothing to join.
                return;
            }
            // Signal shutdown and wake the worker so it can exit its loop.
            m_running = false;
            m_cv_worker.notify_one();
        }
        m_thread.join();
    }

    // Queue an asynchronous write request; `fn` runs on the worker thread
    // once the write lock has been acquired.
    void begin_write(util::UniqueFunction<void()> fn)
    {
        std::unique_lock lg(m_mutex);
        start_thread();
        m_pending_writes.emplace_back(std::move(fn));
        m_cv_worker.notify_one();
    }

    // Acquire the write lock synchronously, either directly on the calling
    // thread (when the mutex supports cross-thread unlock) or by ticketing a
    // request to the worker thread and waiting for fulfillment.
    void blocking_begin_write()
    {
        std::unique_lock lg(m_mutex);

        // If we support unlocking InterprocessMutex from a different thread
        // than it was locked on, we can sometimes just begin the write on
        // the current thread. This requires that no one is currently waiting
        // for the worker thread to acquire the write lock, as we'll deadlock
        // if we try to async commit while the worker is waiting for the lock.
        bool can_lock_on_caller =
            !InterprocessMutex::is_thread_confined && (!m_owns_write_mutex && m_pending_writes.empty() &&
                                                       m_write_lock_claim_ticket == m_write_lock_claim_fulfilled);

        // If we support cross-thread unlocking and m_running is false,
        // can_lock_on_caller should always be true or we forgot to launch the thread
        REALM_ASSERT(can_lock_on_caller || m_running || InterprocessMutex::is_thread_confined);

        // If possible, just begin the write on the current thread
        if (can_lock_on_caller) {
            m_waiting_for_write_mutex = true;
            lg.unlock();
            m_db->do_begin_write();
            lg.lock();
            m_waiting_for_write_mutex = false;
            m_has_write_mutex = true;
            m_owns_write_mutex = false;
            return;
        }

        // Otherwise we have to ask the worker thread to acquire it and wait
        // for that
        start_thread();
        size_t ticket = ++m_write_lock_claim_ticket;
        m_cv_worker.notify_one();
        m_cv_callers.wait(lg, [this, ticket] {
            return ticket == m_write_lock_claim_fulfilled;
        });
    }

    // Release the write lock without waiting for the release to complete.
    void end_write()
    {
        std::unique_lock lg(m_mutex);
        REALM_ASSERT(m_has_write_mutex);
        REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined);

        // If we acquired the write lock on the worker thread, also release it
        // there even if our mutex supports unlocking cross-thread as it simplifies things.
        if (m_owns_write_mutex) {
            m_pending_mx_release = true;
            m_cv_worker.notify_one();
        }
        else {
            m_db->do_end_write();
            m_has_write_mutex = false;
        }
    }

    // Release the write lock and wait until the release has actually happened.
    // Returns false if the lock was not held.
    bool blocking_end_write()
    {
        std::unique_lock lg(m_mutex);
        if (!m_has_write_mutex) {
            return false;
        }
        REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined);

        // If we acquired the write lock on the worker thread, also release it
        // there even if our mutex supports unlocking cross-thread as it simplifies things.
        if (m_owns_write_mutex) {
            m_pending_mx_release = true;
            m_cv_worker.notify_one();
            m_cv_callers.wait(lg, [this] {
                return !m_pending_mx_release;
            });
        }
        else {
            m_db->do_end_write();
            m_has_write_mutex = false;

            // The worker thread may have ignored a request for the write mutex
            // while we were acquiring it, so we need to wake up the thread
            if (has_pending_write_requests()) {
                lg.unlock();
                m_cv_worker.notify_one();
            }
        }
        return true;
    }


    // Schedule `fn` (a disk-sync callback) to run on the worker thread; the
    // write lock is released afterwards. Exactly one of sync_to_disk(),
    // end_write() or blocking_end_write() should follow a begin.
    void sync_to_disk(util::UniqueFunction<void()> fn)
    {
        REALM_ASSERT(fn);
        std::unique_lock lg(m_mutex);
        REALM_ASSERT(!m_pending_sync);
        start_thread();
        m_pending_sync = std::move(fn);
        m_cv_worker.notify_one();
    }

private:
    DB* m_db;
    std::thread m_thread;
    std::mutex m_mutex;                            // guards all state below
    std::condition_variable m_cv_worker;           // wakes the worker thread
    std::condition_variable m_cv_callers;          // wakes blocked callers
    std::deque<util::UniqueFunction<void()>> m_pending_writes;
    util::UniqueFunction<void()> m_pending_sync;
    // Ticket counters for synchronous (blocking) write-lock claims.
    size_t m_write_lock_claim_ticket = 0;
    size_t m_write_lock_claim_fulfilled = 0;
    bool m_pending_mx_release = false;
    bool m_running = false;
    bool m_has_write_mutex = false;
    // True when the worker thread (not a caller) acquired the write mutex.
    bool m_owns_write_mutex = false;
    bool m_waiting_for_write_mutex = false;

    void main();

    // Lazily launch the worker thread on first use.
    void start_thread()
    {
        if (m_running) {
            return;
        }
        m_running = true;
        m_thread = std::thread([this]() {
            main();
        });
    }

    bool has_pending_write_requests()
    {
        return m_write_lock_claim_fulfilled < m_write_lock_claim_ticket || !m_pending_writes.empty();
    }
};
2027

2028
// Worker-thread loop: while holding the write mutex it runs pending sync
// callbacks and releases the mutex on request; while not holding it, it
// acquires the mutex for queued blocking/async write requests. Callbacks are
// always invoked with m_mutex unlocked.
void DB::AsyncCommitHelper::main()
{
    std::unique_lock lg(m_mutex);
    while (m_running) {
#if 0 // Enable for testing purposes
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
#endif
        if (m_has_write_mutex) {
            if (auto cb = std::move(m_pending_sync)) {
                // Only one of sync_to_disk(), end_write(), or blocking_end_write()
                // should be called, so we should never have both a pending sync
                // and pending release.
                REALM_ASSERT(!m_pending_mx_release);
                lg.unlock();
                cb();
                cb = nullptr; // Release things captured by the callback before reacquiring the lock
                lg.lock();
                // A completed sync implies the write lock should now be released.
                m_pending_mx_release = true;
            }
            if (m_pending_mx_release) {
                REALM_ASSERT(!InterprocessMutex::is_thread_confined || m_owns_write_mutex);
                m_db->do_end_write();
                m_pending_mx_release = false;
                m_has_write_mutex = false;
                m_owns_write_mutex = false;

                // Notify callers blocked in blocking_end_write().
                lg.unlock();
                m_cv_callers.notify_all();
                lg.lock();
                continue;
            }
        }
        else {
            REALM_ASSERT(!m_pending_sync && !m_pending_mx_release);

            // Acquire the write lock if anyone has requested it, but only if
            // another thread is not already waiting for it. If there's another
            // thread requesting and they get it while we're waiting, we'll
            // deadlock if they ask us to perform the sync.
            if (!m_waiting_for_write_mutex && has_pending_write_requests()) {
                lg.unlock();
                m_db->do_begin_write();
                lg.lock();

                REALM_ASSERT(!m_has_write_mutex);
                m_has_write_mutex = true;
                m_owns_write_mutex = true;

                // Synchronous transaction requests get priority over async
                if (m_write_lock_claim_fulfilled < m_write_lock_claim_ticket) {
                    ++m_write_lock_claim_fulfilled;
                    m_cv_callers.notify_all();
                    continue;
                }

                // Otherwise serve the oldest queued async write request.
                REALM_ASSERT(!m_pending_writes.empty());
                auto callback = std::move(m_pending_writes.front());
                m_pending_writes.pop_front();
                lg.unlock();
                callback();
                // Release things captured by the callback before reacquiring the lock
                callback = nullptr;
                lg.lock();
                continue;
            }
        }
        m_cv_worker.wait(lg);
    }
    // Shutdown: if the worker still owns the write mutex, release it here.
    if (m_has_write_mutex && m_owns_write_mutex) {
        m_db->do_end_write();
    }
}
2100

2101

2102
// Queue `fn` to run once the helper thread has acquired the write lock.
// Requires the DB to have been opened with the async commit helper enabled.
void DB::async_begin_write(util::UniqueFunction<void()> fn)
{
    REALM_ASSERT(m_commit_helper);
    m_commit_helper->begin_write(std::move(fn));
}
2107

2108
// Release the write lock held via the async commit helper (non-blocking;
// the actual release may happen on the helper thread).
void DB::async_end_write()
{
    REALM_ASSERT(m_commit_helper);
    m_commit_helper->end_write();
}
2113

2114
// Schedule `fn` (a disk-sync callback) on the helper thread; the write lock
// is released once it completes.
void DB::async_sync_to_disk(util::UniqueFunction<void()> fn)
{
    REALM_ASSERT(m_commit_helper);
    m_commit_helper->sync_to_disk(std::move(fn));
}
2119

2120
bool DB::has_changed(TransactionRef& tr)
{
    // An immutable DB can never observe a new snapshot.
    if (m_fake_read_lock_if_immutable)
        return false;
    // Changed iff a newer snapshot exists than the one `tr` is pinned to.
    return tr->m_read_lock.m_version != get_version_of_latest_snapshot();
}
2127

2128
// Block until a version newer than the one `tr` is pinned to exists, or until
// wait_for_change_release() disables waiting. Returns true if a newer version
// exists on return. Not supported on immutable DBs.
bool DB::wait_for_change(TransactionRef& tr)
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
    // Loop guards against spurious wakeups; m_new_commit_available is
    // signalled when a commit publishes a new version.
    while (tr->m_read_lock.m_version == m_info->latest_version_number && m_wait_for_change_enabled) {
        m_new_commit_available.wait(m_controlmutex, 0);
    }
    return tr->m_read_lock.m_version != m_info->latest_version_number;
}
2137

2138

2139
// Abort any threads blocked in wait_for_change() and disable further waiting
// until enable_wait_for_change() is called. No-op on immutable DBs.
void DB::wait_for_change_release()
{
    if (m_fake_read_lock_if_immutable)
        return;
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
    m_wait_for_change_enabled = false;
    m_new_commit_available.notify_all();
}
2147

2148

2149
// Re-enable wait_for_change() after a previous wait_for_change_release().
void DB::enable_wait_for_change()
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
    m_wait_for_change_enabled = true;
}
2155

2156
// Upgrade the Realm file format and/or history schema to the target versions,
// inside a write transaction so concurrent upgraders serialize correctly.
// Throws FileFormatUpgradeRequired if an upgrade is needed but not allowed.
void DB::upgrade_file_format(bool allow_file_format_upgrade, int target_file_format_version,
                             int current_hist_schema_version, int target_hist_schema_version)
{
    // In a multithreaded scenario multiple threads may initially see a need to
    // upgrade (maybe_upgrade == true) even though only one thread is supposed to
    // perform the upgrade, but that is ok, because the condition is rechecked
    // in a fully reliable way inside a transaction.

    // First a non-threadsafe but fast check
    int current_file_format_version = m_file_format_version;
    REALM_ASSERT(current_file_format_version <= target_file_format_version);
    REALM_ASSERT(current_hist_schema_version <= target_hist_schema_version);
    bool maybe_upgrade_file_format = (current_file_format_version < target_file_format_version);
    bool maybe_upgrade_hist_schema = (current_hist_schema_version < target_hist_schema_version);
    bool maybe_upgrade = maybe_upgrade_file_format || maybe_upgrade_hist_schema;
    if (maybe_upgrade) {

#ifdef REALM_DEBUG
// This sleep() only exists in order to increase the quality of the
// TEST(Upgrade_Database_2_3_Writes_New_File_Format_new) unit test.
// The unit test creates multiple threads that all call
// upgrade_file_format() simultaneously. This sleep() then acts like
// a simple thread barrier that makes sure the threads meet here, to
// increase the likelyhood of detecting any potential race problems.
// See the unit test for details.
//
// NOTE: This sleep has been disabled because no problems have been found with
// this code in a long while, and it was dramatically slowing down a unit test
// in realm-sync.

// millisleep(200);
#endif

        // WriteTransaction wt(*this);
        auto wt = start_write();
        bool dirty = false;

        // We need to upgrade history first. We may need to access it during migration
        // when processing the !OID columns
        int current_hist_schema_version_2 = wt->get_history_schema_version();
        // The history must either still be using its initial schema or have
        // been upgraded already to the chosen target schema version via a
        // concurrent DB object.
        REALM_ASSERT(current_hist_schema_version_2 == current_hist_schema_version ||
                     current_hist_schema_version_2 == target_hist_schema_version);
        bool need_hist_schema_upgrade = (current_hist_schema_version_2 < target_hist_schema_version);
        if (need_hist_schema_upgrade) {
            if (!allow_file_format_upgrade)
                throw FileFormatUpgradeRequired(this->m_db_path);

            Replication* repl = get_replication();
            repl->upgrade_history_schema(current_hist_schema_version_2); // Throws
            wt->set_history_schema_version(target_hist_schema_version);  // Throws
            dirty = true;
        }

        // File format upgrade
        int current_file_format_version_2 = m_alloc.get_committed_file_format_version();
        // The file must either still be using its initial file_format or have
        // been upgraded already to the chosen target file format via a
        // concurrent DB object.
        REALM_ASSERT(current_file_format_version_2 == current_file_format_version ||
                     current_file_format_version_2 == target_file_format_version);
        bool need_file_format_upgrade = (current_file_format_version_2 < target_file_format_version);
        if (need_file_format_upgrade) {
            if (!allow_file_format_upgrade)
                throw FileFormatUpgradeRequired(this->m_db_path);
            wt->upgrade_file_format(target_file_format_version); // Throws
            // Note: The file format version stored in the Realm file will be
            // updated to the new file format version as part of the following
            // commit operation. This happens in GroupWriter::commit().
            if (m_upgrade_callback)
                m_upgrade_callback(current_file_format_version_2, target_file_format_version); // Throws
            dirty = true;
        }
        wt->set_file_format_version(target_file_format_version);
        m_file_format_version = target_file_format_version;

        if (dirty)
            wt->commit(); // Throws
    }
}
2238

2239
void DB::release_read_lock(ReadLockInfo& read_lock) noexcept
{
    // Immutable DBs were opened without a lockfile, so there is nothing
    // to release in that case.
    if (!m_fake_read_lock_if_immutable) {
        CheckedLockGuard lock(m_mutex); // mx on m_local_locks_held
        do_release_read_lock(read_lock);
    }
}
2247

2248
// this is called with m_mutex locked
2249
void DB::do_release_read_lock(ReadLockInfo& read_lock) noexcept
2250
{
6,085,623✔
2251
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
6,085,623✔
2252
    bool found_match = false;
6,085,623✔
2253
    // simple linear search and move-last-over if a match is found.
3,926,421✔
2254
    // common case should have only a modest number of transactions in play..
3,926,421✔
2255
    for (size_t j = 0; j < m_local_locks_held.size(); ++j) {
10,109,730✔
2256
        if (m_local_locks_held[j].m_version == read_lock.m_version) {
10,109,625✔
2257
            m_local_locks_held[j] = m_local_locks_held.back();
6,085,581✔
2258
            m_local_locks_held.pop_back();
6,085,581✔
2259
            found_match = true;
6,085,581✔
2260
            break;
6,085,581✔
2261
        }
6,085,581✔
2262
    }
10,109,625✔
2263
    if (!found_match) {
6,085,623✔
2264
        REALM_ASSERT(!is_attached());
6✔
2265
        // it's OK, someone called close() and all locks where released
3✔
2266
        return;
6✔
2267
    }
6✔
2268
    --m_transaction_count;
6,085,617✔
2269
    m_version_manager->release_read_lock(read_lock);
6,085,617✔
2270
}
6,085,617✔
2271

2272

2273
// Acquire a read lock of the given kind on `version_id` and record it in
// m_local_locks_held so that release/close can account for it later.
DB::ReadLockInfo DB::grab_read_lock(ReadLockInfo::Type type, VersionID version_id)
{
    CheckedLockGuard lock(m_mutex); // mx on m_local_locks_held
    REALM_ASSERT_RELEASE(is_attached());
    auto read_lock = m_version_manager->grab_read_lock(type, version_id);

    // Track locally so do_release_read_lock()/leak_read_lock() can find it.
    m_local_locks_held.emplace_back(read_lock);
    ++m_transaction_count;
    REALM_ASSERT(read_lock.m_file_size > read_lock.m_top_ref);
    return read_lock;
}
2284

2285
void DB::leak_read_lock(ReadLockInfo& read_lock) noexcept
{
    CheckedLockGuard lock(m_mutex); // mx on m_local_locks_held
    // Forget our local bookkeeping for this lock WITHOUT notifying the
    // version manager — the underlying lock is deliberately "leaked", as the
    // name says. Linear scan + move-last-over, since the number of live
    // transactions is expected to be small.
    auto entry = std::find_if(m_local_locks_held.begin(), m_local_locks_held.end(), [&](const ReadLockInfo& held) {
        return held.m_version == read_lock.m_version;
    });
    if (entry != m_local_locks_held.end()) {
        *entry = m_local_locks_held.back();
        m_local_locks_held.pop_back();
        --m_transaction_count;
    }
}
2299

2300
bool DB::do_try_begin_write()
2301
{
84✔
2302
    // In the non-blocking case, we will only succeed if there is no contention for
42✔
2303
    // the write mutex. For this case we are trivially fair and can ignore the
42✔
2304
    // fairness machinery.
42✔
2305
    bool got_the_lock = m_writemutex.try_lock();
84✔
2306
    if (got_the_lock) {
84✔
2307
        finish_begin_write();
72✔
2308
    }
72✔
2309
    return got_the_lock;
84✔
2310
}
84✔
2311

2312
// Acquire the interprocess write lock, using a ticketing scheme so that
// writers are served in approximately FIFO order across processes.
void DB::do_begin_write()
{
    SharedInfo* info = m_info;

    // Get write lock - the write lock is held until do_end_write().
    //
    // We use a ticketing scheme to ensure fairness wrt performing write transactions.
    // (But cannot do that on Windows until we have interprocess condition variables there)
    uint32_t my_ticket = info->next_ticket.fetch_add(1, std::memory_order_relaxed);
    m_writemutex.lock(); // Throws

    // allow for comparison even after wrap around of ticket numbering:
    int32_t diff = int32_t(my_ticket - info->next_served.load(std::memory_order_relaxed));
    bool should_yield = diff > 0; // ticket is in the future
    // a) the above comparison is only guaranteed to be correct, if the distance
    //    between my_ticket and info->next_served is less than 2^30. This will
    //    be the case since the distance will be bounded by the number of threads
    //    and each thread cannot ever hold more than one ticket.
    // b) we could use 64 bit counters instead, but it is unclear if all platforms
    //    have support for interprocess atomics for 64 bit values.

    timespec time_limit; // only compute the time limit if we're going to use it:
    if (should_yield) {
        // This clock is not monotonic, so time can move backwards. This can lead
        // to a wrong time limit, but the only effect of a wrong time limit is that
        // we momentarily lose fairness, so we accept it.
        timeval tv;
        gettimeofday(&tv, nullptr);
        time_limit.tv_sec = tv.tv_sec;
        time_limit.tv_nsec = tv.tv_usec * 1000;
        time_limit.tv_nsec += 500000000;        // 500 msec wait
        if (time_limit.tv_nsec >= 1000000000) { // overflow
            time_limit.tv_nsec -= 1000000000;
            time_limit.tv_sec += 1;
        }
    }

    // Yield (bounded wait) until our ticket comes up or the time limit hits.
    while (should_yield) {

        m_pick_next_writer.wait(m_writemutex, &time_limit);
        timeval tv;
        gettimeofday(&tv, nullptr);
        if (time_limit.tv_sec < tv.tv_sec ||
            (time_limit.tv_sec == tv.tv_sec && time_limit.tv_nsec < tv.tv_usec * 1000)) {
            // Timeout!
            break;
        }
        diff = int32_t(my_ticket - info->next_served);
        should_yield = diff > 0; // ticket is in the future, so yield to someone else
    }

    // we may get here because a) it's our turn, b) we timed out
    // we don't distinguish, satisfied that event b) should be rare.
    // In case b), we have to *make* it our turn. Failure to do so could leave us
    // with 'next_served' permanently trailing 'next_ticket'.
    //
    // In doing so, we may bypass other waiters, hence the condition for yielding
    // should take this situation into account by comparing with '>' instead of '!='
    info->next_served = my_ticket;
    finish_begin_write();
}
2373

2374
// Common tail of do_begin_write()/do_try_begin_write(). Entered with
// m_writemutex held; transitions this DB instance into the writing state.
void DB::finish_begin_write()
{
    if (m_info->commit_in_critical_phase) {
        // Another process died in the middle of publishing a commit; the
        // shared state cannot be trusted. Unlock before throwing so the
        // mutex is not left held.
        m_writemutex.unlock();
        throw RuntimeError(ErrorCodes::BrokenInvariant, "Crash of other process detected, session restart required");
    }


    {
        CheckedLockGuard local_lock(m_mutex);
        m_write_transaction_open = true;
    }
    m_alloc.set_read_only(false);
}
2388

2389
// Release the write lock: advance the fairness counter, flip the allocator
// back to read-only, wake any writers queued in do_begin_write(), then unlock.
void DB::do_end_write() noexcept
{
    m_info->next_served.fetch_add(1, std::memory_order_relaxed);

    CheckedLockGuard local_lock(m_mutex);
    REALM_ASSERT(m_write_transaction_open);
    m_alloc.set_read_only(true);
    m_write_transaction_open = false;
    // Notify while still holding m_writemutex; waiters re-check their ticket.
    m_pick_next_writer.notify_all();
    m_writemutex.unlock();
}
2400

2401

2402
// Commit the current write transaction: flush any pending object deletions,
// run the replication prepare/finalize steps around the low-level commit,
// and return the version number of the new snapshot.
Replication::version_type DB::do_commit(Transaction& transaction, bool commit_to_disk)
{
    version_type current_version;
    {
        current_version = m_version_manager->get_newest_version();
    }
    version_type new_version = current_version + 1;

    // Apply deferred object deletions before the commit is replicated.
    if (!transaction.m_objects_to_delete.empty()) {
        for (auto it : transaction.m_objects_to_delete) {
            transaction.get_table(it.table_key)->remove_object(it.obj_key);
        }
        transaction.m_objects_to_delete.clear();
    }
    if (Replication* repl = get_replication()) {
        // If Replication::prepare_commit() fails, then the entire transaction
        // fails. The application then has the option of terminating the
        // transaction with a call to Transaction::Rollback(), which in turn
        // must call Replication::abort_transact().
        new_version = repl->prepare_commit(current_version);        // Throws
        low_level_commit(new_version, transaction, commit_to_disk); // Throws
        repl->finalize_commit();
    }
    else {
        low_level_commit(new_version, transaction); // Throws
    }
    return new_version;
}
2430

2431
VersionID DB::get_version_id_of_latest_snapshot()
{
    // The normal case: ask the version manager for the newest snapshot.
    if (!m_fake_read_lock_if_immutable)
        return m_version_manager->get_version_id_of_latest_snapshot();
    // Immutable DBs carry a single fake read lock; its version is by
    // definition the one and only snapshot.
    return {m_fake_read_lock_if_immutable->m_version, 0};
}
2437

2438

2439
// Convenience wrapper: the plain version number of the newest snapshot,
// without the reader-slot index carried by VersionID.
DB::version_type DB::get_version_of_latest_snapshot()
{
    return get_version_id_of_latest_snapshot().version;
}
2443

2444

2445
// Write the accumulated changes of `transaction` to storage as snapshot
// `new_version` and publish it to readers and other writers.
// NOTE(review): appears to always run with the write lock held (called from
// do_commit() inside a write transaction) — confirm before relying on it.
void DB::low_level_commit(uint_fast64_t new_version, Transaction& transaction, bool commit_to_disk)
{
    SharedInfo* info = m_info;

    // Version of oldest snapshot currently (or recently) bound in a transaction
    // of the current session.
    uint64_t oldest_version = 0, oldest_live_version = 0;
    TopRefMap top_refs;
    bool any_new_unreachables;
    {
        CheckedLockGuard lock(m_mutex);
        m_version_manager->cleanup_versions(oldest_live_version, top_refs, any_new_unreachables);
        oldest_version = top_refs.begin()->first;
        // Allow for trimming of the history. Some types of histories do not
        // need store changesets prior to the oldest *live* bound snapshot.
        if (auto hist = transaction.get_history()) {
            hist->set_oldest_bound_version(oldest_live_version); // Throws
        }
        // Cleanup any stale mappings
        m_alloc.purge_old_mappings(oldest_version, new_version);
    }

    // Do the actual commit
    REALM_ASSERT(oldest_version <= new_version);
#if REALM_METRICS
    transaction.update_num_objects();
#endif // REALM_METRICS

    GroupWriter out(transaction, Durability(info->durability), m_marker_observer.get()); // Throws
    out.set_versions(new_version, top_refs, any_new_unreachables);
    auto t1 = std::chrono::steady_clock::now();
    auto commit_size = m_alloc.get_commit_size();
    if (m_logger) {
        m_logger->log(util::Logger::Level::debug, "Initiate commit version: %1", new_version);
    }
    if (auto limit = out.get_evacuation_limit()) {
        // Get a work limit based on the size of the transaction we're about to commit
        // Add 4k to ensure progress on small commits
        size_t work_limit = commit_size / 2 + out.get_free_list_size() + 0x1000;
        transaction.cow_outliers(out.get_evacuation_progress(), limit, work_limit);
    }

    ref_type new_top_ref;
    // Recursively write all changed arrays to end of file
    {
        // protect against race with any other DB trying to attach to the file
        std::lock_guard<InterprocessMutex> lock(m_controlmutex); // Throws
        new_top_ref = out.write_group();                         // Throws
    }
    {
        // protect access to shared variables and m_reader_mapping from here
        CheckedLockGuard lock_guard(m_mutex);
        m_free_space = out.get_free_space_size();
        m_locked_space = out.get_locked_space_size();
        m_used_space = out.get_logical_size() - m_free_space;
        m_evac_stage.store(EvacStage(out.get_evacuation_stage()));
        // How (and whether) the data reaches the disk depends on the
        // configured durability level.
        switch (Durability(info->durability)) {
            case Durability::Full:
            case Durability::Unsafe:
                if (commit_to_disk) {
                    out.commit(new_top_ref); // Throws
                }
                else {
                    out.sync_all_mappings();
                }
                break;
            case Durability::MemOnly:
                // In Durability::MemOnly mode, we just use the file as backing for
                // the shared memory. So we never actually sync the data to disk
                // (the OS may do so opportinisticly, or when swapping).
                // however, we still need to flush any private caches into the buffer cache
                out.flush_all_mappings();
                break;
        }
        size_t new_file_size = out.get_logical_size();
        // We must reset the allocators free space tracking before communicating the new
        // version through the ring buffer. If not, a reader may start updating the allocators
        // mappings while the allocator is in dirty state.
        reset_free_space_tracking();
        // Add the new version. If this fails in any way, the VersionList may be corrupted.
        // This can lead to readers seing invalid data which is likely to cause them
        // to crash. Other writers *must* be prevented from writing any further updates
        // to the database. The flag "commit_in_critical_phase" is used to prevent such updates.
        info->commit_in_critical_phase = 1;
        {
            m_version_manager->add_version(new_top_ref, new_file_size, new_version);

            // REALM_ASSERT(m_alloc.matches_section_boundary(new_file_size));
            REALM_ASSERT(new_top_ref < new_file_size);
        }
        // At this point, the VersionList has been succesfully updated, and the next writer
        // can safely proceed once the writemutex has been lifted.
        info->commit_in_critical_phase = 0;
    }
    {
        // protect against concurrent updates to the .lock file.
        // must release m_mutex before this point to obey lock order
        std::lock_guard<InterprocessMutex> lock(m_controlmutex);

        // this is not correct .. but what should we return instead?
        // just returning the number of reachable versions is also misleading,
        // because we retain a transaction history stretching all the way from
        // oldest live version to newest version - even though we may have reclaimed
        // individual versions within this range
        info->number_of_versions = new_version - oldest_version + 1;
        info->latest_version_number = new_version;

        m_new_commit_available.notify_all();
    }
    auto t2 = std::chrono::steady_clock::now();
    if (m_logger) {
        m_logger->log(util::Logger::Level::debug, "Commit of size %1 done in %2 us", commit_size,
                      std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
    }
}
2560

2561
#ifdef REALM_DEBUG
// Pre-allocate disk space for the database file (debug builds only).
void DB::reserve(size_t size)
{
    REALM_ASSERT(is_attached());
    m_alloc.reserve_disk_space(size); // Throws
}
#endif
2568

2569
bool DB::call_with_lock(const std::string& realm_path, CallbackWithLock&& callback)
2570
{
693✔
2571
    auto lockfile_path = get_core_file(realm_path, CoreFileType::Lock);
693✔
2572

306✔
2573
    File lockfile;
693✔
2574
    lockfile.open(lockfile_path, File::access_ReadWrite, File::create_Auto, 0); // Throws
693✔
2575
    File::CloseGuard fcg(lockfile);
693✔
2576
    lockfile.set_fifo_path(realm_path + ".management", "lock.fifo");
693✔
2577
    if (lockfile.try_rw_lock_exclusive()) { // Throws
693✔
2578
        callback(realm_path);
651✔
2579
        return true;
651✔
2580
    }
651✔
2581
    return false;
42✔
2582
}
42✔
2583

2584
std::string DB::get_core_file(const std::string& base_path, CoreFileType type)
2585
{
307,506✔
2586
    switch (type) {
307,506✔
2587
        case CoreFileType::Lock:
144,180✔
2588
            return base_path + ".lock";
144,180✔
2589
        case CoreFileType::Storage:
732✔
2590
            return base_path;
732✔
2591
        case CoreFileType::Management:
144,036✔
2592
            return base_path + ".management";
144,036✔
2593
        case CoreFileType::Note:
17,835✔
2594
            return base_path + ".note";
17,835✔
2595
        case CoreFileType::Log:
732✔
2596
            return base_path + ".log";
732✔
2597
    }
×
2598
    REALM_UNREACHABLE();
×
2599
}
×
2600

2601
void DB::delete_files(const std::string& base_path, bool* did_delete, bool delete_lockfile)
2602
{
726✔
2603
    if (File::try_remove(get_core_file(base_path, CoreFileType::Storage)) && did_delete) {
726✔
2604
        *did_delete = true;
168✔
2605
    }
168✔
2606

363✔
2607
    File::try_remove(get_core_file(base_path, CoreFileType::Note));
726✔
2608
    File::try_remove(get_core_file(base_path, CoreFileType::Log));
726✔
2609
    util::try_remove_dir_recursive(get_core_file(base_path, CoreFileType::Management));
726✔
2610

363✔
2611
    if (delete_lockfile) {
726✔
2612
        File::try_remove(get_core_file(base_path, CoreFileType::Lock));
180✔
2613
    }
180✔
2614
}
726✔
2615

2616
// Start a live read transaction bound to `version_id` (or to the newest
// snapshot, for a default-constructed VersionID).
TransactionRef DB::start_read(VersionID version_id)
{
    if (!is_attached())
        throw StaleAccessor("Stale transaction");
    TransactionRef tr;
    if (m_fake_read_lock_if_immutable) {
        // Immutable DBs have exactly one snapshot; hand out the fake lock.
        tr = make_transaction_ref(shared_from_this(), &m_alloc, *m_fake_read_lock_if_immutable, DB::transact_Reading);
    }
    else {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Live, version_id);
        // Guard ensures the read lock is released if construction throws.
        ReadLockGuard g(*this, read_lock);
        read_lock.check();
        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Reading);
        g.release();
    }
    tr->set_file_format_version(get_file_format_version());
    return tr;
}
2634

2635
// Start a frozen transaction pinned to `version_id` (or the newest snapshot).
// Mirrors start_read(), but grabs a Frozen-type read lock.
TransactionRef DB::start_frozen(VersionID version_id)
{
    if (!is_attached())
        throw StaleAccessor("Stale transaction");
    TransactionRef tr;
    if (m_fake_read_lock_if_immutable) {
        // Immutable DBs have exactly one snapshot; hand out the fake lock.
        tr = make_transaction_ref(shared_from_this(), &m_alloc, *m_fake_read_lock_if_immutable, DB::transact_Frozen);
    }
    else {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Frozen, version_id);
        // Guard ensures the read lock is released if construction throws.
        ReadLockGuard g(*this, read_lock);
        read_lock.check();
        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Frozen);
        g.release();
    }
    tr->set_file_format_version(get_file_format_version());
    return tr;
}
2653

2654
// Start a write transaction. With `nonblocking` set, returns a null ref
// instead of waiting when the write lock is contended.
TransactionRef DB::start_write(bool nonblocking)
{
    if (m_fake_read_lock_if_immutable) {
        REALM_ASSERT(false && "Can't write an immutable DB");
    }
    if (nonblocking) {
        bool success = do_try_begin_write();
        if (!success) {
            return TransactionRef();
        }
    }
    else {
        do_begin_write();
    }
    {
        CheckedUniqueLock local_lock(m_mutex);
        if (!is_attached()) {
            // DB was closed while we waited for the write lock; back out.
            local_lock.unlock();
            end_write_on_correct_thread();
            throw StaleAccessor("Stale transaction");
        }
        m_write_transaction_open = true;
    }
    TransactionRef tr;
    try {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Live, VersionID());
        // Guard ensures the read lock is released if anything below throws.
        ReadLockGuard g(*this, read_lock);
        read_lock.check();

        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Writing);
        tr->set_file_format_version(get_file_format_version());
        version_type current_version = read_lock.m_version;
        m_alloc.init_mapping_management(current_version);
        if (Replication* repl = get_replication()) {
            bool history_updated = false;
            repl->initiate_transact(*tr, current_version, history_updated); // Throws
        }
        g.release();
    }
    catch (...) {
        // Must not leave the write lock held on failure.
        end_write_on_correct_thread();
        throw;
    }

    return tr;
}
2700

2701
// Asynchronously acquire the write lock on behalf of `tr`. When the lock is
// obtained, either a thread blocked waiting for it (m_waiting_for_write_lock)
// is signalled, or `when_acquired` is invoked. The transaction is captured
// weakly so a dropped transaction does not keep itself alive via the pending
// callback.
void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
{
    {
        util::CheckedLockGuard lck(tr->m_async_mutex);
        REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle);
        tr->m_async_stage = Transaction::AsyncState::Requesting;
        tr->m_request_time_point = std::chrono::steady_clock::now();
        if (tr->db->m_logger) {
            tr->db->m_logger->log(util::Logger::Level::trace, "Async request write lock");
        }
    }
    std::weak_ptr<Transaction> weak_tr = tr;
    async_begin_write([weak_tr, cb = std::move(when_acquired)]() {
        if (auto tr = weak_tr.lock()) {
            util::CheckedLockGuard lck(tr->m_async_mutex);
            // If a synchronous transaction happened while we were pending
            // we may be in HasCommits
            if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
                tr->m_async_stage = Transaction::AsyncState::HasLock;
            }
            if (tr->db->m_logger) {
                auto t2 = std::chrono::steady_clock::now();
                tr->db->m_logger->log(
                    util::Logger::Level::trace, "Got write lock in %1 us",
                    std::chrono::duration_cast<std::chrono::microseconds>(t2 - tr->m_request_time_point).count());
            }
            if (tr->m_waiting_for_write_lock) {
                // A thread is blocked in a synchronous wait; wake it instead
                // of running the callback.
                tr->m_waiting_for_write_lock = false;
                tr->m_async_cv.notify_one();
            }
            else if (cb) {
                cb();
            }
            tr.reset(); // Release pointer while lock is held
        }
    });
}
2738

2739
// DB instances are created via the DB::create() factory functions (see the
// DBInit helper below); this constructor only captures the options needed
// beyond open().
// Note: `options` is a const reference, so the previous
// `std::move(options.upgrade_callback)` was a silent copy
// (clang-tidy performance-move-const-arg); the misleading move is removed.
inline DB::DB(const DBOptions& options)
    : m_upgrade_callback(options.upgrade_callback)
{
    // The async-commit helper is optional; without it, write-lock handling
    // falls back to the synchronous path (see do_begin_possibly_async_write()).
    if (options.enable_async_writes) {
        m_commit_helper = std::make_unique<AsyncCommitHelper>(this);
    }
}
2746

2747
namespace {
// Helper subclass whose only purpose is to expose a publicly accessible
// constructor, so the DB::create() factories below can use std::make_shared.
// (DB itself is presumably not directly constructible by callers — confirm
// against the class declaration.)
class DBInit : public DB {
public:
    explicit DBInit(const DBOptions& options)
        : DB(options)
    {
    }
};
} // namespace
2756

2757
DBRef DB::create(const std::string& file, bool no_create, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    // Allocate through the DBInit helper (which provides a usable
    // constructor), then attach the instance to the given file.
    auto db = std::make_shared<DBInit>(options);
    db->open(file, no_create, options);
    return db;
}
2763

2764
DBRef DB::create(Replication& repl, const std::string& file, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    // The replication object stays owned by the caller here, in contrast to
    // the unique_ptr overloads which transfer ownership to the DB.
    auto db = std::make_shared<DBInit>(options);
    db->open(repl, file, options);
    return db;
}
2770

2771
DBRef DB::create(std::unique_ptr<Replication> repl, const std::string& file,
                 const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    REALM_ASSERT(repl);
    // Take ownership of the replication/history object first, so the DB
    // keeps it alive for its own lifetime, then open against it.
    auto db = std::make_shared<DBInit>(options);
    db->m_history = std::move(repl);
    db->open(*db->m_history, file, options);
    return db;
}
2780

2781
DBRef DB::create(std::unique_ptr<Replication> repl, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    REALM_ASSERT(repl);
    // Path-less variant: ownership of the replication object is transferred
    // to the DB, which is then opened without an explicit file argument.
    auto db = std::make_shared<DBInit>(options);
    db->m_history = std::move(repl);
    db->open(*db->m_history, options);
    return db;
}
2789

2790
DBRef DB::create_in_memory(std::unique_ptr<Replication> repl, const std::string& in_memory_path,
                           const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    // Reuse the path-less factory, then record the virtual path under which
    // this in-memory Realm is identified.
    auto db = create(std::move(repl), options);
    db->m_db_path = in_memory_path;
    return db;
}
2797

2798
DBRef DB::create(BinaryData buffer, bool take_ownership) NO_THREAD_SAFETY_ANALYSIS
{
    // A buffer-backed DB is always immutable; force the option before
    // construction.
    DBOptions options;
    options.is_immutable = true;
    auto db = std::make_shared<DBInit>(options);
    db->open(buffer, take_ownership);
    return db;
}
2806

2807
// Claim the (single) sync-agent role for this Realm. The flag lives in the
// shared info block, so the claim is visible across processes; throws
// MultipleSyncAgents if another DB instance already holds it.
void DB::claim_sync_agent()
{
    REALM_ASSERT(is_attached());
    std::unique_lock<InterprocessMutex> lock(m_controlmutex);
    if (m_info->sync_agent_present)
        throw MultipleSyncAgents{};
    m_info->sync_agent_present = 1; // Set to true
    m_is_sync_agent = true;
}
2816

2817
// Drop this instance's sync-agent claim, if it holds one. A no-op when this
// DB never claimed the role (the shared flag may still belong to another DB).
void DB::release_sync_agent()
{
    REALM_ASSERT(is_attached());
    std::unique_lock<InterprocessMutex> lock(m_controlmutex);
    if (!m_is_sync_agent)
        return;
    REALM_ASSERT(m_info->sync_agent_present);
    m_info->sync_agent_present = 0;
    m_is_sync_agent = false;
}
2827

2828
void DB::do_begin_possibly_async_write()
2829
{
382,098✔
2830
    if (m_commit_helper) {
382,098✔
2831
        m_commit_helper->blocking_begin_write();
258,258✔
2832
    }
258,258✔
2833
    else {
123,840✔
2834
        do_begin_write();
123,840✔
2835
    }
123,840✔
2836
}
382,098✔
2837

2838
void DB::end_write_on_correct_thread() noexcept
2839
{
1,383,585✔
2840
    //    m_local_write_mutex.unlock();
695,415✔
2841
    if (!m_commit_helper || !m_commit_helper->blocking_end_write()) {
1,383,585✔
2842
        do_end_write();
1,125,129✔
2843
    }
1,125,129✔
2844
}
1,383,585✔
2845

2846
// RAII guard: detaches replication from the DB (and clears the transaction's
// history pointer) for the guard's lifetime; the destructor restores it.
DisableReplication::DisableReplication(Transaction& t)
    : m_tr(t)
    , m_owner(t.get_db())
    , m_repl(m_owner->get_replication()) // remembered for restoration
    , m_version(t.get_version())         // remembered to detect version changes
{
    m_owner->set_replication(nullptr);
    t.m_history = nullptr;
}
2855

2856
DisableReplication::~DisableReplication()
{
    // Restore the replication instance captured at construction.
    m_owner->set_replication(m_repl);
    // If the transaction advanced to a different version while replication
    // was disabled, its replication-related state must be re-initialized.
    if (m_version != m_tr.get_version())
        m_tr.initialize_replication();
}
2862

2863
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc