• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 22507851448

27 Feb 2026 11:28PM UTC coverage: 92.928% (-0.007%) from 92.935%
22507851448

push

github

web-flow
silence new HdrHistogram induced deprecation warning on Python 3.14 (#23144)

No more:
```
/usr/lib/python3.14/ctypes/_endian.py:33: DeprecationWarning: Due to '_pack_', the
'ExternalHeader' Structure will use memory layout compatible with MSVC (Windows). If this is intended, set _layout_ to 'ms'. The
 implicit default is deprecated and slated to become an error in Python 3.19.
  super().__setattr__(attrname, value)
/usr/lib/python3.14/ctypes/_endian.py:33: DeprecationWarning: Due to '_pack_', the 'PayloadHeader' Structure will use memory
layout compatible with MSVC (Windows). If this is intended, set _layout_ to 'ms'. The implicit default is deprecated and slated
to become an error in Python 3.19.
  super().__setattr__(attrname, value)
```

0 of 1 new or added line in 1 file covered. (0.0%)

71 existing lines in 12 files now uncovered.

90912 of 97831 relevant lines covered (92.93%)

4.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.73
/src/python/pants/core/goals/check.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import logging
12✔
7
from collections import defaultdict
12✔
8
from collections.abc import Iterable
12✔
9
from dataclasses import dataclass
12✔
10
from typing import Any, ClassVar, Generic, TypeVar, cast
12✔
11

12
from pants.core.environments.rules import EnvironmentNameRequest, resolve_environment_name
12✔
13
from pants.core.goals.lint import REPORT_DIR as REPORT_DIR  # noqa: F401
12✔
14
from pants.core.goals.multi_tool_goal_helper import (
12✔
15
    OnlyOption,
16
    determine_specified_tool_ids,
17
    write_reports,
18
)
19
from pants.core.util_rules.distdir import DistDir
12✔
20
from pants.engine.collection import Collection
12✔
21
from pants.engine.console import Console
12✔
22
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType
12✔
23
from pants.engine.environment import EnvironmentName
12✔
24
from pants.engine.fs import EMPTY_DIGEST, Digest, Workspace
12✔
25
from pants.engine.goal import Goal, GoalSubsystem
12✔
26
from pants.engine.internals.selectors import concurrently
12✔
27
from pants.engine.internals.session import RunId
12✔
28
from pants.engine.process import FallibleProcessResult, ProcessCacheScope, ProcessResultMetadata
12✔
29
from pants.engine.rules import QueryRule, collect_rules, goal_rule, implicitly, rule
12✔
30
from pants.engine.target import FieldSet, FilteredTargets
12✔
31
from pants.engine.unions import UnionMembership, union
12✔
32
from pants.option.option_types import BoolOption
12✔
33
from pants.util.logging import LogLevel
12✔
34
from pants.util.memo import memoized_property
12✔
35
from pants.util.meta import classproperty
12✔
36
from pants.util.strutil import Simplifier
12✔
37

38
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable for the concrete `FieldSet` subclass that a `CheckRequest` is parameterized by.
_FS = TypeVar("_FS", bound=FieldSet)
12✔
41

42

43
@dataclass(frozen=True)
class CheckResult:
    """The outcome of one checker invocation.

    Holds the exit code and captured output, plus an optional partition description, report
    digest and process metadata.
    """

    exit_code: int
    stdout: str
    stderr: str
    partition_description: str | None = None
    report: Digest = EMPTY_DIGEST
    result_metadata: ProcessResultMetadata | None = None

    @staticmethod
    def from_fallible_process_result(
        process_result: FallibleProcessResult,
        *,
        partition_description: str | None = None,
        output_simplifier: Simplifier = Simplifier(),
        report: Digest = EMPTY_DIGEST,
    ) -> CheckResult:
        """Build a `CheckResult` from a process result.

        Both output streams are passed through `output_simplifier.simplify()` before being
        stored; the exit code and metadata are copied over unchanged.
        """
        simplified_stdout = output_simplifier.simplify(process_result.stdout)
        simplified_stderr = output_simplifier.simplify(process_result.stderr)
        return CheckResult(
            process_result.exit_code,
            simplified_stdout,
            simplified_stderr,
            partition_description=partition_description,
            report=report,
            result_metadata=process_result.metadata,
        )

    def metadata(self) -> dict[str, Any]:
        """Return a mapping exposing this result's partition description."""
        return dict(partition=self.partition_description)
×
71

72

73
@dataclass(frozen=True)
class CheckResults(EngineAwareReturnType):
    """The `CheckResult`s (possibly none) produced by one type checker.

    A checker usually yields a single result, yields nothing when it no-oped, and yields
    several results when it had to partition its input.
    """

    results: tuple[CheckResult, ...]
    checker_name: str
    output_per_partition: bool

    def __init__(
        self,
        results: Iterable[CheckResult],
        *,
        checker_name: str,
        output_per_partition: bool = True,
    ) -> None:
        # The dataclass is frozen, so attributes have to be set through `object.__setattr__`.
        for attr, value in (
            ("results", tuple(results)),
            ("checker_name", checker_name),
            ("output_per_partition", output_per_partition),
        ):
            object.__setattr__(self, attr, value)

    @property
    def skipped(self) -> bool:
        """True when there are no results at all."""
        return not self.results

    @memoized_property
    def exit_code(self) -> int:
        """The first non-zero exit code among the results, or 0 if every result succeeded."""
        for result in self.results:
            if result.exit_code != 0:
                return result.exit_code
        return 0

    def level(self) -> LogLevel | None:
        """DEBUG when skipped, ERROR on a non-zero exit code, INFO otherwise."""
        if self.skipped:
            return LogLevel.DEBUG
        if self.exit_code != 0:
            return LogLevel.ERROR
        return LogLevel.INFO

    def message(self) -> str | None:
        """Render a status line followed by the output of each result.

        A single result is rendered without a header; multiple results are each prefixed
        with a "Partition #N" header.
        """
        if self.skipped:
            return f"{self.checker_name} skipped."

        if self.exit_code == 0:
            headline = f"{self.checker_name} succeeded."
        else:
            headline = f"{self.checker_name} failed (exit code {self.exit_code})."

        def render_output(result: CheckResult) -> str:
            # Each non-empty stream goes on its own line; trailing whitespace is normalized
            # to exactly one blank line.
            body = "".join(f"\n{stream}" for stream in (result.stdout, result.stderr) if stream)
            return f"{body.rstrip()}\n\n" if body else ""

        if len(self.results) == 1:
            return headline + render_output(self.results[0])

        sections = ["\n"]
        for number, result in enumerate(self.results, start=1):
            header = f"Partition #{number}"
            if result.partition_description:
                header += f" - {result.partition_description}:"
            else:
                header += ":"
            # A partition with no output still gets a blank line after its header.
            sections.append(header + (render_output(result) or "\n\n"))
        return headline + "".join(sections)

    def cacheable(self) -> bool:
        """Is marked uncacheable to ensure that it always renders."""
        return False
11✔
145

146

147
@dataclass(frozen=True)
@union(in_scope_types=[EnvironmentName])
class CheckRequest(Generic[_FS], EngineAwareParameter):
    """Union base for requests to check a set of targets.

    To provide a checker, subclass this type and install the subclass as a union member.
    """

    field_set_type: ClassVar[type[_FS]]
    tool_name: ClassVar[str]

    field_sets: Collection[_FS]

    @classproperty
    def tool_id(cls) -> str:
        """The "id" of the tool, used in tool selection (Eg --only=<id>)."""
        return cls.tool_name

    def __init__(self, field_sets: Iterable[_FS]) -> None:
        # The dataclass is frozen, so the attribute has to be set through `object.__setattr__`.
        collected = Collection[_FS](field_sets)
        object.__setattr__(self, "field_sets", collected)

    def debug_hint(self) -> str:
        """Return the tool name."""
        return self.tool_name

    def metadata(self) -> dict[str, Any]:
        """Return the address spec of every field set under the "addresses" key."""
        specs = []
        for field_set in self.field_sets:
            specs.append(field_set.address.spec)
        return {"addresses": specs}
8✔
173

174

175
@rule(polymorphic=True)
async def check(
    req: CheckRequest,
    environment_name: EnvironmentName,
) -> CheckResults:
    """Polymorphic entry point for running one `CheckRequest` in one environment.

    This base body always raises `NotImplementedError`; presumably the engine dispatches
    to the rule registered for the concrete request type instead -- confirm against the
    rules API.
    """
    raise NotImplementedError
×
181

182

183
class CheckSubsystem(GoalSubsystem):
    # Options and metadata for the `check` goal.
    name = "check"
    help = "Run type checking or the lightest variant of compilation available for a language."

    @classmethod
    def activated(cls, union_membership: UnionMembership) -> bool:
        # Active only when `CheckRequest` is present in the union membership.
        return CheckRequest in union_membership

    only = OnlyOption("checker", "mypy", "javac")
    force = BoolOption(
        default=False,
        help="Force checks to run, even if they could be satisfied from cache.",
    )

    @property
    def default_process_cache_scope(self) -> ProcessCacheScope:
        # `force` selects the PER_SESSION scope; otherwise the SUCCESSFUL scope is used.
        if self.force:
            return ProcessCacheScope.PER_SESSION
        return ProcessCacheScope.SUCCESSFUL
8✔
200

201

202
class Check(Goal):
    # The `check` goal: configured by `CheckSubsystem` and declared as using environments.
    subsystem_cls = CheckSubsystem
    environment_behavior = Goal.EnvironmentBehavior.USES_ENVIRONMENTS
12✔
205

206

207
# Human-readable labels for where a process result came from, used in console output.
_Source = ProcessResultMetadata.Source
_SOURCE_MAP = {
    _Source.MEMOIZED: "memoized",
    _Source.RAN: "ran",
    _Source.HIT_LOCALLY: "cached locally",
    _Source.HIT_REMOTELY: "cached remotely",
}
213

214

215
def _format_check_result(
    checker_name: str,
    result: CheckResult,
    run_id: RunId,
    console: Console,
) -> str:
    """Build the one-line console summary for a single check result.

    The line has the form "<sigil> <checker>[ (<partition>)] <status>[ in N.NNs][ (<source>)].".
    The elapsed time appears only when the metadata reports a non-zero `total_elapsed_ms`,
    and the source appears only when the result was not freshly run.
    """
    succeeded = result.exit_code == 0
    if succeeded:
        sigil, status = console.sigil_succeeded(), "succeeded"
    else:
        sigil, status = console.sigil_failed(), "failed"

    partition = result.partition_description
    desc = f"{checker_name} ({partition})" if partition else checker_name

    elapsed = ""
    source_desc = ""
    metadata = result.result_metadata
    if metadata:
        if metadata.total_elapsed_ms:
            elapsed = f" in {metadata.total_elapsed_ms / 1000:.2f}s"
        # Cache source (only show if not RAN)
        source = metadata.source(run_id)
        if source != ProcessResultMetadata.Source.RAN:
            source_desc = f" ({_SOURCE_MAP.get(source, source.value)})"

    return f"{sigil} {desc} {status}{elapsed}{source_desc}."
1✔
241

242

243
@goal_rule
async def check_goal(
    console: Console,
    workspace: Workspace,
    targets: FilteredTargets,
    dist_dir: DistDir,
    union_membership: UnionMembership,
    check_subsystem: CheckSubsystem,
    run_id: RunId,
) -> Check:
    """Run every selected checker over its applicable targets and summarize on stderr.

    Builds one request per registered `CheckRequest` type, resolves an environment name for
    each field set, runs each distinct (request, environment) pair, writes reports, and prints
    one status line per result (or per tool when `output_per_partition` is false). The goal's
    exit code is the exit code of the last failing tool in name order, or 0.
    """
    request_types = cast("Iterable[type[CheckRequest]]", union_membership[CheckRequest])
    specified_ids = determine_specified_tool_ids("check", check_subsystem.only, request_types)

    # One request per checker type; tools not selected end up with no field sets.
    requests = tuple(
        checker_type(
            checker_type.field_set_type.create(tgt)
            for tgt in targets
            if (
                checker_type.tool_id in specified_ids
                and checker_type.field_set_type.is_applicable(tgt)
            )
        )
        for checker_type in request_types
    )

    request_to_field_set = [
        (request, field_set) for request in requests for field_set in request.field_sets
    ]

    environment_names = await concurrently(
        resolve_environment_name(EnvironmentNameRequest.from_field_set(field_set), **implicitly())
        for (_, field_set) in request_to_field_set
    )

    # A set, so that each (request, environment) combination runs only once.
    requests_in_order = [request for request, _ in request_to_field_set]
    request_to_env_name = set(zip(requests_in_order, environment_names))

    # Run each check request in each valid environment (potentially multiple runs per tool)
    all_results = await concurrently(
        check(**implicitly({request: CheckRequest, env_name: EnvironmentName}))
        for (request, env_name) in request_to_env_name
    )

    results_by_tool: dict[str, list[CheckResult]] = defaultdict(list)
    for tool_results in all_results:
        results_by_tool[tool_results.checker_name].extend(tool_results.results)

    write_reports(
        results_by_tool,
        workspace,
        dist_dir,
        goal_name=CheckSubsystem.name,
    )

    # Blank separator line before the summary, only when something ran.
    if all_results:
        console.print_stderr("")

    exit_code = 0
    for tool_results in sorted(all_results, key=lambda r: r.checker_name):
        if tool_results.skipped:
            continue

        tool_exit_code = tool_results.exit_code
        if tool_exit_code != 0:
            exit_code = tool_exit_code

        if not tool_results.output_per_partition:
            # A single summary line for the whole tool.
            if tool_exit_code == 0:
                sigil, status = console.sigil_succeeded(), "succeeded"
            else:
                sigil, status = console.sigil_failed(), "failed"
            console.print_stderr(f"{sigil} {tool_results.checker_name} {status}.")
            continue

        for result in tool_results.results:
            console.print_stderr(
                _format_check_result(tool_results.checker_name, result, run_id, console)
            )

    return Check(exit_code)
1✔
318

319

320
def rules():
    """Return this module's collected rules plus a `QueryRule` for `CheckSubsystem`."""
    registered = list(collect_rules())
    # NB: Would be unused otherwise.
    registered.append(QueryRule(CheckSubsystem, []))
    return registered
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc