• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.69
/src/python/pants/util/dirutil.py
1
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
1✔
5

6
import atexit
1✔
7
import errno
1✔
8
import os
1✔
9
import shutil
1✔
10
import stat
1✔
11
import tempfile
1✔
12
import threading
1✔
13
import uuid
1✔
14
from collections import defaultdict
1✔
15
from collections.abc import Callable, Iterable, Iterator, Sequence
1✔
16
from contextlib import contextmanager
1✔
17
from pathlib import Path
1✔
18
from typing import Any, DefaultDict, Literal, overload
1✔
19

20
from pants.util.strutil import ensure_text
1✔
21

22

23
def longest_dir_prefix(path: str, prefixes: Sequence[str]) -> str | None:
1✔
24
    """Given a list of prefixes, return the one that is the longest prefix to the given path.
25

26
    Returns None if there are no matches.
27
    """
UNCOV
28
    longest_match, longest_prefix = 0, None
×
UNCOV
29
    for prefix in prefixes:
×
UNCOV
30
        if fast_relpath_optional(path, prefix) is not None and len(prefix) > longest_match:
×
UNCOV
31
            longest_match, longest_prefix = len(prefix), prefix
×
32

UNCOV
33
    return longest_prefix
×
34

35

36
def fast_relpath(path: str, start: str) -> str:
    """Return `path` relative to `start` using prefix matching only.

    No normalization is performed and `..` is never produced.

    :raises ValueError: if `fast_relpath_optional` reports that `start` does not contain `path`.
    """
    result = fast_relpath_optional(path, start)
    if result is not None:
        return result
    raise ValueError(f"{start} is not a directory containing {path}")
×
42

43

44
def fast_relpath_optional(path: str, start: str) -> str | None:
1✔
45
    """A prefix-based relpath, with no normalization or support for returning `..`.
46

47
    Returns None if `start` is not a directory-aware prefix of `path`.
48
    """
UNCOV
49
    if len(start) == 0:
×
50
        # Empty prefix.
UNCOV
51
        return path
×
52

53
    # Determine where the matchable prefix ends.
UNCOV
54
    pref_end = len(start) - 1 if start[-1] == "/" else len(start)
×
UNCOV
55
    if pref_end > len(path):
×
56
        # The prefix is too long to match.
UNCOV
57
        return None
×
UNCOV
58
    elif path[:pref_end] == start[:pref_end] and (len(path) == pref_end or path[pref_end] == "/"):
×
59
        # The prefix matches, and the entries are either identical, or the suffix indicates that
60
        # the prefix is a directory.
UNCOV
61
        return path[pref_end + 1 :]
×
UNCOV
62
    return None
×
63

64

65
def safe_mkdir(directory: str | Path, clean: bool = False) -> None:
1✔
66
    """Ensure a directory is present.
67

68
    If it's not there, create it.  If it is, no-op. If clean is True, ensure the dir is empty.
69

70
    :API: public
71
    """
72
    if clean:
1✔
UNCOV
73
        safe_rmtree(directory)
×
74
    try:
1✔
75
        os.makedirs(directory)
1✔
76
    except OSError as e:
1✔
77
        if e.errno != errno.EEXIST:
1✔
78
            raise
1✔
79

80

81
def safe_mkdir_for(path: str | Path, clean: bool = False) -> None:
1✔
82
    """Ensure that the parent directory for a file is present.
83

84
    If it's not there, create it. If it is, no-op.
85
    """
86
    dirname = os.path.dirname(path)
1✔
87
    if dirname:
1✔
88
        safe_mkdir(dirname, clean=clean)
1✔
89

90

91
def safe_file_dump(
    filename: str, payload: bytes | str = "", mode: str = "w", makedirs: bool = False
) -> None:
    """Write a string to a file.

    This method is "safe" to the extent that `safe_open` is "safe". See the explanation on the method
    doc there.

    When `payload` is an empty string (the default), this method can be used as a concise way to
    create an empty file along with its containing directory (or truncate it if it already exists).

    :param filename: The filename of the file to write to.
    :param payload: The string to write to the file.
    :param mode: A mode argument for the python `open` builtin which should be a write mode variant.
                 Defaults to 'w'.
    :param makedirs: Whether to make all parent directories of this file before making it.
    """
    parent = os.path.dirname(filename)
    # A bare filename has an empty dirname, and `os.makedirs("")` raises FileNotFoundError,
    # so only create parents when there is actually a directory component.
    if makedirs and parent:
        os.makedirs(parent, exist_ok=True)
    with safe_open(filename, mode=mode) as f:
        f.write(payload)
1✔
112

113

114
@overload
def maybe_read_file(filename: str) -> str | None: ...


@overload
def maybe_read_file(filename: str, binary_mode: Literal[False]) -> str | None: ...


@overload
def maybe_read_file(filename: str, binary_mode: Literal[True]) -> bytes | None: ...


@overload
def maybe_read_file(filename: str, binary_mode: bool) -> bytes | str | None: ...


def maybe_read_file(filename: str, binary_mode: bool = False) -> bytes | str | None:
    """Read a whole file via `read_file`, returning None instead of raising on OSError.

    :param filename: The filename of the file to read.
    :param binary_mode: Read from file as bytes or unicode.
    :returns: The contents of the file, or None if `read_file` raised an OSError.
    """
    try:
        contents = read_file(filename, binary_mode=binary_mode)
    except OSError:
        return None
    return contents
×
141

142

143
@overload
144
def read_file(filename: str) -> str: ...
145

146

147
@overload
148
def read_file(filename: str, binary_mode: Literal[False]) -> str: ...
149

150

151
@overload
152
def read_file(filename: str, binary_mode: Literal[True]) -> bytes: ...
153

154

155
@overload
156
def read_file(filename: str, binary_mode: bool) -> bytes | str: ...
157

158

159
def read_file(filename: str, binary_mode: bool = False) -> bytes | str:
1✔
160
    """Read and return the contents of a file in a single file.read().
161

162
    :param filename: The filename of the file to read.
163
    :param binary_mode: Read from file as bytes or unicode.
164
    :returns: The contents of the file.
165
    """
UNCOV
166
    mode = "rb" if binary_mode else "r"
×
UNCOV
167
    with open(filename, mode) as f:
×
UNCOV
168
        content: bytes | str = f.read()
×
UNCOV
169
        return content
×
170

171

172
def safe_walk(path: bytes | str, **kwargs: Any) -> Iterator[tuple[str, list[str], list[str]]]:
    """Walk a directory tree like os.walk, after converting `path` with `ensure_text`.

    This isn't strictly safe, in that some paths may not be decodeable, but that case is rare
    and the alternative would be to keep paths and unicode objects entirely apart. See e.g.
    https://mail.python.org/pipermail/python-dev/2008-December/083856.html

    Extra keyword arguments are forwarded to os.walk unchanged.

    :API: public
    """
    # os.walk mirrors its argument type: text in, text out; bytes in, bytes out.
    text_root = ensure_text(path)
    return os.walk(text_root, **kwargs)
×
187

188

189
_MkdtempCleanerType = Callable[[], None]
1✔
190
_MKDTEMP_CLEANER: _MkdtempCleanerType | None = None
1✔
191
_MKDTEMP_DIRS: DefaultDict[int, set[str]] = defaultdict(set)
1✔
192
_MKDTEMP_LOCK = threading.RLock()
1✔
193

194

195
def _mkdtemp_atexit_cleaner() -> None:
1✔
UNCOV
196
    for td in _MKDTEMP_DIRS.pop(os.getpid(), []):
×
UNCOV
197
        safe_rmtree(td)
×
198

199

200
def _mkdtemp_unregister_cleaner() -> None:
1✔
201
    global _MKDTEMP_CLEANER
UNCOV
202
    _MKDTEMP_CLEANER = None
×
203

204

205
def _mkdtemp_register_cleaner(cleaner: _MkdtempCleanerType) -> None:
1✔
206
    global _MKDTEMP_CLEANER
UNCOV
207
    assert callable(cleaner)
×
UNCOV
208
    if _MKDTEMP_CLEANER is None:
×
UNCOV
209
        atexit.register(cleaner)
×
UNCOV
210
        _MKDTEMP_CLEANER = cleaner
×
211

212

213
def safe_mkdtemp(cleaner: _MkdtempCleanerType = _mkdtemp_atexit_cleaner, **kw: Any) -> str:
1✔
214
    """Create a temporary directory that is cleaned up on process exit.
215

216
    Arguments are as to tempfile.mkdtemp.
217

218
    :API: public
219
    """
220
    # Proper lock sanitation on fork [issue 6721] would be desirable here.
UNCOV
221
    with _MKDTEMP_LOCK:
×
UNCOV
222
        return register_rmtree(tempfile.mkdtemp(**kw), cleaner=cleaner)
×
223

224

225
def register_rmtree(directory: str, cleaner: _MkdtempCleanerType = _mkdtemp_atexit_cleaner) -> str:
1✔
226
    """Register an existing directory to be cleaned up at process exit."""
UNCOV
227
    with _MKDTEMP_LOCK:
×
UNCOV
228
        _mkdtemp_register_cleaner(cleaner)
×
UNCOV
229
        _MKDTEMP_DIRS[os.getpid()].add(directory)
×
UNCOV
230
    return directory
×
231

232

233
def safe_rmtree(directory: str | Path) -> None:
1✔
234
    """Delete a directory if it's present. If it's not present, no-op.
235

236
    Note that if the directory argument is a symlink, only the symlink will
237
    be deleted.
238

239
    :API: public
240
    """
UNCOV
241
    if os.path.islink(directory):
×
UNCOV
242
        safe_delete(directory)
×
243
    else:
UNCOV
244
        shutil.rmtree(directory, ignore_errors=True)
×
245

246

247
def safe_open(filename, *args, **kwargs):
    """Open `filename` with the builtin `open`, creating its parent directory first.

    Positional and keyword arguments after `filename` are forwarded to `open` unchanged.

    :API: public
    """
    safe_mkdir_for(filename)
    handle = open(filename, *args, **kwargs)
    return handle
1✔
254

255

256
def safe_delete(filename: str | Path) -> None:
1✔
257
    """Delete a file safely.
258

259
    If it's not present, no-op.
260
    """
UNCOV
261
    try:
×
UNCOV
262
        os.unlink(filename)
×
263
    except OSError as e:
×
264
        if e.errno != errno.ENOENT:
×
265
            raise
×
266

267

268
def safe_concurrent_rename(src: str, dst: str) -> None:
    """Move `src` to `dst`, tolerating an "already exists" failure of the move.

    Useful when concurrent processes may attempt to create dst, and it doesn't matter who wins.
    """
    # Clear out any pre-existing dst (possibly holding stale content from before any of the
    # concurrent writers ran), so that at least one process writes the new content.
    # dst may not exist yet, so the removal strategy is chosen from the type of src.
    remove_stale = safe_rmtree if os.path.isdir(src) else safe_delete
    remove_stale(dst)
    try:
        shutil.move(src, dst)
    except OSError as err:
        if err.errno == errno.EEXIST:
            return
        raise
×
284

285

286
@contextmanager
def safe_concurrent_creation(target_path: str) -> Iterator[str]:
    """A contextmanager that yields a temporary path and moves it to the final target path when
    the managed block finishes without an exception.

    Useful when concurrent processes may attempt to create a file, and it doesn't matter who wins.

    If the managed block raises an Exception, the temporary path is removed with `rm_rf` and the
    exception is re-raised. If the block never created the temporary path, nothing is renamed.

    :param target_path: The final target path to rename the temporary path to.
    :yields: A temporary path containing the original path with a unique (uuid4) suffix.
    """
    safe_mkdir_for(target_path)
    unique_suffix = uuid.uuid4().hex
    staging_path = f"{target_path}.tmp.{unique_suffix}"
    try:
        yield staging_path
    except Exception:
        rm_rf(staging_path)
        raise
    # Reached only when the managed block completed normally.
    if os.path.exists(staging_path):
        safe_concurrent_rename(staging_path, target_path)
×
306

307

308
def chmod_plus_x(path: str) -> None:
    """Add execute permission for each of user/group/other that already has read permission.

    Only the low nine permission bits of the current mode are kept before the result is applied
    with os.chmod.
    """
    mode = os.stat(path).st_mode & 0o777
    read_to_exec = (
        (stat.S_IRUSR, stat.S_IXUSR),
        (stat.S_IRGRP, stat.S_IXGRP),
        (stat.S_IROTH, stat.S_IXOTH),
    )
    for read_bit, exec_bit in read_to_exec:
        if mode & read_bit:
            mode |= exec_bit
    os.chmod(path, mode)
×
319

320

321
def absolute_symlink(source_path: str, target_path: str) -> None:
    """Create a symlink at `target_path` that points at the absolute `source_path`.

    Anything already present at `target_path` is removed first: links and regular files are
    unlinked, anything else is removed with shutil.rmtree. The parent directory of
    `target_path` is created when missing.

    :param source_path: Absolute path to source file
    :param target_path: Absolute path to intended symlink
    :raises ValueError if source_path or target_path are not unique, absolute paths
    :raises OSError on failure UNLESS file already exists or no such file/directory
    """
    if not os.path.isabs(source_path):
        raise ValueError(f"Path for source : {source_path} must be absolute")
    if not os.path.isabs(target_path):
        raise ValueError(f"Path for link : {target_path} must be absolute")
    if source_path == target_path:
        raise ValueError(f"Path for link is identical to source : {source_path}")
    try:
        if os.path.lexists(target_path):
            is_link_or_file = os.path.islink(target_path) or os.path.isfile(target_path)
            remove_existing = os.unlink if is_link_or_file else shutil.rmtree
            remove_existing(target_path)
        safe_mkdir_for(target_path)
        os.symlink(source_path, target_path)
    except OSError as err:
        # Another run may beat us to deletion or creation.
        if err.errno not in (errno.EEXIST, errno.ENOENT):
            raise
×
347

348

349
def relative_symlink(source_path: str, link_path: str) -> None:
    """Create a symlink at `link_path` whose target is `source_path` expressed relative to the
    link's directory.

    An existing file or symlink at `link_path` is replaced; an existing real directory is
    refused. The parent directory of `link_path` is created when missing.

    :param source_path: Absolute path to source file
    :param link_path: Absolute path to intended symlink
    :raises ValueError if source_path or link_path are not unique, absolute paths, or if
                       link_path is an existing directory that is not itself a symlink
    :raises OSError on failure UNLESS file already exists or no such file/directory
    """
    if not os.path.isabs(source_path):
        raise ValueError(f"Path for source:{source_path} must be absolute")
    if not os.path.isabs(link_path):
        raise ValueError(f"Path for link:{link_path} must be absolute")
    if source_path == link_path:
        raise ValueError(f"Path for link is identical to source:{source_path}")
    # Unlike absolute_symlink, which deletes an existing directory at the link location, this
    # function refuses to touch one and reports it as a ValueError instead.
    link_is_real_dir = os.path.isdir(link_path) and not os.path.islink(link_path)
    if link_is_real_dir:
        raise ValueError(f"Path for link would overwrite an existing directory: {link_path}")
    try:
        if os.path.lexists(link_path):
            os.unlink(link_path)
        link_dir = os.path.dirname(link_path)
        relative_target = os.path.relpath(source_path, link_dir)
        safe_mkdir_for(link_path)
        os.symlink(relative_target, link_path)
    except OSError as err:
        # Another run may beat us to deletion or creation.
        if err.errno not in (errno.EEXIST, errno.ENOENT):
            raise
×
377

378

379
def touch(path: str, times: int | tuple[int, int] | None = None):
1✔
380
    """Equivalent of unix `touch path`.
381

382
    :API: public
383

384
    :path: The file to touch.
385
    :times Either a tuple of (atime, mtime) or else a single time to use for both.  If not
386
           specified both atime and mtime are updated to the current time.
387
    """
UNCOV
388
    if isinstance(times, tuple) and len(times) > 2:
×
389
        raise ValueError(
×
390
            "`times` must either be a tuple of (atime, mtime) or else a single time to use for both."
391
        )
UNCOV
392
    if isinstance(times, int):
×
393
        times = (times, times)
×
UNCOV
394
    with safe_open(path, "a"):
×
UNCOV
395
        os.utime(path, times)
×
396

397

398
def recursive_dirname(f: str) -> Iterator[str]:
    """Given a relative path like 'a/b/c/d', yield all ascending path components like:

    'a/b/c/d'
    'a/b/c'
    'a/b'
    'a'
    ''

    The empty string is always the last value and is yielded exactly once. For an absolute path
    the walk goes up to the root (e.g. '/') and then ends with ''.
    """
    prev = None
    while f != prev:
        yield f
        prev = f
        f = os.path.dirname(f)
    # For relative paths the loop above already ended on "", so only append the terminating ""
    # when the walk stopped somewhere else (e.g. at "/" for an absolute path).
    if prev != "":
        yield ""
×
413

414

415
def rm_rf(name: str) -> None:
    """Remove a file or a directory similarly to running `rm -rf <name>` in a UNIX shell.

    A symlink (including a dangling one) is removed itself; its target is left untouched.
    A path that does not exist is a no-op.

    :param name: the name of the file or directory to remove.
    :raises: OSError on error.
    """
    # `lexists` does not follow symlinks, so a dangling link still counts as present.
    if not os.path.lexists(name):
        return

    try:
        if os.path.islink(name):
            # shutil.rmtree refuses to operate on symlinks; remove just the link, as `rm -rf` does.
            os.unlink(name)
        else:
            # Avoid using safe_rmtree so we can detect failures.
            shutil.rmtree(name)
    except OSError as e:
        if e.errno == errno.ENOTDIR:
            # 'Not a directory', but a file. Attempt to os.unlink the file, raising OSError on failure.
            safe_delete(name)
        elif e.errno != errno.ENOENT:
            # Pass on 'No such file or directory', otherwise re-raise OSError to surface perm issues etc.
            raise
×
434

435

436
def group_by_dir(paths: Iterable[str]) -> dict[str, set[str]]:
    """Group file paths by directory: maps each directory path to the set of file names in it.

    Each path is split with os.path.split, so a path without a directory component is grouped
    under the empty string.
    """
    grouped = defaultdict(set)
    for parent, basename in map(os.path.split, paths):
        grouped[parent].add(basename)
    return grouped
×
443

444

445
def find_nearest_ancestor_file(files: set[str], dir: str, filename: str) -> str | None:
1✔
446
    """Given a filename return the nearest ancestor file of that name in the directory hierarchy."""
UNCOV
447
    while True:
×
UNCOV
448
        candidate_config_file_path = os.path.join(dir, filename)
×
UNCOV
449
        if candidate_config_file_path in files:
×
UNCOV
450
            return candidate_config_file_path
×
451

UNCOV
452
        if dir == "":
×
UNCOV
453
            return None
×
UNCOV
454
        dir = os.path.dirname(dir)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc