• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 20332790708

18 Dec 2025 09:48AM UTC coverage: 64.992% (-15.3%) from 80.295%
20332790708

Pull #22949

github

web-flow
Merge f730a56cd into 407284c67
Pull Request #22949: Add experimental uv resolver for Python lockfiles

54 of 97 new or added lines in 5 files covered. (55.67%)

8270 existing lines in 295 files now uncovered.

48990 of 75379 relevant lines covered (64.99%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.56
/src/python/pants/backend/project_info/peek.py
1
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
3✔
5

6
import collections
3✔
7
import collections.abc
3✔
8
import json
3✔
9
import logging
3✔
10
from abc import ABCMeta
3✔
11
from collections.abc import Iterable, Mapping
3✔
12
from dataclasses import dataclass, fields, is_dataclass, replace
3✔
13
from typing import Any, Protocol, runtime_checkable
3✔
14

15
from pants.core.goals.deploy import Deploy, DeployFieldSet
3✔
16
from pants.core.goals.package import Package, PackageFieldSet
3✔
17
from pants.core.goals.publish import Publish, PublishFieldSet
3✔
18
from pants.core.goals.run import Run, RunFieldSet
3✔
19
from pants.core.goals.test import Test, TestFieldSet
3✔
20
from pants.engine.addresses import Address, Addresses
3✔
21
from pants.engine.collection import Collection
3✔
22
from pants.engine.console import Console
3✔
23
from pants.engine.environment import EnvironmentName
3✔
24
from pants.engine.fs import Snapshot
3✔
25
from pants.engine.goal import Goal, GoalSubsystem, Outputting
3✔
26
from pants.engine.internals.build_files import (
3✔
27
    _get_target_family_and_adaptor_for_dep_rules,
28
    get_dependencies_rule_application,
29
)
30
from pants.engine.internals.dep_rules import DependencyRuleApplication, DependencyRuleSet
3✔
31
from pants.engine.internals.graph import hydrate_sources, resolve_targets
3✔
32
from pants.engine.internals.specs_rules import find_valid_field_sets_for_target_roots
3✔
33
from pants.engine.rules import Rule, collect_rules, concurrently, goal_rule, implicitly, rule
3✔
34
from pants.engine.target import (
3✔
35
    AlwaysTraverseDeps,
36
    Dependencies,
37
    DependenciesRequest,
38
    DependenciesRuleApplicationRequest,
39
    Field,
40
    FieldSet,
41
    HydrateSourcesRequest,
42
    ImmutableValue,
43
    NoApplicableTargetsBehavior,
44
    SourcesField,
45
    Target,
46
    TargetRootsToFieldSetsRequest,
47
    UnexpandedTargets,
48
)
49
from pants.engine.unions import UnionMembership, union
3✔
50
from pants.option.option_types import BoolOption
3✔
51
from pants.util.frozendict import FrozenDict
3✔
52
from pants.util.strutil import softwrap
3✔
53

54
logger = logging.getLogger(__name__)
3✔
55

56

57
@runtime_checkable
class Dictable(Protocol):
    """Structural type for any object that can render itself as a mapping.

    Because the protocol is runtime-checkable, `isinstance(obj, Dictable)` is true for any object
    that has an `asdict` attribute, so such objects can be serialized without naming their
    concrete types.
    """

    def asdict(self) -> Mapping[str, Any]:
        """Return this object's data as a string-keyed mapping."""
62

63

64
class PeekSubsystem(Outputting, GoalSubsystem):
    """Options for the `peek` goal, which displays detailed target information in JSON form."""

    name = "peek"
    help = "Display BUILD target info"

    # When set, field values equal to the `default` declared on their field type are omitted.
    exclude_defaults = BoolOption(
        help="Whether to leave off values that match the target-defined default values.",
        default=False,
    )

    # When set, the three underscore-prefixed dependency-rule keys are added to each target.
    include_dep_rules = BoolOption(
        help=softwrap(
            """
            Whether to include `_dependencies_rules`, `_dependents_rules` and `_applicable_dep_rules`
            that apply to the target and its dependencies.
            """
        ),
        default=False,
    )

    # When set, data from `HasAdditionalTargetDataFieldSet` union members is gathered per target.
    include_additional_info = BoolOption(
        help="Whether to include additional information generated by plugins.",
        default=False,
    )
88

89

90
class Peek(Goal):
    """Goal type produced by the `peek` goal rule; its options live on `PeekSubsystem`."""

    environment_behavior = Goal.EnvironmentBehavior.LOCAL_ONLY
    subsystem_cls = PeekSubsystem
3✔
93

94

95
def _normalize_value(val: Any) -> Any:
    """Recursively convert mapping keys to strings so the value can be rendered as JSON.

    `json.dumps` rejects dict keys that are not str/int/float/bool/None, and an encoder's
    `default()` hook is never consulted for keys, so keys are stringified up front. Mappings
    nested inside lists or tuples are normalized too; lists stay lists and tuples stay tuples.

    :param val: An arbitrary field value.
    :return: `val` with every reachable mapping replaced by a plain `dict` with `str` keys; any
        value that is not a mapping, list or tuple is returned unchanged.
    """
    if isinstance(val, collections.abc.Mapping):
        return {str(k): _normalize_value(v) for k, v in val.items()}
    if isinstance(val, list):
        return [_normalize_value(item) for item in val]
    if isinstance(val, tuple):
        return tuple(_normalize_value(item) for item in val)
    return val
×
99

100

101
@union(in_scope_types=[EnvironmentName])
@dataclass(frozen=True)
class HasAdditionalTargetDataFieldSet(FieldSet, metaclass=ABCMeta):
    """Union base for field sets that contribute extra data to the output of `pants peek`.

    Data attached through a member of this union appears under the target's "additional_info"
    field, provided the `--peek-include-additional-info` option is enabled.
    """
106

107

108
@dataclass(frozen=True)
class AdditionalTargetData:
    """A labelled piece of extra data about a target, stored in an immutable form."""

    label: str
    data: ImmutableValue

    def __init__(self, label: str, data: Any) -> None:
        """Store `label` together with an immutable version of `data`.

        Mappings are deep-frozen into a `FrozenDict`, lists and sets are turned into tuples, and
        every other value is stored unchanged.
        """
        # The dataclass is frozen, so attributes must be assigned via `object.__setattr__`.
        object.__setattr__(self, "label", label)
        if isinstance(data, collections.abc.Mapping):
            frozen: Any = FrozenDict.deep_freeze(data)
        elif isinstance(data, (list, set)):
            frozen = tuple(data)
        else:
            frozen = data
        object.__setattr__(self, "data", frozen)
×
120

121

122
@rule(polymorphic=True)
async def get_additional_target_data(
    field_set: HasAdditionalTargetDataFieldSet,
    env_name: EnvironmentName,
) -> AdditionalTargetData:
    """Polymorphic rule stub that yields the `AdditionalTargetData` for one field set.

    This base definition only raises `NotImplementedError`.
    NOTE(review): presumably concrete implementations are registered per union member of
    `HasAdditionalTargetDataFieldSet` -- confirm against the engine's polymorphic-rule docs.
    """
    raise NotImplementedError()
×
127

128

129
@dataclass(frozen=True)
class TargetData:
    """Everything `peek` reports about a single target."""

    target: Target
    # Sources may not be registered on the target, so we'll have nothing to expand.
    expanded_sources: Snapshot | None
    expanded_dependencies: tuple[str, ...]

    dependencies_rules: tuple[str, ...] | None = None
    dependents_rules: tuple[str, ...] | None = None
    applicable_dep_rules: tuple[DependencyRuleApplication, ...] | None = None
    goals: tuple[str, ...] | None = None
    additional_info: tuple[AdditionalTargetData, ...] | None = None

    def to_dict(self, exclude_defaults: bool = False, include_dep_rules: bool = False) -> dict:
        """Build the dict that is rendered as this target's JSON entry.

        The result starts with "address" and "target_type", followed by all remaining keys in
        sorted order.

        :param exclude_defaults: Skip fields whose value equals the `default` of their field type.
        :param include_dep_rules: Add the `_dependencies_rules`, `_dependents_rules` and
            `_applicable_dep_rules` keys.
        :return: The target's data keyed by field alias plus the computed keys.
        """
        # Sentinel for field types without a `default` attribute; it never equals a real value.
        missing = object()
        rendered: dict[str, Any] = {}
        for field_type, field in self.target.field_values.items():
            if exclude_defaults and getattr(field_type, "default", missing) == field.value:
                continue
            # Raw sources/dependencies get a `_raw` suffix; the expanded forms are added below
            # under the "sources" and "dependencies" keys.
            if issubclass(field_type, (SourcesField, Dependencies)):
                key = f"{field_type.alias}_raw"
            else:
                key = field_type.alias
            rendered[key] = _normalize_value(field.value)

        rendered["dependencies"] = self.expanded_dependencies
        snapshot = self.expanded_sources
        if snapshot is not None:
            rendered["sources"] = snapshot.files
            rendered["sources_fingerprint"] = snapshot.digest.fingerprint

        if include_dep_rules:
            rendered.update(
                _dependencies_rules=self.dependencies_rules,
                _dependents_rules=self.dependents_rules,
                _applicable_dep_rules=self.applicable_dep_rules,
            )

        if self.goals is not None:
            rendered["goals"] = self.goals

        if self.additional_info is not None:
            infos_by_label = sorted(self.additional_info, key=lambda info: info.label)
            rendered["additional_info"] = {info.label: info.data for info in infos_by_label}

        result = {"address": self.target.address.spec, "target_type": self.target.alias}
        result.update(sorted(rendered.items()))
        return result
178

179

180
class TargetDatas(Collection[TargetData]):
    """A `Collection` of `TargetData` entries."""
3✔
182

183

184
def render_json(
    tds: Iterable[TargetData], exclude_defaults: bool = False, include_dep_rules: bool = False
) -> str:
    """Render target data as an indented JSON array followed by a newline.

    :param tds: The target data entries to render, in output order.
    :param exclude_defaults: Forwarded to `TargetData.to_dict`.
    :param include_dep_rules: Forwarded to `TargetData.to_dict`.
    :return: The JSON document, encoded with `_PeekJsonEncoder` and ending in a newline.
    """
    entries = [td.to_dict(exclude_defaults, include_dep_rules) for td in tds]
    document = json.dumps(entries, indent=2, cls=_PeekJsonEncoder)
    return document + "\n"
×
188

189

190
class _PeekJsonEncoder(json.JSONEncoder):
    """Allow us to serialize some commonly found types in BUILD files."""

    def default(self, o):
        """Return a serializable object for o.

        Checks are applied in this order:

        * `str` and `None` are returned as-is,
        * dataclass instances become a shallow `{field name: value}` dict,
        * mappings become a `dict`,
        * sequences and sets become a `list`,
        * `Field` instances are encoded through their `value`,
        * `Dictable` objects are encoded through `asdict()`,
        * anything else falls back to `str(o)`.
        """
        # Early exit prevents strings from being treated as sequences; both cases are reachable
        # through the recursive call made for `Field` values below.
        if o is None or isinstance(o, str):
            return o
        # `is_dataclass` is also true for dataclass *types*, whose fields are not necessarily
        # readable as class attributes, so only instances are expanded here.
        if is_dataclass(o) and not isinstance(o, type):
            # NB: `dataclasses.asdict` creates a deep copy by default, which is unnecessary for
            # this case.
            return {field.name: getattr(o, field.name) for field in fields(o)}
        if isinstance(o, collections.abc.Mapping):
            return dict(o)
        # `set` is itself a `collections.abc.Set`, so one check covers set and frozenset alike.
        if isinstance(o, (collections.abc.Sequence, collections.abc.Set)):
            return list(o)
        if isinstance(o, Field):
            return self.default(o.value)
        if isinstance(o, Dictable):
            return o.asdict()
        try:
            return super().default(o)
        except TypeError:
            # Last resort: never fail the whole render because of one unknown type.
            return str(o)
×
219

220

221
def describe_ruleset(ruleset: DependencyRuleSet | None) -> tuple[str, ...] | None:
    """Return `ruleset.peek()`, or `None` when no ruleset is given."""
    return None if ruleset is None else ruleset.peek()
×
225

226

227
async def _create_target_alias_to_goals_map() -> dict[str, tuple[str, ...]]:
    """Returns a mapping from a target alias to the goals that can operate on that target.

    For instance, `pex_binary` would map to `("package", "run")`; goal names are sorted.

    :return: A mapping from a target alias to the goals that can operate on that target.
    """
    # This is manually curated for now - we'll have to use it a bit to determine if we want to
    # show all goals or not.
    peekable_field_sets: list[type[FieldSet]] = [
        DeployFieldSet,
        PackageFieldSet,
        PublishFieldSet,
        RunFieldSet,
        TestFieldSet,
    ]

    # A Goal exposes the name of its GoalSubsystem via a classmethod, but nothing ties a
    # Goal/GoalSubsystem to its FieldSet (and it gets murkier with Fix/Fmt/Lint/etc.), so the
    # pairing is spelled out by hand for now.
    field_set_to_goal_map: dict[type[FieldSet], str] = {
        DeployFieldSet: Deploy.name,
        PackageFieldSet: Package.name,
        PublishFieldSet: Publish.name,
        RunFieldSet: Run.name,
        TestFieldSet: Test.name,
    }

    assert len(peekable_field_sets) == len(field_set_to_goal_map), (
        "Must have a goal string for each field set"
    )
    peekable_goals = [field_set_to_goal_map[fs] for fs in peekable_field_sets]

    target_roots_to_field_sets_get = [
        find_valid_field_sets_for_target_roots(
            TargetRootsToFieldSetsRequest(
                field_set_superclass=fs,
                goal_description="",
                no_applicable_targets_behavior=NoApplicableTargetsBehavior.ignore,
            ),
            **implicitly(),
        )
        for fs in peekable_field_sets
    ]

    target_roots_to_field_sets = await concurrently(target_roots_to_field_sets_get)

    # Goal name -> aliases of the targets that goal applies to, e.g.
    # {'run': frozenset({'pyoxidizer_binary', 'pex_binary'}), 'test': frozenset(), ...}
    aliases_by_goal = {
        goal: frozenset(tgt.alias for tgt in tgt_root.targets)
        for goal, tgt_root in zip(peekable_goals, target_roots_to_field_sets)
    }

    # Invert it: target alias -> goal names, e.g.
    # {'pyoxidizer_binary': {'package', 'run'}, 'pex_binary': {'package', 'run'}, ...}
    goals_by_alias: dict[str, set[str]] = collections.defaultdict(set)
    for goal, aliases in aliases_by_goal.items():
        for alias in aliases:
            goals_by_alias[alias].add(goal)

    # Sorted tuples give a stable order and serialize to JSON arrays.
    return {alias: tuple(sorted(goals)) for alias, goals in goals_by_alias.items()}
×
289

290

291
@rule
async def get_target_data(
    targets: UnexpandedTargets,
    subsys: PeekSubsystem,
    union_membership: UnionMembership,
) -> TargetDatas:
    """Collect the `TargetData` for each of the given targets, ordered by address.

    Every entry carries the target itself, its hydrated sources (when the target has a sources
    field), the address specs of its resolved dependencies and, depending on the subsystem
    options, the dependency rules and additional info that apply to it.

    NB: We must preserve target generators, not replace with their generated targets.

    :param targets: The unexpanded targets to get data for.
    :param subsys: The `PeekSubsystem` instance.
    :param union_membership: Supplies the registered `HasAdditionalTargetDataFieldSet` members.
    :return: A `TargetDatas` collection with one `TargetData` per target.
    """
    sorted_targets = sorted(targets, key=lambda tgt: tgt.address)

    # We "hydrate" sources fields with the engine, but not every target has them registered.
    targets_with_sources = [tgt for tgt in sorted_targets if tgt.has_field(SourcesField)]

    # When determining dependencies, we replace target generators with their generated targets.
    dependencies_per_target = await concurrently(
        resolve_targets(
            **implicitly(
                DependenciesRequest(
                    tgt.get(Dependencies), should_traverse_deps_predicate=AlwaysTraverseDeps()
                )
            )
        )
        for tgt in sorted_targets
    )
    hydrated_sources_per_target = await concurrently(
        hydrate_sources(HydrateSourcesRequest(tgt[SourcesField]), **implicitly())
        for tgt in targets_with_sources
    )

    # Address -> additional info entries; stays empty unless the option is enabled.
    group_additional_infos_by_address: dict[Address, tuple[AdditionalTargetData, ...]] = {}
    if subsys.include_additional_info:
        additional_info_field_sets = [
            field_set_type.create(tgt)
            for tgt in sorted_targets
            for field_set_type in union_membership[HasAdditionalTargetDataFieldSet]
            if field_set_type.is_applicable(tgt)
        ]
        additional_infos = await concurrently(
            get_additional_target_data(**implicitly({field_set: HasAdditionalTargetDataFieldSet}))
            for field_set in additional_info_field_sets
        )
        infos_by_address: dict[Address, list[AdditionalTargetData]] = {}
        for field_set, additional_info in zip(additional_info_field_sets, additional_infos):
            infos_by_address.setdefault(field_set.address, []).append(additional_info)
        for address, infos in infos_by_address.items():
            group_additional_infos_by_address[address] = tuple(infos)

    # TODO: This feels like something that could be merged with the above code somewhere
    expanded_sources_map = {
        tgt.address: hydrated.snapshot
        for tgt, hydrated in zip(targets_with_sources, hydrated_sources_per_target)
    }

    expanded_dependencies = [
        tuple(dep.address.spec for dep in deps) for deps in dependencies_per_target
    ]

    dependencies_rules_map: dict[Address, tuple[str, ...] | None] = {}
    dependents_rules_map: dict[Address, tuple[str, ...] | None] = {}
    applicable_dep_rules_map = {}

    if subsys.include_dep_rules:
        # TODO: Why are we pulling a private definition from `build_files`?
        family_adaptors = await _get_target_family_and_adaptor_for_dep_rules(
            *(tgt.address for tgt in sorted_targets),
            description_of_origin="`peek` goal",
        )

        # Families without rules of a given kind leave no entry, so the lookup below gives None.
        for tgt, (family, adaptor) in zip(sorted_targets, family_adaptors):
            if family.dependencies_rules is not None:
                dependencies_rules_map[tgt.address] = describe_ruleset(
                    family.dependencies_rules.get_ruleset(tgt.address, adaptor)
                )
            if family.dependents_rules is not None:
                dependents_rules_map[tgt.address] = describe_ruleset(
                    family.dependents_rules.get_ruleset(tgt.address, adaptor)
                )

        all_applicable_dep_rules = await concurrently(
            get_dependencies_rule_application(
                DependenciesRuleApplicationRequest(
                    tgt.address,
                    Addresses(dep.address for dep in deps),
                    description_of_origin="`peek` goal",
                ),
                **implicitly(),
            )
            for tgt, deps in zip(sorted_targets, dependencies_per_target)
        )
        applicable_dep_rules_map = {
            application.address: tuple(application.dependencies_rule.values())
            for application in all_applicable_dep_rules
        }

    # TODO: The alias -> goals mapping is currently attached in the `goal_rule` below, but it
    # would be preferable to compute it here.

    # With additional info enabled, targets without any entries report an empty tuple rather
    # than None, so the "additional_info" key is still emitted for them.
    no_additional_info = () if subsys.include_additional_info else None

    return TargetDatas(
        TargetData(
            tgt,
            expanded_dependencies=expanded_deps,
            expanded_sources=expanded_sources_map.get(tgt.address),
            dependencies_rules=dependencies_rules_map.get(tgt.address),
            dependents_rules=dependents_rules_map.get(tgt.address),
            applicable_dep_rules=applicable_dep_rules_map.get(tgt.address),
            goals=None,
            additional_info=group_additional_infos_by_address.get(
                tgt.address, no_additional_info
            ),
        )
        for tgt, expanded_deps in zip(sorted_targets, expanded_dependencies)
    )
419

420

421
@goal_rule
async def peek(
    console: Console,
    subsys: PeekSubsystem,
    targets: UnexpandedTargets,
) -> Peek:
    """Display detailed target information in JSON form.

    :param console: The console to write the JSON to.
    :param subsys: The `PeekSubsystem` instance.
    :param targets: The targets to get data for.
    :return: The `Peek` goal, with exit code 0.
    """
    tds = await get_target_data(targets, **implicitly())
    # This method needs to be called in a @goal_rule, otherwise it fails out with Rule errors
    # (when called in an @rule).
    target_alias_to_goals_map = await _create_target_alias_to_goals_map()

    if target_alias_to_goals_map:
        # TargetData is frozen, so attaching the goals means building a new collection of
        # copies. Doing it here is a stopgap until `_create_target_alias_to_goals_map` can be
        # pulled back into `get_target_data`.
        with_goals = [
            replace(td, goals=target_alias_to_goals_map.get(td.target.alias)) for td in tds
        ]
        tds = TargetDatas(with_goals)

    # Render before opening the output sink, so nothing is opened if rendering fails.
    output = render_json(tds, subsys.exclude_defaults, subsys.include_dep_rules)
    with subsys.output(console) as write_stdout:
        write_stdout(output)
    return Peek(exit_code=0)
×
456

457

458
def rules() -> Iterable[Rule]:
    """Return the rules gathered by `collect_rules()`."""
    # NOTE(review): `collect_rules()` is called without arguments; presumably it discovers the
    # rules of the calling module, so keep this call directly inside `rules()` -- confirm.
    return collect_rules()
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc