• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18791134616

24 Oct 2025 08:18PM UTC coverage: 75.519% (-4.8%) from 80.282%
18791134616

Pull #22794

github

web-flow
Merge 098c595a0 into 7971a20bf
Pull Request #22794: Use self-hosted MacOS Intel runner

65803 of 87134 relevant lines covered (75.52%)

3.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.63
/src/python/pants/util/contextutil.py
1
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
10✔
5

6
import os
10✔
7
import shutil
10✔
8
import ssl
10✔
9
import sys
10✔
10
import tempfile
10✔
11
import threading
10✔
12
import zipfile
10✔
13
from collections.abc import Callable, Iterator, Mapping
10✔
14
from contextlib import contextmanager
10✔
15
from pathlib import Path
10✔
16
from queue import Queue
10✔
17
from socketserver import TCPServer
10✔
18
from typing import IO, Any
10✔
19

20
from pants.util.dirutil import safe_delete
10✔
21

22

23
class InvalidZipPath(ValueError):
    """Signals that the location given for a zip file is not usable."""
25

26

27
@contextmanager
def environment_as(**kwargs: str | None) -> Iterator[None]:
    """Temporarily apply the supplied values to `os.environ`, restoring the old values on exit.

    A value of `None` removes that variable for the duration of the context. For example:

    with environment_as(PYTHONPATH='foo:bar:baz',
                        PYTHON='/usr/bin/python2.7'):
      subprocess.Popen(foo).wait()

    If applying one of the values raises (for example because a variable name is rejected by the
    platform), every variable that was already changed is put back before the error propagates.

    :param kwargs: Maps each environment variable name to its temporary value, or to `None` to
                   unset it.
    """
    old_environment: dict[str, str | None] = {}

    def setenv(key: str, val: str | None) -> None:
        # A `None` value means "this variable should not be set".
        if val is not None:
            os.environ[key] = val
        else:
            os.environ.pop(key, None)

    try:
        # Apply the changes inside the `try` so that a failure part-way through still triggers
        # the restore below for the variables that were already modified.
        for key, val in kwargs.items():
            old_environment[key] = os.environ.get(key)
            setenv(key, val)
        yield
    finally:
        for key, val in old_environment.items():
            setenv(key, val)
4✔
53

54

55
def _purge_env() -> None:
10✔
56
    # N.B. Without the use of `del` here (which calls `os.unsetenv` under the hood), subprocess
57
    # invokes or other things that may access the environment at the C level may not see the
58
    # correct env vars (i.e. we can't just replace os.environ with an empty dict).
59
    # See https://docs.python.org/3/library/os.html#os.unsetenv for more info.
60
    #
61
    # Wraps iterable in list() to make a copy and avoid issues with deleting while iterating.
62
    for k in list(os.environ.keys()):
×
63
        del os.environ[k]
×
64

65

66
def _restore_env(env: Mapping[str, str]) -> None:
10✔
67
    for k, v in env.items():
×
68
        os.environ[k] = v
×
69

70

71
@contextmanager
def hermetic_environment_as(*preserve: str, **override: str | None) -> Iterator[None]:
    """Run the context with a stripped-down process environment, then put the original back.

    Only the variables named in `preserve` keep their current values; everything else is removed.
    The `override` entries are then applied on top, taking precedence over preserved values.
    """
    snapshot = os.environ.copy()
    keep = set(preserve)
    hermetic_env: dict[str, str | None] = {}
    for name, value in snapshot.items():
        if name in keep:
            hermetic_env[name] = value
    hermetic_env.update(override)

    _purge_env()
    try:
        with environment_as(**hermetic_env):
            yield
    finally:
        # Wipe whatever the body left behind before reinstating the snapshot.
        _purge_env()
        _restore_env(snapshot)
×
92

93

94
@contextmanager
def argv_as(args: tuple[str, ...]) -> Iterator[None]:
    """Replace `sys.argv` with a list built from `args` while the context is active."""
    saved_argv = sys.argv
    try:
        sys.argv = [*args]
        yield
    finally:
        # Reinstate the very same list object that was in place before.
        sys.argv = saved_argv
×
103

104

105
@contextmanager
def temporary_dir(
    root_dir: str | None = None,
    cleanup: bool = True,
    suffix: str | None = None,
    permissions: int | None = None,
    prefix: str | None = tempfile.template,
) -> Iterator[str]:
    """Create a temporary directory and yield its path for the duration of the context.

    :API: public

    All arguments are optional:
    :param root_dir: Parent directory under which the temporary directory is created.
    :param cleanup: When true, the directory tree is removed on exit (removal errors are ignored).
    :param suffix: If not None, the directory name ends with this suffix.
    :param permissions: If given, the directory is chmod-ed to this mode before being yielded.
    :param prefix: If not None, the directory name begins with this prefix; otherwise a default
                   prefix is used.
    """
    tmp_path = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=root_dir)

    try:
        if permissions is not None:
            os.chmod(tmp_path, permissions)
        yield tmp_path
    finally:
        if cleanup:
            shutil.rmtree(tmp_path, ignore_errors=True)
10✔
134

135

136
@contextmanager
def temporary_file_path(
    root_dir: str | None = None,
    cleanup: bool = True,
    suffix: str | None = None,
    permissions: int | None = None,
) -> Iterator[str]:
    """Create a temporary file, close it, and yield its path for the duration of the context.

    :API: public

    All arguments are optional and are forwarded to `temporary_file`:
    :param root_dir: The parent directory to create the temporary file.
    :param cleanup: Whether or not to clean up the temporary file.
    :param suffix: Suffix for the temporary file's name.
    :param permissions: Permissions to apply to the temporary file.
    """
    tmp_file_context = temporary_file(
        root_dir,
        cleanup=cleanup,
        suffix=suffix,
        permissions=permissions,
    )
    with tmp_file_context as handle:
        # Callers only get the path, so the open handle is released before yielding.
        handle.close()
        yield handle.name
2✔
154

155

156
@contextmanager
def temporary_file(
    root_dir: str | None = None,
    cleanup: bool = True,
    suffix: str | None = None,
    permissions: int | None = None,
    binary_mode: bool = True,
) -> Iterator[IO]:
    """Create a named temporary file and yield the open, writeable file object.

    All arguments are optional:
    :param root_dir: The parent directory to create the temporary file.
    :param cleanup: Whether or not to delete the temporary file on exit.
    :param suffix: If given, the file name ends with this suffix; otherwise there is no suffix.
                   No dot is inserted between the file name and the suffix, so include one at
                   the start of the suffix if you need it.
                   See :py:class:`tempfile.NamedTemporaryFile`.
    :param permissions: If provided, the file is chmod-ed to this mode before being yielded.
    :param binary_mode: Whether the file is opened in binary ("w+b") or text ("w+") mode.
    """
    # tempfile's own default mode is 'w+b'.
    if binary_mode:
        open_mode = "w+b"
    else:
        open_mode = "w+"

    # delete=False: removal is governed by `cleanup` below rather than by closing the file.
    with tempfile.NamedTemporaryFile(
        mode=open_mode, suffix=suffix, dir=root_dir, delete=False
    ) as handle:
        try:
            if permissions is not None:
                os.chmod(handle.name, permissions)
            yield handle
        finally:
            if cleanup:
                safe_delete(handle.name)
10✔
186

187

188
@contextmanager
def overwrite_file_content(
    file_path: str | Path,
    temporary_content: bytes | str | Callable[[bytes], bytes] | None = None,
) -> Iterator[None]:
    """Restore a file to its current content once the context exits.

    The file's bytes are read up front. If `temporary_content` is given it is written to the file
    before yielding, and on exit the bytes read at the start are always written back.

    :param file_path: Absolute path to the file to be reset after the context exits.
    :param temporary_content: Content to write to the file (a `str` is encoded first), or a
      function from the current content to the new temporary content.
    """
    target = Path(file_path)
    saved_bytes = target.read_bytes()
    try:
        if callable(temporary_content):
            target.write_bytes(temporary_content(saved_bytes))
        elif isinstance(temporary_content, bytes):
            target.write_bytes(temporary_content)
        elif temporary_content is not None:
            target.write_bytes(temporary_content.encode())
        yield
    finally:
        target.write_bytes(saved_bytes)
2✔
216

217

218
@contextmanager
def pushd(directory: str) -> Iterator[str]:
    """Change into `directory` for the duration of the context, then change back (pushd/popd)."""
    previous_dir = os.getcwd()
    os.chdir(directory)
    try:
        yield directory
    finally:
        os.chdir(previous_dir)
10✔
227

228

229
@contextmanager
def open_zip(path_or_file: str | Any, *args, **kwargs) -> Iterator[zipfile.ZipFile]:
    """A with-context for zip files.

    Passes through *args and **kwargs to zipfile.ZipFile, defaulting `allowZip64` to True when
    the caller does not supply it. The zip file is closed when the context exits.

    :API: public

    :param path_or_file: Full path to zip file, or a file-like object to hand to
                         `zipfile.ZipFile`.
    :param args: Any extra args accepted by `zipfile.ZipFile`.
    :param kwargs: Any extra keyword args accepted by `zipfile.ZipFile`.
    :raises: `InvalidZipPath` if path_or_file is falsy.
    :raises: `zipfile.BadZipFile` if zipfile.ZipFile cannot open a zip at path_or_file.
    """
    if not path_or_file:
        raise InvalidZipPath(f"Invalid zip location: {path_or_file}")
    kwargs.setdefault("allowZip64", True)
    try:
        zf = zipfile.ZipFile(path_or_file, *args, **kwargs)
    except zipfile.BadZipFile as bze:
        if isinstance(path_or_file, (str, bytes, os.PathLike)):
            # Use the realpath in order to follow symlinks back to the problem source file.
            location: Any = os.path.realpath(path_or_file)
        else:
            # A file-like object has no path to resolve; calling `os.path.realpath` on it would
            # raise a TypeError that hides the real zip error, so report the object itself.
            location = path_or_file
        raise zipfile.BadZipFile(f"Bad Zipfile {location}: {bze}") from bze
    try:
        yield zf
    finally:
        zf.close()
×
256

257

258
@contextmanager
def http_server(handler_class: type, ssl_context: ssl.SSLContext | None = None) -> Iterator[int]:
    """Serve requests on an OS-assigned port from a background thread while the context is active.

    :param handler_class: Request handler class passed to `socketserver.TCPServer`.
    :param ssl_context: If given, the listening socket is wrapped with this context as a
                        server-side TLS socket.
    :return: Yields the port number the server is listening on.
    """
    # The server is built in the calling thread: if binding or TLS wrapping fails, the error is
    # raised to the caller instead of being lost in the worker thread, which would leave the
    # caller waiting forever for a port number.
    httpd = TCPServer(("", 0), handler_class)
    try:
        # A short timeout makes `handle_request` return regularly so the stop flag gets polled.
        httpd.timeout = 0.1
        if ssl_context:
            httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)

        stop_requested = threading.Event()

        def serve() -> None:
            while not stop_requested.is_set():
                httpd.handle_request()

        server_thread = threading.Thread(target=serve)
        server_thread.daemon = True
        server_thread.start()

        try:
            yield httpd.server_address[1]
        finally:
            stop_requested.set()
            server_thread.join()
    finally:
        # Release the listening socket explicitly rather than relying on garbage collection.
        httpd.server_close()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc