• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.84
/src/python/pants/util/contextutil.py
1
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
1✔
5

6
import os
1✔
7
import shutil
1✔
8
import ssl
1✔
9
import sys
1✔
10
import tempfile
1✔
11
import threading
1✔
12
import zipfile
1✔
13
from collections.abc import Callable, Iterator, Mapping
1✔
14
from contextlib import contextmanager
1✔
15
from pathlib import Path
1✔
16
from queue import Queue
1✔
17
from socketserver import TCPServer
1✔
18
from typing import IO, Any
1✔
19

20
from pants.util.dirutil import safe_delete
1✔
21

22

23
class InvalidZipPath(ValueError):
1✔
24
    """Indicates a bad zip file path."""
25

26

27
@contextmanager
1✔
28
def environment_as(**kwargs: str | None) -> Iterator[None]:
1✔
29
    """Update the environment to the supplied values, for example:
30

31
    with environment_as(PYTHONPATH='foo:bar:baz',
32
                        PYTHON='/usr/bin/python2.7'):
33
      subprocess.Popen(foo).wait()
34
    """
UNCOV
35
    new_environment = kwargs
×
UNCOV
36
    old_environment = {}
×
37

UNCOV
38
    def setenv(key: str, val: str | None) -> None:
×
UNCOV
39
        if val is not None:
×
UNCOV
40
            os.environ[key] = val
×
41
        else:
UNCOV
42
            if key in os.environ:
×
UNCOV
43
                del os.environ[key]
×
44

UNCOV
45
    for key, val in new_environment.items():
×
UNCOV
46
        old_environment[key] = os.environ.get(key)
×
UNCOV
47
        setenv(key, val)
×
UNCOV
48
    try:
×
UNCOV
49
        yield
×
50
    finally:
UNCOV
51
        for key, val in old_environment.items():
×
UNCOV
52
            setenv(key, val)
×
53

54

55
def _purge_env() -> None:
1✔
56
    # N.B. Without the use of `del` here (which calls `os.unsetenv` under the hood), subprocess
57
    # invokes or other things that may access the environment at the C level may not see the
58
    # correct env vars (i.e. we can't just replace os.environ with an empty dict).
59
    # See https://docs.python.org/3/library/os.html#os.unsetenv for more info.
60
    #
61
    # Wraps iterable in list() to make a copy and avoid issues with deleting while iterating.
UNCOV
62
    for k in list(os.environ.keys()):
×
UNCOV
63
        del os.environ[k]
×
64

65

66
def _restore_env(env: Mapping[str, str]) -> None:
1✔
UNCOV
67
    for k, v in env.items():
×
UNCOV
68
        os.environ[k] = v
×
69

70

71
@contextmanager
1✔
72
def hermetic_environment_as(*preserve: str, **override: str | None) -> Iterator[None]:
1✔
73
    """Mutate the environment of this process, restoring it on exit.
74

75
    The given `preserve` environment variable names will have their current values preserved, while
76
    the given `override` environment variables will override any values which are already set.
77
    """
UNCOV
78
    old_environment = os.environ.copy()
×
UNCOV
79
    preserve_set = set(preserve)
×
UNCOV
80
    new_environment: dict[str, str | None] = {
×
81
        k: v for k, v in old_environment.items() if k in preserve_set
82
    }
UNCOV
83
    new_environment.update(override)
×
84

UNCOV
85
    _purge_env()
×
UNCOV
86
    try:
×
UNCOV
87
        with environment_as(**new_environment):
×
UNCOV
88
            yield
×
89
    finally:
UNCOV
90
        _purge_env()
×
UNCOV
91
        _restore_env(old_environment)
×
92

93

94
@contextmanager
1✔
95
def argv_as(args: tuple[str, ...]) -> Iterator[None]:
1✔
96
    """Temporarily set `sys.argv` to the supplied value."""
97
    old_args = sys.argv
×
98
    try:
×
99
        sys.argv = list(args)
×
100
        yield
×
101
    finally:
102
        sys.argv = old_args
×
103

104

105
@contextmanager
1✔
106
def temporary_dir(
1✔
107
    root_dir: str | None = None,
108
    cleanup: bool = True,
109
    suffix: str | None = None,
110
    permissions: int | None = None,
111
    prefix: str | None = tempfile.template,
112
) -> Iterator[str]:
113
    """A with-context that creates a temporary directory.
114

115
    :API: public
116

117
    You may specify the following keyword args:
118
    :param root_dir: The parent directory in which to create the temporary directory.
119
    :param cleanup: Whether or not to clean up the temporary directory.
120
    :param suffix: If not None the directory name will end with this suffix.
121
    :param permissions: If provided, sets the directory permissions to this mode.
122
    :param prefix: If not None, the directory name will begin with this prefix,
123
                   otherwise a default prefix is used.
124
    """
125
    path = tempfile.mkdtemp(dir=root_dir, suffix=suffix, prefix=prefix)
1✔
126

127
    try:
1✔
128
        if permissions is not None:
1✔
UNCOV
129
            os.chmod(path, permissions)
×
130
        yield path
1✔
131
    finally:
132
        if cleanup:
1✔
133
            shutil.rmtree(path, ignore_errors=True)
1✔
134

135

136
@contextmanager
1✔
137
def temporary_file_path(
1✔
138
    root_dir: str | None = None,
139
    cleanup: bool = True,
140
    suffix: str | None = None,
141
    permissions: int | None = None,
142
) -> Iterator[str]:
143
    """A with-context that creates a temporary file and returns its path.
144

145
    :API: public
146

147
    You may specify the following keyword args:
148
    :param root_dir: The parent directory to create the temporary file.
149
    :param cleanup: Whether or not to clean up the temporary file.
150
    """
UNCOV
151
    with temporary_file(root_dir, cleanup=cleanup, suffix=suffix, permissions=permissions) as fd:
×
UNCOV
152
        fd.close()
×
UNCOV
153
        yield fd.name
×
154

155

156
@contextmanager
1✔
157
def temporary_file(
1✔
158
    root_dir: str | None = None,
159
    cleanup: bool = True,
160
    suffix: str | None = None,
161
    permissions: int | None = None,
162
    binary_mode: bool = True,
163
) -> Iterator[IO]:
164
    """A with-context that creates a temporary file and returns a writeable file descriptor to it.
165

166
    You may specify the following keyword args:
167
    :param root_dir: The parent directory to create the temporary file.
168
    :param cleanup: Whether or not to clean up the temporary file.
169
    :param suffix: If suffix is specified, the file name will end with that suffix.
170
                       Otherwise there will be no suffix.
171
                       mkstemp() does not put a dot between the file name and the suffix;
172
                       if you need one, put it at the beginning of suffix.
173
                       See :py:class:`tempfile.NamedTemporaryFile`.
174
    :param permissions: If provided, sets the file to use these permissions.
175
    :param binary_mode: Whether file opens in binary or text mode.
176
    """
UNCOV
177
    mode = "w+b" if binary_mode else "w+"  # tempfile's default is 'w+b'
×
UNCOV
178
    with tempfile.NamedTemporaryFile(suffix=suffix, dir=root_dir, delete=False, mode=mode) as fd:
×
UNCOV
179
        try:
×
UNCOV
180
            if permissions is not None:
×
UNCOV
181
                os.chmod(fd.name, permissions)
×
UNCOV
182
            yield fd
×
183
        finally:
UNCOV
184
            if cleanup:
×
UNCOV
185
                safe_delete(fd.name)
×
186

187

188
@contextmanager
1✔
189
def overwrite_file_content(
1✔
190
    file_path: str | Path,
191
    temporary_content: bytes | str | Callable[[bytes], bytes] | None = None,
192
) -> Iterator[None]:
193
    """A helper that resets a file after the method runs.
194

195
     It will read a file, save the content, maybe write temporary_content to it, yield, then
196
     write the original content to the file.
197

198
    :param file_path: Absolute path to the file to be reset after the method runs.
199
    :param temporary_content: Content to write to the file, or a function from current content
200
      to new temporary content.
201
    """
UNCOV
202
    file_path = Path(file_path)
×
UNCOV
203
    original_content = file_path.read_bytes()
×
UNCOV
204
    try:
×
UNCOV
205
        if temporary_content is not None:
×
UNCOV
206
            if callable(temporary_content):
×
UNCOV
207
                content = temporary_content(original_content)
×
UNCOV
208
            elif isinstance(temporary_content, bytes):
×
209
                content = temporary_content
×
210
            else:
UNCOV
211
                content = temporary_content.encode()
×
UNCOV
212
            file_path.write_bytes(content)
×
UNCOV
213
        yield
×
214
    finally:
UNCOV
215
        file_path.write_bytes(original_content)
×
216

217

218
@contextmanager
1✔
219
def pushd(directory: str) -> Iterator[str]:
1✔
220
    """A with-context that encapsulates pushd/popd."""
UNCOV
221
    cwd = os.getcwd()
×
UNCOV
222
    os.chdir(directory)
×
UNCOV
223
    try:
×
UNCOV
224
        yield directory
×
225
    finally:
UNCOV
226
        os.chdir(cwd)
×
227

228

229
@contextmanager
1✔
230
def open_zip(path_or_file: str | Any, *args, **kwargs) -> Iterator[zipfile.ZipFile]:
1✔
231
    """A with-context for zip files.
232

233
    Passes through *args and **kwargs to zipfile.ZipFile.
234

235
    :API: public
236

237
    :param path_or_file: Full path to zip file.
238
    :param args: Any extra args accepted by `zipfile.ZipFile`.
239
    :param kwargs: Any extra keyword args accepted by `zipfile.ZipFile`.
240
    :raises: `InvalidZipPath` if path_or_file is invalid.
241
    :raises: `zipfile.BadZipfile` if zipfile.ZipFile cannot open a zip at path_or_file.
242
    """
UNCOV
243
    if not path_or_file:
×
UNCOV
244
        raise InvalidZipPath(f"Invalid zip location: {path_or_file}")
×
UNCOV
245
    if "allowZip64" not in kwargs:
×
UNCOV
246
        kwargs["allowZip64"] = True
×
UNCOV
247
    try:
×
UNCOV
248
        zf = zipfile.ZipFile(path_or_file, *args, **kwargs)
×
UNCOV
249
    except zipfile.BadZipfile as bze:
×
250
        # Use the realpath in order to follow symlinks back to the problem source file.
UNCOV
251
        raise zipfile.BadZipfile(f"Bad Zipfile {os.path.realpath(path_or_file)}: {bze}")
×
UNCOV
252
    try:
×
UNCOV
253
        yield zf
×
254
    finally:
UNCOV
255
        zf.close()
×
256

257

258
@contextmanager
1✔
259
def http_server(handler_class: type, ssl_context: ssl.SSLContext | None = None) -> Iterator[int]:
1✔
UNCOV
260
    def serve(port_queue: Queue[int], shutdown_queue: Queue[bool]) -> None:
×
UNCOV
261
        httpd = TCPServer(("", 0), handler_class)
×
UNCOV
262
        httpd.timeout = 0.1
×
UNCOV
263
        if ssl_context:
×
UNCOV
264
            httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)
×
265

UNCOV
266
        port_queue.put(httpd.server_address[1])
×
UNCOV
267
        while shutdown_queue.empty():
×
UNCOV
268
            httpd.handle_request()
×
269

UNCOV
270
    port_queue: Queue[int] = Queue()
×
UNCOV
271
    shutdown_queue: Queue[bool] = Queue()
×
UNCOV
272
    t = threading.Thread(target=lambda: serve(port_queue, shutdown_queue))
×
UNCOV
273
    t.daemon = True
×
UNCOV
274
    t.start()
×
275

UNCOV
276
    try:
×
UNCOV
277
        yield port_queue.get(block=True)
×
278
    finally:
UNCOV
279
        shutdown_queue.put(True)
×
UNCOV
280
        t.join()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc