• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.74
/src/python/pants/build_graph/build_configuration.py
1
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
1✔
5

6
import logging
1✔
7
from collections import defaultdict
1✔
8
from collections.abc import Callable, Iterable
1✔
9
from dataclasses import dataclass, field
1✔
10
from enum import Enum
1✔
11
from typing import Any, DefaultDict
1✔
12

13
from pants.backend.project_info.filter_targets import FilterSubsystem
1✔
14
from pants.build_graph.build_file_aliases import BuildFileAliases
1✔
15
from pants.core.environments.subsystems import EnvironmentsSubsystem
1✔
16
from pants.engine.goal import GoalSubsystem
1✔
17
from pants.engine.rules import Rule, RuleIndex, collect_rules, rule
1✔
18
from pants.engine.target import Target
1✔
19
from pants.engine.unions import UnionRule
1✔
20
from pants.option.alias import CliOptions
1✔
21
from pants.option.global_options import GlobalOptions
1✔
22
from pants.option.scope import OptionsParsingSettings, ScopeInfo, normalize_scope
1✔
23
from pants.option.subsystem import Subsystem
1✔
24
from pants.util.frozendict import FrozenDict
1✔
25
from pants.util.ordered_set import FrozenOrderedSet
1✔
26
from pants.vcs.changed import Changed
1✔
27

28
logger = logging.getLogger(__name__)
1✔
29

30

31
# No goal or target_type can have a name from this set, so that `./pants help <name>`
32
# is unambiguous.
33
_RESERVED_NAMES = {
1✔
34
    "api-types",
35
    "backends",
36
    "global",
37
    "goals",
38
    "subsystems",
39
    "symbols",
40
    "targets",
41
    "tools",
42
}
43

44

45
# Subsystems used outside of any rule.
46
_GLOBAL_SUBSYSTEMS: set[type[Subsystem]] = {
1✔
47
    GlobalOptions,
48
    Changed,
49
    CliOptions,
50
    FilterSubsystem,
51
    EnvironmentsSubsystem,
52
}
53

54

55
@dataclass(frozen=True)
1✔
56
class BuildConfiguration:
1✔
57
    """Stores the types and helper functions exposed to BUILD files."""
58

59
    registered_aliases: BuildFileAliases
1✔
60
    subsystem_to_providers: FrozenDict[type[Subsystem], tuple[str, ...]]
1✔
61
    target_type_to_providers: FrozenDict[type[Target], tuple[str, ...]]
1✔
62
    rule_to_providers: FrozenDict[Rule, tuple[str, ...]]
1✔
63
    union_rule_to_providers: FrozenDict[UnionRule, tuple[str, ...]]
1✔
64
    allow_unknown_options: bool
1✔
65
    remote_auth_plugin_func: Callable | None
1✔
66

67
    @property
1✔
68
    def all_subsystems(self) -> tuple[type[Subsystem], ...]:
1✔
69
        """Return all subsystems in the system: global and those registered via rule usage."""
70
        # Sort by options_scope, for consistency.
UNCOV
71
        return tuple(
×
72
            sorted(
73
                _GLOBAL_SUBSYSTEMS | self.subsystem_to_providers.keys(),
74
                key=lambda x: x.options_scope,
75
            )
76
        )
77

78
    @property
1✔
79
    def known_scope_infos(self) -> tuple[ScopeInfo, ...]:
1✔
UNCOV
80
        return tuple(subsystem.get_scope_info() for subsystem in self.all_subsystems)
×
81

82
    @property
1✔
83
    def target_types(self) -> tuple[type[Target], ...]:
1✔
UNCOV
84
        return tuple(sorted(self.target_type_to_providers.keys(), key=lambda x: x.alias))
×
85

86
    @property
1✔
87
    def rules(self) -> FrozenOrderedSet[Rule]:
1✔
UNCOV
88
        return FrozenOrderedSet(self.rule_to_providers.keys())
×
89

90
    @property
1✔
91
    def union_rules(self) -> FrozenOrderedSet[UnionRule]:
1✔
UNCOV
92
        return FrozenOrderedSet(self.union_rule_to_providers.keys())
×
93

94
    def __post_init__(self) -> None:
1✔
UNCOV
95
        class Category(Enum):
×
UNCOV
96
            goal = "goal"
×
UNCOV
97
            reserved_name = "reserved name"
×
UNCOV
98
            subsystem = "subsystem"
×
UNCOV
99
            target_type = "target type"
×
100

UNCOV
101
        name_to_categories: DefaultDict[str, set[Category]] = defaultdict(set)
×
UNCOV
102
        normalized_to_orig_name: dict[str, str] = {}
×
103

UNCOV
104
        for opt in self.all_subsystems:
×
UNCOV
105
            scope = opt.options_scope
×
UNCOV
106
            normalized_scope = normalize_scope(scope)
×
UNCOV
107
            name_to_categories[normalized_scope].add(
×
108
                Category.goal if issubclass(opt, GoalSubsystem) else Category.subsystem
109
            )
UNCOV
110
            normalized_to_orig_name[normalized_scope] = scope
×
UNCOV
111
        for tgt_type in self.target_types:
×
UNCOV
112
            name_to_categories[normalize_scope(tgt_type.alias)].add(Category.target_type)
×
UNCOV
113
        for reserved_name in _RESERVED_NAMES:
×
UNCOV
114
            name_to_categories[normalize_scope(reserved_name)].add(Category.reserved_name)
×
115

UNCOV
116
        found_collision = False
×
UNCOV
117
        for name, cats in name_to_categories.items():
×
UNCOV
118
            if len(cats) > 1:
×
UNCOV
119
                scats = sorted(cat.value for cat in cats)
×
UNCOV
120
                cats_str = ", ".join(f"a {cat}" for cat in scats[:-1]) + f" and a {scats[-1]}."
×
UNCOV
121
                colliding_names = "`/`".join(
×
122
                    sorted({name, normalized_to_orig_name.get(name, name)})
123
                )
UNCOV
124
                logger.error(f"Naming collision: `{colliding_names}` is registered as {cats_str}")
×
UNCOV
125
                found_collision = True
×
126

UNCOV
127
        if found_collision:
×
UNCOV
128
            raise TypeError("Found naming collisions. See log for details.")
×
129

130
    @dataclass
1✔
131
    class Builder:
1✔
132
        _exposed_object_by_alias: dict[Any, Any] = field(default_factory=dict)
1✔
133
        _exposed_context_aware_object_factory_by_alias: dict[Any, Any] = field(default_factory=dict)
1✔
134
        _subsystem_to_providers: dict[type[Subsystem], list[str]] = field(
1✔
135
            default_factory=lambda: defaultdict(list)
136
        )
137
        _target_type_to_providers: dict[type[Target], list[str]] = field(
1✔
138
            default_factory=lambda: defaultdict(list)
139
        )
140
        _rule_to_providers: dict[Rule, list[str]] = field(default_factory=lambda: defaultdict(list))
1✔
141
        _union_rule_to_providers: dict[UnionRule, list[str]] = field(
1✔
142
            default_factory=lambda: defaultdict(list)
143
        )
144
        _allow_unknown_options: bool = False
1✔
145
        _remote_auth_plugin: Callable | None = None
1✔
146

147
        def registered_aliases(self) -> BuildFileAliases:
1✔
148
            """Return the registered aliases exposed in BUILD files.
149

150
            These returned aliases aren't so useful for actually parsing BUILD files.
151
            They are useful for generating online documentation.
152

153
            :returns: A new BuildFileAliases instance containing this BuildConfiguration's
154
                      registered alias mappings.
155
            """
156
            return BuildFileAliases(
×
157
                objects=self._exposed_object_by_alias.copy(),
158
                context_aware_object_factories=self._exposed_context_aware_object_factory_by_alias.copy(),
159
            )
160

161
        def register_aliases(self, aliases):
1✔
162
            """Registers the given aliases to be exposed in parsed BUILD files.
163

164
            :param aliases: The BuildFileAliases to register.
165
            :type aliases: :class:`pants.build_graph.build_file_aliases.BuildFileAliases`
166
            """
UNCOV
167
            if not isinstance(aliases, BuildFileAliases):
×
UNCOV
168
                raise TypeError(f"The aliases must be a BuildFileAliases, given {aliases}")
×
169

UNCOV
170
            for alias, obj in aliases.objects.items():
×
UNCOV
171
                self._register_exposed_object(alias, obj)
×
172

UNCOV
173
            for (
×
174
                alias,
175
                context_aware_object_factory,
176
            ) in aliases.context_aware_object_factories.items():
UNCOV
177
                self._register_exposed_context_aware_object_factory(
×
178
                    alias, context_aware_object_factory
179
                )
180

181
        def _register_exposed_object(self, alias, obj):
1✔
UNCOV
182
            if alias in self._exposed_object_by_alias:
×
UNCOV
183
                logger.debug(f"Object alias {alias} has already been registered. Overwriting!")
×
184

UNCOV
185
            self._exposed_object_by_alias[alias] = obj
×
186

187
        def _register_exposed_context_aware_object_factory(
1✔
188
            self, alias, context_aware_object_factory
189
        ):
UNCOV
190
            if alias in self._exposed_context_aware_object_factory_by_alias:
×
191
                logger.debug(
×
192
                    "This context aware object factory alias {} has already been registered. "
193
                    "Overwriting!".format(alias)
194
                )
195

UNCOV
196
            self._exposed_context_aware_object_factory_by_alias[alias] = (
×
197
                context_aware_object_factory
198
            )
199

200
        def register_subsystems(
1✔
201
            self, plugin_or_backend: str, subsystems: Iterable[type[Subsystem]]
202
        ):
203
            """Registers the given subsystem types."""
UNCOV
204
            if not isinstance(subsystems, Iterable):
×
205
                raise TypeError(f"The subsystems must be an iterable, given {subsystems}")
×
UNCOV
206
            subsystems = tuple(subsystems)
×
UNCOV
207
            if not subsystems:
×
UNCOV
208
                return
×
209

UNCOV
210
            invalid_subsystems = [
×
211
                s for s in subsystems if not isinstance(s, type) or not issubclass(s, Subsystem)
212
            ]
UNCOV
213
            if invalid_subsystems:
×
214
                raise TypeError(
×
215
                    "The following items from the given subsystems are not Subsystems "
216
                    "subclasses:\n\t{}".format("\n\t".join(str(i) for i in invalid_subsystems))
217
                )
218

UNCOV
219
            for subsystem in subsystems:
×
UNCOV
220
                self._subsystem_to_providers[subsystem].append(plugin_or_backend)
×
221

222
        def register_rules(self, plugin_or_backend: str, rules: Iterable[Rule | UnionRule]):
1✔
223
            """Registers the given rules."""
UNCOV
224
            if not isinstance(rules, Iterable):
×
225
                raise TypeError(f"The rules must be an iterable, given {rules!r}")
×
226

227
            # "Index" the rules to normalize them and expand their dependencies.
UNCOV
228
            rule_index = RuleIndex.create(rules)
×
UNCOV
229
            rules_and_queries: tuple[Rule, ...] = (*rule_index.rules, *rule_index.queries)
×
UNCOV
230
            for _rule in rules_and_queries:
×
UNCOV
231
                self._rule_to_providers[_rule].append(plugin_or_backend)
×
UNCOV
232
            for union_rule in rule_index.union_rules:
×
UNCOV
233
                self._union_rule_to_providers[union_rule].append(plugin_or_backend)
×
UNCOV
234
            self.register_subsystems(
×
235
                plugin_or_backend,
236
                (
237
                    _rule.output_type
238
                    for _rule in rules_and_queries
239
                    if issubclass(_rule.output_type, Subsystem)
240
                ),
241
            )
242

243
        # NB: We expect the parameter to be Iterable[Type[Target]], but we can't be confident in
244
        # this because we pass whatever people put in their `register.py`s to this function;
245
        # I.e., this is an impure function that reads from the outside world. So, we use the type
246
        # hint `Any` and perform runtime type checking.
247
        def register_target_types(
1✔
248
            self, plugin_or_backend: str, target_types: Iterable[type[Target]] | Any
249
        ) -> None:
250
            """Registers the given target types."""
UNCOV
251
            if not isinstance(target_types, Iterable):
×
252
                raise TypeError(
×
253
                    f"The entrypoint `target_types` must return an iterable. "
254
                    f"Given {repr(target_types)}"
255
                )
UNCOV
256
            bad_elements = [
×
257
                tgt_type
258
                for tgt_type in target_types
259
                if not isinstance(tgt_type, type) or not issubclass(tgt_type, Target)
260
            ]
UNCOV
261
            if bad_elements:
×
262
                raise TypeError(
×
263
                    "Every element of the entrypoint `target_types` must be a subclass of "
264
                    f"{Target.__name__}. Bad elements: {bad_elements}."
265
                )
UNCOV
266
            for target_type in target_types:
×
UNCOV
267
                self._target_type_to_providers[target_type].append(plugin_or_backend)
×
268
                # Access the Target.PluginField here to ensure the PluginField class is
269
                # created before the UnionMembership is instantiated, as the class hierarchy is
270
                # walked during union membership setup.
UNCOV
271
                _ = target_type.PluginField
×
272

273
        def register_remote_auth_plugin(self, remote_auth_plugin: Callable) -> None:
1✔
UNCOV
274
            self._remote_auth_plugin = remote_auth_plugin
×
275

276
        def register_auxiliary_goals(self, plugin_or_backend: str, auxiliary_goals: Iterable[type]):
1✔
277
            """Registers the given auxiliary goals."""
UNCOV
278
            if not isinstance(auxiliary_goals, Iterable):
×
279
                raise TypeError(
×
280
                    f"The entrypoint `auxiliary_goals` must return an iterable. "
281
                    f"Given {repr(auxiliary_goals)}"
282
                )
283
            # Import `AuxiliaryGoal` here to avoid import cycle.
UNCOV
284
            from pants.goal.auxiliary_goal import AuxiliaryGoal
×
285

UNCOV
286
            bad_elements = [goal for goal in auxiliary_goals if not issubclass(goal, AuxiliaryGoal)]
×
UNCOV
287
            if bad_elements:
×
288
                raise TypeError(
×
289
                    "Every element of the entrypoint `auxiliary_goals` must be a subclass of "
290
                    f"{AuxiliaryGoal.__name__}. Bad elements: {bad_elements}."
291
                )
UNCOV
292
            self.register_subsystems(plugin_or_backend, auxiliary_goals)
×
293

294
        def allow_unknown_options(self, allow: bool = True) -> None:
1✔
295
            """Allows overriding whether Options parsing will fail for unrecognized Options.
296

297
            Used to defer options failures while bootstrapping BuildConfiguration until after the
298
            complete set of plugins is known.
299
            """
UNCOV
300
            self._allow_unknown_options = True
×
301

302
        def create(self) -> BuildConfiguration:
1✔
UNCOV
303
            registered_aliases = BuildFileAliases(
×
304
                objects=self._exposed_object_by_alias.copy(),
305
                context_aware_object_factories=self._exposed_context_aware_object_factory_by_alias.copy(),
306
            )
UNCOV
307
            return BuildConfiguration(
×
308
                registered_aliases=registered_aliases,
309
                subsystem_to_providers=FrozenDict(
310
                    (k, tuple(v)) for k, v in self._subsystem_to_providers.items()
311
                ),
312
                target_type_to_providers=FrozenDict(
313
                    (k, tuple(v)) for k, v in self._target_type_to_providers.items()
314
                ),
315
                rule_to_providers=FrozenDict(
316
                    (k, tuple(v)) for k, v in self._rule_to_providers.items()
317
                ),
318
                union_rule_to_providers=FrozenDict(
319
                    (k, tuple(v)) for k, v in self._union_rule_to_providers.items()
320
                ),
321
                allow_unknown_options=self._allow_unknown_options,
322
                remote_auth_plugin_func=self._remote_auth_plugin,
323
            )
324

325

326
@rule
1✔
327
async def get_full_options_parsing_settings(
1✔
328
    build_config: BuildConfiguration,
329
) -> OptionsParsingSettings:
330
    return OptionsParsingSettings(
×
331
        build_config.known_scope_infos,
332
        build_config.allow_unknown_options,
333
    )
334

335

336
def rules():
1✔
UNCOV
337
    return collect_rules()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc