
pantsbuild / pants · build 20147226056 (push, committed via GitHub)

11 Dec 2025 08:58PM UTC coverage: 78.827% (-1.5%) from 80.293%

# Forwarded the `style` and `complete-platform` args from pants.toml to PEX (#22910)

## Context

After Apple switched to the `arm64` architecture, some package
publishers stopped releasing `x86_64` variants of their packages for
`darwin`. As a result, generating a universal lockfile now fails because
no single package version is compatible with both `x86_64` and `arm64`
on `darwin`.

The solution is to use the `--style` and `--complete-platform` flags
with PEX. For example:
```
pex3 lock create \
    --style strict \
    --complete-platform 3rdparty/platforms/manylinux_2_28_aarch64.json \
    --complete-platform 3rdparty/platforms/macosx_26_0_arm64.json \
    -r 3rdparty/python/requirements_pyarrow.txt \
    -o python-pyarrow.lock
```
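
As an aside on where the `--complete-platform` JSON files can come from: PEX itself can describe an interpreter's environment when run on the target machine. A minimal sketch, assuming a PEX version that ships the `pex3 interpreter inspect` subcommand (check the PEX docs for your version's exact flags), writing to the example path used above:
```
# Run on the target machine to capture its compatibility tags and environment markers.
pex3 interpreter inspect --markers --tags > 3rdparty/platforms/macosx_26_0_arm64.json
```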

See the Slack discussion here:
https://pantsbuild.slack.com/archives/C046T6T9U/p1760098582461759

## Reproduction

* `BUILD`
```
python_requirement(
    name="awswrangler",
    requirements=["awswrangler==3.12.1"],
    resolve="awswrangler",
)
```
* Run `pants generate-lockfiles --resolve=awswrangler` on macOS with an
`arm64` CPU
```
pip: ERROR: Cannot install awswrangler==3.12.1 because these package versions have conflicting dependencies.
pip: ERROR: ResolutionImpossible: for help visit https://pip.pypa.io/en/latest/topics/dependency-resolution/#dealing-with-dependency-conflicts
pip:  
pip:  The conflict is caused by:
pip:      awswrangler 3.12.1 depends on pyarrow<18.0.0 and >=8.0.0; sys_platform == "darwin" and platform_machine == "x86_64"
pip:      awswrangler 3.12.1 depends on pyarrow<21.0.0 and >=18.0.0; sys_platform != "darwin" or platform_machine != "x86_64"
pip:  
pip:  Additionally, some packages in these conflicts have no matching distributions available for your environment:
pip:      pyarrow
pip:  
pip:  To fix this you could try to:
pip:  1. loosen the range of package versions you've specified
pip:  2. remove package versions to allow pip to attempt to solve the dependency conflict
```

## Implementation
... (continued)

77 of 100 new or added lines in 6 files covered. (77.0%)

868 existing lines in 42 files now uncovered.

74471 of 94474 relevant lines covered (78.83%)

3.18 hits per line

Source File: `/src/python/pants/engine/process.py` (87.11% covered)
```
# Copyright 2016 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

import dataclasses
import logging
from collections.abc import Iterable, Mapping
from dataclasses import dataclass, field
from enum import Enum
from typing import Literal

from pants.engine.engine_aware import SideEffecting
from pants.engine.fs import EMPTY_DIGEST, Digest, FileDigest
from pants.engine.internals.native_engine import (  # noqa: F401
    ProcessExecutionEnvironment as ProcessExecutionEnvironment,
)
from pants.engine.internals.session import RunId
from pants.engine.platform import Platform
from pants.engine.rules import collect_rules, rule
from pants.option.global_options import KeepSandboxes
from pants.util.frozendict import FrozenDict
from pants.util.logging import LogLevel

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class ProductDescription:
    value: str


class ProcessCacheScope(Enum):
    # Cached in all locations, regardless of success or failure.
    ALWAYS = "always"
    # Cached in all locations, but only if the process exits successfully.
    SUCCESSFUL = "successful"
    # Cached only locally, regardless of success or failure.
    LOCAL_ALWAYS = "local_always"
    # Cached only locally, and only if the process exits successfully.
    LOCAL_SUCCESSFUL = "local_successful"
    # Cached only in memory (i.e. memoized in pantsd), but never persistently, regardless of
    # success vs. failure.
    PER_RESTART_ALWAYS = "per_restart_always"
    # Cached only in memory (i.e. memoized in pantsd), but never persistently, and only if
    # successful.
    PER_RESTART_SUCCESSFUL = "per_restart_successful"
    # Will run once per Session, i.e. once per run of Pants. This happens because the engine
    # de-duplicates identical work; the process is neither memoized in memory nor cached to disk.
    PER_SESSION = "per_session"
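#
# Illustrative usage (editor's sketch, not part of the original file): a process
# whose result should be recomputed once per run of Pants, e.g. a tool-version probe:
#
#     Process(
#         ["/usr/bin/git", "--version"],
#         description="probe git version",
#         cache_scope=ProcessCacheScope.PER_SESSION,
#     )
#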

@dataclass(frozen=True)
class ProcessConcurrency:
    kind: Literal["exactly", "range", "exclusive"]
    min: int | None = None
    max: int | None = None

    def __post_init__(self):
        if self.min is not None and self.min < 1:
            raise ValueError(f"min concurrency must be >= 1, got {self.min}")
        if self.max is not None and self.max < 1:
            raise ValueError(f"max concurrency must be >= 1, got {self.max}")
        if self.min is not None and self.max is not None and self.min > self.max:
            raise ValueError(
                f"min concurrency must be <= max concurrency, got {self.min} and {self.max}"
            )
        if self.kind == "exactly" and self.min != self.max:
            raise ValueError(
                f"exactly concurrency must have min and max equal, got {self.min} and {self.max}"
            )

    @staticmethod
    def range(max: int, min: int = 1):
        """The amount of parallelism that this process is capable of given its inputs. This value
        does not directly set the number of cores allocated to the process: that is computed based
        on availability, and provided as a template value in the arguments of the process.

        When set, a `{pants_concurrency}` variable will be templated into the `argv` of the process.

        Processes which set this value may be preempted (i.e. canceled and restarted) for a short
        period after starting if available resources have changed (because other processes have
        started or finished).
        """
        return ProcessConcurrency("range", min, max)

    @staticmethod
    def exactly(count: int):
        """A specific number of cores required to run the process.

        The process will wait until the specified number of cores are available.
        """
        return ProcessConcurrency("exactly", count, count)

    @staticmethod
    def exclusive():
        """Exclusive access to all cores.

        No other processes will be scheduled to run while this process is running.
        """
        return ProcessConcurrency("exclusive")
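#
# Editor's note (illustrative, not part of the original file): the three
# constructors above produce, for example:
#
#     ProcessConcurrency.exactly(2)    # kind="exactly",   min=2, max=2
#     ProcessConcurrency.range(max=8)  # kind="range",     min=1, max=8
#     ProcessConcurrency.exclusive()   # kind="exclusive", min=None, max=None
#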

@dataclass(frozen=True)
class Process:
    argv: tuple[str, ...]
    description: str = dataclasses.field(compare=False)
    level: LogLevel
    input_digest: Digest
    immutable_input_digests: FrozenDict[str, Digest]
    use_nailgun: tuple[str, ...]
    working_directory: str | None
    env: FrozenDict[str, str]
    append_only_caches: FrozenDict[str, str]
    output_files: tuple[str, ...]
    output_directories: tuple[str, ...]
    timeout_seconds: int | float
    jdk_home: str | None
    execution_slot_variable: str | None
    concurrency_available: int
    concurrency: ProcessConcurrency | None
    cache_scope: ProcessCacheScope
    remote_cache_speculation_delay_millis: int
    attempt: int

    def __init__(
        self,
        argv: Iterable[str],
        *,
        description: str,
        level: LogLevel = LogLevel.INFO,
        input_digest: Digest = EMPTY_DIGEST,
        immutable_input_digests: Mapping[str, Digest] | None = None,
        use_nailgun: Iterable[str] = (),
        working_directory: str | None = None,
        env: Mapping[str, str] | None = None,
        append_only_caches: Mapping[str, str] | None = None,
        output_files: Iterable[str] | None = None,
        output_directories: Iterable[str] | None = None,
        timeout_seconds: int | float | None = None,
        jdk_home: str | None = None,
        execution_slot_variable: str | None = None,
        concurrency_available: int = 0,
        concurrency: ProcessConcurrency | None = None,
        cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
        remote_cache_speculation_delay_millis: int = 0,
        attempt: int = 0,
    ) -> None:
        """Request to run a subprocess, similar to subprocess.Popen.

        This process will be hermetic, meaning that it cannot access files and environment variables
        that are not explicitly populated. For example, $PATH will not be defined by default, unless
        populated through the `env` parameter.

        Usually, you will want to provide input files/directories via the parameter `input_digest`.
        The process will then be able to access these paths through relative paths. If you want to
        give multiple input digests, first merge them with `merge_digests()`. Files larger than
        512KB will be read-only unless they are globbed as part of either `output_files` or
        `output_directories`.

        Often, you will want to capture the files/directories created in the process. To do this,
        you can either set `output_files` or `output_directories`. The specified paths should be
        specified relative to the `working_directory`, if any, and will then be used to populate
        `output_digest` on the `ProcessResult`. If you want to split up this output digest into
        multiple digests, use `digest_subset_to_digest()` on the `output_digest`.

        To actually run the process, use either `await execute_process(Process(...), **implicitly())`
        or `await execute_process_or_raise(**implicitly(Process(...)))`.

        Example:

            result = await execute_process_or_raise(
                **implicitly(Process(["/bin/echo", "hello world"], description="demo"))
            )
            assert result.stdout == b"hello world\n"
        """
        if isinstance(argv, str):
            raise ValueError("argv must be a sequence of strings, but was a single string.")

        object.__setattr__(self, "argv", tuple(argv))
        object.__setattr__(self, "description", description)
        object.__setattr__(self, "level", level)
        object.__setattr__(self, "input_digest", input_digest)
        object.__setattr__(
            self, "immutable_input_digests", FrozenDict(immutable_input_digests or {})
        )
        object.__setattr__(self, "use_nailgun", tuple(use_nailgun))
        object.__setattr__(self, "working_directory", working_directory)
        object.__setattr__(self, "env", FrozenDict(env or {}))
        object.__setattr__(self, "append_only_caches", FrozenDict(append_only_caches or {}))
        object.__setattr__(self, "output_files", tuple(output_files or ()))
        object.__setattr__(self, "output_directories", tuple(output_directories or ()))
        # NB: A negative or None time value is normalized to -1 to ease the transfer to Rust.
        object.__setattr__(
            self,
            "timeout_seconds",
            timeout_seconds if timeout_seconds and timeout_seconds > 0 else -1,
        )
        object.__setattr__(self, "jdk_home", jdk_home)
        object.__setattr__(self, "execution_slot_variable", execution_slot_variable)
        object.__setattr__(self, "concurrency_available", concurrency_available)
        object.__setattr__(self, "concurrency", concurrency)
        object.__setattr__(self, "cache_scope", cache_scope)
        object.__setattr__(
            self, "remote_cache_speculation_delay_millis", remote_cache_speculation_delay_millis
        )
        object.__setattr__(self, "attempt", attempt)

    def __post_init__(self) -> None:
        if self.concurrency_available and self.concurrency:
            raise ValueError(
                "Cannot specify both concurrency_available and concurrency. "
                "Only one concurrency setting may be used at a time."
            )
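#
# Editor's sketch (illustrative, not part of the original file): a Process that
# captures an output file created in the sandbox. `input_digest` here is a
# hypothetical Digest containing `in.txt`:
#
#     Process(
#         ["/bin/cp", "in.txt", "out.txt"],
#         description="copy a file",
#         input_digest=input_digest,
#         output_files=("out.txt",),  # globbed into ProcessResult.output_digest
#     )
#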

@dataclass(frozen=True)
class ProcessWithRetries:
    proc: Process
    attempts: int


@dataclass(frozen=True)
class ProcessResult:
    """Result of executing a process which should not fail.

    If the process has a non-zero exit code, this will raise an exception, unlike
    FallibleProcessResult.
    """

    stdout: bytes
    stdout_digest: FileDigest
    stderr: bytes
    stderr_digest: FileDigest
    output_digest: Digest
    metadata: ProcessResultMetadata = field(compare=False, hash=False)

    @property
    def platform(self) -> Platform:
        return self.metadata.platform


@dataclass(frozen=True)
class FallibleProcessResult:
    """Result of executing a process which might fail.

    If the process has a non-zero exit code, this will not raise an exception, unlike ProcessResult.
    """

    stdout: bytes
    stdout_digest: FileDigest
    stderr: bytes
    stderr_digest: FileDigest
    exit_code: int
    output_digest: Digest
    metadata: ProcessResultMetadata = field(compare=False, hash=False)

    @property
    def platform(self) -> Platform:
        return self.metadata.platform


@dataclass(frozen=True)
class ProcessResultWithRetries:
    results: tuple[FallibleProcessResult, ...]

    @property
    def last(self):
        return self.results[-1]


@dataclass(frozen=True)
class ProcessResultMetadata:
    """Metadata for a ProcessResult, which is not included in its definition of equality."""

    class Source(Enum):
        """This is public API as these values are part of the test result report file."""

        RAN = "ran"
        HIT_LOCALLY = "hit_locally"
        HIT_REMOTELY = "hit_remotely"
        MEMOIZED = "memoized"

    # The execution time of the process, in milliseconds, or None if it could not be captured
    # (since remote execution does not guarantee its availability).
    total_elapsed_ms: int | None
    # The environment that the process ran in (or would have run in, if it was not a cache hit).
    execution_environment: ProcessExecutionEnvironment
    # Whether the ProcessResult (when it was created in the attached run_id) came from the local
    # or remote cache, or ran locally or remotely. See the `self.source` method.
    _source: str
    # The run_id in which a ProcessResult was created. See the `self.source` method.
    source_run_id: int

    @property
    def platform(self) -> Platform:
        return Platform[self.execution_environment.platform]

    def source(self, current_run_id: RunId) -> Source:
        """Given the current run_id, return the calculated "source" of the ProcessResult.

        If a ProcessResult is consumed in any run_id other than the one it was created in, the
        source implicitly becomes memoization, since the result was re-used in a new run without
        being recreated.
        """
        return (
            self.Source(self._source)
            if self.source_run_id == current_run_id
            else self.Source.MEMOIZED
        )
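#
# Editor's note (illustrative, not part of the original file): per the docstring
# above, a result created in run 7 but consumed in run 9 reports Source.MEMOIZED
# from `source(current_run_id)`, even if its recorded `_source` was "ran".
#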

class ProcessExecutionFailure(Exception):
    """Used to denote that a process exited, but was unsuccessful in some way.

    For example, exiting with a non-zero code.
    """

    def __init__(
        self,
        exit_code: int,
        stdout: bytes,
        stderr: bytes,
        process_description: str,
        *,
        keep_sandboxes: KeepSandboxes,
    ) -> None:
        # These are intentionally "public" members.
        self.exit_code = exit_code
        self.stdout = stdout
        self.stderr = stderr

        def try_decode(content: bytes) -> str:
            try:
                return content.decode()
            except ValueError:
                content_repr = repr(content)
                return f"{content_repr[:256]}..." if len(content_repr) > 256 else content_repr

        # NB: We don't use dedent on a single format string here because it would attempt to
        # interpret the stdio content.
        err_strings = [
            f"Process '{process_description}' failed with exit code {exit_code}.",
            "stdout:",
            try_decode(stdout),
            "stderr:",
            try_decode(stderr),
        ]
        if keep_sandboxes == KeepSandboxes.never:
            err_strings.append(
                "\n\nUse `--keep-sandboxes=on_failure` to preserve the process chroot for inspection."
            )
        super().__init__("\n".join(err_strings))

    @classmethod
    def from_result(
        cls, result: FallibleProcessResult, description: str, keep_sandboxes: KeepSandboxes
    ) -> ProcessExecutionFailure:
        return cls(
            result.exit_code,
            result.stdout,
            result.stderr,
            description,
            keep_sandboxes=keep_sandboxes,
        )


@rule
async def get_multi_platform_request_description(req: Process) -> ProductDescription:
    return ProductDescription(req.description)


@rule
async def fallible_to_exec_result_or_raise(
    fallible_result: FallibleProcessResult,
    description: ProductDescription,
    keep_sandboxes: KeepSandboxes,
) -> ProcessResult:
    """Converts a FallibleProcessResult to a ProcessResult or raises an error."""

    if fallible_result.exit_code == 0:
        return ProcessResult(
            stdout=fallible_result.stdout,
            stdout_digest=fallible_result.stdout_digest,
            stderr=fallible_result.stderr,
            stderr_digest=fallible_result.stderr_digest,
            output_digest=fallible_result.output_digest,
            metadata=fallible_result.metadata,
        )
    raise ProcessExecutionFailure(
        fallible_result.exit_code,
        fallible_result.stdout,
        fallible_result.stderr,
        description.value,
        keep_sandboxes=keep_sandboxes,
    )


# fallible_to_exec_result_or_raise directly converts a FallibleProcessResult
# to a ProcessResult, or raises an exception if the process failed.
# Its name makes sense when you already have a FallibleProcessResult in hand.
#
# But it is common to want to execute a process and automatically raise an exception
# on process error. The execute_process_or_raise() alias below facilitates this idiom:
#
# result = await execute_process_or_raise(
#         **implicitly(
#             Process(...)  # Or something that some other rule can convert to a Process.
#         )
#     )
# Where the execute_process() intrinsic is invoked implicitly to create a FallibleProcessResult.
# This is simply a better name for the same rule, when invoked in this use case.
execute_process_or_raise = fallible_to_exec_result_or_raise


@dataclass(frozen=True)
class InteractiveProcessResult:
    exit_code: int


@dataclass(frozen=True)
class InteractiveProcess(SideEffecting):
    # NB: Although InteractiveProcess supports only some of the features of Process, we construct an
    # underlying Process instance to improve code reuse.
    process: Process
    run_in_workspace: bool
    forward_signals_to_process: bool
    restartable: bool
    keep_sandboxes: KeepSandboxes

    def __init__(
        self,
        argv: Iterable[str],
        *,
        env: Mapping[str, str] | None = None,
        description: str = "Interactive process",
        input_digest: Digest = EMPTY_DIGEST,
        run_in_workspace: bool = False,
        forward_signals_to_process: bool = True,
        restartable: bool = False,
        append_only_caches: Mapping[str, str] | None = None,
        immutable_input_digests: Mapping[str, Digest] | None = None,
        keep_sandboxes: KeepSandboxes = KeepSandboxes.never,
        working_directory: str | None = None,
    ) -> None:
        """Request to run a subprocess in the foreground, similar to subprocess.run().

        Unlike `Process`, the result will not be cached.

        To run the process, use `await run_interactive_process(InteractiveProcess(...))`
        in a `@goal_rule`.

        `forward_signals_to_process` controls whether pants will allow a SIGINT signal
        sent to a process by hitting Ctrl-C in the terminal to actually reach the process,
        or capture that signal itself, blocking it from the process.
        """
        object.__setattr__(
            self,
            "process",
            Process(
                argv,
                description=description,
                env=env,
                input_digest=input_digest,
                append_only_caches=append_only_caches,
                immutable_input_digests=immutable_input_digests,
                working_directory=working_directory,
            ),
        )
        object.__setattr__(self, "run_in_workspace", run_in_workspace)
        object.__setattr__(self, "forward_signals_to_process", forward_signals_to_process)
        object.__setattr__(self, "restartable", restartable)
        object.__setattr__(self, "keep_sandboxes", keep_sandboxes)

    @classmethod
    def from_process(
        cls,
        process: Process,
        *,
        forward_signals_to_process: bool = True,
        restartable: bool = False,
        keep_sandboxes: KeepSandboxes = KeepSandboxes.never,
    ) -> InteractiveProcess:
        return InteractiveProcess(
            argv=process.argv,
            env=process.env,
            description=process.description,
            input_digest=process.input_digest,
            forward_signals_to_process=forward_signals_to_process,
            restartable=restartable,
            append_only_caches=process.append_only_caches,
            immutable_input_digests=process.immutable_input_digests,
            keep_sandboxes=keep_sandboxes,
            working_directory=process.working_directory,
        )
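#
# Editor's sketch (illustrative, not part of the original file): reusing an
# existing `Process` definition for foreground execution inside a @goal_rule:
#
#     interactive = InteractiveProcess.from_process(
#         proc, keep_sandboxes=KeepSandboxes.on_failure
#     )
#     result = await run_interactive_process(interactive)
#     # result.exit_code carries the subprocess's exit status.
#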

def rules():
    return collect_rules()
```