• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_281750

30 Oct 2023 03:37PM UTC coverage: 90.528% (-1.0%) from 91.571%
github_pull_request_281750

Pull #6073

Evergreen

jedelbo
Log free space and history sizes when opening file
Pull Request #6073: Merge next-major

95488 of 175952 branches covered (0.0%)

8973 of 12277 new or added lines in 149 files covered. (73.09%)

622 existing lines in 51 files now uncovered.

233503 of 257934 relevant lines covered (90.53%)

6533720.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.46
/src/realm/object-store/impl/results_notifier.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2015 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/impl/results_notifier.hpp>
20

21
#include <realm/object-store/shared_realm.hpp>
22
#include <realm/util/scope_exit.hpp>
23

24
#include <numeric>
25

26
using namespace realm;
27
using namespace realm::_impl;
28

29
// Some of the inter-thread synchronization for this class is handled externally
30
// by RealmCoordinator using the "notifier lock" which also guards registering
31
// and unregistering notifiers. This can make it somewhat difficult to tell what
32
// can safely be accessed where.
33
//
34
// The data flow is:
35
// - ResultsNotifier is created on target thread.
36
// - On background worker thread:
37
//   * do_attach_to() called with notifier lock held
38
//     - Writes to m_query
39
//   * do_add_required_change_info() called with notifier lock held
40
//     - Writes to m_info
41
//   * run() called with no locks held
42
//     - Reads m_query
43
//     - Reads m_info
44
//     - Reads m_need_to_run <-- FIXME: data race?
45
//     - Writes m_run_tv
46
//   * do_prepare_handover() called with notifier lock held
47
//     - Reads m_run_tv
48
//     - Writes m_handover_transaction
49
//     - Writes m_handover_tv
50
// - On target thread:
51
//   * prepare_to_deliver() called with notifier lock held
52
//     - Reads m_handover_transaction
53
//     - Reads m_handover_tv
54
//     - Writes m_delivered_transaction
55
//     - Writes m_delivered_tv
56
//   * get_tableview() called with no locks held
57
//     - Reads m_delivered_transaction
58
//     - Reads m_delivered_tv
59
//     - Reads m_results_were_used
60

61
// Builds a notifier for `target`: copies its query, descriptor ordering and
// table-order flag, then calls reattach().
// When m_logger is set, a human-readable description of the query is stored in
// m_description (it is reused by the timing log in run()) and logged once.
ResultsNotifier::ResultsNotifier(Results& target)
62
    : ResultsNotifierBase(target.get_realm())
63
    , m_query(std::make_unique<Query>(target.get_query()))
64
    , m_descriptor_ordering(target.get_descriptor_ordering())
65
    , m_target_is_in_table_order(target.is_in_table_order())
66
{
3,798✔
67
    if (m_logger) {
3,798✔
NEW
68
        m_description = std::string("'") + std::string(m_query->get_table()->get_class_name()) + std::string("'");
×
NEW
69
        if (m_query->has_conditions()) {
×
NEW
70
            m_description += " where \"";
×
NEW
71
            m_description += m_query->get_description_safe() + "\"";
×
NEW
72
        }
×
NEW
73
        // The format string needs a %1 placeholder, otherwise m_description is dropped from the message.
        m_logger->log(util::LogCategory::notification, util::Logger::Level::debug, "Creating ResultsNotifier for %1",
×
NEW
74
                      m_description);
×
NEW
75
    }
×
76
    reattach();
3,798✔
77
}
3,798✔
78

79
// Clears the query, every TableView (run / handover / delivered) and both
// transactions held by this notifier, then forwards to
// CollectionNotifier::release_data().
void ResultsNotifier::release_data() noexcept
80
{
3,798✔
81
    m_query = {};
3,798✔
82
    m_run_tv = {};
3,798✔
83
    m_handover_tv = {};
3,798✔
84
    m_handover_transaction = {};
3,798✔
85
    m_delivered_tv = {};
3,798✔
86
    m_delivered_transaction = {};
3,798✔
87
    CollectionNotifier::release_data();
3,798✔
88
}
3,798✔
89

90
// Hands the most recently delivered TableView to the caller.
//
// Returns false and leaves `out` untouched when there is no delivered
// TableView, when the source transaction is not in the Reading stage, or when
// the source transaction's version differs from the version of
// m_delivered_transaction.
// On success the delivered TableView is imported into the source transaction,
// moved into `out`, and m_delivered_tv is reset, so a second call returns
// false until prepare_to_deliver() stores a new one.
bool ResultsNotifier::get_tableview(TableView& out)
91
{
12,332✔
92
    if (!m_delivered_tv)
12,332✔
93
        return false;
7,400✔
94
    auto& transaction = source_shared_group();
4,932✔
95
    if (transaction.get_transact_stage() != DB::transact_Reading)
4,932✔
96
        return false;
54✔
97
    // The delivered view is only usable if it was produced for exactly the version the caller is reading.
    if (m_delivered_transaction->get_version_of_current_transaction() !=
4,878✔
98
        transaction.get_version_of_current_transaction())
4,878✔
99
        return false;
8✔
100

2,435✔
101
    out = std::move(*transaction.import_copy_of(*m_delivered_tv, PayloadPolicy::Move));
4,870✔
102
    m_delivered_tv.reset();
4,870✔
103
    return true;
4,870✔
104
}
4,870✔
105

106
// Remembers `info` in m_info for later use by run()/calculate_changes() and,
// if the callbacks were modified, recomputes the related tables under
// m_callback_mutex.
//
// Returns true only when the query's table still exists, the notifier has
// already run, and at least one callback is registered.
bool ResultsNotifier::do_add_required_change_info(TransactionChangeInfo& info)
107
{
4,092✔
108
    m_info = &info;
4,092✔
109

2,036✔
110
    // When adding or removing a callback the related tables can change due to the way we calculate related tables
2,036✔
111
    // when key path filters are set hence we need to recalculate every time the callbacks are changed.
2,036✔
112
    util::CheckedLockGuard lock(m_callback_mutex);
4,092✔
113
    if (m_did_modify_callbacks) {
4,092✔
114
        // NOTE(review): the table is dereferenced here before the null check in the return statement below, and
        // run() treats a null table as "deleted" -- confirm this branch cannot be reached with a deleted table.
        update_related_tables(*m_query->get_table());
3,640✔
115
    }
3,640✔
116

2,036✔
117
    return m_query->get_table() && has_run() && have_callbacks();
4,092✔
118
}
4,092✔
119

120
// Updates m_previous_objs from the freshly computed m_run_tv and, when there
// is something to compare against and someone listening, fills m_change.
//
// - has_run() && have_callbacks(): collect the keys of m_run_tv, blank out
//   (replace with a default ObjKey) any previous key that the transaction info
//   reports as deleted, compute m_change via CollectionChangeBuilder::calculate,
//   then store the new keys as m_previous_objs.
// - otherwise: only snapshot the keys of m_run_tv into m_previous_objs;
//   m_change is not touched.
void ResultsNotifier::calculate_changes()
121
{
6,214✔
122
    if (has_run() && have_callbacks()) {
6,214✔
123
        ObjKeys next_objs;
2,458✔
124
        next_objs.reserve(m_run_tv.size());
2,458✔
125
        for (size_t i = 0; i < m_run_tv.size(); ++i)
8,568✔
126
            next_objs.push_back(m_run_tv.get_key(i));
6,110✔
127

1,219✔
128
        auto table_key = m_query->get_table()->get_key();
2,458✔
129
        // Only touch m_previous_objs if the change info has an entry for the queried table.
        if (auto it = m_info->tables.find(table_key); it != m_info->tables.end()) {
2,458✔
130
            auto& changes = it->second;
2,418✔
131
            for (auto& key_val : m_previous_objs) {
5,340✔
132
                if (changes.deletions_contains(key_val)) {
5,340✔
133
                    key_val = ObjKey();
126✔
134
                }
126✔
135
            }
5,340✔
136
        }
2,418✔
137

1,219✔
138
        m_change = CollectionChangeBuilder::calculate(m_previous_objs, next_objs,
2,458✔
139
                                                      get_modification_checker(*m_info, m_query->get_table()),
2,458✔
140
                                                      m_target_is_in_table_order);
2,458✔
141

1,219✔
142
        m_previous_objs = std::move(next_objs);
2,458✔
143
    }
2,458✔
144
    else {
3,756✔
145
        size_t sz = m_run_tv.size();
3,756✔
146
        m_previous_objs.resize(sz);
3,756✔
147
        for (size_t i = 0; i < sz; ++i)
10,552✔
148
            m_previous_objs[i] = m_run_tv.get_key(i);
6,796✔
149
    }
3,756✔
150
}
6,214✔
151

152
// Re-evaluates the query and computes the resulting change set.
//
// A scope-exit handler records m_run_time_point on every exit path and, when
// m_logger is set, logs the elapsed time in microseconds.
//
// Exit paths, in order:
//  1. The query's table is gone: report every previously seen object as
//     deleted and clear m_previous_objs.
//  2. The Realm is gone, or there are no callbacks and the previous results
//     were not used: do nothing.
//  3. The notifier has run before and the dependency versions are unchanged:
//     skip the query; if a related table was modified, only mark modified
//     entries of m_previous_objs in m_change.
//  4. Otherwise: rebuild m_run_tv from the query, apply the descriptor
//     ordering, remember the versions and call calculate_changes().
void ResultsNotifier::run()
153
{
7,798✔
154
    using namespace std::chrono;
7,798✔
155

3,889✔
156
    REALM_ASSERT(m_info || !has_run());
7,798✔
157

3,889✔
158
    auto t1 = steady_clock::now();
7,798✔
159
    util::ScopeExit cleanup([&]() noexcept {
7,798✔
160
        m_run_time_point = steady_clock::now();
7,798✔
161
        if (m_logger) {
7,798✔
NEW
162
            m_logger->log(util::LogCategory::notification, util::Logger::Level::debug,
×
NEW
163
                          "ResultsNotifier %1 did run in %2 us", m_description,
×
NEW
164
                          duration_cast<microseconds>(m_run_time_point - t1).count());
×
NEW
165
        }
×
166
    });
7,798✔
167

3,889✔
168
    // Table's been deleted, so report all objects as deleted
3,889✔
169
    if (!m_query->get_table()) {
7,798✔
170
        m_change = {};
×
171
        m_change.deletions.set(m_previous_objs.size());
×
172
        m_previous_objs.clear();
×
173
        return;
×
174
    }
×
175

3,889✔
176
    {
7,798✔
177
        // The target lock is held only for this check and released at the end of this scope.
        auto lock = lock_target();
7,798✔
178
        // Don't run the query if the results aren't actually going to be used
3,889✔
179
        if (!get_realm() || (!have_callbacks() && !m_results_were_used))
7,798✔
180
            return;
20✔
181
    }
7,778✔
182

3,879✔
183
    auto new_versions = m_query->sync_view_if_needed();
7,778✔
184
    m_descriptor_ordering.collect_dependencies(m_query->get_table().unchecked_ptr());
7,778✔
185
    m_descriptor_ordering.get_versions(m_query->get_table()->get_parent_group(), new_versions);
7,778✔
186
    if (has_run() && new_versions == m_last_seen_version) {
7,778✔
187
        // We've run previously and none of the tables involved in the query
782✔
188
        // changed so we don't need to rerun the query, but we still need to
782✔
189
        // check each object in the results to see if it was modified
782✔
190
        if (!any_related_table_was_modified(*m_info))
1,564✔
191
            return;
1,304✔
192
        REALM_ASSERT(m_change.empty());
260✔
193
        auto checker = get_modification_checker(*m_info, m_query->get_table());
260✔
194
        for (size_t i = 0; i < m_previous_objs.size(); ++i) {
1,200✔
195
            if (checker(m_previous_objs[i])) {
940✔
196
                m_change.modifications.add(i);
194✔
197
            }
194✔
198
        }
940✔
199
        return;
260✔
200
    }
260✔
201

3,097✔
202
    m_run_tv = TableView(*m_query, size_t(-1));
6,214✔
203
    // Syncing will be done here
3,097✔
204
    m_run_tv.apply_descriptor_ordering(m_descriptor_ordering);
6,214✔
205
    m_last_seen_version = std::move(new_versions);
6,214✔
206

3,097✔
207
    calculate_changes();
6,214✔
208
}
6,214✔
209

210
// Packages the result of the last run() for delivery to the target thread.
//
// Any previous handover TableView is discarded and an existing handover
// transaction is advanced to the version of `sg`. If run() left an attached
// m_run_tv, it is cloned into the handover transaction (created by duplicating
// `sg` on first use) and m_run_tv is cleared.
void ResultsNotifier::do_prepare_handover(Transaction& sg)
211
{
7,798✔
212
    m_handover_tv.reset();
7,798✔
213
    if (m_handover_transaction)
7,798✔
214
        m_handover_transaction->advance_read(sg.get_version_of_current_transaction());
4,068✔
215

3,889✔
216
    // m_run_tv is only attached when run() actually re-evaluated the query.
    if (m_run_tv.is_attached()) {
7,798✔
217
        REALM_ASSERT(m_run_tv.is_in_sync());
6,214✔
218
        if (!m_handover_transaction)
6,214✔
219
            m_handover_transaction = sg.duplicate();
3,730✔
220
        m_handover_tv = m_run_tv.clone_for_handover(m_handover_transaction.get(), PayloadPolicy::Move);
6,214✔
221
        m_run_tv = {};
6,214✔
222
    }
6,214✔
223
}
7,798✔
224

225
// Moves the handed-over TableView into the "delivered" slot read by
// get_tableview(). Runs with the target lock held for its whole body.
//
// Returns false only when the Realm is gone (both handover and delivered
// TableViews are dropped in that case).
// When there is no handover TableView, returns true after discarding a
// delivered TableView whose transaction is stale, i.e. the Realm is not in a
// read transaction or has moved past the delivered version.
// Otherwise records in m_results_were_used whether the previous delivered
// TableView had been consumed, brings m_delivered_transaction to the handover
// version (advancing it, or creating it by duplication), and imports the
// handover TableView into it.
bool ResultsNotifier::prepare_to_deliver()
226
{
8,724✔
227
    auto lock = lock_target();
8,724✔
228
    auto realm = get_realm();
8,724✔
229
    if (!realm) {
8,724✔
230
        m_handover_tv.reset();
×
231
        m_delivered_tv.reset();
×
232
        return false;
×
233
    }
×
234
    if (!m_handover_tv) {
8,724✔
235
        bool transaction_is_stale =
2,545✔
236
            m_delivered_transaction &&
2,545✔
237
            (!realm->is_in_read_transaction() ||
2,483✔
238
             realm->read_transaction_version() > m_delivered_transaction->get_version_of_current_transaction());
2,419✔
239
        if (transaction_is_stale) {
2,545✔
240
            m_delivered_tv.reset();
246✔
241
            m_delivered_transaction.reset();
246✔
242
        }
246✔
243
        return true;
2,545✔
244
    }
2,545✔
245

3,080✔
246
    // get_tableview() resets m_delivered_tv when it hands the view out, so an empty slot means it was consumed.
    m_results_were_used = !m_delivered_tv;
6,179✔
247
    m_delivered_tv.reset();
6,179✔
248
    if (m_delivered_transaction)
6,179✔
249
        m_delivered_transaction->advance_read(m_handover_transaction->get_version_of_current_transaction());
2,445✔
250
    else
3,734✔
251
        m_delivered_transaction = m_handover_transaction->duplicate();
3,734✔
252
    m_delivered_tv = m_delivered_transaction->import_copy_of(*m_handover_tv, PayloadPolicy::Move);
6,179✔
253
    m_handover_tv.reset();
6,179✔
254

3,080✔
255
    return true;
6,179✔
256
}
6,179✔
257

258
// Re-imports m_query into the notifier's current transaction(), replacing the
// stored query. Does nothing when the query's table no longer exists.
void ResultsNotifier::reattach()
259
{
11,326✔
260
    if (m_query->get_table())
11,326✔
261
        m_query = transaction().import_copy_of(*m_query, PayloadPolicy::Move);
11,326✔
262
}
11,326✔
263

264
// Builds a notifier for a collection-backed Results whose element type is not
// Object (asserted).
//
// The descriptor ordering is scanned once: every Sort descriptor sets
// m_sort_order from is_ascending(0) (a later Sort overwrites an earlier one),
// and any Distinct descriptor sets m_distinct.
// When m_logger is set, a description made of the collection type, the owning
// object's id, the short path (with its first element replaced by the column
// name) and the sort direction is stored in m_description and logged.
ListResultsNotifier::ListResultsNotifier(Results& target)
265
    : ResultsNotifierBase(target.get_realm())
266
    , m_list(target.get_collection())
267
{
1,810✔
268
    REALM_ASSERT(target.get_type() != PropertyType::Object);
1,810✔
269
    auto& ordering = target.get_descriptor_ordering();
1,810✔
270
    for (size_t i = 0, sz = ordering.size(); i < sz; i++) {
2,780✔
271
        auto descr = ordering[i];
970✔
272
        if (descr->get_type() == DescriptorType::Sort)
970✔
273
            m_sort_order = static_cast<const SortDescriptor*>(descr)->is_ascending(0);
928✔
274
        if (descr->get_type() == DescriptorType::Distinct)
970✔
275
            m_distinct = true;
42✔
276
    }
970✔
277
    if (m_logger) {
1,810✔
NEW
278
        auto path = m_list->get_short_path();
×
NEW
279
        // Replace the leading column key with the column's name so the logged path is readable.
        auto prop_name = m_list->get_table()->get_column_name(path[0].get_col_key());
×
NEW
280
        path[0] = PathElement(prop_name);
×
NEW
281
        std::string sort_order;
×
NEW
282
        if (m_sort_order) {
×
NEW
283
            sort_order = *m_sort_order ? " sorted ascending" : " sorted descending";
×
NEW
284
        }
×
285

NEW
286
        m_description =
×
NEW
287
            util::format("%1 %2%3%4", m_list->get_collection_type(), m_list->get_obj().get_id(), path, sort_order);
×
NEW
288
        m_logger->log(util::LogCategory::notification, util::Logger::Level::debug,
×
NEW
289
                      "Creating ListResultsNotifier for %1", m_description);
×
NEW
290
    }
×
291
}
1,810✔
292

293
// Clears m_list, then forwards to CollectionNotifier::release_data().
void ListResultsNotifier::release_data() noexcept
294
{
1,810✔
295
    m_list = {};
1,810✔
296
    CollectionNotifier::release_data();
1,810✔
297
}
1,810✔
298

299
// Hands the most recently delivered index list to the caller.
//
// Returns false and leaves `out` untouched when nothing has been delivered or
// when the delivered version differs from the source transaction's current
// version. On success the indices are moved into `out` and
// m_delivered_indices is cleared, so a second call returns false until
// prepare_to_deliver() stores a new list.
bool ListResultsNotifier::get_list_indices(ListIndices& out)
300
{
1,496✔
301
    if (!m_delivered_indices)
1,496✔
302
        return false;
1,106✔
303
    auto& transaction = source_shared_group();
390✔
304
    if (m_delivered_transaction_version != transaction.get_version_of_current_transaction())
390✔
305
        return false;
×
306

195✔
307
    out = std::move(m_delivered_indices);
390✔
308
    // Explicitly empty the slot after the move so the "was it consumed" checks see it as unset.
    m_delivered_indices = util::none;
390✔
309
    return true;
390✔
310
}
390✔
311

312
// Registers this notifier's collection in `info.collections` so changes are
// recorded into m_change, and remembers `info` in m_info.
//
// Returns false without registering anything when the collection is no longer
// attached; returns true otherwise.
bool ListResultsNotifier::do_add_required_change_info(TransactionChangeInfo& info)
313
{
2,780✔
314
    if (!m_list->is_attached())
2,780✔
315
        return false; // origin row was deleted after the notification was added
168✔
316

1,306✔
317
    info.collections.push_back(
2,612✔
318
        {m_list->get_table()->get_key(), m_list->get_owner_key(), m_list->get_stable_path(), &m_change});
2,612✔
319

1,306✔
320
    m_info = &info;
2,612✔
321
    return true;
2,612✔
322
}
2,612✔
323

324
// Decides whether run() has to recompute the indices.
//
// Returns false when the notifier is no longer alive, or when there are no
// callbacks and the previously delivered results were not used. Otherwise
// returns true on the first run or when the collection reports a change.
bool ListResultsNotifier::need_to_run()
325
{
3,750✔
326
    REALM_ASSERT(m_info || !has_run());
3,750✔
327

1,875✔
328
    // Don't run the query if the results aren't actually going to be used
1,875✔
329
    if (!is_alive() || (!have_callbacks() && !m_results_were_used))
3,750!
330
        return false;
×
331

1,875✔
332
    return !has_run() || m_list->has_changed();
3,750✔
333
}
3,750✔
334

335
// Turns the collection-level change set in m_change into one expressed in
// terms of the sorted/distinct result order, then snapshots m_run_indices
// into m_previous_indices.
//
// Diffing is only done when the notifier has run before, has callbacks, and a
// sort or distinct is in effect; otherwise m_change is left as it is.
void ListResultsNotifier::calculate_changes()
336
{
3,666✔
337
    // Unsorted lists can just forward the changeset directly from the
1,833✔
338
    // transaction log parsing, but sorted lists need to perform diffing
1,833✔
339
    if (has_run() && have_callbacks() && (m_sort_order || m_distinct)) {
3,666✔
340
        // Update each of the row indices in m_previous_indices to the equivalent
550✔
341
        // new index in the new list
550✔
342
        if (!m_change.insertions.empty() || !m_change.deletions.empty()) {
1,100✔
343
            for (auto& row : m_previous_indices) {
3,172✔
344
                if (m_change.deletions.contains(row))
3,172✔
345
                    row = npos;
1,762✔
346
                else
1,410✔
347
                    row = m_change.insertions.shift(m_change.deletions.unshift(row));
1,410✔
348
            }
3,172✔
349
        }
764✔
350

550✔
351
        // The lambda reports an index as modified if the incoming m_change lists it in `modifications`.
        m_change = CollectionChangeBuilder::calculate(m_previous_indices, *m_run_indices, [&](size_t index) {
2,622✔
352
            return m_change.modifications.contains(index);
2,622✔
353
        });
2,622✔
354
    }
1,100✔
355

1,833✔
356
    m_previous_indices = *m_run_indices;
3,666✔
357
}
3,666✔
358

359
// Recomputes the result indices for the collection and the resulting changes.
//
// A scope-exit handler records m_run_time_point and, when m_logger is set,
// logs the elapsed microseconds; it is cancelled when need_to_run() says no
// work is required.
//
// Paths:
//  1. The collection is null or detached: report every previous index as
//     deleted, clear m_previous_indices and call
//     report_collection_root_is_deleted().
//  2. need_to_run() is false: return without touching any state.
//  3. Otherwise fill m_run_indices using distinct(), sort(), or the identity
//     sequence 0..size-1, then call calculate_changes().
void ListResultsNotifier::run()
360
{
4,422✔
361
    using namespace std::chrono;
4,422✔
362
    auto t1 = steady_clock::now();
4,422✔
363
    util::ScopeExit cleanup([&]() noexcept {
4,380✔
364
        m_run_time_point = steady_clock::now();
4,338✔
365
        if (m_logger) {
4,338✔
NEW
366
            m_logger->log(util::LogCategory::notification, util::Logger::Level::debug,
×
NEW
367
                          "ListResultsNotifier %1 did run in %2 us", m_description,
×
NEW
368
                          duration_cast<microseconds>(m_run_time_point - t1).count());
×
NEW
369
        }
×
370
    });
4,338✔
371

2,211✔
372
    if (!m_list || !m_list->is_attached()) {
4,422✔
373
        // List was deleted, so report all of the rows being removed
336✔
374
        m_change = {};
672✔
375
        m_change.deletions.set(m_previous_indices.size());
672✔
376
        m_previous_indices.clear();
672✔
377
        report_collection_root_is_deleted();
672✔
378
        return;
672✔
379
    }
672✔
380

1,875✔
381
    if (!need_to_run()) {
3,750✔
382
        // Nothing was computed, so skip the run-time bookkeeping and log line.
        cleanup.cancel();
84✔
383
        return;
84✔
384
    }
84✔
385

1,833✔
386
    m_run_indices = std::vector<size_t>();
3,666✔
387
    if (m_distinct)
3,666✔
388
        m_list->distinct(*m_run_indices, m_sort_order);
84✔
389
    else if (m_sort_order)
3,582✔
390
        m_list->sort(*m_run_indices, *m_sort_order);
1,902✔
391
    else {
1,680✔
392
        m_run_indices->resize(m_list->size());
1,680✔
393
        std::iota(m_run_indices->begin(), m_run_indices->end(), 0);
1,680✔
394
    }
1,680✔
395

1,833✔
396
    calculate_changes();
3,666✔
397
}
3,666✔
398

399
// Moves the indices produced by run() (if any) into the handover slot and
// records the version of `sg` as the handover version. When run() produced
// nothing, the handover slot is cleared instead.
void ListResultsNotifier::do_prepare_handover(Transaction& sg)
400
{
4,422✔
401
    if (m_run_indices) {
4,422✔
402
        m_handover_indices = std::move(m_run_indices);
3,666✔
403
        m_run_indices = {};
3,666✔
404
    }
3,666✔
405
    else {
756✔
406
        m_handover_indices = {};
756✔
407
    }
756✔
408
    m_handover_transaction_version = sg.get_version_of_current_transaction();
4,422✔
409
}
4,422✔
410

411
// Moves the handed-over indices into the "delivered" slot read by
// get_list_indices().
//
// Returns false when the notifier is no longer alive. Returns true without
// changing anything when there are no handover indices. Otherwise records in
// m_results_were_used whether the previously delivered indices had been
// consumed, then transfers the indices and their version and clears the
// handover slot.
bool ListResultsNotifier::prepare_to_deliver()
412
{
6,946✔
413
    if (!is_alive()) {
6,946✔
414
        return false;
×
415
    }
×
416
    if (!m_handover_indices)
6,946✔
417
        return true;
3,280✔
418

1,833✔
419
    // get_list_indices() clears m_delivered_indices when it hands them out, so an empty slot means "consumed".
    m_results_were_used = !m_delivered_indices;
3,666✔
420
    m_delivered_indices = std::move(m_handover_indices);
3,666✔
421
    m_delivered_transaction_version = m_handover_transaction_version;
3,666✔
422
    m_handover_indices = {};
3,666✔
423

1,833✔
424
    return true;
3,666✔
425
}
3,666✔
426

427
// Re-imports m_list into the notifier's current transaction(), replacing the
// stored collection. Does nothing when the collection is no longer attached.
// NOTE(review): m_list is dereferenced without a null check here, while run()
// guards against a null m_list -- confirm reattach() cannot be called after
// release_data().
void ListResultsNotifier::reattach()
428
{
3,620✔
429
    if (m_list->is_attached())
3,620✔
430
        m_list = transaction().import_copy_of(*m_list);
3,620✔
431
}
3,620✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc