• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Kozea / Radicale / 17385350629

01 Sep 2025 06:58PM UTC coverage: 73.703% (-0.2%) from 73.856%
17385350629

push

github

web-flow
Merge pull request #1862 from pbiering/support-1856

Final fix for https://github.com/Kozea/Radicale/issues/1856

2122 of 3053 branches covered (69.51%)

Branch coverage included in aggregate %.

29 of 50 new or added lines in 2 files covered. (58.0%)

3 existing lines in 1 file now uncovered.

4980 of 6583 relevant lines covered (75.65%)

12.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.52
/radicale/pathutils.py
1
# This file is part of Radicale - CalDAV and CardDAV server
2
# Copyright © 2014 Jean-Marc Martins
3
# Copyright © 2012-2017 Guillaume Ayoub
4
# Copyright © 2017-2022 Unrud <unrud@outlook.com>
5
# Copyright © 2025-2025 Peter Bieringer <pb@bieringer.de>
6
#
7
# This library is free software: you can redistribute it and/or modify
8
# it under the terms of the GNU General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# This library is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
# GNU General Public License for more details.
16
#
17
# You should have received a copy of the GNU General Public License
18
# along with Radicale.  If not, see <http://www.gnu.org/licenses/>.
19

20
"""
3✔
21
Helper functions for working with the file system.
22

23
"""
24

25
import errno
17✔
26
import os
17✔
27
import pathlib
17✔
28
import posixpath
17✔
29
import sys
17✔
30
import threading
17✔
31
from tempfile import TemporaryDirectory
17✔
32
from typing import Iterator, Type, Union
17✔
33

34
from radicale import storage, types, utils
17✔
35

36
if sys.platform == "win32":
    import ctypes
    import ctypes.wintypes
    import msvcrt

    # Flag handed to LockFileEx when an exclusive lock is wanted.
    LOCKFILE_EXCLUSIVE_LOCK: int = 2

    # Unsigned integer type chosen by pointer size: 32 bit when pointers
    # are 4 bytes wide, 64 bit otherwise.
    ULONG_PTR: Union[Type[ctypes.c_uint32], Type[ctypes.c_uint64]] = (
        ctypes.c_uint32 if ctypes.sizeof(ctypes.c_void_p) == 4
        else ctypes.c_uint64)

    class Overlapped(ctypes.Structure):
        """Structure passed by pointer as last argument of the lock calls."""

        _fields_ = [
            ("internal", ULONG_PTR),
            ("internal_high", ULONG_PTR),
            ("offset", ctypes.wintypes.DWORD),
            ("offset_high", ctypes.wintypes.DWORD),
            ("h_event", ctypes.wintypes.HANDLE),
        ]

    # With ``use_last_error=True`` ctypes keeps its own copy of the error
    # code set by calls made through this library object.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

    # Declare the prototypes so that ctypes converts arguments and results.
    lock_file_ex = kernel32.LockFileEx
    lock_file_ex.restype = ctypes.wintypes.BOOL
    lock_file_ex.argtypes = [
        ctypes.wintypes.HANDLE,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.DWORD,
        ctypes.POINTER(Overlapped),
    ]

    unlock_file_ex = kernel32.UnlockFileEx
    unlock_file_ex.restype = ctypes.wintypes.BOOL
    unlock_file_ex.argtypes = [
        ctypes.wintypes.HANDLE,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.DWORD,
        ctypes.wintypes.DWORD,
        ctypes.POINTER(Overlapped),
    ]
else:
    # Every other platform locks through fcntl.
    import fcntl
76

77
if sys.platform == "linux":
    import ctypes

    # Flag passed to renameat2 to exchange the two paths.
    RENAME_EXCHANGE: int = 2
    try:
        # ``use_errno=True`` lets callers read errno via ctypes.get_errno().
        renameat2 = ctypes.CDLL(None, use_errno=True).renameat2
    except AttributeError:
        # Symbol not available: callers test ``renameat2`` for truthiness.
        renameat2 = None
    else:
        renameat2.restype = ctypes.c_int
        renameat2.argtypes = [
            ctypes.c_int, ctypes.c_char_p,
            ctypes.c_int, ctypes.c_char_p,
            ctypes.c_uint]
92

93
if sys.platform == "darwin":
    try:
        F_FULLFSYNC: int = fcntl.F_FULLFSYNC
    except AttributeError:
        # Definition missing in PyPy
        F_FULLFSYNC = 51
96

97

98
class RwLock:
    """A readers-writer lock that locks a file.

    Every acquisition opens the file at ``path`` and takes a file lock on
    it (``LockFileEx`` on Windows, ``flock`` elsewhere): shared for mode
    ``"r"``, exclusive for mode ``"w"``.  Counters protected by an
    in-process mutex record the current holders; they back the ``locked``
    property and serve as a consistency check.

    """

    _path: str
    _readers: int
    _writer: bool
    _lock: threading.Lock

    def __init__(self, path: str) -> None:
        """Remember the lock file ``path`` and reset the bookkeeping."""
        self._path = path
        self._readers = 0
        self._writer = False
        self._lock = threading.Lock()

    @property
    def locked(self) -> str:
        """Report how this instance currently holds the lock.

        Returns ``"r"`` while at least one reader is active, ``"w"`` while
        the writer is active and ``""`` otherwise.

        """
        with self._lock:
            if self._readers > 0:
                return "r"
            if self._writer:
                return "w"
            return ""

    @types.contextmanager
    def acquire(self, mode: str) -> Iterator[None]:
        """Hold the lock in ``mode`` for the duration of the ``with`` body.

        ``mode`` is ``"r"`` for a shared lock or ``"w"`` for an exclusive
        lock.

        Raises ``ValueError`` for any other mode and ``RuntimeError`` if
        the file lock can't be taken or the bookkeeping is inconsistent.

        """
        # Compare against the two valid values: a substring test such as
        # ``mode not in "rw"`` would also accept ``""`` and ``"rw"``.
        if mode not in ("r", "w"):
            raise ValueError("Invalid mode: %r" % mode)
        # No explicit unlock is issued; the lock file is closed when this
        # ``with`` block is left.
        with open(self._path, "w+") as lock_file:
            if sys.platform == "win32":
                handle = msvcrt.get_osfhandle(lock_file.fileno())
                flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0
                overlapped = Overlapped()
                try:
                    if not lock_file_ex(handle, flags, 0, 1, 0, overlapped):
                        # kernel32 is loaded with ``use_last_error=True``,
                        # so the error code has to be read from the copy
                        # that ctypes saved for this call.
                        raise ctypes.WinError(ctypes.get_last_error())
                except OSError as e:
                    raise RuntimeError("Locking the storage failed: %s" % e
                                       ) from e
            else:
                _cmd = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH
                try:
                    fcntl.flock(lock_file.fileno(), _cmd)
                except OSError as e:
                    raise RuntimeError("Locking the storage failed: %s" % e
                                       ) from e
            with self._lock:
                # With the file lock held there must be no active writer,
                # and a writer must not find active readers.
                if self._writer or (mode == "w" and self._readers != 0):
                    raise RuntimeError("Locking the storage failed: "
                                       "Guarantees failed")
                if mode == "r":
                    self._readers += 1
                else:
                    self._writer = True
            try:
                yield
            finally:
                with self._lock:
                    if mode == "r":
                        self._readers -= 1
                    else:
                        self._writer = False
158

159

160
def rename_exchange(src: str, dst: str) -> None:
    """Exchange the files or directories `src` and `dst`.

    Both `src` and `dst` must exist but may be of different types.

    On Linux with renameat2 the operation is atomic.
    On other platforms (or if renameat2 reports ``EINVAL``) it's not
    atomic: the exchange is done with three renames through a temporary
    directory created next to `src`.  If one of the later renames fails
    with ``OSError``, the earlier ones are undone before the error is
    re-raised.

    Raises ``ValueError`` if one of the paths has no final component and
    ``OSError`` if a rename fails.

    """
    src_dir, src_base = os.path.split(src)
    dst_dir, dst_base = os.path.split(dst)
    src_dir = src_dir or os.curdir
    dst_dir = dst_dir or os.curdir
    if not src_base or not dst_base:
        raise ValueError("Invalid arguments: %r -> %r" % (src, dst))
    if sys.platform == "linux" and renameat2:
        src_base_bytes = os.fsencode(src_base)
        dst_base_bytes = os.fsencode(dst_base)
        src_dir_fd = os.open(src_dir, os.O_RDONLY)
        try:
            dst_dir_fd = os.open(dst_dir, os.O_RDONLY)
            try:
                if renameat2(src_dir_fd, src_base_bytes,
                             dst_dir_fd, dst_base_bytes,
                             RENAME_EXCHANGE) == 0:
                    return
                errno_ = ctypes.get_errno()
                # Fallback if RENAME_EXCHANGE not supported by filesystem
                if errno_ != errno.EINVAL:
                    raise OSError(errno_, os.strerror(errno_))
            finally:
                os.close(dst_dir_fd)
        finally:
            os.close(src_dir_fd)
    with TemporaryDirectory(prefix=".Radicale.tmp-", dir=src_dir
                            ) as tmp_dir:
        interim = os.path.join(tmp_dir, "interim")
        os.rename(dst, interim)
        # Whatever is still inside the temporary directory when the block
        # is left gets deleted, so a failed step must move things back.
        try:
            os.rename(src, dst)
            try:
                os.rename(interim, src)
            except OSError:
                os.rename(dst, src)
                raise
        except OSError:
            os.rename(interim, dst)
            raise
199

200

201
def fsync(fd: int) -> None:
    """Flush the file behind descriptor ``fd`` to disk.

    On macOS ``F_FULLFSYNC`` is tried first; ``os.fsync`` is used on all
    other platforms and when ``F_FULLFSYNC`` fails with ``EINVAL``.  Any
    other ``OSError`` is propagated.

    """
    if sys.platform == "darwin":
        try:
            fcntl.fcntl(fd, F_FULLFSYNC)
        except OSError as error:
            # Fallback if F_FULLFSYNC not supported by filesystem
            if error.errno != errno.EINVAL:
                raise
        else:
            return
    os.fsync(fd)
211

212

213
def strip_path(path: str) -> str:
    """Return the sanitized ``path`` without leading and trailing ``/``."""
    # Only already sanitized paths are accepted.
    assert path == sanitize_path(path)
    stripped = path.strip("/")
    return stripped
216

217

218
def unstrip_path(stripped_path: str, trailing_slash: bool = False) -> str:
    """Turn a stripped path back into a path with a leading ``/``.

    With ``trailing_slash`` the result also ends with ``/``.  An empty
    ``stripped_path`` is only accepted together with ``trailing_slash``.

    """
    assert stripped_path == strip_path(sanitize_path(stripped_path))
    assert trailing_slash or stripped_path
    path = "/%s" % stripped_path
    slash_missing = trailing_slash and not path.endswith("/")
    return path + "/" if slash_missing else path
225

226

227
def sanitize_path(path: str) -> str:
    """Make path absolute with leading slash to prevent access to other data.

    The path is normalized and every component that is not a safe path
    component is dropped.  A potential trailing slash is preserved.

    """
    normalized = posixpath.normpath(path)
    safe_parts = [part for part in normalized.split("/")
                  if is_safe_path_component(part)]
    new_path = "/" + "/".join(safe_parts)
    # Re-append the trailing slash unless the result is the bare root.
    if path.endswith("/") and not new_path.endswith("/"):
        new_path += "/"
    return new_path
242

243

244
def is_safe_path_component(path: str) -> bool:
    """Check if path is a single component of a path.

    A safe component is non-empty, contains no ``/`` and is neither ``.``
    nor ``..``, so it is safe to join too.

    """
    if not path:
        return False
    if "/" in path:
        return False
    return path not in (".", "..")
251

252

253
def is_safe_filesystem_path_component(path: str) -> bool:
    """Check if path is a single component of a local and posix filesystem
       path.

    Besides the checks of ``is_safe_path_component`` this rejects names
    with a drive or directory part, the current/parent directory names,
    names starting with ``.`` and names ending with ``~``.

    """
    if not path:
        return False
    if os.path.splitdrive(path)[0]:
        return False
    if sys.platform == "win32" and ":" in path:  # Block NTFS-ADS
        return False
    if os.path.split(path)[0]:
        return False
    if path in (os.curdir, os.pardir):
        return False
    if path.startswith(".") or path.endswith("~"):
        return False
    return is_safe_path_component(path)
266

267

268
def path_to_filesystem(root: str, sane_path: str) -> str:
    """Convert `sane_path` to a local filesystem path relative to `root`.

    `root` must be a secure filesystem path, it will be prepended to the
    path.

    `sane_path` must be a sanitized path without leading or trailing ``/``.

    Conversion of `sane_path` is done in a secure manner,
    or raises ``ValueError``: ``UnsafePathError`` for a component that is
    not a safe filesystem name, ``CollidingPathError`` for a component
    that exists on disk under a different directory entry name.

    """
    assert sane_path == strip_path(sanitize_path(sane_path))
    safe_path = root
    parts = sane_path.split("/") if sane_path else []
    for part in parts:
        if not is_safe_filesystem_path_component(part):
            raise UnsafePathError(part)
        safe_path_parent = safe_path
        safe_path = os.path.join(safe_path, part)
        # Check for conflicting files (e.g. case-insensitive file systems
        # or short names on Windows file systems)
        if os.path.lexists(safe_path):
            # The search stops at the first match, so close the directory
            # iterator explicitly instead of leaving it to the garbage
            # collector.
            with os.scandir(safe_path_parent) as entries:
                listed = any(entry.name == part for entry in entries)
            if not listed:
                raise CollidingPathError(part)
    return safe_path
293

294

295
class UnsafePathError(ValueError):
    """A name that can't be translated safely to the filesystem."""

    def __init__(self, path: str) -> None:
        """Build the error message from the offending name ``path``."""
        message = "Can't translate name safely to filesystem: %r" % path
        super().__init__(message)
300

301

302
class CollidingPathError(ValueError):
    """A name that collides with a different entry on the filesystem."""

    def __init__(self, path: str) -> None:
        """Build the error message from the colliding name ``path``."""
        message = "File name collision: %r" % path
        super().__init__(message)
306

307

308
def name_from_path(path: str, collection: "storage.BaseCollection") -> str:
    """Return Radicale item name from ``path``.

    ``path`` must be sanitized.  The result is the part of ``path`` that
    follows the collection's path; it is empty if ``path`` refers to the
    collection itself.

    Raises ``ValueError`` if ``path`` doesn't start with the collection's
    path or if the remainder is not a single safe path component.

    """
    assert path == sanitize_path(path)
    prefix = unstrip_path(collection.path, True)
    # Append a slash so that the collection's own path matches as well.
    candidate = path + "/"
    if not candidate.startswith(prefix):
        raise ValueError("%r doesn't start with %r" % (path, prefix))
    name = path[len(prefix):]
    if not name:
        return name
    if not is_safe_path_component(name):
        raise ValueError("%r is not a component in collection %r" %
                         (name, collection.path))
    return name
319

320

321
def path_permissions(path):
    """Collect ownership and mode information about ``path``.

    Returns the list ``[owner, uid, group, gid, mode]`` where ``mode`` is
    ``st_mode`` formatted as octal string.  Every value is passed through
    ``utils.unknown_if_empty``; a lookup failing with ``KeyError`` or
    ``NotImplementedError`` yields ``"UNKNOWN"`` instead.

    Errors raised by ``stat()`` itself are not handled here.

    """
    path = pathlib.Path(path)
    # A single stat() call provides one snapshot for all numeric fields.
    stat_result = path.stat()

    def lookup(getter):
        # Evaluate one field, mapping failed lookups to "UNKNOWN".
        try:
            return utils.unknown_if_empty(getter())
        except (KeyError, NotImplementedError):
            return "UNKNOWN"

    uid = lookup(lambda: stat_result.st_uid)
    gid = lookup(lambda: stat_result.st_gid)
    mode = lookup(lambda: "%o" % stat_result.st_mode)
    owner = lookup(path.owner)
    group = lookup(path.group)

    return [owner, uid, group, gid, mode]
350

351

352
def path_permissions_as_string(path):
    """Format the result of ``path_permissions(path)`` as a single line."""
    owner, uid, group, gid, mode = path_permissions(path)
    return "path=%r owner=%s(%s) group=%s(%s) mode=%s" % (
        path, owner, uid, group, gid, mode)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc