• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 20974506033

13 Jan 2026 10:14PM UTC coverage: 43.251% (-37.0%) from 80.269%
20974506033

Pull #22976

github

web-flow
Merge a16a40040 into c12556724
Pull Request #22976: WIP: Add the ability to set stdin for a Process

2 of 4 new or added lines in 2 files covered. (50.0%)

17213 existing lines in 540 files now uncovered.

26146 of 60452 relevant lines covered (43.25%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.61
/src/python/pants/engine/process.py
1
# Copyright 2016 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
2✔
5

6
import dataclasses
2✔
7
import logging
2✔
8
from collections.abc import Iterable, Mapping
2✔
9
from dataclasses import dataclass, field
2✔
10
from enum import Enum
2✔
11
from typing import Literal
2✔
12

13
from pants.engine.engine_aware import SideEffecting
2✔
14
from pants.engine.fs import EMPTY_DIGEST, Digest, FileDigest
2✔
15
from pants.engine.internals.native_engine import (  # noqa: F401
2✔
16
    ProcessExecutionEnvironment as ProcessExecutionEnvironment,
17
)
18
from pants.engine.internals.session import RunId
2✔
19
from pants.engine.platform import Platform
2✔
20
from pants.engine.rules import collect_rules, rule
2✔
21
from pants.option.global_options import KeepSandboxes
2✔
22
from pants.util.frozendict import FrozenDict
2✔
23
from pants.util.logging import LogLevel
2✔
24

25
logger = logging.getLogger(__name__)
2✔
26

27

28
@dataclass(frozen=True)
2✔
29
class ProductDescription:
2✔
30
    value: str
2✔
31

32

33
class ProcessCacheScope(Enum):
2✔
34
    # Cached in all locations, regardless of success or failure.
35
    ALWAYS = "always"
2✔
36
    # Cached in all locations, but only if the process exits successfully.
37
    SUCCESSFUL = "successful"
2✔
38
    # Cached only locally, regardless of success or failure.
39
    LOCAL_ALWAYS = "local_always"
2✔
40
    # Cached only locally, but only if the process exits successfully.
41
    LOCAL_SUCCESSFUL = "local_successful"
2✔
42
    # Cached only in memory (i.e. memoized in pantsd), but never persistently, regardless of
43
    # success vs. failure.
44
    PER_RESTART_ALWAYS = "per_restart_always"
2✔
45
    # Cached only in memory (i.e. memoized in pantsd), but never persistently, and only if
46
    # successful.
47
    PER_RESTART_SUCCESSFUL = "per_restart_successful"
2✔
48
    # Will run once per Session, i.e. once per run of Pants. This happens because the engine
49
    # de-duplicates identical work; the process is neither memoized in memory nor cached to disk.
50
    PER_SESSION = "per_session"
2✔
51

52

53
@dataclass(frozen=True)
2✔
54
class ProcessConcurrency:
2✔
55
    kind: Literal["exactly", "range", "exclusive"]
2✔
56
    min: int | None = None
2✔
57
    max: int | None = None
2✔
58

59
    def __post_init__(self):
2✔
UNCOV
60
        if self.min is not None and self.min < 1:
×
61
            raise ValueError(f"min concurrency must be >= 1, got {self.min}")
×
UNCOV
62
        if self.max is not None and self.max < 1:
×
63
            raise ValueError(f"max concurrency must be >= 1, got {self.max}")
×
UNCOV
64
        if self.min is not None and self.max is not None and self.min > self.max:
×
65
            raise ValueError(
×
66
                f"min concurrency must be <= max concurrency, got {self.min} and {self.max}"
67
            )
UNCOV
68
        if self.kind == "exactly" and self.min != self.max:
×
69
            raise ValueError(
×
70
                f"exactly concurrency must have min and max equal, got {self.min} and {self.max}"
71
            )
72

73
    @staticmethod
2✔
74
    def range(max: int, min: int = 1):
2✔
75
        """The amount of parallelism that this process is capable of given its inputs. This value
76
        does not directly set the number of cores allocated to the process: that is computed based
77
        on availability, and provided as a template value in the arguments of the process.
78

79
        When set, a `{pants_concurrency}` variable will be templated into the `argv` of the process.
80

81
        Processes which set this value may be preempted (i.e. canceled and restarted) for a short
82
        period after starting if available resources have changed (because other processes have
83
        started or finished).
84
        """
UNCOV
85
        return ProcessConcurrency("range", min, max)
×
86

87
    @staticmethod
2✔
88
    def exactly(count: int):
2✔
89
        """A specific number of cores required to run the process.
90

91
        The process will wait until the specified number of cores are available.
92
        """
UNCOV
93
        return ProcessConcurrency("exactly", count, count)
×
94

95
    @staticmethod
2✔
96
    def exclusive():
2✔
97
        """Exclusive access to all cores.
98

99
        No other processes will be scheduled to run while this process is running.
100
        """
UNCOV
101
        return ProcessConcurrency("exclusive")
×
102

103

104
@dataclass(frozen=True)
2✔
105
class Process:
2✔
106
    argv: tuple[str, ...]
2✔
107
    description: str = dataclasses.field(compare=False)
2✔
108
    level: LogLevel
2✔
109
    input_digest: Digest
2✔
110
    stdin: bytes | None
2✔
111
    immutable_input_digests: FrozenDict[str, Digest]
2✔
112
    use_nailgun: tuple[str, ...]
2✔
113
    working_directory: str | None
2✔
114
    env: FrozenDict[str, str]
2✔
115
    append_only_caches: FrozenDict[str, str]
2✔
116
    output_files: tuple[str, ...]
2✔
117
    output_directories: tuple[str, ...]
2✔
118
    timeout_seconds: int | float
2✔
119
    jdk_home: str | None
2✔
120
    execution_slot_variable: str | None
2✔
121
    concurrency_available: int
2✔
122
    concurrency: ProcessConcurrency | None
2✔
123
    cache_scope: ProcessCacheScope
2✔
124
    remote_cache_speculation_delay_millis: int
2✔
125
    attempt: int
2✔
126

127
    def __init__(
2✔
128
        self,
129
        argv: Iterable[str],
130
        *,
131
        description: str,
132
        level: LogLevel = LogLevel.INFO,
133
        input_digest: Digest = EMPTY_DIGEST,
134
        stdin: bytes | None = None,
135
        immutable_input_digests: Mapping[str, Digest] | None = None,
136
        use_nailgun: Iterable[str] = (),
137
        working_directory: str | None = None,
138
        env: Mapping[str, str] | None = None,
139
        append_only_caches: Mapping[str, str] | None = None,
140
        output_files: Iterable[str] | None = None,
141
        output_directories: Iterable[str] | None = None,
142
        timeout_seconds: int | float | None = None,
143
        jdk_home: str | None = None,
144
        execution_slot_variable: str | None = None,
145
        concurrency_available: int = 0,
146
        concurrency: ProcessConcurrency | None = None,
147
        cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
148
        remote_cache_speculation_delay_millis: int = 0,
149
        attempt: int = 0,
150
    ) -> None:
151
        """Request to run a subprocess, similar to subprocess.Popen.
152

153
        This process will be hermetic, meaning that it cannot access files and environment variables
154
        that are not explicitly populated. For example, $PATH will not be defined by default, unless
155
        populated through the `env` parameter.
156

157
        Usually, you will want to provide input files/directories via the parameter `input_digest`.
158
        The process will then be able to access these paths through relative paths. If you want to
159
        give multiple input digests, first merge them with `merge_digests()`. Files larger than
160
        512KB will be read-only unless they are globbed as part of either `output_files` or
161
        `output_directories`.
162

163
        To provide data to the process's standard input, use the `stdin` parameter with a bytes
164
        object. The bytes will be piped to the process's stdin. Note that stdin is only supported
165
        for local and workspace execution; remote execution does not support stdin and will fail.
166

167
        Often, you will want to capture the files/directories created in the process. To do this,
168
        you can either set `output_files` or `output_directories`. The specified paths should be
169
        specified relative to the `working_directory`, if any, and will then be used to populate
170
        `output_digest` on the `ProcessResult`. If you want to split up this output digest into
171
        multiple digests, use `digest_subset_to_digest()` on the `output_digest`.
172

173
        To actually run the process, use either `await execute_process(Process(...), **implicitly())`
174
        or `await execute_process_or_raise(**implicitly(Process(...)))`.
175

176
        Example:
177

178
            result = await execute_process_or_raise(
179
                **implicitly(Process(["/bin/echo", "hello world"], description="demo"))
180
            )
181
            assert result.stdout.strip() == b"hello world"
182
        """
183
        if isinstance(argv, str):
2✔
184
            raise ValueError("argv must be a sequence of strings, but was a single string.")
×
185

186
        object.__setattr__(self, "argv", tuple(argv))
2✔
187
        object.__setattr__(self, "description", description)
2✔
188
        object.__setattr__(self, "level", level)
2✔
189
        object.__setattr__(self, "input_digest", input_digest)
2✔
190
        object.__setattr__(self, "stdin", stdin)
2✔
191
        object.__setattr__(
2✔
192
            self, "immutable_input_digests", FrozenDict(immutable_input_digests or {})
193
        )
194
        object.__setattr__(self, "use_nailgun", tuple(use_nailgun))
2✔
195
        object.__setattr__(self, "working_directory", working_directory)
2✔
196
        object.__setattr__(self, "env", FrozenDict(env or {}))
2✔
197
        object.__setattr__(self, "append_only_caches", FrozenDict(append_only_caches or {}))
2✔
198
        object.__setattr__(self, "output_files", tuple(output_files or ()))
2✔
199
        object.__setattr__(self, "output_directories", tuple(output_directories or ()))
2✔
200
        # NB: A negative or None time value is normalized to -1 to ease the transfer to Rust.
201
        object.__setattr__(
2✔
202
            self,
203
            "timeout_seconds",
204
            timeout_seconds if timeout_seconds and timeout_seconds > 0 else -1,
205
        )
206
        object.__setattr__(self, "jdk_home", jdk_home)
2✔
207
        object.__setattr__(self, "execution_slot_variable", execution_slot_variable)
2✔
208
        object.__setattr__(self, "concurrency_available", concurrency_available)
2✔
209
        object.__setattr__(self, "concurrency", concurrency)
2✔
210
        object.__setattr__(self, "cache_scope", cache_scope)
2✔
211
        object.__setattr__(
2✔
212
            self, "remote_cache_speculation_delay_millis", remote_cache_speculation_delay_millis
213
        )
214
        object.__setattr__(self, "attempt", attempt)
2✔
215

216
    def __post_init__(self) -> None:
2✔
217
        if self.concurrency_available and self.concurrency:
×
218
            raise ValueError(
×
219
                "Cannot specify both concurrency_available and concurrency. "
220
                "Only one concurrency setting may be used at a time."
221
            )
222

223

224
@dataclass(frozen=True)
2✔
225
class ProcessWithRetries:
2✔
226
    proc: Process
2✔
227
    attempts: int
2✔
228

229

230
@dataclass(frozen=True)
2✔
231
class ProcessResult:
2✔
232
    """Result of executing a process which should not fail.
233

234
    If the process has a non-zero exit code, this will raise an exception, unlike
235
    FallibleProcessResult.
236
    """
237

238
    stdout: bytes
2✔
239
    stdout_digest: FileDigest
2✔
240
    stderr: bytes
2✔
241
    stderr_digest: FileDigest
2✔
242
    output_digest: Digest
2✔
243
    metadata: ProcessResultMetadata = field(compare=False, hash=False)
2✔
244

245
    @property
2✔
246
    def platform(self) -> Platform:
2✔
247
        return self.metadata.platform
×
248

249

250
@dataclass(frozen=True)
2✔
251
class FallibleProcessResult:
2✔
252
    """Result of executing a process which might fail.
253

254
    If the process has a non-zero exit code, this will not raise an exception, unlike ProcessResult.
255
    """
256

257
    stdout: bytes
2✔
258
    stdout_digest: FileDigest
2✔
259
    stderr: bytes
2✔
260
    stderr_digest: FileDigest
2✔
261
    exit_code: int
2✔
262
    output_digest: Digest
2✔
263
    metadata: ProcessResultMetadata = field(compare=False, hash=False)
2✔
264

265
    @property
2✔
266
    def platform(self) -> Platform:
2✔
UNCOV
267
        return self.metadata.platform
×
268

269

270
@dataclass(frozen=True)
2✔
271
class ProcessResultWithRetries:
2✔
272
    results: tuple[FallibleProcessResult, ...]
2✔
273

274
    @property
2✔
275
    def last(self):
2✔
276
        return self.results[-1]
×
277

278

279
@dataclass(frozen=True)
2✔
280
class ProcessResultMetadata:
2✔
281
    """Metadata for a ProcessResult, which is not included in its definition of equality."""
282

283
    class Source(Enum):
2✔
284
        """This is public API as these values are part of the test result report file."""
285

286
        RAN = "ran"
2✔
287
        HIT_LOCALLY = "hit_locally"
2✔
288
        HIT_REMOTELY = "hit_remotely"
2✔
289
        MEMOIZED = "memoized"
2✔
290

291
    # The execution time of the process, in milliseconds, or None if it could not be captured
292
    # (since remote execution does not guarantee its availability).
293
    total_elapsed_ms: int | None
2✔
294
    # The environment that the process ran in (or would have run in, if it was not a cache hit).
295
    execution_environment: ProcessExecutionEnvironment
2✔
296
    # Whether the ProcessResult (when it was created in the attached run_id) came from the local
297
    # or remote cache, or ran locally or remotely. See the `self.source` method.
298
    _source: str
2✔
299
    # The run_id in which a ProcessResult was created. See the `self.source` method.
300
    source_run_id: int
2✔
301

302
    @property
2✔
303
    def platform(self) -> Platform:
2✔
UNCOV
304
        return Platform[self.execution_environment.platform]
×
305

306
    def source(self, current_run_id: RunId) -> Source:
2✔
307
        """Given the current run_id, return the calculated "source" of the ProcessResult.
308

309
        If a ProcessResult is consumed in any run_id other than the one it was created in, the
310
        source implicitly becomes memoization, since the result was re-used in a new run without
311
        being recreated.
312
        """
UNCOV
313
        return (
×
314
            self.Source(self._source)
315
            if self.source_run_id == current_run_id
316
            else self.Source.MEMOIZED
317
        )
318

319

320
class ProcessExecutionFailure(Exception):
2✔
321
    """Used to denote that a process exited, but was unsuccessful in some way.
322

323
    For example, exiting with a non-zero code.
324
    """
325

326
    def __init__(
2✔
327
        self,
328
        exit_code: int,
329
        stdout: bytes,
330
        stderr: bytes,
331
        process_description: str,
332
        *,
333
        keep_sandboxes: KeepSandboxes,
334
    ) -> None:
335
        # These are intentionally "public" members.
UNCOV
336
        self.exit_code = exit_code
×
UNCOV
337
        self.stdout = stdout
×
UNCOV
338
        self.stderr = stderr
×
339

UNCOV
340
        def try_decode(content: bytes) -> str:
            """Return `content` decoded as text, or a bounded repr if it is not decodable.

            :param content: Raw bytes captured from one of the process's output streams.
            :returns: The decoded string, or `repr(content)` truncated to 256 characters
                (with a trailing `...`) when decoding fails.
            """
            try:
                return content.decode()
            except ValueError:
                # UnicodeDecodeError is a ValueError. Fall back to the repr of the bytes that
                # were passed in, capped so that binary output cannot flood the error message.
                content_repr = repr(content)
                return f"{content_repr[:256]}..." if len(content_repr) > 256 else content_repr
×
346

347
        # NB: We don't use dedent on a single format string here because it would attempt to
348
        # interpret the stdio content.
UNCOV
349
        err_strings = [
×
350
            f"Process '{process_description}' failed with exit code {exit_code}.",
351
            "stdout:",
352
            try_decode(stdout),
353
            "stderr:",
354
            try_decode(stderr),
355
        ]
UNCOV
356
        if keep_sandboxes == KeepSandboxes.never:
×
UNCOV
357
            err_strings.append(
×
358
                "\n\nUse `--keep-sandboxes=on_failure` to preserve the process chroot for inspection."
359
            )
UNCOV
360
        super().__init__("\n".join(err_strings))
×
361

362
    @classmethod
2✔
363
    def from_result(
2✔
364
        cls, result: FallibleProcessResult, description: str, keep_sandboxes: KeepSandboxes
365
    ) -> ProcessExecutionFailure:
366
        return cls(
×
367
            result.exit_code,
368
            result.stdout,
369
            result.stderr,
370
            description,
371
            keep_sandboxes=keep_sandboxes,
372
        )
373

374

375
@rule
2✔
376
async def get_multi_platform_request_description(req: Process) -> ProductDescription:
2✔
377
    return ProductDescription(req.description)
×
378

379

380
@rule
2✔
381
async def fallible_to_exec_result_or_raise(
2✔
382
    fallible_result: FallibleProcessResult,
383
    description: ProductDescription,
384
    keep_sandboxes: KeepSandboxes,
385
) -> ProcessResult:
386
    """Converts a FallibleProcessResult to a ProcessResult or raises an error."""
387

388
    if fallible_result.exit_code == 0:
×
389
        return ProcessResult(
×
390
            stdout=fallible_result.stdout,
391
            stdout_digest=fallible_result.stdout_digest,
392
            stderr=fallible_result.stderr,
393
            stderr_digest=fallible_result.stderr_digest,
394
            output_digest=fallible_result.output_digest,
395
            metadata=fallible_result.metadata,
396
        )
397
    raise ProcessExecutionFailure(
×
398
        fallible_result.exit_code,
399
        fallible_result.stdout,
400
        fallible_result.stderr,
401
        description.value,
402
        keep_sandboxes=keep_sandboxes,
403
    )
404

405

406
# fallible_to_exec_result_or_raise directly converts a FallibleProcessResult
407
# to a ProcessResult, or raises an exception if the process failed.
408
# Its name makes sense when you already have a FallibleProcessResult in hand.
409
#
410
# But, it is common to want to execute a process and automatically raise an exception
411
# on process error. The execute_process_or_raise() alias below facilitates this idiom:
412
#
413
# result = await execute_process_or_raise(
414
#         **implicitly(
415
#             Process(...) # Or something that some other rule can convert to a Process.
416
#         )
417
#     )
418
# Where the execute_process() intrinsic is invoked implicitly to create a FallibleProcessResult.
419
# This is simply a better name for the same rule, when invoked in this use case.
420
execute_process_or_raise = fallible_to_exec_result_or_raise
2✔
421

422

423
@dataclass(frozen=True)
2✔
424
class InteractiveProcessResult:
2✔
425
    exit_code: int
2✔
426

427

428
@dataclass(frozen=True)
2✔
429
class InteractiveProcess(SideEffecting):
2✔
430
    # NB: Although InteractiveProcess supports only some of the features of Process, we construct an
431
    # underlying Process instance to improve code reuse.
432
    process: Process
2✔
433
    run_in_workspace: bool
2✔
434
    forward_signals_to_process: bool
2✔
435
    restartable: bool
2✔
436
    keep_sandboxes: KeepSandboxes
2✔
437

438
    def __init__(
2✔
439
        self,
440
        argv: Iterable[str],
441
        *,
442
        env: Mapping[str, str] | None = None,
443
        description: str = "Interactive process",
444
        input_digest: Digest = EMPTY_DIGEST,
445
        run_in_workspace: bool = False,
446
        forward_signals_to_process: bool = True,
447
        restartable: bool = False,
448
        append_only_caches: Mapping[str, str] | None = None,
449
        immutable_input_digests: Mapping[str, Digest] | None = None,
450
        keep_sandboxes: KeepSandboxes = KeepSandboxes.never,
451
        working_directory: str | None = None,
452
    ) -> None:
453
        """Request to run a subprocess in the foreground, similar to subprocess.run().
454

455
        Unlike `Process`, the result will not be cached.
456

457
        To run the process, use `await run_interactive_process(InteractiveProcess(..))`
458
        in a `@goal_rule`.
459

460
        `forward_signals_to_process` controls whether pants will allow a SIGINT signal
461
        sent to a process by hitting Ctrl-C in the terminal to actually reach the process,
462
        or capture that signal itself, blocking it from the process.
463
        """
464
        object.__setattr__(
2✔
465
            self,
466
            "process",
467
            Process(
468
                argv,
469
                description=description,
470
                env=env,
471
                input_digest=input_digest,
472
                append_only_caches=append_only_caches,
473
                immutable_input_digests=immutable_input_digests,
474
                working_directory=working_directory,
475
            ),
476
        )
477
        object.__setattr__(self, "run_in_workspace", run_in_workspace)
2✔
478
        object.__setattr__(self, "forward_signals_to_process", forward_signals_to_process)
2✔
479
        object.__setattr__(self, "restartable", restartable)
2✔
480
        object.__setattr__(self, "keep_sandboxes", keep_sandboxes)
2✔
481

482
    @classmethod
2✔
483
    def from_process(
2✔
484
        cls,
485
        process: Process,
486
        *,
487
        forward_signals_to_process: bool = True,
488
        restartable: bool = False,
489
        keep_sandboxes: KeepSandboxes = KeepSandboxes.never,
490
    ) -> InteractiveProcess:
491
        return InteractiveProcess(
×
492
            argv=process.argv,
493
            env=process.env,
494
            description=process.description,
495
            input_digest=process.input_digest,
496
            forward_signals_to_process=forward_signals_to_process,
497
            restartable=restartable,
498
            append_only_caches=process.append_only_caches,
499
            immutable_input_digests=process.immutable_input_digests,
500
            keep_sandboxes=keep_sandboxes,
501
            working_directory=process.working_directory,
502
        )
503

504

505
def rules():
2✔
506
    return collect_rules()
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc