• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19250292619

11 Nov 2025 12:09AM UTC coverage: 77.865% (-2.4%) from 80.298%
19250292619

push

github

web-flow
flag non-runnable targets used with `code_quality_tool` (#22875)

2 of 5 new or added lines in 2 files covered. (40.0%)

1487 existing lines in 72 files now uncovered.

71448 of 91759 relevant lines covered (77.86%)

3.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.78
/src/python/pants/util/contextutil.py
1
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
11✔
5

6
import os
11✔
7
import shutil
11✔
8
import ssl
11✔
9
import sys
11✔
10
import tempfile
11✔
11
import threading
11✔
12
import zipfile
11✔
13
from collections.abc import Callable, Iterator, Mapping
11✔
14
from contextlib import contextmanager
11✔
15
from pathlib import Path
11✔
16
from queue import Queue
11✔
17
from socketserver import TCPServer
11✔
18
from typing import IO, Any
11✔
19

20
from pants.util.dirutil import safe_delete
11✔
21

22

23
class InvalidZipPath(ValueError):
11✔
24
    """Indicates a bad zip file path."""
25

26

27
@contextmanager
11✔
28
def environment_as(**kwargs: str | None) -> Iterator[None]:
11✔
29
    """Update the environment to the supplied values, for example:
30

31
    with environment_as(PYTHONPATH='foo:bar:baz',
32
                        PYTHON='/usr/bin/python2.7'):
33
      subprocess.Popen(foo).wait()
34
    """
35
    new_environment = kwargs
5✔
36
    old_environment = {}
5✔
37

38
    def setenv(key: str, val: str | None) -> None:
5✔
39
        if val is not None:
5✔
40
            os.environ[key] = val
5✔
41
        else:
42
            if key in os.environ:
5✔
43
                del os.environ[key]
5✔
44

45
    for key, val in new_environment.items():
5✔
46
        old_environment[key] = os.environ.get(key)
5✔
47
        setenv(key, val)
5✔
48
    try:
5✔
49
        yield
5✔
50
    finally:
51
        for key, val in old_environment.items():
5✔
52
            setenv(key, val)
5✔
53

54

55
def _purge_env() -> None:
11✔
56
    # N.B. Without the use of `del` here (which calls `os.unsetenv` under the hood), subprocess
57
    # invokes or other things that may access the environment at the C level may not see the
58
    # correct env vars (i.e. we can't just replace os.environ with an empty dict).
59
    # See https://docs.python.org/3/library/os.html#os.unsetenv for more info.
60
    #
61
    # Wraps iterable in list() to make a copy and avoid issues with deleting while iterating.
62
    for k in list(os.environ.keys()):
1✔
63
        del os.environ[k]
1✔
64

65

66
def _restore_env(env: Mapping[str, str]) -> None:
11✔
67
    for k, v in env.items():
1✔
68
        os.environ[k] = v
1✔
69

70

71
@contextmanager
11✔
72
def hermetic_environment_as(*preserve: str, **override: str | None) -> Iterator[None]:
11✔
73
    """Mutate the environment of this process, restoring it on exit.
74

75
    The given `preserve` environment variable names will have their current values preserved, while
76
    the given `override` environment variables will override any values which are already set.
77
    """
78
    old_environment = os.environ.copy()
1✔
79
    preserve_set = set(preserve)
1✔
80
    new_environment: dict[str, str | None] = {
1✔
81
        k: v for k, v in old_environment.items() if k in preserve_set
82
    }
83
    new_environment.update(override)
1✔
84

85
    _purge_env()
1✔
86
    try:
1✔
87
        with environment_as(**new_environment):
1✔
88
            yield
1✔
89
    finally:
90
        _purge_env()
1✔
91
        _restore_env(old_environment)
1✔
92

93

94
@contextmanager
11✔
95
def argv_as(args: tuple[str, ...]) -> Iterator[None]:
11✔
96
    """Temporarily set `sys.argv` to the supplied value."""
97
    old_args = sys.argv
×
98
    try:
×
99
        sys.argv = list(args)
×
100
        yield
×
101
    finally:
102
        sys.argv = old_args
×
103

104

105
@contextmanager
11✔
106
def temporary_dir(
11✔
107
    root_dir: str | None = None,
108
    cleanup: bool = True,
109
    suffix: str | None = None,
110
    permissions: int | None = None,
111
    prefix: str | None = tempfile.template,
112
) -> Iterator[str]:
113
    """A with-context that creates a temporary directory.
114

115
    :API: public
116

117
    You may specify the following keyword args:
118
    :param root_dir: The parent directory in which to create the temporary directory.
119
    :param cleanup: Whether or not to clean up the temporary directory.
120
    :param suffix: If not None the directory name will end with this suffix.
121
    :param permissions: If provided, sets the directory permissions to this mode.
122
    :param prefix: If not None, the directory name will begin with this prefix,
123
                   otherwise a default prefix is used.
124
    """
125
    path = tempfile.mkdtemp(dir=root_dir, suffix=suffix, prefix=prefix)
11✔
126

127
    try:
11✔
128
        if permissions is not None:
11✔
129
            os.chmod(path, permissions)
1✔
130
        yield path
11✔
131
    finally:
132
        if cleanup:
11✔
133
            shutil.rmtree(path, ignore_errors=True)
11✔
134

135

136
@contextmanager
11✔
137
def temporary_file_path(
11✔
138
    root_dir: str | None = None,
139
    cleanup: bool = True,
140
    suffix: str | None = None,
141
    permissions: int | None = None,
142
) -> Iterator[str]:
143
    """A with-context that creates a temporary file and returns its path.
144

145
    :API: public
146

147
    You may specify the following keyword args:
148
    :param root_dir: The parent directory to create the temporary file.
149
    :param cleanup: Whether or not to clean up the temporary file.
150
    """
151
    with temporary_file(root_dir, cleanup=cleanup, suffix=suffix, permissions=permissions) as fd:
2✔
152
        fd.close()
2✔
153
        yield fd.name
2✔
154

155

156
@contextmanager
11✔
157
def temporary_file(
11✔
158
    root_dir: str | None = None,
159
    cleanup: bool = True,
160
    suffix: str | None = None,
161
    permissions: int | None = None,
162
    binary_mode: bool = True,
163
) -> Iterator[IO]:
164
    """A with-context that creates a temporary file and returns a writeable file descriptor to it.
165

166
    You may specify the following keyword args:
167
    :param root_dir: The parent directory to create the temporary file.
168
    :param cleanup: Whether or not to clean up the temporary file.
169
    :param suffix: If suffix is specified, the file name will end with that suffix.
170
                       Otherwise there will be no suffix.
171
                       mkstemp() does not put a dot between the file name and the suffix;
172
                       if you need one, put it at the beginning of suffix.
173
                       See :py:class:`tempfile.NamedTemporaryFile`.
174
    :param permissions: If provided, sets the file to use these permissions.
175
    :param binary_mode: Whether file opens in binary or text mode.
176
    """
177
    mode = "w+b" if binary_mode else "w+"  # tempfile's default is 'w+b'
11✔
178
    with tempfile.NamedTemporaryFile(suffix=suffix, dir=root_dir, delete=False, mode=mode) as fd:
11✔
179
        try:
11✔
180
            if permissions is not None:
11✔
181
                os.chmod(fd.name, permissions)
1✔
182
            yield fd
11✔
183
        finally:
184
            if cleanup:
11✔
185
                safe_delete(fd.name)
11✔
186

187

188
@contextmanager
11✔
189
def overwrite_file_content(
11✔
190
    file_path: str | Path,
191
    temporary_content: bytes | str | Callable[[bytes], bytes] | None = None,
192
) -> Iterator[None]:
193
    """A helper that resets a file after the method runs.
194

195
     It will read a file, save the content, maybe write temporary_content to it, yield, then
196
     write the original content to the file.
197

198
    :param file_path: Absolute path to the file to be reset after the method runs.
199
    :param temporary_content: Content to write to the file, or a function from current content
200
      to new temporary content.
201
    """
202
    file_path = Path(file_path)
3✔
203
    original_content = file_path.read_bytes()
3✔
204
    try:
3✔
205
        if temporary_content is not None:
3✔
206
            if callable(temporary_content):
3✔
207
                content = temporary_content(original_content)
2✔
208
            elif isinstance(temporary_content, bytes):
1✔
209
                content = temporary_content
×
210
            else:
211
                content = temporary_content.encode()
1✔
212
            file_path.write_bytes(content)
3✔
213
        yield
3✔
214
    finally:
215
        file_path.write_bytes(original_content)
3✔
216

217

218
@contextmanager
11✔
219
def pushd(directory: str) -> Iterator[str]:
11✔
220
    """A with-context that encapsulates pushd/popd."""
221
    cwd = os.getcwd()
11✔
222
    os.chdir(directory)
11✔
223
    try:
11✔
224
        yield directory
11✔
225
    finally:
226
        os.chdir(cwd)
11✔
227

228

229
@contextmanager
11✔
230
def open_zip(path_or_file: str | Any, *args, **kwargs) -> Iterator[zipfile.ZipFile]:
11✔
231
    """A with-context for zip files.
232

233
    Passes through *args and **kwargs to zipfile.ZipFile.
234

235
    :API: public
236

237
    :param path_or_file: Full path to zip file.
238
    :param args: Any extra args accepted by `zipfile.ZipFile`.
239
    :param kwargs: Any extra keyword args accepted by `zipfile.ZipFile`.
240
    :raises: `InvalidZipPath` if path_or_file is invalid.
241
    :raises: `zipfile.BadZipfile` if zipfile.ZipFile cannot open a zip at path_or_file.
242
    """
243
    if not path_or_file:
1✔
244
        raise InvalidZipPath(f"Invalid zip location: {path_or_file}")
1✔
245
    if "allowZip64" not in kwargs:
1✔
246
        kwargs["allowZip64"] = True
1✔
247
    try:
1✔
248
        zf = zipfile.ZipFile(path_or_file, *args, **kwargs)
1✔
249
    except zipfile.BadZipfile as bze:
1✔
250
        # Use the realpath in order to follow symlinks back to the problem source file.
251
        raise zipfile.BadZipfile(f"Bad Zipfile {os.path.realpath(path_or_file)}: {bze}")
1✔
252
    try:
1✔
253
        yield zf
1✔
254
    finally:
255
        zf.close()
1✔
256

257

258
@contextmanager
11✔
259
def http_server(handler_class: type, ssl_context: ssl.SSLContext | None = None) -> Iterator[int]:
11✔
260
    def serve(port_queue: Queue[int], shutdown_queue: Queue[bool]) -> None:
1✔
261
        httpd = TCPServer(("", 0), handler_class)
1✔
262
        httpd.timeout = 0.1
1✔
263
        if ssl_context:
1✔
UNCOV
264
            httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)
×
265

266
        port_queue.put(httpd.server_address[1])
1✔
267
        while shutdown_queue.empty():
1✔
268
            httpd.handle_request()
1✔
269

270
    port_queue: Queue[int] = Queue()
1✔
271
    shutdown_queue: Queue[bool] = Queue()
1✔
272
    t = threading.Thread(target=lambda: serve(port_queue, shutdown_queue))
1✔
273
    t.daemon = True
1✔
274
    t.start()
1✔
275

276
    try:
1✔
277
        yield port_queue.get(block=True)
1✔
278
    finally:
279
        shutdown_queue.put(True)
1✔
280
        t.join()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc