• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2109

07 Mar 2024 01:56PM UTC coverage: 90.918% (+0.01%) from 90.908%
2109

push

Evergreen

web-flow
Fix querying with a path into nested collections with wildcards (#7404)

Comparing a collection with a list could fail if there were wildcards
in the path, and therefore multiple collections to compare with the
right-hand list.

A linklist implicitly has a wildcard in its path, so if a linklist is
in the path there will be a similar problem. Do not merge values
from different objects into a common list in queries.

93972 of 173176 branches covered (54.26%)

323 of 332 new or added lines in 6 files covered. (97.29%)

91 existing lines in 18 files now uncovered.

238503 of 262328 relevant lines covered (90.92%)

6065347.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.32
/src/realm/db.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2016 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/transaction.hpp>
20

21
#include <algorithm>
22
#include <atomic>
23
#include <cerrno>
24
#include <fcntl.h>
25
#include <iostream>
26
#include <mutex>
27
#include <sstream>
28
#include <type_traits>
29
#include <random>
30
#include <deque>
31
#include <thread>
32
#include <chrono>
33
#include <condition_variable>
34

35
#include <realm/disable_sync_to_disk.hpp>
36
#include <realm/group_writer.hpp>
37
#include <realm/impl/simulated_failure.hpp>
38
#include <realm/replication.hpp>
39
#include <realm/util/errno.hpp>
40
#include <realm/util/features.h>
41
#include <realm/util/file_mapper.hpp>
42
#include <realm/util/safe_int_ops.hpp>
43
#include <realm/util/scope_exit.hpp>
44
#include <realm/util/thread.hpp>
45
#include <realm/util/to_string.hpp>
46

47
#ifndef _WIN32
48
#include <sys/wait.h>
49
#include <sys/time.h>
50
#include <unistd.h>
51
#else
52
#include <windows.h>
53
#include <process.h>
54
#endif
55

56
// #define REALM_ENABLE_LOGFILE
57

58

59
using namespace realm;
60
using namespace realm::util;
61
using Durability = DBOptions::Durability;
62

63
namespace {
64

65
// value   change
66
// --------------------
67
//  4      Unknown
68
//  5      Introduction of SharedInfo::file_format_version and
69
//         SharedInfo::history_type.
70
//  6      Using new robust mutex emulation where applicable
71
//  7      Introducing `commit_in_critical_phase` and `sync_agent_present`, and
72
//         changing `daemon_started` and `daemon_ready` from 1-bit to 8-bit
73
//         fields.
74
//  8      Placing the commitlog history inside the Realm file.
75
//  9      Fair write transactions requires an additional condition variable,
76
//         `write_fairness`
77
// 10      Introducing SharedInfo::history_schema_version.
78
// 11      New impl of InterprocessCondVar on windows.
79
// 12      Change `number_of_versions` to an atomic rather than guarding it
80
//         with a lock.
81
// 13      New impl of VersionList and added mutex for it (former RingBuffer)
82
// 14      Added field for tracking ongoing encrypted writes
83
const uint_fast16_t g_shared_info_version = 14;
84

85

86
struct VersionList {
    // the VersionList is an array of ReadCount structures.
    // it is placed in the "lock-file" and accessed via memory mapping.
    // One entry describes one snapshot (version) of the Realm file that
    // readers may still be bound to.
    struct ReadCount {
        uint64_t version;     // snapshot number; 0 means the entry is free/inactive
        uint64_t filesize;    // file size at the time the snapshot was committed
        uint64_t current_top; // ref of the top array for this snapshot
        uint32_t count_live;
        uint32_t count_frozen;
        uint32_t count_full;
        // An entry is in use exactly when its version is nonzero.
        bool is_active()
        {
            return version != 0;
        }
        // Mark the entry free and clear all reader counts.
        void deactivate()
        {
            version = 0;
            count_live = count_frozen = count_full = 0;
        }
        // Mark the entry in use for snapshot `v` (must be nonzero).
        void activate(uint64_t v)
        {
            version = v;
        }
    };

    // Grow the list of usable entries to `size`, deactivating the newly
    // exposed ones first. `entries` is only bumped after the new slots have
    // been cleared, so a concurrent crash cannot expose garbage entries.
    void reserve(uint32_t size) noexcept
    {
        for (auto i = entries; i < size; ++i)
            data()[i].deactivate();
        if (size > entries) {
            // Fence preventing downward motion of above writes
            std::atomic_signal_fence(std::memory_order_release);
            entries = size;
        }
    }

    VersionList() noexcept
    {
        newest = nil; // empty
        entries = 0;
        reserve(init_readers_size);
    }

    static size_t compute_required_space(uint_fast32_t num_entries) noexcept
    {
        // get space required for given number of entries beyond the initial count.
        // NB: this is not the size of the VersionList, it is the size minus whatever was
        // the initial size.
        return sizeof(ReadCount) * (num_entries - init_readers_size);
    }

    unsigned int capacity() const noexcept
    {
        return entries;
    }

    ReadCount& get(uint_fast32_t idx) noexcept
    {
        return data()[idx];
    }

    ReadCount& get_newest() noexcept
    {
        return get(newest);
    }
    // returns nullptr if all entries are in use
    ReadCount* try_allocate_entry(uint64_t top, uint64_t size, uint64_t version)
    {
        auto i = allocating.load();
        if (i == newest.load()) {
            // if newest != allocating we are recovering from a crash and MUST complete the earlier allocation
            // but if not, find lowest free entry by linear search.
            uint32_t k = 0;
            while (k < entries && data()[k].is_active()) {
                ++k;
            }
            if (k == entries)
                return nullptr;     // no free entries
            allocating.exchange(k); // barrier: prevent upward movement of instructions below
            i = k;
        }
        auto& rc = data()[i];
        REALM_ASSERT(rc.count_frozen == 0);
        REALM_ASSERT(rc.count_live == 0);
        REALM_ASSERT(rc.count_full == 0);
        rc.current_top = top;
        rc.filesize = size;
        rc.activate(version);
        newest.store(i); // barrier: prevent downward movement of instructions above
        return &rc;
    }

    // Index of an entry within the array (pointer arithmetic on data()).
    uint32_t index_of(const ReadCount& rc) noexcept
    {
        return (uint32_t)(&rc - data());
    }

    void free_entry(ReadCount* rc) noexcept
    {
        rc->current_top = rc->filesize = -1ULL; // easy to recognize in debugger
        rc->deactivate();
    }

    // This method resets the version list to an empty state, then allocates an entry.
    // Precondition: This should *only* be done if the caller has established that she
    // is the only thread/process that has access to the VersionList. It is currently
    // called from init_versioning(), which is called by DB::open() under the
    // condition that it is the session initiator and under guard by the control mutex,
    // thus ensuring the precondition. It is also called from compact() in a similar situation.
    // It is most likely not suited for any other use.
    ReadCount& init_versioning(uint64_t top, uint64_t filesize, uint64_t version) noexcept
    {
        newest = nil;
        allocating = 0;
        auto t_free = entries;
        entries = 0;
        reserve(t_free);
        return *try_allocate_entry(top, filesize, version);
    }

    // Release entries that no reader can reach anymore. Outputs:
    // - oldest_live_v: oldest version still pinned live (or by a "full" pin)
    // - top_refs: map of all still-reachable versions to their top ref/size
    // - any_new_unreachables: true if a version younger than the oldest
    //   reachable one just became unreachable (triggers backdating in caller)
    void purge_versions(uint64_t& oldest_live_v, TopRefMap& top_refs, bool& any_new_unreachables)
    {
        oldest_live_v = std::numeric_limits<uint64_t>::max();
        auto oldest_full_v = std::numeric_limits<uint64_t>::max();
        any_new_unreachables = false;
        // correct case where an earlier crash may have left the entry at 'allocating' partially initialized:
        const auto index_of_newest = newest.load();
        if (auto a = allocating.load(); a != index_of_newest) {
            data()[a].deactivate();
        }
        // determine fully locked versions - after one of those all versions are considered live.
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_full) {
                if (rc->version < oldest_full_v)
                    oldest_full_v = rc->version;
            }
        }
        // collect reachable versions and determine oldest live reachable version
        // (oldest reachable version is the first entry in the top_refs map, so no need to find it explicitly)
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_frozen || rc->count_live || rc->version >= oldest_full_v) {
                // entry is still reachable
                top_refs.emplace(rc->version, VersionInfo{to_ref(rc->current_top), to_ref(rc->filesize)});
            }
            if (rc->count_live || rc->version >= oldest_full_v) {
                if (rc->version < oldest_live_v)
                    oldest_live_v = rc->version;
            }
        }
        // we must have found at least one reachable version
        REALM_ASSERT(top_refs.size());
        // free unreachable entries and determine if we want to trigger backdating
        uint64_t oldest_v = top_refs.begin()->first;
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            if (!rc->is_active())
                continue;
            if (rc->count_frozen == 0 && rc->count_live == 0 && rc->version < oldest_full_v) {
                // entry is becoming unreachable.
                // if it is also younger than a reachable version, then set 'any_new_unreachables' to trigger
                // backdating
                if (rc->version > oldest_v) {
                    any_new_unreachables = true;
                }
                REALM_ASSERT(index_of(*rc) != index_of_newest);
                free_entry(rc);
            }
        }
        REALM_ASSERT(oldest_v != std::numeric_limits<uint64_t>::max());
        REALM_ASSERT(oldest_live_v != std::numeric_limits<uint64_t>::max());
    }

#if REALM_DEBUG
    // Debug aid: print every entry with its version and reader counts.
    void dump()
    {
        util::format(std::cout, "VersionList has %1 entries: \n", entries);
        for (auto* rc = data(); rc < data() + entries; ++rc) {
            util::format(std::cout, "[%1]: version %2, live: %3, full: %4, frozen: %5\n", index_of(*rc), rc->version,
                         rc->count_live, rc->count_full, rc->count_frozen);
        }
    }
#endif // REALM_DEBUG

    constexpr static uint32_t nil = (uint32_t)-1;
    const static int init_readers_size = 32;
    uint32_t entries;
    std::atomic<uint32_t> allocating; // atomic for crash safety, not threading
    std::atomic<uint32_t> newest;     // atomic for crash safety, not threading

    // IMPORTANT: The actual data comprising the version list MUST BE PLACED LAST in
    // the VersionList structure, as the data area is extended at run time.
    // Similarly, the VersionList must be the final element of the SharedInfo structure.
    // IMPORTANT II:
    // To ensure proper alignment across all platforms, the SharedInfo structure
    // should NOT have a stricter alignment requirement than the ReadCount structure.
    ReadCount m_data[init_readers_size];

    // Silence UBSan errors about out-of-bounds reads on m_data by casting to a pointer
    ReadCount* data() noexcept
    {
        return m_data;
    }
    const ReadCount* data() const noexcept
    {
        return m_data;
    }
};
296

297
// Using lambda rather than function so that shared_ptr shared state doesn't need to hold a function pointer.
// Custom deleter for TransactionRef: close the transaction before destroying it.
constexpr auto TransactionDeleter = [](Transaction* t) {
    t->close();
    delete t;
};
302

303
// Allocate a new Transaction, forwarding `args` to its constructor, and wrap
// it in a TransactionRef that uses TransactionDeleter (close-then-delete) as
// its deleter.
template <typename... Args>
TransactionRef make_transaction_ref(Args&&... args)
{
    return TransactionRef(new Transaction(std::forward<Args>(args)...), TransactionDeleter);
}
308

309
} // anonymous namespace
310

311
namespace realm {
312

313
/// The structure of the contents of the per session `.lock` file. Note that
/// this file is transient in that it is recreated/reinitialized at the
/// beginning of every session. A session is any sequence of temporally
/// overlapping openings of a particular Realm file via DB objects. For
/// example, if there are two DB objects, A and B, and the file is
/// first opened via A, then opened via B, then closed via A, and finally closed
/// via B, then the session stretches from the opening via A to the closing via
/// B.
///
/// IMPORTANT: Remember to bump `g_shared_info_version` if anything is changed
/// in the memory layout of this class, or if the meaning of any of the stored
/// values change.
///
/// Members `init_complete`, `shared_info_version`, `size_of_mutex`, and
/// `size_of_condvar` may be modified only while holding an exclusive lock
/// on the file, and may be read only while holding a shared (or exclusive) lock
/// on the file. All other members (except for the VersionList which has its own mutex)
/// may be accessed only while holding a lock on `controlmutex`.
///
/// SharedInfo must be 8-byte aligned. On 32-bit Apple platforms, mutexes store their
/// alignment as part of the mutex state. We're copying the SharedInfo (including
/// embedded but always unlocked mutexes) and it must retain the same alignment
/// throughout.
struct alignas(8) DB::SharedInfo {
    /// Indicates that initialization of the lock file was completed
    /// successfully.
    ///
    /// CAUTION: This member must never move or change type, as that would
    /// compromise safety of the session initiation process.
    std::atomic<uint8_t> init_complete; // Offset 0

    /// The size in bytes of a mutex member of SharedInfo. This allows all
    /// session participants to be in agreement. Obviously, a size match is not
    /// enough to guarantee identical layout internally in the mutex object, but
    /// it is hoped that it will catch some (if not most) of the cases where
    /// there is a layout discrepancy internally in the mutex object.
    uint8_t size_of_mutex; // Offset 1

    /// Like size_of_mutex, but for condition variable members of SharedInfo.
    uint8_t size_of_condvar; // Offset 2

    /// Set during the critical phase of a commit, when the logs, the VersionList
    /// and the database may be out of sync with respect to each other. If a
    /// writer crashes during this phase, there is no safe way of continuing
    /// with further write transactions. When beginning a write transaction,
    /// this must be checked and an exception thrown if set.
    ///
    /// Note that std::atomic<uint8_t> is guaranteed to have standard layout.
    std::atomic<uint8_t> commit_in_critical_phase = {0}; // Offset 3

    /// The target Realm file format version for the current session. This
    /// allows all session participants to be in agreement. It can only differ
    /// from what is returned by Group::get_file_format_version() temporarily,
    /// and only during the Realm file opening process. If it differs, it means
    /// that the file format needs to be upgraded from its current format
    /// (Group::get_file_format_version()), the format specified by this member
    /// of SharedInfo.
    uint8_t file_format_version; // Offset 4

    /// Stores a value of type Replication::HistoryType. Must match across all
    /// session participants.
    int8_t history_type; // Offset 5

    /// The SharedInfo layout version. This allows all session participants to
    /// be in agreement. Must be bumped if the layout of the SharedInfo
    /// structure is changed. Note, however, that only the part that lies beyond
    /// SharedInfoUnchangingLayout can have its layout changed.
    ///
    /// CAUTION: This member must never move or change type, as that would
    /// compromise version agreement checking.
    uint16_t shared_info_version = g_shared_info_version; // Offset 6

    uint16_t durability;           // Offset 8
    uint16_t free_write_slots = 0; // Offset 10

    /// Number of participating shared groups
    uint32_t num_participants = 0; // Offset 12

    /// Latest version number. Guarded by the controlmutex (for lock-free
    /// access, use get_version_of_latest_snapshot() instead)
    uint64_t latest_version_number; // Offset 16

    /// Pid of process initiating the session, but only if that process runs
    /// with encryption enabled, zero otherwise. This was used to prevent
    /// multiprocess encryption until support for that was added.
    uint64_t session_initiator_pid = 0; // Offset 24

    std::atomic<uint64_t> number_of_versions; // Offset 32

    /// True (1) if there is a sync agent present (a session participant acting
    /// as sync client). It is an error to have a session with more than one
    /// sync agent. The purpose of this flag is to prevent that from ever
    /// happening. If the sync agent crashes and leaves the flag set, the
    /// session will need to be restarted (lock file reinitialized) before a new
    /// sync agent can be started.
    uint8_t sync_agent_present = 0; // Offset 40

    /// Set when a participant decides to start the daemon, cleared by the
    /// daemon when it decides to exit. Participants check during open() and
    /// start the daemon if running in async mode.
    uint8_t daemon_started = 0; // Offset 41

    /// Set by the daemon when it is ready to handle commits. Participants must
    /// wait during open() on 'daemon_becomes_ready' for this to become true.
    /// Cleared by the daemon when it decides to exit.
    uint8_t daemon_ready = 0; // Offset 42

    uint8_t filler_1; // Offset 43

    /// Stores a history schema version (as returned by
    /// Replication::get_history_schema_version()). Must match across all
    /// session participants.
    uint16_t history_schema_version; // Offset 44

    uint16_t filler_2; // Offset 46

    InterprocessMutex::SharedPart shared_writemutex; // Offset 48
    InterprocessMutex::SharedPart shared_controlmutex;
    InterprocessMutex::SharedPart shared_versionlist_mutex;
    InterprocessCondVar::SharedPart room_to_write;
    InterprocessCondVar::SharedPart work_to_do;
    InterprocessCondVar::SharedPart daemon_becomes_ready;
    InterprocessCondVar::SharedPart new_commit_available;
    InterprocessCondVar::SharedPart pick_next_writer;
    std::atomic<uint32_t> next_ticket;
    std::atomic<uint32_t> next_served = 0;
    std::atomic<uint64_t> writing_page_offset;
    std::atomic<uint64_t> write_counter;

    // IMPORTANT: The VersionList MUST be the last field in SharedInfo - see above.
    VersionList readers;

    SharedInfo(Durability, Replication::HistoryType, int history_schema_version);
    ~SharedInfo() noexcept {}

    // Create the first versioning entry in the embedded VersionList; see the
    // preconditions documented on VersionList::init_versioning().
    void init_versioning(ref_type top_ref, size_t file_size, uint64_t initial_version)
    {
        // Create our first versioning entry:
        readers.init_versioning(top_ref, file_size, initial_version);
    }
};
454

455

456
// Initialize the shared part of the ".lock" file. The static_asserts below
// pin the exact memory layout, since this structure is shared between
// processes via memory mapping.
DB::SharedInfo::SharedInfo(Durability dura, Replication::HistoryType ht, int hsv)
    : size_of_mutex(sizeof(shared_writemutex))
    , size_of_condvar(sizeof(room_to_write))
    , shared_writemutex()   // Throws
    , shared_controlmutex() // Throws
{
    durability = static_cast<uint16_t>(dura); // durability level is fixed from creation
    REALM_ASSERT(!util::int_cast_has_overflow<decltype(history_type)>(ht + 0));
    REALM_ASSERT(!util::int_cast_has_overflow<decltype(history_schema_version)>(hsv));
    history_type = ht;
    history_schema_version = static_cast<uint16_t>(hsv);
    InterprocessCondVar::init_shared_part(new_commit_available); // Throws
    InterprocessCondVar::init_shared_part(pick_next_writer);     // Throws
    next_ticket = 0;

// IMPORTANT: The offsets, types (, and meanings) of these members must
// never change, not even when the SharedInfo layout version is bumped. The
// eternal constancy of this part of the layout is what ensures that a
// joining session participant can reliably verify that the actual format is
// as expected.
#ifndef _WIN32
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
#endif
    static_assert(offsetof(SharedInfo, init_complete) == 0 && ATOMIC_BOOL_LOCK_FREE == 2 &&
                      std::is_same<decltype(init_complete), std::atomic<uint8_t>>::value &&
                      offsetof(SharedInfo, shared_info_version) == 6 &&
                      std::is_same<decltype(shared_info_version), uint16_t>::value,
                  "Forbidden change in SharedInfo layout");

    // Try to catch some of the memory layout changes that require bumping of
    // the SharedInfo file format version (shared_info_version).
    static_assert(
        offsetof(SharedInfo, size_of_mutex) == 1 && std::is_same<decltype(size_of_mutex), uint8_t>::value &&
            offsetof(SharedInfo, size_of_condvar) == 2 && std::is_same<decltype(size_of_condvar), uint8_t>::value &&
            offsetof(SharedInfo, commit_in_critical_phase) == 3 &&
            std::is_same<decltype(commit_in_critical_phase), std::atomic<uint8_t>>::value &&
            offsetof(SharedInfo, file_format_version) == 4 &&
            std::is_same<decltype(file_format_version), uint8_t>::value && offsetof(SharedInfo, history_type) == 5 &&
            std::is_same<decltype(history_type), int8_t>::value && offsetof(SharedInfo, durability) == 8 &&
            std::is_same<decltype(durability), uint16_t>::value && offsetof(SharedInfo, free_write_slots) == 10 &&
            std::is_same<decltype(free_write_slots), uint16_t>::value &&
            offsetof(SharedInfo, num_participants) == 12 &&
            std::is_same<decltype(num_participants), uint32_t>::value &&
            offsetof(SharedInfo, latest_version_number) == 16 &&
            std::is_same<decltype(latest_version_number), uint64_t>::value &&
            offsetof(SharedInfo, session_initiator_pid) == 24 &&
            std::is_same<decltype(session_initiator_pid), uint64_t>::value &&
            offsetof(SharedInfo, number_of_versions) == 32 &&
            std::is_same<decltype(number_of_versions), std::atomic<uint64_t>>::value &&
            offsetof(SharedInfo, sync_agent_present) == 40 &&
            std::is_same<decltype(sync_agent_present), uint8_t>::value &&
            offsetof(SharedInfo, daemon_started) == 41 && std::is_same<decltype(daemon_started), uint8_t>::value &&
            offsetof(SharedInfo, daemon_ready) == 42 && std::is_same<decltype(daemon_ready), uint8_t>::value &&
            offsetof(SharedInfo, filler_1) == 43 && std::is_same<decltype(filler_1), uint8_t>::value &&
            offsetof(SharedInfo, history_schema_version) == 44 &&
            std::is_same<decltype(history_schema_version), uint16_t>::value && offsetof(SharedInfo, filler_2) == 46 &&
            std::is_same<decltype(filler_2), uint16_t>::value && offsetof(SharedInfo, shared_writemutex) == 48 &&
            std::is_same<decltype(shared_writemutex), InterprocessMutex::SharedPart>::value,
        "Caught layout change requiring SharedInfo file format bumping");
    static_assert(std::atomic<uint64_t>::is_always_lock_free);
#ifndef _WIN32
#pragma GCC diagnostic pop
#endif
}
521

522
class DB::VersionManager {
523
public:
524
    VersionManager(util::InterprocessMutex& mutex)
525
        : m_mutex(mutex)
526
    {
111,342✔
527
    }
111,342✔
528
    virtual ~VersionManager() {}
111,342✔
529

530
    void cleanup_versions(uint64_t& oldest_live_version, TopRefMap& top_refs, bool& any_new_unreachables)
531
        REQUIRES(!m_info_mutex)
532
    {
602,355✔
533
        std::lock_guard lock(m_mutex);
602,355✔
534
        util::CheckedLockGuard info_lock(m_info_mutex);
602,355✔
535
        ensure_reader_mapping();
602,355✔
536
        m_info->readers.purge_versions(oldest_live_version, top_refs, any_new_unreachables);
602,355✔
537
    }
602,355✔
538

539
    version_type get_newest_version() REQUIRES(!m_local_readers_mutex, !m_info_mutex)
540
    {
602,337✔
541
        return get_version_id_of_latest_snapshot().version;
602,337✔
542
    }
602,337✔
543

544
    VersionID get_version_id_of_latest_snapshot() REQUIRES(!m_local_readers_mutex, !m_info_mutex)
545
    {
1,591,743✔
546
        {
1,591,743✔
547
            // First check the local cache. This is an unlocked read, so it may
1,164,462✔
548
            // race with adding a new version. If this happens we'll either see
1,164,462✔
549
            // a stale value (acceptable for a racing write on one thread and
1,164,462✔
550
            // a read on another), or a new value which is guaranteed to not
1,164,462✔
551
            // be an active index in the local cache.
1,164,462✔
552
            util::CheckedLockGuard lock(m_local_readers_mutex);
1,591,743✔
553
            util::CheckedLockGuard info_lock(m_info_mutex);
1,591,743✔
554
            auto index = m_info->readers.newest.load();
1,591,743✔
555
            if (index < m_local_readers.size()) {
1,591,743✔
556
                auto& r = m_local_readers[index];
1,588,173✔
557
                if (r.is_active()) {
1,588,173✔
558
                    return {r.version, index};
1,564,932✔
559
                }
1,564,932✔
560
            }
26,811✔
561
        }
26,811✔
562

13,917✔
563
        std::lock_guard lock(m_mutex);
26,811✔
564
        util::CheckedLockGuard info_lock(m_info_mutex);
26,811✔
565
        auto index = m_info->readers.newest.load();
26,811✔
566
        ensure_reader_mapping(index);
26,811✔
567
        return {m_info->readers.get(index).version, index};
26,811✔
568
    }
26,811✔
569

570
    void release_read_lock(const ReadLockInfo& read_lock) REQUIRES(!m_local_readers_mutex, !m_info_mutex)
571
    {
3,656,499✔
572
        {
3,656,499✔
573
            util::CheckedLockGuard lock(m_local_readers_mutex);
3,656,499✔
574
            REALM_ASSERT(read_lock.m_reader_idx < m_local_readers.size());
3,656,499✔
575
            auto& r = m_local_readers[read_lock.m_reader_idx];
3,656,499✔
576
            auto& f = field_for_type(r, read_lock.m_type);
3,656,499✔
577
            REALM_ASSERT(f > 0);
3,656,499✔
578
            if (--f > 0)
3,656,499✔
579
                return;
2,173,107✔
580
            if (r.count_live == 0 && r.count_full == 0 && r.count_frozen == 0)
1,483,392✔
581
                r.version = 0;
1,464,720✔
582
        }
1,483,392✔
583

750,084✔
584
        std::lock_guard lock(m_mutex);
1,483,392✔
585
        util::CheckedLockGuard info_lock(m_info_mutex);
1,483,392✔
586
        // we should not need to call ensure_full_reader_mapping,
750,084✔
587
        // since releasing a read lock means it has been grabbed
750,084✔
588
        // earlier - and hence must reside in mapped memory:
750,084✔
589
        REALM_ASSERT(read_lock.m_reader_idx < m_local_max_entry);
1,483,392✔
590
        auto& r = m_info->readers.get(read_lock.m_reader_idx);
1,483,392✔
591
        REALM_ASSERT(read_lock.m_version == r.version);
1,483,392✔
592
        --field_for_type(r, read_lock.m_type);
1,483,392✔
593
    }
1,483,392✔
594

595
    ReadLockInfo grab_read_lock(ReadLockInfo::Type type, VersionID version_id = {})
596
        REQUIRES(!m_local_readers_mutex, !m_info_mutex)
597
    {
3,656,400✔
598
        ReadLockInfo read_lock;
3,656,400✔
599
        if (try_grab_local_read_lock(read_lock, type, version_id))
3,656,400✔
600
            return read_lock;
2,173,104✔
601

750,039✔
602
        {
1,483,296✔
603
            const bool pick_specific = version_id.version != VersionID().version;
1,483,296✔
604
            std::lock_guard lock(m_mutex);
1,483,296✔
605
            util::CheckedLockGuard info_lock(m_info_mutex);
1,483,296✔
606
            auto newest = m_info->readers.newest.load();
1,483,296✔
607
            REALM_ASSERT(newest != VersionList::nil);
1,483,296✔
608
            read_lock.m_reader_idx = pick_specific ? version_id.index : newest;
1,478,070✔
609
            ensure_reader_mapping((unsigned int)read_lock.m_reader_idx);
1,483,296✔
610
            bool picked_newest = read_lock.m_reader_idx == (unsigned)newest;
1,483,296✔
611
            auto& r = m_info->readers.get(read_lock.m_reader_idx);
1,483,296✔
612
            if (pick_specific && version_id.version != r.version)
1,483,296✔
613
                throw BadVersion(version_id.version);
72✔
614
            if (!picked_newest) {
1,483,224✔
615
                if (type == ReadLockInfo::Frozen && r.count_frozen == 0 && r.count_live == 0)
612✔
616
                    throw BadVersion(version_id.version);
×
617
                if (type != ReadLockInfo::Frozen && r.count_live == 0)
612✔
618
                    throw BadVersion(version_id.version);
60✔
619
            }
1,483,164✔
620
            populate_read_lock(read_lock, r, type);
1,483,164✔
621
        }
1,483,164✔
622

749,973✔
623
        {
1,483,164✔
624
            util::CheckedLockGuard local_lock(m_local_readers_mutex);
1,483,164✔
625
            grow_local_cache(read_lock.m_reader_idx + 1);
1,483,164✔
626
            auto& r2 = m_local_readers[read_lock.m_reader_idx];
1,483,164✔
627
            if (!r2.is_active()) {
1,483,164✔
628
                r2.version = read_lock.m_version;
1,464,645✔
629
                r2.filesize = read_lock.m_file_size;
1,464,645✔
630
                r2.current_top = read_lock.m_top_ref;
1,464,645✔
631
                r2.count_full = r2.count_live = r2.count_frozen = 0;
1,464,645✔
632
            }
1,464,645✔
633
            REALM_ASSERT_EX(field_for_type(r2, type) == 0, type, r2.count_full, r2.count_live, r2.count_frozen);
1,483,164✔
634
            field_for_type(r2, type) = 1;
1,483,164✔
635
        }
1,483,164✔
636

749,973✔
637
        return read_lock;
1,483,164✔
638
    }
1,483,164✔
639

640
    void init_versioning(ref_type top_ref, size_t file_size, uint64_t initial_version) REQUIRES(!m_info_mutex)
641
    {
59,049✔
642
        std::lock_guard lock(m_mutex);
59,049✔
643
        util::CheckedLockGuard info_lock(m_info_mutex);
59,049✔
644
        m_info->init_versioning(top_ref, file_size, initial_version);
59,049✔
645
    }
59,049✔
646

647
    void add_version(ref_type new_top_ref, size_t new_file_size, uint64_t new_version) REQUIRES(!m_info_mutex)
648
    {
602,274✔
649
        std::lock_guard lock(m_mutex);
602,274✔
650
        util::CheckedLockGuard info_lock(m_info_mutex);
602,274✔
651
        ensure_reader_mapping();
602,274✔
652
        if (m_info->readers.try_allocate_entry(new_top_ref, new_file_size, new_version)) {
602,316✔
653
            return;
602,283✔
654
        }
602,283✔
655
        // allocation failed, expand VersionList (and lockfile) and retry
33✔
656
        auto entries = m_info->readers.capacity();
2,147,483,680✔
657
        auto new_entries = entries + 32;
2,147,483,680✔
658
        expand_version_list(new_entries);
2,147,483,680✔
659
        m_local_max_entry = new_entries;
2,147,483,680✔
660
        m_info->readers.reserve(new_entries);
2,147,483,680✔
661
        auto success = m_info->readers.try_allocate_entry(new_top_ref, new_file_size, new_version);
2,147,483,680✔
662
        REALM_ASSERT_EX(success, new_entries, new_version);
2,147,483,680✔
663
    }
2,147,483,680✔
664

665

666
private:
667
    void grow_local_cache(size_t new_size) REQUIRES(m_local_readers_mutex)
668
    {
1,483,458✔
669
        if (new_size > m_local_readers.size())
1,483,458✔
670
            m_local_readers.resize(new_size, VersionList::ReadCount{});
209,625✔
671
    }
1,483,458✔
672

673
    void populate_read_lock(ReadLockInfo& read_lock, VersionList::ReadCount& r, ReadLockInfo::Type type)
674
    {
3,656,358✔
675
        ++field_for_type(r, type);
3,656,358✔
676
        read_lock.m_type = type;
3,656,358✔
677
        read_lock.m_version = r.version;
3,656,358✔
678
        read_lock.m_top_ref = static_cast<ref_type>(r.current_top);
3,656,358✔
679
        read_lock.m_file_size = static_cast<size_t>(r.filesize);
3,656,358✔
680
    }
3,656,358✔
681

682
    bool try_grab_local_read_lock(ReadLockInfo& read_lock, ReadLockInfo::Type type, VersionID version_id)
683
        REQUIRES(!m_local_readers_mutex, !m_info_mutex)
684
    {
3,656,445✔
685
        const bool pick_specific = version_id.version != VersionID().version;
3,656,445✔
686
        auto index = version_id.index;
3,656,445✔
687
        if (!pick_specific) {
3,656,445✔
688
            util::CheckedLockGuard lock(m_info_mutex);
3,363,078✔
689
            index = m_info->readers.newest.load();
3,363,078✔
690
        }
3,363,078✔
691
        util::CheckedLockGuard local_lock(m_local_readers_mutex);
3,656,445✔
692
        if (index >= m_local_readers.size())
3,656,445✔
693
            return false;
209,631✔
694

2,301,807✔
695
        auto& r = m_local_readers[index];
3,446,814✔
696
        if (!r.is_active())
3,446,814✔
697
            return false;
1,255,122✔
698
        if (pick_specific && r.version != version_id.version)
2,191,692✔
699
            return false;
×
700
        if (field_for_type(r, type) == 0)
2,191,692✔
701
            return false;
18,753✔
702

1,654,818✔
703
        read_lock.m_reader_idx = index;
2,172,939✔
704
        populate_read_lock(read_lock, r, type);
2,172,939✔
705
        return true;
2,172,939✔
706
    }
2,172,939✔
707

708
    static uint32_t& field_for_type(VersionList::ReadCount& r, ReadLockInfo::Type type)
709
    {
13,951,425✔
710
        switch (type) {
13,951,425✔
711
            case ReadLockInfo::Frozen:
135,399✔
712
                return r.count_frozen;
135,399✔
713
            case ReadLockInfo::Live:
13,816,215✔
714
                return r.count_live;
13,816,215✔
715
            case ReadLockInfo::Full:
✔
716
                return r.count_full;
×
717
            default:
✔
718
                REALM_UNREACHABLE(); // silence a warning
719
        }
13,951,425✔
720
    }
13,951,425✔
721

722
    void mark_page_for_writing(uint64_t page_offset) REQUIRES(!m_info_mutex)
723
    {
2,307✔
724
        util::CheckedLockGuard info_lock(m_info_mutex);
2,307✔
725
        m_info->writing_page_offset = page_offset + 1;
2,307✔
726
        m_info->write_counter++;
2,307✔
727
    }
2,307✔
728
    void clear_writing_marker() REQUIRES(!m_info_mutex)
729
    {
2,307✔
730
        util::CheckedLockGuard info_lock(m_info_mutex);
2,307✔
731
        m_info->write_counter++;
2,307✔
732
        m_info->writing_page_offset = 0;
2,307✔
733
    }
2,307✔
734
    // returns false if no page is marked.
735
    // if a page is marked, returns true and optionally the offset of the page marked for writing
736
    // in all cases returns optionally the write counter
737
    bool observe_writer(uint64_t* page_offset, uint64_t* write_counter) REQUIRES(!m_info_mutex)
738
    {
90✔
739
        util::CheckedLockGuard info_lock(m_info_mutex);
90✔
740
        if (write_counter) {
90✔
741
            *write_counter = m_info->write_counter;
90✔
742
        }
90✔
743
        uint64_t marked = m_info->writing_page_offset;
90✔
744
        if (marked && page_offset) {
90!
745
            *page_offset = marked - 1;
×
746
        }
×
747
        return marked != 0;
90✔
748
    }
90✔
749

750
protected:
751
    util::InterprocessMutex& m_mutex;
752
    util::CheckedMutex m_local_readers_mutex;
753
    std::vector<VersionList::ReadCount> m_local_readers GUARDED_BY(m_local_readers_mutex);
754

755
    util::CheckedMutex m_info_mutex;
756
    unsigned int m_local_max_entry GUARDED_BY(m_info_mutex) = 0;
757
    SharedInfo* m_info GUARDED_BY(m_info_mutex) = nullptr;
758

759
    virtual void ensure_reader_mapping(unsigned int required = -1) REQUIRES(m_info_mutex) = 0;
760
    virtual void expand_version_list(unsigned new_entries) REQUIRES(m_info_mutex) = 0;
761
    friend class DB::EncryptionMarkerObserver;
762
};
763

764
class DB::FileVersionManager final : public DB::VersionManager {
765
public:
766
    FileVersionManager(File& file, util::InterprocessMutex& mutex)
767
        : VersionManager(mutex)
768
        , m_file(file)
769
    {
85,920✔
770
        size_t size = 0, required_size = sizeof(SharedInfo);
85,920✔
771
        while (size < required_size) {
171,840✔
772
            // Map the file without the lock held. This could result in the
42,147✔
773
            // mapping being too small and having to remap if the file is grown
42,147✔
774
            // concurrently, but if this is the case we should always see a bigger
42,147✔
775
            // size the next time.
42,147✔
776
            auto new_size = static_cast<size_t>(m_file.get_size());
85,920✔
777
            REALM_ASSERT(new_size > size);
85,920✔
778
            size = new_size;
85,920✔
779
            m_reader_map.remap(m_file, File::access_ReadWrite, size, File::map_NoSync);
85,920✔
780
            m_info = m_reader_map.get_addr();
85,920✔
781

42,147✔
782
            std::lock_guard lock(m_mutex);
85,920✔
783
            m_local_max_entry = m_info->readers.capacity();
85,920✔
784
            required_size = sizeof(SharedInfo) + m_info->readers.compute_required_space(m_local_max_entry);
85,920✔
785
            REALM_ASSERT(required_size >= size);
85,920✔
786
        }
85,920✔
787
    }
85,920✔
788

789
    void expand_version_list(unsigned new_entries) override REQUIRES(m_info_mutex)
790
    {
66✔
791
        size_t new_info_size = sizeof(SharedInfo) + m_info->readers.compute_required_space(new_entries);
66✔
792
        m_file.prealloc(new_info_size);                                          // Throws
66✔
793
        m_reader_map.remap(m_file, util::File::access_ReadWrite, new_info_size); // Throws
66✔
794
        m_info = m_reader_map.get_addr();
66✔
795
    }
66✔
796

797
private:
798
    void ensure_reader_mapping(unsigned int required = -1) override REQUIRES(m_info_mutex)
799
    {
2,379,801✔
800
        using _impl::SimulatedFailure;
2,379,801✔
801
        SimulatedFailure::trigger(SimulatedFailure::shared_group__grow_reader_mapping); // Throws
2,379,801✔
802

1,212,300✔
803
        if (required < m_local_max_entry)
2,379,801✔
804
            return;
1,338,342✔
805

534,291✔
806
        auto new_max_entry = m_info->readers.capacity();
1,041,459✔
807
        if (new_max_entry > m_local_max_entry) {
1,041,459✔
808
            // handle mapping expansion if required
480✔
809
            size_t info_size = sizeof(DB::SharedInfo) + m_info->readers.compute_required_space(new_max_entry);
960✔
810
            m_reader_map.remap(m_file, util::File::access_ReadWrite, info_size); // Throws
960✔
811
            m_local_max_entry = new_max_entry;
960✔
812
            m_info = m_reader_map.get_addr();
960✔
813
        }
960✔
814
    }
1,041,459✔
815

816
    File& m_file;
817
    File::Map<DB::SharedInfo> m_reader_map;
818

819
    friend class DB::EncryptionMarkerObserver;
820
};
821

822
// adapter class for marking/observing encrypted writes
823
class DB::EncryptionMarkerObserver : public util::WriteMarker, public util::WriteObserver {
824
public:
825
    EncryptionMarkerObserver(DB::VersionManager& vm)
826
        : vm(vm)
827
    {
85,920✔
828
    }
85,920✔
829
    bool no_concurrent_writer_seen() override
830
    {
90✔
831
        uint64_t tmp_write_count;
90✔
832
        auto page_may_have_been_written = vm.observe_writer(nullptr, &tmp_write_count);
90✔
833
        if (tmp_write_count != last_seen_count) {
90✔
834
            page_may_have_been_written = true;
×
835
            last_seen_count = tmp_write_count;
×
836
        }
×
837
        if (page_may_have_been_written) {
90✔
838
            calls_since_last_writer_observed = 0;
×
839
            return false;
×
840
        }
×
841
        ++calls_since_last_writer_observed;
90✔
842
        constexpr size_t max_calls = 5; // an arbitrary handful, > 1
90✔
843
        return (calls_since_last_writer_observed >= max_calls);
90✔
844
    }
90✔
845
    void mark(uint64_t pos) override
846
    {
2,307✔
847
        vm.mark_page_for_writing(pos);
2,307✔
848
    }
2,307✔
849
    void unmark() override
850
    {
2,307✔
851
        vm.clear_writing_marker();
2,307✔
852
    }
2,307✔
853

854
private:
855
    DB::VersionManager& vm;
856
    uint64_t last_seen_count = 0;
857
    size_t calls_since_last_writer_observed = 0;
858
};
859

860
class DB::InMemoryVersionManager final : public DB::VersionManager {
861
public:
862
    InMemoryVersionManager(SharedInfo* info, util::InterprocessMutex& mutex)
863
        : VersionManager(mutex)
864
    {
25,422✔
865
        m_info = info;
25,422✔
866
        m_local_max_entry = m_info->readers.capacity();
25,422✔
867
    }
25,422✔
868
    void expand_version_list(unsigned) override
869
    {
×
870
        REALM_ASSERT(false);
×
871
    }
×
872

873
private:
874
    void ensure_reader_mapping(unsigned int) override {}
335,076✔
875
};
876

877
#if REALM_HAVE_STD_FILESYSTEM
878
std::string DBOptions::sys_tmp_dir = std::filesystem::temp_directory_path().string();
879
#else
880
std::string DBOptions::sys_tmp_dir = getenv("TMPDIR") ? getenv("TMPDIR") : "";
881
#endif
882

883
// NOTES ON CREATION AND DESTRUCTION OF SHARED MUTEXES:
884
//
885
// According to the 'process-sharing example' in the POSIX man page
886
// for pthread_mutexattr_init() other processes may continue to use a
887
// process-shared mutex after exit of the process that initialized
888
// it. Also, the example does not contain any call to
889
// pthread_mutex_destroy(), so apparently a process-shared mutex need
890
// not be destroyed at all, nor can it be that a process-shared mutex
891
// is associated with any resources that are local to the initializing
892
// process, because that would imply a leak.
893
//
894
// While it is not explicitly guaranteed in the man page, we shall
895
// assume that is is valid to initialize a process-shared mutex twice
896
// without an intervening call to pthread_mutex_destroy(). We need to
897
// be able to reinitialize a process-shared mutex if the first
898
// initializing process crashes and leaves the shared memory in an
899
// undefined state.
900

901
void DB::open(const std::string& path, bool no_create_file, const DBOptions& options)
902
{
85,713✔
903
    // Exception safety: Since do_open() is called from constructors, if it
42,057✔
904
    // throws, it must leave the file closed.
42,057✔
905
    using util::format;
85,713✔
906

42,057✔
907
    REALM_ASSERT(!is_attached());
85,713✔
908
    REALM_ASSERT(path.size());
85,713✔
909

42,057✔
910
    m_db_path = path;
85,713✔
911

42,057✔
912
    set_logger(options.logger);
85,713✔
913
    if (m_replication) {
85,713✔
914
        m_replication->set_logger(m_logger.get());
58,491✔
915
    }
58,491✔
916
    if (m_logger) {
85,713✔
917
        m_logger->log(util::Logger::Level::detail, "Open file: %1", path);
71,397✔
918
    }
71,397✔
919
    SlabAlloc& alloc = m_alloc;
85,713✔
920
    ref_type top_ref = 0;
85,713✔
921

42,057✔
922
    if (options.is_immutable) {
85,713✔
923
        SlabAlloc::Config cfg;
186✔
924
        cfg.read_only = true;
186✔
925
        cfg.no_create = true;
186✔
926
        cfg.encryption_key = options.encryption_key;
186✔
927
        top_ref = alloc.attach_file(path, cfg);
186✔
928
        SlabAlloc::DetachGuard dg(alloc);
186✔
929
        Group::read_only_version_check(alloc, top_ref, path);
186✔
930
        m_fake_read_lock_if_immutable = ReadLockInfo::make_fake(top_ref, m_alloc.get_baseline());
186✔
931
        dg.release();
186✔
932
        return;
186✔
933
    }
186✔
934
    std::string lockfile_path = get_core_file(path, CoreFileType::Lock);
85,527✔
935
    std::string coordination_dir = get_core_file(path, CoreFileType::Management);
85,527✔
936
    std::string lockfile_prefix = coordination_dir + "/access_control";
85,527✔
937
    m_alloc.set_read_only(false);
85,527✔
938

41,964✔
939
    Replication::HistoryType openers_hist_type = Replication::hist_None;
85,527✔
940
    int openers_hist_schema_version = 0;
85,527✔
941
    if (Replication* repl = get_replication()) {
85,527✔
942
        openers_hist_type = repl->get_history_type();
58,491✔
943
        openers_hist_schema_version = repl->get_history_schema_version();
58,491✔
944
    }
58,491✔
945

41,964✔
946
    int current_file_format_version;
85,527✔
947
    int target_file_format_version;
85,527✔
948
    int stored_hist_schema_version = -1; // Signals undetermined
85,527✔
949

41,964✔
950
    int retries_left = 10; // number of times to retry before throwing exceptions
85,527✔
951
    // in case there is something wrong with the .lock file... the retries allows
41,964✔
952
    // us to pick a new lockfile initializer in case the first one crashes without
41,964✔
953
    // completing the initialization
41,964✔
954
    std::default_random_engine random_gen;
85,527✔
955
    for (;;) {
171,360✔
956

85,692✔
957
        // if we're retrying, we first wait a random time
85,692✔
958
        if (retries_left < 10) {
171,360✔
959
            if (retries_left == 9) { // we seed it from a true random source if possible
240✔
960
                std::random_device r;
24✔
961
                random_gen.seed(r());
24✔
962
            }
24✔
963
            int max_delay = (10 - retries_left) * 10;
240✔
964
            int msecs = random_gen() % max_delay;
240✔
965
            millisleep(msecs);
240✔
966
        }
240✔
967

85,692✔
968
        m_file.open(lockfile_path, File::access_ReadWrite, File::create_Auto, 0); // Throws
171,360✔
969
        File::CloseGuard fcg(m_file);
171,360✔
970
        m_file.set_fifo_path(coordination_dir, "lock.fifo");
171,360✔
971

85,692✔
972
        if (m_file.try_rw_lock_exclusive()) { // Throws
171,360✔
973
            File::UnlockGuard ulg(m_file);
57,204✔
974

27,960✔
975
            // We're alone in the world, and it is Ok to initialize the
27,960✔
976
            // file. Start by truncating the file to zero to ensure that
27,960✔
977
            // the following resize will generate a file filled with zeroes.
27,960✔
978
            //
27,960✔
979
            // This will in particular set m_init_complete to 0.
27,960✔
980
            m_file.resize(0);
57,204✔
981
            m_file.prealloc(sizeof(SharedInfo));
57,204✔
982

27,960✔
983
            // We can crash anytime during this process. A crash prior to
27,960✔
984
            // the first resize could allow another thread which could not
27,960✔
985
            // get the exclusive lock because we hold it, and hence were
27,960✔
986
            // waiting for the shared lock instead, to observe and use an
27,960✔
987
            // old lock file.
27,960✔
988
            m_file_map.map(m_file, File::access_ReadWrite, sizeof(SharedInfo), File::map_NoSync); // Throws
57,204✔
989
            File::UnmapGuard fug(m_file_map);
57,204✔
990
            SharedInfo* info = m_file_map.get_addr();
57,204✔
991

27,960✔
992
            new (info) SharedInfo{options.durability, openers_hist_type, openers_hist_schema_version}; // Throws
57,204✔
993

27,960✔
994
            // Because init_complete is an std::atomic, it's guaranteed not to be observable by others
27,960✔
995
            // as being 1 before the entire SharedInfo header has been written.
27,960✔
996
            info->init_complete = 1;
57,204✔
997
        }
57,204✔
998

85,692✔
999
// We hold the shared lock from here until we close the file!
85,692✔
1000
#if REALM_PLATFORM_APPLE
85,668✔
1001
        // macOS has a bug which can cause a hang waiting to obtain a lock, even
1002
        // if the lock is already open in shared mode, so we work around it by
1003
        // busy waiting. This should occur only briefly during session initialization.
1004
        while (!m_file.try_rw_lock_shared()) {
86,157✔
1005
            sched_yield();
489✔
1006
        }
489✔
1007
#else
1008
        m_file.rw_lock_shared(); // Throws
85,692✔
1009
#endif
85,692✔
1010
        File::UnlockGuard ulg(m_file);
171,360✔
1011

85,692✔
1012
        // The coordination/management dir is created as a side effect of the lock
85,692✔
1013
        // operation above if needed for lock emulation. But it may also be needed
85,692✔
1014
        // for other purposes, so make sure it exists.
85,692✔
1015
        // in worst case there'll be a race on creating this directory.
85,692✔
1016
        // This should be safe but a waste of resources.
85,692✔
1017
        // Unfortunately it cannot be created at an earlier point, because
85,692✔
1018
        // it may then be deleted during the above lock_shared() operation.
85,692✔
1019
        try_make_dir(coordination_dir);
171,360✔
1020

85,692✔
1021
        // If the file is not completely initialized at this point in time, the
85,692✔
1022
        // preceeding initialization attempt must have failed. We know that an
85,692✔
1023
        // initialization process was in progress, because this thread (or
85,692✔
1024
        // process) failed to get an exclusive lock on the file. Because this
85,692✔
1025
        // thread (or process) currently has a shared lock on the file, we also
85,692✔
1026
        // know that the initialization process can no longer be in progress, so
85,692✔
1027
        // the initialization must either have completed or failed at this time.
85,692✔
1028

85,692✔
1029
        // The file is taken to be completely initialized if it is large enough
85,692✔
1030
        // to contain the `init_complete` field, and `init_complete` is true. If
85,692✔
1031
        // the file was not completely initialized, this thread must give up its
85,692✔
1032
        // shared lock, and retry to become the initializer. Eventually, one of
85,692✔
1033
        // two things must happen; either this thread, or another thread
85,692✔
1034
        // succeeds in completing the initialization, or this thread becomes the
85,692✔
1035
        // initializer, and fails the initialization. In either case, the retry
85,692✔
1036
        // loop will eventually terminate.
85,692✔
1037

85,692✔
1038
        // An empty file is (and was) never a successfully initialized file.
85,692✔
1039
        size_t info_size = sizeof(SharedInfo);
171,360✔
1040
        {
171,360✔
1041
            auto file_size = m_file.get_size();
171,360✔
1042
            if (util::int_less_than(file_size, info_size)) {
171,360✔
1043
                if (file_size == 0)
85,233✔
1044
                    continue; // Retry
57,636✔
1045
                info_size = size_t(file_size);
27,597✔
1046
            }
27,597✔
1047
        }
171,360✔
1048

85,692✔
1049
        // Map the initial section of the SharedInfo file that corresponds to
85,692✔
1050
        // the SharedInfo struct, or less if the file is smaller. We know that
85,692✔
1051
        // we have at least one byte, and that is enough to read the
85,692✔
1052
        // `init_complete` flag.
85,692✔
1053
        m_file_map.map(m_file, File::access_ReadWrite, info_size, File::map_NoSync);
146,847✔
1054
        File::UnmapGuard fug_1(m_file_map);
113,724✔
1055
        SharedInfo* info = m_file_map.get_addr();
113,724✔
1056

52,569✔
1057
#ifndef _WIN32
113,724✔
1058
#pragma GCC diagnostic push
113,724✔
1059
#pragma GCC diagnostic ignored "-Winvalid-offsetof"
113,724✔
1060
#endif
113,724✔
1061
        static_assert(offsetof(SharedInfo, init_complete) + sizeof SharedInfo::init_complete <= 1,
113,724✔
1062
                      "Unexpected position or size of SharedInfo::init_complete");
113,724✔
1063
#ifndef _WIN32
113,724✔
1064
#pragma GCC diagnostic pop
113,724✔
1065
#endif
113,724✔
1066
        if (info->init_complete == 0)
113,724✔
1067
            continue;
27,531✔
1068
        REALM_ASSERT(info->init_complete == 1);
86,193✔
1069

42,285✔
1070
        // At this time, we know that the file was completely initialized, but
42,285✔
1071
        // we still need to verify that is was initialized with the memory
42,285✔
1072
        // layout expected by this session participant. We could find that it is
42,285✔
1073
        // initializaed with a different memory layout if other concurrent
42,285✔
1074
        // session participants use different versions of the core library.
42,285✔
1075
        if (info_size < sizeof(SharedInfo)) {
86,193✔
1076
            if (retries_left) {
66✔
1077
                --retries_left;
60✔
1078
                continue;
60✔
1079
            }
60✔
1080
            throw IncompatibleLockFile(path, format("Architecture mismatch: SharedInfo size is %1 but should be %2.",
6✔
1081
                                                    info_size, sizeof(SharedInfo)));
6✔
1082
        }
6✔
1083
        if (info->shared_info_version != g_shared_info_version) {
86,127✔
1084
            if (retries_left) {
66✔
1085
                --retries_left;
60✔
1086
                continue;
60✔
1087
            }
60✔
1088
            throw IncompatibleLockFile(path, format("Version mismatch: SharedInfo version is %1 but should be %2.",
6✔
1089
                                                    info->shared_info_version, g_shared_info_version));
6✔
1090
        }
6✔
1091
        // Validate compatible sizes of mutex and condvar types. Sizes of all
42,219✔
1092
        // other fields are architecture independent, so if condvar and mutex
42,219✔
1093
        // sizes match, the entire struct matches. The offsets of
42,219✔
1094
        // `size_of_mutex` and `size_of_condvar` are known to be as expected due
42,219✔
1095
        // to the preceeding check in `shared_info_version`.
42,219✔
1096
        if (info->size_of_mutex != sizeof info->shared_controlmutex) {
86,061✔
1097
            if (retries_left) {
66✔
1098
                --retries_left;
60✔
1099
                continue;
60✔
1100
            }
60✔
1101
            throw IncompatibleLockFile(path, format("Architecture mismatch: Mutex size is %1 but should be %2.",
6✔
1102
                                                    info->size_of_mutex, sizeof(info->shared_controlmutex)));
6✔
1103
        }
6✔
1104

42,186✔
1105
        if (info->size_of_condvar != sizeof info->room_to_write) {
85,995✔
1106
            if (retries_left) {
66✔
1107
                --retries_left;
60✔
1108
                continue;
60✔
1109
            }
60✔
1110
            throw IncompatibleLockFile(
6✔
1111
                path, format("Architecture mismatch: Condition variable size is %1 but should be %2.",
6✔
1112
                             info->size_of_condvar, sizeof(info->room_to_write)));
6✔
1113
        }
6✔
1114
        m_writemutex.set_shared_part(info->shared_writemutex, lockfile_prefix, "write");
85,929✔
1115
        m_controlmutex.set_shared_part(info->shared_controlmutex, lockfile_prefix, "control");
85,929✔
1116
        m_versionlist_mutex.set_shared_part(info->shared_versionlist_mutex, lockfile_prefix, "versions");
85,929✔
1117

42,153✔
1118
        // even though fields match wrt alignment and size, there may still be incompatibilities
42,153✔
1119
        // between implementations, so lets ask one of the mutexes if it thinks it'll work.
42,153✔
1120
        if (!m_controlmutex.is_valid()) {
85,929✔
1121
            throw IncompatibleLockFile(
×
1122
                path, "Control mutex is invalid. This suggests that incompatible pthread libraries are in use.");
×
1123
        }
×
1124

42,153✔
1125
        // OK! lock file appears valid. We can now continue operations under the protection
42,153✔
1126
        // of the controlmutex. The controlmutex protects the following activities:
42,153✔
1127
        // - attachment of the database file
42,153✔
1128
        // - start of the async daemon
42,153✔
1129
        // - stop of the async daemon
42,153✔
1130
        // - restore of a backup, if desired
42,153✔
1131
        // - backup of the realm file in preparation of file format upgrade
42,153✔
1132
        // - DB beginning/ending a session
42,153✔
1133
        // - Waiting for and signalling database changes
42,153✔
1134
        {
85,929✔
1135
            std::lock_guard<InterprocessMutex> lock(m_controlmutex); // Throws
85,929✔
1136
            auto version_manager = std::make_unique<FileVersionManager>(m_file, m_versionlist_mutex);
85,929✔
1137

42,153✔
1138
            // proceed to initialize versioning and other metadata information related to
42,153✔
1139
            // the database. Also create the database if we're beginning a new session
42,153✔
1140
            bool begin_new_session = (info->num_participants == 0);
85,929✔
1141
            SlabAlloc::Config cfg;
85,929✔
1142
            cfg.session_initiator = begin_new_session;
85,929✔
1143
            cfg.is_shared = true;
85,929✔
1144
            cfg.read_only = false;
85,929✔
1145
            cfg.skip_validate = !begin_new_session;
85,929✔
1146
            cfg.disable_sync = options.durability == Durability::MemOnly || options.durability == Durability::Unsafe;
85,929✔
1147

42,153✔
1148
            // only the session initiator is allowed to create the database, all other
42,153✔
1149
            // must assume that it already exists.
42,153✔
1150
            cfg.no_create = (begin_new_session ? no_create_file : true);
72,237✔
1151

42,153✔
1152
            // if we're opening a MemOnly file that isn't already opened by
42,153✔
1153
            // someone else then it's a file which should have been deleted on
42,153✔
1154
            // close previously, but wasn't (perhaps due to the process crashing)
42,153✔
1155
            cfg.clear_file = (options.durability == Durability::MemOnly && begin_new_session);
85,929✔
1156

42,153✔
1157
            cfg.encryption_key = options.encryption_key;
85,929✔
1158
            m_marker_observer = std::make_unique<EncryptionMarkerObserver>(*version_manager);
85,929✔
1159
            try {
85,929✔
1160
                top_ref = alloc.attach_file(path, cfg, m_marker_observer.get()); // Throws
85,929✔
1161
            }
85,929✔
1162
            catch (const SlabAlloc::Retry&) {
42,153✔
1163
                // On a SlabAlloc::Retry file mappings are already unmapped, no
1164
                // need to do more
1165
                continue;
×
1166
            }
×
1167

42,108✔
1168
            // Determine target file format version for session (upgrade
42,108✔
1169
            // required if greater than file format version of attached file).
42,108✔
1170
            current_file_format_version = alloc.get_committed_file_format_version();
85,842✔
1171
            target_file_format_version =
85,842✔
1172
                Group::get_target_file_format_version_for_session(current_file_format_version, openers_hist_type);
85,842✔
1173
            BackupHandler backup(path, options.accepted_versions, options.to_be_deleted);
85,842✔
1174
            if (backup.must_restore_from_backup(current_file_format_version)) {
85,842✔
1175
                // we need to unmap before any file ops that'll change the realm
6✔
1176
                // file:
6✔
1177
                // (only strictly needed for Windows)
6✔
1178
                alloc.detach();
12✔
1179
                backup.restore_from_backup();
12✔
1180
                // finally, retry with the restored file instead of the original
6✔
1181
                // one:
6✔
1182
                continue;
12✔
1183
            }
12✔
1184
            backup.cleanup_backups();
85,830✔
1185

42,102✔
1186
            // From here on, if we fail in any way, we must detach the
42,102✔
1187
            // allocator.
42,102✔
1188
            SlabAlloc::DetachGuard alloc_detach_guard(alloc);
85,830✔
1189
            alloc.note_reader_start(this);
85,830✔
1190
            // must come after the alloc detach guard
42,102✔
1191
            auto handler = [this, &alloc]() noexcept {
85,830✔
1192
                alloc.note_reader_end(this);
85,830✔
1193
            };
85,830✔
1194
            auto reader_end_guard = make_scope_exit(handler);
85,830✔
1195

42,102✔
1196
            // Check validity of top array (to give more meaningful errors
42,102✔
1197
            // early)
42,102✔
1198
            if (top_ref) {
85,830✔
1199
                try {
44,196✔
1200
                    alloc.note_reader_start(this);
44,196✔
1201
                    auto reader_end_guard = make_scope_exit([&]() noexcept {
44,196✔
1202
                        alloc.note_reader_end(this);
44,196✔
1203
                    });
44,196✔
1204
                    Array top{alloc};
44,196✔
1205
                    top.init_from_ref(top_ref);
44,196✔
1206
                    Group::validate_top_array(top, alloc);
44,196✔
1207
                }
44,196✔
1208
                catch (const InvalidDatabase& e) {
21,873✔
1209
                    if (e.get_path().empty()) {
×
1210
                        throw InvalidDatabase(e.what(), path);
×
1211
                    }
×
1212
                    throw;
×
1213
                }
×
1214
            }
44,196✔
1215
            if (options.backup_at_file_format_change) {
85,830✔
1216
                backup.backup_realm_if_needed(current_file_format_version, target_file_format_version);
85,821✔
1217
            }
85,821✔
1218
            using gf = _impl::GroupFriend;
85,830✔
1219
            bool file_format_ok;
85,830✔
1220
            // In shared mode (Realm file opened via a DB instance) this
42,102✔
1221
            // version of the core library is able to open Realms using file format
42,102✔
1222
            // versions listed below. Please see Group::get_file_format_version() for
42,102✔
1223
            // information about the individual file format versions.
42,102✔
1224
            if (current_file_format_version == 0) {
85,830✔
1225
                file_format_ok = (top_ref == 0);
41,634✔
1226
            }
41,634✔
1227
            else {
44,196✔
1228
                file_format_ok = backup.is_accepted_file_format(current_file_format_version);
44,196✔
1229
            }
44,196✔
1230

42,102✔
1231
            if (REALM_UNLIKELY(!file_format_ok)) {
85,830✔
1232
                throw UnsupportedFileFormatVersion(current_file_format_version);
12✔
1233
            }
12✔
1234

42,096✔
1235
            if (begin_new_session) {
85,818✔
1236
                // Determine version (snapshot number) and check history
29,304✔
1237
                // compatibility
29,304✔
1238
                version_type version = 0;
59,346✔
1239
                int stored_hist_type = 0;
59,346✔
1240
                gf::get_version_and_history_info(alloc, top_ref, version, stored_hist_type,
59,346✔
1241
                                                 stored_hist_schema_version);
59,346✔
1242
                bool good_history_type = false;
59,346✔
1243
                switch (openers_hist_type) {
59,346✔
1244
                    case Replication::hist_None:
4,377✔
1245
                        good_history_type = (stored_hist_type == Replication::hist_None);
4,377✔
1246
                        if (!good_history_type)
4,377✔
1247
                            throw IncompatibleHistories(
6✔
1248
                                util::format("Realm file at path '%1' has history type '%2', but is being opened "
6✔
1249
                                             "with replication disabled.",
6✔
1250
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1251
                                path);
6✔
1252
                        break;
4,371✔
1253
                    case Replication::hist_OutOfRealm:
2,493✔
1254
                        REALM_ASSERT(false); // No longer in use
×
1255
                        break;
×
1256
                    case Replication::hist_InRealm:
24,489✔
1257
                        good_history_type = (stored_hist_type == Replication::hist_InRealm ||
24,489✔
1258
                                             stored_hist_type == Replication::hist_None);
16,056✔
1259
                        if (!good_history_type)
24,489✔
1260
                            throw IncompatibleHistories(
6✔
1261
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
6✔
1262
                                             "local history mode.",
6✔
1263
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1264
                                path);
6✔
1265
                        break;
24,483✔
1266
                    case Replication::hist_SyncClient:
28,821✔
1267
                        good_history_type = ((stored_hist_type == Replication::hist_SyncClient) || (top_ref == 0));
28,821✔
1268
                        if (!good_history_type)
28,821✔
1269
                            throw IncompatibleHistories(
6✔
1270
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
6✔
1271
                                             "synchronized history mode.",
6✔
1272
                                             path, Replication::history_type_name(stored_hist_type)),
6✔
1273
                                path);
6✔
1274
                        break;
28,815✔
1275
                    case Replication::hist_SyncServer:
15,114✔
1276
                        good_history_type = ((stored_hist_type == Replication::hist_SyncServer) || (top_ref == 0));
1,659✔
1277
                        if (!good_history_type)
1,659✔
1278
                            throw IncompatibleHistories(
×
1279
                                util::format("Realm file at path '%1' has history type '%2', but is being opened in "
×
1280
                                             "server history mode.",
×
1281
                                             path, Replication::history_type_name(stored_hist_type)),
×
1282
                                path);
×
1283
                        break;
1,659✔
1284
                }
59,328✔
1285

29,295✔
1286
                REALM_ASSERT(stored_hist_schema_version >= 0);
59,328✔
1287
                if (stored_hist_schema_version > openers_hist_schema_version)
59,328✔
1288
                    throw IncompatibleHistories(
×
1289
                        util::format("Unexpected future history schema version %1, current schema %2",
×
1290
                                     stored_hist_schema_version, openers_hist_schema_version),
×
1291
                        path);
×
1292
                bool need_hist_schema_upgrade =
59,328✔
1293
                    (stored_hist_schema_version < openers_hist_schema_version && top_ref != 0);
59,328✔
1294
                if (need_hist_schema_upgrade) {
59,328✔
1295
                    Replication* repl = get_replication();
348✔
1296
                    if (!repl->is_upgradable_history_schema(stored_hist_schema_version))
348✔
1297
                        throw IncompatibleHistories(util::format("Nonupgradable history schema %1, current schema %2",
×
1298
                                                                 stored_hist_schema_version,
×
1299
                                                                 openers_hist_schema_version),
×
1300
                                                    path);
×
1301
                }
59,328✔
1302

29,295✔
1303
                bool need_file_format_upgrade =
59,328✔
1304
                    current_file_format_version < target_file_format_version && top_ref != 0;
59,328✔
1305
                if (!options.allow_file_format_upgrade && (need_hist_schema_upgrade || need_file_format_upgrade)) {
59,328✔
1306
                    throw FileFormatUpgradeRequired(m_db_path);
6✔
1307
                }
6✔
1308

29,292✔
1309
                alloc.convert_from_streaming_form(top_ref);
59,322✔
1310
                try {
59,322✔
1311
                    bool file_changed_size = alloc.align_filesize_for_mmap(top_ref, cfg);
59,322✔
1312
                    if (file_changed_size) {
59,322✔
1313
                        // we need to re-establish proper mappings after file size change.
195✔
1314
                        // we do this simply by aborting and starting all over:
195✔
1315
                        continue;
417✔
1316
                    }
417✔
1317
                }
×
1318
                // something went wrong. Retry.
1319
                catch (SlabAlloc::Retry&) {
×
1320
                    continue;
×
1321
                }
×
1322
                if (options.encryption_key) {
58,905✔
1323
#ifdef _WIN32
1324
                    uint64_t pid = GetCurrentProcessId();
1325
#else
1326
                    static_assert(sizeof(pid_t) <= sizeof(uint64_t), "process identifiers too large");
222✔
1327
                    uint64_t pid = getpid();
222✔
1328
#endif
222✔
1329
                    info->session_initiator_pid = pid;
222✔
1330
                }
222✔
1331

29,097✔
1332
                info->file_format_version = uint_fast8_t(target_file_format_version);
58,905✔
1333

29,097✔
1334
                // Initially there is a single version in the file
29,097✔
1335
                info->number_of_versions = 1;
58,905✔
1336

29,097✔
1337
                info->latest_version_number = version;
58,905✔
1338
                alloc.init_mapping_management(version);
58,905✔
1339

29,097✔
1340
                size_t file_size = 24;
58,905✔
1341
                if (top_ref) {
58,905✔
1342
                    Array top(alloc);
21,054✔
1343
                    top.init_from_ref(top_ref);
21,054✔
1344
                    file_size = Group::get_logical_file_size(top);
21,054✔
1345
                }
21,054✔
1346
                version_manager->init_versioning(top_ref, file_size, version);
58,905✔
1347
            }
58,905✔
1348
            else { // Not the session initiator
26,472✔
1349
                // Durability setting must be consistent across a session. An
12,792✔
1350
                // inconsistency is a logic error, as the user is required to
12,792✔
1351
                // make sure that all possible concurrent session participants
12,792✔
1352
                // use the same durability setting for the same Realm file.
12,792✔
1353
                if (Durability(info->durability) != options.durability)
26,472✔
1354
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "Durability not consistent");
6✔
1355

12,789✔
1356
                // History type must be consistent across a session. An
12,789✔
1357
                // inconsistency is a logic error, as the user is required to
12,789✔
1358
                // make sure that all possible concurrent session participants
12,789✔
1359
                // use the same history type for the same Realm file.
12,789✔
1360
                if (info->history_type != openers_hist_type)
26,466✔
1361
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "History type not consistent");
6✔
1362

12,786✔
1363
                // History schema version must be consistent across a
12,786✔
1364
                // session. An inconsistency is a logic error, as the user is
12,786✔
1365
                // required to make sure that all possible concurrent session
12,786✔
1366
                // participants use the same history schema version for the same
12,786✔
1367
                // Realm file.
12,786✔
1368
                if (info->history_schema_version != openers_hist_schema_version)
26,460✔
1369
                    throw RuntimeError(ErrorCodes::IncompatibleSession, "History schema version not consistent");
×
1370

12,786✔
1371
                // We need per session agreement among all participants on the
12,786✔
1372
                // target Realm file format. From a technical perspective, the
12,786✔
1373
                // best way to ensure that, would be to require a bumping of the
12,786✔
1374
                // SharedInfo file format version on any change that could lead
12,786✔
1375
                // to a different result from
12,786✔
1376
                // get_target_file_format_for_session() given the same current
12,786✔
1377
                // Realm file format version and the same history type, as that
12,786✔
1378
                // would prevent the outcome of the Realm opening process from
12,786✔
1379
                // depending on race conditions. However, for practical reasons,
12,786✔
1380
                // we shall instead simply check that there is agreement, and
12,786✔
1381
                // throw the same kind of exception, as would have been thrown
12,786✔
1382
                // with a bumped SharedInfo file format version, if there isn't.
12,786✔
1383
                if (info->file_format_version != target_file_format_version) {
26,460✔
1384
                    throw IncompatibleLockFile(path,
×
1385
                                               format("Version mismatch: File format version is %1 but should be %2.",
×
1386
                                                      info->file_format_version, target_file_format_version));
×
1387
                }
×
1388

12,786✔
1389
                // Even though this session participant is not the session initiator,
12,786✔
1390
                // it may be the one that has to perform the history schema upgrade.
12,786✔
1391
                // See upgrade_file_format(). However we cannot get the actual value
12,786✔
1392
                // at this point as the allocator is not synchronized with the file.
12,786✔
1393
                // The value will be read in a ReadTransaction later.
12,786✔
1394

12,786✔
1395
                // We need to setup the allocators version information, as it is needed
12,786✔
1396
                // to correctly age and later reclaim memory mappings.
12,786✔
1397
                version_type version = info->latest_version_number;
26,460✔
1398
                alloc.init_mapping_management(version);
26,460✔
1399
            }
26,460✔
1400

42,096✔
1401
            m_new_commit_available.set_shared_part(info->new_commit_available, lockfile_prefix, "new_commit",
85,578✔
1402
                                                   options.temp_dir);
85,365✔
1403
            m_pick_next_writer.set_shared_part(info->pick_next_writer, lockfile_prefix, "pick_writer",
85,365✔
1404
                                               options.temp_dir);
85,365✔
1405

41,883✔
1406
            // make our presence noted:
41,883✔
1407
            ++info->num_participants;
85,365✔
1408
            m_info = info;
85,365✔
1409

41,883✔
1410
            // Keep the mappings and file open:
41,883✔
1411
            m_version_manager = std::move(version_manager);
85,365✔
1412
            alloc_detach_guard.release();
85,365✔
1413
            fug_1.release(); // Do not unmap
85,365✔
1414
            fcg.release();   // Do not close
85,365✔
1415
        }
85,365✔
1416
        ulg.release(); // Do not release shared lock
85,365✔
1417
        break;
85,365✔
1418
    }
85,818✔
1419

41,964✔
1420
    if (m_logger) {
85,449✔
1421
        m_logger->log(util::Logger::Level::debug, "   Number of participants: %1", m_info->num_participants);
71,187✔
1422
        m_logger->log(util::Logger::Level::debug, "   Durability: %1", [&] {
71,187✔
1423
            switch (options.durability) {
71,187✔
1424
                case DBOptions::Durability::Full:
49,629✔
1425
                    return "Full";
49,629✔
1426
                case DBOptions::Durability::MemOnly:
21,558✔
1427
                    return "MemOnly";
21,558✔
1428
                case realm::DBOptions::Durability::Unsafe:
✔
1429
                    return "Unsafe";
×
1430
            }
×
1431
            return "";
×
1432
        }());
×
1433
        m_logger->log(util::Logger::Level::debug, "   EncryptionKey: %1", options.encryption_key ? "yes" : "no");
71,157✔
1434
        if (m_logger->would_log(util::Logger::Level::debug)) {
71,187✔
1435
            if (top_ref) {
24,624✔
1436
                Array top(alloc);
24,540✔
1437
                top.init_from_ref(top_ref);
24,540✔
1438
                auto file_size = Group::get_logical_file_size(top);
24,540✔
1439
                auto history_size = Group::get_history_size(top);
24,540✔
1440
                auto freee_space_size = Group::get_free_space_size(top);
24,540✔
1441
                m_logger->log(util::Logger::Level::debug, "   File size: %1", file_size);
24,540✔
1442
                m_logger->log(util::Logger::Level::debug, "   User data size: %1",
24,540✔
1443
                              file_size - (freee_space_size + history_size));
24,540✔
1444
                m_logger->log(util::Logger::Level::debug, "   Free space size: %1", freee_space_size);
24,540✔
1445
                m_logger->log(util::Logger::Level::debug, "   History size: %1", history_size);
24,540✔
1446
            }
24,540✔
1447
            else {
84✔
1448
                m_logger->log(util::Logger::Level::debug, "   Empty file");
84✔
1449
            }
84✔
1450
        }
24,624✔
1451
    }
71,187✔
1452

41,883✔
1453
    // Upgrade file format and/or history schema
41,883✔
1454
    try {
85,368✔
1455
        if (stored_hist_schema_version == -1) {
85,368✔
1456
            // current_hist_schema_version has not been read. Read it now
12,786✔
1457
            stored_hist_schema_version = start_read()->get_history_schema_version();
26,460✔
1458
        }
26,460✔
1459
        if (current_file_format_version == 0) {
85,368✔
1460
            // If the current file format is still undecided, no upgrade is
20,223✔
1461
            // necessary, but we still need to make the chosen file format
20,223✔
1462
            // visible to the rest of the core library by updating the value
20,223✔
1463
            // that will be subsequently returned by
20,223✔
1464
            // Group::get_file_format_version(). For this to work, all session
20,223✔
1465
            // participants must adopt the chosen target Realm file format when
20,223✔
1466
            // the stored file format version is zero regardless of the version
20,223✔
1467
            // of the core library used.
20,223✔
1468
            m_file_format_version = target_file_format_version;
41,622✔
1469
        }
41,622✔
1470
        else {
43,746✔
1471
            m_file_format_version = current_file_format_version;
43,746✔
1472
            upgrade_file_format(options.allow_file_format_upgrade, target_file_format_version,
43,746✔
1473
                                stored_hist_schema_version, openers_hist_schema_version); // Throws
43,746✔
1474
        }
43,746✔
1475
    }
85,368✔
1476
    catch (...) {
41,886✔
1477
        close();
6✔
1478
        throw;
6✔
1479
    }
6✔
1480
    m_alloc.set_read_only(true);
85,359✔
1481
}
85,359✔
1482

1483
void DB::open(BinaryData buffer, bool take_ownership)
1484
{
6✔
1485
    auto top_ref = m_alloc.attach_buffer(buffer.data(), buffer.size());
6✔
1486
    m_fake_read_lock_if_immutable = ReadLockInfo::make_fake(top_ref, buffer.size());
6✔
1487
    if (take_ownership)
6✔
1488
        m_alloc.own_buffer();
×
1489
}
6✔
1490

1491
void DB::open(Replication& repl, const std::string& file, const DBOptions& options)
1492
{
58,491✔
1493
    // Exception safety: Since open() is called from constructors, if it throws,
28,446✔
1494
    // it must leave the file closed.
28,446✔
1495

28,446✔
1496
    REALM_ASSERT(!is_attached());
58,491✔
1497

28,446✔
1498
    repl.initialize(*this); // Throws
58,491✔
1499

28,446✔
1500
    set_replication(&repl);
58,491✔
1501

28,446✔
1502
    bool no_create = false;
58,491✔
1503
    open(file, no_create, options); // Throws
58,491✔
1504
}
58,491✔
1505

1506
class DBLogger : public Logger {
1507
public:
1508
    DBLogger(const std::shared_ptr<Logger>& base_logger, unsigned hash) noexcept
1509
        : Logger(LogCategory::storage, *base_logger)
1510
        , m_hash(hash)
1511
        , m_base_logger_ptr(base_logger)
1512
    {
96,807✔
1513
    }
96,807✔
1514

1515
protected:
1516
    void do_log(const LogCategory& category, Level level, const std::string& message) final
1517
    {
318,918✔
1518
        std::ostringstream ostr;
318,918✔
1519
        auto id = std::this_thread::get_id();
318,918✔
1520
        ostr << "DB: " << m_hash << " Thread " << id << ": " << message;
318,918✔
1521
        Logger::do_log(*m_base_logger_ptr, category, level, ostr.str());
318,918✔
1522
    }
318,918✔
1523

1524
private:
1525
    unsigned m_hash;
1526
    std::shared_ptr<Logger> m_base_logger_ptr;
1527
};
1528

1529
void DB::set_logger(const std::shared_ptr<util::Logger>& logger) noexcept
1530
{
111,135✔
1531
    if (logger)
111,135✔
1532
        m_logger = std::make_shared<DBLogger>(logger, m_log_id);
96,807✔
1533
}
111,135✔
1534

1535
void DB::open(Replication& repl, const DBOptions options)
1536
{
25,422✔
1537
    REALM_ASSERT(!is_attached());
25,422✔
1538
    repl.initialize(*this); // Throws
25,422✔
1539
    set_replication(&repl);
25,422✔
1540

12,711✔
1541
    m_alloc.init_in_memory_buffer();
25,422✔
1542

12,711✔
1543
    set_logger(options.logger);
25,422✔
1544
    m_replication->set_logger(m_logger.get());
25,422✔
1545
    if (m_logger)
25,422✔
1546
        m_logger->detail("Open memory-only realm");
25,410✔
1547

12,711✔
1548
    auto hist_type = repl.get_history_type();
25,422✔
1549
    m_in_memory_info =
25,422✔
1550
        std::make_unique<SharedInfo>(DBOptions::Durability::MemOnly, hist_type, repl.get_history_schema_version());
25,422✔
1551
    SharedInfo* info = m_in_memory_info.get();
25,422✔
1552
    m_writemutex.set_shared_part(info->shared_writemutex, "", "write");
25,422✔
1553
    m_controlmutex.set_shared_part(info->shared_controlmutex, "", "control");
25,422✔
1554
    m_new_commit_available.set_shared_part(info->new_commit_available, "", "new_commit", options.temp_dir);
25,422✔
1555
    m_pick_next_writer.set_shared_part(info->pick_next_writer, "", "pick_writer", options.temp_dir);
25,422✔
1556
    m_versionlist_mutex.set_shared_part(info->shared_versionlist_mutex, "", "versions");
25,422✔
1557

12,711✔
1558
    auto target_file_format_version = uint_fast8_t(Group::get_target_file_format_version_for_session(0, hist_type));
25,422✔
1559
    info->file_format_version = target_file_format_version;
25,422✔
1560
    info->number_of_versions = 1;
25,422✔
1561
    info->latest_version_number = 1;
25,422✔
1562
    info->init_versioning(0, m_alloc.get_baseline(), 1);
25,422✔
1563
    ++info->num_participants;
25,422✔
1564

12,711✔
1565
    m_version_manager = std::make_unique<InMemoryVersionManager>(info, m_versionlist_mutex);
25,422✔
1566

12,711✔
1567
    m_file_format_version = target_file_format_version;
25,422✔
1568

12,711✔
1569
    m_info = info;
25,422✔
1570
    m_alloc.set_read_only(true);
25,422✔
1571
}
25,422✔
1572

1573
void DB::create_new_history(Replication& repl)
1574
{
36✔
1575
    Replication* old_repl = get_replication();
36✔
1576
    try {
36✔
1577
        repl.initialize(*this);
36✔
1578
        set_replication(&repl);
36✔
1579

18✔
1580
        auto tr = start_write();
36✔
1581
        tr->clear_history();
36✔
1582
        tr->replicate(tr.get(), repl);
36✔
1583
        tr->commit();
36✔
1584
    }
36✔
1585
    catch (...) {
18✔
1586
        set_replication(old_repl);
×
1587
        throw;
×
1588
    }
×
1589
}
36✔
1590

1591
void DB::create_new_history(std::unique_ptr<Replication> repl)
1592
{
36✔
1593
    create_new_history(*repl);
36✔
1594
    m_history = std::move(repl);
36✔
1595
}
36✔
1596

1597
// WARNING / FIXME: compact() should NOT be exposed publicly on Windows because it's not crash safe! It may
1598
// corrupt your database if something fails.
1599
// Tracked by https://github.com/realm/realm-core/issues/4111
1600

1601
// A note about lock ordering.
1602
// The local mutex, m_mutex, guards transaction start/stop and map/unmap of the lock file.
1603
// Except for compact(), open() and close(), it should only be held briefly.
1604
// The controlmutex guards operations which change the file size, session initialization
1605
// and session exit.
1606
// The writemutex guards the integrity of the (write) transaction data.
1607
// The controlmutex and writemutex reside in the .lock file and thus require
// the mapping of the .lock file to work. A straightforward approach would be to lock
1609
// the m_mutex whenever the other mutexes are taken or released...but that would be too
1610
// bad for performance of transaction start/stop.
1611
//
1612
// The locks are to be taken in this order: writemutex->controlmutex->m_mutex
1613
//
1614
// The .lock file is mapped during DB::create() and unmapped by a call to DB::close().
1615
// Once unmapped, it is never mapped again. Hence any observer with a valid DBRef may
1616
// only see the transition from mapped->unmapped, never the opposite.
1617
//
1618
// Trying to create a transaction if the .lock file is unmapped will result in an assert.
1619
// Unmapping (during close()) while transactions are live, is not considered an error. There
1620
// is a potential race between unmapping during close() and any operation carried out by a live
1621
// transaction. The user must ensure that this race never happens if she uses DB::close().
1622
// Compact the database by writing a new file that holds only live data and
// atomically swapping it in for the old file. Preconditions: this DB must be
// the only session participant and exactly one transaction (the read
// transaction taken below) may be active; returns false if either condition
// fails. Not crash-safe on Windows (tracked in realm-core issue #4111).
bool DB::compact(bool bump_version_number, util::Optional<const char*> output_encryption_key)
    NO_THREAD_SAFETY_ANALYSIS // this would work except for a known limitation: "No alias analysis" where clang cannot
                              // tell that tr->db->m_mutex is the same thing as m_mutex
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::string tmp_path = m_db_path + ".tmp_compaction_space";

    // To enter compact, the DB object must already have been attached to a file,
    // since this happens in DB::create().

    // Verify that the lock file is still attached. There is no attempt to guard against
    // a race between close() and compact().
    if (is_attached() == false) {
        throw Exception(ErrorCodes::IllegalOperation, m_db_path + ": compact must be done on an open/attached DB");
    }
    auto info = m_info;
    Durability dura = Durability(info->durability);
    // Re-encrypt with the caller-supplied key if given, else keep the current key.
    const char* write_key = bool(output_encryption_key) ? *output_encryption_key : get_encryption_key();
    {
        std::unique_lock<InterprocessMutex> lock(m_controlmutex); // Throws
        auto t1 = std::chrono::steady_clock::now();

        // We must be the ONLY DB object attached if we're to do compaction
        if (info->num_participants > 1)
            return false;

        // Holding the controlmutex prevents any other DB from attaching to the file.

        // Using start_read here ensures that we have access to the latest entry
        // in the VersionList. We need to have access to that later to update top_ref and file_size.
        // This is also needed to attach the group (get the proper top pointer, etc)
        TransactionRef tr = start_read();
        auto file_size_before = tr->get_logical_file_size();

        // local lock blocking any transaction from starting (and stopping)
        CheckedLockGuard local_lock(m_mutex);

        // We should be the only transaction active - otherwise back out
        if (m_transaction_count != 1)
            return false;

        // group::write() will throw if the file already exists.
        // To prevent this, we have to remove the file (should it exist)
        // before calling group::write().
        File::try_remove(tmp_path);

        // Compact by writing a new file holding only live data, then renaming the new file
        // so it becomes the database file, replacing the old one in the process.
        try {
            File file;
            file.open(tmp_path, File::access_ReadWrite, File::create_Must, 0);
            int incr = bump_version_number ? 1 : 0;
            Group::DefaultTableWriter writer;
            tr->write(file, write_key, info->latest_version_number + incr, writer); // Throws
            // Data needs to be flushed to the disk before renaming.
            bool disable_sync = get_disable_sync_to_disk();
            if (!disable_sync && dura != Durability::Unsafe)
                file.sync(); // Throws
        }
        catch (...) {
            // If writing the compact version failed in any way, delete the partially written file to clean up disk
            // space. This is so that we don't fail with 100% disk space used when compacting on a mostly full disk.
            if (File::exists(tmp_path)) {
                File::remove(tmp_path);
            }
            throw;
        }
        // if we've written a file with a bumped version number, we need to update the lock file to match.
        if (bump_version_number) {
            ++info->latest_version_number;
        }
        // We need to release any shared mapping *before* releasing the control mutex.
        // When someone attaches to the new database file, they *must* *not* see and
        // reuse any existing memory mapping of the stale file.
        tr->close_read_with_lock();
        m_alloc.detach();

        util::File::move(tmp_path, m_db_path);

        // Re-attach to the freshly written file and re-establish version tracking
        // as if this were a new session.
        SlabAlloc::Config cfg;
        cfg.session_initiator = true;
        cfg.is_shared = true;
        cfg.read_only = false;
        cfg.skip_validate = false;
        cfg.no_create = true;
        cfg.clear_file = false;
        cfg.encryption_key = write_key;
        ref_type top_ref;
        top_ref = m_alloc.attach_file(m_db_path, cfg, m_marker_observer.get());
        m_alloc.convert_from_streaming_form(top_ref);
        m_alloc.init_mapping_management(info->latest_version_number);
        info->number_of_versions = 1;
        // An empty file (no top array) has only the header; otherwise read the
        // logical size from the top array.
        size_t logical_file_size = sizeof(SlabAlloc::Header);
        if (top_ref) {
            Array top(m_alloc);
            top.init_from_ref(top_ref);
            logical_file_size = Group::get_logical_file_size(top);
        }
        m_version_manager->init_versioning(top_ref, logical_file_size, info->latest_version_number);
        if (m_logger) {
            auto t2 = std::chrono::steady_clock::now();
            m_logger->log(util::Logger::Level::info, "DB compacted from: %1 to %2 in %3 us", file_size_before,
                          logical_file_size, std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
        }
    }
    return true;
}
144✔
1729

1730
// Write a copy of the current snapshot to `path`, optionally re-encrypted
// with `output_encryption_key`. The copy's client file ident is reset to 0 so
// it can be used as a fresh client file. Throws IllegalOperation if there are
// local changes not yet integrated by the server, or if `path` already exists
// (File::create_Must).
void DB::write_copy(StringData path, const char* output_encryption_key)
{
    auto tr = start_read();
    if (auto hist = tr->get_history()) {
        if (!hist->no_pending_local_changes(tr->get_version())) {
            throw Exception(ErrorCodes::IllegalOperation,
                            "All client changes must be integrated in server before writing copy");
        }
    }

    // Table writer that forwards to the default implementation but zeroes out
    // the sync client file ident in the written history.
    class NoClientFileIdWriter : public Group::DefaultTableWriter {
    public:
        NoClientFileIdWriter()
            : Group::DefaultTableWriter(true)
        {
        }
        HistoryInfo write_history(_impl::OutputStream& out) override
        {
            auto hist = Group::DefaultTableWriter::write_history(out);
            hist.sync_file_id = 0;
            return hist;
        }
    } writer;

    File file;
    file.open(path, File::access_ReadWrite, File::create_Must, 0);
    file.resize(0);

    auto t1 = std::chrono::steady_clock::now();
    tr->write(file, output_encryption_key, m_info->latest_version_number, writer);
    if (m_logger) {
        auto t2 = std::chrono::steady_clock::now();
        m_logger->log(util::Logger::Level::info, "DB written to '%1' in %2 us", path,
                      std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
    }
}
240✔
1766

1767
uint_fast64_t DB::get_number_of_versions()
1768
{
302,631✔
1769
    if (m_fake_read_lock_if_immutable)
302,631✔
1770
        return 1;
6✔
1771
    return m_info->number_of_versions;
302,625✔
1772
}
302,625✔
1773

1774
// Total number of bytes currently allocated by the underlying slab allocator.
size_t DB::get_allocated_size() const
{
    return m_alloc.get_allocated_size();
}
6✔
1778

1779
void DB::release_all_read_locks() noexcept
1780
{
110,787✔
1781
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
110,787✔
1782
    CheckedLockGuard local_lock(m_mutex); // mx on m_local_locks_held
110,787✔
1783
    for (auto& read_lock : m_local_locks_held) {
54,597✔
1784
        --m_transaction_count;
6✔
1785
        m_version_manager->release_read_lock(read_lock);
6✔
1786
    }
6✔
1787
    m_local_locks_held.clear();
110,787✔
1788
    REALM_ASSERT(m_transaction_count == 0);
110,787✔
1789
}
110,787✔
1790

1791
// Owns a lazily started background worker thread that acquires/releases the
// interprocess write lock and runs async-commit callbacks on behalf of a DB.
// All state is guarded by m_mutex; callers signal the worker via m_cv_worker
// and the worker signals waiting callers via m_cv_callers. The worker loop
// itself lives in main() (defined out-of-line).
class DB::AsyncCommitHelper {
public:
    AsyncCommitHelper(DB* db)
        : m_db(db)
    {
    }
    ~AsyncCommitHelper()
    {
        {
            std::unique_lock lg(m_mutex);
            // If the worker was never started there is no thread to join.
            if (!m_running) {
                return;
            }
            m_running = false;
            m_cv_worker.notify_one();
        }
        m_thread.join();
    }

    // Queue `fn` to be invoked on the worker thread once it has acquired the
    // write lock.
    void begin_write(util::UniqueFunction<void()> fn)
    {
        std::unique_lock lg(m_mutex);
        start_thread();
        m_pending_writes.emplace_back(std::move(fn));
        m_cv_worker.notify_one();
    }

    // Acquire the write lock on behalf of the calling thread, blocking until
    // it is held.
    void blocking_begin_write()
    {
        std::unique_lock lg(m_mutex);

        // If we support unlocking InterprocessMutex from a different thread
        // than it was locked on, we can sometimes just begin the write on
        // the current thread. This requires that no one is currently waiting
        // for the worker thread to acquire the write lock, as we'll deadlock
        // if we try to async commit while the worker is waiting for the lock.
        bool can_lock_on_caller =
            !InterprocessMutex::is_thread_confined && (!m_owns_write_mutex && m_pending_writes.empty() &&
                                                       m_write_lock_claim_ticket == m_write_lock_claim_fulfilled);

        // If we support cross-thread unlocking and m_running is false,
        // can_lock_on_caller should always be true or we forgot to launch the thread
        REALM_ASSERT(can_lock_on_caller || m_running || InterprocessMutex::is_thread_confined);

        // If possible, just begin the write on the current thread
        if (can_lock_on_caller) {
            m_waiting_for_write_mutex = true;
            lg.unlock();
            m_db->do_begin_write();
            lg.lock();
            m_waiting_for_write_mutex = false;
            m_has_write_mutex = true;
            m_owns_write_mutex = false;
            return;
        }

        // Otherwise we have to ask the worker thread to acquire it and wait
        // for that
        start_thread();
        size_t ticket = ++m_write_lock_claim_ticket;
        m_cv_worker.notify_one();
        m_cv_callers.wait(lg, [this, ticket] {
            return ticket == m_write_lock_claim_fulfilled;
        });
    }

    // Release the write lock without waiting for the release to complete.
    void end_write()
    {
        std::unique_lock lg(m_mutex);
        REALM_ASSERT(m_has_write_mutex);
        REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined);

        // If we acquired the write lock on the worker thread, also release it
        // there even if our mutex supports unlocking cross-thread as it simplifies things.
        if (m_owns_write_mutex) {
            m_pending_mx_release = true;
            m_cv_worker.notify_one();
        }
        else {
            m_db->do_end_write();
            m_has_write_mutex = false;
        }
    }

    // Release the write lock, blocking until the release has actually
    // happened. Returns false if the lock was not held.
    bool blocking_end_write()
    {
        std::unique_lock lg(m_mutex);
        if (!m_has_write_mutex) {
            return false;
        }
        REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined);

        // If we acquired the write lock on the worker thread, also release it
        // there even if our mutex supports unlocking cross-thread as it simplifies things.
        if (m_owns_write_mutex) {
            m_pending_mx_release = true;
            m_cv_worker.notify_one();
            m_cv_callers.wait(lg, [this] {
                return !m_pending_mx_release;
            });
        }
        else {
            m_db->do_end_write();
            m_has_write_mutex = false;

            // The worker thread may have ignored a request for the write mutex
            // while we were acquiring it, so we need to wake up the thread
            if (has_pending_write_requests()) {
                lg.unlock();
                m_cv_worker.notify_one();
            }
        }
        return true;
    }


    // Ask the worker to run `fn` (typically a disk sync) while the write lock
    // is held; the lock is released afterwards.
    void sync_to_disk(util::UniqueFunction<void()> fn)
    {
        REALM_ASSERT(fn);
        std::unique_lock lg(m_mutex);
        REALM_ASSERT(!m_pending_sync);
        start_thread();
        m_pending_sync = std::move(fn);
        m_cv_worker.notify_one();
    }

private:
    DB* m_db;
    std::thread m_thread;
    std::mutex m_mutex;
    std::condition_variable m_cv_worker;  // signals the worker thread
    std::condition_variable m_cv_callers; // signals threads waiting on the worker
    std::deque<util::UniqueFunction<void()>> m_pending_writes;
    util::UniqueFunction<void()> m_pending_sync;
    // Ticket/served pair implementing FIFO handoff of blocking_begin_write()
    // requests to the worker thread.
    size_t m_write_lock_claim_ticket = 0;
    size_t m_write_lock_claim_fulfilled = 0;
    bool m_pending_mx_release = false;
    bool m_running = false;
    bool m_has_write_mutex = false;
    bool m_owns_write_mutex = false; // true when the worker thread locked it
    bool m_waiting_for_write_mutex = false;

    void main();

    // Lazily launch the worker thread; caller must hold m_mutex.
    void start_thread()
    {
        if (m_running) {
            return;
        }
        m_running = true;
        m_thread = std::thread([this]() {
            main();
        });
    }

    bool has_pending_write_requests()
    {
        return m_write_lock_claim_fulfilled < m_write_lock_claim_ticket || !m_pending_writes.empty();
    }
};
1951

1952
// Destructor simply closes the DB; close() is a no-op if already closed and
// does not throw when invoked from here.
DB::~DB() noexcept
{
    close();
}
111,141✔
1956

1957
// Note: close() and close_internal() may be called from the DB::~DB().
1958
// in that case, they will not throw. Throwing can only happen if called
1959
// directly.
1960
// Close the DB. Throws WrongTransactionState if read transactions are open
// (unless allow_open_read_transactions) when called directly; never throws
// when reached via ~DB(), which only calls it in a state where these checks
// pass.
void DB::close(bool allow_open_read_transactions)
{
    // make helper thread(s) terminate
    m_commit_helper.reset();

    if (m_fake_read_lock_if_immutable) {
        // Immutable DBs have no lock file / shared session state to tear down.
        if (!is_attached())
            return;
        {
            CheckedLockGuard local_lock(m_mutex);
            if (!allow_open_read_transactions && m_transaction_count)
                throw WrongTransactionState("Closing with open read transactions");
        }
        if (m_alloc.is_attached())
            m_alloc.detach();
        m_fake_read_lock_if_immutable.reset();
    }
    else {
        // Full teardown path; the control mutex is passed deferred and locked
        // inside close_internal().
        close_internal(std::unique_lock<InterprocessMutex>(m_controlmutex, std::defer_lock),
                       allow_open_read_transactions);
    }
}
112,182✔
1982

1983
// Tear down session participation: release read locks, decrement the
// participant count, delete a MemOnly file at end of session, and unmap/close
// the lock file. `lock` is the (possibly not yet acquired) control mutex.
void DB::close_internal(std::unique_lock<InterprocessMutex> lock, bool allow_open_read_transactions)
{
    if (!is_attached())
        return;

    {
        CheckedLockGuard local_lock(m_mutex);
        if (m_write_transaction_open)
            throw WrongTransactionState("Closing with open write transactions");
        if (!allow_open_read_transactions && m_transaction_count)
            throw WrongTransactionState("Closing with open read transactions");
    }
    SharedInfo* info = m_info;
    {
        if (!lock.owns_lock())
            lock.lock();

        if (m_alloc.is_attached())
            m_alloc.detach();

        if (m_is_sync_agent) {
            REALM_ASSERT(info->sync_agent_present);
            info->sync_agent_present = 0; // Set to false
        }
        release_all_read_locks();
        --info->num_participants;
        bool end_of_session = info->num_participants == 0;
        // std::cerr << "closing" << std::endl;
        if (end_of_session) {

            // If the db file is just backing for a transient data structure,
            // we can delete it when done.
            if (Durability(info->durability) == Durability::MemOnly && !m_in_memory_info) {
                try {
                    util::File::remove(m_db_path.c_str());
                }
                catch (...) {
                } // ignored on purpose.
            }
        }
        lock.unlock();
    }
    {
        CheckedLockGuard local_lock(m_mutex);

        m_new_commit_available.close();
        m_pick_next_writer.close();

        if (m_in_memory_info) {
            m_in_memory_info.reset();
        }
        else {
            // On Windows it is important that we unmap before unlocking, else a SetEndOfFile() call from another
            // thread may interleave which is not permitted on Windows. It is permitted on *nix.
            m_file_map.unmap();
            m_version_manager.reset();
            m_file.rw_unlock();
            // info->~SharedInfo(); // DO NOT Call destructor
            m_file.close();
        }
        m_info = nullptr;
        if (m_logger)
            m_logger->log(util::Logger::Level::detail, "DB closed");
    }
}
110,787✔
2048

2049
bool DB::other_writers_waiting_for_lock() const
2050
{
62,439✔
2051
    SharedInfo* info = m_info;
62,439✔
2052

32,595✔
2053
    uint32_t next_ticket = info->next_ticket.load(std::memory_order_relaxed);
62,439✔
2054
    uint32_t next_served = info->next_served.load(std::memory_order_relaxed);
62,439✔
2055
    // When holding the write lock, next_ticket = next_served + 1, hence, if the diference between 'next_ticket' and
32,595✔
2056
    // 'next_served' is greater than 1, there is at least one thread waiting to acquire the write lock.
32,595✔
2057
    return next_ticket > next_served + 1;
62,439✔
2058
}
62,439✔
2059

2060
// Worker-thread loop for AsyncCommitHelper. Holds m_mutex except while
// actually acquiring/releasing the write lock or running a callback; state
// transitions are signalled to waiting callers via m_cv_callers.
void DB::AsyncCommitHelper::main()
{
    std::unique_lock lg(m_mutex);
    while (m_running) {
#if 0 // Enable for testing purposes
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
#endif
        if (m_has_write_mutex) {
            // Write lock is held: run a pending sync callback (if any), then
            // release the lock if requested.
            if (auto cb = std::move(m_pending_sync)) {
                // Only one of sync_to_disk(), end_write(), or blocking_end_write()
                // should be called, so we should never have both a pending sync
                // and pending release.
                REALM_ASSERT(!m_pending_mx_release);
                lg.unlock();
                cb();
                cb = nullptr; // Release things captured by the callback before reacquiring the lock
                lg.lock();
                m_pending_mx_release = true;
            }
            if (m_pending_mx_release) {
                REALM_ASSERT(!InterprocessMutex::is_thread_confined || m_owns_write_mutex);
                m_db->do_end_write();
                m_pending_mx_release = false;
                m_has_write_mutex = false;
                m_owns_write_mutex = false;

                lg.unlock();
                m_cv_callers.notify_all();
                lg.lock();
                continue;
            }
        }
        else {
            REALM_ASSERT(!m_pending_sync && !m_pending_mx_release);

            // Acquire the write lock if anyone has requested it, but only if
            // another thread is not already waiting for it. If there's another
            // thread requesting and they get it while we're waiting, we'll
            // deadlock if they ask us to perform the sync.
            if (!m_waiting_for_write_mutex && has_pending_write_requests()) {
                lg.unlock();
                m_db->do_begin_write();
                lg.lock();

                REALM_ASSERT(!m_has_write_mutex);
                m_has_write_mutex = true;
                m_owns_write_mutex = true;

                // Synchronous transaction requests get priority over async
                if (m_write_lock_claim_fulfilled < m_write_lock_claim_ticket) {
                    ++m_write_lock_claim_fulfilled;
                    m_cv_callers.notify_all();
                    continue;
                }

                REALM_ASSERT(!m_pending_writes.empty());
                auto callback = std::move(m_pending_writes.front());
                m_pending_writes.pop_front();
                lg.unlock();
                callback();
                // Release things captured by the callback before reacquiring the lock
                callback = nullptr;
                lg.lock();
                continue;
            }
        }
        // Nothing to do: sleep until signalled.
        m_cv_worker.wait(lg);
    }
    // Shutting down: if the worker still owns the write lock, release it here
    // since no one else can.
    if (m_has_write_mutex && m_owns_write_mutex) {
        m_db->do_end_write();
    }
}
33,159✔
2132

2133
// Forward an async write-begin request to the commit helper; `fn` runs on the
// helper's worker thread once the write lock is acquired.
void DB::async_begin_write(util::UniqueFunction<void()> fn)
{
    REALM_ASSERT(m_commit_helper);
    m_commit_helper->begin_write(std::move(fn));
}
1,614✔
2138

2139
// Forward an async write-end request to the commit helper (non-blocking).
void DB::async_end_write()
{
    REALM_ASSERT(m_commit_helper);
    m_commit_helper->end_write();
}
54✔
2144

2145
// Ask the commit helper's worker thread to run `fn` (a disk-sync action)
// while the write lock is held.
void DB::async_sync_to_disk(util::UniqueFunction<void()> fn)
{
    REALM_ASSERT(m_commit_helper);
    m_commit_helper->sync_to_disk(std::move(fn));
}
1,356✔
2150

2151
// Report whether a snapshot newer than the one bound to `tr` exists.
bool DB::has_changed(TransactionRef& tr)
{
    // An immutable Realm can never advance to a newer snapshot.
    if (m_fake_read_lock_if_immutable)
        return false;
    return tr->m_read_lock.m_version != get_version_of_latest_snapshot();
}
821,577✔
2158

2159
// Block until a commit newer than `tr`'s snapshot appears, or until waiting
// is disabled via wait_for_change_release(). Returns true if a newer version
// exists when the wait ends.
bool DB::wait_for_change(TransactionRef& tr)
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
    while (tr->m_read_lock.m_version == m_info->latest_version_number && m_wait_for_change_enabled) {
        m_new_commit_available.wait(m_controlmutex, 0);
    }
    return tr->m_read_lock.m_version != m_info->latest_version_number;
}
×
2168

2169

2170
// Disable wait_for_change() and wake up all threads currently blocked in it.
void DB::wait_for_change_release()
{
    // No lock file (and hence no waiters) for immutable Realms.
    if (m_fake_read_lock_if_immutable)
        return;
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
    m_wait_for_change_enabled = false;
    m_new_commit_available.notify_all();
}
×
2178

2179

2180
// Re-enable wait_for_change() after a previous wait_for_change_release().
void DB::enable_wait_for_change()
{
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
    std::lock_guard<InterprocessMutex> lock(m_controlmutex);
    m_wait_for_change_enabled = true;
}
×
2186

2187
bool DB::needs_file_format_upgrade(const std::string& file, const std::vector<char>& encryption_key)
2188
{
54✔
2189
    SlabAlloc alloc;
54✔
2190
    SlabAlloc::Config cfg;
54✔
2191
    cfg.session_initiator = false;
54✔
2192
    cfg.read_only = true;
54✔
2193
    cfg.no_create = true;
54✔
2194
    if (!encryption_key.empty()) {
54✔
2195
        cfg.encryption_key = encryption_key.data();
×
2196
    }
×
2197
    try {
54✔
2198
        alloc.attach_file(file, cfg);
54✔
2199
        if (auto current_file_format_version = alloc.get_committed_file_format_version()) {
54✔
2200
            auto target_file_format_version = Group::g_current_file_format_version;
42✔
2201
            return current_file_format_version < target_file_format_version;
42✔
2202
        }
42✔
2203
    }
6✔
2204
    catch (const FileAccessError& err) {
6✔
2205
        if (err.code() != ErrorCodes::FileNotFound) {
6✔
2206
            throw;
×
2207
        }
×
2208
    }
12✔
2209
    return false;
12✔
2210
}
12✔
2211

2212
// Upgrade the Realm file's format and/or history schema to the given target
// versions, inside a write transaction. Throws FileFormatUpgradeRequired if
// an upgrade is needed but not allowed.
void DB::upgrade_file_format(bool allow_file_format_upgrade, int target_file_format_version,
                             int current_hist_schema_version, int target_hist_schema_version)
{
    // In a multithreaded scenario multiple threads may initially see a need to
    // upgrade (maybe_upgrade == true) even though only one thread is supposed
    // to perform the upgrade, but that is ok, because the condition is
    // rechecked in a fully reliable way inside a transaction.

    // First a non-threadsafe but fast check
    int current_file_format_version = m_file_format_version;
    REALM_ASSERT(current_file_format_version <= target_file_format_version);
    REALM_ASSERT(current_hist_schema_version <= target_hist_schema_version);
    bool maybe_upgrade_file_format = (current_file_format_version < target_file_format_version);
    bool maybe_upgrade_hist_schema = (current_hist_schema_version < target_hist_schema_version);
    bool maybe_upgrade = maybe_upgrade_file_format || maybe_upgrade_hist_schema;
    if (maybe_upgrade) {

#ifdef REALM_DEBUG
// This sleep() only exists in order to increase the quality of the
// TEST(Upgrade_Database_2_3_Writes_New_File_Format_new) unit test.
// The unit test creates multiple threads that all call
// upgrade_file_format() simultaneously. This sleep() then acts like
// a simple thread barrier that makes sure the threads meet here, to
// increase the likelihood of detecting any potential race problems.
// See the unit test for details.
//
// NOTE: This sleep has been disabled because no problems have been found with
// this code in a long while, and it was dramatically slowing down a unit test
// in realm-sync.

// millisleep(200);
#endif

        // WriteTransaction wt(*this);
        auto wt = start_write();
        bool dirty = false;

        // We need to upgrade history first. We may need to access it during migration
        // when processing the !OID columns
        int current_hist_schema_version_2 = wt->get_history_schema_version();
        // The history must either still be using its initial schema or have
        // been upgraded already to the chosen target schema version via a
        // concurrent DB object.
        REALM_ASSERT(current_hist_schema_version_2 == current_hist_schema_version ||
                     current_hist_schema_version_2 == target_hist_schema_version);
        bool need_hist_schema_upgrade = (current_hist_schema_version_2 < target_hist_schema_version);
        if (need_hist_schema_upgrade) {
            if (!allow_file_format_upgrade)
                throw FileFormatUpgradeRequired(this->m_db_path);

            Replication* repl = get_replication();
            repl->upgrade_history_schema(current_hist_schema_version_2); // Throws
            wt->set_history_schema_version(target_hist_schema_version);  // Throws
            dirty = true;
        }

        // File format upgrade
        int current_file_format_version_2 = m_alloc.get_committed_file_format_version();
        // The file must either still be using its initial file_format or have
        // been upgraded already to the chosen target file format via a
        // concurrent DB object.
        REALM_ASSERT(current_file_format_version_2 == current_file_format_version ||
                     current_file_format_version_2 == target_file_format_version);
        bool need_file_format_upgrade = (current_file_format_version_2 < target_file_format_version);
        if (need_file_format_upgrade) {
            if (!allow_file_format_upgrade)
                throw FileFormatUpgradeRequired(this->m_db_path);
            wt->upgrade_file_format(target_file_format_version); // Throws
            // Note: The file format version stored in the Realm file will be
            // updated to the new file format version as part of the following
            // commit operation. This happens in GroupWriter::commit().
            if (m_upgrade_callback)
                m_upgrade_callback(current_file_format_version_2, target_file_format_version); // Throws
            dirty = true;
        }
        wt->set_file_format_version(target_file_format_version);
        m_file_format_version = target_file_format_version;

        if (dirty)
            wt->commit(); // Throws
    }
}
43,743✔
2294

2295
void DB::release_read_lock(ReadLockInfo& read_lock) noexcept
{
    // A DB opened on an immutable file has no lockfile, hence nothing to release.
    if (m_fake_read_lock_if_immutable)
        return;
    CheckedLockGuard lock(m_mutex); // guards m_local_locks_held
    do_release_read_lock(read_lock);
}
2303

2304
// this is called with m_mutex locked
2305
void DB::do_release_read_lock(ReadLockInfo& read_lock) noexcept
2306
{
3,656,469✔
2307
    REALM_ASSERT(!m_fake_read_lock_if_immutable);
3,656,469✔
2308
    bool found_match = false;
3,656,469✔
2309
    // simple linear search and move-last-over if a match is found.
2,405,112✔
2310
    // common case should have only a modest number of transactions in play..
2,405,112✔
2311
    for (size_t j = 0; j < m_local_locks_held.size(); ++j) {
6,707,631✔
2312
        if (m_local_locks_held[j].m_version == read_lock.m_version) {
6,707,568✔
2313
            m_local_locks_held[j] = m_local_locks_held.back();
3,656,439✔
2314
            m_local_locks_held.pop_back();
3,656,439✔
2315
            found_match = true;
3,656,439✔
2316
            break;
3,656,439✔
2317
        }
3,656,439✔
2318
    }
6,707,568✔
2319
    if (!found_match) {
3,656,469✔
2320
        REALM_ASSERT(!is_attached());
6✔
2321
        // it's OK, someone called close() and all locks where released
3✔
2322
        return;
6✔
2323
    }
6✔
2324
    --m_transaction_count;
3,656,463✔
2325
    m_version_manager->release_read_lock(read_lock);
3,656,463✔
2326
}
3,656,463✔
2327

2328

2329
DB::ReadLockInfo DB::grab_read_lock(ReadLockInfo::Type type, VersionID version_id)
{
    // Acquire a read lock on the requested snapshot and register it in the
    // list of locks held by this DB instance.
    CheckedLockGuard lock(m_mutex); // guards m_local_locks_held
    REALM_ASSERT_RELEASE(is_attached());
    auto read_lock = m_version_manager->grab_read_lock(type, version_id);

    m_local_locks_held.emplace_back(read_lock);
    ++m_transaction_count;
    REALM_ASSERT(read_lock.m_file_size > read_lock.m_top_ref);
    return read_lock;
}
2340

2341
void DB::leak_read_lock(ReadLockInfo& read_lock) noexcept
{
    // Forget a held read lock without informing the version manager.
    CheckedLockGuard lock(m_mutex); // guards m_local_locks_held
    // Linear scan with swap-with-last removal; the list is expected to be short.
    for (size_t i = 0; i < m_local_locks_held.size(); ++i) {
        if (m_local_locks_held[i].m_version == read_lock.m_version) {
            m_local_locks_held[i] = m_local_locks_held.back();
            m_local_locks_held.pop_back();
            --m_transaction_count;
            return;
        }
    }
}
2355

2356
bool DB::do_try_begin_write()
2357
{
84✔
2358
    // In the non-blocking case, we will only succeed if there is no contention for
42✔
2359
    // the write mutex. For this case we are trivially fair and can ignore the
42✔
2360
    // fairness machinery.
42✔
2361
    bool got_the_lock = m_writemutex.try_lock();
84✔
2362
    if (got_the_lock) {
84✔
2363
        finish_begin_write();
72✔
2364
    }
72✔
2365
    return got_the_lock;
84✔
2366
}
84✔
2367

2368
void DB::do_begin_write()
2369
{
609,996✔
2370
    if (m_logger) {
609,996✔
2371
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "acquire writemutex");
259,152✔
2372
    }
259,152✔
2373

311,700✔
2374
    SharedInfo* info = m_info;
609,996✔
2375

311,700✔
2376
    // Get write lock - the write lock is held until do_end_write().
311,700✔
2377
    //
311,700✔
2378
    // We use a ticketing scheme to ensure fairness wrt performing write transactions.
311,700✔
2379
    // (But cannot do that on Windows until we have interprocess condition variables there)
311,700✔
2380
    uint32_t my_ticket = info->next_ticket.fetch_add(1, std::memory_order_relaxed);
609,996✔
2381
    m_writemutex.lock(); // Throws
609,996✔
2382

311,700✔
2383
    // allow for comparison even after wrap around of ticket numbering:
311,700✔
2384
    int32_t diff = int32_t(my_ticket - info->next_served.load(std::memory_order_relaxed));
609,996✔
2385
    bool should_yield = diff > 0; // ticket is in the future
609,996✔
2386
    // a) the above comparison is only guaranteed to be correct, if the distance
311,700✔
2387
    //    between my_ticket and info->next_served is less than 2^30. This will
311,700✔
2388
    //    be the case since the distance will be bounded by the number of threads
311,700✔
2389
    //    and each thread cannot ever hold more than one ticket.
311,700✔
2390
    // b) we could use 64 bit counters instead, but it is unclear if all platforms
311,700✔
2391
    //    have support for interprocess atomics for 64 bit values.
311,700✔
2392

311,700✔
2393
    timespec time_limit; // only compute the time limit if we're going to use it:
609,996✔
2394
    if (should_yield) {
609,996✔
2395
        // This clock is not monotonic, so time can move backwards. This can lead
20,460✔
2396
        // to a wrong time limit, but the only effect of a wrong time limit is that
20,460✔
2397
        // we momentarily lose fairness, so we accept it.
20,460✔
2398
        timeval tv;
23,463✔
2399
        gettimeofday(&tv, nullptr);
23,463✔
2400
        time_limit.tv_sec = tv.tv_sec;
23,463✔
2401
        time_limit.tv_nsec = tv.tv_usec * 1000;
23,463✔
2402
        time_limit.tv_nsec += 500000000;        // 500 msec wait
23,463✔
2403
        if (time_limit.tv_nsec >= 1000000000) { // overflow
23,463✔
2404
            time_limit.tv_nsec -= 1000000000;
11,418✔
2405
            time_limit.tv_sec += 1;
11,418✔
2406
        }
11,418✔
2407
    }
23,463✔
2408

311,700✔
2409
    while (should_yield) {
755,184✔
2410

135,897✔
2411
        m_pick_next_writer.wait(m_writemutex, &time_limit);
145,188✔
2412
        timeval tv;
145,188✔
2413
        gettimeofday(&tv, nullptr);
145,188✔
2414
        if (time_limit.tv_sec < tv.tv_sec ||
145,188✔
2415
            (time_limit.tv_sec == tv.tv_sec && time_limit.tv_nsec < tv.tv_usec * 1000)) {
145,188✔
2416
            // Timeout!
UNCOV
2417
            break;
×
UNCOV
2418
        }
×
2419
        diff = int32_t(my_ticket - info->next_served);
145,188✔
2420
        should_yield = diff > 0; // ticket is in the future, so yield to someone else
145,188✔
2421
    }
145,188✔
2422

311,700✔
2423
    // we may get here because a) it's our turn, b) we timed out
311,700✔
2424
    // we don't distinguish, satisfied that event b) should be rare.
311,700✔
2425
    // In case b), we have to *make* it our turn. Failure to do so could leave us
311,700✔
2426
    // with 'next_served' permanently trailing 'next_ticket'.
311,700✔
2427
    //
311,700✔
2428
    // In doing so, we may bypass other waiters, hence the condition for yielding
311,700✔
2429
    // should take this situation into account by comparing with '>' instead of '!='
311,700✔
2430
    info->next_served = my_ticket;
609,996✔
2431
    finish_begin_write();
609,996✔
2432
    if (m_logger) {
609,996✔
2433
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "writemutex acquired");
259,152✔
2434
    }
259,152✔
2435
}
609,996✔
2436

2437
void DB::finish_begin_write()
2438
{
610,062✔
2439
    if (m_info->commit_in_critical_phase) {
610,062✔
2440
        m_writemutex.unlock();
×
2441
        throw RuntimeError(ErrorCodes::BrokenInvariant, "Crash of other process detected, session restart required");
×
2442
    }
×
2443

311,760✔
2444

311,760✔
2445
    {
610,062✔
2446
        CheckedLockGuard local_lock(m_mutex);
610,062✔
2447
        m_write_transaction_open = true;
610,062✔
2448
    }
610,062✔
2449
    m_alloc.set_read_only(false);
610,062✔
2450
}
610,062✔
2451

2452
void DB::do_end_write() noexcept
2453
{
610,077✔
2454
    m_info->next_served.fetch_add(1, std::memory_order_relaxed);
610,077✔
2455

311,748✔
2456
    CheckedLockGuard local_lock(m_mutex);
610,077✔
2457
    REALM_ASSERT(m_write_transaction_open);
610,077✔
2458
    m_alloc.set_read_only(true);
610,077✔
2459
    m_write_transaction_open = false;
610,077✔
2460
    m_pick_next_writer.notify_all();
610,077✔
2461
    m_writemutex.unlock();
610,077✔
2462
    if (m_logger) {
610,077✔
2463
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "writemutex released");
259,200✔
2464
    }
259,200✔
2465
}
610,077✔
2466

2467

2468
Replication::version_type DB::do_commit(Transaction& transaction, bool commit_to_disk)
{
    version_type current_version;
    {
        current_version = m_version_manager->get_newest_version();
    }
    version_type new_version = current_version + 1;

    // Perform any table clears that were deferred to commit time.
    if (!transaction.m_tables_to_clear.empty()) {
        for (auto table_key : transaction.m_tables_to_clear) {
            transaction.get_table_unchecked(table_key)->clear();
        }
        transaction.m_tables_to_clear.clear();
    }
    if (Replication* repl = get_replication()) {
        // A failure in Replication::prepare_commit() fails the whole
        // transaction. The application may then end it via
        // Transaction::Rollback(), which in turn calls
        // Replication::abort_transact().
        new_version = repl->prepare_commit(current_version);        // Throws
        low_level_commit(new_version, transaction, commit_to_disk); // Throws
        repl->finalize_commit();
    }
    else {
        low_level_commit(new_version, transaction); // Throws
    }

    // Notify registered listeners about the newly committed version.
    {
        std::lock_guard lock(m_commit_listener_mutex);
        for (auto listener : m_commit_listeners) {
            listener->on_commit(new_version);
        }
    }

    return new_version;
}
2504

2505
VersionID DB::get_version_id_of_latest_snapshot()
{
    // Immutable DBs have exactly one (fake) version.
    if (m_fake_read_lock_if_immutable)
        return {m_fake_read_lock_if_immutable->m_version, 0};
    return m_version_manager->get_version_id_of_latest_snapshot();
}
2511

2512

2513
DB::version_type DB::get_version_of_latest_snapshot()
{
    // Convenience wrapper: version number only, without the reader index.
    return get_version_id_of_latest_snapshot().version;
}
2517

2518

2519
void DB::low_level_commit(uint_fast64_t new_version, Transaction& transaction, bool commit_to_disk)
2520
{
602,361✔
2521
    SharedInfo* info = m_info;
602,361✔
2522

307,911✔
2523
    // Version of oldest snapshot currently (or recently) bound in a transaction
307,911✔
2524
    // of the current session.
307,911✔
2525
    uint64_t oldest_version = 0, oldest_live_version = 0;
602,361✔
2526
    TopRefMap top_refs;
602,361✔
2527
    bool any_new_unreachables;
602,361✔
2528
    {
602,361✔
2529
        CheckedLockGuard lock(m_mutex);
602,361✔
2530
        m_version_manager->cleanup_versions(oldest_live_version, top_refs, any_new_unreachables);
602,361✔
2531
        oldest_version = top_refs.begin()->first;
602,361✔
2532
        // Allow for trimming of the history. Some types of histories do not
307,911✔
2533
        // need store changesets prior to the oldest *live* bound snapshot.
307,911✔
2534
        if (auto hist = transaction.get_history()) {
602,361✔
2535
            hist->set_oldest_bound_version(oldest_live_version); // Throws
577,893✔
2536
        }
577,893✔
2537
        // Cleanup any stale mappings
307,911✔
2538
        m_alloc.purge_old_mappings(oldest_version, new_version);
602,361✔
2539
    }
602,361✔
2540
    // save number of live versions for later:
307,911✔
2541
    // (top_refs is std::moved into GroupWriter so we'll loose it in the call to set_versions below)
307,911✔
2542
    auto live_versions = top_refs.size();
602,361✔
2543
    // Do the actual commit
307,911✔
2544
    REALM_ASSERT(oldest_version <= new_version);
602,361✔
2545

307,911✔
2546
    GroupWriter out(transaction, Durability(info->durability), m_marker_observer.get()); // Throws
602,361✔
2547
    out.set_versions(new_version, top_refs, any_new_unreachables);
602,361✔
2548
    out.prepare_evacuation();
602,361✔
2549
    auto t1 = std::chrono::steady_clock::now();
602,361✔
2550
    auto commit_size = m_alloc.get_commit_size();
602,361✔
2551
    if (m_logger) {
602,361✔
2552
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::debug, "Initiate commit version: %1",
253,263✔
2553
                      new_version);
253,263✔
2554
    }
253,263✔
2555
    if (auto limit = out.get_evacuation_limit()) {
602,361✔
2556
        // Get a work limit based on the size of the transaction we're about to commit
2,673✔
2557
        // Add 4k to ensure progress on small commits
2,673✔
2558
        size_t work_limit = commit_size / 2 + out.get_free_list_size() + 0x1000;
5,346✔
2559
        transaction.cow_outliers(out.get_evacuation_progress(), limit, work_limit);
5,346✔
2560
    }
5,346✔
2561

307,911✔
2562
    ref_type new_top_ref;
602,361✔
2563
    // Recursively write all changed arrays to end of file
307,911✔
2564
    {
602,361✔
2565
        // protect against race with any other DB trying to attach to the file
307,911✔
2566
        std::lock_guard<InterprocessMutex> lock(m_controlmutex); // Throws
602,361✔
2567
        new_top_ref = out.write_group();                         // Throws
602,361✔
2568
    }
602,361✔
2569
    {
602,361✔
2570
        // protect access to shared variables and m_reader_mapping from here
307,911✔
2571
        CheckedLockGuard lock_guard(m_mutex);
602,361✔
2572
        m_free_space = out.get_free_space_size();
602,361✔
2573
        m_locked_space = out.get_locked_space_size();
602,361✔
2574
        m_used_space = out.get_logical_size() - m_free_space;
602,361✔
2575
        m_evac_stage.store(EvacStage(out.get_evacuation_stage()));
602,361✔
2576
        out.sync_according_to_durability();
602,361✔
2577
        if (Durability(info->durability) == Durability::Full || Durability(info->durability) == Durability::Unsafe) {
602,361✔
2578
            if (commit_to_disk) {
416,859✔
2579
                GroupCommitter cm(transaction, Durability(info->durability), m_marker_observer.get());
409,329✔
2580
                cm.commit(new_top_ref);
409,329✔
2581
            }
409,329✔
2582
        }
416,859✔
2583
        size_t new_file_size = out.get_logical_size();
602,361✔
2584
        // We must reset the allocators free space tracking before communicating the new
307,911✔
2585
        // version through the ring buffer. If not, a reader may start updating the allocators
307,911✔
2586
        // mappings while the allocator is in dirty state.
307,911✔
2587
        reset_free_space_tracking();
602,361✔
2588
        // Add the new version. If this fails in any way, the VersionList may be corrupted.
307,911✔
2589
        // This can lead to readers seing invalid data which is likely to cause them
307,911✔
2590
        // to crash. Other writers *must* be prevented from writing any further updates
307,911✔
2591
        // to the database. The flag "commit_in_critical_phase" is used to prevent such updates.
307,911✔
2592
        info->commit_in_critical_phase = 1;
602,361✔
2593
        {
602,361✔
2594
            m_version_manager->add_version(new_top_ref, new_file_size, new_version);
602,361✔
2595

307,911✔
2596
            // REALM_ASSERT(m_alloc.matches_section_boundary(new_file_size));
307,911✔
2597
            REALM_ASSERT(new_top_ref < new_file_size);
602,361✔
2598
        }
602,361✔
2599
        // At this point, the VersionList has been succesfully updated, and the next writer
307,911✔
2600
        // can safely proceed once the writemutex has been lifted.
307,911✔
2601
        info->commit_in_critical_phase = 0;
602,361✔
2602
    }
602,361✔
2603
    {
602,361✔
2604
        // protect against concurrent updates to the .lock file.
307,911✔
2605
        // must release m_mutex before this point to obey lock order
307,911✔
2606
        std::lock_guard<InterprocessMutex> lock(m_controlmutex);
602,361✔
2607

307,911✔
2608
        info->number_of_versions = live_versions + 1;
602,361✔
2609
        info->latest_version_number = new_version;
602,361✔
2610

307,911✔
2611
        m_new_commit_available.notify_all();
602,361✔
2612
    }
602,361✔
2613
    auto t2 = std::chrono::steady_clock::now();
602,361✔
2614
    if (m_logger) {
602,361✔
2615
        std::string to_disk_str = commit_to_disk ? util::format(" ref %1", new_top_ref) : " (no commit to disk)";
249,126✔
2616
        m_logger->log(util::LogCategory::transaction, util::Logger::Level::debug, "Commit of size %1 done in %2 us%3",
253,263✔
2617
                      commit_size, std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count(),
253,263✔
2618
                      to_disk_str);
253,263✔
2619
    }
253,263✔
2620
}
602,361✔
2621

2622
#ifdef REALM_DEBUG
void DB::reserve(size_t size)
{
    // Debug-only hook to pre-allocate file space.
    REALM_ASSERT(is_attached());
    m_alloc.reserve_disk_space(size); // Throws
}
#endif
2629

2630
bool DB::call_with_lock(const std::string& realm_path, CallbackWithLock&& callback)
2631
{
189✔
2632
    auto lockfile_path = get_core_file(realm_path, CoreFileType::Lock);
189✔
2633

54✔
2634
    File lockfile;
189✔
2635
    lockfile.open(lockfile_path, File::access_ReadWrite, File::create_Auto, 0); // Throws
189✔
2636
    File::CloseGuard fcg(lockfile);
189✔
2637
    lockfile.set_fifo_path(realm_path + ".management", "lock.fifo");
189✔
2638
    if (lockfile.try_rw_lock_exclusive()) { // Throws
189✔
2639
        callback(realm_path);
147✔
2640
        return true;
147✔
2641
    }
147✔
2642
    return false;
42✔
2643
}
42✔
2644

2645
std::string DB::get_core_file(const std::string& base_path, CoreFileType type)
2646
{
194,100✔
2647
    switch (type) {
194,100✔
2648
        case CoreFileType::Lock:
86,598✔
2649
            return base_path + ".lock";
86,598✔
2650
        case CoreFileType::Storage:
924✔
2651
            return base_path;
924✔
2652
        case CoreFileType::Management:
86,451✔
2653
            return base_path + ".management";
86,451✔
2654
        case CoreFileType::Note:
19,206✔
2655
            return base_path + ".note";
19,206✔
2656
        case CoreFileType::Log:
924✔
2657
            return base_path + ".log";
924✔
2658
    }
×
2659
    REALM_UNREACHABLE();
2660
}
×
2661

2662
void DB::delete_files(const std::string& base_path, bool* did_delete, bool delete_lockfile)
2663
{
918✔
2664
    if (File::try_remove(get_core_file(base_path, CoreFileType::Storage)) && did_delete) {
918✔
2665
        *did_delete = true;
174✔
2666
    }
174✔
2667

459✔
2668
    File::try_remove(get_core_file(base_path, CoreFileType::Note));
918✔
2669
    File::try_remove(get_core_file(base_path, CoreFileType::Log));
918✔
2670
    util::try_remove_dir_recursive(get_core_file(base_path, CoreFileType::Management));
918✔
2671

459✔
2672
    if (delete_lockfile) {
918✔
2673
        File::try_remove(get_core_file(base_path, CoreFileType::Lock));
876✔
2674
    }
876✔
2675
}
918✔
2676

2677
TransactionRef DB::start_read(VersionID version_id)
{
    // Begin a live read transaction on the requested snapshot.
    if (!is_attached())
        throw StaleAccessor("Stale transaction");
    TransactionRef tr;
    if (m_fake_read_lock_if_immutable) {
        tr = make_transaction_ref(shared_from_this(), &m_alloc, *m_fake_read_lock_if_immutable, DB::transact_Reading);
    }
    else {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Live, version_id);
        ReadLockGuard guard(*this, read_lock);
        read_lock.check();
        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Reading);
        guard.release(); // lock ownership transferred to the transaction
    }
    tr->set_file_format_version(get_file_format_version());
    return tr;
}
2695

2696
TransactionRef DB::start_frozen(VersionID version_id)
{
    // Begin a frozen (never-advancing) transaction on the requested snapshot.
    if (!is_attached())
        throw StaleAccessor("Stale transaction");
    TransactionRef tr;
    if (m_fake_read_lock_if_immutable) {
        tr = make_transaction_ref(shared_from_this(), &m_alloc, *m_fake_read_lock_if_immutable, DB::transact_Frozen);
    }
    else {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Frozen, version_id);
        ReadLockGuard guard(*this, read_lock);
        read_lock.check();
        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Frozen);
        guard.release(); // lock ownership transferred to the transaction
    }
    tr->set_file_format_version(get_file_format_version());
    return tr;
}
2714

2715
TransactionRef DB::start_write(bool nonblocking)
{
    // Begin a write transaction. With 'nonblocking' set, returns a null ref
    // instead of waiting when the write lock is contended.
    if (m_fake_read_lock_if_immutable) {
        REALM_ASSERT(false && "Can't write an immutable DB");
    }
    if (nonblocking) {
        bool acquired = do_try_begin_write();
        if (!acquired) {
            return TransactionRef();
        }
    }
    else {
        do_begin_write();
    }
    {
        CheckedUniqueLock local_lock(m_mutex);
        if (!is_attached()) {
            local_lock.unlock();
            end_write_on_correct_thread();
            throw StaleAccessor("Stale transaction");
        }
        m_write_transaction_open = true;
    }
    TransactionRef tr;
    try {
        ReadLockInfo read_lock = grab_read_lock(ReadLockInfo::Live, VersionID());
        ReadLockGuard guard(*this, read_lock);
        read_lock.check();

        tr = make_transaction_ref(shared_from_this(), &m_alloc, read_lock, DB::transact_Writing);
        tr->set_file_format_version(get_file_format_version());
        version_type current_version = read_lock.m_version;
        m_alloc.init_mapping_management(current_version);
        if (Replication* repl = get_replication()) {
            bool history_updated = false;
            repl->initiate_transact(*tr, current_version, history_updated); // Throws
        }
        guard.release(); // lock ownership transferred to the transaction
    }
    catch (...) {
        // Release the write mutex before propagating, or no one else could
        // ever start a write transaction again.
        end_write_on_correct_thread();
        throw;
    }

    return tr;
}
2761

2762
void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction<void()>&& when_acquired)
{
    // Asynchronously acquire the write lock for 'tr' and invoke
    // 'when_acquired' once it is held.
    {
        util::CheckedLockGuard guard(tr->m_async_mutex);
        REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle);
        tr->m_async_stage = Transaction::AsyncState::Requesting;
        tr->m_request_time_point = std::chrono::steady_clock::now();
        if (tr->db->m_logger) {
            tr->db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace,
                                  "Tr %1: Async request write lock", tr->m_log_id);
        }
    }
    // Capture weakly: the transaction may be dropped before the lock arrives.
    std::weak_ptr<Transaction> weak_tr = tr;
    async_begin_write([weak_tr, cb = std::move(when_acquired)]() {
        if (auto tr = weak_tr.lock()) {
            util::CheckedLockGuard guard(tr->m_async_mutex);
            // A synchronous transaction that ran while we were pending may
            // already have advanced the stage to HasCommits.
            if (tr->m_async_stage == Transaction::AsyncState::Requesting) {
                tr->m_async_stage = Transaction::AsyncState::HasLock;
            }
            if (tr->db->m_logger) {
                auto now = std::chrono::steady_clock::now();
                tr->db->m_logger->log(
                    util::LogCategory::transaction, util::Logger::Level::trace, "Tr %1, Got write lock in %2 us",
                    tr->m_log_id,
                    std::chrono::duration_cast<std::chrono::microseconds>(now - tr->m_request_time_point).count());
            }
            if (tr->m_waiting_for_write_lock) {
                tr->m_waiting_for_write_lock = false;
                tr->m_async_cv.notify_one();
            }
            else if (cb) {
                cb();
            }
            tr.reset(); // drop the strong ref while the lock is still held
        }
    });
}
2801

2802
inline DB::DB(Private, const DBOptions& options)
    : m_upgrade_callback(std::move(options.upgrade_callback)) // NOTE(review): options is const, so this is a copy — confirm intent
    , m_log_id(util::gen_log_id(this))
{
    // Async-write support is opt-in; the helper owns the background thread.
    if (options.enable_async_writes) {
        m_commit_helper = std::make_unique<AsyncCommitHelper>(this);
    }
}
2810

2811
DBRef DB::create(const std::string& file, bool no_create, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    // Factory: open a DB on 'file' without replication.
    DBRef db = std::make_shared<DB>(Private(), options);
    db->open(file, no_create, options);
    return db;
}
2817

2818
DBRef DB::create(Replication& repl, const std::string& file, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    // Factory: open a DB on 'file' with a caller-owned replication instance.
    DBRef db = std::make_shared<DB>(Private(), options);
    db->open(repl, file, options);
    return db;
}
2824

2825
DBRef DB::create(std::unique_ptr<Replication> repl, const std::string& file,
                 const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    // Factory: open a DB on 'file', taking ownership of the replication
    // instance (stored in m_history so its lifetime matches the DB).
    REALM_ASSERT(repl);
    DBRef db = std::make_shared<DB>(Private(), options);
    db->m_history = std::move(repl);
    db->open(*db->m_history, file, options);
    return db;
}
2834

2835
DBRef DB::create(std::unique_ptr<Replication> repl, const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    // Factory: open an unnamed (in-memory) DB, taking ownership of the
    // replication instance.
    REALM_ASSERT(repl);
    DBRef db = std::make_shared<DB>(Private(), options);
    db->m_history = std::move(repl);
    db->open(*db->m_history, options);
    return db;
}
2843

2844
DBRef DB::create_in_memory(std::unique_ptr<Replication> repl, const std::string& in_memory_path,
                           const DBOptions& options) NO_THREAD_SAFETY_ANALYSIS
{
    // Factory: in-memory DB that still reports a path (for identification).
    DBRef db = create(std::move(repl), options);
    db->m_db_path = in_memory_path;
    return db;
}
2851

2852
DBRef DB::create(BinaryData buffer, bool take_ownership) NO_THREAD_SAFETY_ANALYSIS
{
    // Factory: read-only DB backed by an in-memory buffer.
    DBOptions options;
    options.is_immutable = true;
    DBRef db = std::make_shared<DB>(Private(), options);
    db->open(buffer, take_ownership);
    return db;
}
2860

2861
void DB::claim_sync_agent()
2862
{
16,563✔
2863
    REALM_ASSERT(is_attached());
16,563✔
2864
    std::unique_lock<InterprocessMutex> lock(m_controlmutex);
16,563✔
2865
    if (m_info->sync_agent_present)
16,563✔
2866
        throw MultipleSyncAgents{};
6✔
2867
    m_info->sync_agent_present = 1; // Set to true
16,557✔
2868
    m_is_sync_agent = true;
16,557✔
2869
}
16,557✔
2870

2871
void DB::release_sync_agent()
2872
{
15,333✔
2873
    REALM_ASSERT(is_attached());
15,333✔
2874
    std::unique_lock<InterprocessMutex> lock(m_controlmutex);
15,333✔
2875
    if (!m_is_sync_agent)
15,333✔
2876
        return;
273✔
2877
    REALM_ASSERT(m_info->sync_agent_present);
15,060✔
2878
    m_info->sync_agent_present = 0;
15,060✔
2879
    m_is_sync_agent = false;
15,060✔
2880
}
15,060✔
2881

2882
void DB::do_begin_possibly_async_write()
2883
{
337,650✔
2884
    if (m_commit_helper) {
337,650✔
2885
        m_commit_helper->blocking_begin_write();
211,866✔
2886
    }
211,866✔
2887
    else {
125,784✔
2888
        do_begin_write();
125,784✔
2889
    }
125,784✔
2890
}
337,650✔
2891

2892
void DB::end_write_on_correct_thread() noexcept
2893
{
608,667✔
2894
    //    m_local_write_mutex.unlock();
311,418✔
2895
    if (!m_commit_helper || !m_commit_helper->blocking_end_write()) {
608,667✔
2896
        do_end_write();
396,588✔
2897
    }
396,588✔
2898
}
608,667✔
2899

2900
void DB::add_commit_listener(CommitListener* listener)
{
    // Register a listener to be notified after each commit.
    std::lock_guard lock(m_commit_listener_mutex);
    m_commit_listeners.push_back(listener);
}
2905

2906
void DB::remove_commit_listener(CommitListener* listener)
{
    // Deregister a commit listener (erase-remove idiom).
    std::lock_guard lock(m_commit_listener_mutex);
    m_commit_listeners.erase(std::remove(m_commit_listeners.begin(), m_commit_listeners.end(), listener),
                             m_commit_listeners.end());
}
2912

2913
DisableReplication::DisableReplication(Transaction& t)
    : m_tr(t)
    , m_owner(t.get_db())
    , m_repl(m_owner->get_replication())
    , m_version(t.get_version())
{
    // RAII guard: temporarily detach replication from the DB and history from
    // the transaction; both are restored by the destructor.
    m_owner->set_replication(nullptr);
    t.m_history = nullptr;
}
2922

2923
DisableReplication::~DisableReplication()
{
    // Restore replication; if the transaction moved to a different version
    // while disabled, its replication state must be re-initialized.
    m_owner->set_replication(m_repl);
    if (m_version != m_tr.get_version())
        m_tr.initialize_replication();
}
2929

2930
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc