• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 23177125175

17 Mar 2026 03:32AM UTC coverage: 52.677% (-40.3%) from 92.932%
23177125175

Pull #23177

github

web-flow
Merge 1824dfbf4 into 0b9fdfb0e
Pull Request #23177: Bump the gha-deps group across 1 directory with 4 updates

31687 of 60153 relevant lines covered (52.68%)

1.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.27
/src/python/pants/core/goals/check.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
2✔
5

6
import logging
2✔
7
from collections import defaultdict
2✔
8
from collections.abc import Iterable
2✔
9
from dataclasses import dataclass
2✔
10
from typing import Any, ClassVar, Generic, TypeVar, cast
2✔
11

12
from pants.core.environments.rules import EnvironmentNameRequest, resolve_environment_name
2✔
13
from pants.core.goals.lint import REPORT_DIR as REPORT_DIR  # noqa: F401
2✔
14
from pants.core.goals.multi_tool_goal_helper import (
2✔
15
    OnlyOption,
16
    determine_specified_tool_ids,
17
    write_reports,
18
)
19
from pants.core.util_rules.distdir import DistDir
2✔
20
from pants.engine.collection import Collection
2✔
21
from pants.engine.console import Console
2✔
22
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType
2✔
23
from pants.engine.environment import EnvironmentName
2✔
24
from pants.engine.fs import EMPTY_DIGEST, Digest, Workspace
2✔
25
from pants.engine.goal import Goal, GoalSubsystem
2✔
26
from pants.engine.internals.selectors import concurrently
2✔
27
from pants.engine.internals.session import RunId
2✔
28
from pants.engine.process import FallibleProcessResult, ProcessCacheScope, ProcessResultMetadata
2✔
29
from pants.engine.rules import QueryRule, collect_rules, goal_rule, implicitly, rule
2✔
30
from pants.engine.target import FieldSet, FilteredTargets
2✔
31
from pants.engine.unions import UnionMembership, union
2✔
32
from pants.option.option_types import BoolOption
2✔
33
from pants.util.logging import LogLevel
2✔
34
from pants.util.memo import memoized_property
2✔
35
from pants.util.meta import classproperty
2✔
36
from pants.util.strutil import Simplifier
2✔
37

38
logger = logging.getLogger(__name__)
2✔
39

40
_FS = TypeVar("_FS", bound=FieldSet)
2✔
41

42

43
@dataclass(frozen=True)
2✔
44
class CheckResult:
2✔
45
    exit_code: int
2✔
46
    stdout: str
2✔
47
    stderr: str
2✔
48
    partition_description: str | None = None
2✔
49
    report: Digest = EMPTY_DIGEST
2✔
50
    result_metadata: ProcessResultMetadata | None = None
2✔
51

52
    @staticmethod
2✔
53
    def from_fallible_process_result(
2✔
54
        process_result: FallibleProcessResult,
55
        *,
56
        partition_description: str | None = None,
57
        output_simplifier: Simplifier = Simplifier(),
58
        report: Digest = EMPTY_DIGEST,
59
    ) -> CheckResult:
60
        return CheckResult(
2✔
61
            exit_code=process_result.exit_code,
62
            stdout=output_simplifier.simplify(process_result.stdout),
63
            stderr=output_simplifier.simplify(process_result.stderr),
64
            partition_description=partition_description,
65
            report=report,
66
            result_metadata=process_result.metadata,
67
        )
68

69
    def metadata(self) -> dict[str, Any]:
2✔
70
        return {"partition": self.partition_description}
×
71

72

73
@dataclass(frozen=True)
2✔
74
class CheckResults(EngineAwareReturnType):
2✔
75
    """Zero or more CheckResult objects for a single type checker.
76

77
    Typically, type checkers will return one result. If they no-oped, they will return zero results.
78
    However, some type checkers may need to partition their input and thus may need to return
79
    multiple results.
80
    """
81

82
    results: tuple[CheckResult, ...]
2✔
83
    checker_name: str
2✔
84
    output_per_partition: bool
2✔
85

86
    def __init__(
2✔
87
        self,
88
        results: Iterable[CheckResult],
89
        *,
90
        checker_name: str,
91
        output_per_partition: bool = True,
92
    ) -> None:
93
        object.__setattr__(self, "results", tuple(results))
2✔
94
        object.__setattr__(self, "checker_name", checker_name)
2✔
95
        object.__setattr__(self, "output_per_partition", output_per_partition)
2✔
96

97
    @property
2✔
98
    def skipped(self) -> bool:
2✔
99
        return bool(self.results) is False
2✔
100

101
    @memoized_property
2✔
102
    def exit_code(self) -> int:
2✔
103
        return next((result.exit_code for result in self.results if result.exit_code != 0), 0)
2✔
104

105
    def level(self) -> LogLevel | None:
2✔
106
        if self.skipped:
2✔
107
            return LogLevel.DEBUG
×
108
        return LogLevel.ERROR if self.exit_code != 0 else LogLevel.INFO
2✔
109

110
    def message(self) -> str | None:
2✔
111
        if self.skipped:
2✔
112
            return f"{self.checker_name} skipped."
×
113
        message = self.checker_name
2✔
114
        message += (
2✔
115
            " succeeded." if self.exit_code == 0 else f" failed (exit code {self.exit_code})."
116
        )
117

118
        def msg_for_result(result: CheckResult) -> str:
2✔
119
            msg = ""
2✔
120
            if result.stdout:
2✔
121
                msg += f"\n{result.stdout}"
2✔
122
            if result.stderr:
2✔
123
                msg += f"\n{result.stderr}"
×
124
            if msg:
2✔
125
                msg = f"{msg.rstrip()}\n\n"
2✔
126
            return msg
2✔
127

128
        if len(self.results) == 1:
2✔
129
            results_msg = msg_for_result(self.results[0])
2✔
130
        else:
131
            results_msg = "\n"
×
132
            for i, result in enumerate(self.results):
×
133
                msg = f"Partition #{i + 1}"
×
134
                msg += (
×
135
                    f" - {result.partition_description}:" if result.partition_description else ":"
136
                )
137
                msg += msg_for_result(result) or "\n\n"
×
138
                results_msg += msg
×
139
        message += results_msg
2✔
140
        return message
2✔
141

142
    def cacheable(self) -> bool:
2✔
143
        """Is marked uncacheable to ensure that it always renders."""
144
        return False
2✔
145

146

147
@dataclass(frozen=True)
2✔
148
@union(in_scope_types=[EnvironmentName])
2✔
149
class CheckRequest(Generic[_FS], EngineAwareParameter):
2✔
150
    """A union for targets that should be checked.
151

152
    Subclass and install a member of this type to provide a checker.
153
    """
154

155
    field_set_type: ClassVar[type[_FS]]
2✔
156
    tool_name: ClassVar[str]
2✔
157

158
    @classproperty
2✔
159
    def tool_id(cls) -> str:
2✔
160
        """The "id" of the tool, used in tool selection (Eg --only=<id>)."""
161
        return cls.tool_name
×
162

163
    field_sets: Collection[_FS]
2✔
164

165
    def __init__(self, field_sets: Iterable[_FS]) -> None:
2✔
166
        object.__setattr__(self, "field_sets", Collection[_FS](field_sets))
2✔
167

168
    def debug_hint(self) -> str:
2✔
169
        return self.tool_name
2✔
170

171
    def metadata(self) -> dict[str, Any]:
2✔
172
        return {"addresses": [fs.address.spec for fs in self.field_sets]}
2✔
173

174

175
@rule(polymorphic=True)
2✔
176
async def check(
2✔
177
    req: CheckRequest,
178
    environment_name: EnvironmentName,
179
) -> CheckResults:
180
    raise NotImplementedError()
×
181

182

183
class CheckSubsystem(GoalSubsystem):
2✔
184
    name = "check"
2✔
185
    help = "Run type checking or the lightest variant of compilation available for a language."
2✔
186

187
    @classmethod
2✔
188
    def activated(cls, union_membership: UnionMembership) -> bool:
2✔
189
        return CheckRequest in union_membership
×
190

191
    only = OnlyOption("checker", "mypy", "javac")
2✔
192
    force = BoolOption(
2✔
193
        default=False,
194
        help="Force checks to run, even if they could be satisfied from cache.",
195
    )
196

197
    @property
2✔
198
    def default_process_cache_scope(self) -> ProcessCacheScope:
2✔
199
        return ProcessCacheScope.PER_SESSION if self.force else ProcessCacheScope.SUCCESSFUL
2✔
200

201

202
class Check(Goal):
2✔
203
    subsystem_cls = CheckSubsystem
2✔
204
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
2✔
205

206

207
_SOURCE_MAP = {
2✔
208
    ProcessResultMetadata.Source.MEMOIZED: "memoized",
209
    ProcessResultMetadata.Source.RAN: "ran",
210
    ProcessResultMetadata.Source.HIT_LOCALLY: "cached locally",
211
    ProcessResultMetadata.Source.HIT_REMOTELY: "cached remotely",
212
}
213

214

215
def _format_check_result(
2✔
216
    checker_name: str,
217
    result: CheckResult,
218
    run_id: RunId,
219
    console: Console,
220
) -> str:
221
    """Format a single check result for console output."""
222
    sigil = console.sigil_succeeded() if result.exit_code == 0 else console.sigil_failed()
×
223
    status = "succeeded" if result.exit_code == 0 else "failed"
×
224

225
    desc = checker_name
×
226
    if result.partition_description:
×
227
        desc = f"{checker_name} ({result.partition_description})"
×
228

229
    elapsed = ""
×
230
    if result.result_metadata and result.result_metadata.total_elapsed_ms:
×
231
        elapsed = f" in {result.result_metadata.total_elapsed_ms / 1000:.2f}s"
×
232

233
    # Cache source (only show if not RAN)
234
    source_desc = ""
×
235
    if result.result_metadata:
×
236
        source = result.result_metadata.source(run_id)
×
237
        if source != ProcessResultMetadata.Source.RAN:
×
238
            source_desc = f" ({_SOURCE_MAP.get(source, source.value)})"
×
239

240
    return f"{sigil} {desc} {status}{elapsed}{source_desc}."
×
241

242

243
@goal_rule
2✔
244
async def check_goal(
2✔
245
    console: Console,
246
    workspace: Workspace,
247
    targets: FilteredTargets,
248
    dist_dir: DistDir,
249
    union_membership: UnionMembership,
250
    check_subsystem: CheckSubsystem,
251
    run_id: RunId,
252
) -> Check:
253
    request_types = cast("Iterable[type[CheckRequest]]", union_membership[CheckRequest])
×
254
    specified_ids = determine_specified_tool_ids("check", check_subsystem.only, request_types)
×
255

256
    requests = tuple(
×
257
        request_type(
258
            request_type.field_set_type.create(target)
259
            for target in targets
260
            if (
261
                request_type.tool_id in specified_ids
262
                and request_type.field_set_type.is_applicable(target)
263
            )
264
        )
265
        for request_type in request_types
266
    )
267

268
    request_to_field_set = [
×
269
        (request, field_set) for request in requests for field_set in request.field_sets
270
    ]
271

272
    environment_names = await concurrently(
×
273
        resolve_environment_name(EnvironmentNameRequest.from_field_set(field_set), **implicitly())
274
        for (_, field_set) in request_to_field_set
275
    )
276

277
    request_to_env_name = {
×
278
        (request, env_name)
279
        for (request, _), env_name in zip(request_to_field_set, environment_names)
280
    }
281

282
    # Run each check request in each valid environment (potentially multiple runs per tool)
283
    all_results = await concurrently(
×
284
        check(**implicitly({request: CheckRequest, env_name: EnvironmentName}))
285
        for (request, env_name) in request_to_env_name
286
    )
287

288
    results_by_tool: dict[str, list[CheckResult]] = defaultdict(list)
×
289
    for results in all_results:
×
290
        results_by_tool[results.checker_name].extend(results.results)
×
291

292
    write_reports(
×
293
        results_by_tool,
294
        workspace,
295
        dist_dir,
296
        goal_name=CheckSubsystem.name,
297
    )
298

299
    exit_code = 0
×
300
    if all_results:
×
301
        console.print_stderr("")
×
302
    for results in sorted(all_results, key=lambda results: results.checker_name):
×
303
        if results.skipped:
×
304
            continue
×
305
        if results.exit_code != 0:
×
306
            exit_code = results.exit_code
×
307
        if results.output_per_partition:
×
308
            for result in results.results:
×
309
                console.print_stderr(
×
310
                    _format_check_result(results.checker_name, result, run_id, console)
311
                )
312
        else:
313
            sigil = console.sigil_succeeded() if results.exit_code == 0 else console.sigil_failed()
×
314
            status = "succeeded" if results.exit_code == 0 else "failed"
×
315
            console.print_stderr(f"{sigil} {results.checker_name} {status}.")
×
316

317
    return Check(exit_code)
×
318

319

320
def rules():
2✔
321
    return [
×
322
        *collect_rules(),
323
        # NB: Would be unused otherwise.
324
        QueryRule(CheckSubsystem, []),
325
    ]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc