• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 27047003725

06 Jun 2026 12:19AM UTC coverage: 92.79% (+0.01%) from 92.78%
27047003725

Pull #23400

github

web-flow
Merge 360db9fbe into 95da181f5
Pull Request #23400: Compose a Process out of multiple sequential subprocesses

169 of 176 new or added lines in 3 files covered. (96.02%)

1 existing line in 1 file now uncovered.

93105 of 100339 relevant lines covered (92.79%)

4.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.27
/src/python/pants/engine/composite_process.py
1
# Copyright 2026 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations

import dataclasses
import logging
import shlex
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import TypeVar

from pants.core.util_rules.system_binaries import BashBinary
from pants.engine.fs import EMPTY_DIGEST, Digest
from pants.engine.internals.native_engine import MergeDigests
from pants.engine.intrinsics import merge_digests
from pants.engine.process import Process, ProcessCacheScope, ProcessConcurrency
from pants.engine.rules import collect_rules, rule
from pants.util.frozendict import FrozenDict
from pants.util.logging import LogLevel
12✔
20

21
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
12✔
22

23

24
@dataclass(frozen=True)
class Subprocess:
    """One in a list of subprocesses to run sequentially in the same Process invocation.

    Exactly one of `command` (a pre-joined command string) and `argv` (a sequence of
    arguments) must be given. Collection-valued arguments are normalized by `__init__` to
    tuples and FrozenDicts.
    """

    # The subprocess can either provide argv or a pre-joined command string, but not both.
    command: str | None
    # NB: The sequence-valued fields are declared with the concrete type that `__init__`
    # stores (it accepts any Iterable and converts it to a tuple).
    argv: tuple[str, ...]
    input_digest: Digest
    immutable_input_digests: FrozenDict[str, Digest]
    env: FrozenDict[str, str]
    append_only_caches: FrozenDict[str, str]
    output_files: tuple[str, ...]
    output_directories: tuple[str, ...]

    def __init__(
        self,
        *,
        command: str | None = None,
        argv: Iterable[str] | None = None,
        input_digest: Digest = EMPTY_DIGEST,
        immutable_input_digests: Mapping[str, Digest] | None = None,
        env: Mapping[str, str] | None = None,
        append_only_caches: Mapping[str, str] | None = None,
        output_files: Iterable[str] | None = None,
        output_directories: Iterable[str] | None = None,
    ) -> None:
        """Validate the command specification and store normalized, immutable field values.

        Raises:
            ValueError: if both or neither of `command` and `argv` are given, or if `argv`
                is a single string rather than a sequence of strings.
        """
        # Both None, or both set, are equally invalid: exactly one must be provided.
        if (command is None) == (argv is None):
            raise ValueError("Exactly one of command and argv must be specified.")
        # A bare string is itself iterable, and would otherwise be silently split into
        # one-character arguments by the tuple() conversion below.
        if isinstance(argv, str):
            raise ValueError("argv must be a sequence of strings, but was a single string.")
        # NB: The dataclass is frozen, so fields are assigned via object.__setattr__.
        object.__setattr__(self, "command", command)
        object.__setattr__(self, "argv", tuple(argv) if argv is not None else ())
        object.__setattr__(self, "input_digest", input_digest)
        object.__setattr__(
            self, "immutable_input_digests", FrozenDict(immutable_input_digests or {})
        )
        object.__setattr__(self, "env", FrozenDict(env or {}))
        object.__setattr__(self, "append_only_caches", FrozenDict(append_only_caches or {}))
        object.__setattr__(self, "output_files", tuple(output_files or ()))
        object.__setattr__(self, "output_directories", tuple(output_directories or ()))

    def get_command(self) -> str:
        """Return the command string for this subprocess.

        This is the pre-joined `command` if one was given, and otherwise `argv` joined into
        a single shell-escaped string with `shlex.join`.
        """
        if self.command is None:
            return shlex.join(self.argv)
        return self.command
1✔
69

70

71
@dataclass(frozen=True)
class CompositeProcess:
    """A sequence of Subprocesses, plus the process-wide settings they are run with."""

    description: str = dataclasses.field(compare=False)
    level: LogLevel
    subprocesses: tuple[Subprocess, ...]
    use_nailgun: tuple[str, ...]
    working_directory: str | None
    timeout_seconds: int | float
    jdk_home: str | None
    execution_slot_variable: str | None
    concurrency_available: int
    concurrency: ProcessConcurrency | None
    cache_scope: ProcessCacheScope
    remote_cache_speculation_delay_millis: int
    attempt: int

    @classmethod
    def from_process(cls, proc: Process) -> CompositeProcess:
        """Create a CompositeProcess from a Process.

        The returned CompositeProcess will act exactly as the Process would: the Process's
        field values will be set on the CompositeProcess's fields or on the fields of its single
        Subprocess, as appropriate.
        """
        # The per-subprocess fields of the Process go onto the one and only Subprocess.
        sole_subprocess = Subprocess(
            argv=proc.argv,
            input_digest=proc.input_digest,
            immutable_input_digests=proc.immutable_input_digests,
            env=proc.env,
            append_only_caches=proc.append_only_caches,
            output_files=proc.output_files,
            output_directories=proc.output_directories,
        )
        # Everything else is carried over onto the CompositeProcess itself.
        return cls(
            [sole_subprocess],
            description=proc.description,
            level=proc.level,
            use_nailgun=proc.use_nailgun,
            working_directory=proc.working_directory,
            timeout_seconds=proc.timeout_seconds,
            jdk_home=proc.jdk_home,
            execution_slot_variable=proc.execution_slot_variable,
            concurrency_available=proc.concurrency_available,
            concurrency=proc.concurrency,
            cache_scope=proc.cache_scope,
            remote_cache_speculation_delay_millis=proc.remote_cache_speculation_delay_millis,
            attempt=proc.attempt,
        )

    def __init__(
        self,
        subprocesses: Iterable[Subprocess],
        *,
        description: str,
        level: LogLevel = LogLevel.INFO,
        use_nailgun: Iterable[str] = (),
        working_directory: str | None = None,
        timeout_seconds: int | float | None = None,
        jdk_home: str | None = None,
        execution_slot_variable: str | None = None,
        concurrency_available: int = 0,
        concurrency: ProcessConcurrency | None = None,
        cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
        remote_cache_speculation_delay_millis: int = 0,
        attempt: int = 0,
    ) -> None:
        """A sequence of subprocesses to run serially under a single Process."""
        field_values = {
            "subprocesses": tuple(subprocesses),
            "description": description,
            "level": level,
            "use_nailgun": tuple(use_nailgun),
            "working_directory": working_directory,
            # NB: A negative or None time value is normalized to -1 to ease the transfer to Rust.
            "timeout_seconds": (
                timeout_seconds if timeout_seconds and timeout_seconds > 0 else -1
            ),
            "jdk_home": jdk_home,
            "execution_slot_variable": execution_slot_variable,
            "concurrency_available": concurrency_available,
            "concurrency": concurrency,
            "cache_scope": cache_scope,
            "remote_cache_speculation_delay_millis": remote_cache_speculation_delay_millis,
            "attempt": attempt,
        }
        # The dataclass is frozen, so normal attribute assignment is unavailable here.
        for field_name, field_value in field_values.items():
            object.__setattr__(self, field_name, field_value)

    def prepend_subprocesses(self, subprocesses: Iterable[Subprocess]) -> CompositeProcess:
        """Return a copy of this CompositeProcess with `subprocesses` placed before its own."""
        combined = tuple(subprocesses) + self.subprocesses
        return dataclasses.replace(self, subprocesses=combined)

    def append_subprocesses(self, subprocesses: Iterable[Subprocess]) -> CompositeProcess:
        """Return a copy of this CompositeProcess with `subprocesses` placed after its own."""
        combined = self.subprocesses + tuple(subprocesses)
        return dataclasses.replace(self, subprocesses=combined)
×
166

167

168
_V = TypeVar("_V")


def _merge_without_conflicts(
    mappings: Iterable[Mapping[str, _V]], conflict_message: str
) -> dict[str, _V]:
    """Union `mappings` into one dict, raising if a key is bound to two different values.

    `conflict_message` is a `str.format` template in which `{key}` is replaced by the
    offending key. The same key bound to equal values in several mappings is not a conflict.

    Raises:
        ValueError: if two mappings bind the same key to values that compare unequal.
    """
    merged: dict[str, _V] = {}
    for mapping in mappings:
        for key, value in mapping.items():
            if key in merged and merged[key] != value:
                raise ValueError(conflict_message.format(key=key))
            merged[key] = value
    return merged


@rule
async def composite_process_to_process(
    composite_process: CompositeProcess, bash_binary: BashBinary
) -> Process:
    """Convert a CompositeProcess into a single Process that runs its subprocesses via bash.

    Each subprocess contributes one line to a script passed to `bash -c`. The subprocesses'
    input digests are merged, their immutable input digests, env vars and append-only caches
    are unioned, and their output files and directories are concatenated in subprocess order.

    Raises:
        ValueError: if two subprocesses specify different digests for the same immutable
            input path, different values for the same env var, or different values for the
            same append-only cache name.
    """
    subprocs = composite_process.subprocesses
    # NOTE(review): the commands are newline-joined with no `set -e`, so bash will carry on
    # to later subprocesses after an earlier one fails, and the exit code is that of the
    # last command to run. Confirm that this is the intended failure behavior.
    command = "\n".join(subproc.get_command() for subproc in subprocs)
    input_digest = await merge_digests(MergeDigests([subproc.input_digest for subproc in subprocs]))

    immutable_input_digests = _merge_without_conflicts(
        (subproc.immutable_input_digests for subproc in subprocs),
        "Multiple Subprocess in the same CompositeProcess had "
        "immutable_input_digests with the path {key} and different digests",
    )
    env = _merge_without_conflicts(
        (subproc.env for subproc in subprocs),
        "Multiple Subprocess in the same CompositeProcess set the env var "
        "{key}, to different values",
    )
    append_only_caches = _merge_without_conflicts(
        (subproc.append_only_caches for subproc in subprocs),
        "Multiple Subprocess in the same CompositeProcess had  "
        "append_only_caches with the name {key} and different values",
    )

    return Process(
        argv=(bash_binary.path, "-c", command),
        description=composite_process.description,
        level=composite_process.level,
        input_digest=input_digest,
        immutable_input_digests=immutable_input_digests,
        use_nailgun=composite_process.use_nailgun,
        working_directory=composite_process.working_directory,
        env=env,
        append_only_caches=append_only_caches,
        output_files=[of for subproc in subprocs for of in subproc.output_files],
        output_directories=[od for subproc in subprocs for od in subproc.output_directories],
        timeout_seconds=composite_process.timeout_seconds,
        jdk_home=composite_process.jdk_home,
        execution_slot_variable=composite_process.execution_slot_variable,
        concurrency_available=composite_process.concurrency_available,
        concurrency=composite_process.concurrency,
        cache_scope=composite_process.cache_scope,
        remote_cache_speculation_delay_millis=composite_process.remote_cache_speculation_delay_millis,
        attempt=composite_process.attempt,
    )
227

228

229
def rules():
    """Return the rules gathered by `collect_rules()` for registration."""
    # NB: `collect_rules` is deliberately called directly from this function rather than
    # through a wrapper (presumably it inspects its caller's module — verify before moving).
    collected = collect_rules()
    return collected
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc