• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/backend/project_info/peek.py
1
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

UNCOV
4
from __future__ import annotations
×
5

UNCOV
6
import collections
×
UNCOV
7
import collections.abc
×
UNCOV
8
import json
×
UNCOV
9
import logging
×
UNCOV
10
from abc import ABCMeta
×
UNCOV
11
from collections.abc import Iterable, Mapping
×
UNCOV
12
from dataclasses import dataclass, fields, is_dataclass, replace
×
UNCOV
13
from typing import Any, Protocol, runtime_checkable
×
14

UNCOV
15
from pants.core.goals.deploy import Deploy, DeployFieldSet
×
UNCOV
16
from pants.core.goals.package import Package, PackageFieldSet
×
UNCOV
17
from pants.core.goals.publish import Publish, PublishFieldSet
×
UNCOV
18
from pants.core.goals.run import Run, RunFieldSet
×
UNCOV
19
from pants.core.goals.test import Test, TestFieldSet
×
UNCOV
20
from pants.engine.addresses import Address, Addresses
×
UNCOV
21
from pants.engine.collection import Collection
×
UNCOV
22
from pants.engine.console import Console
×
UNCOV
23
from pants.engine.environment import EnvironmentName
×
UNCOV
24
from pants.engine.fs import Snapshot
×
UNCOV
25
from pants.engine.goal import Goal, GoalSubsystem, Outputting
×
UNCOV
26
from pants.engine.internals.build_files import (
×
27
    _get_target_family_and_adaptor_for_dep_rules,
28
    get_dependencies_rule_application,
29
)
UNCOV
30
from pants.engine.internals.dep_rules import DependencyRuleApplication, DependencyRuleSet
×
UNCOV
31
from pants.engine.internals.graph import hydrate_sources, resolve_targets
×
UNCOV
32
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
×
UNCOV
33
from pants.engine.rules import Rule, collect_rules, concurrently, goal_rule, implicitly, rule
×
UNCOV
34
from pants.engine.target import (
×
35
    AlwaysTraverseDeps,
36
    Dependencies,
37
    DependenciesRequest,
38
    DependenciesRuleApplicationRequest,
39
    Field,
40
    FieldSet,
41
    HydrateSourcesRequest,
42
    ImmutableValue,
43
    NoApplicableTargetsBehavior,
44
    SourcesField,
45
    Target,
46
    TargetRootsToFieldSetsRequest,
47
    UnexpandedTargets,
48
)
UNCOV
49
from pants.engine.unions import UnionMembership, union
×
UNCOV
50
from pants.option.option_types import BoolOption
×
UNCOV
51
from pants.util.frozendict import FrozenDict
×
UNCOV
52
from pants.util.strutil import softwrap
×
53

UNCOV
54
logger = logging.getLogger(__name__)
×
55

56

UNCOV
57
@runtime_checkable
class Dictable(Protocol):
    """Structural type for objects that can render themselves as a mapping.

    Because this is a runtime-checkable protocol, the peek JSON encoder can serialize any
    object exposing an `asdict()` method without knowing its concrete type.
    """

    def asdict(self) -> Mapping[str, Any]:
        """Return the mapping representation of this object."""
        ...
62

63

UNCOV
64
class PeekSubsystem(Outputting, GoalSubsystem):
    """Display detailed target information in JSON form."""

    name = "peek"
    help = "Display BUILD target info"

    # Drop field values that equal the field type's declared default.
    exclude_defaults = BoolOption(
        default=False,
        help="Whether to leave off values that match the target-defined default values.",
    )

    # Add the `_dependencies_rules`, `_dependents_rules` and `_applicable_dep_rules` keys.
    include_dep_rules = BoolOption(
        default=False,
        help=softwrap(
            """
            Whether to include `_dependencies_rules`, `_dependents_rules` and `_applicable_dep_rules`
            that apply to the target and its dependencies.
            """
        ),
    )

    # Add the `additional_info` key, populated from `HasAdditionalTargetDataFieldSet` members.
    include_additional_info = BoolOption(
        default=False, help="Whether to include additional information generated by plugins."
    )
88

89

UNCOV
90
class Peek(Goal):
    """The `peek` goal, configured by `PeekSubsystem`."""

    subsystem_cls = PeekSubsystem
    environment_behavior = Goal.EnvironmentBehavior.LOCAL_ONLY
93

94

UNCOV
95
def _normalize_value(val: Any) -> Any:
×
UNCOV
96
    if isinstance(val, collections.abc.Mapping):
×
UNCOV
97
        return {str(k): _normalize_value(v) for k, v in val.items()}
×
UNCOV
98
    return val
×
99

100

UNCOV
101
@union(in_scope_types=[EnvironmentName])
@dataclass(frozen=True)
class HasAdditionalTargetDataFieldSet(FieldSet, metaclass=ABCMeta):
    """Union type to attach data to a target that will appear under the "additional_info" field in
    the output of `pants peek` if the `--peek-include-additional-info` option is enabled."""
106

107

UNCOV
108
@dataclass(frozen=True)
class AdditionalTargetData:
    """A labelled piece of plugin-provided data to show under `additional_info` in peek output.

    The hand-written `__init__` converts common mutable containers into immutable ones
    before storing them on the frozen instance.
    """

    label: str
    data: ImmutableValue

    def __init__(self, label: str, data: Any) -> None:
        """Store `label` and an immutable form of `data`.

        :param label: Key under which `data` is rendered.
        :param data: Mappings are deep-frozen into a `FrozenDict`; lists and sets become
            tuples; anything else is stored as given.
        """
        # The dataclass is frozen, so attributes must be set via `object.__setattr__`.
        object.__setattr__(self, "label", label)
        if isinstance(data, Mapping):
            data = FrozenDict.deep_freeze(data)
        elif isinstance(data, (list, set)):
            # NOTE(review): for a `set` the resulting tuple order follows set iteration
            # order, which is not guaranteed to be stable across runs.
            data = tuple(data)
        object.__setattr__(self, "data", data)
120

121

UNCOV
122
@rule(polymorphic=True)
async def get_additional_target_data(
    field_set: HasAdditionalTargetDataFieldSet, env_name: EnvironmentName
) -> AdditionalTargetData:
    """Polymorphic rule stub producing `AdditionalTargetData` for a field set.

    This base definition always raises; the real work is expected to come from
    implementations registered for `HasAdditionalTargetDataFieldSet` union members.
    """
    raise NotImplementedError()
127

128

UNCOV
129
@dataclass(frozen=True)
class TargetData:
    """Everything `peek` reports about a single target."""

    target: Target
    # Sources may not be registered on the target, so we'll have nothing to expand.
    expanded_sources: Snapshot | None
    expanded_dependencies: tuple[str, ...]

    dependencies_rules: tuple[str, ...] | None = None
    dependents_rules: tuple[str, ...] | None = None
    applicable_dep_rules: tuple[DependencyRuleApplication, ...] | None = None
    goals: tuple[str, ...] | None = None
    additional_info: tuple[AdditionalTargetData, ...] | None = None

    def to_dict(self, exclude_defaults: bool = False, include_dep_rules: bool = False) -> dict:
        """Build the dict that is rendered as this target's JSON entry.

        :param exclude_defaults: Omit field values equal to the field type's `default`.
        :param include_dep_rules: Add the `_dependencies_rules`, `_dependents_rules` and
            `_applicable_dep_rules` keys (their values may be `None`).
        :return: A dict with `address` and `target_type` first, followed by all remaining
            keys in sorted order.
        """
        # Sentinel so a field type without a `default` attribute never compares equal.
        nothing = object()
        # NB: Not named `fields`, to avoid shadowing the module-level `dataclasses.fields`.
        entries: dict[str, Any] = {
            (
                # Raw sources/dependencies values get a `_raw` suffix, because the plain
                # `sources`/`dependencies` keys hold the expanded values set below.
                f"{k.alias}_raw" if issubclass(k, (SourcesField, Dependencies)) else k.alias
            ): _normalize_value(v.value)
            for k, v in self.target.field_values.items()
            if not (exclude_defaults and getattr(k, "default", nothing) == v.value)
        }

        entries["dependencies"] = self.expanded_dependencies
        if self.expanded_sources is not None:
            entries["sources"] = self.expanded_sources.files
            entries["sources_fingerprint"] = self.expanded_sources.digest.fingerprint

        if include_dep_rules:
            entries["_dependencies_rules"] = self.dependencies_rules
            entries["_dependents_rules"] = self.dependents_rules
            entries["_applicable_dep_rules"] = self.applicable_dep_rules

        if self.goals is not None:
            entries["goals"] = self.goals

        if self.additional_info is not None:
            # Keyed by label and ordered by label for stable output.
            entries["additional_info"] = {
                additional_target_data.label: additional_target_data.data
                for additional_target_data in sorted(
                    self.additional_info, key=lambda atd: atd.label
                )
            }

        return {
            "address": self.target.address.spec,
            "target_type": self.target.alias,
            **dict(sorted(entries.items())),
        }
178

179

UNCOV
180
class TargetDatas(Collection[TargetData]):
    """A collection of `TargetData`, as returned by `get_target_data`."""
182

183

UNCOV
184
def render_json(
    tds: Iterable[TargetData], exclude_defaults: bool = False, include_dep_rules: bool = False
) -> str:
    """Render the given target data as an indented JSON array followed by a newline.

    :param tds: The target data entries to render, in output order.
    :param exclude_defaults: Forwarded to `TargetData.to_dict`.
    :param include_dep_rules: Forwarded to `TargetData.to_dict`.
    :return: The JSON document, terminated by a single newline.
    """
    payload = [td.to_dict(exclude_defaults, include_dep_rules) for td in tds]
    return json.dumps(payload, indent=2, cls=_PeekJsonEncoder) + "\n"
188

189

UNCOV
190
class _PeekJsonEncoder(json.JSONEncoder):
×
191
    """Allow us to serialize some commonly found types in BUILD files."""
192

UNCOV
193
    def default(self, o):
×
194
        """Return a serializable object for o."""
UNCOV
195
        if isinstance(o, str):  # early exit prevents strings from being treated as sequences
×
196
            return o
×
UNCOV
197
        if o is None:
×
198
            return o
×
UNCOV
199
        if is_dataclass(o):
×
200
            # NB: `dataclasses.asdict` creates a deep copy by default, which is unnecessary for
201
            # this case.
UNCOV
202
            return {field.name: getattr(o, field.name) for field in fields(o)}
×
UNCOV
203
        if isinstance(o, collections.abc.Mapping):
×
UNCOV
204
            return dict(o)
×
UNCOV
205
        if (
×
206
            isinstance(o, collections.abc.Sequence)
207
            or isinstance(o, set)
208
            or isinstance(o, collections.abc.Set)
209
        ):
210
            return list(o)
×
UNCOV
211
        if isinstance(o, Field):
×
212
            return self.default(o.value)
×
UNCOV
213
        if isinstance(o, Dictable):
×
214
            return o.asdict()
×
UNCOV
215
        try:
×
UNCOV
216
            return super().default(o)
×
UNCOV
217
        except TypeError:
×
UNCOV
218
            return str(o)
×
219

220

UNCOV
221
def describe_ruleset(ruleset: DependencyRuleSet | None) -> tuple[str, ...] | None:
    """Return `ruleset.peek()`, or `None` when there is no ruleset."""
    return None if ruleset is None else ruleset.peek()
225

226

UNCOV
227
async def _create_target_alias_to_goals_map() -> dict[str, tuple[str, ...]]:
    """Returns a mapping from a target alias to the goals that can operate on that target.

    For instance, `pex_binary` would map to `("package", "run")`.

    :return: A mapping from a target alias to the sorted names of the goals that can
        operate on that target.
    """
    # Goal holds a GoalSubsystem which has the name we care about, and it's exposed via a classmethod on Goal
    # There is no tightly coupled relationship between a Goal/GoalSubsystem and the associated FieldSet
    # This gets murkier with Fix/Fmt/Lint/etc... So, we'll just manually map them for now
    #
    # This is manually curated for now - we'll have to use it a bit to determine if we want to show all goals or not.
    # A single ordered mapping is the source of truth for both the field sets to query and
    # their goal names, so the two can never get out of sync.
    goal_name_by_field_set: dict[type[FieldSet], str] = {
        DeployFieldSet: Deploy.name,
        PackageFieldSet: Package.name,
        PublishFieldSet: Publish.name,
        RunFieldSet: Run.name,
        TestFieldSet: Test.name,
    }

    target_roots_to_field_sets_get = [
        find_valid_field_sets_for_target_roots(
            TargetRootsToFieldSetsRequest(
                field_set_superclass=fs,
                goal_description="",
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.ignore,
            ),
            **implicitly(),
        )
        for fs in goal_name_by_field_set
    ]

    # Results come back in request order, i.e. in the order of `goal_name_by_field_set`.
    target_roots_to_field_sets = await concurrently(target_roots_to_field_sets_get)

    # Invert goal -> targets into target alias -> goal names,
    # e.g. {'pyoxidizer_binary': {'package', 'run'}, 'pex_binary': {'package', 'run'}, ...}
    alias_to_goals_map: dict[str, set[str]] = {}
    for goal, tgt_root in zip(goal_name_by_field_set.values(), target_roots_to_field_sets):
        for tgt in tgt_root.targets:
            alias_to_goals_map.setdefault(tgt.alias, set()).add(goal)

    # Convert the goal sets to sorted tuples for deterministic JSON serialization
    return {alias: tuple(sorted(goals)) for alias, goals in alias_to_goals_map.items()}
289

290

UNCOV
291
@rule
async def get_target_data(
    targets: UnexpandedTargets,
    subsys: PeekSubsystem,
    union_membership: UnionMembership,
) -> TargetDatas:
    """Given a set of unexpanded targets, return the peek data for each of them.

    The data includes the target's fields, expanded sources, expanded dependencies and,
    when the corresponding `PeekSubsystem` options are enabled, the dependency rules that
    apply to the target and any additional info registered through
    `HasAdditionalTargetDataFieldSet`. `goals` is always left as `None` here.

    NB: We must preserve target generators, not replace with their generated targets.

    :param targets: The unexpanded targets to get data for.
    :param subsys: The `PeekSubsystem` instance.
    :param union_membership: Used to look up the `HasAdditionalTargetDataFieldSet` members.
    :return: One `TargetData` per target, ordered by target address.
    """
    sorted_targets = sorted(targets, key=lambda tgt: tgt.address)

    # We "hydrate" sources fields with the engine, but not every target has them registered.
    targets_with_sources = [tgt for tgt in sorted_targets if tgt.has_field(SourcesField)]

    # When determining dependencies, we replace target generators with their generated targets.
    dependencies_per_target = await concurrently(
        resolve_targets(
            **implicitly(
                DependenciesRequest(
                    tgt.get(Dependencies), should_traverse_deps_predicate=AlwaysTraverseDeps()
                )
            )
        )
        for tgt in sorted_targets
    )
    hydrated_sources_per_target = await concurrently(
        hydrate_sources(HydrateSourcesRequest(tgt[SourcesField]), **implicitly())
        for tgt in targets_with_sources
    )
    if subsys.include_additional_info:
        # One field set per (target, applicable union member) pair.
        additional_info_field_sets = [
            field_set_type.create(tgt)
            for tgt in sorted_targets
            for field_set_type in union_membership[HasAdditionalTargetDataFieldSet]
            if field_set_type.is_applicable(tgt)
        ]
        additional_infos = await concurrently(
            get_additional_target_data(**implicitly({field_set: HasAdditionalTargetDataFieldSet}))
            for field_set in additional_info_field_sets
        )
        group_additional_infos_by_address_builder = collections.defaultdict(list)
        for field_set, additional_info in zip(additional_info_field_sets, additional_infos):
            group_additional_infos_by_address_builder[field_set.address].append(additional_info)
        group_additional_infos_by_address = {
            address: tuple(additional_info)
            for address, additional_info in group_additional_infos_by_address_builder.items()
        }
    else:
        group_additional_infos_by_address = {}

    # TODO: This feels like something that could be merged with the above code somewhere
    expanded_sources_map = {
        tgt.address: hs.snapshot
        for tgt, hs in zip(targets_with_sources, hydrated_sources_per_target)
    }

    # One tuple of dependency address specs per entry of `sorted_targets`, in the same order.
    expanded_dependencies = [
        tuple(dep.address.spec for dep in deps) for deps in dependencies_per_target
    ]

    dependencies_rules_map: dict[Address, tuple[str, ...] | None] = {}
    dependents_rules_map: dict[Address, tuple[str, ...] | None] = {}
    applicable_dep_rules_map = {}

    if subsys.include_dep_rules:
        # TODO: Why are we pulling a private definition from `build_files`?
        family_adaptors = await _get_target_family_and_adaptor_for_dep_rules(
            *(tgt.address for tgt in sorted_targets),
            description_of_origin="`peek` goal",
        )

        dependencies_rules_map = {
            tgt.address: describe_ruleset(
                family.dependencies_rules.get_ruleset(tgt.address, adaptor)
            )
            for tgt, (family, adaptor) in zip(sorted_targets, family_adaptors)
            if family.dependencies_rules is not None
        }

        dependents_rules_map = {
            tgt.address: describe_ruleset(family.dependents_rules.get_ruleset(tgt.address, adaptor))
            for tgt, (family, adaptor) in zip(sorted_targets, family_adaptors)
            if family.dependents_rules is not None
        }

        all_applicable_dep_rules = await concurrently(
            get_dependencies_rule_application(
                DependenciesRuleApplicationRequest(
                    tgt.address,
                    Addresses(dep.address for dep in deps),
                    description_of_origin="`peek` goal",
                ),
                **implicitly(),
            )
            for tgt, deps in zip(sorted_targets, dependencies_per_target)
        )
        applicable_dep_rules_map = {
            application.address: tuple(application.dependencies_rule.values())
            for application in all_applicable_dep_rules
        }

    # TODO: The target alias -> goals mapping is currently computed in the `peek` goal rule
    # and attached there; ideally it would be computed here instead of passing `goals=None`.

    return TargetDatas(
        TargetData(
            tgt,
            expanded_dependencies=expanded_deps,
            expanded_sources=expanded_sources_map.get(tgt.address),
            dependencies_rules=dependencies_rules_map.get(tgt.address),
            dependents_rules=dependents_rules_map.get(tgt.address),
            applicable_dep_rules=applicable_dep_rules_map.get(tgt.address),
            goals=None,
            # With the option enabled, targets without any info still report an empty tuple.
            additional_info=group_additional_infos_by_address.get(
                tgt.address, () if subsys.include_additional_info else None
            ),
        )
        for tgt, expanded_deps in zip(sorted_targets, expanded_dependencies)
    )
419

420

UNCOV
421
@goal_rule
async def peek(
    console: Console,
    subsys: PeekSubsystem,
    targets: UnexpandedTargets,
) -> Peek:
    """Display detailed target information in JSON form.

    :param console: The console to write the JSON to.
    :param subsys: The `PeekSubsystem` instance.
    :param targets: The targets to get data for.
    :return: The `Peek` goal, always with exit code 0.
    """

    tds = await get_target_data(targets, **implicitly())
    # This method needs to be called in a @goal_rule, otherwise it fails out with Rule errors (when called in an @rule)
    target_alias_to_goals_map = await _create_target_alias_to_goals_map()

    if target_alias_to_goals_map:
        # Attach the goals to the target data, in the hopes that we can pull `_create_target_alias_to_goals_map` back into `get_target_data`
        # TargetData is frozen so we need to create a new collection.
        # Targets whose alias has no applicable goal keep `goals=None`.
        tds = TargetDatas(
            [
                replace(
                    td,
                    goals=target_alias_to_goals_map.get(td.target.alias),
                )
                for td in tds
            ]
        )
    output = render_json(tds, subsys.exclude_defaults, subsys.include_dep_rules)

    with subsys.output(console) as write_stdout:
        write_stdout(output)
    return Peek(exit_code=0)
456

457

UNCOV
458
def rules() -> Iterable[Rule]:
    """Return the rules collected by `collect_rules()` for this module."""
    return collect_rules()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc