• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.01
/src/python/pants/engine/process.py
1
# Copyright 2016 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
1✔
5

6
import dataclasses
1✔
7
import logging
1✔
8
from collections.abc import Iterable, Mapping
1✔
9
from dataclasses import dataclass, field
1✔
10
from enum import Enum
1✔
11
from typing import Literal
1✔
12

13
from pants.engine.engine_aware import SideEffecting
1✔
14
from pants.engine.fs import EMPTY_DIGEST, Digest, FileDigest
1✔
15
from pants.engine.internals.native_engine import (  # noqa: F401
1✔
16
    ProcessExecutionEnvironment as ProcessExecutionEnvironment,
17
)
18
from pants.engine.internals.session import RunId
1✔
19
from pants.engine.platform import Platform
1✔
20
from pants.engine.rules import collect_rules, rule
1✔
21
from pants.option.global_options import KeepSandboxes
1✔
22
from pants.util.frozendict import FrozenDict
1✔
23
from pants.util.logging import LogLevel
1✔
24

25
logger = logging.getLogger(__name__)
1✔
26

27

28
@dataclass(frozen=True)
class ProductDescription:
    """A human-readable description of a process, as a standalone product.

    Used by `get_multi_platform_request_description` / `fallible_to_exec_result_or_raise`
    to report which process failed.
    """

    value: str
31

32

33
class ProcessCacheScope(Enum):
    """Specifies where (and under which exit conditions) the result of a `Process` may be
    cached and re-used."""

    # Cached in all locations, regardless of success or failure.
    ALWAYS = "always"
    # Cached in all locations, but only if the process exits successfully.
    SUCCESSFUL = "successful"
    # Cached only locally, regardless of success or failure.
    LOCAL_ALWAYS = "local_always"
    # Cached only locally, and only if the process exits successfully.
    LOCAL_SUCCESSFUL = "local_successful"
    # Cached only in memory (i.e. memoized in pantsd), but never persistently, regardless of
    # success vs. failure.
    PER_RESTART_ALWAYS = "per_restart_always"
    # Cached only in memory (i.e. memoized in pantsd), but never persistently, and only if
    # successful.
    PER_RESTART_SUCCESSFUL = "per_restart_successful"
    # Will run once per Session, i.e. once per run of Pants. This happens because the engine
    # de-duplicates identical work; the process is neither memoized in memory nor cached to disk.
    PER_SESSION = "per_session"
51

52

53
@dataclass(frozen=True)
class ProcessConcurrency:
    """Describes the cores a process requires: an exact count, a flexible range, or
    exclusive access to the machine.

    Prefer the `exactly`, `range`, and `exclusive` factory methods over direct construction.
    """

    kind: Literal["exactly", "range", "exclusive"]
    min: int | None = None
    max: int | None = None

    def __post_init__(self):
        # Each bound, when present, must be a positive core count.
        for bound_name in ("min", "max"):
            bound = getattr(self, bound_name)
            if bound is not None and bound < 1:
                raise ValueError(f"{bound_name} concurrency must be >= 1, got {bound}")
        # When both bounds are present, they must form a valid interval.
        if self.min is not None and self.max is not None and self.min > self.max:
            raise ValueError(
                f"min concurrency must be <= max concurrency, got {self.min} and {self.max}"
            )
        # An "exactly" request is represented as a degenerate range with min == max.
        if self.kind == "exactly" and self.min != self.max:
            raise ValueError(
                f"exactly concurrency must have min and max equal, got {self.min} and {self.max}"
            )

    @staticmethod
    def range(max: int, min: int = 1):
        """The amount of parallelism that this process is capable of given its inputs. This value
        does not directly set the number of cores allocated to the process: that is computed based
        on availability, and provided as a template value in the arguments of the process.

        When set, a `{pants_concurrency}` variable will be templated into the `argv` of the process.

        Processes which set this value may be preempted (i.e. canceled and restarted) for a short
        period after starting if available resources have changed (because other processes have
        started or finished).
        """
        return ProcessConcurrency("range", min, max)

    @staticmethod
    def exactly(count: int):
        """A specific number of cores required to run the process.

        The process will wait until the specified number of cores are available.
        """
        return ProcessConcurrency("exactly", count, count)

    @staticmethod
    def exclusive():
        """Exclusive access to all cores.

        No other processes will be scheduled to run while this process is running.
        """
        return ProcessConcurrency("exclusive")
102

103

104
@dataclass(frozen=True)
class Process:
    argv: tuple[str, ...]
    # Excluded from equality: two processes with the same inputs are the same work,
    # regardless of how they are described.
    description: str = dataclasses.field(compare=False)
    level: LogLevel
    input_digest: Digest
    immutable_input_digests: FrozenDict[str, Digest]
    use_nailgun: tuple[str, ...]
    working_directory: str | None
    env: FrozenDict[str, str]
    append_only_caches: FrozenDict[str, str]
    output_files: tuple[str, ...]
    output_directories: tuple[str, ...]
    timeout_seconds: int | float
    jdk_home: str | None
    execution_slot_variable: str | None
    concurrency_available: int
    concurrency: ProcessConcurrency | None
    cache_scope: ProcessCacheScope
    remote_cache_speculation_delay_millis: int
    attempt: int

    def __init__(
        self,
        argv: Iterable[str],
        *,
        description: str,
        level: LogLevel = LogLevel.INFO,
        input_digest: Digest = EMPTY_DIGEST,
        immutable_input_digests: Mapping[str, Digest] | None = None,
        use_nailgun: Iterable[str] = (),
        working_directory: str | None = None,
        env: Mapping[str, str] | None = None,
        append_only_caches: Mapping[str, str] | None = None,
        output_files: Iterable[str] | None = None,
        output_directories: Iterable[str] | None = None,
        timeout_seconds: int | float | None = None,
        jdk_home: str | None = None,
        execution_slot_variable: str | None = None,
        concurrency_available: int = 0,
        concurrency: ProcessConcurrency | None = None,
        cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
        remote_cache_speculation_delay_millis: int = 0,
        attempt: int = 0,
    ) -> None:
        """Request to run a subprocess, similar to subprocess.Popen.

        This process will be hermetic, meaning that it cannot access files and environment variables
        that are not explicitly populated. For example, $PATH will not be defined by default, unless
        populated through the `env` parameter.

        Usually, you will want to provide input files/directories via the parameter `input_digest`.
        The process will then be able to access these paths through relative paths. If you want to
        give multiple input digests, first merge them with `merge_digests()`. Files larger than
        512KB will be read-only unless they are globbed as part of either `output_files` or
        `output_directories`.

        Often, you will want to capture the files/directories created in the process. To do this,
        you can either set `output_files` or `output_directories`. The specified paths should be
        specified relative to the `working_directory`, if any, and will then be used to populate
        `output_digest` on the `ProcessResult`. If you want to split up this output digest into
        multiple digests, use `digest_subset_to_digest()` on the `output_digest`.

        To actually run the process, use `await execute_process(Process(...), **implicitly())`
        or `await execute_process_or_raise(**implicitly(Process(...)))`.

        Example:

            result = await execute_process_or_raise(
                **implicitly(Process(["/bin/echo", "hello world"], description="demo")
            )
            assert result.stdout == b"hello world"

        :raises ValueError: if `argv` is a single string, or if both `concurrency_available`
            and `concurrency` are set.
        """
        if isinstance(argv, str):
            raise ValueError("argv must be a sequence of strings, but was a single string.")

        # This is a frozen dataclass, so all assignment must bypass __setattr__.
        object.__setattr__(self, "argv", tuple(argv))
        object.__setattr__(self, "description", description)
        object.__setattr__(self, "level", level)
        object.__setattr__(self, "input_digest", input_digest)
        object.__setattr__(
            self, "immutable_input_digests", FrozenDict(immutable_input_digests or {})
        )
        object.__setattr__(self, "use_nailgun", tuple(use_nailgun))
        object.__setattr__(self, "working_directory", working_directory)
        object.__setattr__(self, "env", FrozenDict(env or {}))
        object.__setattr__(self, "append_only_caches", FrozenDict(append_only_caches or {}))
        object.__setattr__(self, "output_files", tuple(output_files or ()))
        object.__setattr__(self, "output_directories", tuple(output_directories or ()))
        # NB: A negative or None time value is normalized to -1 to ease the transfer to Rust.
        object.__setattr__(
            self,
            "timeout_seconds",
            timeout_seconds if timeout_seconds and timeout_seconds > 0 else -1,
        )
        object.__setattr__(self, "jdk_home", jdk_home)
        object.__setattr__(self, "execution_slot_variable", execution_slot_variable)
        object.__setattr__(self, "concurrency_available", concurrency_available)
        object.__setattr__(self, "concurrency", concurrency)
        object.__setattr__(self, "cache_scope", cache_scope)
        object.__setattr__(
            self, "remote_cache_speculation_delay_millis", remote_cache_speculation_delay_millis
        )
        object.__setattr__(self, "attempt", attempt)
        # NB: Because this class defines its own `__init__`, `@dataclass` does not generate
        # one, and therefore `__post_init__` is never invoked automatically. Call it
        # explicitly so the validation below actually runs.
        self.__post_init__()

    def __post_init__(self) -> None:
        # The two concurrency mechanisms are mutually exclusive.
        if self.concurrency_available and self.concurrency:
            raise ValueError(
                "Cannot specify both concurrency_available and concurrency. "
                "Only one concurrency setting may be used at a time."
            )
215

216

217
@dataclass(frozen=True)
class ProcessWithRetries:
    """A `Process` to execute, together with the number of attempts to make."""

    # The process to run (and possibly re-run).
    proc: Process
    # Total number of attempts.
    attempts: int
221

222

223
@dataclass(frozen=True)
class ProcessResult:
    """Result of executing a process which should not fail.

    If the process has a non-zero exit code, this will raise an exception, unlike
    FallibleProcessResult.
    """

    # Captured stdout/stderr bytes and their corresponding digests.
    stdout: bytes
    stdout_digest: FileDigest
    stderr: bytes
    stderr_digest: FileDigest
    # Digest of the files/directories captured from the sandbox via the `output_files` /
    # `output_directories` globs on the `Process`.
    output_digest: Digest
    # Excluded from equality/hashing: identical output is considered equal regardless of
    # where or when the process ran.
    metadata: ProcessResultMetadata = field(compare=False, hash=False)

    @property
    def platform(self) -> Platform:
        # Platform of the environment the process executed in.
        return self.metadata.platform
241

242

243
@dataclass(frozen=True)
class FallibleProcessResult:
    """Result of executing a process which might fail.

    If the process has a non-zero exit code, this will not raise an exception, unlike ProcessResult.
    """

    # Captured stdout/stderr bytes and their corresponding digests.
    stdout: bytes
    stdout_digest: FileDigest
    stderr: bytes
    stderr_digest: FileDigest
    # The process's exit code; 0 conventionally indicates success.
    exit_code: int
    # Digest of the files/directories captured from the sandbox.
    output_digest: Digest
    # Excluded from equality/hashing: identical output is considered equal regardless of
    # where or when the process ran.
    metadata: ProcessResultMetadata = field(compare=False, hash=False)

    @property
    def platform(self) -> Platform:
        # Platform of the environment the process executed in.
        return self.metadata.platform
261

262

263
@dataclass(frozen=True)
class ProcessResultWithRetries:
    """The result of every attempt of a retried process, in execution order."""

    results: tuple[FallibleProcessResult, ...]

    @property
    def last(self):
        """The result of the final attempt."""
        *_, final = self.results
        return final
270

271

272
@dataclass(frozen=True)
class ProcessResultMetadata:
    """Metadata for a ProcessResult, which is not included in its definition of equality."""

    class Source(Enum):
        """This is public API as these values are part of the test result report file."""

        RAN = "ran"
        HIT_LOCALLY = "hit_locally"
        HIT_REMOTELY = "hit_remotely"
        MEMOIZED = "memoized"

    # The execution time of the process, in milliseconds, or None if it could not be captured
    # (since remote execution does not guarantee its availability).
    total_elapsed_ms: int | None
    # The environment that the process ran in (or would have run in, if it was not a cache hit).
    execution_environment: ProcessExecutionEnvironment
    # Whether the ProcessResult (when it was created in the attached run_id) came from the local
    # or remote cache, or ran locally or remotely. See the `self.source` method.
    _source: str
    # The run_id in which a ProcessResult was created. See the `self.source` method.
    source_run_id: int

    @property
    def platform(self) -> Platform:
        return Platform[self.execution_environment.platform]

    def source(self, current_run_id: RunId) -> Source:
        """Given the current run_id, return the calculated "source" of the ProcessResult.

        If a ProcessResult is consumed in any run_id other than the one it was created in, the
        source implicitly becomes memoization, since the result was re-used in a new run without
        being recreated.
        """
        if self.source_run_id != current_run_id:
            # Created in a different run: re-use in this run is, by definition, memoization.
            return self.Source.MEMOIZED
        return self.Source(self._source)
311

312

313
class ProcessExecutionFailure(Exception):
    """Used to denote that a process exited, but was unsuccessful in some way.

    For example, exiting with a non-zero code.
    """

    def __init__(
        self,
        exit_code: int,
        stdout: bytes,
        stderr: bytes,
        process_description: str,
        *,
        keep_sandboxes: KeepSandboxes,
    ) -> None:
        # These are intentionally "public" members.
        self.exit_code = exit_code
        self.stdout = stdout
        self.stderr = stderr

        def try_decode(content: bytes) -> str:
            # Decode if possible; otherwise fall back to a truncated repr of the raw bytes so
            # that building the error message itself can never fail.
            try:
                return content.decode()
            except ValueError:
                # Fix: previously repr'd `stdout` even when `content` was `stderr`, so a
                # non-decodable stderr was reported using stdout's bytes.
                content_repr = repr(content)
                return f"{content_repr[:256]}..." if len(content_repr) > 256 else content_repr

        # NB: We don't use dedent on a single format string here because it would attempt to
        # interpret the stdio content.
        err_strings = [
            f"Process '{process_description}' failed with exit code {exit_code}.",
            "stdout:",
            try_decode(stdout),
            "stderr:",
            try_decode(stderr),
        ]
        if keep_sandboxes == KeepSandboxes.never:
            err_strings.append(
                "\n\nUse `--keep-sandboxes=on_failure` to preserve the process chroot for inspection."
            )
        super().__init__("\n".join(err_strings))

    @classmethod
    def from_result(
        cls, result: FallibleProcessResult, description: str, keep_sandboxes: KeepSandboxes
    ) -> ProcessExecutionFailure:
        """Build the exception directly from a `FallibleProcessResult`."""
        return cls(
            result.exit_code,
            result.stdout,
            result.stderr,
            description,
            keep_sandboxes=keep_sandboxes,
        )
366

367

368
@rule
async def get_multi_platform_request_description(req: Process) -> ProductDescription:
    """Extract a `Process`'s human-readable description as a `ProductDescription`."""
    return ProductDescription(req.description)
371

372

373
@rule
async def fallible_to_exec_result_or_raise(
    fallible_result: FallibleProcessResult,
    description: ProductDescription,
    keep_sandboxes: KeepSandboxes,
) -> ProcessResult:
    """Converts a FallibleProcessResult to a ProcessResult or raises an error."""

    # Guard clause: a non-zero exit code becomes an exception immediately.
    if fallible_result.exit_code != 0:
        raise ProcessExecutionFailure(
            fallible_result.exit_code,
            fallible_result.stdout,
            fallible_result.stderr,
            description.value,
            keep_sandboxes=keep_sandboxes,
        )
    # Success: re-wrap as the infallible result type (dropping the exit code).
    return ProcessResult(
        stdout=fallible_result.stdout,
        stdout_digest=fallible_result.stdout_digest,
        stderr=fallible_result.stderr,
        stderr_digest=fallible_result.stderr_digest,
        output_digest=fallible_result.output_digest,
        metadata=fallible_result.metadata,
    )
397

398

399
# fallible_to_exec_result_or_raise directly converts a FallibleProcessResult
# to a ProcessResult, or raises an exception if the process failed.
# Its name makes sense when you already have a FallibleProcessResult in hand.
#
# But, it is common to want to execute a process and automatically raise an exception
# on process error. The execute_process_or_raise() alias below facilitates this idiom:
#
# result = await execute_process_or_raise(
#         **implicitly(
#             Process(...) # Or something that some other rule can convert to a Process.
#         )
#     )
# Where the execute_process() intrinsic is invoked implicitly to create a FallibleProcessResult.
# This is simply a better name for the same rule, when invoked in this use case.
execute_process_or_raise = fallible_to_exec_result_or_raise
414

415

416
@dataclass(frozen=True)
class InteractiveProcessResult:
    """The outcome of running an `InteractiveProcess`: only the exit code is captured."""

    exit_code: int
419

420

421
@dataclass(frozen=True)
class InteractiveProcess(SideEffecting):
    # NB: Although InteractiveProcess supports only some of the features of Process, we construct an
    # underlying Process instance to improve code reuse.
    process: Process
    run_in_workspace: bool
    forward_signals_to_process: bool
    restartable: bool
    keep_sandboxes: KeepSandboxes

    def __init__(
        self,
        argv: Iterable[str],
        *,
        env: Mapping[str, str] | None = None,
        description: str = "Interactive process",
        input_digest: Digest = EMPTY_DIGEST,
        run_in_workspace: bool = False,
        forward_signals_to_process: bool = True,
        restartable: bool = False,
        append_only_caches: Mapping[str, str] | None = None,
        immutable_input_digests: Mapping[str, Digest] | None = None,
        keep_sandboxes: KeepSandboxes = KeepSandboxes.never,
    ) -> None:
        """Request to run a subprocess in the foreground, similar to subprocess.run().

        Unlike `Process`, the result will not be cached.

        To run the process, use `await run_interactive_process(InteractiveProcess(..))`
        in a `@goal_rule`.

        `forward_signals_to_process` controls whether pants will allow a SIGINT signal
        sent to a process by hitting Ctrl-C in the terminal to actually reach the process,
        or capture that signal itself, blocking it from the process.
        """
        inner_process = Process(
            argv,
            description=description,
            env=env,
            input_digest=input_digest,
            append_only_caches=append_only_caches,
            immutable_input_digests=immutable_input_digests,
        )
        # Frozen dataclass: assignment must bypass the immutability check.
        for attr_name, attr_value in (
            ("process", inner_process),
            ("run_in_workspace", run_in_workspace),
            ("forward_signals_to_process", forward_signals_to_process),
            ("restartable", restartable),
            ("keep_sandboxes", keep_sandboxes),
        ):
            object.__setattr__(self, attr_name, attr_value)

    @classmethod
    def from_process(
        cls,
        process: Process,
        *,
        forward_signals_to_process: bool = True,
        restartable: bool = False,
        keep_sandboxes: KeepSandboxes = KeepSandboxes.never,
    ) -> InteractiveProcess:
        """Build an InteractiveProcess from an existing `Process`, carrying over the subset
        of its settings that interactive execution supports."""
        kwargs = dict(
            argv=process.argv,
            env=process.env,
            description=process.description,
            input_digest=process.input_digest,
            forward_signals_to_process=forward_signals_to_process,
            restartable=restartable,
            append_only_caches=process.append_only_caches,
            immutable_input_digests=process.immutable_input_digests,
            keep_sandboxes=keep_sandboxes,
        )
        return InteractiveProcess(**kwargs)
493

494

495
def rules():
    """Collect the `@rule`s declared in this module, for registration with the engine."""
    return collect_rules()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc