• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / james.stone_381

25 Sep 2023 06:35PM CUT coverage: 90.919% (+0.03%) from 90.892%
james.stone_381

Pull #6670

Evergreen

ironage
optimize: only compare strings once
Pull Request #6670: Sorting stage 3

97114 of 177952 branches covered (0.0%)

879 of 887 new or added lines in 12 files covered. (99.1%)

105 existing lines in 17 files now uncovered.

236103 of 259684 relevant lines covered (90.92%)

7062315.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.41
/src/realm/object-store/impl/epoll/external_commit_helper.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2016 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/impl/external_commit_helper.hpp>
20
#include <realm/object-store/impl/realm_coordinator.hpp>
21
#include <realm/util/fifo_helper.hpp>
22

23
#include <realm/util/assert.hpp>
24
#include <realm/util/checked_mutex.hpp>
25
#include <realm/db.hpp>
26

27
#include <algorithm>
28
#include <errno.h>
29
#include <fcntl.h>
30
#include <sstream>
31
#include <stdlib.h>
32
#include <sys/epoll.h>
33
#include <sys/time.h>
34
#include <unistd.h>
35

36
#ifdef __ANDROID__
37
#include <android/log.h>
38
#define ANDROID_LOG __android_log_print
39
#else
40
#define ANDROID_LOG(...)
41
#endif
42

43
using namespace realm;
44
using namespace realm::_impl;
45

46
#define LOGE(...)                                                                                                    \
47
    do {                                                                                                             \
×
48
        fprintf(stderr, __VA_ARGS__);                                                                                \
×
49
        ANDROID_LOG(ANDROID_LOG_ERROR, "REALM", __VA_ARGS__);                                                        \
×
50
    } while (0)
×
51

52
namespace {
53

54
// Make writing to the pipe return -1 when there is no data to read, or no space in the buffer to write to, rather
55
// than blocking.
56
void make_non_blocking(int fd)
57
{
2,806✔
58
    int ret = fcntl(fd, F_SETFL, O_NONBLOCK);
2,806✔
59
    if (ret == -1) {
2,806✔
60
        throw std::system_error(errno, std::system_category());
×
61
    }
×
62
}
2,806✔
63

64
// Write a byte to a pipe to notify anyone waiting for data on the pipe.
65
// But first consume all bytes in the pipe, since linux may only notify on transition from not ready to ready.
66
// If a process dies after reading but before writing, it can consume a pending notification, and possibly prevent
67
// other processes from observing it. This is a transient issue and the next notification will work correctly.
68
void notify_fd(int fd, bool read_first = true)
69
{
10,385✔
70
    while (true) {
10,385✔
71
        if (read_first) {
10,385✔
72
            while (true) {
18,168✔
73
                uint8_t buff[1024];
18,168✔
74
                ssize_t actual = read(fd, buff, sizeof(buff));
18,168✔
75
                if (actual == 0) {
18,168✔
76
                    break; // Not sure why we would see EOF here, but defer error handling to the writer.
×
77
                }
×
78
                if (actual < 0) {
18,168✔
79
                    int err = errno;
10,383✔
80
                    if (err == EWOULDBLOCK || err == EAGAIN)
10,383✔
81
                        break;
10,383✔
82
                    throw std::system_error(err, std::system_category());
×
83
                }
×
84
            }
18,168✔
85
        }
10,383✔
86

10,385✔
87
        char c = 0;
10,385✔
88
        ssize_t ret = write(fd, &c, 1);
10,385✔
89
        if (ret == 1) {
10,385✔
90
            break;
10,385✔
91
        }
10,385✔
92

×
93
        REALM_ASSERT_RELEASE(ret < 0);
×
94
        int err = errno;
×
95
        if (err == EWOULDBLOCK || err == EAGAIN) {
×
96
            REALM_ASSERT_RELEASE(read_first); // otherwise this is just an infinite loop.
×
97
            continue;
×
98
        }
×
99
        throw std::system_error(err, std::system_category());
×
100
    }
×
101
}
10,385✔
102

103
class DaemonThread {
104
public:
105
    DaemonThread();
106
    ~DaemonThread();
107

108
    void add(int fd, RealmCoordinator*) REQUIRES(!m_mutex);
109
    void remove(int fd, RealmCoordinator*) REQUIRES(!m_mutex, !m_running_on_change_mutex);
110

111
    static DaemonThread& shared();
112

113
private:
114
    void listen() REQUIRES(!m_mutex, !m_running_on_change_mutex);
115

116
    // The listener thread
117
    std::thread m_thread;
118
    // File descriptor for epoll
119
    FdHolder m_epoll_fd;
120
    // The two ends of an anonymous pipe used to notify the thread that it should be shut down.
121
    FdHolder m_shutdown_read_fd;
122
    FdHolder m_shutdown_write_fd;
123

124
    util::CheckedMutex m_mutex;
125
    // Safely removing things from epoll is somewhat difficult. epoll_ctl
126
    // itself is thread-safe, but EPOLL_CTL_DEL does not remove the fd from the
127
    // ready list, and of course we may be processing an event on the fd at the
128
    // same time as it's removed. To deal with this, we keep track of the
129
    // currently RealmCoordinators and when we get an event, check that the
130
    // pointer is still in this vector while holding the mutex.
131
    std::vector<RealmCoordinator*> m_live_coordinators GUARDED_BY(m_mutex);
132

133
    // We want destroying an ExternalCommitHelper to block if it's currently
134
    // running on a background thread to ensure that `Realm::close()`
135
    // synchronously closes the file even if notifiers are currently running.
136
    // To avoid lock-order inversions, this needs to be done with a separate
137
    // mutex from the one which guards `m_live_notifiers`.
138
    util::CheckedMutex m_running_on_change_mutex;
139
};
140

141
DaemonThread::DaemonThread()
142
{
2✔
143
    m_epoll_fd = epoll_create(1);
2✔
144
    if (m_epoll_fd == -1) {
2✔
145
        throw std::system_error(errno, std::system_category());
×
146
    }
×
147

2✔
148
    // Create the anonymous pipe
2✔
149
    int pipe_fd[2];
2✔
150
    int ret = pipe(pipe_fd);
2✔
151
    if (ret == -1) {
2✔
152
        throw std::system_error(errno, std::system_category());
×
153
    }
×
154

2✔
155
    m_shutdown_read_fd = pipe_fd[0];
2✔
156
    m_shutdown_write_fd = pipe_fd[1];
2✔
157

2✔
158
    make_non_blocking(m_shutdown_read_fd);
2✔
159
    make_non_blocking(m_shutdown_write_fd);
2✔
160

2✔
161
    epoll_event event{};
2✔
162
    event.events = EPOLLIN;
2✔
163
    event.data.ptr = this;
2✔
164
    ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_shutdown_read_fd, &event);
2✔
165
    if (ret != 0) {
2✔
166
        int err = errno;
×
167
        throw std::system_error(err, std::system_category());
×
168
    }
×
169

2✔
170
    m_thread = std::thread([this] {
2✔
171
        try {
2✔
172
            listen();
2✔
173
        }
2✔
174
        catch (std::exception const& e) {
2✔
175
            LOGE("uncaught exception in notifier thread: %s: %s\n", typeid(e).name(), e.what());
×
176
            throw;
×
177
        }
×
178
        catch (...) {
×
179
            LOGE("uncaught exception in notifier thread\n");
×
180
            throw;
×
181
        }
×
182
    });
2✔
183
}
2✔
184

185
DaemonThread::~DaemonThread()
186
{
2✔
187
    // Not reading first since we know we have never written, and it is illegal
2✔
188
    // to read from the write-side of the pipe. Unlike a fifo, where in and out
2✔
189
    // sides share an fd, with an anonymous pipe, they each have a dedicated fd.
2✔
190
    notify_fd(m_shutdown_write_fd, /*read_first=*/false);
2✔
191
    m_thread.join(); // Wait for the thread to exit
2✔
192
}
2✔
193

194
DaemonThread& DaemonThread::shared()
195
{
5,604✔
196
    static DaemonThread daemon_thread;
5,604✔
197
    return daemon_thread;
5,604✔
198
}
5,604✔
199

200
void DaemonThread::add(int fd, RealmCoordinator* coordinator)
201
{
2,802✔
202
    {
2,802✔
203
        util::CheckedLockGuard lock(m_mutex);
2,802✔
204
        m_live_coordinators.push_back(coordinator);
2,802✔
205
    }
2,802✔
206

2,802✔
207
    epoll_event event{};
2,802✔
208
    event.events = EPOLLIN | EPOLLET;
2,802✔
209
    event.data.ptr = coordinator;
2,802✔
210
    int ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &event);
2,802✔
211
    if (ret != 0) {
2,802✔
212
        int err = errno;
×
213
        throw std::system_error(err, std::system_category());
×
214
    }
×
215
}
2,802✔
216

217
void DaemonThread::remove(int fd, RealmCoordinator* coordinator)
218
{
2,802✔
219
    {
2,802✔
220
        util::CheckedLockGuard lock_1(m_running_on_change_mutex);
2,802✔
221
        util::CheckedLockGuard lock_2(m_mutex);
2,802✔
222
        // std::erase(m_live_coordinators, coordinator); in c++20
2,802✔
223
        auto it = std::find(m_live_coordinators.begin(), m_live_coordinators.end(), coordinator);
2,802✔
224
        if (it != m_live_coordinators.end()) {
2,802✔
225
            m_live_coordinators.erase(it);
2,802✔
226
        }
2,802✔
227
    }
2,802✔
228
    epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
2,802✔
229
}
2,802✔
230

231
void DaemonThread::listen()
232
{
2✔
233
    pthread_setname_np(pthread_self(), "Realm notification listener");
2✔
234

2✔
235
    int ret;
2✔
236

2✔
237
    while (true) {
10,369✔
238
        epoll_event ev{};
10,369✔
239
        ret = epoll_wait(m_epoll_fd, &ev, 1, -1);
10,369✔
240

10,369✔
241
        if (ret == -1 && errno == EINTR) {
10,369✔
242
            // Interrupted system call, try again.
×
243
            continue;
×
244
        }
×
245

10,369✔
246
        if (ret == -1) {
10,369✔
247
            int err = errno;
×
248
            throw std::system_error(err, std::system_category());
×
249
        }
×
250
        if (ret == 0) {
10,369✔
251
            // Spurious wakeup; just wait again
×
252
            continue;
×
253
        }
×
254

10,369✔
255
        if (ev.data.ptr == this) {
10,369✔
256
            // Shutdown fd was notified, so exit
2✔
257
            return;
2✔
258
        }
2✔
259

10,367✔
260
        // One of the ExternalCommitHelper fds was notified. We need to check
10,367✔
261
        // if the target is still alive while holding m_mutex, but we can't
10,367✔
262
        // hold it while calling on_change() as that would lead to a lock-order
10,367✔
263
        // inversions with one of RealmCoordinator's mutexes.
10,367✔
264
        // m_running_on_change_mutex guarantees that the coordinator is not
10,367✔
265
        // torn down while we're inside on_change(), while allowing new
10,367✔
266
        // coordinators to be added.
10,367✔
267
        util::CheckedLockGuard lock(m_running_on_change_mutex);
10,367✔
268
        RealmCoordinator* coordinator = static_cast<RealmCoordinator*>(ev.data.ptr);
10,367✔
269
        {
10,367✔
270
            util::CheckedLockGuard lock(m_mutex);
10,367✔
271
            auto it = std::find(m_live_coordinators.begin(), m_live_coordinators.end(), coordinator);
10,367✔
272
            if (it == m_live_coordinators.end()) {
10,367✔
UNCOV
273
                continue;
×
UNCOV
274
            }
×
275
        }
10,367✔
276

10,367✔
277
        coordinator->on_change();
10,367✔
278
    }
10,367✔
279
}
2✔
280

281
} // anonymous namespace
282

283
void FdHolder::close()
284
{
5,617✔
285
    if (m_fd != -1) {
5,617✔
286
        ::close(m_fd);
2,808✔
287
    }
2,808✔
288
    m_fd = -1;
5,617✔
289
}
5,617✔
290

291
ExternalCommitHelper::ExternalCommitHelper(RealmCoordinator& parent, const RealmConfig& config)
292
    : m_parent(parent)
293
{
2,803✔
294
    // Object Store needs to create a named pipe in order to coordinate notifications.
2,803✔
295
    // This can be a problem on some file systems (e.g. FAT32) or due to security policies in SELinux. Most commonly
2,803✔
296
    // it is a problem when saving Realms on external storage:
2,803✔
297
    // https://stackoverflow.com/questions/2740321/how-to-create-named-pipe-mkfifo-in-android
2,803✔
298
    //
2,803✔
299
    // For this reason we attempt to create this file in a temporary location known to be safe to write these files.
2,803✔
300
    //
2,803✔
301
    // In order of priority we attempt to write the file in the following locations:
2,803✔
302
    //  1) Next to the Realm file itself
2,803✔
303
    //  2) A location defined by `Realm::Config::fifo_files_fallback_path`
2,803✔
304
    //  3) A location defined by `DBOptions::set_sys_tmp_dir()`
2,803✔
305
    //
2,803✔
306
    // Core has a similar policy for its named pipes.
2,803✔
307
    //
2,803✔
308
    // Also see https://github.com/realm/realm-java/issues/3140
2,803✔
309
    // Note that hash collisions are okay here because they just result in doing extra work instead of resulting
2,803✔
310
    // in correctness problems.
2,803✔
311

2,803✔
312
    std::string path;
2,803✔
313
    std::string temp_dir = util::normalize_dir(config.fifo_files_fallback_path);
2,803✔
314
    std::string sys_temp_dir = util::normalize_dir(DBOptions::get_sys_tmp_dir());
2,803✔
315
    path = DB::get_core_file(config.path, DB::CoreFileType::Note);
2,803✔
316
    bool fifo_created = realm::util::try_create_fifo(path, !temp_dir.empty() || !sys_temp_dir.empty());
2,803✔
317
    if (!fifo_created && !temp_dir.empty()) {
2,803✔
318
        path = DB::get_core_file(util::format("%1realm_%2", temp_dir, std::hash<std::string>()(config.path)),
2✔
319
                                 DB::CoreFileType::Note);
2✔
320
        fifo_created = realm::util::try_create_fifo(path, !sys_temp_dir.empty());
2✔
321
    }
2✔
322
    if (!fifo_created && !sys_temp_dir.empty()) {
2,803✔
323
        path = DB::get_core_file(util::format("%1realm_%2", sys_temp_dir, std::hash<std::string>()(config.path)),
1✔
324
                                 DB::CoreFileType::Note);
1✔
325
        realm::util::create_fifo(path);
1✔
326
    }
1✔
327

2,803✔
328
    m_notify_fd = open(path.c_str(), O_RDWR);
2,803✔
329
    if (m_notify_fd == -1) {
2,803✔
330
        throw std::system_error(errno, std::system_category());
×
331
    }
×
332

2,803✔
333
    make_non_blocking(m_notify_fd);
2,803✔
334

2,803✔
335
    DaemonThread::shared().add(m_notify_fd, &m_parent);
2,803✔
336
}
2,803✔
337

338
ExternalCommitHelper::~ExternalCommitHelper()
339
{
2,802✔
340
    DaemonThread::shared().remove(m_notify_fd, &m_parent);
2,802✔
341
}
2,802✔
342

343
void ExternalCommitHelper::notify_others()
344
{
10,383✔
345
    notify_fd(m_notify_fd);
10,383✔
346
}
10,383✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc