• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 22981081902

12 Mar 2026 12:31AM UTC coverage: 92.932% (-0.006%) from 92.938%
22981081902

Pull #23165

github

web-flow
Merge a62cf2b59 into 819035d5d
Pull Request #23165: [pants_ng] An NG subsystem implementation

269 of 295 new or added lines in 2 files covered. (91.19%)

1 existing line in 1 file now uncovered.

91437 of 98391 relevant lines covered (92.93%)

4.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.63
/src/python/pants/ng/subsystem.py
1
# Copyright 2025 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
import functools
1✔
5
import inspect
1✔
6
import typing
1✔
7
from collections.abc import Callable
1✔
8
from dataclasses import dataclass
1✔
9
from functools import partial, wraps
1✔
10
from types import GenericAlias
1✔
11
from typing import Any, Iterable, Self, Tuple, TypeVar, cast
1✔
12

13
from pants.engine.internals.native_engine import PyNgOptions, PyNgOptionsReader, PyOptionId
1✔
14
from pants.engine.rules import Rule, TaskRule, collect_rules, rule
1✔
15
from pants.ng.source_partition import SourcePartition
1✔
16
from pants.option.ranked_value import Rank
1✔
17
from pants.util.frozendict import FrozenDict
1✔
18
from pants.util.memo import memoized_classmethod
1✔
19

20
# Supported option value types.
21
#
22
# Note that this is a subset of the value types supported by og Pants, which allows
23
# nested and heterogenously-valued dicts. However that doesn't seem to be worth the effort
24
# so let's see how far we can get without supporting those. Note also that these types are all
25
# immutable, whereas the og types did not have to be. Again, not a complication we should
26
# be enthusiastic to support.
27
ScalarValue = bool | str | int | float
1✔
28
OptionValue = ScalarValue | tuple[ScalarValue, ...] | FrozenDict[str, str]
1✔
29

30

31
_SCALAR_TYPES = typing.get_args(ScalarValue)
1✔
32

33

34
def _is_valid_type(t: type | GenericAlias) -> bool:
1✔
35
    if isinstance(t, type):
1✔
36
        # ScalarType
37
        return t in _SCALAR_TYPES
1✔
38

39
    torig = typing.get_origin(t)
1✔
40
    targs = typing.get_args(t)
1✔
41
    return (
1✔
42
        # tuple[ScalarType, ...]
43
        torig == tuple and len(targs) == 2 and targs[0] in _SCALAR_TYPES and targs[1] == Ellipsis
44
    ) or (
45
        # FrozenDict[str, str]
46
        torig == FrozenDict and targs == (str, str)
47
    )
48

49

50
def _type_to_readable_str(t: type | GenericAlias) -> str:
1✔
51
    if isinstance(t, type):
1✔
52
        # E.g., `str` instead of `<class 'str'>`
53
        return t.__name__
1✔
54
    # A GenericAlias renders as you'd expect (e.g., `tuple[str, ...]`).
55
    return str(t)
1✔
56

57

58
def _is_of_type(val: OptionValue, expected_type: type | GenericAlias) -> bool:
1✔
59
    if isinstance(expected_type, type) and expected_type in _SCALAR_TYPES:
1✔
60
        return isinstance(val, expected_type)
1✔
61

62
    torig = typing.get_origin(expected_type)
1✔
63
    targs = typing.get_args(expected_type)
1✔
64
    if torig == tuple:
1✔
65
        member_type = targs[0]
1✔
66
        return isinstance(val, tuple) and all(isinstance(member, member_type) for member in val)
1✔
67
    if torig == FrozenDict:
1✔
68
        return isinstance(val, FrozenDict) and all(
1✔
69
            (type(k), type(v)) == (str, str) for k, v in val.items()
70
        )
71
    # Should never happen, assuming we've already passed _is_valid_type().
NEW
72
    raise ValueError(f"Unexpected expected_type {expected_type}")
×
73

74

75
def _getter(getter: Callable, option_parser, option_id, default) -> tuple[Any, bool]:
1✔
76
    # The legacy getter returns a tuple of (value, rank, optional derivation),
77
    # we only care about value and rank.
78
    value, rank, _ = getter(option_parser, option_id, default)
1✔
79
    return value, rank > Rank.HARDCODED._rank
1✔
80

81

82
def _tuple_getter(
1✔
83
    getter: Callable, option_parser, option_id, default
84
) -> tuple[tuple[ScalarValue, ...], bool]:
85
    # The legacy getter returns a tuple of (list value, rank, optional derivation),
86
    # we only care about value and rank, and we convert the value to tuple. The rust code can
87
    # consume a tuple where a list is expected, so no need to convert the default.
88
    value, rank, _ = getter(option_parser, option_id, default or ())
1✔
89
    return tuple(value), rank > Rank.HARDCODED._rank
1✔
90

91

92
def _dict_getter(option_parser, option_id, default) -> tuple[FrozenDict, bool]:
1✔
93
    # The legacy getter returns a tuple of (value, rank, optional derivation),
94
    # we only care about value and rank, and we convert the value to FrozenDict.
95
    # We also convert the default to regular dict, so the Rust code can consume it.
96
    # The type checker currently doesn't grok the overloaded FrozenDict.__init__, so
97
    # we ignore the arg-type error.
98
    value, rank, _ = PyNgOptionsReader.get_dict(option_parser, option_id, dict(default or {}))
1✔
99
    return FrozenDict(value), rank > Rank.HARDCODED._rank  # type: ignore[arg-type]
1✔
100

101

102
# A map from type to getter for that type.
103
# Each getter is called with the arguments (options_reader, option_id, default) and
104
# returns a pair (value, is_explicit). is_explicit is True if the value was specified
105
# explicitly (by config, env var or flag), and False if the default was used.
106
# Note that this differentiates between the case of "no explicit value" vs "explicit value
107
# was provided, but it happened to be the same as the default value".
108
_getters: dict[type | GenericAlias, Callable] = {
1✔
109
    bool: partial(_getter, PyNgOptionsReader.get_bool),
110
    str: partial(_getter, PyNgOptionsReader.get_string),
111
    int: partial(_getter, PyNgOptionsReader.get_int),
112
    float: partial(_getter, PyNgOptionsReader.get_float),
113
    tuple[bool, ...]: partial(_tuple_getter, PyNgOptionsReader.get_bool_list),
114
    tuple[str, ...]: partial(_tuple_getter, PyNgOptionsReader.get_string_list),
115
    tuple[int, ...]: partial(_tuple_getter, PyNgOptionsReader.get_int_list),
116
    tuple[float, ...]: partial(_tuple_getter, PyNgOptionsReader.get_float_list),
117
    FrozenDict[str, str]: _dict_getter,
118
}
119

120

121
@dataclass(frozen=True)
1✔
122
class OptionDescriptor:
1✔
123
    """Information we capture at @option evaluation time."""
124

125
    name: str
126
    type: type | GenericAlias
127
    default: OptionValue | None
128
    help: str
129
    required: bool = False
1✔
130

131

132
R = TypeVar("R")
1✔
133
S = TypeVar("S")
1✔
134
OptionFunc = Callable[[S], R]
1✔
135
Default = OptionValue | Callable[[Any], OptionValue]
1✔
136
Help = str | Callable[[Any], str]
1✔
137

138

139
REQUIRED = object()
1✔
140

141

142
# A @decorator for options.
143
def option(*, help: Help, required: bool = False, default: Default | None = None) -> OptionFunc:
1✔
144
    def decorator(func: OptionFunc):
1✔
145
        # Mark the func as an option.
146
        func._option_ = True  # type: ignore[attr-defined]
1✔
147
        func._option_required_ = required  # type: ignore[attr-defined]
1✔
148
        func._option_default_ = default  # type: ignore[attr-defined]
1✔
149
        func._option_help_ = help  # type: ignore[attr-defined]
1✔
150

151
        @wraps(func)
1✔
152
        def wrapper(instance):
1✔
153
            return instance._option_values_[func.__name__]
1✔
154

155
        return property(wrapper)
1✔
156

157
    return decorator
1✔
158

159

160
class SubsystemNg:
1✔
161
    """A holder of options in a scope.
162

163
    Much lighter weight than its og counterpart.
164

165
    Use like this:
166

167
    class Foo(SubsystemNg):
168
        options_scope = "foo"
169

170
        @option(help="bar help")
171
        def bar(self) -> str: ...
172

173
        @option(default=42, help="baz help")
174
        def baz(self) -> int: ...
175

176
        ...
177

178
    The supported option types are: bool, str, int, float, tuple[bool, ...], tuple[str, ...],
179
    tuple[int, ...], tuple[float, ...] and FrozenDict[str, str].
180

181
    The `...` in the method bodies above are literal: the subsystem mechanism will provide the body.
182
    You may need `# mypy: disable-error-code=empty-body` for your code to pass mypy type checks.
183

184
    `default` and `help` can be values or callables that take the subsystem type as an arg, and
185
    return the relevant values.
186

187
    Subsystem instances are instantiated with a PyNgOptionsReader, and calling .bar or .baz (with
188
    no parentheses) on an instance of Foo will return the value of that option from the underlying
189
    parser.
190

191
    Option names must not _begin_and_end_ with an underscore. Such names are reserved for the
192
    underlying initialization mechanics.
193
    """
194

195
    _subsystem_ng_ = True
1✔
196

197
    # Subclasses must set.
198
    options_scope: str | None = None
1✔
199
    help: str | None = None
1✔
200

201
    # The ctor will set.
202
    _option_values_: FrozenDict[str, OptionValue] | None = None
1✔
203

204
    @classmethod
1✔
205
    def create(cls, options_reader: PyNgOptionsReader) -> Self:
1✔
206
        """Indirect constructor method.
207

208
        Useful for avoiding spurious mypy errors when instantiating instances in tests.
209
        """
210
        return cls(options_reader)
1✔
211

212
    def __init__(self, options_reader: PyNgOptionsReader):
1✔
213
        option_values = {}
1✔
214
        for descriptor in self._get_option_descriptors_():
1✔
215
            option_id = PyOptionId(descriptor.name, scope=self.options_scope)
1✔
216
            getter = _getters[descriptor.type]
1✔
217
            val, is_explicit = getter(
1✔
218
                options_reader, option_id=option_id, default=descriptor.default
219
            )
220
            if descriptor.required and not is_explicit:
1✔
221
                raise ValueError(
1✔
222
                    "No value provided for required option "
223
                    f"[{self.options_scope}].{descriptor.name}."
224
                )
225
            option_values[descriptor.name] = val
1✔
226
        self._option_values_ = FrozenDict(option_values)
1✔
227

228
    def _get_option_values_(self) -> FrozenDict[str, OptionValue]:
1✔
NEW
229
        return cast(FrozenDict[str, OptionValue], self._option_values_)
×
230

231
    def __str__(self) -> str:
1✔
NEW
232
        fields = [f"{k}={v}" for k, v in self._get_option_values_().items()]
×
NEW
233
        return f"{self.__class__.__name__}({', '.join(fields)})"
×
234

235
    @classmethod
1✔
236
    def _get_option_descriptors_(cls) -> tuple[OptionDescriptor, ...]:
1✔
237
        """Return the option descriptors for this subsystem.
238

239
        Must only be called after initialize() has been called.
240
        """
241
        return cast(tuple[OptionDescriptor], getattr(cls, "_option_descriptors_"))
1✔
242

243
    @classmethod
1✔
244
    def _initialize_(cls):
1✔
245
        if getattr(cls, "_option_descriptors_", None) is not None:
1✔
NEW
246
            return
×
247

248
        if cls.options_scope is None:
1✔
249
            raise ValueError(f"Subsystem class {cls.__name__} must set the options_scope classvar.")
1✔
250
        if cls.help is None:
1✔
251
            raise ValueError(f"Subsystem class {cls.__name__} must set the help classvar.")
1✔
252
        option_descriptors = []
1✔
253
        # We don't use dir() or inspect.getmembers() because those sort by name,
254
        # and we want to preserve declaration order.
255
        for basecls in inspect.getmro(cls):
1✔
256
            for name, member in basecls.__dict__.items():
1✔
257
                if not isinstance(member, property):
1✔
258
                    continue
1✔
259
                wrapped = member.fget
1✔
260
                if not getattr(wrapped, "_option_", False):
1✔
NEW
261
                    continue
×
262
                sig = inspect.signature(wrapped)
1✔
263
                if len(sig.parameters) != 1 or next(iter(sig.parameters)) != "self":
1✔
264
                    raise ValueError(
1✔
265
                        f"The @option decorator expects to be placed on a no-arg instance method, but {basecls.__name__}.{name} does not have this signature."
266
                    )
267
                option_type = sig.return_annotation
1✔
268
                if not _is_valid_type(option_type):
1✔
NEW
269
                    raise ValueError(
×
270
                        f"Invalid type `{option_type}` for option {basecls.__name__}.{name}"
271
                    )
272

273
                help = getattr(wrapped, "_option_help_")
1✔
274
                if callable(help):
1✔
275
                    help = help(cls)
1✔
276
                required = getattr(wrapped, "_option_required_")
1✔
277
                default = getattr(wrapped, "_option_default_")
1✔
278
                if required:
1✔
279
                    if default is not None:
1✔
NEW
280
                        raise ValueError(
×
281
                            "The option {basecls.__name__}.{name} is required, so it must not provide a default value."
282
                        )
283
                else:
284
                    if callable(default):
1✔
285
                        default = default(cls)
1✔
286
                    if default is not None and not _is_of_type(default, option_type):
1✔
287
                        raise ValueError(
1✔
288
                            f"The default for option {basecls.__name__}.{name} must be of type {_type_to_readable_str(option_type)} (or None)."
289
                        )
290
                option_descriptors.append(
1✔
291
                    OptionDescriptor(
292
                        name,
293
                        type=option_type,
294
                        required=required,
295
                        default=default,
296
                        help=help,
297
                    )
298
                )
299
        cls._option_descriptors_ = tuple(option_descriptors)
1✔
300

301
    @memoized_classmethod
1✔
302
    def _get_rules_(cls: Any) -> Iterable[Rule]:
1✔
NEW
303
        return cast(Iterable[Rule], (cls._construct_subsystem_rule_(),))
×
304

305
    @classmethod
1✔
306
    def _get_construct_func_(cls) -> Tuple[Callable, dict]:
1✔
307
        """Returns information that allows the engine to construct an instance of the subsystem.
308

309
        The return value is a pair (function, params) where the function takes the
310
        type of the subsystem as its first argument (`cls`) and the given params
311
        as its subsequent argument(s). The params dict is a map of param name -> param type.
312

313
        See below for examples of use (currently the only uses, in fact).
314
        """
NEW
315
        raise ValueError(f"_get_construct_func_() not implemented for {cls}")
×
316

317
    @classmethod
1✔
318
    def _construct_subsystem_rule_(cls) -> Rule:
1✔
319
        """Returns a `TaskRule` that will construct the target Subsystem."""
NEW
320
        construct_func, construct_func_params = cls._get_construct_func_()
×
321

NEW
322
        partial_construct_subsystem: Any = functools.partial(construct_func, cls)
×
323

324
        # NB: We populate several dunder methods on the partial function because partial
325
        # functions do not have these defined by default, and the engine uses these values to
326
        # visualize functions in error messages and the rule graph.
NEW
327
        name = f"construct_{cls.__name__}"
×
NEW
328
        partial_construct_subsystem.__name__ = name
×
NEW
329
        partial_construct_subsystem.__module__ = cls.__module__
×
NEW
330
        partial_construct_subsystem.__doc__ = cls.help() if callable(cls.help) else cls.help
×
NEW
331
        partial_construct_subsystem.__line_number__ = inspect.getsourcelines(cls)[1]
×
332

NEW
333
        return TaskRule(
×
334
            output_type=cls,
335
            parameters=FrozenDict(construct_func_params),
336
            awaitables=(),
337
            masked_types=(),
338
            func=partial_construct_subsystem,
339
            canonical_name=name,
340
        )
341

342

343
@dataclass(frozen=True)
1✔
344
class UniversalOptionsReader:
1✔
345
    options_reader: PyNgOptionsReader
346

347

348
@rule
1✔
349
async def get_universal_options_reader(options: PyNgOptions) -> UniversalOptionsReader:
1✔
350
    # The universal options are the options for the repo root dir.
NEW
351
    return UniversalOptionsReader(options.get_options_reader_for_dir(""))
×
352

353

354
class UniversalSubsystem(SubsystemNg):
1✔
355
    """A subsystem whose values are universal for all uses.
356

357
    Uses include:
358
    - Global options, as these affect the running of Pants itself.
359
    - Command/subcommand options, as these are typically provided on the command line and
360
      are intended by the user to apply to all sources.
361
    """
362

363
    @classmethod
1✔
364
    def _get_construct_func_(cls):
1✔
NEW
365
        return _construct_universal_subsystem, {"universal_options_reader": UniversalOptionsReader}
×
366

367

368
_UniversalSubsystemT = TypeVar("_UniversalSubsystemT", bound="UniversalSubsystem")
1✔
369

370

371
async def _construct_universal_subsystem(
1✔
372
    subsystem_typ: type[_UniversalSubsystemT],
373
    universal_options_reader: UniversalOptionsReader,
374
) -> _UniversalSubsystemT:
NEW
375
    return subsystem_typ(universal_options_reader.options_reader)
×
376

377

378
class ContextualSubsystem(SubsystemNg):
1✔
379
    """A subsystem whose values may vary depending on the sources it applies to.
380

381
    The main use is to allow config files in subdirectories to override option values
382
    set in parent directories (including the root directory), and for those overrides
383
    to apply just to the sources in or below those subdirectories.
384

385
    The input sources for a command will be partitioned into subsets, each attached to
386
    its own subsystem instance.
387
    """
388

389
    @classmethod
1✔
390
    def _get_construct_func_(cls):
1✔
NEW
391
        return _construct_contextual_subsystem, {"source_partition": SourcePartition}
×
392

393

394
_ContextualSubsystemT = TypeVar("_ContextualSubsystemT", bound="ContextualSubsystem")
1✔
395

396

397
async def _construct_contextual_subsystem(
1✔
398
    subsystem_typ: type[_ContextualSubsystemT],
399
    source_partition: SourcePartition,
400
) -> _ContextualSubsystemT:
NEW
401
    return subsystem_typ(source_partition.options_reader)
×
402

403

404
def rules() -> tuple[Rule, ...]:
1✔
NEW
405
    return tuple(collect_rules())
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc