• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 20438429929

22 Dec 2025 04:55PM UTC coverage: 80.287% (+0.003%) from 80.284%
20438429929

Pull #22934

github

web-flow
Merge b49c09e21 into 06f105be8
Pull Request #22934: feat(go): add multi-module support to golangci-lint plugin and upgrade to v2

37 of 62 new or added lines in 3 files covered. (59.68%)

183 existing lines in 9 files now uncovered.

78528 of 97809 relevant lines covered (80.29%)

3.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.72
/src/python/pants/engine/process.py
1
# Copyright 2016 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import dataclasses
12✔
7
import logging
12✔
8
from collections.abc import Iterable, Mapping
12✔
9
from dataclasses import dataclass, field
12✔
10
from enum import Enum
12✔
11
from typing import Literal
12✔
12

13
from pants.engine.engine_aware import SideEffecting
12✔
14
from pants.engine.fs import EMPTY_DIGEST, Digest, FileDigest
12✔
15
from pants.engine.internals.native_engine import (  # noqa: F401
12✔
16
    ProcessExecutionEnvironment as ProcessExecutionEnvironment,
17
)
18
from pants.engine.internals.session import RunId
12✔
19
from pants.engine.platform import Platform
12✔
20
from pants.engine.rules import collect_rules, rule
12✔
21
from pants.option.global_options import KeepSandboxes
12✔
22
from pants.util.frozendict import FrozenDict
12✔
23
from pants.util.logging import LogLevel
12✔
24

25
# Module-level logger, following the standard `logging.getLogger(__name__)` idiom.
logger = logging.getLogger(__name__)
26

27

28
@dataclass(frozen=True)
class ProductDescription:
    """A human-readable description of what a `Process` produces.

    Extracted from `Process.description` by `get_multi_platform_request_description`, and
    used to label the process in failure messages (see `fallible_to_exec_result_or_raise`).
    """

    # The description text.
    value: str
31

32

33
class ProcessCacheScope(Enum):
    """Where (and under which success conditions) a `Process`'s result may be cached."""

    # Cached in all locations, regardless of success or failure.
    ALWAYS = "always"
    # Cached in all locations, but only if the process exits successfully.
    SUCCESSFUL = "successful"
    # Cached only locally, regardless of success or failure.
    LOCAL_ALWAYS = "local_always"
    # Cached only locally, and only if the process exits successfully.
    LOCAL_SUCCESSFUL = "local_successful"
    # Cached only in memory (i.e. memoized in pantsd), but never persistently, regardless of
    # success vs. failure.
    PER_RESTART_ALWAYS = "per_restart_always"
    # Cached only in memory (i.e. memoized in pantsd), but never persistently, and only if
    # successful.
    PER_RESTART_SUCCESSFUL = "per_restart_successful"
    # Will run once per Session, i.e. once per run of Pants. This happens because the engine
    # de-duplicates identical work; the process is neither memoized in memory nor cached to disk.
    PER_SESSION = "per_session"
51

52

53
@dataclass(frozen=True)
class ProcessConcurrency:
    """How many cores a `Process` wants: an exact count, a bounded range, or exclusive
    access to the machine.

    Prefer the `exactly`, `range` and `exclusive` static constructors over instantiating
    this class directly.
    """

    kind: Literal["exactly", "range", "exclusive"]
    min: int | None = None
    max: int | None = None

    def __post_init__(self):
        # Each bound, when present, must be a positive core count.
        if self.min is not None and not self.min >= 1:
            raise ValueError(f"min concurrency must be >= 1, got {self.min}")
        if self.max is not None and not self.max >= 1:
            raise ValueError(f"max concurrency must be >= 1, got {self.max}")
        # When both bounds are given, they must form a valid interval.
        have_both_bounds = self.min is not None and self.max is not None
        if have_both_bounds and self.min > self.max:
            raise ValueError(
                f"min concurrency must be <= max concurrency, got {self.min} and {self.max}"
            )
        # "exactly" is represented as a degenerate range with identical bounds.
        if self.kind == "exactly" and self.min != self.max:
            raise ValueError(
                f"exactly concurrency must have min and max equal, got {self.min} and {self.max}"
            )

    @staticmethod
    def range(max: int, min: int = 1):
        """A range of parallelism this process can take advantage of, given its inputs.

        This value does not directly set the number of cores allocated to the process:
        that is computed based on availability, and provided as a template value in the
        arguments of the process.

        When set, a `{pants_concurrency}` variable will be templated into the `argv` of
        the process.

        Processes which set this value may be preempted (i.e. canceled and restarted) for
        a short period after starting if available resources have changed (because other
        processes have started or finished).
        """
        return ProcessConcurrency(kind="range", min=min, max=max)

    @staticmethod
    def exactly(count: int):
        """A fixed number of cores required to run the process.

        The process will wait until exactly that many cores are available.
        """
        return ProcessConcurrency(kind="exactly", min=count, max=count)

    @staticmethod
    def exclusive():
        """Exclusive access to all cores: no other processes are scheduled while this runs."""
        return ProcessConcurrency(kind="exclusive")
102

103

104
@dataclass(frozen=True)
class Process:
    """A hermetic subprocess execution request. See `__init__` for the full contract."""

    argv: tuple[str, ...]
    description: str = dataclasses.field(compare=False)
    level: LogLevel
    input_digest: Digest
    immutable_input_digests: FrozenDict[str, Digest]
    use_nailgun: tuple[str, ...]
    working_directory: str | None
    env: FrozenDict[str, str]
    append_only_caches: FrozenDict[str, str]
    output_files: tuple[str, ...]
    output_directories: tuple[str, ...]
    timeout_seconds: int | float
    jdk_home: str | None
    execution_slot_variable: str | None
    concurrency_available: int
    concurrency: ProcessConcurrency | None
    cache_scope: ProcessCacheScope
    remote_cache_speculation_delay_millis: int
    attempt: int

    def __init__(
        self,
        argv: Iterable[str],
        *,
        description: str,
        level: LogLevel = LogLevel.INFO,
        input_digest: Digest = EMPTY_DIGEST,
        immutable_input_digests: Mapping[str, Digest] | None = None,
        use_nailgun: Iterable[str] = (),
        working_directory: str | None = None,
        env: Mapping[str, str] | None = None,
        append_only_caches: Mapping[str, str] | None = None,
        output_files: Iterable[str] | None = None,
        output_directories: Iterable[str] | None = None,
        timeout_seconds: int | float | None = None,
        jdk_home: str | None = None,
        execution_slot_variable: str | None = None,
        concurrency_available: int = 0,
        concurrency: ProcessConcurrency | None = None,
        cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
        remote_cache_speculation_delay_millis: int = 0,
        attempt: int = 0,
    ) -> None:
        """Request to run a subprocess, similar to subprocess.Popen.

        This process will be hermetic, meaning that it cannot access files and environment variables
        that are not explicitly populated. For example, $PATH will not be defined by default, unless
        populated through the `env` parameter.

        Usually, you will want to provide input files/directories via the parameter `input_digest`.
        The process will then be able to access these paths through relative paths. If you want to
        give multiple input digests, first merge them with `merge_digests()`. Files larger than
        512KB will be read-only unless they are globbed as part of either `output_files` or
        `output_directories`.

        Often, you will want to capture the files/directories created in the process. To do this,
        you can either set `output_files` or `output_directories`. The specified paths should be
        specified relative to the `working_directory`, if any, and will then be used to populate
        `output_digest` on the `ProcessResult`. If you want to split up this output digest into
        multiple digests, use `digest_subset_to_digest()` on the `output_digest`.

        To actually run the process, use `await execute_process(Process(...), **implicitly())`
        or `await execute_process_or_raise(**implicitly(Process(...)))`.

        Example:

            result = await execute_process_or_raise(
                **implicitly(Process(["/bin/echo", "hello world"], description="demo"))
            )
            assert result.stdout == b"hello world"

        Raises:
            ValueError: if `argv` is a single string, or if both `concurrency_available`
                and `concurrency` are set.
        """
        if isinstance(argv, str):
            raise ValueError("argv must be a sequence of strings, but was a single string.")

        # The dataclass is frozen, so every field is assigned via object.__setattr__.
        object.__setattr__(self, "argv", tuple(argv))
        object.__setattr__(self, "description", description)
        object.__setattr__(self, "level", level)
        object.__setattr__(self, "input_digest", input_digest)
        object.__setattr__(
            self, "immutable_input_digests", FrozenDict(immutable_input_digests or {})
        )
        object.__setattr__(self, "use_nailgun", tuple(use_nailgun))
        object.__setattr__(self, "working_directory", working_directory)
        object.__setattr__(self, "env", FrozenDict(env or {}))
        object.__setattr__(self, "append_only_caches", FrozenDict(append_only_caches or {}))
        object.__setattr__(self, "output_files", tuple(output_files or ()))
        object.__setattr__(self, "output_directories", tuple(output_directories or ()))
        # NB: A negative or None time value is normalized to -1 to ease the transfer to Rust.
        object.__setattr__(
            self,
            "timeout_seconds",
            timeout_seconds if timeout_seconds and timeout_seconds > 0 else -1,
        )
        object.__setattr__(self, "jdk_home", jdk_home)
        object.__setattr__(self, "execution_slot_variable", execution_slot_variable)
        object.__setattr__(self, "concurrency_available", concurrency_available)
        object.__setattr__(self, "concurrency", concurrency)
        object.__setattr__(self, "cache_scope", cache_scope)
        object.__setattr__(
            self, "remote_cache_speculation_delay_millis", remote_cache_speculation_delay_millis
        )
        object.__setattr__(self, "attempt", attempt)
        # Bug fix: `@dataclass` only invokes `__post_init__` from the `__init__` it
        # generates. Because this class defines its own `__init__`, no `__init__` is
        # generated, so `__post_init__` was never called and its validation was dead code.
        # Invoke it explicitly so the concurrency check actually runs.
        self.__post_init__()

    def __post_init__(self) -> None:
        # The `concurrency_available` and `concurrency` settings are mutually exclusive.
        if self.concurrency_available and self.concurrency:
            raise ValueError(
                "Cannot specify both concurrency_available and concurrency. "
                "Only one concurrency setting may be used at a time."
            )
215

216

217
@dataclass(frozen=True)
class ProcessWithRetries:
    """A request to execute `proc` with a bounded number of attempts.

    See `ProcessResultWithRetries`, which holds one `FallibleProcessResult` per attempt.
    """

    # The process to execute.
    proc: Process
    # Attempt budget — presumably the maximum number of executions; the consuming rule is
    # not in this file, so confirm against it.
    attempts: int
221

222

223
@dataclass(frozen=True)
class ProcessResult:
    """Result of executing a process which should not fail.

    If the process has a non-zero exit code, this will raise an exception, unlike
    FallibleProcessResult.
    """

    # Captured standard output, plus its digest in the store.
    stdout: bytes
    stdout_digest: FileDigest
    # Captured standard error, plus its digest in the store.
    stderr: bytes
    stderr_digest: FileDigest
    # Digest of the outputs captured via `Process.output_files`/`output_directories`.
    output_digest: Digest
    # Execution metadata; excluded from equality/hashing so results with identical outputs
    # compare equal regardless of where/when they ran.
    metadata: ProcessResultMetadata = field(compare=False, hash=False)

    @property
    def platform(self) -> Platform:
        # Convenience accessor for the platform recorded in the execution metadata.
        return self.metadata.platform
241

242

243
@dataclass(frozen=True)
class FallibleProcessResult:
    """Result of executing a process which might fail.

    If the process has a non-zero exit code, this will not raise an exception, unlike ProcessResult.
    """

    # Captured standard output, plus its digest in the store.
    stdout: bytes
    stdout_digest: FileDigest
    # Captured standard error, plus its digest in the store.
    stderr: bytes
    stderr_digest: FileDigest
    # The process's exit status; 0 conventionally indicates success.
    exit_code: int
    # Digest of the outputs captured via `Process.output_files`/`output_directories`.
    output_digest: Digest
    # Execution metadata; excluded from equality/hashing so results with identical outputs
    # compare equal regardless of where/when they ran.
    metadata: ProcessResultMetadata = field(compare=False, hash=False)

    @property
    def platform(self) -> Platform:
        # Convenience accessor for the platform recorded in the execution metadata.
        return self.metadata.platform
261

262

263
@dataclass(frozen=True)
class ProcessResultWithRetries:
    """The ordered results of each attempt made while executing a `ProcessWithRetries`."""

    results: tuple[FallibleProcessResult, ...]

    @property
    def last(self):
        """The result of the final attempt."""
        all_attempts = self.results
        return all_attempts[-1]
270

271

272
@dataclass(frozen=True)
class ProcessResultMetadata:
    """Metadata for a ProcessResult, which is not included in its definition of equality."""

    class Source(Enum):
        """This is public API as these values are part of the test result report file."""

        RAN = "ran"
        HIT_LOCALLY = "hit_locally"
        HIT_REMOTELY = "hit_remotely"
        MEMOIZED = "memoized"

    # Wall-clock duration of the process in milliseconds, or None when it could not be
    # captured (remote execution does not guarantee its availability).
    total_elapsed_ms: int | None
    # The environment the process ran in (or would have run in, on a cache hit).
    execution_environment: ProcessExecutionEnvironment
    # Raw source string recorded when the result was created; interpreted by `source()`.
    _source: str
    # The run_id in which the result was created; compared to the current run in `source()`.
    source_run_id: int

    @property
    def platform(self) -> Platform:
        return Platform[self.execution_environment.platform]

    def source(self, current_run_id: RunId) -> Source:
        """Return the calculated "source" of this result, relative to `current_run_id`.

        A result consumed in any run other than the one it was created in was, by
        definition, memoized: it was reused in a new run without being recreated.
        """
        if self.source_run_id == current_run_id:
            return self.Source(self._source)
        return self.Source.MEMOIZED
311

312

313
class ProcessExecutionFailure(Exception):
    """Used to denote that a process exited, but was unsuccessful in some way.

    For example, exiting with a non-zero code.
    """

    def __init__(
        self,
        exit_code: int,
        stdout: bytes,
        stderr: bytes,
        process_description: str,
        *,
        keep_sandboxes: KeepSandboxes,
    ) -> None:
        """Build a failure message from the process description, exit code, and stdio.

        `keep_sandboxes` controls whether a hint about preserving the sandbox is appended.
        """
        # These are intentionally "public" members.
        self.exit_code = exit_code
        self.stdout = stdout
        self.stderr = stderr

        def try_decode(content: bytes) -> str:
            # Decode as UTF-8; on failure (UnicodeDecodeError is a ValueError subclass),
            # fall back to a repr truncated to 256 characters.
            try:
                return content.decode()
            except ValueError:
                # Bug fix: repr the stream that failed to decode (`content`), not `stdout` —
                # previously a non-decodable stderr was reported using stdout's bytes.
                content_repr = repr(content)
                return f"{content_repr[:256]}..." if len(content_repr) > 256 else content_repr

        # NB: We don't use dedent on a single format string here because it would attempt to
        # interpret the stdio content.
        err_strings = [
            f"Process '{process_description}' failed with exit code {exit_code}.",
            "stdout:",
            try_decode(stdout),
            "stderr:",
            try_decode(stderr),
        ]
        if keep_sandboxes == KeepSandboxes.never:
            err_strings.append(
                "\n\nUse `--keep-sandboxes=on_failure` to preserve the process chroot for inspection."
            )
        super().__init__("\n".join(err_strings))

    @classmethod
    def from_result(
        cls, result: FallibleProcessResult, description: str, keep_sandboxes: KeepSandboxes
    ) -> ProcessExecutionFailure:
        """Convenience constructor from a `FallibleProcessResult`."""
        return cls(
            result.exit_code,
            result.stdout,
            result.stderr,
            description,
            keep_sandboxes=keep_sandboxes,
        )
366

367

368
@rule
async def get_multi_platform_request_description(req: Process) -> ProductDescription:
    """Expose a `Process`'s human-readable description as a `ProductDescription`."""
    return ProductDescription(req.description)
371

372

373
@rule
async def fallible_to_exec_result_or_raise(
    fallible_result: FallibleProcessResult,
    description: ProductDescription,
    keep_sandboxes: KeepSandboxes,
) -> ProcessResult:
    """Converts a FallibleProcessResult to a ProcessResult or raises an error."""

    # Guard clause: surface any non-zero exit code as a ProcessExecutionFailure.
    if fallible_result.exit_code != 0:
        raise ProcessExecutionFailure(
            fallible_result.exit_code,
            fallible_result.stdout,
            fallible_result.stderr,
            description.value,
            keep_sandboxes=keep_sandboxes,
        )

    # Success path: repackage the fallible result as an infallible ProcessResult.
    return ProcessResult(
        stdout=fallible_result.stdout,
        stdout_digest=fallible_result.stdout_digest,
        stderr=fallible_result.stderr,
        stderr_digest=fallible_result.stderr_digest,
        output_digest=fallible_result.output_digest,
        metadata=fallible_result.metadata,
    )
397

398

399
# fallible_to_exec_result_or_raise directly converts a FallibleProcessResult
# to a ProcessResult, or raises an exception if the process failed.
# Its name makes sense when you already have a FallibleProcessResult in hand.
#
# But it is common to want to execute a process and automatically raise an exception
# on process error. The execute_process_or_raise() alias below facilitates this idiom:
#
# result = await execute_process_or_raise(
#         **implicitly(
#             Process(...) # Or something that some other rule can convert to a Process.
#         )
#     )
# Here the execute_process() intrinsic is invoked implicitly to create a FallibleProcessResult.
# This is simply a better name for the same rule, when invoked in this use case.
execute_process_or_raise = fallible_to_exec_result_or_raise
414

415

416
@dataclass(frozen=True)
class InteractiveProcessResult:
    """Result of running an `InteractiveProcess`: only the exit code is captured."""

    # The process's exit status.
    exit_code: int
419

420

421
@dataclass(frozen=True)
class InteractiveProcess(SideEffecting):
    # NB: Although InteractiveProcess supports only some of the features of Process, we construct an
    # underlying Process instance to improve code reuse.
    process: Process
    run_in_workspace: bool
    forward_signals_to_process: bool
    restartable: bool
    keep_sandboxes: KeepSandboxes

    def __init__(
        self,
        argv: Iterable[str],
        *,
        env: Mapping[str, str] | None = None,
        description: str = "Interactive process",
        input_digest: Digest = EMPTY_DIGEST,
        run_in_workspace: bool = False,
        forward_signals_to_process: bool = True,
        restartable: bool = False,
        append_only_caches: Mapping[str, str] | None = None,
        immutable_input_digests: Mapping[str, Digest] | None = None,
        keep_sandboxes: KeepSandboxes = KeepSandboxes.never,
        working_directory: str | None = None,
    ) -> None:
        """Request to run a subprocess in the foreground, similar to subprocess.run().

        Unlike `Process`, the result will not be cached.

        To run the process, use `await run_interactive_process(InteractiveProcess(..))`
        in a `@goal_rule`.

        `forward_signals_to_process` controls whether pants will allow a SIGINT signal
        sent to a process by hitting Ctrl-C in the terminal to actually reach the process,
        or capture that signal itself, blocking it from the process.
        """
        # Build the underlying (non-interactive) Process first, then assign all frozen
        # dataclass fields in one pass via object.__setattr__.
        underlying = Process(
            argv,
            description=description,
            env=env,
            input_digest=input_digest,
            append_only_caches=append_only_caches,
            immutable_input_digests=immutable_input_digests,
            working_directory=working_directory,
        )
        for attr_name, attr_value in (
            ("process", underlying),
            ("run_in_workspace", run_in_workspace),
            ("forward_signals_to_process", forward_signals_to_process),
            ("restartable", restartable),
            ("keep_sandboxes", keep_sandboxes),
        ):
            object.__setattr__(self, attr_name, attr_value)

    @classmethod
    def from_process(
        cls,
        process: Process,
        *,
        forward_signals_to_process: bool = True,
        restartable: bool = False,
        keep_sandboxes: KeepSandboxes = KeepSandboxes.never,
    ) -> InteractiveProcess:
        """Derive an InteractiveProcess from an existing Process, carrying over the subset
        of its fields that InteractiveProcess supports."""
        return InteractiveProcess(
            argv=process.argv,
            env=process.env,
            description=process.description,
            input_digest=process.input_digest,
            forward_signals_to_process=forward_signals_to_process,
            restartable=restartable,
            append_only_caches=process.append_only_caches,
            immutable_input_digests=process.immutable_input_digests,
            keep_sandboxes=keep_sandboxes,
            working_directory=process.working_directory,
        )
496

497

498
def rules():
    """Return the rules defined in this module, for registration via `collect_rules`."""
    return collect_rules()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc