• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 20974506033

13 Jan 2026 10:14PM UTC coverage: 43.251% (-37.0%) from 80.269%
20974506033

Pull #22976

github

web-flow
Merge a16a40040 into c12556724
Pull Request #22976: WIP: Add the ability to set stdin for a Process

2 of 4 new or added lines in 2 files covered. (50.0%)

17213 existing lines in 540 files now uncovered.

26146 of 60452 relevant lines covered (43.25%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.25
/src/python/pants/engine/intrinsics.py
1
# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
2✔
5

6
import dataclasses
2✔
7
import logging
2✔
8

9
from pants.engine.environment import EnvironmentName
2✔
10
from pants.engine.fs import (
2✔
11
    AddPrefix,
12
    CreateDigest,
13
    Digest,
14
    DigestContents,
15
    DigestEntries,
16
    DigestSubset,
17
    MergeDigests,
18
    NativeDownloadFile,
19
    PathGlobs,
20
    PathMetadataRequest,
21
    PathMetadataResult,
22
    Paths,
23
    RemovePrefix,
24
    Snapshot,
25
)
26
from pants.engine.internals import native_engine
2✔
27
from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult
2✔
28
from pants.engine.internals.native_dep_inference import (
2✔
29
    NativeParsedDockerfileInfo,
30
    NativeParsedJavascriptDependencies,
31
    NativeParsedPythonDependencies,
32
)
33
from pants.engine.internals.native_engine import NativeDependenciesRequest, task_side_effected
2✔
34
from pants.engine.internals.session import RunId, SessionValues
2✔
35
from pants.engine.process import (
2✔
36
    FallibleProcessResult,
37
    InteractiveProcess,
38
    InteractiveProcessResult,
39
    Process,
40
    ProcessExecutionEnvironment,
41
    ProcessResultWithRetries,
42
    ProcessWithRetries,
43
)
44
from pants.engine.rules import _uncacheable_rule, collect_rules, implicitly, rule
2✔
45
from pants.util.docutil import git_url
2✔
46

47

48
@rule
async def create_digest(create_digest: CreateDigest) -> Digest:
    """Turn a `CreateDigest` request into a `Digest` via `native_engine.create_digest`."""
    digest = await native_engine.create_digest(create_digest)
    return digest
×
53

54

55
@rule
async def path_globs_to_digest(path_globs: PathGlobs) -> Digest:
    """Turn `PathGlobs` into a `Digest` via `native_engine.path_globs_to_digest`."""
    digest = await native_engine.path_globs_to_digest(path_globs)
    return digest
×
60

61

62
@rule
async def path_globs_to_paths(path_globs: PathGlobs) -> Paths:
    """Turn `PathGlobs` into `Paths` via `native_engine.path_globs_to_paths`."""
    paths = await native_engine.path_globs_to_paths(path_globs)
    return paths
×
67

68

69
@rule
async def download_file(native_download_file: NativeDownloadFile) -> Digest:
    """Hand a `NativeDownloadFile` request to `native_engine.download_file` and return its `Digest`."""
    digest = await native_engine.download_file(native_download_file)
    return digest
×
74

75

76
@rule
async def digest_to_snapshot(digest: Digest) -> Snapshot:
    """Turn a `Digest` into a `Snapshot` via `native_engine.digest_to_snapshot`."""
    snapshot = await native_engine.digest_to_snapshot(digest)
    return snapshot
×
79

80

81
@rule
async def get_digest_contents(digest: Digest) -> DigestContents:
    """Fetch the `DigestContents` for `digest` via `native_engine.get_digest_contents`."""
    contents = await native_engine.get_digest_contents(digest)
    return contents
×
84

85

86
@rule
async def get_digest_entries(digest: Digest) -> DigestEntries:
    """Fetch the `DigestEntries` for `digest` via `native_engine.get_digest_entries`."""
    entries = await native_engine.get_digest_entries(digest)
    return entries
×
89

90

91
@rule
async def merge_digests(merge_digests: MergeDigests) -> Digest:
    """Turn a `MergeDigests` request into a single `Digest` via `native_engine.merge_digests`."""
    merged = await native_engine.merge_digests(merge_digests)
    return merged
×
94

95

96
@rule
async def remove_prefix(remove_prefix: RemovePrefix) -> Digest:
    """Turn a `RemovePrefix` request into a `Digest` via `native_engine.remove_prefix`."""
    stripped = await native_engine.remove_prefix(remove_prefix)
    return stripped
×
99

100

101
@rule
async def add_prefix(add_prefix: AddPrefix) -> Digest:
    """Turn an `AddPrefix` request into a `Digest` via `native_engine.add_prefix`."""
    prefixed = await native_engine.add_prefix(add_prefix)
    return prefixed
×
104

105

106
@rule
async def execute_process(
    process: Process, process_execution_environment: ProcessExecutionEnvironment
) -> FallibleProcessResult:
    """Run `process` through `native_engine.execute_process`.

    Raises:
        ValueError: if `process.stdin` is set while
            `process_execution_environment.remote_execution` is truthy.
    """
    # The RBE API does not support stdin, so reject the combination up front
    # instead of handing it to the engine.
    stdin_requested = process.stdin is not None
    if stdin_requested and process_execution_environment.remote_execution:
        message = (
            f"Process '{process.description}' cannot use stdin with remote execution. "
            "Configure the process to run locally or remove stdin."
        )
        raise ValueError(message)

    result = await native_engine.execute_process(process, process_execution_environment)
    return result
×
118

119

120
@rule
async def execute_process_with_retry(req: ProcessWithRetries) -> ProcessResultWithRetries:
    """Run `req.proc` up to `req.attempts` times, stopping after the first zero exit code.

    Each attempt runs a copy of `req.proc` whose `attempt` field is the zero-based attempt
    index. The results of all attempts that ran are returned, in order, wrapped in a
    `ProcessResultWithRetries`.
    """
    outcomes: list[FallibleProcessResult] = []
    for attempt_index in range(req.attempts):
        attempt_proc = dataclasses.replace(req.proc, attempt=attempt_index)
        outcome = await execute_process(  # noqa: PNT30: We only know that we need to rerun the test after we run it
            attempt_proc, **implicitly()
        )
        outcomes.append(outcome)
        succeeded = outcome.exit_code == 0
        if succeeded:
            # No need to retry once an attempt has exited cleanly.
            break
    return ProcessResultWithRetries(tuple(outcomes))
×
132

133

134
@rule
async def digest_subset_to_digest(digest_subset: DigestSubset) -> Digest:
    """Turn a `DigestSubset` request into a `Digest` via `native_engine.digest_subset_to_digest`."""
    subset = await native_engine.digest_subset_to_digest(digest_subset)
    return subset
×
137

138

139
@rule
async def session_values() -> SessionValues:
    """Return the `SessionValues` produced by `native_engine.session_values`."""
    values = await native_engine.session_values()
    return values
×
142

143

144
@rule
async def run_id() -> RunId:
    """Return the `RunId` produced by `native_engine.run_id`."""
    current_run_id = await native_engine.run_id()
    return current_run_id
×
147

148

149
# Key that the `run_interactive_process*` helpers stash in an `InteractiveProcess`'s `__dict__`
# so that `_interactive_process` skips its "called directly" warning (and removes the key again).
__SQUELCH_WARNING = "__squelch_warning"
2✔
150

151

152
# NB: Call one of the helpers below, instead of calling this rule directly,
153
#  to ensure correct application of restartable logic.
154
@_uncacheable_rule
async def _interactive_process(
    process: InteractiveProcess, process_execution_environment: ProcessExecutionEnvironment
) -> InteractiveProcessResult:
    """Run `process` through `native_engine.interactive_process`.

    If the `__SQUELCH_WARNING` key is present in `process.__dict__` it is removed and the
    call proceeds silently. Otherwise a warning is logged, pointing callers at
    `run_interactive_process` / `run_interactive_process_in_environment`.
    """
    # This is a crafty way for a caller to signal into this function without a dedicated arg
    # (which would confound the solver).  Note that we go via __dict__ instead of using
    # setattr/delattr, because those error for frozen dataclasses.
    if __SQUELCH_WARNING in process.__dict__:
        del process.__dict__[__SQUELCH_WARNING]
    else:
        # Log through this module's named logger rather than the root logger, so the warning
        # is attributed to this module and can be filtered by logger name.
        logging.getLogger(__name__).warning(
            "A plugin is calling `await Effect(InteractiveProcessResult, InteractiveProcess, "
            "process)` directly. This will cause restarting logic not to be applied. "
            "Use `await run_interactive_process(process)` or `await "
            "run_interactive_process_in_environment(process, environment_name)` instead. "
            f"See {git_url('src/python/pants/engine/intrinsics.py')} for more details."
        )
    return await native_engine.interactive_process(process, process_execution_environment)
×
172

173

174
async def run_interactive_process(process: InteractiveProcess) -> InteractiveProcessResult:
    """Run `process` via `_interactive_process`, applying the restartable bookkeeping first.

    Calls `task_side_effected()` when `process.restartable` is falsy, then sets the
    `__SQUELCH_WARNING` key in `process.__dict__` before awaiting `_interactive_process`.
    """
    # NB: task_side_effected() has to be invoked from this helper and not from a nested @rule
    #  call, so that the Task for the @rule calling this helper is the one marked as
    #  non-restartable.
    if not process.restartable:
        task_side_effected()

    # Signal to _interactive_process that it is being called through a sanctioned helper.
    process.__dict__[__SQUELCH_WARNING] = True
    result: InteractiveProcessResult = await _interactive_process(process, **implicitly())
    return result
×
183

184

185
async def run_interactive_process_in_environment(
    process: InteractiveProcess, environment_name: EnvironmentName
) -> InteractiveProcessResult:
    """Run `process` via `_interactive_process`, supplying `environment_name` implicitly.

    Calls `task_side_effected()` when `process.restartable` is falsy, then sets the
    `__SQUELCH_WARNING` key in `process.__dict__` before awaiting `_interactive_process`.
    """
    # NB: task_side_effected() has to be invoked from this helper and not from a nested @rule
    #  call, so that the Task for the @rule calling this helper is the one marked as
    #  non-restartable.
    if not process.restartable:
        task_side_effected()

    # Signal to _interactive_process that it is being called through a sanctioned helper.
    process.__dict__[__SQUELCH_WARNING] = True
    environment_params = {environment_name: EnvironmentName}
    result: InteractiveProcessResult = await _interactive_process(
        process, **implicitly(environment_params)
    )
    return result
×
198

199

200
@rule
async def docker_resolve_image(request: DockerResolveImageRequest) -> DockerResolveImageResult:
    """Pass `request` to `native_engine.docker_resolve_image` and return its result."""
    resolved = await native_engine.docker_resolve_image(request)
    return resolved
×
203

204

205
@rule
async def parse_dockerfile_info(deps_request: NativeDependenciesRequest) -> NativeParsedDockerfileInfo:
    """Pass `deps_request` to `native_engine.parse_dockerfile_info` and return its result."""
    dockerfile_info = await native_engine.parse_dockerfile_info(deps_request)
    return dockerfile_info
×
210

211

212
@rule
async def parse_python_deps(deps_request: NativeDependenciesRequest) -> NativeParsedPythonDependencies:
    """Pass `deps_request` to `native_engine.parse_python_deps` and return its result."""
    python_deps = await native_engine.parse_python_deps(deps_request)
    return python_deps
×
217

218

219
@rule
async def parse_javascript_deps(
    deps_request: NativeDependenciesRequest,
) -> NativeParsedJavascriptDependencies:
    """Pass `deps_request` to `native_engine.parse_javascript_deps` and return its result."""
    javascript_deps = await native_engine.parse_javascript_deps(deps_request)
    return javascript_deps
×
224

225

226
@rule
async def path_metadata_request(request: PathMetadataRequest) -> PathMetadataResult:
    """Pass `request` to `native_engine.path_metadata_request` and return its result."""
    metadata = await native_engine.path_metadata_request(request)
    return metadata
×
229

230

231
def rules():
    """Return the rules yielded by `collect_rules()` as a list."""
    return list(collect_rules())
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc