• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_416

12 Jun 2024 07:15PM CUT coverage: 90.941% (-0.006%) from 90.947%
thomas.goyne_416

Pull #7796

Evergreen

tgoyne
Make async_open_realm() return StatusWith<std::shared_ptr<Realm>>

This simplifies a lot of test code and eliminates some cases where the Realm
was being opened on a background thread, which is unsupported on linux.
Pull Request #7796: RCORE-2160 Make upload completion reporting multiprocess-compatible

102098 of 180310 branches covered (56.62%)

205 of 210 new or added lines in 10 files covered. (97.62%)

73 existing lines in 15 files now uncovered.

214551 of 235923 relevant lines covered (90.94%)

5646939.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.36
/src/realm/object-store/sync/async_open_task.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2019 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/sync/async_open_task.hpp>
20

21
#include <realm/sync/subscriptions.hpp>
22
#include <realm/sync/noinst/sync_schema_migration.hpp>
23
#include <realm/object-store/impl/realm_coordinator.hpp>
24
#include <realm/object-store/sync/sync_session.hpp>
25
#include <realm/object-store/thread_safe_reference.hpp>
26

27
namespace realm {
28

29
AsyncOpenTask::AsyncOpenTask(Private, std::shared_ptr<_impl::RealmCoordinator> coordinator,
30
                             std::shared_ptr<realm::SyncSession> session, bool db_first_open)
31
    : m_coordinator(coordinator)
86✔
32
    , m_session(session)
86✔
33
    , m_db_first_open(db_first_open)
86✔
34
{
172✔
35
}
172✔
36

37
void AsyncOpenTask::start(AsyncOpenCallback callback)
38
{
172✔
39
    util::CheckedUniqueLock lock(m_mutex);
172✔
40
    if (!m_session)
172✔
41
        return;
×
42
    auto session = m_session;
172✔
43
    lock.unlock();
172✔
44

45
    std::shared_ptr<AsyncOpenTask> self(shared_from_this());
172✔
46
    session->wait_for_download_completion([callback = std::move(callback), self, this](Status status) mutable {
172✔
47
        std::shared_ptr<_impl::RealmCoordinator> coordinator;
172✔
48
        {
172✔
49
            util::CheckedLockGuard lock(m_mutex);
172✔
50
            if (!m_session)
172✔
51
                return; // Swallow all events if the task has been cancelled.
6✔
52

53
            // Hold on to the coordinator until after we've called the callback
54
            coordinator = std::move(m_coordinator);
166✔
55
        }
166✔
56

57
        if (!status.is_ok()) {
166✔
58
            self->async_open_complete(std::move(callback), coordinator, status);
40✔
59
            return;
40✔
60
        }
40✔
61

62
        self->migrate_schema_or_complete(std::move(callback), coordinator);
126✔
63
    });
126✔
64
    session->revive_if_needed();
172✔
65
}
172✔
66

67
void AsyncOpenTask::cancel()
68
{
6✔
69
    std::shared_ptr<SyncSession> session;
6✔
70
    {
6✔
71
        util::CheckedLockGuard lock(m_mutex);
6✔
72
        if (!m_session)
6✔
UNCOV
73
            return;
×
74

75
        for (auto token : m_registered_callbacks) {
6✔
76
            m_session->unregister_progress_notifier(token);
2✔
77
        }
2✔
78

79
        session = std::move(m_session);
6✔
80
        m_coordinator = nullptr;
6✔
81
    }
6✔
82

83
    // We need to release the mutex before we log the session out as that will invoke the
84
    // wait_for_download_completion callback which will also attempt to acquire the mutex
85
    // thus deadlocking.
86
    if (session) {
6✔
87
        // Does a better way exists for canceling the download?
88
        session->force_close();
6✔
89
    }
6✔
90
}
6✔
91

92
uint64_t AsyncOpenTask::register_download_progress_notifier(std::function<ProgressNotifierCallback>&& callback)
93
{
8✔
94
    util::CheckedLockGuard lock(m_mutex);
8✔
95
    if (m_session) {
8✔
96
        auto token = m_session->register_progress_notifier(std::move(callback),
8✔
97
                                                           SyncSession::ProgressDirection::download, true);
8✔
98
        m_registered_callbacks.emplace_back(token);
8✔
99
        return token;
8✔
100
    }
8✔
101
    return 0;
×
102
}
8✔
103

104
void AsyncOpenTask::unregister_download_progress_notifier(uint64_t token)
105
{
×
106
    util::CheckedLockGuard lock(m_mutex);
×
107
    if (m_session)
×
108
        m_session->unregister_progress_notifier(token);
×
109
}
×
110

111
void AsyncOpenTask::wait_for_bootstrap_or_complete(AsyncOpenCallback&& callback,
112
                                                   std::shared_ptr<_impl::RealmCoordinator> coordinator,
113
                                                   Status status)
114
{
126✔
115
    if (!status.is_ok()) {
126✔
116
        async_open_complete(std::move(callback), coordinator, status);
×
117
        return;
×
118
    }
×
119

120
    auto config = coordinator->get_config();
126✔
121
    // FlX sync is not used so there is nothing to bootstrap.
122
    if (!config.sync_config || !config.sync_config->flx_sync_requested) {
126✔
123
        async_open_complete(std::move(callback), coordinator, status);
46✔
124
        return;
46✔
125
    }
46✔
126

127
    SharedRealm shared_realm;
80✔
128
    try {
80✔
129
        shared_realm = coordinator->get_realm(nullptr, m_db_first_open);
80✔
130
    }
80✔
131
    catch (...) {
80✔
132
        async_open_complete(std::move(callback), coordinator, exception_to_status());
4✔
133
        return;
4✔
134
    }
4✔
135
    const auto subscription_set = shared_realm->get_latest_subscription_set();
76✔
136
    const auto sub_state = subscription_set.state();
76✔
137

138
    if (sub_state != sync::SubscriptionSet::State::Complete) {
76✔
139
        // We need to wait until subscription initializer completes
140
        std::shared_ptr<AsyncOpenTask> self(shared_from_this());
42✔
141
        subscription_set.get_state_change_notification(sync::SubscriptionSet::State::Complete)
42✔
142
            .get_async([self, coordinator, callback = std::move(callback)](
42✔
143
                           StatusWith<realm::sync::SubscriptionSet::State> state) mutable {
42✔
144
                self->async_open_complete(std::move(callback), coordinator, state.get_status());
42✔
145
            });
42✔
146
    }
42✔
147
    else {
34✔
148
        async_open_complete(std::move(callback), coordinator, Status::OK());
34✔
149
    }
34✔
150
}
76✔
151

152
void AsyncOpenTask::async_open_complete(AsyncOpenCallback&& callback,
153
                                        std::shared_ptr<_impl::RealmCoordinator> coordinator, Status status)
154
{
166✔
155
    {
166✔
156
        util::CheckedLockGuard lock(m_mutex);
166✔
157
        // 'Cancel' may have been called just before 'async_open_complete' is invoked.
158
        if (!m_session)
166✔
159
            return;
×
160

161
        for (auto token : m_registered_callbacks) {
166✔
162
            m_session->unregister_progress_notifier(token);
6✔
163
        }
6✔
164
        m_session = nullptr;
166✔
165
    }
166✔
166

167
    if (status.is_ok()) {
166✔
168
        ThreadSafeReference realm;
122✔
169
        try {
122✔
170
            realm = coordinator->get_unbound_realm();
122✔
171
        }
122✔
172
        catch (...) {
122✔
173
            return callback({}, std::current_exception());
4✔
174
        }
4✔
175
        return callback(std::move(realm), nullptr);
118✔
176
    }
122✔
177
    return callback({}, std::make_exception_ptr(Exception(status)));
44✔
178
}
166✔
179

180
void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
181
                                               std::shared_ptr<_impl::RealmCoordinator> coordinator)
182
{
126✔
183
    util::CheckedUniqueLock lock(m_mutex);
126✔
184
    if (!m_session)
126✔
185
        return;
×
186
    auto session = m_session;
126✔
187
    lock.unlock();
126✔
188

189
    auto pending_migration = [&] {
126✔
190
        auto rt = coordinator->begin_read();
126✔
191
        return _impl::sync_schema_migration::has_pending_migration(*rt);
126✔
192
    }();
126✔
193

194
    if (!pending_migration) {
126✔
195
        wait_for_bootstrap_or_complete(std::move(callback), coordinator, Status::OK());
98✔
196
        return;
98✔
197
    }
98✔
198

199
    // Migrate the schema.
200
    //  * First upload the changes at the old schema version
201
    //  * Then, pause the session, delete all tables, re-initialize the metadata, and finally restart the session.
202
    // The lifetime of the task is extended until the bootstrap completes.
203
    std::shared_ptr<AsyncOpenTask> self(shared_from_this());
28✔
204
    session->wait_for_upload_completion(
28✔
205
        [callback = std::move(callback), coordinator, session, self, this](Status status) mutable {
28✔
206
            {
28✔
207
                util::CheckedLockGuard lock(m_mutex);
28✔
208
                if (!m_session)
28✔
209
                    return; // Swallow all events if the task has been cancelled.
×
210
            }
28✔
211

212
            if (!status.is_ok()) {
28✔
213
                self->async_open_complete(std::move(callback), coordinator, status);
×
214
                return;
×
215
            }
×
216

217
            auto migration_completed_callback = [callback = std::move(callback), coordinator = std::move(coordinator),
28✔
218
                                                 self](Status status) mutable {
28✔
219
                self->wait_for_bootstrap_or_complete(std::move(callback), coordinator, status);
28✔
220
            };
28✔
221
            SyncSession::Internal::migrate_schema(*session, std::move(migration_completed_callback));
28✔
222
        });
28✔
223
}
28✔
224

225
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc