realm / realm-core / build 2088 (push, Evergreen, web-flow)
01 Mar 2024 04:55PM UTC, coverage: 90.906% (+0.009% from 90.897%)
Merge pull request #7407 from realm/release/14.1.0: Release 14.1.0
93916 of 173116 branches covered (54.25%)
238350 of 262194 relevant lines covered (90.91%)
6006140.42 hits per line
Source File: /src/realm/db.cpp (93.32% covered)
/*************************************************************************
 *
 * Copyright 2016 Realm Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **************************************************************************/

#include <realm/transaction.hpp>

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <fcntl.h>
#include <iostream>
#include <mutex>
#include <sstream>
#include <type_traits>
#include <random>
#include <deque>
#include <thread>
#include <chrono>
#include <condition_variable>

#include <realm/disable_sync_to_disk.hpp>
#include <realm/group_writer.hpp>
#include <realm/impl/simulated_failure.hpp>
#include <realm/replication.hpp>
#include <realm/util/errno.hpp>
#include <realm/util/features.h>
#include <realm/util/file_mapper.hpp>
#include <realm/util/safe_int_ops.hpp>
#include <realm/util/scope_exit.hpp>
#include <realm/util/thread.hpp>
#include <realm/util/to_string.hpp>

#ifndef _WIN32
#include <sys/wait.h>
#include <sys/time.h>
#include <unistd.h>
#else
#include <windows.h>
#include <process.h>
#endif

// #define REALM_ENABLE_LOGFILE


using namespace realm;
using namespace realm::util;
using Durability = DBOptions::Durability;

namespace {

// value   change
// --------------------
//  4      Unknown
//  5      Introduction of SharedInfo::file_format_version and
//         SharedInfo::history_type.
//  6      Using new robust mutex emulation where applicable
//  7      Introducing `commit_in_critical_phase` and `sync_agent_present`, and
//         changing `daemon_started` and `daemon_ready` from 1-bit to 8-bit
//         fields.
//  8      Placing the commitlog history inside the Realm file.
//  9      Fair write transactions require an additional condition variable,
//         `write_fairness`
// 10      Introducing SharedInfo::history_schema_version.
// 11      New impl of InterprocessCondVar on windows.
// 12      Change `number_of_versions` to an atomic rather than guarding it
//         with a lock.
// 13      New impl of VersionList and added mutex for it (former RingBuffer)
// 14      Added field for tracking ongoing encrypted writes
const uint_fast16_t g_shared_info_version = 14;


struct VersionList {
    // the VersionList is an array of ReadCount structures.
    // it is placed in the "lock-file" and accessed via memory mapping
    struct ReadCount {
        uint64_t version;
        uint64_t filesize;
        uint64_t current_top;
        uint32_t count_live;
        uint32_t count_frozen;
        uint32_t count_full;
        bool is_active()
        {
            return version != 0;
        }
        void deactivate()
        {
            version = 0;
            count_live = count_frozen = count_full = 0;
        }
        void activate(uint64_t v)
        {
            version = v;
        }
    };

    void reserve(uint32_t size) noexcept
    {
        for (auto i = entries; i < size; ++i)
            data()[i].deactivate();
        if (size > entries) {
            // Fence preventing downward motion of above writes
            std::atomic_signal_fence(std::memory_order_release);
            entries = size;
        }
    }

    VersionList() noexcept
    {
        newest = nil; // empty
        entries = 0;
        reserve(init_readers_size);
    }

    static size_t compute_required_space(uint_fast32_t num_entries) noexcept
    {
        // get space required for given number of entries beyond the initial count.
        // NB: this is not the size of the VersionList, it is the size minus whatever was
        // the initial size.
        return sizeof(ReadCount) * (num_entries - init_readers_size);
    }

    unsigned int capacity() const noexcept
    {
        return entries;
    }

    ReadCount& get(uint_fast32_t idx) noexcept
    {
        return data()[idx];
    }

    ReadCount& get_newest() noexcept
    {
        return get(newest);
    }
    // returns nullptr if all entries are in use
    ReadCount* try_allocate_entry(uint64_t top, uint64_t size, uint64_t version)
    {
        auto i = allocating.load();
        if (i == newest.load()) {
            // if newest != allocating we are recovering from a crash and MUST complete the earlier allocation
            // but if not, find lowest free entry by linear search.
            uint32_t k = 0;
            while (k < entries && data()[k].is_active()) {
                ++k;
            }
            if (k == entries)
                return nullptr;     // no free entries
            allocating.exchange(k); // barrier: prevent upward movement of instructions below
            i = k;
        }
        auto& rc = data()[i];
        REALM_ASSERT(rc.count_frozen == 0);
        REALM_ASSERT(rc.count_live == 0);
        REALM_ASSERT(rc.count_full == 0);
        rc.current_top = top;
        rc.filesize = size;
        rc.activate(version);
        newest.store(i); // barrier: prevent downward movement of instructions above
        return &rc;
    }

    uint32_t index_of(const ReadCount& rc) noexcept
    {
        return (uint32_t)(&rc - data());
    }

    void free_entry(ReadCount* rc) noexcept
    {
        rc->current_top = rc->filesize = -1ULL; // easy to recognize in debugger
        rc->deactivate();
    }

    // This method resets the version list to an empty state, then allocates an entry.
    // Precondition: This should *only* be done if the caller has established that she
    // is the only thread/process that has access to the VersionList. It is currently
    // called from init_versioning(), which is called by DB::open() under the
    // condition that it is the session initiator and under guard by the control mutex,
    // thus ensuring the precondition. It is also called from compact() in a similar situation.
    // It is most likely not suited for any other use.
    ReadCount& init_versioning(uint64_t top, uint64_t filesize, uint64_t version) noexcept
    {
        newest = nil;
        allocating = 0;
        auto t_free = entries;
        entries = 0;
        reserve(t_free);
        return *try_allocate_entry(top, filesize, version);
    }

    void purge_versions(uint64_t& oldest_live_v, TopRefMap& top_refs, bool& any_new_unreachables)
    {
        oldest_live_v = std::numeric_limits<uint64_t>::max();
        auto oldest_full_v = std::numeric_limits<uint64_t>::max();
        any_new_unreachables = false;
        // correct case where an earlier crash may have left the entry at 'allocating' partially initialized:
        const auto index_of_newest = newest.load();
        if (auto a = allocating.load(); a != index_of_newest) {
            data()[a].deactivate();
        }
        // determine fully locked versions - after one of those all versions are considered live.
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_full) {
                if (rc->version < oldest_full_v)
                    oldest_full_v = rc->version;
            }
        }
        // collect reachable versions and determine oldest live reachable version
        // (oldest reachable version is the first entry in the top_refs map, so no need to find it explicitly)
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_frozen || rc->count_live || rc->version >= oldest_full_v) {
                // entry is still reachable
                top_refs.emplace(rc->version, VersionInfo{to_ref(rc->current_top), to_ref(rc->filesize)});
            }
            if (rc->count_live || rc->version >= oldest_full_v) {
                if (rc->version < oldest_live_v)
                    oldest_live_v = rc->version;
            }
        }
        // we must have found at least one reachable version
        REALM_ASSERT(top_refs.size());
        // free unreachable entries and determine if we want to trigger backdating
        uint64_t oldest_v = top_refs.begin()->first;
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_frozen == 0 && rc->count_live == 0 && rc->version < oldest_full_v) {
                // entry is becoming unreachable.
                // if it is also younger than a reachable version, then set 'any_new_unreachables' to trigger
                // backdating
                if (rc->version > oldest_v) {
                    any_new_unreachables = true;
                }
                REALM_ASSERT(index_of(*rc) != index_of_newest);
                free_entry(rc);
            }
        }
        REALM_ASSERT(oldest_v != std::numeric_limits<uint64_t>::max());
        REALM_ASSERT(oldest_live_v != std::numeric_limits<uint64_t>::max());
    }

#if REALM_DEBUG
    void dump()
    {
        util::format(std::cout, "VersionList has %1 entries: \n", entries);
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            util::format(std::cout, "[%1]: version %2, live: %3, full: %4, frozen: %5\n", index_of(*rc), rc->version,
                         rc->count_live, rc->count_full, rc->count_frozen);
        }
    }
#endif // REALM_DEBUG

    constexpr static uint32_t nil = (uint32_t)-1;
    const static int init_readers_size = 32;
    uint32_t entries;
    std::atomic<uint32_t> allocating; // atomic for crash safety, not threading
    std::atomic<uint32_t> newest;     // atomic for crash safety, not threading

    // IMPORTANT: The actual data comprising the version list MUST BE PLACED LAST in
    // the VersionList structure, as the data area is extended at run time.
    // Similarly, the VersionList must be the final element of the SharedInfo structure.
    // IMPORTANT II:
    // To ensure proper alignment across all platforms, the SharedInfo structure
    // should NOT have a stricter alignment requirement than the ReadCount structure.
    ReadCount m_data[init_readers_size];

    // Silence UBSan errors about out-of-bounds reads on m_data by casting to a pointer
    ReadCount* data() noexcept
    {
        return m_data;
    }
    const ReadCount* data() const noexcept
    {
        return m_data;
    }
};

// Using lambda rather than function so that shared_ptr shared state doesn't need to hold a function pointer.
constexpr auto TransactionDeleter = [](Transaction* t) {
    t->close();
    delete t;
};

template <typename... Args>
TransactionRef make_transaction_ref(Args&&... args)
{
    return TransactionRef(new Transaction(std::forward<Args>(args)...), TransactionDeleter);
}

} // anonymous namespace

namespace realm {

/// The structure of the contents of the per session `.lock` file. Note that
/// this file is transient in that it is recreated/reinitialized at the
/// beginning of every session. A session is any sequence of temporally
/// overlapping openings of a particular Realm file via DB objects. For
/// example, if there are two DB objects, A and B, and the file is
/// first opened via A, then opened via B, then closed via A, and finally closed
/// via B, then the session stretches from the opening via A to the closing via
/// B.
///
/// IMPORTANT: Remember to bump `g_shared_info_version` if anything is changed
/// in the memory layout of this class, or if the meaning of any of the stored
/// values change.
///
/// Members `init_complete`, `shared_info_version`, `size_of_mutex`, and
/// `size_of_condvar` may be modified only while holding an exclusive lock
/// on the file, and may be read only while holding a shared (or exclusive) lock
/// on the file. All other members (except for the VersionList which has its own mutex)
/// may be accessed only while holding a lock on `controlmutex`.
///
/// SharedInfo must be 8-byte aligned. On 32-bit Apple platforms, mutexes store their
/// alignment as part of the mutex state. We're copying the SharedInfo (including
/// embedded but always unlocked mutexes) and it must retain the same alignment
/// throughout.
struct alignas(8) DB::SharedInfo {
    /// Indicates that initialization of the lock file was completed
    /// successfully.
    ///
    /// CAUTION: This member must never move or change type, as that would
    /// compromise safety of the session initiation process.
    std::atomic<uint8_t> init_complete; // Offset 0

    /// The size in bytes of a mutex member of SharedInfo. This allows all
    /// session participants to be in agreement. Obviously, a size match is not
    /// enough to guarantee identical layout internally in the mutex object, but
    /// it is hoped that it will catch some (if not most) of the cases where
    /// there is a layout discrepancy internally in the mutex object.
    uint8_t size_of_mutex; // Offset 1

    /// Like size_of_mutex, but for condition variable members of SharedInfo.
    uint8_t size_of_condvar; // Offset 2

    /// Set during the critical phase of a commit, when the logs, the VersionList
    /// and the database may be out of sync with respect to each other. If a
    /// writer crashes during this phase, there is no safe way of continuing
    /// with further write transactions. When beginning a write transaction,
    /// this must be checked and an exception thrown if set.
    ///
    /// Note that std::atomic<uint8_t> is guaranteed to have standard layout.
    std::atomic<uint8_t> commit_in_critical_phase = {0}; // Offset 3

    /// The target Realm file format version for the current session. This
    /// allows all session participants to be in agreement. It can only differ
    /// from what is returned by Group::get_file_format_version() temporarily,
    /// and only during the Realm file opening process. If it differs, it means
    /// that the file format needs to be upgraded from its current format
    /// (Group::get_file_format_version()) to the format specified by this member
    /// of SharedInfo.
    uint8_t file_format_version; // Offset 4

    /// Stores a value of type Replication::HistoryType. Must match across all
    /// session participants.
    int8_t history_type; // Offset 5

    /// The SharedInfo layout version. This allows all session participants to
    /// be in agreement. Must be bumped if the layout of the SharedInfo
    /// structure is changed. Note, however, that only the part that lies beyond
    /// SharedInfoUnchangingLayout can have its layout changed.
    ///
    /// CAUTION: This member must never move or change type, as that would
    /// compromise version agreement checking.
    uint16_t shared_info_version = g_shared_info_version; // Offset 6

    uint16_t durability;           // Offset 8
    uint16_t free_write_slots = 0; // Offset 10

    /// Number of participating shared groups
    uint32_t num_participants = 0; // Offset 12

    /// Latest version number. Guarded by the controlmutex (for lock-free
    /// access, use get_version_of_latest_snapshot() instead)
    uint64_t latest_version_number; // Offset 16

    /// Pid of process initiating the session, but only if that process runs
    /// with encryption enabled, zero otherwise. This was used to prevent
    /// multiprocess encryption until support for that was added.
    uint64_t session_initiator_pid = 0; // Offset 24

    std::atomic<uint64_t> number_of_versions; // Offset 32

    /// True (1) if there is a sync agent present (a session participant acting
    /// as sync client). It is an error to have a session with more than one
    /// sync agent. The purpose of this flag is to prevent that from ever
    /// happening. If the sync agent crashes and leaves the flag set, the
    /// session will need to be restarted (lock file reinitialized) before a new
    /// sync agent can be started.
    uint8_t sync_agent_present = 0; // Offset 40

    /// Set when a participant decides to start the daemon, cleared by the
    /// daemon when it decides to exit. Participants check during open() and
    /// start the daemon if running in async mode.
    uint8_t daemon_started = 0; // Offset 41

    /// Set by the daemon when it is ready to handle commits. Participants must
    /// wait during open() on 'daemon_becomes_ready' for this to become true.
    /// Cleared by the daemon when it decides to exit.
    uint8_t daemon_ready = 0; // Offset 42

    uint8_t filler_1; // Offset 43

    /// Stores a history schema version (as returned by
    /// Replication::get_history_schema_version()). Must match across all
    /// session participants.
    uint16_t history_schema_version; // Offset 44

    uint16_t filler_2; // Offset 46

    InterprocessMutex::SharedPart shared_writemutex; // Offset 48
    InterprocessMutex::SharedPart shared_controlmutex;
    InterprocessMutex::SharedPart shared_versionlist_mutex;
    InterprocessCondVar::SharedPart room_to_write;
    InterprocessCondVar::SharedPart work_to_do;
    InterprocessCondVar::SharedPart daemon_becomes_ready;
    InterprocessCondVar::SharedPart new_commit_available;
    InterprocessCondVar::SharedPart pick_next_writer;
    std::atomic<uint32_t> next_ticket;
    std::atomic<uint32_t> next_served = 0;
    std::atomic<uint64_t> writing_page_offset;
    std::atomic<uint64_t> write_counter;

    // IMPORTANT: The VersionList MUST be the last field in SharedInfo - see above.
    VersionList readers;

    SharedInfo(Durability, Replication::HistoryType, int history_schema_version);
    ~SharedInfo() noexcept {}

    void init_versioning(ref_type top_ref, size_t file_size, uint64_t initial_version)
    {
        // Create our first versioning entry:
        readers.init_versioning(top_ref, file_size, initial_version);
    }
};


DB::SharedInfo::SharedInfo(Durability dura, Replication::HistoryType ht, int hsv)
    : size_of_mutex(sizeof(shared_writemutex))
    , size_of_condvar(sizeof(room_to_write))
    , shared_writemutex()   // Throws
    , shared_controlmutex() // Throws
{
    durability = static_cast<uint16_t>(dura); // durability level is fixed from creation
    REALM_ASSERT(!util::int_cast_has_overflow<decltype(history_type)>(ht + 0));
    REALM_ASSERT(!util::int_cast_has_overflow<decltype(history_schema_version)>(hsv));
    history_type = ht;
    history_schema_version = static_cast<uint16_t>(hsv);
    InterprocessCondVar::init_shared_part(new_commit_available); // Throws
    InterprocessCondVar::init_shared_part(pick_next_writer);     // Throws
    next_ticket = 0;

// IMPORTANT: The offsets, types (, and meanings) of these members must
// never change, not even when the SharedInfo layout version is bumped. The
// eternal constancy of this part of the layout is what ensures that a
// joining session participant can reliably verify that the actual format is
// as expected.
#ifndef _WIN32
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
#endif
    static_assert(offsetof(SharedInfo, init_complete) == 0 && ATOMIC_BOOL_LOCK_FREE == 2 &&
                      std::is_same<decltype(init_complete), std::atomic<uint8_t>>::value &&
                      offsetof(SharedInfo, shared_info_version) == 6 &&
                      std::is_same<decltype(shared_info_version), uint16_t>::value,
                  "Forbidden change in SharedInfo layout");

    // Try to catch some of the memory layout changes that require bumping of
    // the SharedInfo file format version (shared_info_version).
    static_assert(
        offsetof(SharedInfo, size_of_mutex) == 1 && std::is_same<decltype(size_of_mutex), uint8_t>::value &&
            offsetof(SharedInfo, size_of_condvar) == 2 && std::is_same<decltype(size_of_condvar), uint8_t>::value &&
            offsetof(SharedInfo, commit_in_critical_phase) == 3 &&
            std::is_same<decltype(commit_in_critical_phase), std::atomic<uint8_t>>::value &&
            offsetof(SharedInfo, file_format_version) == 4 &&
            std::is_same<decltype(file_format_version), uint8_t>::value && offsetof(SharedInfo, history_type) == 5 &&
            std::is_same<decltype(history_type), int8_t>::value && offsetof(SharedInfo, durability) == 8 &&
            std::is_same<decltype(durability), uint16_t>::value && offsetof(SharedInfo, free_write_slots) == 10 &&
            std::is_same<decltype(free_write_slots), uint16_t>::value &&
            offsetof(SharedInfo, num_participants) == 12 &&
            std::is_same<decltype(num_participants), uint32_t>::value &&
            offsetof(SharedInfo, latest_version_number) == 16 &&
            std::is_same<decltype(latest_version_number), uint64_t>::value &&
            offsetof(SharedInfo, session_initiator_pid) == 24 &&
            std::is_same<decltype(session_initiator_pid), uint64_t>::value &&
            offsetof(SharedInfo, number_of_versions) == 32 &&
            std::is_same<decltype(number_of_versions), std::atomic<uint64_t>>::value &&
            offsetof(SharedInfo, sync_agent_present) == 40 &&
            std::is_same<decltype(sync_agent_present), uint8_t>::value &&
            offsetof(SharedInfo, daemon_started) == 41 && std::is_same<decltype(daemon_started), uint8_t>::value &&
            offsetof(SharedInfo, daemon_ready) == 42 && std::is_same<decltype(daemon_ready), uint8_t>::value &&
            offsetof(SharedInfo, filler_1) == 43 && std::is_same<decltype(filler_1), uint8_t>::value &&
            offsetof(SharedInfo, history_schema_version) == 44 &&
            std::is_same<decltype(history_schema_version), uint16_t>::value && offsetof(SharedInfo, filler_2) == 46 &&
            std::is_same<decltype(filler_2), uint16_t>::value && offsetof(SharedInfo, shared_writemutex) == 48 &&
            std::is_same<decltype(shared_writemutex), InterprocessMutex::SharedPart>::value,
        "Caught layout change requiring SharedInfo file format bumping");
    static_assert(std::atomic<uint64_t>::is_always_lock_free);
#ifndef _WIN32
#pragma GCC diagnostic pop
#endif
}

class DB::VersionManager {
public:
    VersionManager(util::InterprocessMutex& mutex)
        : m_mutex(mutex)
    {
    }
    virtual ~VersionManager() {}

    void cleanup_versions(uint64_t& oldest_live_version, TopRefMap& top_refs, bool& any_new_unreachables)
        REQUIRES(!m_info_mutex)
    {
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        ensure_reader_mapping();
        m_info->readers.purge_versions(oldest_live_version, top_refs, any_new_unreachables);
    }

    version_type get_newest_version() REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        return get_version_id_of_latest_snapshot().version;
    }

    VersionID get_version_id_of_latest_snapshot() REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        {
            // First check the local cache. This is an unlocked read, so it may
            // race with adding a new version. If this happens we'll either see
            // a stale value (acceptable for a racing write on one thread and
            // a read on another), or a new value which is guaranteed to not
            // be an active index in the local cache.
            util::CheckedLockGuard lock(m_local_readers_mutex);
            util::CheckedLockGuard info_lock(m_info_mutex);
            auto index = m_info->readers.newest.load();
            if (index < m_local_readers.size()) {
                auto& r = m_local_readers[index];
                if (r.is_active()) {
                    return {r.version, index};
                }
            }
        }

        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        auto index = m_info->readers.newest.load();
        ensure_reader_mapping(index);
        return {m_info->readers.get(index).version, index};
    }

    void release_read_lock(const ReadLockInfo& read_lock) REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        {
            util::CheckedLockGuard lock(m_local_readers_mutex);
            REALM_ASSERT(read_lock.m_reader_idx < m_local_readers.size());
            auto& r = m_local_readers[read_lock.m_reader_idx];
            auto& f = field_for_type(r, read_lock.m_type);
            REALM_ASSERT(f > 0);
            if (--f > 0)
                return;
            if (r.count_live == 0 && r.count_full == 0 && r.count_frozen == 0)
                r.version = 0;
        }

        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        // we should not need to call ensure_full_reader_mapping,
        // since releasing a read lock means it has been grabbed
        // earlier - and hence must reside in mapped memory:
        REALM_ASSERT(read_lock.m_reader_idx < m_local_max_entry);
        auto& r = m_info->readers.get(read_lock.m_reader_idx);
        REALM_ASSERT(read_lock.m_version == r.version);
        --field_for_type(r, read_lock.m_type);
    }

    ReadLockInfo grab_read_lock(ReadLockInfo::Type type, VersionID version_id = {})
        REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        ReadLockInfo read_lock;
        if (try_grab_local_read_lock(read_lock, type, version_id))
            return read_lock;

        {
            const bool pick_specific = version_id.version != VersionID().version;
            std::lock_guard lock(m_mutex);
            util::CheckedLockGuard info_lock(m_info_mutex);
            auto newest = m_info->readers.newest.load();
            REALM_ASSERT(newest != VersionList::nil);
            read_lock.m_reader_idx = pick_specific ? version_id.index : newest;
            ensure_reader_mapping((unsigned int)read_lock.m_reader_idx);
            bool picked_newest = read_lock.m_reader_idx == (unsigned)newest;
            auto& r = m_info->readers.get(read_lock.m_reader_idx);
            if (pick_specific && version_id.version != r.version)
                throw BadVersion(version_id.version);
            if (!picked_newest) {
                if (type == ReadLockInfo::Frozen && r.count_frozen == 0 && r.count_live == 0)
                    throw BadVersion(version_id.version);
                if (type != ReadLockInfo::Frozen && r.count_live == 0)
                    throw BadVersion(version_id.version);
            }
            populate_read_lock(read_lock, r, type);
        }

        {
            util::CheckedLockGuard local_lock(m_local_readers_mutex);
            grow_local_cache(read_lock.m_reader_idx + 1);
            auto& r2 = m_local_readers[read_lock.m_reader_idx];
            if (!r2.is_active()) {
                r2.version = read_lock.m_version;
                r2.filesize = read_lock.m_file_size;
                r2.current_top = read_lock.m_top_ref;
                r2.count_full = r2.count_live = r2.count_frozen = 0;
            }
            REALM_ASSERT_EX(field_for_type(r2, type) == 0, type, r2.count_full, r2.count_live, r2.count_frozen);
            field_for_type(r2, type) = 1;
        }

        return read_lock;
    }

    void init_versioning(ref_type top_ref, size_t file_size, uint64_t initial_version) REQUIRES(!m_info_mutex)
    {
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        m_info->init_versioning(top_ref, file_size, initial_version);
    }

    void add_version(ref_type new_top_ref, size_t new_file_size, uint64_t new_version) REQUIRES(!m_info_mutex)
    {
        std::lock_guard lock(m_mutex);
        util::CheckedLockGuard info_lock(m_info_mutex);
        ensure_reader_mapping();
        if (m_info->readers.try_allocate_entry(new_top_ref, new_file_size, new_version)) {
            return;
        }
        // allocation failed, expand VersionList (and lockfile) and retry
        auto entries = m_info->readers.capacity();
        auto new_entries = entries + 32;
        expand_version_list(new_entries);
        m_local_max_entry = new_entries;
        m_info->readers.reserve(new_entries);
        auto success = m_info->readers.try_allocate_entry(new_top_ref, new_file_size, new_version);
        REALM_ASSERT_EX(success, new_entries, new_version);
    }


private:
    void grow_local_cache(size_t new_size) REQUIRES(m_local_readers_mutex)
    {
        if (new_size > m_local_readers.size())
            m_local_readers.resize(new_size, VersionList::ReadCount{});
    }

    void populate_read_lock(ReadLockInfo& read_lock, VersionList::ReadCount& r, ReadLockInfo::Type type)
    {
        ++field_for_type(r, type);
        read_lock.m_type = type;
        read_lock.m_version = r.version;
        read_lock.m_top_ref = static_cast<ref_type>(r.current_top);
        read_lock.m_file_size = static_cast<size_t>(r.filesize);
    }

    bool try_grab_local_read_lock(ReadLockInfo& read_lock, ReadLockInfo::Type type, VersionID version_id)
        REQUIRES(!m_local_readers_mutex, !m_info_mutex)
    {
        const bool pick_specific = version_id.version != VersionID().version;
        auto index = version_id.index;
        if (!pick_specific) {
            util::CheckedLockGuard lock(m_info_mutex);
            index = m_info->readers.newest.load();
        }
        util::CheckedLockGuard local_lock(m_local_readers_mutex);
        if (index >= m_local_readers.size())
            return false;

        auto& r = m_local_readers[index];
        if (!r.is_active())
            return false;
        if (pick_specific && r.version != version_id.version)
            return false;
        if (field_for_type(r, type) == 0)
            return false;

        read_lock.m_reader_idx = index;
        populate_read_lock(read_lock, r, type);
        return true;
    }

    static uint32_t& field_for_type(VersionList::ReadCount& r, ReadLockInfo::Type type)
    {
        switch (type) {
            case ReadLockInfo::Frozen:
                return r.count_frozen;
            case ReadLockInfo::Live:
                return r.count_live;
            case ReadLockInfo::Full:
                return r.count_full;
            default:
                REALM_UNREACHABLE(); // silence a warning
        }
    }

    void mark_page_for_writing(uint64_t page_offset) REQUIRES(!m_info_mutex)
    {
        util::CheckedLockGuard info_lock(m_info_mutex);
        m_info->writing_page_offset = page_offset + 1;
        m_info->write_counter++;
    }
    void clear_writing_marker() REQUIRES(!m_info_mutex)
    {
        util::CheckedLockGuard info_lock(m_info_mutex);
        m_info->write_counter++;
        m_info->writing_page_offset = 0;
    }
    // returns false if no page is marked.
    // if a page is marked, returns true and optionally the offset of the page marked for writing
    // in all cases, optionally returns the write counter
    bool observe_writer(uint64_t* page_offset, uint64_t* write_counter) REQUIRES(!m_info_mutex)
    {
        util::CheckedLockGuard info_lock(m_info_mutex);
        if (write_counter) {
            *write_counter = m_info->write_counter;
        }
        uint64_t marked = m_info->writing_page_offset;
        if (marked && page_offset) {
            *page_offset = marked - 1;
        }
        return marked != 0;
    }

protected:
    util::InterprocessMutex& m_mutex;
    util::CheckedMutex m_local_readers_mutex;
    std::vector<VersionList::ReadCount> m_local_readers GUARDED_BY(m_local_readers_mutex);

    util::CheckedMutex m_info_mutex;
    unsigned int m_local_max_entry GUARDED_BY(m_info_mutex) = 0;
    SharedInfo* m_info GUARDED_BY(m_info_mutex) = nullptr;

    virtual void ensure_reader_mapping(unsigned int required = -1) REQUIRES(m_info_mutex) = 0;
    virtual void expand_version_list(unsigned new_entries) REQUIRES(m_info_mutex) = 0;
    friend class DB::EncryptionMarkerObserver;
};

class DB::FileVersionManager final : public DB::VersionManager {
public:
    FileVersionManager(File& file, util::InterprocessMutex& mutex)
        : VersionManager(mutex)
        , m_file(file)
    {
        size_t size = 0, required_size = sizeof(SharedInfo);
        while (size < required_size) {
            // Map the file without the lock held. This could result in the
            // mapping being too small and having to remap if the file is grown
            // concurrently, but if this is the case we should always see a bigger
            // size the next time.
            auto new_size = static_cast<size_t>(m_file.get_size());
            REALM_ASSERT(new_size > size);
            size = new_size;
            m_reader_map.remap(m_file, File::access_ReadWrite, size, File::map_NoSync);
            m_info = m_reader_map.get_addr();

            std::lock_guard lock(m_mutex);
            m_local_max_entry = m_info->readers.capacity();
            required_size = sizeof(SharedInfo) + m_info->readers.compute_required_space(m_local_max_entry);
            REALM_ASSERT(required_size >= size);
        }
    }

    void expand_version_list(unsigned new_entries) override REQUIRES(m_info_mutex)
    {
        size_t new_info_size = sizeof(SharedInfo) + m_info->readers.compute_required_space(new_entries);
        m_file.prealloc(new_info_size);                                          // Throws
        m_reader_map.remap(m_file, util::File::access_ReadWrite, new_info_size); // Throws
        m_info = m_reader_map.get_addr();
    }

private:
    void ensure_reader_mapping(unsigned int required = -1) override REQUIRES(m_info_mutex)
    {
        using _impl::SimulatedFailure;
        SimulatedFailure::trigger(SimulatedFailure::shared_group__grow_reader_mapping); // Throws

        if (required < m_local_max_entry)
            return;

        auto new_max_entry = m_info->readers.capacity();
        if (new_max_entry > m_local_max_entry) {
            // handle mapping expansion if required
            size_t info_size = sizeof(DB::SharedInfo) + m_info->readers.compute_required_space(new_max_entry);
            m_reader_map.remap(m_file, util::File::access_ReadWrite, info_size); // Throws
            m_local_max_entry = new_max_entry;
            m_info = m_reader_map.get_addr();
        }
    }

    File& m_file;
    File::Map<DB::SharedInfo> m_reader_map;

    friend class DB::EncryptionMarkerObserver;
};

// adapter class for marking/observing encrypted writes
class DB::EncryptionMarkerObserver : public util::WriteMarker, public util::WriteObserver {
public:
    EncryptionMarkerObserver(DB::VersionManager& vm)
        : vm(vm)
    {
    }
    bool no_concurrent_writer_seen() override
    {
        uint64_t tmp_write_count;
        auto page_may_have_been_written = vm.observe_writer(nullptr, &tmp_write_count);
        if (tmp_write_count != last_seen_count) {
            page_may_have_been_written = true;
            last_seen_count = tmp_write_count;
        }
        if (page_may_have_been_written) {
            calls_since_last_writer_observed = 0;
            return false;
        }
        ++calls_since_last_writer_observed;
        constexpr size_t max_calls = 5; // an arbitrary handful, > 1
        return (calls_since_last_writer_observed >= max_calls);
    }
    void mark(uint64_t pos) override
    {
        vm.mark_page_for_writing(pos);
    }
    void unmark() override
    {
        vm.clear_writing_marker();
    }

private:
    DB::VersionManager& vm;
    uint64_t last_seen_count = 0;
    size_t calls_since_last_writer_observed = 0;
};

class DB::InMemoryVersionManager final : public DB::VersionManager {
public:
    InMemoryVersionManager(SharedInfo* info, util::InterprocessMutex& mutex)
        : VersionManager(mutex)
    {
        m_info = info;
        m_local_max_entry = m_info->readers.capacity();
    }
    void expand_version_list(unsigned) override
    {
        REALM_ASSERT(false);
    }

private:
    void ensure_reader_mapping(unsigned int) override {}
};

#if REALM_HAVE_STD_FILESYSTEM
std::string DBOptions::sys_tmp_dir = std::filesystem::temp_directory_path().string();
#else
std::string DBOptions::sys_tmp_dir = getenv("TMPDIR") ? getenv("TMPDIR") : "";
#endif

// NOTES ON CREATION AND DESTRUCTION OF SHARED MUTEXES:
//
// According to the 'process-sharing example' in the POSIX man page
// for pthread_mutexattr_init() other processes may continue to use a
// process-shared mutex after exit of the process that initialized
// it. Also, the example does not contain any call to
// pthread_mutex_destroy(), so apparently a process-shared mutex need
// not be destroyed at all, nor can it be that a process-shared mutex
// is associated with any resources that are local to the initializing
// process, because that would imply a leak.
//
// While it is not explicitly guaranteed in the man page, we shall
// assume that it is valid to initialize a process-shared mutex twice
// without an intervening call to pthread_mutex_destroy(). We need to
// be able to reinitialize a process-shared mutex if the first
// initializing process crashes and leaves the shared memory in an
// undefined state.

void DB::open(const std::string& path, bool no_create_file, const DBOptions& options)
{
    // Exception safety: Since do_open() is called from constructors, if it
    // throws, it must leave the file closed.
    using util::format;

    REALM_ASSERT(!is_attached());
    REALM_ASSERT(path.size());

    m_db_path = path;

    set_logger(options.logger);
    if (m_replication) {
        m_replication->set_logger(m_logger.get());
    }
    if (m_logger) {
        m_logger->log(util::Logger::Level::detail, "Open file: %1", path);
    }
    SlabAlloc& alloc = m_alloc;
    ref_type top_ref = 0;

    if (options.is_immutable) {
        SlabAlloc::Config cfg;
        cfg.read_only = true;
        cfg.no_create = true;
        cfg.encryption_key = options.encryption_key;
        top_ref = alloc.attach_file(path, cfg);
        SlabAlloc::DetachGuard dg(alloc);
        Group::read_only_version_check(alloc, top_ref, path);
        m_fake_read_lock_if_immutable = ReadLockInfo::make_fake(top_ref, m_alloc.get_baseline());
        dg.release();
        return;
    }
    std::string lockfile_path = get_core_file(path, CoreFileType::Lock);
    std::string coordination_dir = get_core_file(path, CoreFileType::Management);
    std::string lockfile_prefix = coordination_dir + "/access_control";
    m_alloc.set_read_only(false);

    Replication::HistoryType openers_hist_type = Replication::hist_None;
    int openers_hist_schema_version = 0;
    if (Replication* repl = get_replication()) {
        openers_hist_type = repl->get_history_type();
        openers_hist_schema_version = repl->get_history_schema_version();
    }

    int current_file_format_version;
    int target_file_format_version;
    int stored_hist_schema_version = -1; // Signals undetermined

    int retries_left = 10; // number of times to retry before throwing exceptions
    // in case there is something wrong with the .lock file... the retries allow
    // us to pick a new lockfile initializer in case the first one crashes without
    // completing the initialization
    std::default_random_engine random_gen;
    for (;;) {

        // if we're retrying, we first wait a random time
        if (retries_left < 10) {
            if (retries_left == 9) { // we seed it from a true random source if possible
                std::random_device r;
                random_gen.seed(r());
            }
            int max_delay = (10 - retries_left) * 10;
            int msecs = random_gen() % max_delay;
            millisleep(msecs);
        }

        m_file.open(lockfile_path, File::access_ReadWrite, File::create_Auto, 0); // Throws
        File::CloseGuard fcg(m_file);
        m_file.set_fifo_path(coordination_dir, "lock.fifo");

        if (m_file.try_rw_lock_exclusive()) { // Throws
            File::UnlockGuard ulg(m_file);

            // We're alone in the world, and it is Ok to initialize the
            // file. Start by truncating the file to zero to ensure that
            // the following resize will generate a file filled with zeroes.
            //
            // This will in particular set m_init_complete to 0.
            m_file.resize(0);
            m_file.prealloc(sizeof(SharedInfo));

            // We can crash anytime during this process. A crash prior to
            // the first resize could allow another thread which could not
            // get the exclusive lock because we hold it, and hence were
            // waiting for the shared lock instead, to observe and use an
            // old lock file.
            m_file_map.map(m_file, File::access_ReadWrite, sizeof(SharedInfo), File::map_NoSync); // Throws
            File::UnmapGuard fug(m_file_map);
            SharedInfo* info = m_file_map.get_addr();

            new (info) SharedInfo{options.durability, openers_hist_type, openers_hist_schema_version}; // Throws

            // Because init_complete is an std::atomic, it's guaranteed not to be observable by others
            // as being 1 before the entire SharedInfo header has been written.
            info->init_complete = 1;
        }

// We hold the shared lock from here until we close the file!
#if REALM_PLATFORM_APPLE
        // macOS has a bug which can cause a hang waiting to obtain a lock, even
        // if the lock is already open in shared mode, so we work around it by
        // busy waiting. This should occur only briefly during session initialization.
        while (!m_file.try_rw_lock_shared()) {
            sched_yield();
        }
#else
        m_file.rw_lock_shared(); // Throws
#endif
        File::UnlockGuard ulg(m_file);

        // The coordination/management dir is created as a side effect of the lock
        // operation above if needed for lock emulation. But it may also be needed
        // for other purposes, so make sure it exists.
        // in worst case there'll be a race on creating this directory.
        // This should be safe but a waste of resources.
        // Unfortunately it cannot be created at an earlier point, because
        // it may then be deleted during the above lock_shared() operation.
        try_make_dir(coordination_dir);

        // If the file is not completely initialized at this point in time, the
        // preceding initialization attempt must have failed. We know that an
        // initialization process was in progress, because this thread (or
        // process) failed to get an exclusive lock on the file. Because this
        // thread (or process) currently has a shared lock on the file, we also
        // know that the initialization process can no longer be in progress, so
        // the initialization must either have completed or failed at this time.

        // The file is taken to be completely initialized if it is large enough
        // to contain the `init_complete` field, and `init_complete` is true. If
        // the file was not completely initialized, this thread must give up its
        // shared lock, and retry to become the initializer. Eventually, one of
        // two things must happen; either this thread, or another thread
        // succeeds in completing the initialization, or this thread becomes the
        // initializer, and fails the initialization. In either case, the retry
        // loop will eventually terminate.

        // An empty file is (and was) never a successfully initialized file.
        size_t info_size = sizeof(SharedInfo);
        {
            auto file_size = m_file.get_size();
            if (util::int_less_than(file_size, info_size)) {
                if (file_size == 0)
                    continue; // Retry
                info_size = size_t(file_size);
            }
        }

        // Map the initial section of the SharedInfo file that corresponds to
        // the SharedInfo struct, or less if the file is smaller. We know that
        // we have at least one byte, and that is enough to read the
        // `init_complete` flag.
        m_file_map.map(m_file, File::access_ReadWrite, info_size, File::map_NoSync);
        File::UnmapGuard fug_1(m_file_map);
        SharedInfo* info = m_file_map.get_addr();

#ifndef _WIN32
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
#endif
        static_assert(offsetof(SharedInfo, init_complete) + sizeof SharedInfo::init_complete <= 1,
                      "Unexpected position or size of SharedInfo::init_complete");
#ifndef _WIN32
#pragma GCC diagnostic pop
#endif
        if (info->init_complete == 0)
            continue;
        REALM_ASSERT(info->init_complete == 1);

        // At this time, we know that the file was completely initialized, but
        // we still need to verify that it was initialized with the memory
        // layout expected by this session participant. We could find that it is
        // initialized with a different memory layout if other concurrent
        // session participants use different versions of the core library.
        if (info_size < sizeof(SharedInfo)) {
            if (retries_left) {
                --retries_left;
                continue;
            }
            throw IncompatibleLockFile(path, format("Architecture mismatch: SharedInfo size is %1 but should be %2.",
                                                    info_size, sizeof(SharedInfo)));
        }
        if (info->shared_info_version != g_shared_info_version) {
            if (retries_left) {
                --retries_left;
                continue;
            }
            throw IncompatibleLockFile(path, format("Version mismatch: SharedInfo version is %1 but should be %2.",
                                                    info->shared_info_version, g_shared_info_version));
        }
        // Validate compatible sizes of mutex and condvar types. Sizes of all
        // other fields are architecture independent, so if condvar and mutex
        // sizes match, the entire struct matches. The offsets of
        // `size_of_mutex` and `size_of_condvar` are known to be as expected due
        // to the preceding check in `shared_info_version`.
        if (info->size_of_mutex != sizeof info->shared_controlmutex) {
            if (retries_left) {
                --retries_left;
                continue;
            }
            throw IncompatibleLockFile(path, format("Architecture mismatch: Mutex size is %1 but should be %2.",
                                                    info->size_of_mutex, sizeof(info->shared_controlmutex)));
        }

        if (info->size_of_condvar != sizeof info->room_to_write) {
            if (retries_left) {
                --retries_left;
                continue;
            }
            throw IncompatibleLockFile(
                path, format("Architecture mismatch: Condition variable size is %1 but should be %2.",
                             info->size_of_condvar, sizeof(info->room_to_write)));
        }
        m_writemutex.set_shared_part(info->shared_writemutex, lockfile_prefix, "write");
        m_controlmutex.set_shared_part(info->shared_controlmutex, lockfile_prefix, "control");
        m_versionlist_mutex.set_shared_part(info->shared_versionlist_mutex, lockfile_prefix, "versions");

        // even though fields match wrt alignment and size, there may still be incompatibilities
        // between implementations, so let's ask one of the mutexes if it thinks it'll work.
        if (!m_controlmutex.is_valid()) {
            throw IncompatibleLockFile(
                path, "Control mutex is invalid. This suggests that incompatible pthread libraries are in use.");
        }

        // OK! lock file appears valid. We can now continue operations under the protection
        // of the controlmutex. The controlmutex protects the following activities:
        // - attachment of the database file
        // - start of the async daemon
        // - stop of the async daemon
        // - restore of a backup, if desired
        // - backup of the realm file in preparation of file format upgrade
        // - DB beginning/ending a session
        // - Waiting for and signalling database changes
        {
            std::lock_guard<InterprocessMutex> lock(m_controlmutex); // Throws
            auto version_manager = std::make_unique<FileVersionManager>(m_file, m_versionlist_mutex);

            // proceed to initialize versioning and other metadata information related to
            // the database. Also create the database if we're beginning a new session
            bool begin_new_session = (info->num_participants == 0);
            SlabAlloc::Config cfg;
            cfg.session_initiator = begin_new_session;
            cfg.is_shared = true;
            cfg.read_only = false;
            cfg.skip_validate = !begin_new_session;
            cfg.disable_sync = options.durability == Durability::MemOnly || options.durability == Durability::Unsafe;

            // only the session initiator is allowed to create the database, all others
            // must assume that it already exists.
            cfg.no_create = (begin_new_session ? no_create_file : true);

            // if we're opening a MemOnly file that isn't already opened by
            // someone else then it's a file which should have been deleted on
            // close previously, but wasn't (perhaps due to the process crashing)
42,156✔
1155
            cfg.clear_file = (options.durability == Durability::MemOnly && begin_new_session);
85,899✔
1156

42,156✔
1157
            cfg.encryption_key = options.encryption_key;
85,899✔
1158
            m_marker_observer = std::make_unique<EncryptionMarkerObserver>(*version_manager);
85,899✔
1159
            try {
85,899✔
1160
                top_ref = alloc.attach_file(path, cfg, m_marker_observer.get()); // Throws
85,899✔
1161
            }
85,899✔
1162
            catch (const SlabAlloc::Retry&) {
42,156✔
1163
                // On a SlabAlloc::Retry file mappings are already unmapped, no
1164
                // need to do more
1165
                continue;
×
1166
            }
×
1167

42,111✔
1168
            // Determine target file format version for session (upgrade
42,111✔
1169
            // required if greater than file format version of attached file).
42,111✔
1170
            current_file_format_version = alloc.get_committed_file_format_version();
85,812✔
1171
            target_file_format_version =
85,812✔
1172
                Group::get_target_file_format_version_for_session(current_file_format_version, openers_hist_type);
85,812✔
1173
            BackupHandler backup(path, options.accepted_versions, options.to_be_deleted);
85,812✔
1174
            if (backup.must_restore_from_backup(current_file_format_version)) {
85,812✔
1175
                // we need to unmap before any file ops that'll change the realm
6✔
1176
                // file:
6✔
1177
                // (only strictly needed for Windows)
6✔
1178
                alloc.detach();
12✔
1179
                backup.restore_from_backup();
12✔
1180
                // finally, retry with the restored file instead of the original
6✔
1181
                // one:
6✔
1182
                continue;
12✔
1183
            }
12✔
1184
            backup.cleanup_backups();
85,800✔
1185

42,105✔
1186
            // From here on, if we fail in any way, we must detach the
42,105✔
1187
            // allocator.
42,105✔
1188
            SlabAlloc::DetachGuard alloc_detach_guard(alloc);
85,800✔
1189
            alloc.note_reader_start(this);
85,800✔
1190
            // must come after the alloc detach guard
42,105✔
1191
            auto handler = [this, &alloc]() noexcept {
85,800✔
1192
                alloc.note_reader_end(this);
85,800✔
1193
            };
85,800✔
1194
            auto reader_end_guard = make_scope_exit(handler);
85,800✔
1195

42,105✔
1196
            // Check validity of top array (to give more meaningful errors
42,105✔
1197
            // early)
42,105✔
1198
            if (top_ref) {
85,800✔
1199
                try {
44,157✔
1200
                    alloc.note_reader_start(this);
44,157✔
1201
                    auto reader_end_guard = make_scope_exit([&]() noexcept {
44,157✔
1202
                        alloc.note_reader_end(this);
44,157✔
1203
                    });
44,157✔
1204
                    Array top{alloc};
44,157✔
1205
                    top.init_from_ref(top_ref);
44,157✔
1206
                    Group::validate_top_array(top, alloc);
44,157✔
1207
                }
44,157✔
1208
                catch (const InvalidDatabase& e) {
21,867✔
1209
                    if (e.get_path().empty()) {
×
1210
                        throw InvalidDatabase(e.what(), path);
×
1211
                    }
×
1212
                    throw;
×
1213
                }
×
1214
            }
44,157✔
1215
            if (options.backup_at_file_format_change) {
85,800✔
1216
                backup.backup_realm_if_needed(current_file_format_version, target_file_format_version);
85,791✔
1217
            }
85,791✔
1218
            using gf = _impl::GroupFriend;
85,800✔
1219
            bool file_format_ok;
85,800✔
1220
            // In shared mode (Realm file opened via a DB instance) this
42,105✔
1221
            // version of the core library is able to open Realms using file format
42,105✔
1222
            // versions listed below. Please see Group::get_file_format_version() for
42,105✔
1223
            // information about the individual file format versions.
42,105✔
1224
            if (current_file_format_version == 0) {
85,800✔
1225
                file_format_ok = (top_ref == 0);
41,643✔
1226
            }
41,643✔
1227
            else {
44,157✔
1228
                file_format_ok = backup.is_accepted_file_format(current_file_format_version);
44,157✔
1229
            }
44,157✔
1230

42,105✔
1231
            if (REALM_UNLIKELY(!file_format_ok)) {
85,800✔
1232
                throw UnsupportedFileFormatVersion(current_file_format_version);
12✔
1233
            }
12✔
1234

42,099✔
1235
            if (begin_new_session) {
85,788✔
1236
                // Determine version (snapshot number) and check history
29,031✔
1237
                // compatibility
29,031✔
1238
                version_type version = 0;
59,400✔
1239
                int stored_hist_type = 0;
59,400✔
1240
                gf::get_version_and_history_info(alloc, top_ref, version, stored_hist_type,
59,400✔
1241
                                                 stored_hist_schema_version);
59,400✔
1242
                bool good_history_type = false;
59,400✔
1243
                switch (openers_hist_type) {
59,400✔
1244
                    case Replication::hist_None:
4,473✔
1245
                        good_history_type = (stored_hist_type == Replication::hist_None);
4,473✔
1246
                        if (!good_history_type)
4,473✔
1247
                            throw IncompatibleHistories(
6✔
1248
                                util::format("Realm file at path '%1' has history type '%2', but is being opened "
6✔
1249
                                             "with replication disabled.",
6✔
1250
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1251
                                path);
6✔
1252
                        break;
4,467✔
1253
                    case Replication::hist_OutOfRealm:
2,214✔
1254
                        REALM_ASSERT(false); // No longer in use
×
1255
                        break;
×
1256
                    case Replication::hist_InRealm:
24,468✔
1257
                        good_history_type = (stored_hist_type == Replication::hist_InRealm ||
24,468✔
1258
                                             stored_hist_type == Replication::hist_None);
16,062✔
1259
                        if (!good_history_type)
24,468✔
1260
                            throw IncompatibleHistories(
6✔
1261
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
6✔
1262
                                             "local history mode.",
6✔
1263
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1264
                                path);
6✔
1265
                        break;
24,462✔
1266
                    case Replication::hist_SyncClient:
28,800✔
1267
                        good_history_type = ((stored_hist_type == Replication::hist_SyncClient) || (top_ref == 0));
28,800✔
1268
                        if (!good_history_type)
28,800✔
1269
                            throw IncompatibleHistories(
6✔
1270
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
6✔
1271
                                             "synchronized history mode.",
6✔
1272
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1273
                                path);
6✔
1274
                        break;
28,794✔
1275
                    case Replication::hist_SyncServer:
15,108✔
1276
                        good_history_type = ((stored_hist_type == Replication::hist_SyncServer) || (top_ref == 0));
1,656✔
1277
                        if (!good_history_type)
1,656✔
1278
                            throw IncompatibleHistories(
×
1279
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
×
1280
                                             "server history mode.",
×
1281
                                             path, Replication::history_type_name(stored_hist_type)),
×
1282
                                path);
×
1283
                        break;
1,656✔
1284
                }
59,379✔
1285

29,019✔
1286
                REALM_ASSERT(stored_hist_schema_version >= 0);
59,379✔
1287
                if (stored_hist_schema_version > openers_hist_schema_version)
59,379✔
1288
                    throw IncompatibleHistories(
×
1289
                        util::format("Unexpected future history schema version %1, current schema %2",
×
1290
                                     stored_hist_schema_version, openers_hist_schema_version),
×
1291
                        path);
×
1292
                bool need_hist_schema_upgrade =
59,379✔
1293
                    (stored_hist_schema_version < openers_hist_schema_version && top_ref != 0);
59,379✔
1294
                if (need_hist_schema_upgrade) {
59,379✔
1295
                    Replication* repl = get_replication();
348✔
1296
                    if (!repl->is_upgradable_history_schema(stored_hist_schema_version))
348✔
1297
                        throw IncompatibleHistories(util::format("Nonupgradable history schema %1, current schema %2",
×
1298
                                                                 stored_hist_schema_version,
×
1299
                                                                 openers_hist_schema_version),
×
1300
                                                    path);
×
1301
                }
59,379✔
1302

29,019✔
1303
                bool need_file_format_upgrade =
59,379✔
1304
                    current_file_format_version < target_file_format_version && top_ref != 0;
59,379✔
1305
                if (!options.allow_file_format_upgrade && (need_hist_schema_upgrade || need_file_format_upgrade)) {
59,379✔
1306
                    throw FileFormatUpgradeRequired(m_db_path);
6✔
1307
                }
6✔
1308

29,016✔
1309
                alloc.convert_from_streaming_form(top_ref);
59,373✔
1310
                try {
59,373✔
1311
                    bool file_changed_size = alloc.align_filesize_for_mmap(top_ref, cfg);
59,373✔
1312
                    if (file_changed_size) {
59,373✔
1313
                        // we need to re-establish proper mappings after file size change.
195✔
1314
                        // we do this simply by aborting and starting all over:
195✔
1315
                        continue;
417✔
1316
                    }
417✔
1317
                }
×
1318
                // something went wrong. Retry.
1319
                catch (SlabAlloc::Retry&) {
×
1320
                    continue;
×
1321
                }
×
1322
                if (options.encryption_key) {
58,959✔
1323
#ifdef _WIN32
1324
                    uint64_t pid = GetCurrentProcessId();
1325
#else
1326
                    static_assert(sizeof(pid_t) <= sizeof(uint64_t), "process identifiers too large");
243✔
1327
                    uint64_t pid = getpid();
243✔
1328
#endif
243✔
1329
                    info->session_initiator_pid = pid;
243✔
1330
                }
243✔
1331

28,824✔
1332
                info->file_format_version = uint_fast8_t(target_file_format_version);
58,959✔
1333

28,824✔
1334
                // Initially there is a single version in the file
28,824✔
1335
                info->number_of_versions = 1;
58,959✔
1336

28,824✔
1337
                info->latest_version_number = version;
58,959✔
1338
                alloc.init_mapping_management(version);
58,959✔
1339

28,824✔
1340
                size_t file_size = 24;
58,959✔
1341
                if (top_ref) {
58,959✔
1342
                    Array top(alloc);
21,105✔
1343
                    top.init_from_ref(top_ref);
21,105✔
1344
                    file_size = Group::get_logical_file_size(top);
21,105✔
1345
                }
21,105✔
1346
                version_manager->init_versioning(top_ref, file_size, version);
58,959✔
1347
            }
58,959✔
1348
            else { // Not the session initiator
26,388✔
1349
                // Durability setting must be consistent across a session. An
13,068✔
1350
                // inconsistency is a logic error, as the user is required to
13,068✔
1351
                // make sure that all possible concurrent session participants
13,068✔
1352
                // use the same durability setting for the same Realm file.
13,068✔
1353
                if (Durability(info->durability) != options.durability)
26,388✔
1354
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "Durability not consistent");
6✔
1355

13,065✔
1356
                // History type must be consistent across a session. An
13,065✔
1357
                // inconsistency is a logic error, as the user is required to
13,065✔
1358
                // make sure that all possible concurrent session participants
13,065✔
1359
                // use the same history type for the same Realm file.
13,065✔
1360
                if (info->history_type != openers_hist_type)
26,382✔
1361
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "History type not consistent");
6✔
1362

13,062✔
1363
                // History schema version must be consistent across a
13,062✔
1364
                // session. An inconsistency is a logic error, as the user is
13,062✔
1365
                // required to make sure that all possible concurrent session
13,062✔
1366
                // participants use the same history schema version for the same
13,062✔
1367
                // Realm file.
13,062✔
1368
                if (info->history_schema_version != openers_hist_schema_version)
26,376✔
1369
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "History schema version not consistent");
×
1370

13,062✔
1371
                // We need per session agreement among all participants on the
13,062✔
1372
                // target Realm file format. From a technical perspective, the
13,062✔
1373
                // best way to ensure that, would be to require a bumping of the
13,062✔
1374
                // SharedInfo file format version on any change that could lead
13,062✔
1375
                // to a different result from
13,062✔
1376
                // get_target_file_format_for_session() given the same current
13,062✔
1377
                // Realm file format version and the same history type, as that
13,062✔
1378
                // would prevent the outcome of the Realm opening process from
13,062✔
1379
                // depending on race conditions. However, for practical reasons,
13,062✔
1380
                // we shall instead simply check that there is agreement, and
13,062✔
1381
                // throw the same kind of exception, as would have been thrown
13,062✔
1382
                // with a bumped SharedInfo file format version, if there isn't.
13,062✔
1383
                if (info->file_format_version != target_file_format_version) {
26,376✔
1384
                    throw IncompatibleLockFile(path,
×
1385
                                               format("Version mismatch: File format version is %1 but should be %2.",
×
1386
                                                      info->file_format_version, target_file_format_version));
×
1387
                }
×
1388

13,062✔
1389
                // Even though this session participant is not the session initiator,
13,062✔
1390
                // it may be the one that has to perform the history schema upgrade.
13,062✔
1391
                // See upgrade_file_format(). However we cannot get the actual value
13,062✔
1392
                // at this point as the allocator is not synchronized with the file.
13,062✔
1393
                // The value will be read in a ReadTransaction later.
13,062✔
1394

13,062✔
1395
                // We need to setup the allocators version information, as it is needed
13,062✔
1396
                // to correctly age and later reclaim memory mappings.
13,062✔
1397
                version_type version = info->latest_version_number;
26,376✔
1398
                alloc.init_mapping_management(version);
26,376✔
1399
            }
26,376✔
1400

42,099✔
1401
            m_new_commit_available.set_shared_part(info->new_commit_available, lockfile_prefix, "new_commit",
85,548✔
1402
                                                   options.temp_dir);
85,335✔
1403
            m_pick_next_writer.set_shared_part(info->pick_next_writer, lockfile_prefix, "pick_writer",
85,335✔
1404
                                               options.temp_dir);
85,335✔
1405

41,886✔
1406
            // make our presence noted:
41,886✔
1407
            ++info->num_participants;
85,335✔
1408
            m_info = info;
85,335✔
1409

41,886✔
1410
            // Keep the mappings and file open:
41,886✔
1411
            m_version_manager = std::move(version_manager);
85,335✔
1412
            alloc_detach_guard.release();
85,335✔
1413
            fug_1.release(); // Do not unmap
85,335✔
1414
            fcg.release();   // Do not close
85,335✔
1415
        }
85,335✔
1416
        ulg.release(); // Do not release shared lock
85,335✔
1417
        break;
85,335✔
1418
    }
85,788✔
1419

41,967✔
1420
    if (m_logger) {
85,410✔
1421
        m_logger->log(util::Logger::Level::debug, "   Number of participants: %1", m_info->num_participants);
71,187✔
1422
        m_logger->log(util::Logger::Level::debug, "   Durability: %1", [&] {
71,187✔
1423
            switch (options.durability) {
71,187✔
1424
                case DBOptions::Durability::Full:
49,629✔
1425
                    return "Full";
49,629✔
1426
                case DBOptions::Durability::MemOnly:
21,558✔
1427
                    return "MemOnly";
21,558✔
1428
                case realm::DBOptions::Durability::Unsafe:
✔
1429
                    return "Unsafe";
×
1430
            }
×
1431
            return "";
×
1432
        }());
×
1433
        m_logger->log(util::Logger::Level::debug, "   EncryptionKey: %1", options.encryption_key ? "yes" : "no");
71,157✔
1434
        if (m_logger->would_log(util::Logger::Level::debug)) {
71,187✔
1435
            if (top_ref) {
24,624✔
1436
                Array top(alloc);
24,540✔
1437
                top.init_from_ref(top_ref);
24,540✔
1438
                auto file_size = Group::get_logical_file_size(top);
24,540✔
1439
                auto history_size = Group::get_history_size(top);
24,540✔
1440
                auto freee_space_size = Group::get_free_space_size(top);
24,540✔
1441
                m_logger->log(util::Logger::Level::debug, "   File size: %1", file_size);
24,540✔
1442
                m_logger->log(util::Logger::Level::debug, "   User data size: %1",
24,540✔
1443
                              file_size - (freee_space_size + history_size));
24,540✔
1444
                m_logger->log(util::Logger::Level::debug, "   Free space size: %1", freee_space_size);
24,540✔
1445
                m_logger->log(util::Logger::Level::debug, "   History size: %1", history_size);
24,540✔
1446
            }
24,540✔
1447
            else {
84✔
1448
                m_logger->log(util::Logger::Level::debug, "   Empty file");
84✔
1449
            }
84✔
1450
        }
24,624✔
1451
    }
71,187✔
1452

41,886✔
1453
    // Upgrade file format and/or history schema
41,886✔
1454
    try {
85,329✔
1455
        if (stored_hist_schema_version == -1) {
85,329✔
1456
            // current_hist_schema_version has not been read. Read it now
13,062✔
1457
            stored_hist_schema_version = start_read()->get_history_schema_version();
26,376✔
1458
        }
26,376✔
1459
        if (current_file_format_version == 0) {
85,329✔
1460
            // If the current file format is still undecided, no upgrade is
20,232✔
1461
            // necessary, but we still need to make the chosen file format
20,232✔
1462
            // visible to the rest of the core library by updating the value
20,232✔
1463
            // that will be subsequently returned by
20,232✔
1464
            // Group::get_file_format_version(). For this to work, all session
20,232✔
1465
            // participants must adopt the chosen target Realm file format when
20,232✔
1466
            // the stored file format version is zero regardless of the version
20,232✔
1467
            // of the core library used.
20,232✔
1468
            m_file_format_version = target_file_format_version;
41,631✔
1469
        }
41,631✔
1470
        else {
43,698✔
1471
            m_file_format_version = current_file_format_version;
43,698✔
1472
            upgrade_file_format(options.allow_file_format_upgrade, target_file_format_version,
43,698✔
1473
                                stored_hist_schema_version, openers_hist_schema_version); // Throws
43,698✔
1474
        }
43,698✔
1475
    }
85,329✔
1476
    catch (...) {
41,889✔
1477
        close();
6✔
1478
        throw;
6✔
1479
    }
6✔
1480
    m_alloc.set_read_only(true);
85,329✔
1481
}
85,329✔
1482

1483
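// Example (illustrative, not part of db.cpp): the retry loop above is what runs
// behind the public factory. A caller never touches SharedInfo or the .lock file
// directly; it opens the Realm through DB::create() with a DBOptions. The exact
// DB::create() overloads and DBOptions fields may differ between core versions,
// so treat this as a sketch rather than a reference.
#if 0
void example_open_realm()
{
    realm::DBOptions options;
    options.durability = realm::DBOptions::Durability::Full;
    options.allow_file_format_upgrade = true; // opt in to the upgrade path handled above

    // DB::create() ends up in DB::open(path, no_create, options), including the
    // SharedInfo compatibility checks and, if needed, the file format upgrade.
    realm::DBRef db = realm::DB::create("/tmp/example.realm", /* no_create */ false, options);

    auto rt = db->start_read(); // read transaction on the latest snapshot
}
#endif
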
void DB::open(BinaryData buffer, bool take_ownership)
{
    auto top_ref = m_alloc.attach_buffer(buffer.data(), buffer.size());
    m_fake_read_lock_if_immutable = ReadLockInfo::make_fake(top_ref, buffer.size());
    if (take_ownership)
        m_alloc.own_buffer();
}

void DB::open(Replication& repl, const std::string& file, const DBOptions& options)
{
    // Exception safety: Since open() is called from constructors, if it throws,
    // it must leave the file closed.

    REALM_ASSERT(!is_attached());

    repl.initialize(*this); // Throws

    set_replication(&repl);

    bool no_create = false;
    open(file, no_create, options); // Throws
}

class DBLogger : public Logger {
public:
    DBLogger(const std::shared_ptr<Logger>& base_logger, unsigned hash) noexcept
        : Logger(LogCategory::storage, *base_logger)
        , m_hash(hash)
        , m_base_logger_ptr(base_logger)
    {
    }

protected:
    void do_log(const LogCategory& category, Level level, const std::string& message) final
    {
        std::ostringstream ostr;
        auto id = std::this_thread::get_id();
        ostr << "DB: " << m_hash << " Thread " << id << ": " << message;
        Logger::do_log(*m_base_logger_ptr, category, level, ostr.str());
    }

private:
    unsigned m_hash;
    std::shared_ptr<Logger> m_base_logger_ptr;
};

void DB::set_logger(const std::shared_ptr<util::Logger>& logger) noexcept
{
    if (logger)
        m_logger = std::make_shared<DBLogger>(logger, m_log_id);
}

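// Example (illustrative, not part of db.cpp): DBLogger only prefixes each message
// with the DB's log id and the calling thread before forwarding it to the base
// logger, so a caller just supplies a base logger via DBOptions. util::StderrLogger
// is assumed to be available as a base logger; field and type names may differ
// between core versions.
#if 0
void example_logging()
{
    auto base = std::make_shared<realm::util::StderrLogger>();

    realm::DBOptions options;
    options.logger = base; // DB::open() calls set_logger(), which wraps it in a DBLogger

    auto db = realm::DB::create("/tmp/example.realm", false, options);
    // Storage-layer messages now appear as:
    //   "DB: <hash> Thread <id>: <message>"
}
#endif
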
void DB::open(Replication& repl, const DBOptions options)
{
    REALM_ASSERT(!is_attached());
    repl.initialize(*this); // Throws
    set_replication(&repl);

    m_alloc.init_in_memory_buffer();

    set_logger(options.logger);
    m_replication->set_logger(m_logger.get());
    if (m_logger)
        m_logger->detail("Open memory-only realm");

    auto hist_type = repl.get_history_type();
    m_in_memory_info =
        std::make_unique<SharedInfo>(DBOptions::Durability::MemOnly, hist_type, repl.get_history_schema_version());
    SharedInfo* info = m_in_memory_info.get();
    m_writemutex.set_shared_part(info->shared_writemutex, "", "write");
    m_controlmutex.set_shared_part(info->shared_controlmutex, "", "control");
    m_new_commit_available.set_shared_part(info->new_commit_available, "", "new_commit", options.temp_dir);
    m_pick_next_writer.set_shared_part(info->pick_next_writer, "", "pick_writer", options.temp_dir);
    m_versionlist_mutex.set_shared_part(info->shared_versionlist_mutex, "", "versions");

    auto target_file_format_version = uint_fast8_t(Group::get_target_file_format_version_for_session(0, hist_type));
    info->file_format_version = target_file_format_version;
    info->number_of_versions = 1;
    info->latest_version_number = 1;
    info->init_versioning(0, m_alloc.get_baseline(), 1);
    ++info->num_participants;

    m_version_manager = std::make_unique<InMemoryVersionManager>(info, m_versionlist_mutex);

    m_file_format_version = target_file_format_version;

    m_info = info;
    m_alloc.set_read_only(true);
}

void DB::create_new_history(Replication& repl)
{
    Replication* old_repl = get_replication();
    try {
        repl.initialize(*this);
        set_replication(&repl);

        auto tr = start_write();
        tr->clear_history();
        tr->replicate(tr.get(), repl);
        tr->commit();
    }
    catch (...) {
        set_replication(old_repl);
        throw;
    }
}

void DB::create_new_history(std::unique_ptr<Replication> repl)
{
    create_new_history(*repl);
    m_history = std::move(repl);
}

// WARNING / FIXME: compact() should NOT be exposed publicly on Windows because it's not crash safe! It may
// corrupt your database if something fails.
// Tracked by https://github.com/realm/realm-core/issues/4111

// A note about lock ordering.
// The local mutex, m_mutex, guards transaction start/stop and map/unmap of the lock file.
// Except for compact(), open() and close(), it should only be held briefly.
// The controlmutex guards operations which change the file size, session initialization
// and session exit.
// The writemutex guards the integrity of the (write) transaction data.
// The controlmutex and writemutex reside in the .lock file and thus require
// the mapping of the .lock file to work. A straightforward approach would be to lock
// the m_mutex whenever the other mutexes are taken or released, but that would be too
// costly for the performance of transaction start/stop.
//
// The locks are to be taken in this order: writemutex->controlmutex->m_mutex
// (see the illustrative sketch below).
//
// The .lock file is mapped during DB::create() and unmapped by a call to DB::close().
// Once unmapped, it is never mapped again. Hence any observer with a valid DBRef may
// only see the transition from mapped->unmapped, never the opposite.
//
// Trying to create a transaction if the .lock file is unmapped will result in an assert.
// Unmapping (during close()) while transactions are live is not considered an error. There
// is a potential race between unmapping during close() and any operation carried out by a live
// transaction. The user must ensure that this race never happens if she uses DB::close().
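
// Illustrative sketch (not part of the original file) of the lock order documented
// above. Any code path that needs more than one of these locks must acquire them in
// the order writemutex -> controlmutex -> m_mutex and never the reverse. The member
// names are the private DB members used throughout this file, so this is purely
// explanatory pseudocode.
#if 0
void example_lock_order(DB& db)
{
    std::lock_guard<InterprocessMutex> write_guard(db.m_writemutex);     // 1. write lock
    std::lock_guard<InterprocessMutex> control_guard(db.m_controlmutex); // 2. control lock
    CheckedLockGuard local_guard(db.m_mutex);                            // 3. local lock
    // ... operate on file size / session state ...
}
#endif
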
bool DB::compact(bool bump_version_number, util::Optional<const char*> output_encryption_key)
    NO_THREAD_SAFETY_ANALYSIS // this would work except for a known limitation: "No alias analysis" where clang cannot
                              // tell that tr->db->m_mutex is the same thing as m_mutex
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::string tmp_path = m_db_path + ".tmp_compaction_space";

    // To enter compact, the DB object must already have been attached to a file,
    // since this happens in DB::create().

    // Verify that the lock file is still attached. There is no attempt to guard against
    // a race between close() and compact().
    if (is_attached() == false) {
        throw Exception(ErrorCodes::IllegalOperation, m_db_path + ": compact must be done on an open/attached DB");
    }
    auto info = m_info;
    Durability dura = Durability(info->durability);
    const char* write_key = bool(output_encryption_key) ? *output_encryption_key : get_encryption_key();
    {
        std::unique_lock<InterprocessMutex> lock(m_controlmutex); // Throws
        auto t1 = std::chrono::steady_clock::now();

        // We must be the ONLY DB object attached if we're to do compaction
        if (info->num_participants > 1)
            return false;

        // Holding the controlmutex prevents any other DB from attaching to the file.

        // Using start_read here ensures that we have access to the latest entry
        // in the VersionList. We need to have access to that later to update top_ref and file_size.
        // This is also needed to attach the group (get the proper top pointer, etc)
        TransactionRef tr = start_read();
        auto file_size_before = tr->get_logical_file_size();

        // local lock blocking any transaction from starting (and stopping)
        CheckedLockGuard local_lock(m_mutex);

        // We should be the only transaction active - otherwise back out
        if (m_transaction_count != 1)
            return false;

        // group::write() will throw if the file already exists.
        // To prevent this, we have to remove the file (should it exist)
        // before calling group::write().
        File::try_remove(tmp_path);

        // Compact by writing a new file holding only live data, then renaming the new file
        // so it becomes the database file, replacing the old one in the process.
        try {
            File file;
            file.open(tmp_path, File::access_ReadWrite, File::create_Must, 0);
            int incr = bump_version_number ? 1 : 0;
            Group::DefaultTableWriter writer;
            tr->write(file, write_key, info->latest_version_number + incr, writer); // Throws
            // Data needs to be flushed to the disk before renaming.
            bool disable_sync = get_disable_sync_to_disk();
            if (!disable_sync && dura != Durability::Unsafe)
                file.sync(); // Throws
        }
        catch (...) {
            // If writing the compact version failed in any way, delete the partially written file to clean up disk
            // space. This is so that we don't fail with 100% disk space used when compacting on a mostly full disk.
            if (File::exists(tmp_path)) {
                File::remove(tmp_path);
            }
            throw;
        }
        // if we've written a file with a bumped version number, we need to update the lock file to match.
        if (bump_version_number) {
            ++info->latest_version_number;
        }
        // We need to release any shared mapping *before* releasing the control mutex.
        // When someone attaches to the new database file, they *must* *not* see and
        // reuse any existing memory mapping of the stale file.
        tr->close_read_with_lock();
        m_alloc.detach();

        util::File::move(tmp_path, m_db_path);

        SlabAlloc::Config cfg;
        cfg.session_initiator = true;
        cfg.is_shared = true;
        cfg.read_only = false;
        cfg.skip_validate = false;
        cfg.no_create = true;
        cfg.clear_file = false;
        cfg.encryption_key = write_key;
        ref_type top_ref;
        top_ref = m_alloc.attach_file(m_db_path, cfg, m_marker_observer.get());
        m_alloc.convert_from_streaming_form(top_ref);
        m_alloc.init_mapping_management(info->latest_version_number);
        info->number_of_versions = 1;
        size_t logical_file_size = sizeof(SlabAlloc::Header);
        if (top_ref) {
            Array top(m_alloc);
            top.init_from_ref(top_ref);
            logical_file_size = Group::get_logical_file_size(top);
        }
        m_version_manager->init_versioning(top_ref, logical_file_size, info->latest_version_number);
        if (m_logger) {
            auto t2 = std::chrono::steady_clock::now();
            m_logger->log(util::Logger::Level::info, "DB compacted from: %1 to %2 in %3 us", file_size_before,
                          logical_file_size, std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
        }
    }
    return true;
}

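// Example (illustrative, not part of db.cpp): compact() only succeeds when this DB
// is the sole session participant and no other transaction is active, as enforced
// above; otherwise it returns false without touching the file. Default arguments
// for compact() are assumed here.
#if 0
void example_compact(realm::DBRef db)
{
    if (!db->compact()) {
        // Another participant or an open transaction prevented compaction;
        // try again when the file is quiescent.
    }
}
#endif
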
void DB::write_copy(StringData path, const char* output_encryption_key)
{
    auto tr = start_read();
    if (auto hist = tr->get_history()) {
        if (!hist->no_pending_local_changes(tr->get_version())) {
            throw Exception(ErrorCodes::IllegalOperation,
                            "All client changes must be integrated in server before writing copy");
        }
    }

    class NoClientFileIdWriter : public Group::DefaultTableWriter {
    public:
        NoClientFileIdWriter()
            : Group::DefaultTableWriter(true)
        {
        }
        HistoryInfo write_history(_impl::OutputStream& out) override
        {
            auto hist = Group::DefaultTableWriter::write_history(out);
            hist.sync_file_id = 0;
            return hist;
        }
    } writer;

    File file;
    file.open(path, File::access_ReadWrite, File::create_Must, 0);
    file.resize(0);

    auto t1 = std::chrono::steady_clock::now();
    tr->write(file, output_encryption_key, m_info->latest_version_number, writer);
    if (m_logger) {
        auto t2 = std::chrono::steady_clock::now();
        m_logger->log(util::Logger::Level::info, "DB written to '%1' in %2 us", path,
                      std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
    }
}

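// Example (illustrative, not part of db.cpp): write_copy() writes a compacted copy
// of the current snapshot to a new path, optionally re-encrypted with a different
// key, and refuses to run while a synchronized Realm still has unuploaded changes.
#if 0
void example_write_copy(realm::DBRef db)
{
    db->write_copy("/tmp/backup.realm", /* output_encryption_key */ nullptr);
}
#endif
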
uint_fast64_t DB::get_number_of_versions()
{
    if (m_fake_read_lock_if_immutable)
        return 1;
    return m_info->number_of_versions;
}

size_t DB::get_allocated_size() const
{
    return m_alloc.get_allocated_size();
}

void DB::release_all_read_locks() noexcept
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    CheckedLockGuard local_lock(m_mutex); // mx on m_local_locks_held
    for (auto& read_lock : m_local_locks_held) {
        --m_transaction_count;
        m_version_manager->release_read_lock(read_lock);
    }
    m_local_locks_held.clear();
    REALM_ASSERT(m_transaction_count == 0);
}

class DB::AsyncCommitHelper {
public:
    AsyncCommitHelper(DB* db)
        : m_db(db)
    {
    }
    ~AsyncCommitHelper()
    {
        {
            std::unique_lock lg(m_mutex);
            if (!m_running) {
                return;
            }
            m_running = false;
            m_cv_worker.notify_one();
        }
        m_thread.join();
    }

    void begin_write(util::UniqueFunction<void()> fn)
    {
        std::unique_lock lg(m_mutex);
        start_thread();
        m_pending_writes.emplace_back(std::move(fn));
        m_cv_worker.notify_one();
    }

    void blocking_begin_write()
    {
        std::unique_lock lg(m_mutex);

        // If we support unlocking InterprocessMutex from a different thread
        // than it was locked on, we can sometimes just begin the write on
        // the current thread. This requires that no one is currently waiting
        // for the worker thread to acquire the write lock, as we'll deadlock
        // if we try to async commit while the worker is waiting for the lock.
        bool can_lock_on_caller =
            !InterprocessMutex::is_thread_confined && (!m_owns_write_mutex && m_pending_writes.empty() &&
                                                       m_write_lock_claim_ticket == m_write_lock_claim_fulfilled);

        // If we support cross-thread unlocking and m_running is false,
        // can_lock_on_caller should always be true or we forgot to launch the thread
        REALM_ASSERT(can_lock_on_caller || m_running || InterprocessMutex::is_thread_confined);

        // If possible, just begin the write on the current thread
        if (can_lock_on_caller) {
            m_waiting_for_write_mutex = true;
            lg.unlock();
            m_db->do_begin_write();
            lg.lock();
            m_waiting_for_write_mutex = false;
            m_has_write_mutex = true;
            m_owns_write_mutex = false;
            return;
        }

        // Otherwise we have to ask the worker thread to acquire it and wait
        // for that
        start_thread();
        size_t ticket = ++m_write_lock_claim_ticket;
        m_cv_worker.notify_one();
        m_cv_callers.wait(lg, [this, ticket] {
            return ticket == m_write_lock_claim_fulfilled;
        });
    }

    void end_write()
    {
        std::unique_lock lg(m_mutex);
        REALM_ASSERT(m_has_write_mutex);
        REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined);

        // If we acquired the write lock on the worker thread, also release it
        // there even if our mutex supports unlocking cross-thread as it simplifies things.
        if (m_owns_write_mutex) {
            m_pending_mx_release = true;
            m_cv_worker.notify_one();
        }
        else {
            m_db->do_end_write();
            m_has_write_mutex = false;
        }
    }

    bool blocking_end_write()
    {
        std::unique_lock lg(m_mutex);
        if (!m_has_write_mutex) {
            return false;
        }
        REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined);

        // If we acquired the write lock on the worker thread, also release it
        // there even if our mutex supports unlocking cross-thread as it simplifies things.
        if (m_owns_write_mutex) {
            m_pending_mx_release = true;
            m_cv_worker.notify_one();
            m_cv_callers.wait(lg, [this] {
                return !m_pending_mx_release;
            });
        }
        else {
            m_db->do_end_write();
            m_has_write_mutex = false;

            // The worker thread may have ignored a request for the write mutex
            // while we were acquiring it, so we need to wake up the thread
            if (has_pending_write_requests()) {
                lg.unlock();
                m_cv_worker.notify_one();
            }
        }
        return true;
    }


    void sync_to_disk(util::UniqueFunction<void()> fn)
    {
        REALM_ASSERT(fn);
        std::unique_lock lg(m_mutex);
        REALM_ASSERT(!m_pending_sync);
        start_thread();
        m_pending_sync = std::move(fn);
        m_cv_worker.notify_one();
    }

private:
    DB* m_db;
    std::thread m_thread;
    std::mutex m_mutex;
    std::condition_variable m_cv_worker;
    std::condition_variable m_cv_callers;
    std::deque<util::UniqueFunction<void()>> m_pending_writes;
    util::UniqueFunction<void()> m_pending_sync;
    size_t m_write_lock_claim_ticket = 0;
    size_t m_write_lock_claim_fulfilled = 0;
    bool m_pending_mx_release = false;
    bool m_running = false;
    bool m_has_write_mutex = false;
    bool m_owns_write_mutex = false;
    bool m_waiting_for_write_mutex = false;

    void main();

    void start_thread()
    {
        if (m_running) {
            return;
        }
        m_running = true;
        m_thread = std::thread([this]() {
            main();
        });
    }

    bool has_pending_write_requests()
    {
        return m_write_lock_claim_fulfilled < m_write_lock_claim_ticket || !m_pending_writes.empty();
    }
};

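// Minimal standalone sketch (not part of db.cpp) of the ticket hand-off used by
// AsyncCommitHelper: blocking_begin_write() takes a ticket, wakes the worker, and
// waits until the worker has fulfilled exactly that ticket, which keeps synchronous
// writers ordered while the worker thread owns the write mutex. The names below are
// invented for this illustration.
#if 0
#include <condition_variable>
#include <mutex>

struct TicketHandOff {
    std::mutex mutex;
    std::condition_variable cv_worker;  // wakes the worker thread
    std::condition_variable cv_callers; // wakes waiting callers
    size_t ticket = 0;    // last ticket handed out
    size_t fulfilled = 0; // tickets the worker has completed, in order

    // Called by a caller thread: request the lock and block until it is ours.
    void request()
    {
        std::unique_lock lock(mutex);
        size_t my_ticket = ++ticket;
        cv_worker.notify_one();
        cv_callers.wait(lock, [&] {
            return fulfilled == my_ticket;
        });
    }

    // Called by the worker thread after acquiring the underlying write lock.
    void fulfil_next()
    {
        std::unique_lock lock(mutex);
        ++fulfilled;
        cv_callers.notify_all();
    }
};
#endif
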
DB::~DB() noexcept
{
    close();
}

// Note: close() and close_internal() may be called from DB::~DB().
// In that case, they will not throw. Throwing can only happen if called
// directly.
void DB::close(bool allow_open_read_transactions)
{
    // make helper thread(s) terminate
    m_commit_helper.reset();

    if (m_fake_read_lock_if_immutable) {
        if (!is_attached())
            return;
        {
            CheckedLockGuard local_lock(m_mutex);
            if (!allow_open_read_transactions && m_transaction_count)
                throw WrongTransactionState("Closing with open read transactions");
        }
        if (m_alloc.is_attached())
            m_alloc.detach();
        m_fake_read_lock_if_immutable.reset();
    }
    else {
        close_internal(std::unique_lock<InterprocessMutex>(m_controlmutex, std::defer_lock),
                       allow_open_read_transactions);
    }
}

void DB::close_internal(std::unique_lock<InterprocessMutex> lock, bool allow_open_read_transactions)
{
    if (!is_attached())
        return;

    {
        CheckedLockGuard local_lock(m_mutex);
        if (m_write_transaction_open)
            throw WrongTransactionState("Closing with open write transactions");
        if (!allow_open_read_transactions && m_transaction_count)
            throw WrongTransactionState("Closing with open read transactions");
    }
    SharedInfo* info = m_info;
    {
        if (!lock.owns_lock())
            lock.lock();

        if (m_alloc.is_attached())
            m_alloc.detach();

        if (m_is_sync_agent) {
            REALM_ASSERT(info->sync_agent_present);
            info->sync_agent_present = 0; // Set to false
        }
        release_all_read_locks();
        --info->num_participants;
        bool end_of_session = info->num_participants == 0;
        // std::cerr << "closing" << std::endl;
        if (end_of_session) {

            // If the db file is just backing for a transient data structure,
            // we can delete it when done.
            if (Durability(info->durability) == Durability::MemOnly && !m_in_memory_info) {
                try {
                    util::File::remove(m_db_path.c_str());
                }
                catch (...) {
                } // ignored on purpose.
            }
        }
        lock.unlock();
    }
    {
        CheckedLockGuard local_lock(m_mutex);

        m_new_commit_available.close();
        m_pick_next_writer.close();

        if (m_in_memory_info) {
            m_in_memory_info.reset();
        }
        else {
            // On Windows it is important that we unmap before unlocking, else a SetEndOfFile() call from another
            // thread may interleave which is not permitted on Windows. It is permitted on *nix.
            m_file_map.unmap();
            m_version_manager.reset();
            m_file.rw_unlock();
            // info->~SharedInfo(); // DO NOT Call destructor
            m_file.close();
        }
        m_info = nullptr;
        if (m_logger)
            m_logger->log(util::Logger::Level::detail, "DB closed");
    }
}

bool DB::other_writers_waiting_for_lock() const
{
    SharedInfo* info = m_info;

    uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed);
    uint32_t next_served = info->next_served.load(std::memory_order_relaxed);
    // When holding the write lock, next_ticket = next_served + 1. Hence, if the difference between 'next_ticket' and
    // 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock.
    return next_ticket > next_served + 1;
}

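// Worked example of the check above (illustrative): with this ticket scheme the
// current holder of the write lock satisfies next_ticket == next_served + 1. If
// next_ticket is 5 and next_served is 3, the difference is 2 > 1, so besides the
// current holder at least one more writer is queued and the function returns true.
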
void DB::AsyncCommitHelper::main()
2061
{
33,159✔
2062
    std::unique_lock lg(m_mutex);
33,159✔
2063
    while (m_running) {
457,257✔
2064
#if 0 // Enable for testing purposes
2065
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
2066
#endif
2067
        if (m_has_write_mutex) {
424,098✔
2068
            if (auto cb = std::move(m_pending_sync)) {
219,312✔
2069
                // Only one of sync_to_disk(), end_write(), or blocking_end_write()
306✔
2070
                // should be called, so we should never have both a pending sync
306✔
2071
                // and pending release.
306✔
2072
                REALM_ASSERT(!m_pending_mx_release);
1,356✔
2073
                lg.unlock();
1,356✔
2074
                cb();
1,356✔
2075
                cb = nullptr; // Release things captured by the callback before reacquiring the lock
1,356✔
2076
                lg.lock();
1,356✔
2077
                m_pending_mx_release = true;
1,356✔
2078
            }
1,356✔
2079
            if (m_pending_mx_release) {
219,312✔
2080
                REALM_ASSERT(!InterprocessMutex::is_thread_confined || m_owns_write_mutex);
106,290!
2081
                m_db->do_end_write();
106,290✔
2082
                m_pending_mx_release = false;
106,290✔
2083
                m_has_write_mutex = false;
106,290✔
2084
                m_owns_write_mutex = false;
106,290✔
2085

104,691✔
2086
                lg.unlock();
106,290✔
2087
                m_cv_callers.notify_all();
106,290✔
2088
                lg.lock();
106,290✔
2089
                continue;
106,290✔
2090
            }
106,290✔
2091
        }
204,786✔
2092
        else {
204,786✔
2093
            REALM_ASSERT(!m_pending_sync && !m_pending_mx_release);
204,786✔
2094

202,809✔
2095
            // Acquire the write lock if anyone has requested it, but only if
202,809✔
2096
            // another thread is not already waiting for it. If there's another
202,809✔
2097
            // thread requesting and they get it while we're waiting, we'll
202,809✔
2098
            // deadlock if they ask us to perform the sync.
202,809✔
2099
            if (!m_waiting_for_write_mutex && has_pending_write_requests()) {
204,786✔
2100
                lg.unlock();
105,972✔
2101
                m_db->do_begin_write();
105,972✔
2102
                lg.lock();
105,972✔
2103

104,691✔
2104
                REALM_ASSERT(!m_has_write_mutex);
105,972✔
2105
                m_has_write_mutex = true;
105,972✔
2106
                m_owns_write_mutex = true;
105,972✔
2107

104,691✔
2108
                // Synchronous transaction requests get priority over async
104,691✔
2109
                if (m_write_lock_claim_fulfilled < m_write_lock_claim_ticket) {
105,972✔
2110
                    ++m_write_lock_claim_fulfilled;
104,358✔
2111
                    m_cv_callers.notify_all();
104,358✔
2112
                    continue;
104,358✔
2113
                }
104,358✔
2114

393✔
2115
                REALM_ASSERT(!m_pending_writes.empty());
1,614✔
2116
                auto callback = std::move(m_pending_writes.front());
1,614✔
2117
                m_pending_writes.pop_front();
1,614✔
2118
                lg.unlock();
1,614✔
2119
                callback();
1,614✔
2120
                // Release things captured by the callback before reacquiring the lock
393✔
2121
                callback = nullptr;
1,614✔
2122
                lg.lock();
1,614✔
2123
                continue;
1,614✔
2124
            }
1,614✔
2125
        }
204,786✔
2126
        m_cv_worker.wait(lg);
211,836✔
2127
    }
211,836✔
2128
    if (m_has_write_mutex && m_owns_write_mutex) {
33,159!
2129
        m_db->do_end_write();
×
2130
    }
×
2131
}
33,159✔
2132

2133
void DB::async_begin_write(util::UniqueFunction<void()> fn)
2134
{
1,614✔
2135
    REALM_ASSERT(m_commit_helper);
1,614✔
2136
    m_commit_helper->begin_write(std::move(fn));
1,614✔
2137
}
1,614✔
2138

2139
void DB::async_end_write()
2140
{
54✔
2141
    REALM_ASSERT(m_commit_helper);
54✔
2142
    m_commit_helper->end_write();
54✔
2143
}
54✔
2144

2145
void DB::async_sync_to_disk(util::UniqueFunction<void()> fn)
2146
{
1,356✔
2147
    REALM_ASSERT(m_commit_helper);
1,356✔
2148
    m_commit_helper->sync_to_disk(std::move(fn));
1,356✔
2149
}
1,356✔
2150

2151
bool DB::has_changed(TransactionRef& tr)
{
    if (m_fake_read_lock_if_immutable)
        return false; // immutable doesn't change
    bool changed = tr->m_read_lock.m_version != get_version_of_latest_snapshot();
    return changed;
}

bool DB::wait_for_change(TransactionRef& tr)
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
    while (tr->m_read_lock.m_version == m_info->latest_version_number && m_wait_for_change_enabled) {
        m_new_commit_available.wait(m_controlmutex, 0);
    }
    return tr->m_read_lock.m_version != m_info->latest_version_number;
}


void DB::wait_for_change_release()
{
    if (m_fake_read_lock_if_immutable)
        return;
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
    m_wait_for_change_enabled = false;
    m_new_commit_available.notify_all();
}


void DB::enable_wait_for_change()
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
    m_wait_for_change_enabled = true;
}

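// Illustrative usage sketch (not from the Realm sources): the wait-for-change API
// above lets a polling thread block until a newer snapshot exists, while another
// thread can break the wait, e.g. during shutdown. `db` and `tr` are assumed to be
// a DBRef and a TransactionRef obtained from start_read().
//
//     // polling thread:
//     if (db->wait_for_change(tr)) {
//         // a newer version was committed; obtain a fresh read transaction to see it
//     }
//
//     // elsewhere, to unblock the poller and disable further waiting:
//     db->wait_for_change_release();
//     // waiting can be re-armed later with:
//     db->enable_wait_for_change();
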
bool DB::needs_file_format_upgrade(const std::string& file, const std::vector<char>& encryption_key)
{
    SlabAlloc alloc;
    SlabAlloc::Config cfg;
    cfg.session_initiator = false;
    cfg.read_only = true;
    cfg.no_create = true;
    if (!encryption_key.empty()) {
        cfg.encryption_key = encryption_key.data();
    }
    try {
        alloc.attach_file(file, cfg);
        if (auto current_file_format_version = alloc.get_committed_file_format_version()) {
            auto target_file_format_version = Group::g_current_file_format_version;
            return current_file_format_version < target_file_format_version;
        }
    }
    catch (const FileAccessError& err) {
        if (err.code() != ErrorCodes::FileNotFound) {
            throw;
        }
    }
    return false;
}

void DB::upgrade_file_format(bool allow_file_format_upgrade, int target_file_format_version,
                             int current_hist_schema_version, int target_hist_schema_version)
{
    // In a multithreaded scenario multiple threads may initially see a need to
    // upgrade (maybe_upgrade == true) even though only one thread is supposed to
    // perform the upgrade, but that is ok, because the condition is rechecked
    // in a fully reliable way inside a transaction.

    // First a non-threadsafe but fast check
    int current_file_format_version = m_file_format_version;
    REALM_ASSERT(current_file_format_version <= target_file_format_version);
    REALM_ASSERT(current_hist_schema_version <= target_hist_schema_version);
    bool maybe_upgrade_file_format = (current_file_format_version < target_file_format_version);
    bool maybe_upgrade_hist_schema = (current_hist_schema_version < target_hist_schema_version);
    bool maybe_upgrade = maybe_upgrade_file_format || maybe_upgrade_hist_schema;
    if (maybe_upgrade) {

#ifdef REALM_DEBUG
// This sleep() only exists in order to increase the quality of the
// TEST(Upgrade_Database_2_3_Writes_New_File_Format_new) unit test.
// The unit test creates multiple threads that all call
// upgrade_file_format() simultaneously. This sleep() then acts like
// a simple thread barrier that makes sure the threads meet here, to
// increase the likelihood of detecting any potential race problems.
// See the unit test for details.
//
// NOTE: This sleep has been disabled because no problems have been found with
// this code in a long while, and it was dramatically slowing down a unit test
// in realm-sync.

// millisleep(200);
#endif

        // WriteTransaction wt(*this);
        auto wt = start_write();
        bool dirty = false;

        // We need to upgrade history first. We may need to access it during migration
        // when processing the !OID columns
        int current_hist_schema_version_2 = wt->get_history_schema_version();
        // The history must either still be using its initial schema or have
        // been upgraded already to the chosen target schema version via a
        // concurrent DB object.
        REALM_ASSERT(current_hist_schema_version_2 == current_hist_schema_version ||
                     current_hist_schema_version_2 == target_hist_schema_version);
        bool need_hist_schema_upgrade = (current_hist_schema_version_2 < target_hist_schema_version);
        if (need_hist_schema_upgrade) {
            if (!allow_file_format_upgrade)
                throw FileFormatUpgradeRequired(this->m_db_path);

            Replication* repl = get_replication();
            repl->upgrade_history_schema(current_hist_schema_version_2); // Throws
            wt->set_history_schema_version(target_hist_schema_version);  // Throws
            dirty = true;
        }

        // File format upgrade
        int current_file_format_version_2 = m_alloc.get_committed_file_format_version();
        // The file must either still be using its initial file_format or have
        // been upgraded already to the chosen target file format via a
        // concurrent DB object.
        REALM_ASSERT(current_file_format_version_2 == current_file_format_version ||
                     current_file_format_version_2 == target_file_format_version);
        bool need_file_format_upgrade = (current_file_format_version_2 < target_file_format_version);
        if (need_file_format_upgrade) {
            if (!allow_file_format_upgrade)
                throw FileFormatUpgradeRequired(this->m_db_path);
            wt->upgrade_file_format(target_file_format_version); // Throws
            // Note: The file format version stored in the Realm file will be
            // updated to the new file format version as part of the following
            // commit operation. This happens in GroupWriter::commit().
            if (m_upgrade_callback)
                m_upgrade_callback(current_file_format_version_2, target_file_format_version); // Throws
            dirty = true;
        }
        wt->set_file_format_version(target_file_format_version);
        m_file_format_version = target_file_format_version;

        if (dirty)
            wt->commit(); // Throws
    }
}

void DB::release_read_lock(ReadLockInfo& read_lock) noexcept
{
    // ignore if opened with immutable file (then we have no lockfile)
    if (m_fake_read_lock_if_immutable)
        return;
    CheckedLockGuard lock(m_mutex); // mx on m_local_locks_held
    do_release_read_lock(read_lock);
}

// this is called with m_mutex locked
void DB::do_release_read_lock(ReadLockInfo& read_lock) noexcept
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    bool found_match = false;
    // simple linear search and move-last-over if a match is found.
    // common case should have only a modest number of transactions in play..
    for (size_t j = 0; j < m_local_locks_held.size(); ++j) {
        if (m_local_locks_held[j].m_version == read_lock.m_version) {
            m_local_locks_held[j] = m_local_locks_held.back();
            m_local_locks_held.pop_back();
            found_match = true;
            break;
        }
    }
    if (!found_match) {
        REALM_ASSERT(!is_attached());
        // it's OK, someone called close() and all locks were released
        return;
    }
    --m_transaction_count;
    m_version_manager->release_read_lock(read_lock);
}


DB::ReadLockInfo DB::grab_read_lock(ReadLockInfo::Type type, VersionID version_id)
{
    CheckedLockGuard lock(m_mutex); // mx on m_local_locks_held
    REALM_ASSERT_RELEASE(is_attached());
    auto read_lock = m_version_manager->grab_read_lock(type, version_id);

    m_local_locks_held.emplace_back(read_lock);
    ++m_transaction_count;
    REALM_ASSERT(read_lock.m_file_size > read_lock.m_top_ref);
    return read_lock;
}

void DB::leak_read_lock(ReadLockInfo& read_lock) noexcept
{
    CheckedLockGuard lock(m_mutex); // mx on m_local_locks_held
    // simple linear search and move-last-over if a match is found.
    // common case should have only a modest number of transactions in play..
    for (size_t j = 0; j < m_local_locks_held.size(); ++j) {
        if (m_local_locks_held[j].m_version == read_lock.m_version) {
            m_local_locks_held[j] = m_local_locks_held.back();
            m_local_locks_held.pop_back();
            --m_transaction_count;
            return;
        }
    }
}

bool DB::do_try_begin_write()
{
    // In the non-blocking case, we will only succeed if there is no contention for
    // the write mutex. For this case we are trivially fair and can ignore the
    // fairness machinery.
    bool got_the_lock = m_writemutex.try_lock();
    if (got_the_lock) {
        finish_begin_write();
    }
    return got_the_lock;
}

void DB::do_begin_write()
{
    if (m_logger) {
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "acquire writemutex");
    }

    SharedInfo* info = m_info;

    // Get write lock - the write lock is held until do_end_write().
    //
    // We use a ticketing scheme to ensure fairness wrt performing write transactions.
    // (But cannot do that on Windows until we have interprocess condition variables there)
    uint32_t my_ticket = info->next_ticket.fetch_add(1, std::memory_order_relaxed);
    m_writemutex.lock(); // Throws

    // allow for comparison even after wrap around of ticket numbering:
    int32_t diff = int32_t(my_ticket - info->next_served.load(std::memory_order_relaxed));
    bool should_yield = diff > 0; // ticket is in the future
    // a) the above comparison is only guaranteed to be correct if the distance
    //    between my_ticket and info->next_served is less than 2^30. This will
    //    be the case since the distance will be bounded by the number of threads
    //    and each thread cannot ever hold more than one ticket.
    // b) we could use 64 bit counters instead, but it is unclear if all platforms
    //    have support for interprocess atomics for 64 bit values.

    timespec time_limit; // only compute the time limit if we're going to use it:
    if (should_yield) {
        // This clock is not monotonic, so time can move backwards. This can lead
        // to a wrong time limit, but the only effect of a wrong time limit is that
        // we momentarily lose fairness, so we accept it.
        timeval tv;
        gettimeofday(&tv, nullptr);
        time_limit.tv_sec = tv.tv_sec;
        time_limit.tv_nsec = tv.tv_usec * 1000;
        time_limit.tv_nsec += 500000000;        // 500 msec wait
        if (time_limit.tv_nsec >= 1000000000) { // overflow
            time_limit.tv_nsec -= 1000000000;
            time_limit.tv_sec += 1;
        }
    }

    while (should_yield) {

        m_pick_next_writer.wait(m_writemutex, &time_limit);
        timeval tv;
        gettimeofday(&tv, nullptr);
        if (time_limit.tv_sec < tv.tv_sec ||
            (time_limit.tv_sec == tv.tv_sec && time_limit.tv_nsec < tv.tv_usec * 1000)) {
            // Timeout!
            break;
        }
        diff = int32_t(my_ticket - info->next_served);
        should_yield = diff > 0; // ticket is in the future, so yield to someone else
    }

    // we may get here because a) it's our turn, b) we timed out
    // we don't distinguish, satisfied that event b) should be rare.
    // In case b), we have to *make* it our turn. Failure to do so could leave us
    // with 'next_served' permanently trailing 'next_ticket'.
    //
    // In doing so, we may bypass other waiters, hence the condition for yielding
    // should take this situation into account by comparing with '>' instead of '!='
    info->next_served = my_ticket;
    finish_begin_write();
    if (m_logger) {
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "writemutex acquired");
    }
}

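// Illustrative sketch (not part of the Realm sources) of the wrap-around-safe ticket
// comparison used in do_begin_write() above: subtracting two uint32_t tickets and
// casting the result to int32_t keeps the ordering correct across the 2^32 wrap,
// provided the two counters never drift further apart than the number of waiting
// threads. The narrowing conversion assumes two's-complement behaviour (guaranteed
// since C++20, and true on every platform Realm targets).
namespace {
[[maybe_unused]] constexpr bool example_ticket_is_in_future(uint32_t my_ticket, uint32_t next_served)
{
    return int32_t(my_ticket - next_served) > 0; // same comparison as in do_begin_write()
}
// A ticket taken just after the counter wrapped still sorts after one taken just before:
static_assert(example_ticket_is_in_future(2, 0xFFFFFFFEu), "ticket after wrap is still 'in the future'");
static_assert(!example_ticket_is_in_future(0xFFFFFFFEu, 2), "ticket before wrap has already been served");
} // namespace
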
void DB::finish_begin_write()
{
    if (m_info->commit_in_critical_phase) {
        m_writemutex.unlock();
        throw RuntimeError(ErrorCodes::BrokenInvariant, "Crash of other process detected, session restart required");
    }


    {
        CheckedLockGuard local_lock(m_mutex);
        m_write_transaction_open = true;
    }
    m_alloc.set_read_only(false);
}

void DB::do_end_write() noexcept
{
    m_info->next_served.fetch_add(1, std::memory_order_relaxed);

    CheckedLockGuard local_lock(m_mutex);
    REALM_ASSERT(m_write_transaction_open);
    m_alloc.set_read_only(true);
    m_write_transaction_open = false;
    m_pick_next_writer.notify_all();
    m_writemutex.unlock();
    if (m_logger) {
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "writemutex released");
    }
}


Replication::version_type DB::do_commit(Transaction& transaction, bool commit_to_disk)
{
    version_type current_version;
    {
        current_version = m_version_manager->get_newest_version();
    }
    version_type new_version = current_version + 1;

    if (!transaction.m_tables_to_clear.empty()) {
        for (auto table_key : transaction.m_tables_to_clear) {
            transaction.get_table_unchecked(table_key)->clear();
        }
        transaction.m_tables_to_clear.clear();
    }
    if (Replication* repl = get_replication()) {
        // If Replication::prepare_commit() fails, then the entire transaction
        // fails. The application then has the option of terminating the
        // transaction with a call to Transaction::Rollback(), which in turn
        // must call Replication::abort_transact().
        new_version = repl->prepare_commit(current_version);        // Throws
        low_level_commit(new_version, transaction, commit_to_disk); // Throws
        repl->finalize_commit();
    }
    else {
        low_level_commit(new_version, transaction); // Throws
    }

    {
        std::lock_guard lock(m_commit_listener_mutex);
        for (auto listener : m_commit_listeners) {
            listener->on_commit(new_version);
        }
    }

    return new_version;
}

VersionID DB::get_version_id_of_latest_snapshot()
{
    if (m_fake_read_lock_if_immutable)
        return {m_fake_read_lock_if_immutable->m_version, 0};
    return m_version_manager->get_version_id_of_latest_snapshot();
}


DB::version_type DB::get_version_of_latest_snapshot()
{
    return get_version_id_of_latest_snapshot().version;
}


void DB::low_level_commit(uint_fast64_t new_version, Transaction& transaction, bool commit_to_disk)
{
    SharedInfo* info = m_info;

    // Version of oldest snapshot currently (or recently) bound in a transaction
    // of the current session.
    uint64_t oldest_version = 0, oldest_live_version = 0;
    TopRefMap top_refs;
    bool any_new_unreachables;
    {
        CheckedLockGuard lock(m_mutex);
        m_version_manager->cleanup_versions(oldest_live_version, top_refs, any_new_unreachables);
        oldest_version = top_refs.begin()->first;
        // Allow for trimming of the history. Some types of histories do not
        // need to store changesets prior to the oldest *live* bound snapshot.
        if (auto hist = transaction.get_history()) {
            hist->set_oldest_bound_version(oldest_live_version); // Throws
        }
        // Cleanup any stale mappings
        m_alloc.purge_old_mappings(oldest_version, new_version);
    }
    // save number of live versions for later:
    // (top_refs is std::moved into GroupWriter so we'll lose it in the call to set_versions below)
    auto live_versions = top_refs.size();
    // Do the actual commit
    REALM_ASSERT(oldest_version <= new_version);

    GroupWriter out(transaction, Durability(info->durability), m_marker_observer.get()); // Throws
    out.set_versions(new_version, top_refs, any_new_unreachables);
    out.prepare_evacuation();
    auto t1 = std::chrono::steady_clock::now();
    auto commit_size = m_alloc.get_commit_size();
    if (m_logger) {
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::debug, "Initiate commit version: %1",
                      new_version);
    }
    if (auto limit = out.get_evacuation_limit()) {
        // Get a work limit based on the size of the transaction we're about to commit
        // Add 4k to ensure progress on small commits
        size_t work_limit = commit_size / 2 + out.get_free_list_size() + 0x1000;
        transaction.cow_outliers(out.get_evacuation_progress(), limit, work_limit);
    }

    ref_type new_top_ref;
    // Recursively write all changed arrays to end of file
    {
        // protect against race with any other DB trying to attach to the file
        std::lock_guard<InterprocessMutex> lock(m_controlmutex); // Throws
        new_top_ref = out.write_group();                         // Throws
    }
    {
        // protect access to shared variables and m_reader_mapping from here
        CheckedLockGuard lock_guard(m_mutex);
        m_free_space = out.get_free_space_size();
        m_locked_space = out.get_locked_space_size();
        m_used_space = out.get_logical_size() - m_free_space;
        m_evac_stage.store(EvacStage(out.get_evacuation_stage()));
        out.sync_according_to_durability();
        if (Durability(info->durability) == Durability::Full || Durability(info->durability) == Durability::Unsafe) {
            if (commit_to_disk) {
                GroupCommitter cm(transaction, Durability(info->durability), m_marker_observer.get());
                cm.commit(new_top_ref);
            }
        }
        size_t new_file_size = out.get_logical_size();
        // We must reset the allocator's free space tracking before communicating the new
        // version through the ring buffer. If not, a reader may start updating the allocator's
        // mappings while the allocator is in a dirty state.
        reset_free_space_tracking();
        // Add the new version. If this fails in any way, the VersionList may be corrupted.
        // This can lead to readers seeing invalid data which is likely to cause them
        // to crash. Other writers *must* be prevented from writing any further updates
        // to the database. The flag "commit_in_critical_phase" is used to prevent such updates.
        info->commit_in_critical_phase = 1;
        {
            m_version_manager->add_version(new_top_ref, new_file_size, new_version);

            // REALM_ASSERT(m_alloc.matches_section_boundary(new_file_size));
            REALM_ASSERT(new_top_ref < new_file_size);
        }
        // At this point, the VersionList has been successfully updated, and the next writer
        // can safely proceed once the writemutex has been lifted.
        info->commit_in_critical_phase = 0;
    }
    {
        // protect against concurrent updates to the .lock file.
        // must release m_mutex before this point to obey lock order
        std::lock_guard<InterprocessMutex> lock(m_controlmutex);

        info->number_of_versions = live_versions + 1;
        info->latest_version_number = new_version;

        m_new_commit_available.notify_all();
    }
    auto t2 = std::chrono::steady_clock::now();
    if (m_logger) {
        std::string to_disk_str = commit_to_disk ? util::format(" ref %1", new_top_ref) : " (no commit to disk)";
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::debug, "Commit of size %1 done in %2 us%3",
                      commit_size, std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count(),
                      to_disk_str);
    }
}

#ifdef REALM_DEBUG
void DB::reserve(size_t size)
{
    REALM_ASSERT(is_attached());
    m_alloc.reserve_disk_space(size); // Throws
}
#endif

bool DB::call_with_lock(const std::string& realm_path, CallbackWithLock&& callback)
{
    auto lockfile_path = get_core_file(realm_path, CoreFileType::Lock);

    File lockfile;
    lockfile.open(lockfile_path, File::access_ReadWrite, File::create_Auto, 0); // Throws
    File::CloseGuard fcg(lockfile);
    lockfile.set_fifo_path(realm_path + ".management", "lock.fifo");
    if (lockfile.try_rw_lock_exclusive()) { // Throws
        callback(realm_path);
        return true;
    }
    return false;
}

std::string DB::get_core_file(const std::string& base_path, CoreFileType type)
{
    switch (type) {
        case CoreFileType::Lock:
            return base_path + ".lock";
        case CoreFileType::Storage:
            return base_path;
        case CoreFileType::Management:
            return base_path + ".management";
        case CoreFileType::Note:
            return base_path + ".note";
        case CoreFileType::Log:
            return base_path + ".log";
    }
    REALM_UNREACHABLE();
}

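// For illustration (not part of the Realm sources): for a Realm at "/data/default.realm"
// the mapping above produces the following paths.
//
//   CoreFileType::Storage    -> "/data/default.realm"
//   CoreFileType::Lock       -> "/data/default.realm.lock"
//   CoreFileType::Management -> "/data/default.realm.management"
//   CoreFileType::Note       -> "/data/default.realm.note"
//   CoreFileType::Log        -> "/data/default.realm.log"
//
// delete_files() below removes the storage, .note and .log files and the .management
// directory, and removes the .lock file only when delete_lockfile is set.
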
void DB::delete_files(const std::string& base_path, bool* did_delete, bool delete_lockfile)
{
    if (File::try_remove(get_core_file(base_path, CoreFileType::Storage)) && did_delete) {
        *did_delete = true;
    }

    File::try_remove(get_core_file(base_path, CoreFileType::Note));
    File::try_remove(get_core_file(base_path, CoreFileType::Log));
    util::try_remove_dir_recursive(get_core_file(base_path, CoreFileType::Management));

    if (delete_lockfile) {
        File::try_remove(get_core_file(base_path, CoreFileType::Lock));
    }
}

TransactionRef DB::start_read(VersionID version_id)
{
    if (!is_attached())
        throw StaleAccessor("Stale transaction");
    TransactionRef tr;
    if (m_fake_read_lock_if_immutable) {
        tr = make_transaction_ref(shared_from_this(), &m_alloc, *m_fake_read_lock_if_immutable, DB::transact_Reading);
    }
    else {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Live, version_id);
        ReadLockGuard g(*this, read_lock);
        read_lock.check();
        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Reading);
        g.release();
    }
    tr->set_file_format_version(get_file_format_version());
    return tr;
}

TransactionRef DB::start_frozen(VersionID version_id)
{
    if (!is_attached())
        throw StaleAccessor("Stale transaction");
    TransactionRef tr;
    if (m_fake_read_lock_if_immutable) {
        tr = make_transaction_ref(shared_from_this(), &m_alloc, *m_fake_read_lock_if_immutable, DB::transact_Frozen);
    }
    else {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Frozen, version_id);
        ReadLockGuard g(*this, read_lock);
        read_lock.check();
        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Frozen);
        g.release();
    }
    tr->set_file_format_version(get_file_format_version());
    return tr;
}

TransactionRef DB::start_write(bool nonblocking)
{
    if (m_fake_read_lock_if_immutable) {
        REALM_ASSERT(false && "Can't write an immutable DB");
    }
    if (nonblocking) {
        bool success = do_try_begin_write();
        if (!success) {
            return TransactionRef();
        }
    }
    else {
        do_begin_write();
    }
    {
        CheckedUniqueLock local_lock(m_mutex);
        if (!is_attached()) {
            local_lock.unlock();
            end_write_on_correct_thread();
            throw StaleAccessor("Stale transaction");
        }
        m_write_transaction_open = true;
    }
    TransactionRef tr;
    try {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Live, VersionID());
        ReadLockGuard g(*this, read_lock);
        read_lock.check();

        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Writing);
        tr->set_file_format_version(get_file_format_version());
        version_type current_version = read_lock.m_version;
        m_alloc.init_mapping_management(current_version);
        if (Replication* repl = get_replication()) {
            bool history_updated = false;
            repl->initiate_transact(*tr, current_version, history_updated); // Throws
        }
        g.release();
    }
    catch (...) {
        end_write_on_correct_thread();
        throw;
    }

    return tr;
}

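// Illustrative usage sketch (not from the Realm sources): the non-blocking write path
// surfaces through start_write(true), which returns a null TransactionRef instead of
// blocking when the write mutex is contended. `db` is assumed to be a valid DBRef.
//
//     if (auto wt = db->start_write(/*nonblocking=*/true)) {
//         // ... make changes through wt ...
//         wt->commit(); // Throws
//     }
//     else {
//         // another writer holds the lock; retry later or fall back to a blocking start_write()
//     }
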
void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
{
    {
        util::CheckedLockGuard lck(tr->m_async_mutex);
        REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle);
        tr->m_async_stage = Transaction::AsyncState::Requesting;
        tr->m_request_time_point = std::chrono::steady_clock::now();
        if (tr->db->m_logger) {
            tr->db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace,
                                  "Tr %1: Async request write lock", tr->m_log_id);
        }
    }
    std::weak_ptr<Transaction> weak_tr = tr;
    async_begin_write([weak_tr, cb = std::move(when_acquired)]() {
        if (auto tr = weak_tr.lock()) {
            util::CheckedLockGuard lck(tr->m_async_mutex);
            // If a synchronous transaction happened while we were pending
            // we may be in HasCommits
            if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
                tr->m_async_stage = Transaction::AsyncState::HasLock;
            }
            if (tr->db->m_logger) {
                auto t2 = std::chrono::steady_clock::now();
                tr->db->m_logger->log(
                    util::LogCategory::transaction, util::Logger::Level::trace, "Tr %1, Got write lock in %2 us",
                    tr->m_log_id,
                    std::chrono::duration_cast<std::chrono::microseconds>(t2 - tr->m_request_time_point).count());
            }
            if (tr->m_waiting_for_write_lock) {
                tr->m_waiting_for_write_lock = false;
                tr->m_async_cv.notify_one();
            }
            else if (cb) {
                cb();
            }
            tr.reset(); // Release pointer while lock is held
        }
    });
}

inline DB::DB(Private, const DBOptions& options)
    : m_upgrade_callback(std::move(options.upgrade_callback))
    , m_log_id(util::gen_log_id(this))
{
    if (options.enable_async_writes) {
        m_commit_helper = std::make_unique<AsyncCommitHelper>(this);
    }
}

DBRef DB::create(const std::string& file, bool no_create, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    DBRef retval = std::make_shared<DB>(Private(), options);
    retval->open(file, no_create, options);
    return retval;
}

DBRef DB::create(Replication& repl, const std::string& file, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    DBRef retval = std::make_shared<DB>(Private(), options);
    retval->open(repl, file, options);
    return retval;
}

DBRef DB::create(std::unique_ptr<Replication> repl, const std::string& file,
                 const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    REALM_ASSERT(repl);
    DBRef retval = std::make_shared<DB>(Private(), options);
    retval->m_history = std::move(repl);
    retval->open(*retval->m_history, file, options);
    return retval;
}

DBRef DB::create(std::unique_ptr<Replication> repl, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    REALM_ASSERT(repl);
    DBRef retval = std::make_shared<DB>(Private(), options);
    retval->m_history = std::move(repl);
    retval->open(*retval->m_history, options);
    return retval;
}

DBRef DB::create_in_memory(std::unique_ptr<Replication> repl, const std::string& in_memory_path,
                           const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    DBRef db = create(std::move(repl), options);
    db->m_db_path = in_memory_path;
    return db;
}

DBRef DB::create(BinaryData buffer, bool take_ownership) NO_THREAD_SAFETY_ANALYSIS
{
    DBOptions options;
    options.is_immutable = true;
    DBRef retval = std::make_shared<DB>(Private(), options);
    retval->open(buffer, take_ownership);
    return retval;
}

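// Illustrative usage sketch (not from the Realm sources): opening a local Realm file
// through the factory above and running one write transaction. The path is made up
// for the example and DBOptions() is used with its defaults.
//
//     DBRef db = DB::create("/tmp/example.realm", /*no_create=*/false, DBOptions());
//     {
//         TransactionRef wt = db->start_write();
//         // ... create tables / objects through wt ...
//         wt->commit(); // Throws
//     }
//     TransactionRef rt = db->start_read(VersionID()); // reads the snapshot just committed
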
void DB::claim_sync_agent()
{
    REALM_ASSERT(is_attached());
    std::unique_lock<InterprocessMutex> lock(m_controlmutex);
    if (m_info->sync_agent_present)
        throw MultipleSyncAgents{};
    m_info->sync_agent_present = 1; // Set to true
    m_is_sync_agent = true;
}

void DB::release_sync_agent()
{
    REALM_ASSERT(is_attached());
    std::unique_lock<InterprocessMutex> lock(m_controlmutex);
    if (!m_is_sync_agent)
        return;
    REALM_ASSERT(m_info->sync_agent_present);
    m_info->sync_agent_present = 0;
    m_is_sync_agent = false;
}

void DB::do_begin_possibly_async_write()
{
    if (m_commit_helper) {
        m_commit_helper->blocking_begin_write();
    }
    else {
        do_begin_write();
    }
}

void DB::end_write_on_correct_thread() noexcept
{
    //    m_local_write_mutex.unlock();
    if (!m_commit_helper || !m_commit_helper->blocking_end_write()) {
        do_end_write();
    }
}

void DB::add_commit_listener(CommitListener* listener)
{
    std::lock_guard lock(m_commit_listener_mutex);
    m_commit_listeners.push_back(listener);
}

void DB::remove_commit_listener(CommitListener* listener)
{
    std::lock_guard lock(m_commit_listener_mutex);
    m_commit_listeners.erase(std::remove(m_commit_listeners.begin(), m_commit_listeners.end(), listener),
                             m_commit_listeners.end());
}

DisableReplication::DisableReplication(Transaction& t)
    : m_tr(t)
    , m_owner(t.get_db())
    , m_repl(m_owner->get_replication())
    , m_version(t.get_version())
{
    m_owner->set_replication(nullptr);
    t.m_history = nullptr;
}

DisableReplication::~DisableReplication()
{
    m_owner->set_replication(m_repl);
    if (m_version != m_tr.get_version())
        m_tr.initialize_replication();
}

} // namespace realm