• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18517631058

15 Oct 2025 04:18AM UTC coverage: 69.207% (-11.1%) from 80.267%
18517631058

Pull #22745

github

web-flow
Merge 642a76ca1 into 99919310e
Pull Request #22745: [windows] Add windows support in the stdio crate.

53815 of 77759 relevant lines covered (69.21%)

2.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.98
/src/python/pants/util/dirutil.py
1
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
7✔
5

6
import atexit
7✔
7
import errno
7✔
8
import os
7✔
9
import shutil
7✔
10
import stat
7✔
11
import tempfile
7✔
12
import threading
7✔
13
import uuid
7✔
14
from collections import defaultdict
7✔
15
from collections.abc import Callable, Iterable, Iterator, Sequence
7✔
16
from contextlib import contextmanager
7✔
17
from pathlib import Path
7✔
18
from typing import Any, DefaultDict, Literal, overload
7✔
19

20
from pants.util.strutil import ensure_text
7✔
21

22

23
def longest_dir_prefix(path: str, prefixes: Sequence[str]) -> str | None:
7✔
24
    """Given a list of prefixes, return the one that is the longest prefix to the given path.
25

26
    Returns None if there are no matches.
27
    """
28
    longest_match, longest_prefix = 0, None
×
29
    for prefix in prefixes:
×
30
        if fast_relpath_optional(path, prefix) is not None and len(prefix) > longest_match:
×
31
            longest_match, longest_prefix = len(prefix), prefix
×
32

33
    return longest_prefix
×
34

35

36
def fast_relpath(path: str, start: str) -> str:
7✔
37
    """A prefix-based relpath, with no normalization or support for returning `..`."""
38
    relpath = fast_relpath_optional(path, start)
7✔
39
    if relpath is None:
7✔
40
        raise ValueError(f"{start} is not a directory containing {path}")
×
41
    return relpath
7✔
42

43

44
def fast_relpath_optional(path: str, start: str) -> str | None:
7✔
45
    """A prefix-based relpath, with no normalization or support for returning `..`.
46

47
    Returns None if `start` is not a directory-aware prefix of `path`.
48
    """
49
    if len(start) == 0:
7✔
50
        # Empty prefix.
51
        return path
1✔
52

53
    # Determine where the matchable prefix ends.
54
    pref_end = len(start) - 1 if start[-1] == "/" else len(start)
7✔
55
    if pref_end > len(path):
7✔
56
        # The prefix is too long to match.
57
        return None
1✔
58
    elif path[:pref_end] == start[:pref_end] and (len(path) == pref_end or path[pref_end] == "/"):
7✔
59
        # The prefix matches, and the entries are either identical, or the suffix indicates that
60
        # the prefix is a directory.
61
        return path[pref_end + 1 :]
7✔
62
    return None
1✔
63

64

65
def safe_mkdir(directory: str | Path, clean: bool = False) -> None:
7✔
66
    """Ensure a directory is present.
67

68
    If it's not there, create it.  If it is, no-op. If clean is True, ensure the dir is empty.
69

70
    :API: public
71
    """
72
    if clean:
7✔
73
        safe_rmtree(directory)
×
74
    try:
7✔
75
        os.makedirs(directory)
7✔
76
    except OSError as e:
7✔
77
        if e.errno != errno.EEXIST:
7✔
78
            raise
3✔
79

80

81
def safe_mkdir_for(path: str | Path, clean: bool = False) -> None:
7✔
82
    """Ensure that the parent directory for a file is present.
83

84
    If it's not there, create it. If it is, no-op.
85
    """
86
    dirname = os.path.dirname(path)
7✔
87
    if dirname:
7✔
88
        safe_mkdir(dirname, clean=clean)
7✔
89

90

91
def safe_file_dump(
7✔
92
    filename: str, payload: bytes | str = "", mode: str = "w", makedirs: bool = False
93
) -> None:
94
    """Write a string to a file.
95

96
    This method is "safe" to the extent that `safe_open` is "safe". See the explanation on the method
97
    doc there.
98

99
    When `payload` is an empty string (the default), this method can be used as a concise way to
100
    create an empty file along with its containing directory (or truncate it if it already exists).
101

102
    :param filename: The filename of the file to write to.
103
    :param payload: The string to write to the file.
104
    :param mode: A mode argument for the python `open` builtin which should be a write mode variant.
105
                 Defaults to 'w'.
106
    :param makedirs: Whether to make all parent directories of this file before making it.
107
    """
108
    if makedirs:
7✔
109
        os.makedirs(os.path.dirname(filename), exist_ok=True)
7✔
110
    with safe_open(filename, mode=mode) as f:
7✔
111
        f.write(payload)
7✔
112

113

114
@overload
115
def maybe_read_file(filename: str) -> str | None: ...
116

117

118
@overload
119
def maybe_read_file(filename: str, binary_mode: Literal[False]) -> str | None: ...
120

121

122
@overload
123
def maybe_read_file(filename: str, binary_mode: Literal[True]) -> bytes | None: ...
124

125

126
@overload
127
def maybe_read_file(filename: str, binary_mode: bool) -> bytes | str | None: ...
128

129

130
def maybe_read_file(filename: str, binary_mode: bool = False) -> bytes | str | None:
7✔
131
    """Read and return the contents of a file in a single file.read().
132

133
    :param filename: The filename of the file to read.
134
    :param binary_mode: Read from file as bytes or unicode.
135
    :returns: The contents of the file, or None if opening the file fails for any reason
136
    """
137
    try:
×
138
        return read_file(filename, binary_mode=binary_mode)
×
139
    except OSError:
×
140
        return None
×
141

142

143
@overload
144
def read_file(filename: str) -> str: ...
145

146

147
@overload
148
def read_file(filename: str, binary_mode: Literal[False]) -> str: ...
149

150

151
@overload
152
def read_file(filename: str, binary_mode: Literal[True]) -> bytes: ...
153

154

155
@overload
156
def read_file(filename: str, binary_mode: bool) -> bytes | str: ...
157

158

159
def read_file(filename: str, binary_mode: bool = False) -> bytes | str:
7✔
160
    """Read and return the contents of a file in a single file.read().
161

162
    :param filename: The filename of the file to read.
163
    :param binary_mode: Read from file as bytes or unicode.
164
    :returns: The contents of the file.
165
    """
166
    mode = "rb" if binary_mode else "r"
3✔
167
    with open(filename, mode) as f:
3✔
168
        content: bytes | str = f.read()
3✔
169
        return content
3✔
170

171

172
def safe_walk(path: bytes | str, **kwargs: Any) -> Iterator[tuple[str, list[str], list[str]]]:
7✔
173
    """Just like os.walk, but ensures that the returned values are unicode objects.
174

175
    This isn't strictly safe, in that it is possible that some paths
176
    will not be decodeable, but that case is rare, and the only
177
    alternative is to somehow avoid all interaction between paths and
178
    unicode objects, which seems especially tough in the presence of
179
    unicode_literals. See e.g.
180
    https://mail.python.org/pipermail/python-dev/2008-December/083856.html
181

182
    :API: public
183
    """
184
    # If os.walk is given a text argument, it yields text values; if it
185
    # is given a binary argument, it yields binary values.
186
    return os.walk(ensure_text(path), **kwargs)
×
187

188

189
_MkdtempCleanerType = Callable[[], None]
7✔
190
_MKDTEMP_CLEANER: _MkdtempCleanerType | None = None
7✔
191
_MKDTEMP_DIRS: DefaultDict[int, set[str]] = defaultdict(set)
7✔
192
_MKDTEMP_LOCK = threading.RLock()
7✔
193

194

195
def _mkdtemp_atexit_cleaner() -> None:
7✔
196
    for td in _MKDTEMP_DIRS.pop(os.getpid(), []):
×
197
        safe_rmtree(td)
×
198

199

200
def _mkdtemp_unregister_cleaner() -> None:
7✔
201
    global _MKDTEMP_CLEANER
202
    _MKDTEMP_CLEANER = None
×
203

204

205
def _mkdtemp_register_cleaner(cleaner: _MkdtempCleanerType) -> None:
7✔
206
    global _MKDTEMP_CLEANER
207
    assert callable(cleaner)
7✔
208
    if _MKDTEMP_CLEANER is None:
7✔
209
        atexit.register(cleaner)
7✔
210
        _MKDTEMP_CLEANER = cleaner
7✔
211

212

213
def safe_mkdtemp(cleaner: _MkdtempCleanerType = _mkdtemp_atexit_cleaner, **kw: Any) -> str:
7✔
214
    """Create a temporary directory that is cleaned up on process exit.
215

216
    Arguments are as to tempfile.mkdtemp.
217

218
    :API: public
219
    """
220
    # Proper lock sanitation on fork [issue 6721] would be desirable here.
221
    with _MKDTEMP_LOCK:
7✔
222
        return register_rmtree(tempfile.mkdtemp(**kw), cleaner=cleaner)
7✔
223

224

225
def register_rmtree(directory: str, cleaner: _MkdtempCleanerType = _mkdtemp_atexit_cleaner) -> str:
7✔
226
    """Register an existing directory to be cleaned up at process exit."""
227
    with _MKDTEMP_LOCK:
7✔
228
        _mkdtemp_register_cleaner(cleaner)
7✔
229
        _MKDTEMP_DIRS[os.getpid()].add(directory)
7✔
230
    return directory
7✔
231

232

233
def safe_rmtree(directory: str | Path) -> None:
7✔
234
    """Delete a directory if it's present. If it's not present, no-op.
235

236
    Note that if the directory argument is a symlink, only the symlink will
237
    be deleted.
238

239
    :API: public
240
    """
241
    if os.path.islink(directory):
2✔
242
        safe_delete(directory)
×
243
    else:
244
        shutil.rmtree(directory, ignore_errors=True)
2✔
245

246

247
def safe_open(filename, *args, **kwargs):
7✔
248
    """Open a file safely, ensuring that its directory exists.
249

250
    :API: public
251
    """
252
    safe_mkdir_for(filename)
7✔
253
    return open(filename, *args, **kwargs)
7✔
254

255

256
def safe_delete(filename: str | Path) -> None:
7✔
257
    """Delete a file safely.
258

259
    If it's not present, no-op.
260
    """
261
    try:
7✔
262
        os.unlink(filename)
7✔
263
    except OSError as e:
×
264
        if e.errno != errno.ENOENT:
×
265
            raise
×
266

267

268
def safe_concurrent_rename(src: str, dst: str) -> None:
7✔
269
    """Rename src to dst, ignoring errors due to dst already existing.
270

271
    Useful when concurrent processes may attempt to create dst, and it doesn't matter who wins.
272
    """
273
    # Delete dst, in case it existed (with old content) even before any concurrent processes
274
    # attempted this write. This ensures that at least one process writes the new content.
275
    if os.path.isdir(src):  # Note that dst may not exist, so we test for the type of src.
×
276
        safe_rmtree(dst)
×
277
    else:
278
        safe_delete(dst)
×
279
    try:
×
280
        shutil.move(src, dst)
×
281
    except OSError as e:
×
282
        if e.errno != errno.EEXIST:
×
283
            raise
×
284

285

286
@contextmanager
7✔
287
def safe_concurrent_creation(target_path: str) -> Iterator[str]:
7✔
288
    """A contextmanager that yields a temporary path and renames it to a final target path when the
289
    contextmanager exits.
290

291
    Useful when concurrent processes may attempt to create a file, and it doesn't matter who wins.
292

293
    :param target_path: The final target path to rename the temporary path to.
294
    :yields: A temporary path containing the original path with a unique (uuid4) suffix.
295
    """
296
    safe_mkdir_for(target_path)
×
297
    tmp_path = f"{target_path}.tmp.{uuid.uuid4().hex}"
×
298
    try:
×
299
        yield tmp_path
×
300
    except Exception:
×
301
        rm_rf(tmp_path)
×
302
        raise
×
303
    else:
304
        if os.path.exists(tmp_path):
×
305
            safe_concurrent_rename(tmp_path, target_path)
×
306

307

308
def chmod_plus_x(path: str) -> None:
7✔
309
    """Equivalent of unix `chmod a+x path`"""
310
    path_mode = os.stat(path).st_mode
×
311
    path_mode &= int("777", 8)
×
312
    if path_mode & stat.S_IRUSR:
×
313
        path_mode |= stat.S_IXUSR
×
314
    if path_mode & stat.S_IRGRP:
×
315
        path_mode |= stat.S_IXGRP
×
316
    if path_mode & stat.S_IROTH:
×
317
        path_mode |= stat.S_IXOTH
×
318
    os.chmod(path, path_mode)
×
319

320

321
def absolute_symlink(source_path: str, target_path: str) -> None:
7✔
322
    """Create a symlink at target pointing to source using the absolute path.
323

324
    :param source_path: Absolute path to source file
325
    :param target_path: Absolute path to intended symlink
326
    :raises ValueError if source_path or link_path are not unique, absolute paths
327
    :raises OSError on failure UNLESS file already exists or no such file/directory
328
    """
329
    if not os.path.isabs(source_path):
×
330
        raise ValueError(f"Path for source : {source_path} must be absolute")
×
331
    if not os.path.isabs(target_path):
×
332
        raise ValueError(f"Path for link : {target_path} must be absolute")
×
333
    if source_path == target_path:
×
334
        raise ValueError(f"Path for link is identical to source : {source_path}")
×
335
    try:
×
336
        if os.path.lexists(target_path):
×
337
            if os.path.islink(target_path) or os.path.isfile(target_path):
×
338
                os.unlink(target_path)
×
339
            else:
340
                shutil.rmtree(target_path)
×
341
        safe_mkdir_for(target_path)
×
342
        os.symlink(source_path, target_path)
×
343
    except OSError as e:
×
344
        # Another run may beat us to deletion or creation.
345
        if not (e.errno == errno.EEXIST or e.errno == errno.ENOENT):
×
346
            raise
×
347

348

349
def relative_symlink(source_path: str, link_path: str) -> None:
7✔
350
    """Create a symlink at link_path pointing to relative source.
351

352
    :param source_path: Absolute path to source file
353
    :param link_path: Absolute path to intended symlink
354
    :raises ValueError if source_path or link_path are not unique, absolute paths
355
    :raises OSError on failure UNLESS file already exists or no such file/directory
356
    """
357
    if not os.path.isabs(source_path):
×
358
        raise ValueError(f"Path for source:{source_path} must be absolute")
×
359
    if not os.path.isabs(link_path):
×
360
        raise ValueError(f"Path for link:{link_path} must be absolute")
×
361
    if source_path == link_path:
×
362
        raise ValueError(f"Path for link is identical to source:{source_path}")
×
363
    # The failure state below had a long life as an uncaught error. No behavior was changed here, it just adds a catch.
364
    # Raising an exception does differ from absolute_symlink, which takes the liberty of deleting existing directories.
365
    if os.path.isdir(link_path) and not os.path.islink(link_path):
×
366
        raise ValueError(f"Path for link would overwrite an existing directory: {link_path}")
×
367
    try:
×
368
        if os.path.lexists(link_path):
×
369
            os.unlink(link_path)
×
370
        rel_path = os.path.relpath(source_path, os.path.dirname(link_path))
×
371
        safe_mkdir_for(link_path)
×
372
        os.symlink(rel_path, link_path)
×
373
    except OSError as e:
×
374
        # Another run may beat us to deletion or creation.
375
        if not (e.errno == errno.EEXIST or e.errno == errno.ENOENT):
×
376
            raise
×
377

378

379
def touch(path: str, times: int | tuple[int, int] | None = None):
7✔
380
    """Equivalent of unix `touch path`.
381

382
    :API: public
383

384
    :path: The file to touch.
385
    :times Either a tuple of (atime, mtime) or else a single time to use for both.  If not
386
           specified both atime and mtime are updated to the current time.
387
    """
388
    if isinstance(times, tuple) and len(times) > 2:
1✔
389
        raise ValueError(
×
390
            "`times` must either be a tuple of (atime, mtime) or else a single time to use for both."
391
        )
392
    if isinstance(times, int):
1✔
393
        times = (times, times)
×
394
    with safe_open(path, "a"):
1✔
395
        os.utime(path, times)
1✔
396

397

398
def recursive_dirname(f: str) -> Iterator[str]:
7✔
399
    """Given a relative path like 'a/b/c/d', yield all ascending path components like:
400

401
    'a/b/c/d'
402
    'a/b/c'
403
    'a/b'
404
    'a'
405
    ''
406
    """
407
    prev = None
7✔
408
    while f != prev:
7✔
409
        yield f
7✔
410
        prev = f
7✔
411
        f = os.path.dirname(f)
7✔
412
    yield ""
7✔
413

414

415
def rm_rf(name: str) -> None:
7✔
416
    """Remove a file or a directory similarly to running `rm -rf <name>` in a UNIX shell.
417

418
    :param name: the name of the file or directory to remove.
419
    :raises: OSError on error.
420
    """
421
    if not os.path.exists(name):
3✔
422
        return
3✔
423

424
    try:
3✔
425
        # Avoid using safe_rmtree so we can detect failures.
426
        shutil.rmtree(name)
3✔
427
    except OSError as e:
×
428
        if e.errno == errno.ENOTDIR:
×
429
            # 'Not a directory', but a file. Attempt to os.unlink the file, raising OSError on failure.
430
            safe_delete(name)
×
431
        elif e.errno != errno.ENOENT:
×
432
            # Pass on 'No such file or directory', otherwise re-raise OSError to surface perm issues etc.
433
            raise
×
434

435

436
def group_by_dir(paths: Iterable[str]) -> dict[str, set[str]]:
7✔
437
    """For a list of file paths, returns a dict of directory path -> files in that dir."""
438
    ret = defaultdict(set)
×
439
    for path in paths:
×
440
        dirname, filename = os.path.split(path)
×
441
        ret[dirname].add(filename)
×
442
    return ret
×
443

444

445
def find_nearest_ancestor_file(files: set[str], dir: str, filename: str) -> str | None:
7✔
446
    """Given a filename return the nearest ancestor file of that name in the directory hierarchy."""
447
    while True:
×
448
        candidate_config_file_path = os.path.join(dir, filename)
×
449
        if candidate_config_file_path in files:
×
450
            return candidate_config_file_path
×
451

452
        if dir == "":
×
453
            return None
×
454
        dir = os.path.dirname(dir)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc