• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18812500213

26 Oct 2025 03:42AM UTC coverage: 80.284% (+0.005%) from 80.279%
18812500213

Pull #22804

github

web-flow
Merge 2a56fdb46 into 4834308dc
Pull Request #22804: test_shell_command: use correct default cache scope for a test's environment

29 of 31 new or added lines in 2 files covered. (93.55%)

1314 existing lines in 64 files now uncovered.

77900 of 97030 relevant lines covered (80.28%)

3.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.96
/src/python/pants/engine/target.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import collections.abc
12✔
7
import dataclasses
12✔
8
import enum
12✔
9
import glob as glob_stdlib
12✔
10
import itertools
12✔
11
import logging
12✔
12
import os.path
12✔
13
import textwrap
12✔
14
import zlib
12✔
15
from abc import ABC, ABCMeta, abstractmethod
12✔
16
from collections import deque
12✔
17
from collections.abc import Callable, Iterable, Iterator, KeysView, Mapping, Sequence
12✔
18
from dataclasses import dataclass
12✔
19
from enum import Enum
12✔
20
from operator import attrgetter
12✔
21
from pathlib import PurePath
12✔
22
from typing import (
12✔
23
    AbstractSet,
24
    Any,
25
    ClassVar,
26
    Generic,
27
    Protocol,
28
    Self,
29
    TypeVar,
30
    cast,
31
    final,
32
    get_type_hints,
33
)
34

35
from pants.base.deprecated import warn_or_error
12✔
36
from pants.engine.addresses import Address, Addresses, UnparsedAddressInputs, assert_single_address
12✔
37
from pants.engine.collection import Collection
12✔
38
from pants.engine.engine_aware import EngineAwareParameter
12✔
39
from pants.engine.environment import EnvironmentName
12✔
40
from pants.engine.fs import (
12✔
41
    GlobExpansionConjunction,
42
    GlobMatchErrorBehavior,
43
    PathGlobs,
44
    Paths,
45
    Snapshot,
46
)
47
from pants.engine.internals.dep_rules import (
12✔
48
    DependencyRuleActionDeniedError,
49
    DependencyRuleApplication,
50
)
51
from pants.engine.internals.native_engine import NO_VALUE as NO_VALUE  # noqa: F401
12✔
52
from pants.engine.internals.native_engine import Field as Field
12✔
53
from pants.engine.internals.target_adaptor import SourceBlock, SourceBlocks  # noqa: F401
12✔
54
from pants.engine.rules import rule
12✔
55
from pants.engine.unions import UnionMembership, UnionRule, distinct_union_type_per_subclass, union
12✔
56
from pants.option.bootstrap_options import UnmatchedBuildFileGlobs
12✔
57
from pants.source.filespec import Filespec, FilespecMatcher
12✔
58
from pants.util.collections import ensure_list, ensure_str_list
12✔
59
from pants.util.dirutil import fast_relpath
12✔
60
from pants.util.docutil import bin_name, doc_url
12✔
61
from pants.util.frozendict import FrozenDict
12✔
62
from pants.util.memo import memoized_classproperty, memoized_method, memoized_property
12✔
63
from pants.util.ordered_set import FrozenOrderedSet
12✔
64
from pants.util.strutil import bullet_list, help_text, pluralize, softwrap
12✔
65

66
logger = logging.getLogger(__name__)
12✔
67

68
# -----------------------------------------------------------------------------------------------
69
# Core Field abstractions
70
# -----------------------------------------------------------------------------------------------
71

72
# Type alias to express the intent that the type should be immutable and hashable. There's nothing
73
# to actually enforce this, outside of convention. Maybe we could develop a MyPy plugin?
74
ImmutableValue = Any
12✔
75

76

77
# NB: By subclassing `Field`, MyPy understands our type hints, and it means it doesn't matter which
78
# order you use for inheriting the field template vs. the mixin.
79
class AsyncFieldMixin(Field):
12✔
80
    """A mixin to store the field's original `Address` for use during hydration by the engine.
81

82
    Typically, you should also create a dataclass representing the hydrated value and another for
83
    the request, then a rule to go from the request to the hydrated value. The request class should
84
    store the async field as a property.
85

86
    (Why use the request class as the rule input, rather than the field itself? It's a wrapper so
87
    that subclasses of the async field work properly, given that the engine uses exact type IDs.
88
    This is like WrappedTarget.)
89

90
    For example:
91

92
        class Sources(StringSequenceField, AsyncFieldMixin):
93
            alias = "sources"
94

95
            # Often, async fields will want to define entry points like this to allow subclasses to
96
            # change behavior.
97
            def validate_resolved_files(self, files: Sequence[str]) -> None:
98
                pass
99

100

101
        @dataclass(frozen=True)
102
        class HydrateSourcesRequest:
103
            field: Sources
104

105

106
        @dataclass(frozen=True)
107
        class HydratedSources:
108
            snapshot: Snapshot
109

110

111
        @rule
112
        async def hydrate_sources(request: HydrateSourcesRequest) -> HydratedSources:
113
            digest = await path_globs_to_digest(PathGlobs(request.field.value))
114
            result = await digest_to_snapshot(digest)
115
            request.field.validate_resolved_files(result.files)
116
            ...
117
            return HydratedSources(result)
118

119
    Then, call sites can `await` if they need to hydrate the field, even if they subclassed
120
    the original async field to have custom behavior:
121

122
        sources1 = hydrate_sources(HydrateSourcesRequest(my_tgt.get(Sources)))
123
        sources2 = hydrate_sources(HydrateSourcesRequest(custom_tgt.get(CustomSources)))
124
    """
125

126
    address: Address
12✔
127

128
    @final
12✔
129
    def __new__(cls, raw_value: Any | None, address: Address) -> Self:
12✔
130
        obj = super().__new__(cls, raw_value, address)  # type: ignore[call-arg]
10✔
131
        # N.B.: We store the address here and not in the Field base class, because the memory usage
132
        # of storing this value in every field was shown to be excessive / lead to performance
133
        # issues.
134
        object.__setattr__(obj, "address", address)
10✔
135
        return obj
10✔
136

137
    def __repr__(self) -> str:
12✔
138
        params = [
×
139
            f"alias={self.alias!r}",
140
            f"address={self.address}",
141
            f"value={self.value!r}",
142
        ]
143
        if hasattr(self, "default"):
×
144
            params.append(f"default={self.default!r}")
×
145
        return f"{self.__class__}({', '.join(params)})"
×
146

147
    def __hash__(self) -> int:
12✔
148
        return hash((self.__class__, self.value, self.address))
12✔
149

150
    def __eq__(self, other: Any) -> bool:
12✔
151
        if not isinstance(other, AsyncFieldMixin):
12✔
152
            return False
×
153
        return (
12✔
154
            self.__class__ == other.__class__
155
            and self.value == other.value
156
            and self.address == other.address
157
        )
158

159
    def __ne__(self, other: Any) -> bool:
12✔
160
        return not (self == other)
1✔
161

162

163
@union
12✔
164
@dataclass(frozen=True)
12✔
165
class FieldDefaultFactoryRequest:
12✔
166
    """Registers a dynamic default for a Field.
167

168
    See `FieldDefaults`.
169
    """
170

171
    field_type: ClassVar[type[Field]]
12✔
172

173

174
# TODO: Workaround for https://github.com/python/mypy/issues/5485, because we cannot directly use
175
# a Callable.
176
class FieldDefaultFactory(Protocol):
12✔
177
    def __call__(self, field: Field) -> Any:
12✔
178
        pass
×
179

180

181
@dataclass(frozen=True)
12✔
182
class FieldDefaultFactoryResult:
12✔
183
    """A wrapper for a function which computes the default value of a Field."""
184

185
    default_factory: FieldDefaultFactory
12✔
186

187

188
@dataclass(frozen=True)
12✔
189
class FieldDefaults:
12✔
190
    """Generic Field default values. To install a default, see `FieldDefaultFactoryRequest`.
191

192
    TODO: This is to work around the fact that Field value defaulting cannot have arbitrary
193
    subsystem requirements, and so e.g. `JvmResolveField` and `PythonResolveField` have methods
194
    which compute the true value of the field given a subsystem argument. Consumers need to
195
    be type aware, and `@rules` cannot have dynamic requirements.
196

197
    Additionally, `__defaults__` should mean that computed default Field values should become
198
    more rare: i.e. `JvmResolveField` and `PythonResolveField` could potentially move to
199
    hardcoded default values which users override with `__defaults__` if they'd like to change
200
    the default resolve names.
201

202
    See https://github.com/pantsbuild/pants/issues/12934 about potentially allowing unions
203
    (including Field registrations) to have `@rule_helper` methods, which would allow the
204
    computation of an AsyncField to directly require a subsystem. Since
205
    https://github.com/pantsbuild/pants/pull/17947 rules may use any methods as rule helpers without
206
    special decoration so this should now be possible to implement.
207
    """
208

209
    _factories: FrozenDict[type[Field], FieldDefaultFactory]
12✔
210

211
    @memoized_method
12✔
212
    def factory(self, field_type: type[Field]) -> FieldDefaultFactory:
12✔
213
        """Looks up a Field default factory in a subclass-aware way."""
214
        factory = self._factories.get(field_type, None)
1✔
215
        if factory is not None:
1✔
216
            return factory
1✔
217

218
        for ft, factory in self._factories.items():
1✔
219
            if issubclass(field_type, ft):
1✔
220
                return factory
1✔
221

222
        return lambda f: f.value
1✔
223

224
    def value_or_default(self, field: Field) -> Any:
12✔
225
        return (self.factory(type(field)))(field)
1✔
226

227

228
# -----------------------------------------------------------------------------------------------
229
# Core Target abstractions
230
# -----------------------------------------------------------------------------------------------
231

232

233
# NB: This TypeVar is what allows `Target.get()` to properly work with MyPy so that MyPy knows
234
# the precise Field returned.
235
_F = TypeVar("_F", bound=Field)
12✔
236

237

238
@dataclass(frozen=True)
12✔
239
class Target:
12✔
240
    """A Target represents an addressable set of metadata.
241

242
    Set the `help` class property with a description, which will be used in `./pants help`. For the
243
    best rendering, use soft wrapping (e.g. implicit string concatenation) within paragraphs, but
244
    hard wrapping (`\n`) to separate distinct paragraphs and/or lists.
245
    """
246

247
    # Subclasses must define these
248
    alias: ClassVar[str]
12✔
249
    core_fields: ClassVar[tuple[type[Field], ...]]
12✔
250
    help: ClassVar[str | Callable[[], str]]
12✔
251

252
    removal_version: ClassVar[str | None] = None
12✔
253
    removal_hint: ClassVar[str | None] = None
12✔
254

255
    deprecated_alias: ClassVar[str | None] = None
12✔
256
    deprecated_alias_removal_version: ClassVar[str | None] = None
12✔
257

258
    # These get calculated in the constructor
259
    address: Address
12✔
260
    field_values: FrozenDict[type[Field], Field]
12✔
261
    residence_dir: str
12✔
262
    name_explicitly_set: bool
12✔
263
    description_of_origin: str
12✔
264
    origin_sources_blocks: FrozenDict[str, SourceBlocks]
12✔
265

266
    @final
12✔
267
    def __init__(
12✔
268
        self,
269
        unhydrated_values: Mapping[str, Any],
270
        address: Address,
271
        # NB: `union_membership` is only optional to facilitate tests. In production, we should
272
        # always provide this parameter. This should be safe to do because production code should
273
        # rarely directly instantiate Targets and should instead use the engine to request them.
274
        union_membership: UnionMembership | None = None,
275
        *,
276
        name_explicitly_set: bool = True,
277
        residence_dir: str | None = None,
278
        ignore_unrecognized_fields: bool = False,
279
        description_of_origin: str | None = None,
280
        origin_sources_blocks: FrozenDict[str, SourceBlocks] = FrozenDict(),
281
    ) -> None:
282
        """Create a target.
283

284
        :param unhydrated_values: A mapping of field aliases to their raw values. Any left off
285
            fields will either use their default or error if required=True.
286
        :param address: How to uniquely identify this target.
287
        :param union_membership: Used to determine plugin fields. This must be set in production!
288
        :param residence_dir: Where this target "lives". If unspecified, will be the `spec_path`
289
            of the `address`, i.e. where the target was either explicitly defined or where its
290
            target generator was explicitly defined. Target generators can, however, set this to
291
            the directory where the generated target provides metadata for. For example, a
292
            file-based target like `python_source` should set this to the parent directory of
293
            its file. A file-less target like `go_third_party_package` should keep the default of
294
            `address.spec_path`. This field impacts how command line specs work, so that globs
295
            like `dir:` know whether to match the target or not.
296
        :param ignore_unrecognized_fields: Don't error if fields are not recognized. This is only
297
            intended for when Pants is bootstrapping itself.
298
        :param description_of_origin: Where this target was declared, such as a path to BUILD file
299
            and line number.
300
        """
301
        if self.removal_version and not address.is_generated_target:
12✔
302
            if not self.removal_hint:
×
303
                raise ValueError(
×
304
                    f"You specified `removal_version` for {self.__class__}, but not "
305
                    "the class property `removal_hint`."
306
                )
307
            warn_or_error(
×
308
                self.removal_version,
309
                entity=f"the {repr(self.alias)} target type",
310
                hint=f"Using the `{self.alias}` target type for {address}. {self.removal_hint}",
311
            )
312

313
        if origin_sources_blocks:
12✔
314
            _validate_origin_sources_blocks(origin_sources_blocks)
×
315

316
        object.__setattr__(
12✔
317
            self, "residence_dir", residence_dir if residence_dir is not None else address.spec_path
318
        )
319
        object.__setattr__(self, "address", address)
12✔
320
        object.__setattr__(
12✔
321
            self, "description_of_origin", description_of_origin or self.residence_dir
322
        )
323
        object.__setattr__(self, "origin_sources_blocks", origin_sources_blocks)
12✔
324
        object.__setattr__(self, "name_explicitly_set", name_explicitly_set)
12✔
325
        try:
12✔
326
            object.__setattr__(
12✔
327
                self,
328
                "field_values",
329
                self._calculate_field_values(
330
                    unhydrated_values,
331
                    address,
332
                    union_membership,
333
                    ignore_unrecognized_fields=ignore_unrecognized_fields,
334
                ),
335
            )
336

337
            self.validate()
12✔
338
        except Exception as e:
2✔
339
            raise InvalidTargetException(
2✔
340
                str(e), description_of_origin=self.description_of_origin
341
            ) from e
342

343
    @final
12✔
344
    def _calculate_field_values(
12✔
345
        self,
346
        unhydrated_values: Mapping[str, Any],
347
        address: Address,
348
        # See `__init__`.
349
        union_membership: UnionMembership | None,
350
        *,
351
        ignore_unrecognized_fields: bool,
352
    ) -> FrozenDict[type[Field], Field]:
353
        all_field_types = self.class_field_types(union_membership)
12✔
354
        field_values = {}
12✔
355
        aliases_to_field_types = self._get_field_aliases_to_field_types(all_field_types)
12✔
356

357
        for alias, value in unhydrated_values.items():
12✔
358
            if alias not in aliases_to_field_types:
11✔
359
                if ignore_unrecognized_fields:
1✔
360
                    continue
1✔
361
                valid_aliases = set(aliases_to_field_types.keys())
1✔
362
                if isinstance(self, TargetGenerator):
1✔
363
                    # Even though moved_fields don't live on the target generator, they are valid
364
                    # for users to specify. It's intentional that these are only used for
365
                    # `InvalidFieldException` and are not stored as normal fields with
366
                    # `aliases_to_field_types`.
367
                    for field_type in self.moved_fields:
×
368
                        valid_aliases.add(field_type.alias)
×
369
                        if field_type.deprecated_alias is not None:
×
370
                            valid_aliases.add(field_type.deprecated_alias)
×
371
                raise InvalidFieldException(
1✔
372
                    f"Unrecognized field `{alias}={value}` in target {address}. Valid fields for "
373
                    f"the target type `{self.alias}`: {sorted(valid_aliases)}.",
374
                )
375
            field_type = aliases_to_field_types[alias]
11✔
376
            field_values[field_type] = field_type(value, address)
11✔
377

378
        # For undefined fields, mark the raw value as missing.
379
        for field_type in all_field_types:
12✔
380
            if field_type in field_values:
11✔
381
                continue
11✔
382
            field_values[field_type] = field_type(NO_VALUE, address)
11✔
383
        return FrozenDict(
12✔
384
            sorted(
385
                field_values.items(),
386
                key=lambda field_type_to_val_pair: field_type_to_val_pair[0].alias,
387
            )
388
        )
389

390
    @final
12✔
391
    @classmethod
12✔
392
    def _get_field_aliases_to_field_types(
12✔
393
        cls, field_types: Iterable[type[Field]]
394
    ) -> dict[str, type[Field]]:
395
        aliases_to_field_types = {}
12✔
396
        for field_type in field_types:
12✔
397
            aliases_to_field_types[field_type.alias] = field_type
11✔
398
            if field_type.deprecated_alias is not None:
11✔
399
                aliases_to_field_types[field_type.deprecated_alias] = field_type
1✔
400
        return aliases_to_field_types
12✔
401

402
    @final
12✔
403
    @property
12✔
404
    def field_types(self) -> KeysView[type[Field]]:
12✔
405
        return self.field_values.keys()
2✔
406

407
    @distinct_union_type_per_subclass
12✔
408
    class PluginField:
12✔
409
        pass
12✔
410

411
    def __repr__(self) -> str:
12✔
412
        fields = ", ".join(str(field) for field in self.field_values.values())
×
413
        return (
×
414
            f"{self.__class__}("
415
            f"address={self.address}, "
416
            f"alias={self.alias!r}, "
417
            f"residence_dir={self.residence_dir!r}, "
418
            f"origin={self.description_of_origin}, "
419
            f"{fields})"
420
        )
421

422
    def __str__(self) -> str:
12✔
423
        fields = ", ".join(str(field) for field in self.field_values.values())
2✔
424
        address = f'address="{self.address}"{", " if fields else ""}'
2✔
425
        return f"{self.alias}({address}{fields})"
2✔
426

427
    def __hash__(self) -> int:
12✔
428
        return hash((self.__class__, self.address, self.residence_dir, self.field_values))
12✔
429

430
    def __eq__(self, other: Target | Any) -> bool:
12✔
431
        if not isinstance(other, Target):
12✔
432
            return NotImplemented
×
433
        return (self.__class__, self.address, self.residence_dir, self.field_values) == (
12✔
434
            other.__class__,
435
            other.address,
436
            other.residence_dir,
437
            other.field_values,
438
        )
439

440
    def __lt__(self, other: Any) -> bool:
12✔
441
        if not isinstance(other, Target):
×
442
            return NotImplemented
×
443
        return self.address < other.address
×
444

445
    def __gt__(self, other: Any) -> bool:
12✔
446
        if not isinstance(other, Target):
×
447
            return NotImplemented
×
448
        return self.address > other.address
×
449

450
    @classmethod
12✔
451
    @memoized_method
12✔
452
    def _find_plugin_fields(cls, union_membership: UnionMembership) -> tuple[type[Field], ...]:
12✔
453
        result: set[type[Field]] = set()
5✔
454
        classes = [cls]
5✔
455
        while classes:
5✔
456
            cls = classes.pop()
5✔
457
            classes.extend(cls.__bases__)
5✔
458
            if issubclass(cls, Target):
5✔
459
                result.update(cast("set[type[Field]]", union_membership.get(cls.PluginField)))
5✔
460

461
        return tuple(sorted(result, key=attrgetter("alias")))
5✔
462

463
    @final
12✔
464
    @classmethod
12✔
465
    def _find_registered_field_subclass(
12✔
466
        cls, requested_field: type[_F], *, registered_fields: Iterable[type[Field]]
467
    ) -> type[_F] | None:
468
        """Check if the Target has registered a subclass of the requested Field.
469

470
        This is necessary to allow targets to override the functionality of common fields. For
471
        example, you could subclass `Tags` to define `CustomTags` with a different default. At the
472
        same time, we still want to be able to call `tgt.get(Tags)`, in addition to
473
        `tgt.get(CustomTags)`.
474
        """
475
        subclass = next(
12✔
476
            (
477
                registered_field
478
                for registered_field in registered_fields
479
                if issubclass(registered_field, requested_field)
480
            ),
481
            None,
482
        )
483
        return subclass
12✔
484

485
    @final
12✔
486
    def _maybe_get(self, field: type[_F]) -> _F | None:
12✔
487
        result = self.field_values.get(field, None)
12✔
488
        if result is not None:
12✔
489
            return cast(_F, result)
12✔
490
        field_subclass = self._find_registered_field_subclass(
12✔
491
            field, registered_fields=self.field_values.keys()
492
        )
493
        if field_subclass is not None:
12✔
494
            return cast(_F, self.field_values[field_subclass])
12✔
495
        return None
6✔
496

497
    @final
12✔
498
    def __getitem__(self, field: type[_F]) -> _F:
12✔
499
        """Get the requested `Field` instance belonging to this target.
500

501
        If the `Field` is not registered on this `Target` type, this method will raise a
502
        `KeyError`. To avoid this, you should first call `tgt.has_field()` or `tgt.has_fields()`
503
        to ensure that the field is registered, or, alternatively, use `Target.get()`.
504

505
        See the docstring for `Target.get()` for how this method handles subclasses of the
506
        requested Field and for tips on how to use the returned value.
507
        """
508
        result = self._maybe_get(field)
12✔
509
        if result is not None:
12✔
510
            return result
12✔
511
        raise KeyError(
1✔
512
            f"The target `{self}` does not have a field `{field.__name__}`. Before calling "
513
            f"`my_tgt[{field.__name__}]`, call `my_tgt.has_field({field.__name__})` to "
514
            f"filter out any irrelevant Targets or call `my_tgt.get({field.__name__})` to use the "
515
            f"default Field value."
516
        )
517

518
    @final
12✔
519
    def get(self, field: type[_F], *, default_raw_value: Any | None = None) -> _F:
12✔
520
        """Get the requested `Field` instance belonging to this target.
521

522
        This will return an instance of the requested field type, e.g. an instance of
523
        `InterpreterConstraints`, `SourcesField`, `EntryPoint`, etc. Usually, you will want to
524
        grab the `Field`'s inner value, e.g. `tgt.get(Compatibility).value`. (For async fields like
525
        `SourcesField`, you may need to hydrate the value.).
526

527
        This works with subclasses of `Field`. For example, if you subclass `Tags`
528
        to define a custom subclass `CustomTags`, both `tgt.get(Tags)` and
529
        `tgt.get(CustomTags)` will return the same `CustomTags` instance.
530

531
        If the `Field` is not registered on this `Target` type, this will return an instance of
532
        the requested Field by using `default_raw_value` to create the instance. Alternatively,
533
        first call `tgt.has_field()` or `tgt.has_fields()` to ensure that the field is registered,
534
        or, alternatively, use indexing (e.g. `tgt[Compatibility]`) to raise a KeyError when the
535
        field is not registered.
536
        """
537
        result = self._maybe_get(field)
12✔
538
        if result is not None:
12✔
539
            return result
12✔
540
        return field(default_raw_value, self.address)
6✔
541

542
    @final
12✔
543
    @classmethod
12✔
544
    def _has_fields(
12✔
545
        cls, fields: Iterable[type[Field]], *, registered_fields: AbstractSet[type[Field]]
546
    ) -> bool:
547
        unrecognized_fields = [field for field in fields if field not in registered_fields]
11✔
548
        if not unrecognized_fields:
11✔
549
            return True
9✔
550
        for unrecognized_field in unrecognized_fields:
11✔
551
            maybe_subclass = cls._find_registered_field_subclass(
11✔
552
                unrecognized_field, registered_fields=registered_fields
553
            )
554
            if maybe_subclass is None:
11✔
555
                return False
9✔
556
        return True
6✔
557

558
    @final
12✔
559
    def has_field(self, field: type[Field]) -> bool:
12✔
560
        """Check that this target has registered the requested field.
561

562
        This works with subclasses of `Field`. For example, if you subclass `Tags` to define a
563
        custom subclass `CustomTags`, both `tgt.has_field(Tags)` and
564
        `python_tgt.has_field(CustomTags)` will return True.
565
        """
566
        return self.has_fields([field])
10✔
567

568
    @final
12✔
569
    def has_fields(self, fields: Iterable[type[Field]]) -> bool:
12✔
570
        """Check that this target has registered all of the requested fields.
571

572
        This works with subclasses of `Field`. For example, if you subclass `Tags` to define a
573
        custom subclass `CustomTags`, both `tgt.has_fields([Tags])` and
574
        `python_tgt.has_fields([CustomTags])` will return True.
575
        """
576
        return self._has_fields(fields, registered_fields=self.field_values.keys())
11✔
577

578
    @final
12✔
579
    @classmethod
12✔
580
    @memoized_method
12✔
581
    def class_field_types(
12✔
582
        cls, union_membership: UnionMembership | None
583
    ) -> FrozenOrderedSet[type[Field]]:
584
        """Return all registered Fields belonging to this target type.
585

586
        You can also use the instance property `tgt.field_types` to avoid having to pass the
587
        parameter UnionMembership.
588
        """
589
        if union_membership is None:
12✔
590
            return FrozenOrderedSet(cls.core_fields)
12✔
591
        else:
592
            return FrozenOrderedSet((*cls.core_fields, *cls._find_plugin_fields(union_membership)))
6✔
593

594
    @final
12✔
595
    @classmethod
12✔
596
    def class_has_field(cls, field: type[Field], union_membership: UnionMembership) -> bool:
12✔
597
        """Behaves like `Target.has_field()`, but works as a classmethod rather than an instance
598
        method."""
599
        return cls.class_has_fields([field], union_membership)
2✔
600

601
    @final
12✔
602
    @classmethod
12✔
603
    def class_has_fields(
12✔
604
        cls, fields: Iterable[type[Field]], union_membership: UnionMembership
605
    ) -> bool:
606
        """Behaves like `Target.has_fields()`, but works as a classmethod rather than an instance
607
        method."""
608
        return cls._has_fields(fields, registered_fields=cls.class_field_types(union_membership))
2✔
609

610
    @final
12✔
611
    @classmethod
12✔
612
    def class_get_field(cls, field: type[_F], union_membership: UnionMembership) -> type[_F]:
12✔
613
        """Get the requested Field type registered with this target type.
614

615
        This will error if the field is not registered, so you should call Target.class_has_field()
616
        first.
617
        """
618
        class_fields = cls.class_field_types(union_membership)
2✔
619
        result = next(
2✔
620
            (
621
                registered_field
622
                for registered_field in class_fields
623
                if issubclass(registered_field, field)
624
            ),
625
            None,
626
        )
627
        if result is None:
2✔
628
            raise KeyError(
1✔
629
                f"The target type `{cls.alias}` does not have a field `{field.__name__}`. Before "
630
                f"calling `TargetType.class_get_field({field.__name__})`, call "
631
                f"`TargetType.class_has_field({field.__name__})`."
632
            )
633
        return result
2✔
634

635
    @classmethod
12✔
636
    def register_plugin_field(cls, field: type[Field]) -> UnionRule:
12✔
637
        """Register a new field on the target type.
638

639
        In the `rules()` register.py entry-point, include
640
        `MyTarget.register_plugin_field(NewField)`. This will register `NewField` as a first-class
641
        citizen. Plugins can use this new field like any other.
642
        """
643
        return UnionRule(cls.PluginField, field)
12✔
644

645
    def validate(self) -> None:
12✔
646
        """Validate the target, such as checking for mutually exclusive fields.
647

648
        N.B.: The validation should only be of properties intrinsic to the associated files in any
649
        context. If the validation only makes sense for certain goals acting on targets; those
650
        validations should be done in the associated rules.
651
        """
652

653

654
def _validate_origin_sources_blocks(origin_sources_blocks: FrozenDict[str, SourceBlocks]) -> None:
12✔
655
    if not isinstance(origin_sources_blocks, FrozenDict):
1✔
656
        raise ValueError(
1✔
657
            f"Expected `origin_sources_blocks` to be of type `FrozenDict`, got {type(origin_sources_blocks)=} {origin_sources_blocks=}"
658
        )
659
    for blocks in origin_sources_blocks.values():
1✔
660
        if not isinstance(blocks, SourceBlocks):
1✔
661
            raise ValueError(
1✔
662
                f"Expected `origin_sources_blocks` to be a `FrozenDict` with values of type `SourceBlocks`, got values of {type(blocks)=} {blocks=}"
663
            )
664
        for block in blocks:
1✔
665
            if not isinstance(block, SourceBlock):
1✔
666
                raise ValueError(
×
667
                    f"Expected `origin_sources_blocks` to be a `FrozenDict` with values of type `SourceBlocks`, got values of {type(blocks)=} {blocks=}"
668
                )
669

670

671
@dataclass(frozen=True)
12✔
672
class WrappedTargetRequest:
12✔
673
    """Used with `WrappedTarget` to get the Target corresponding to an address.
674

675
    `description_of_origin` is used for error messages when the address does not actually exist. If
676
    you are confident this cannot happen, set the string to something like `<infallible>`.
677
    """
678

679
    address: Address
12✔
680
    description_of_origin: str = dataclasses.field(hash=False, compare=False)
12✔
681

682

683
@dataclass(frozen=True)
12✔
684
class WrappedTarget:
12✔
685
    """A light wrapper to encapsulate all the distinct `Target` subclasses into a single type.
686

687
    This is necessary when using a single target in a rule because the engine expects exact types
688
    and does not work with subtypes.
689
    """
690

691
    target: Target
12✔
692

693

694
class Targets(Collection[Target]):
12✔
695
    """A heterogeneous collection of instances of Target subclasses.
696

697
    While every element will be a subclass of `Target`, there may be many different `Target` types
698
    in this collection, e.g. some `FileTarget` and some `PythonTestTarget`.
699

700
    Often, you will want to filter out the relevant targets by looking at what fields they have
701
    registered, e.g.:
702

703
        valid_tgts = [tgt for tgt in tgts if tgt.has_fields([Compatibility, PythonSources])]
704

705
    You should not check the Target's actual type because this breaks custom target types;
706
    for example, prefer `tgt.has_field(PythonTestsSourcesField)` to
707
    `isinstance(tgt, PythonTestsTarget)`.
708
    """
709

710
    def expect_single(self) -> Target:
12✔
711
        assert_single_address([tgt.address for tgt in self])
8✔
712
        return self[0]
8✔
713

714

715
# This distinct type is necessary because of https://github.com/pantsbuild/pants/issues/14977.
716
#
717
# NB: We still proactively apply filtering inside `AddressSpecs` and `FilesystemSpecs`, which is
718
# earlier in the rule pipeline of `RawSpecs -> Addresses -> UnexpandedTargets -> Targets ->
719
# FilteredTargets`. That is necessary so that project-introspection goals like `list` which don't
720
# use `FilteredTargets` still have filtering applied.
721
class FilteredTargets(Collection[Target]):
12✔
722
    """A heterogeneous collection of Target instances that have been filtered with the global
723
    options `--tag` and `--exclude-target-regexp`.
724

725
    Outside of the extra filtering, this type is identical to `Targets`, including its handling of
726
    target generators.
727
    """
728

729
    def expect_single(self) -> Target:
12✔
730
        assert_single_address([tgt.address for tgt in self])
×
731
        return self[0]
×
732

733

734
class UnexpandedTargets(Collection[Target]):
12✔
735
    """Like `Targets`, but will not replace target generators with their generated targets (e.g.
736
    replace `python_sources` "BUILD targets" with generated `python_source` "file targets")."""
737

738
    def expect_single(self) -> Target:
12✔
739
        assert_single_address([tgt.address for tgt in self])
×
740
        return self[0]
×
741

742

743
class DepsTraversalBehavior(Enum):
12✔
744
    """The return value for ShouldTraverseDepsPredicate.
745

746
    NB: This only indicates whether to traverse the deps of a target;
747
    It does not control the inclusion of the target itself (though that
748
    might be added in the future). By the time the predicate is called,
749
    the target itself was already included.
750
    """
751

752
    INCLUDE = "include"
12✔
753
    EXCLUDE = "exclude"
12✔
754

755

756
@dataclass(frozen=True)
12✔
757
class ShouldTraverseDepsPredicate(metaclass=ABCMeta):
12✔
758
    """This callable determines whether to traverse through deps of a given Target + Field.
759

760
    This is a callable dataclass instead of a function to avoid issues with hashing closures.
761
    Only the id is used when hashing a function; any closure vars are NOT accounted for.
762

763
    NB: When subclassing a dataclass (like this one), you do not need to add the @dataclass
764
    decorator unless the subclass has additional fields. The @dataclass decorator only inspects
765
    the current class (not any parents from __mro__) to determine if any methods were explicitly
766
    defined. So, any typically-generated methods explicitly added on this parent class would NOT
767
    be inherited by @dataclass decorated subclasses. To avoid these issues, this parent class
768
    uses __post_init__ and relies on the generated __init__ and __hash__ methods.
769
    """
770

771
    # NB: This _callable field ensures that __call__ is included in the __hash__ method generated by @dataclass.
772
    # That is extremely important because two predicates with different implementations but the same data
773
    # (or no data) need to have different hashes and compare unequal.
774
    _callable: Callable[
12✔
775
        [Any, Target, Dependencies | SpecialCasedDependencies], DepsTraversalBehavior
776
    ] = dataclasses.field(init=False, repr=False)
777

778
    def __post_init__(self):
12✔
779
        object.__setattr__(self, "_callable", type(self).__call__)
12✔
780

781
    @abstractmethod
12✔
782
    def __call__(
12✔
783
        self, target: Target, field: Dependencies | SpecialCasedDependencies
784
    ) -> DepsTraversalBehavior:
785
        """This predicate decides when to INCLUDE or EXCLUDE the target's field's deps."""
786

787

788
class TraverseIfDependenciesField(ShouldTraverseDepsPredicate):
12✔
789
    """This is the default ShouldTraverseDepsPredicate implementation.
790

791
    This skips resolving dependencies for fields (like SpecialCasedDependencies) that are not
792
    subclasses of Dependencies.
793
    """
794

795
    def __call__(
12✔
796
        self, target: Target, field: Dependencies | SpecialCasedDependencies
797
    ) -> DepsTraversalBehavior:
798
        if isinstance(field, Dependencies):
×
799
            return DepsTraversalBehavior.INCLUDE
×
800
        return DepsTraversalBehavior.EXCLUDE
×
801

802

803
class AlwaysTraverseDeps(ShouldTraverseDepsPredicate):
12✔
804
    """A predicate to use when a request needs all deps.
805

806
    This includes deps from fields like SpecialCasedDependencies which are ignored in most cases.
807
    """
808

809
    def __call__(
12✔
810
        self, target: Target, field: Dependencies | SpecialCasedDependencies
811
    ) -> DepsTraversalBehavior:
812
        return DepsTraversalBehavior.INCLUDE
×
813

814

815
class CoarsenedTarget(EngineAwareParameter):
12✔
816
    def __init__(self, members: Iterable[Target], dependencies: Iterable[CoarsenedTarget]) -> None:
12✔
817
        """A set of Targets which cyclically reach one another, and are thus indivisible.
818

819
        Instances of this class form a structure-shared DAG, and so a hashcode is pre-computed for the
820
        recursive portion.
821

822
        :param members: The members of the cycle.
823
        :param dependencies: The deduped direct (not transitive) dependencies of all Targets in
824
            the cycle. Dependencies between members of the cycle are excluded.
825
        """
826
        self.members = FrozenOrderedSet(members)
2✔
827
        self.dependencies = FrozenOrderedSet(dependencies)
2✔
828
        self._hashcode = hash((self.members, self.dependencies))
2✔
829

830
    def debug_hint(self) -> str:
12✔
831
        return str(self)
×
832

833
    def metadata(self) -> dict[str, Any]:
12✔
834
        return {"addresses": [t.address.spec for t in self.members]}
×
835

836
    @property
12✔
837
    def representative(self) -> Target:
12✔
838
        """A stable "representative" target in the cycle."""
839
        return next(iter(self.members))
4✔
840

841
    def bullet_list(self) -> str:
12✔
842
        """The addresses and type aliases of all members of the cycle."""
843
        return bullet_list(sorted(f"{t.address.spec}\t({type(t).alias})" for t in self.members))
1✔
844

845
    def closure(self, visited: set[CoarsenedTarget] | None = None) -> Iterator[Target]:
12✔
846
        """All Targets reachable from this root."""
847
        return (t for ct in self.coarsened_closure(visited) for t in ct.members)
5✔
848

849
    def coarsened_closure(
12✔
850
        self, visited: set[CoarsenedTarget] | None = None
851
    ) -> Iterator[CoarsenedTarget]:
852
        """All CoarsenedTargets reachable from this root."""
853

854
        visited = set() if visited is None else visited
5✔
855
        queue = deque([self])
5✔
856
        while queue:
5✔
857
            ct = queue.popleft()
5✔
858
            if ct in visited:
5✔
859
                continue
1✔
860
            visited.add(ct)
5✔
861
            yield ct
5✔
862
            queue.extend(ct.dependencies)
5✔
863

864
    def __hash__(self) -> int:
12✔
865
        return self._hashcode
11✔
866

867
    def _eq_helper(self, other: CoarsenedTarget, equal_items: set[tuple[int, int]]) -> bool:
12✔
868
        key = (id(self), id(other))
4✔
869
        if key[0] == key[1] or key in equal_items:
4✔
870
            return True
1✔
871

872
        is_eq = (
4✔
873
            self._hashcode == other._hashcode
874
            and self.members == other.members
875
            and len(self.dependencies) == len(other.dependencies)
876
            and all(
877
                l._eq_helper(r, equal_items) for l, r in zip(self.dependencies, other.dependencies)
878
            )
879
        )
880

881
        # NB: We only track equal items because any non-equal item will cause the entire
882
        # operation to shortcircuit.
883
        if is_eq:
4✔
884
            equal_items.add(key)
4✔
885
        return is_eq
4✔
886

887
    def __eq__(self, other: Any) -> bool:
12✔
888
        if not isinstance(other, CoarsenedTarget):
4✔
889
            return NotImplemented
×
890
        return self._eq_helper(other, set())
4✔
891

892
    def __str__(self) -> str:
12✔
893
        if len(self.members) > 1:
×
894
            others = len(self.members) - 1
×
895
            return f"{self.representative.address.spec} (and {others} more)"
×
896
        return self.representative.address.spec
×
897

898
    def __repr__(self) -> str:
12✔
899
        return f"{self.__class__.__name__}({str(self)})"
×
900

901

902
class CoarsenedTargets(Collection[CoarsenedTarget]):
12✔
903
    """The CoarsenedTarget roots of a transitive graph walk for some addresses.
904

905
    To collect all reachable CoarsenedTarget members, use `def closure`.
906
    """
907

908
    def by_address(self) -> dict[Address, CoarsenedTarget]:
12✔
909
        """Compute a mapping from Address to containing CoarsenedTarget."""
910
        return {t.address: ct for ct in self for t in ct.members}
×
911

912
    def closure(self) -> Iterator[Target]:
12✔
913
        """All Targets reachable from these CoarsenedTarget roots."""
914
        visited: set[CoarsenedTarget] = set()
5✔
915
        return (t for root in self for t in root.closure(visited))
5✔
916

917
    def coarsened_closure(self) -> Iterator[CoarsenedTarget]:
12✔
918
        """All CoarsenedTargets reachable from these CoarsenedTarget roots."""
919
        visited: set[CoarsenedTarget] = set()
×
920
        return (ct for root in self for ct in root.coarsened_closure(visited))
×
921

922
    def __eq__(self, other: Any) -> bool:
12✔
923
        if not isinstance(other, CoarsenedTargets):
2✔
924
            return NotImplemented
×
925
        equal_items: set[tuple[int, int]] = set()
2✔
926
        return len(self) == len(other) and all(
2✔
927
            l._eq_helper(r, equal_items) for l, r in zip(self, other)
928
        )
929

930
    def __hash__(self):
12✔
931
        return super().__hash__()
4✔
932

933

934
@dataclass(frozen=True)
12✔
935
class CoarsenedTargetsRequest:
12✔
936
    """A request to get CoarsenedTargets for input roots."""
937

938
    roots: tuple[Address, ...]
12✔
939
    expanded_targets: bool
12✔
940
    should_traverse_deps_predicate: ShouldTraverseDepsPredicate
12✔
941

942
    def __init__(
12✔
943
        self,
944
        roots: Iterable[Address],
945
        *,
946
        expanded_targets: bool = False,
947
        should_traverse_deps_predicate: ShouldTraverseDepsPredicate = TraverseIfDependenciesField(),
948
    ) -> None:
949
        object.__setattr__(self, "roots", tuple(roots))
1✔
950
        object.__setattr__(self, "expanded_targets", expanded_targets)
1✔
951
        object.__setattr__(self, "should_traverse_deps_predicate", should_traverse_deps_predicate)
1✔
952

953

954
@dataclass(frozen=True)
12✔
955
class TransitiveTargets:
12✔
956
    """A set of Target roots, and their transitive, flattened, de-duped dependencies.
957

958
    If a target root is a dependency of another target root, then it will show up both in `roots`
959
    and in `dependencies`.
960
    """
961

962
    roots: tuple[Target, ...]
12✔
963
    dependencies: FrozenOrderedSet[Target]
12✔
964

965
    @memoized_property
12✔
966
    def closure(self) -> FrozenOrderedSet[Target]:
12✔
967
        """The roots and the dependencies combined."""
968
        return FrozenOrderedSet([*self.roots, *self.dependencies])
3✔
969

970

971
@dataclass(frozen=True)
12✔
972
class TransitiveTargetsRequest:
12✔
973
    """A request to get the transitive dependencies of the input roots."""
974

975
    roots: tuple[Address, ...]
12✔
976
    should_traverse_deps_predicate: ShouldTraverseDepsPredicate
12✔
977

978
    def __init__(
12✔
979
        self,
980
        roots: Iterable[Address],
981
        *,
982
        should_traverse_deps_predicate: ShouldTraverseDepsPredicate = TraverseIfDependenciesField(),
983
    ) -> None:
984
        object.__setattr__(self, "roots", tuple(roots))
3✔
985
        object.__setattr__(self, "should_traverse_deps_predicate", should_traverse_deps_predicate)
3✔
986

987

988
@dataclass(frozen=True)
12✔
989
class RegisteredTargetTypes:
12✔
990
    aliases_to_types: FrozenDict[str, type[Target]]
12✔
991

992
    def __init__(self, aliases_to_types: Mapping[str, type[Target]]) -> None:
12✔
993
        object.__setattr__(self, "aliases_to_types", FrozenDict(aliases_to_types))
12✔
994

995
    @classmethod
12✔
996
    def create(cls, target_types: Iterable[type[Target]]) -> RegisteredTargetTypes:
12✔
997
        result = {}
12✔
998
        for target_type in sorted(target_types, key=lambda tt: tt.alias):
12✔
999
            result[target_type.alias] = target_type
12✔
1000
            if target_type.deprecated_alias is not None:
12✔
1001
                result[target_type.deprecated_alias] = target_type
2✔
1002
        return cls(result)
12✔
1003

1004
    @property
12✔
1005
    def aliases(self) -> FrozenOrderedSet[str]:
12✔
1006
        return FrozenOrderedSet(self.aliases_to_types.keys())
3✔
1007

1008
    @property
12✔
1009
    def types(self) -> FrozenOrderedSet[type[Target]]:
12✔
1010
        return FrozenOrderedSet(self.aliases_to_types.values())
1✔
1011

1012

1013
class AllTargets(Collection[Target]):
12✔
1014
    """All targets in the project, but with target generators replaced by their generated targets,
1015
    unlike `AllUnexpandedTargets`."""
1016

1017

1018
class AllUnexpandedTargets(Collection[Target]):
12✔
1019
    """All targets in the project, including generated targets.
1020

1021
    This should generally be avoided because it is relatively expensive to compute and is frequently
1022
    invalidated, but it can be necessary for things like dependency inference to build a global
1023
    mapping of imports to targets.
1024
    """
1025

1026

1027
# -----------------------------------------------------------------------------------------------
1028
# Target generation
1029
# -----------------------------------------------------------------------------------------------
1030

1031

1032
class TargetGenerator(Target):
12✔
1033
    """A Target type which generates other Targets via installed `@rule` logic.
1034

1035
    To act as a generator, a Target type should subclass this base class and install generation
1036
    `@rule`s which consume a corresponding GenerateTargetsRequest subclass to produce
1037
    GeneratedTargets.
1038
    """
1039

1040
    # The generated Target class.
1041
    #
1042
    # If this is not provided, consider checking for the default values that applies to the target
1043
    # types being generated manually. The applicable defaults are available on the `AddressFamily`
1044
    # which you can get using:
1045
    #
1046
    #    family = await ensure_address_family(**implicitly(AddressFamilyDir(address.spec_path)))
1047
    #    target_defaults = family.defaults.get(MyTarget.alias, {})
1048
    generated_target_cls: ClassVar[type[Target]]
12✔
1049

1050
    # Fields which have their values copied from the generator Target to the generated Target.
1051
    #
1052
    # Must be a subset of `core_fields`.
1053
    #
1054
    # Fields should be copied from the generator to the generated when their semantic meaning is
1055
    # the same for both Target types, and when it is valuable for them to be introspected on
1056
    # either the generator or generated target (such as by `peek`, or in `filter`).
1057
    copied_fields: ClassVar[tuple[type[Field], ...]]
12✔
1058

1059
    # Fields which are specified to instances of the generator Target, but which are propagated
1060
    # to generated Targets rather than being stored on the generator Target.
1061
    #
1062
    # Must be disjoint from `core_fields`.
1063
    #
1064
    # Only Fields which are moved to the generated Target are allowed to be `parametrize`d. But
1065
    # it can also be the case that a Field only makes sense semantically when it is applied to
1066
    # the generated Target (for example, for an individual file), and the generator Target is just
1067
    # acting as a convenient place for them to be specified.
1068
    moved_fields: ClassVar[tuple[type[Field], ...]]
12✔
1069

1070
    @distinct_union_type_per_subclass
12✔
1071
    class MovedPluginField:
12✔
1072
        """A plugin field that should be moved into the generated targets."""
1073

1074
    def validate(self) -> None:
12✔
1075
        super().validate()
1✔
1076

1077
        copied_dependencies_field_types = [
1✔
1078
            field_type.__name__
1079
            for field_type in type(self).copied_fields
1080
            if issubclass(field_type, Dependencies)
1081
        ]
1082
        if copied_dependencies_field_types:
1✔
1083
            raise InvalidTargetException(
×
1084
                f"Using a `Dependencies` field subclass ({copied_dependencies_field_types}) as a "
1085
                "`TargetGenerator.copied_field`. `Dependencies` fields should be "
1086
                "`TargetGenerator.moved_field`s, to avoid redundant graph edges."
1087
            )
1088

1089
    @classmethod
12✔
1090
    def register_plugin_field(cls, field: type[Field], *, as_moved_field=False) -> UnionRule:
12✔
1091
        if as_moved_field:
12✔
1092
            return UnionRule(cls.MovedPluginField, field)
8✔
1093
        else:
1094
            return super().register_plugin_field(field)
12✔
1095

1096
    @classmethod
12✔
1097
    @memoized_method
12✔
1098
    def _find_plugin_fields(cls, union_membership: UnionMembership) -> tuple[type[Field], ...]:
12✔
1099
        return (
2✔
1100
            *cls._find_copied_plugin_fields(union_membership),
1101
            *cls._find_moved_plugin_fields(union_membership),
1102
        )
1103

1104
    @final
12✔
1105
    @classmethod
12✔
1106
    @memoized_method
12✔
1107
    def _find_moved_plugin_fields(
12✔
1108
        cls, union_membership: UnionMembership
1109
    ) -> tuple[type[Field], ...]:
1110
        result: set[type[Field]] = set()
2✔
1111
        classes = [cls]
2✔
1112
        while classes:
2✔
1113
            cls = classes.pop()
2✔
1114
            classes.extend(cls.__bases__)
2✔
1115
            if issubclass(cls, TargetGenerator):
2✔
1116
                result.update(cast("set[type[Field]]", union_membership.get(cls.MovedPluginField)))
2✔
1117

1118
        return tuple(result)
2✔
1119

1120
    @final
12✔
1121
    @classmethod
12✔
1122
    @memoized_method
12✔
1123
    def _find_copied_plugin_fields(
12✔
1124
        cls, union_membership: UnionMembership
1125
    ) -> tuple[type[Field], ...]:
1126
        return super()._find_plugin_fields(union_membership)
2✔
1127

1128

1129
class TargetFilesGenerator(TargetGenerator):
12✔
1130
    """A TargetGenerator which generates a Target per file matched by the generator.
1131

1132
    Unlike TargetGenerator, no additional `@rules` are required to be installed, because generation
1133
    is implemented declaratively. But an optional `settings_request_cls` can be declared to
1134
    dynamically control some settings of generation.
1135
    """
1136

1137
    settings_request_cls: ClassVar[type[TargetFilesGeneratorSettingsRequest] | None] = None
12✔
1138

1139
    def validate(self) -> None:
12✔
1140
        super().validate()
1✔
1141

1142
        if self.has_field(MultipleSourcesField) and not self[MultipleSourcesField].value:
1✔
1143
            raise InvalidTargetException(
×
1144
                f"The `{self.alias}` target generator at {self.address} has an empty "
1145
                f"`{self[MultipleSourcesField].alias}` field; so it will not generate any targets. "
1146
                "If its purpose is to act as an alias for its dependencies, then it should be "
1147
                "declared as a `target(..)` generic target instead. If it is unused, then it "
1148
                "should be removed."
1149
            )
1150

1151

1152
@union(in_scope_types=[EnvironmentName])
12✔
1153
class TargetFilesGeneratorSettingsRequest:
12✔
1154
    """An optional union to provide dynamic settings for a `TargetFilesGenerator`.
1155

1156
    See `TargetFilesGenerator`.
1157
    """
1158

1159

1160
@dataclass
12✔
1161
class TargetFilesGeneratorSettings:
12✔
1162
    # Set `add_dependencies_on_all_siblings` to True so that each file-level target depends on all
1163
    # other generated targets from the target generator. This is useful if both are true:
1164
    #
1165
    # a) file-level targets usually need their siblings to be present to work. Most target types
1166
    #   (Python, Java, Shell, etc) meet this, except for `files` and `resources` which have no
1167
    #   concept of "imports"
1168
    # b) dependency inference cannot infer dependencies on sibling files.
1169
    #
1170
    # Otherwise, set `add_dependencies_on_all_siblings` to `False` so that dependencies are
1171
    # finer-grained.
1172
    add_dependencies_on_all_siblings: bool = False
12✔
1173

1174

1175
_TargetGenerator = TypeVar("_TargetGenerator", bound=TargetGenerator)
12✔
1176

1177

1178
@union(in_scope_types=[EnvironmentName])
12✔
1179
@dataclass(frozen=True)
12✔
1180
class GenerateTargetsRequest(Generic[_TargetGenerator]):
12✔
1181
    generate_from: ClassVar[type[_TargetGenerator]]  # type: ignore[misc]
12✔
1182

1183
    # The TargetGenerator instance to generate targets for.
1184
    generator: _TargetGenerator
12✔
1185
    # The base Address to generate for. Note that due to parametrization, this may not
1186
    # always be the Address of the underlying target.
1187
    template_address: Address
12✔
1188
    # The `TargetGenerator.moved_field/copied_field` Field values that the generator
1189
    # should generate targets with.
1190
    template: Mapping[str, Any] = dataclasses.field(hash=False)
12✔
1191
    # Per-generated-Target overrides, with an additional `template_address` to be applied. The
1192
    # per-instance Address might not match the base `template_address` if parametrization was
1193
    # applied within overrides.
1194
    overrides: Mapping[str, Mapping[Address, Mapping[str, Any]]] = dataclasses.field(hash=False)
12✔
1195

1196
    def require_unparametrized_overrides(self) -> dict[str, Mapping[str, Any]]:
12✔
1197
        """Flattens overrides for `GenerateTargetsRequest` impls which don't support `parametrize`.
1198

1199
        If `parametrize` has been used in overrides, this will raise an error indicating that that is
1200
        not yet supported for the generator target type.
1201

1202
        TODO: https://github.com/pantsbuild/pants/issues/14430 covers porting implementations and
1203
        removing this method.
1204
        """
1205
        if any(len(templates) != 1 for templates in self.overrides.values()):
×
1206
            raise ValueError(
×
1207
                f"Target generators of type `{self.generate_from.alias}` (defined at "
1208
                f"`{self.generator.address}`) do not (yet) support use of the `parametrize(..)` "
1209
                f"builtin in their `{OverridesField.alias}=` field."
1210
            )
1211
        return {name: next(iter(templates.values())) for name, templates in self.overrides.items()}
×
1212

1213

1214
class GeneratedTargets(FrozenDict[Address, Target]):
12✔
1215
    """A mapping of the address of generated targets to the targets themselves."""
1216

1217
    def __init__(self, generator: Target, generated_targets: Iterable[Target]) -> None:
12✔
1218
        expected_spec_path = generator.address.spec_path
1✔
1219
        expected_tgt_name = generator.address.target_name
1✔
1220
        mapping = {}
1✔
1221
        for tgt in sorted(generated_targets, key=lambda t: t.address):
1✔
1222
            if tgt.address.spec_path != expected_spec_path:
1✔
1223
                raise InvalidGeneratedTargetException(
1✔
1224
                    "All generated targets must have the same `Address.spec_path` as their "
1225
                    f"target generator. Expected {generator.address.spec_path}, but got "
1226
                    f"{tgt.address.spec_path} for target generated from {generator.address}: {tgt}"
1227
                    "\n\nConsider using `request.generator.address.create_generated()`."
1228
                )
1229
            if tgt.address.target_name != expected_tgt_name:
1✔
1230
                raise InvalidGeneratedTargetException(
1✔
1231
                    "All generated targets must have the same `Address.target_name` as their "
1232
                    f"target generator. Expected {generator.address.target_name}, but got "
1233
                    f"{tgt.address.target_name} for target generated from {generator.address}: "
1234
                    f"{tgt}\n\n"
1235
                    "Consider using `request.generator.address.create_generated()`."
1236
                )
1237
            if not tgt.address.is_generated_target:
1✔
1238
                raise InvalidGeneratedTargetException(
×
1239
                    "All generated targets must set `Address.generator_name` or "
1240
                    "`Address.relative_file_path`. Invalid for target generated from "
1241
                    f"{generator.address}: {tgt}\n\n"
1242
                    "Consider using `request.generator.address.create_generated()`."
1243
                )
1244
            mapping[tgt.address] = tgt
1✔
1245
        super().__init__(mapping)
1✔
1246

1247

1248
@rule(polymorphic=True)
12✔
1249
async def generate_targets(req: GenerateTargetsRequest) -> GeneratedTargets:
12✔
1250
    raise NotImplementedError()
×
1251

1252

1253
class TargetTypesToGenerateTargetsRequests(
12✔
1254
    FrozenDict[type[TargetGenerator], type[GenerateTargetsRequest]]
1255
):
1256
    def is_generator(self, tgt: Target) -> bool:
12✔
1257
        """Does this target type generate other targets?"""
1258
        return isinstance(tgt, TargetGenerator) and bool(self.request_for(type(tgt)))
×
1259

1260
    def request_for(self, tgt_cls: type[TargetGenerator]) -> type[GenerateTargetsRequest] | None:
12✔
1261
        """Return the request type for the given Target, or None."""
1262
        if issubclass(tgt_cls, TargetFilesGenerator):
×
1263
            return self.get(TargetFilesGenerator)
×
1264
        return self.get(tgt_cls)
×
1265

1266

1267
def _generate_file_level_targets(
12✔
1268
    generated_target_cls: type[Target],
1269
    generator: Target,
1270
    paths: Sequence[str],
1271
    template_address: Address,
1272
    template: Mapping[str, Any],
1273
    overrides: Mapping[str, Mapping[Address, Mapping[str, Any]]],
1274
    # NB: Should only ever be set to `None` in tests.
1275
    union_membership: UnionMembership | None,
1276
    *,
1277
    add_dependencies_on_all_siblings: bool,
1278
) -> GeneratedTargets:
1279
    """Generate one new target for each path, using the same fields as the generator target except
1280
    for the `sources` field only referring to the path and using a new address.
1281

1282
    Set `add_dependencies_on_all_siblings` to True so that each file-level target depends on all
1283
    other generated targets from the target generator. This is useful if both are true:
1284

1285
        a) file-level targets usually need their siblings to be present to work. Most target types
1286
          (Python, Java, Shell, etc) meet this, except for `files` and `resources` which have no
1287
          concept of "imports"
1288
        b) dependency inference cannot infer dependencies on sibling files.
1289

1290
    Otherwise, set `add_dependencies_on_all_siblings` to `False` so that dependencies are
1291
    finer-grained.
1292

1293
    `overrides` allows changing the fields for particular targets. It expects the full file path
1294
     as the key.
1295
    """
1296

1297
    # Paths will have already been globbed, so they should be escaped. See
1298
    # https://github.com/pantsbuild/pants/issues/15381.
1299
    paths = [glob_stdlib.escape(path) for path in paths]
×
1300

1301
    normalized_overrides = dict(overrides or {})
×
1302

1303
    all_generated_items: list[tuple[Address, str, dict[str, Any]]] = []
×
1304
    for fp in paths:
×
1305
        relativized_fp = fast_relpath(fp, template_address.spec_path)
×
1306

1307
        generated_overrides = normalized_overrides.pop(fp, None)
×
1308
        if generated_overrides is None:
×
1309
            # No overrides apply.
1310
            all_generated_items.append(
×
1311
                (template_address.create_file(relativized_fp), fp, dict(template))
1312
            )
1313
        else:
1314
            # At least one override applies. Generate a target per set of fields.
1315
            all_generated_items.extend(
×
1316
                (
1317
                    overridden_address.create_file(relativized_fp),
1318
                    fp,
1319
                    {**template, **override_fields},
1320
                )
1321
                for overridden_address, override_fields in generated_overrides.items()
1322
            )
1323

1324
    # TODO: Parametrization in overrides will result in some unusual internal dependencies when
1325
    # `add_dependencies_on_all_siblings`. Similar to inference, `add_dependencies_on_all_siblings`
1326
    # should probably be field value aware.
1327
    all_generated_address_specs = (
×
1328
        FrozenOrderedSet(addr.spec for addr, _, _ in all_generated_items)
1329
        if add_dependencies_on_all_siblings
1330
        else FrozenOrderedSet()
1331
    )
1332

1333
    def gen_tgt(address: Address, full_fp: str, generated_target_fields: dict[str, Any]) -> Target:
×
1334
        if add_dependencies_on_all_siblings:
×
1335
            if union_membership and not generated_target_cls.class_has_field(
×
1336
                Dependencies, union_membership
1337
            ):
1338
                raise AssertionError(
×
1339
                    f"The {type(generator).__name__} target class generates "
1340
                    f"{generated_target_cls.__name__} targets, which do not "
1341
                    f"have a `{Dependencies.alias}` field, and thus cannot "
1342
                    "`add_dependencies_on_all_siblings`."
1343
                )
1344
            original_deps = generated_target_fields.get(Dependencies.alias, ())
×
1345
            generated_target_fields[Dependencies.alias] = tuple(original_deps) + tuple(
×
1346
                all_generated_address_specs - {address.spec}
1347
            )
1348

1349
        generated_target_fields[SingleSourceField.alias] = fast_relpath(full_fp, address.spec_path)
×
1350
        return generated_target_cls(
×
1351
            generated_target_fields,
1352
            address,
1353
            union_membership=union_membership,
1354
            residence_dir=os.path.dirname(full_fp),
1355
        )
1356

1357
    result = tuple(
×
1358
        gen_tgt(address, full_fp, fields) for address, full_fp, fields in all_generated_items
1359
    )
1360

1361
    if normalized_overrides:
×
1362
        unused_relative_paths = sorted(
×
1363
            fast_relpath(fp, template_address.spec_path) for fp in normalized_overrides
1364
        )
1365
        all_valid_relative_paths = sorted(
×
1366
            cast(str, tgt.address.relative_file_path or tgt.address.generated_name)
1367
            for tgt in result
1368
        )
1369
        raise InvalidFieldException(
×
1370
            f"Unused file paths in the `overrides` field for {template_address}: "
1371
            f"{sorted(unused_relative_paths)}"
1372
            f"\n\nDid you mean one of these valid paths?\n\n"
1373
            f"{all_valid_relative_paths}"
1374
        )
1375

1376
    return GeneratedTargets(generator, result)
×
1377

1378

1379
# -----------------------------------------------------------------------------------------------
1380
# FieldSet
1381
# -----------------------------------------------------------------------------------------------
1382
def _get_field_set_fields_from_target(
12✔
1383
    field_set: type[FieldSet], target: Target
1384
) -> dict[str, Field]:
1385
    return {
12✔
1386
        dataclass_field_name: (
1387
            target[field_cls] if field_cls in field_set.required_fields else target.get(field_cls)
1388
        )
1389
        for dataclass_field_name, field_cls in field_set.fields.items()
1390
    }
1391

1392

1393
_FS = TypeVar("_FS", bound="FieldSet")
12✔
1394

1395

1396
@dataclass(frozen=True)
12✔
1397
class FieldSet(EngineAwareParameter, metaclass=ABCMeta):
12✔
1398
    """An ad hoc set of fields from a target which are used by rules.
1399

1400
    Subclasses should declare all the fields they consume as dataclass attributes. They should also
1401
    indicate which of these are required, rather than optional, through the class property
1402
    `required_fields`. When a field is optional, the default constructor for the field will be used
1403
    for any targets that do not have that field registered.
1404

1405
    Subclasses must set `@dataclass(frozen=True)` for their declared fields to be recognized.
1406

1407
    You can optionally implement the classmethod `opt_out` so that targets have a
1408
    mechanism to not match with the FieldSet even if they have the `required_fields` registered.
1409

1410
    For example:
1411

1412
        @dataclass(frozen=True)
1413
        class FortranTestFieldSet(FieldSet):
1414
            required_fields = (FortranSources,)
1415

1416
            sources: FortranSources
1417
            fortran_version: FortranVersion
1418

1419
            @classmethod
1420
            def opt_out(cls, tgt: Target) -> bool:
1421
                return tgt.get(MaybeSkipFortranTestsField).value
1422

1423
    This field set may then be created from a `Target` through the `is_applicable()` and `create()`
1424
    class methods:
1425

1426
        field_sets = [
1427
            FortranTestFieldSet.create(tgt) for tgt in targets
1428
            if FortranTestFieldSet.is_applicable(tgt)
1429
        ]
1430

1431
    FieldSets are consumed like any normal dataclass:
1432

1433
        print(field_set.address)
1434
        print(field_set.sources)
1435
    """
1436

1437
    required_fields: ClassVar[tuple[type[Field], ...]]
12✔
1438

1439
    address: Address
12✔
1440

1441
    @classmethod
12✔
1442
    def opt_out(cls, tgt: Target) -> bool:
12✔
1443
        """If `True`, the target will not match with the field set, even if it has the FieldSet's
1444
        `required_fields`.
1445

1446
        Note: this method is not intended to categorically opt out a target type from a
1447
        FieldSet, i.e. to always opt out based solely on the target type. While it is possible to
1448
        do, some error messages will incorrectly suggest that that target is compatible with the
1449
        FieldSet. Instead, if you need this feature, please ask us to implement it. See
1450
        https://github.com/pantsbuild/pants/pull/12002 for discussion.
1451
        """
1452
        return False
4✔
1453

1454
    @final
12✔
1455
    @classmethod
12✔
1456
    def is_applicable(cls, tgt: Target) -> bool:
12✔
1457
        return tgt.has_fields(cls.required_fields) and not cls.opt_out(tgt)
10✔
1458

1459
    @final
12✔
1460
    @classmethod
12✔
1461
    def applicable_target_types(
12✔
1462
        cls, target_types: Iterable[type[Target]], union_membership: UnionMembership
1463
    ) -> tuple[type[Target], ...]:
1464
        return tuple(
×
1465
            tgt_type
1466
            for tgt_type in target_types
1467
            if tgt_type.class_has_fields(cls.required_fields, union_membership)
1468
        )
1469

1470
    @final
12✔
1471
    @classmethod
12✔
1472
    def create(cls: type[_FS], tgt: Target) -> _FS:
12✔
1473
        return cls(address=tgt.address, **_get_field_set_fields_from_target(cls, tgt))
12✔
1474

1475
    @final
12✔
1476
    @memoized_classproperty
12✔
1477
    def fields(cls) -> FrozenDict[str, type[Field]]:
12✔
1478
        return FrozenDict(
12✔
1479
            (
1480
                (name, field_type)
1481
                for name, field_type in get_type_hints(cls).items()
1482
                if isinstance(field_type, type) and issubclass(field_type, Field)
1483
            )
1484
        )
1485

1486
    def debug_hint(self) -> str:
12✔
1487
        return self.address.spec
×
1488

1489
    def metadata(self) -> dict[str, Any]:
12✔
1490
        return {"address": self.address.spec}
×
1491

1492
    def __repr__(self) -> str:
12✔
1493
        # We use a short repr() because this often shows up in stack traces. We don't need any of
1494
        # the field information because we can ask a user to send us their BUILD file.
1495
        return f"{self.__class__.__name__}(address={self.address})"
×
1496

1497

1498
@dataclass(frozen=True)
12✔
1499
class TargetRootsToFieldSets(Generic[_FS]):
12✔
1500
    mapping: FrozenDict[Target, tuple[_FS, ...]]
12✔
1501

1502
    def __init__(self, mapping: Mapping[Target, Iterable[_FS]]) -> None:
12✔
1503
        object.__setattr__(
2✔
1504
            self,
1505
            "mapping",
1506
            FrozenDict({tgt: tuple(field_sets) for tgt, field_sets in mapping.items()}),
1507
        )
1508

1509
    @memoized_property
12✔
1510
    def field_sets(self) -> tuple[_FS, ...]:
12✔
1511
        return tuple(
1✔
1512
            itertools.chain.from_iterable(
1513
                field_sets_per_target for field_sets_per_target in self.mapping.values()
1514
            )
1515
        )
1516

1517
    @memoized_property
12✔
1518
    def targets(self) -> tuple[Target, ...]:
12✔
1519
        return tuple(self.mapping.keys())
1✔
1520

1521

1522
class NoApplicableTargetsBehavior(Enum):
12✔
1523
    ignore = "ignore"
12✔
1524
    warn = "warn"
12✔
1525
    error = "error"
12✔
1526

1527

1528
def parse_shard_spec(shard_spec: str, origin: str = "") -> tuple[int, int]:
12✔
1529
    def invalid():
2✔
1530
        origin_str = f" from {origin}" if origin else ""
1✔
1531
        return ValueError(
1✔
1532
            f"Invalid shard specification {shard_spec}{origin_str}. Use a string of the form "
1533
            '"k/N" where k and N are integers, and 0 <= k < N .'
1534
        )
1535

1536
    if not shard_spec:
2✔
UNCOV
1537
        return 0, -1
1✔
1538
    shard_str, _, num_shards_str = shard_spec.partition("/")
1✔
1539
    try:
1✔
1540
        shard, num_shards = int(shard_str), int(num_shards_str)
1✔
1541
    except ValueError:
1✔
1542
        raise invalid()
1✔
1543
    if shard < 0 or shard >= num_shards:
1✔
1544
        raise invalid()
1✔
1545
    return shard, num_shards
1✔
1546

1547

1548
def get_shard(key: str, num_shards: int) -> int:
12✔
1549
    # Note: hash() is not guaranteed to be stable across processes, and adler32 is not
1550
    # well-distributed for small strings, so we use crc32. It's faster to compute than
1551
    # a cryptographic hash, which would be overkill.
1552
    return zlib.crc32(key.encode()) % num_shards
1✔
1553

1554

1555
@dataclass(frozen=True)
12✔
1556
class TargetRootsToFieldSetsRequest(Generic[_FS]):
12✔
1557
    field_set_superclass: type[_FS]
12✔
1558
    goal_description: str
12✔
1559
    no_applicable_targets_behavior: NoApplicableTargetsBehavior
12✔
1560
    shard: int
12✔
1561
    num_shards: int
12✔
1562

1563
    def __init__(
12✔
1564
        self,
1565
        field_set_superclass: type[_FS],
1566
        *,
1567
        goal_description: str,
1568
        no_applicable_targets_behavior: NoApplicableTargetsBehavior,
1569
        shard: int = 0,
1570
        num_shards: int = -1,
1571
    ) -> None:
1572
        object.__setattr__(self, "field_set_superclass", field_set_superclass)
3✔
1573
        object.__setattr__(self, "goal_description", goal_description)
3✔
1574
        object.__setattr__(self, "no_applicable_targets_behavior", no_applicable_targets_behavior)
3✔
1575
        object.__setattr__(self, "shard", shard)
3✔
1576
        object.__setattr__(self, "num_shards", num_shards)
3✔
1577

1578
    def is_in_shard(self, key: str) -> bool:
12✔
1579
        return get_shard(key, self.num_shards) == self.shard
×
1580

1581

1582
@dataclass(frozen=True)
12✔
1583
class FieldSetsPerTarget(Generic[_FS]):
12✔
1584
    # One tuple of FieldSet instances per input target.
1585
    collection: tuple[tuple[_FS, ...], ...]
12✔
1586

1587
    def __init__(self, collection: Iterable[Iterable[_FS]]):
12✔
1588
        object.__setattr__(self, "collection", tuple(tuple(iterable) for iterable in collection))
×
1589

1590
    @memoized_property
12✔
1591
    def field_sets(self) -> tuple[_FS, ...]:
12✔
1592
        return tuple(itertools.chain.from_iterable(self.collection))
×
1593

1594

1595
@dataclass(frozen=True)
12✔
1596
class FieldSetsPerTargetRequest(Generic[_FS]):
12✔
1597
    field_set_superclass: type[_FS]
12✔
1598
    targets: tuple[Target, ...]
12✔
1599

1600
    def __init__(self, field_set_superclass: type[_FS], targets: Iterable[Target]):
12✔
1601
        object.__setattr__(self, "field_set_superclass", field_set_superclass)
×
1602
        object.__setattr__(self, "targets", tuple(targets))
×
1603

1604

1605
# -----------------------------------------------------------------------------------------------
1606
# Exception messages
1607
# -----------------------------------------------------------------------------------------------
1608

1609

1610
class InvalidTargetException(Exception):
12✔
1611
    """Use when there's an issue with the target, e.g. mutually exclusive fields set.
1612

1613
    Suggested template:
1614

1615
         f"The `{alias!r}` target {address} ..."
1616
    """
1617

1618
    def __init__(self, message: Any, *, description_of_origin: str | None = None) -> None:
12✔
1619
        self.description_of_origin = description_of_origin
2✔
1620
        super().__init__(message)
2✔
1621

1622
    def __str__(self) -> str:
12✔
1623
        if not self.description_of_origin:
4✔
1624
            return super().__str__()
3✔
1625
        return f"{self.description_of_origin}: {super().__str__()}"
1✔
1626

1627
    def __repr__(self) -> str:
12✔
1628
        if not self.description_of_origin:
1✔
1629
            return super().__repr__()
1✔
1630
        return f"{self.description_of_origin}: {super().__repr__()}"
×
1631

1632

1633
class InvalidGeneratedTargetException(InvalidTargetException):
12✔
1634
    pass
12✔
1635

1636

1637
class InvalidFieldException(Exception):
12✔
1638
    """Use when there's an issue with a particular field.
1639

1640
    Suggested template:
1641

1642
         f"The {alias!r} field in target {address} must ..., but ..."
1643
    """
1644

1645
    def __init__(self, message: Any, *, description_of_origin: str | None = None) -> None:
12✔
1646
        self.description_of_origin = description_of_origin
8✔
1647
        super().__init__(message)
8✔
1648

1649
    def __str__(self) -> str:
12✔
1650
        if not self.description_of_origin:
4✔
1651
            return super().__str__()
4✔
1652
        return f"{self.description_of_origin}: {super().__str__()}"
×
1653

1654
    def __repr__(self) -> str:
12✔
1655
        if not self.description_of_origin:
×
1656
            return super().__repr__()
×
1657
        return f"{self.description_of_origin}: {super().__repr__()}"
×
1658

1659

1660
class InvalidFieldTypeException(InvalidFieldException):
12✔
1661
    """This is used to ensure that the field's value conforms with the expected type for the field,
1662
    e.g. `a boolean` or `a string` or `an iterable of strings and integers`."""
1663

1664
    def __init__(
12✔
1665
        self,
1666
        address: Address,
1667
        field_alias: str,
1668
        raw_value: Any | None,
1669
        *,
1670
        expected_type: str,
1671
        description_of_origin: str | None = None,
1672
    ) -> None:
1673
        raw_type = f"with type `{type(raw_value).__name__}`"
7✔
1674
        super().__init__(
7✔
1675
            f"The {repr(field_alias)} field in target {address} must be {expected_type}, but was "
1676
            f"`{repr(raw_value)}` {raw_type}.",
1677
            description_of_origin=description_of_origin,
1678
        )
1679

1680

1681
class InvalidFieldMemberTypeException(InvalidFieldException):
12✔
1682
    # based on InvalidFieldTypeException
1683
    def __init__(
12✔
1684
        self,
1685
        address: Address,
1686
        field_alias: str,
1687
        raw_value: Any | None,
1688
        *,
1689
        expected_type: str,
1690
        at_index: int,
1691
        wrong_element: Any,
1692
        description_of_origin: str | None = None,
1693
    ) -> None:
1694
        super().__init__(
1✔
1695
            softwrap(
1696
                f"""
1697
                The {repr(field_alias)} field in target {address} must be an iterable with
1698
                elements that have type {expected_type}. Encountered the element `{wrong_element}`
1699
                of type {type(wrong_element)} instead of {expected_type} at index {at_index}:
1700
                `{repr(raw_value)}`
1701
                """
1702
            ),
1703
            description_of_origin=description_of_origin,
1704
        )
1705

1706

1707
class RequiredFieldMissingException(InvalidFieldException):
12✔
1708
    def __init__(
12✔
1709
        self, address: Address, field_alias: str, *, description_of_origin: str | None = None
1710
    ) -> None:
1711
        super().__init__(
×
1712
            f"The {repr(field_alias)} field in target {address} must be defined.",
1713
            description_of_origin=description_of_origin,
1714
        )
1715

1716

1717
class InvalidFieldChoiceException(InvalidFieldException):
12✔
1718
    def __init__(
12✔
1719
        self,
1720
        address: Address,
1721
        field_alias: str,
1722
        raw_value: Any | None,
1723
        *,
1724
        valid_choices: Iterable[Any],
1725
        description_of_origin: str | None = None,
1726
    ) -> None:
1727
        super().__init__(
2✔
1728
            f"Values for the {repr(field_alias)} field in target {address} must be one of "
1729
            f"{sorted(valid_choices)}, but {repr(raw_value)} was provided.",
1730
            description_of_origin=description_of_origin,
1731
        )
1732

1733

1734
class UnrecognizedTargetTypeException(InvalidTargetException):
12✔
1735
    def __init__(
12✔
1736
        self,
1737
        target_type: str,
1738
        registered_target_types: RegisteredTargetTypes,
1739
        address: Address | None = None,
1740
        description_of_origin: str | None = None,
1741
    ) -> None:
1742
        for_address = f" for address {address}" if address else ""
×
1743
        super().__init__(
×
1744
            softwrap(
1745
                f"""
1746
                Target type {target_type!r} is not registered{for_address}.
1747

1748
                All valid target types: {sorted(registered_target_types.aliases)}
1749

1750
                (If {target_type!r} is a custom target type, refer to
1751
                {doc_url("docs/writing-plugins/the-target-api/concepts")} for getting it registered with Pants.)
1752

1753
                """
1754
            ),
1755
            description_of_origin=description_of_origin,
1756
        )
1757

1758

1759
# -----------------------------------------------------------------------------------------------
1760
# Field templates
1761
# -----------------------------------------------------------------------------------------------
1762

1763
T = TypeVar("T")
12✔
1764

1765

1766
class ScalarField(Generic[T], Field):
12✔
1767
    """A field with a scalar value (vs. a compound value like a sequence or dict).
1768

1769
    Subclasses must define the class properties `expected_type` and `expected_type_description`.
1770
    They should also override the type hints for the classmethod `compute_value` so that we use the
1771
    correct type annotation in generated documentation.
1772

1773
        class Example(ScalarField):
1774
            alias = "example"
1775
            expected_type = MyPluginObject
1776
            expected_type_description = "a `my_plugin` object"
1777

1778
            @classmethod
1779
            def compute_value(
1780
                cls, raw_value: Optional[MyPluginObject], address: Address
1781
            ) -> Optional[MyPluginObject]:
1782
                return super().compute_value(raw_value, address=address)
1783
    """
1784

1785
    expected_type: ClassVar[type[T]]  # type: ignore[misc]
12✔
1786
    expected_type_description: ClassVar[str]
12✔
1787
    value: T | None
12✔
1788
    default: ClassVar[T | None] = None  # type: ignore[misc]
12✔
1789

1790
    @classmethod
12✔
1791
    def compute_value(cls, raw_value: Any | None, address: Address) -> T | None:
12✔
1792
        value_or_default = super().compute_value(raw_value, address)
12✔
1793
        if value_or_default is not None and not isinstance(value_or_default, cls.expected_type):
12✔
1794
            raise InvalidFieldTypeException(
2✔
1795
                address,
1796
                cls.alias,
1797
                raw_value,
1798
                expected_type=cls.expected_type_description,
1799
            )
1800
        return value_or_default
12✔
1801

1802

1803
class BoolField(Field):
12✔
1804
    """A field whose value is a boolean.
1805

1806
    Subclasses must either set `default: bool` or `required = True` so that the value is always
1807
    defined.
1808
    """
1809

1810
    value: bool
12✔
1811
    default: ClassVar[bool]
12✔
1812

1813
    @classmethod
12✔
1814
    def compute_value(cls, raw_value: bool, address: Address) -> bool:  # type: ignore[override]
12✔
1815
        value_or_default = super().compute_value(raw_value, address)
9✔
1816
        if not isinstance(value_or_default, bool):
9✔
1817
            raise InvalidFieldTypeException(
×
1818
                address, cls.alias, raw_value, expected_type="a boolean"
1819
            )
1820
        return value_or_default
9✔
1821

1822

1823
class TriBoolField(ScalarField[bool]):
12✔
1824
    """A field whose value is a boolean or None, which is meant to represent a tri-state."""
1825

1826
    expected_type = bool
12✔
1827
    expected_type_description = "a boolean or None"
12✔
1828

1829
    @classmethod
12✔
1830
    def compute_value(cls, raw_value: bool | None, address: Address) -> bool | None:
12✔
1831
        return super().compute_value(raw_value, address)
3✔
1832

1833

1834
class ValidNumbers(Enum):
12✔
1835
    """What range of numbers are allowed for IntField and FloatField."""
1836

1837
    positive_only = enum.auto()
12✔
1838
    positive_and_zero = enum.auto()
12✔
1839
    all = enum.auto()
12✔
1840

1841
    def validate(self, num: float | int | None, alias: str, address: Address) -> None:
12✔
1842
        if num is None or self == self.all:  # type: ignore[comparison-overlap]
5✔
1843
            return
5✔
1844
        if self == self.positive_and_zero:  # type: ignore[comparison-overlap]
3✔
1845
            if num < 0:
2✔
1846
                raise InvalidFieldException(
1✔
1847
                    f"The {repr(alias)} field in target {address} must be greater than or equal to "
1848
                    f"zero, but was set to `{num}`."
1849
                )
1850
            return
2✔
1851
        if num <= 0:
2✔
1852
            raise InvalidFieldException(
1✔
1853
                f"The {repr(alias)} field in target {address} must be greater than zero, but was "
1854
                f"set to `{num}`."
1855
            )
1856

1857

1858
class IntField(ScalarField[int]):
12✔
1859
    expected_type = int
12✔
1860
    expected_type_description = "an integer"
12✔
1861
    valid_numbers: ClassVar[ValidNumbers] = ValidNumbers.all
12✔
1862

1863
    @classmethod
12✔
1864
    def compute_value(cls, raw_value: int | None, address: Address) -> int | None:
12✔
1865
        value_or_default = super().compute_value(raw_value, address)
5✔
1866
        cls.valid_numbers.validate(value_or_default, cls.alias, address)
5✔
1867
        return value_or_default
5✔
1868

1869

1870
class FloatField(ScalarField[float]):
12✔
1871
    expected_type = float
12✔
1872
    expected_type_description = "a float"
12✔
1873
    valid_numbers: ClassVar[ValidNumbers] = ValidNumbers.all
12✔
1874

1875
    @classmethod
12✔
1876
    def compute_value(cls, raw_value: float | None, address: Address) -> float | None:
12✔
1877
        value_or_default = super().compute_value(raw_value, address)
1✔
1878
        cls.valid_numbers.validate(value_or_default, cls.alias, address)
1✔
1879
        return value_or_default
1✔
1880

1881

1882
class StringField(ScalarField[str]):
12✔
1883
    """A field whose value is a string.
1884

1885
    If you expect the string to only be one of several values, set the class property
1886
    `valid_choices`.
1887
    """
1888

1889
    expected_type = str
12✔
1890
    expected_type_description = "a string"
12✔
1891
    valid_choices: ClassVar[type[Enum] | tuple[str, ...] | None] = None
12✔
1892

1893
    @classmethod
12✔
1894
    def compute_value(cls, raw_value: str | None, address: Address) -> str | None:
12✔
1895
        value_or_default = super().compute_value(raw_value, address)
12✔
1896
        if value_or_default is not None and cls.valid_choices is not None:
12✔
1897
            _validate_choices(
9✔
1898
                address, cls.alias, [value_or_default], valid_choices=cls.valid_choices
1899
            )
1900
        return value_or_default
12✔
1901

1902

1903
class SequenceField(Generic[T], Field):
12✔
1904
    """A field whose value is a homogeneous sequence.
1905

1906
    Subclasses must define the class properties `expected_element_type` and
1907
    `expected_type_description`. They should also override the type hints for the classmethod
1908
    `compute_value` so that we use the correct type annotation in generated documentation.
1909

1910
        class Example(SequenceField):
1911
            alias = "example"
1912
            expected_element_type = MyPluginObject
1913
            expected_type_description = "an iterable of `my_plugin` objects"
1914

1915
            @classmethod
1916
            def compute_value(
1917
                cls, raw_value: Optional[Iterable[MyPluginObject]], address: Address
1918
            ) -> Optional[Tuple[MyPluginObject, ...]]:
1919
                return super().compute_value(raw_value, address=address)
1920
    """
1921

1922
    expected_element_type: ClassVar[type]
12✔
1923
    expected_type_description: ClassVar[str]
12✔
1924
    value: tuple[T, ...] | None
12✔
1925
    default: ClassVar[tuple[T, ...] | None] = None  # type: ignore[misc]
12✔
1926

1927
    @classmethod
12✔
1928
    def compute_value(
12✔
1929
        cls, raw_value: Iterable[Any] | None, address: Address
1930
    ) -> tuple[T, ...] | None:
1931
        value_or_default = super().compute_value(raw_value, address)
12✔
1932
        if value_or_default is None:
12✔
1933
            return None
11✔
1934
        try:
12✔
1935
            ensure_list(value_or_default, expected_type=cls.expected_element_type)
12✔
1936
        except ValueError:
1✔
1937
            raise InvalidFieldTypeException(
1✔
1938
                address,
1939
                cls.alias,
1940
                raw_value,
1941
                expected_type=cls.expected_type_description,
1942
            )
1943
        return tuple(value_or_default)
12✔
1944

1945

1946
class TupleSequenceField(Generic[T], Field):
12✔
1947
    # this cannot be a SequenceField as compute_value's use of ensure_list
1948
    # does not work with expected_element_type=tuple when the value itself
1949
    # is already a tuple.
1950
    expected_element_type: ClassVar[type]
12✔
1951
    expected_element_count: ClassVar[int]  # -1 for unlimited
12✔
1952
    expected_type_description: ClassVar[str]
12✔
1953
    expected_element_type_description: ClassVar[str]
12✔
1954

1955
    value: tuple[tuple[T, ...], ...] | None
12✔
1956
    default: ClassVar[tuple[tuple[T, ...], ...] | None] = None  # type: ignore[misc]
12✔
1957

1958
    @classmethod
12✔
1959
    def compute_value(
12✔
1960
        cls, raw_value: Iterable[Iterable[T]] | None, address: Address
1961
    ) -> tuple[tuple[T, ...], ...] | None:
1962
        value_or_default = super().compute_value(raw_value, address)
1✔
1963
        if value_or_default is None:
1✔
1964
            return value_or_default
1✔
1965
        if isinstance(value_or_default, str) or not isinstance(
1✔
1966
            value_or_default, collections.abc.Iterable
1967
        ):
1968
            raise InvalidFieldTypeException(
1✔
1969
                address,
1970
                cls.alias,
1971
                raw_value,
1972
                expected_type=cls.expected_type_description,
1973
            )
1974

1975
        def invalid_member_exception(
1✔
1976
            at_index: int, wrong_element: Any
1977
        ) -> InvalidFieldMemberTypeException:
1978
            return InvalidFieldMemberTypeException(
1✔
1979
                address,
1980
                cls.alias,
1981
                raw_value,
1982
                expected_type=cls.expected_element_type_description,
1983
                wrong_element=wrong_element,
1984
                at_index=at_index,
1985
            )
1986

1987
        validated: list[tuple[T, ...]] = []
1✔
1988
        for i, x in enumerate(value_or_default):
1✔
1989
            if isinstance(x, str) or not isinstance(x, collections.abc.Iterable):
1✔
1990
                raise invalid_member_exception(i, x)
1✔
1991
            element = tuple(x)
1✔
1992
            if cls.expected_element_count >= 0 and cls.expected_element_count != len(element):
1✔
1993
                raise invalid_member_exception(i, x)
×
1994
            for s in element:
1✔
1995
                if not isinstance(s, cls.expected_element_type):
1✔
1996
                    raise invalid_member_exception(i, x)
1✔
1997
            validated.append(cast(tuple[T, ...], element))
1✔
1998

1999
        return tuple(validated)
1✔
2000

2001

2002
class StringSequenceField(SequenceField[str]):
12✔
2003
    expected_element_type = str
12✔
2004
    expected_type_description = "an iterable of strings (e.g. a list of strings)"
12✔
2005
    valid_choices: ClassVar[type[Enum] | tuple[str, ...] | None] = None
12✔
2006

2007
    @classmethod
12✔
2008
    def compute_value(
12✔
2009
        cls, raw_value: Iterable[str] | None, address: Address
2010
    ) -> tuple[str, ...] | None:
2011
        value_or_default = super().compute_value(raw_value, address)
12✔
2012
        if value_or_default and cls.valid_choices is not None:
12✔
2013
            _validate_choices(address, cls.alias, value_or_default, valid_choices=cls.valid_choices)
7✔
2014
        return value_or_default
12✔
2015

2016

2017
class DictStringToStringField(Field):
12✔
2018
    value: FrozenDict[str, str] | None
12✔
2019
    default: ClassVar[FrozenDict[str, str] | None] = None
12✔
2020

2021
    @classmethod
12✔
2022
    def compute_value(
12✔
2023
        cls, raw_value: dict[str, str] | None, address: Address
2024
    ) -> FrozenDict[str, str] | None:
2025
        value_or_default = super().compute_value(raw_value, address)
5✔
2026
        if value_or_default is None:
5✔
2027
            return None
3✔
2028
        invalid_type_exception = InvalidFieldTypeException(
4✔
2029
            address, cls.alias, raw_value, expected_type="a dictionary of string -> string"
2030
        )
2031
        if not isinstance(value_or_default, collections.abc.Mapping):
4✔
2032
            raise invalid_type_exception
1✔
2033
        if not all(isinstance(k, str) and isinstance(v, str) for k, v in value_or_default.items()):
4✔
2034
            raise invalid_type_exception
1✔
2035
        return FrozenDict(value_or_default)
4✔
2036

2037

2038
class ListOfDictStringToStringField(Field):
12✔
2039
    value: tuple[FrozenDict[str, str]] | None
12✔
2040
    default: ClassVar[list[FrozenDict[str, str]] | None] = None
12✔
2041

2042
    @classmethod
12✔
2043
    def compute_value(
12✔
2044
        cls, raw_value: list[dict[str, str]] | None, address: Address
2045
    ) -> tuple[FrozenDict[str, str], ...] | None:
2046
        value_or_default = super().compute_value(raw_value, address)
2✔
2047
        if value_or_default is None:
2✔
2048
            return None
2✔
2049
        invalid_type_exception = InvalidFieldTypeException(
1✔
2050
            address,
2051
            cls.alias,
2052
            raw_value,
2053
            expected_type="a list of dictionaries (or a single dictionary) of string -> string",
2054
        )
2055

2056
        # Also support passing in a single dictionary by wrapping it
2057
        if not isinstance(value_or_default, (list, tuple)):
1✔
2058
            value_or_default = [value_or_default]
1✔
2059

2060
        result_lst: list[FrozenDict[str, str]] = []
1✔
2061
        for item in value_or_default:
1✔
2062
            if not isinstance(item, collections.abc.Mapping):
1✔
2063
                raise invalid_type_exception
1✔
2064
            if not all(isinstance(k, str) and isinstance(v, str) for k, v in item.items()):
1✔
2065
                raise invalid_type_exception
1✔
2066
            result_lst.append(FrozenDict(item))
1✔
2067

2068
        return tuple(result_lst)
1✔
2069

2070

2071
class NestedDictStringToStringField(Field):
12✔
2072
    value: FrozenDict[str, FrozenDict[str, str]] | None
12✔
2073
    default: ClassVar[FrozenDict[str, FrozenDict[str, str]] | None] = None
12✔
2074

2075
    @classmethod
12✔
2076
    def compute_value(
12✔
2077
        cls, raw_value: dict[str, dict[str, str]] | None, address: Address
2078
    ) -> FrozenDict[str, FrozenDict[str, str]] | None:
2079
        value_or_default = super().compute_value(raw_value, address)
1✔
2080
        if value_or_default is None:
1✔
2081
            return None
1✔
2082
        invalid_type_exception = InvalidFieldTypeException(
1✔
2083
            address,
2084
            cls.alias,
2085
            raw_value,
2086
            expected_type="dict[str, dict[str, str]]",
2087
        )
2088
        if not isinstance(value_or_default, collections.abc.Mapping):
1✔
2089
            raise invalid_type_exception
1✔
2090
        for key, nested_value in value_or_default.items():
1✔
2091
            if not isinstance(key, str) or not isinstance(nested_value, collections.abc.Mapping):
1✔
2092
                raise invalid_type_exception
1✔
2093
            if not all(isinstance(k, str) and isinstance(v, str) for k, v in nested_value.items()):
1✔
2094
                raise invalid_type_exception
×
2095
        return FrozenDict(
1✔
2096
            {key: FrozenDict(nested_value) for key, nested_value in value_or_default.items()}
2097
        )
2098

2099

2100
class DictStringToStringSequenceField(Field):
12✔
2101
    value: FrozenDict[str, tuple[str, ...]] | None
12✔
2102
    default: ClassVar[FrozenDict[str, tuple[str, ...]] | None] = None
12✔
2103

2104
    @classmethod
12✔
2105
    def compute_value(
12✔
2106
        cls, raw_value: dict[str, Iterable[str]] | None, address: Address
2107
    ) -> FrozenDict[str, tuple[str, ...]] | None:
2108
        value_or_default = super().compute_value(raw_value, address)
3✔
2109
        if value_or_default is None:
3✔
2110
            return None
1✔
2111
        invalid_type_exception = InvalidFieldTypeException(
2✔
2112
            address,
2113
            cls.alias,
2114
            raw_value,
2115
            expected_type="a dictionary of string -> an iterable of strings",
2116
        )
2117
        if not isinstance(value_or_default, collections.abc.Mapping):
2✔
2118
            raise invalid_type_exception
1✔
2119
        result = {}
2✔
2120
        for k, v in value_or_default.items():
2✔
2121
            if not isinstance(k, str):
2✔
2122
                raise invalid_type_exception
1✔
2123
            try:
2✔
2124
                result[k] = tuple(ensure_str_list(v))
2✔
2125
            except ValueError:
1✔
2126
                raise invalid_type_exception
1✔
2127
        return FrozenDict(result)
2✔
2128

2129

2130
def _validate_choices(
12✔
2131
    address: Address,
2132
    field_alias: str,
2133
    values: Iterable[Any],
2134
    *,
2135
    valid_choices: type[Enum] | tuple[Any, ...],
2136
) -> None:
2137
    _valid_choices = set(
9✔
2138
        valid_choices
2139
        if isinstance(valid_choices, tuple)
2140
        else (choice.value for choice in valid_choices)
2141
    )
2142
    for choice in values:
9✔
2143
        if choice not in _valid_choices:
9✔
2144
            raise InvalidFieldChoiceException(
2✔
2145
                address, field_alias, choice, valid_choices=_valid_choices
2146
            )
2147

2148

2149
# -----------------------------------------------------------------------------------------------
2150
# Sources and codegen
2151
# -----------------------------------------------------------------------------------------------
2152

2153

2154
class SourcesField(AsyncFieldMixin, Field):
12✔
2155
    """A field for the sources that a target owns.
2156

2157
    When defining a new sources field, you should subclass `MultipleSourcesField` or
2158
    `SingleSourceField`, which set up the field's `alias` and data type / parsing. However, you
2159
    should use `tgt.get(SourcesField)` when you need to operate on all sources types, such as
2160
    with `HydrateSourcesRequest`, so that both subclasses work.
2161

2162
    Subclasses may set the following class properties:
2163

2164
    - `expected_file_extensions` -- A tuple of strings containing the expected file extensions for
2165
        source files. The default is no expected file extensions.
2166
    - `expected_num_files` -- An integer or range stating the expected total number of source
2167
        files. The default is no limit on the number of source files.
2168
    - `uses_source_roots` -- Whether the concept of "source root" pertains to the source files
2169
        referenced by this field.
2170
    - `default` -- A default value for this field.
2171
    - `default_glob_match_error_behavior` -- Advanced option, should very rarely be used. Override
2172
        glob match error behavior when using the default value. If setting this to
2173
        `GlobMatchErrorBehavior.ignore`, make sure you have other validation in place in case the
2174
        default glob doesn't match any files, if required, to alert the user appropriately.
2175
    """
2176

2177
    expected_file_extensions: ClassVar[tuple[str, ...] | None] = None
12✔
2178
    expected_num_files: ClassVar[int | range | None] = None
12✔
2179
    uses_source_roots: ClassVar[bool] = True
12✔
2180

2181
    default: ClassVar[ImmutableValue] = None
12✔
2182
    default_glob_match_error_behavior: ClassVar[GlobMatchErrorBehavior | None] = None
12✔
2183

2184
    @property
12✔
2185
    def globs(self) -> tuple[str, ...]:
12✔
2186
        """The raw globs, relative to the BUILD file."""
2187

2188
        # NB: We give a default implementation because it's common to use
2189
        # `tgt.get(SourcesField)`, and that must not error. But, subclasses need to
2190
        # implement this for the field to be useful (they should subclass `MultipleSourcesField`
2191
        # and `SingleSourceField`).
2192
        return ()
×
2193

2194
    def validate_resolved_files(self, files: Sequence[str]) -> None:
12✔
2195
        """Perform any additional validation on the resulting source files, e.g. ensuring that
2196
        certain banned files are not used.
2197

2198
        To enforce that the resulting files end in certain extensions, such as `.py` or `.java`, set
2199
        the class property `expected_file_extensions`.
2200

2201
        To enforce that there are only a certain number of resulting files, such as binary targets
2202
        checking for only 0-1 sources, set the class property `expected_num_files`.
2203
        """
2204
        if self.expected_file_extensions is not None:
×
2205
            bad_files = [
×
2206
                fp for fp in files if PurePath(fp).suffix not in self.expected_file_extensions
2207
            ]
2208
            if bad_files:
×
2209
                expected = (
×
2210
                    f"one of {sorted(self.expected_file_extensions)}"
2211
                    if len(self.expected_file_extensions) > 1
2212
                    else repr(self.expected_file_extensions[0])
2213
                )
2214
                raise InvalidFieldException(
×
2215
                    f"The {repr(self.alias)} field in target {self.address} can only contain "
2216
                    f"files that end in {expected}, but it had these files: {sorted(bad_files)}."
2217
                    "\n\nMaybe create a `resource`/`resources` or `file`/`files` target and "
2218
                    "include it in the `dependencies` field?"
2219
                )
2220
        if self.expected_num_files is not None:
×
2221
            num_files = len(files)
×
2222
            is_bad_num_files = (
×
2223
                num_files not in self.expected_num_files
2224
                if isinstance(self.expected_num_files, range)
2225
                else num_files != self.expected_num_files
2226
            )
2227
            if is_bad_num_files:
×
2228
                if isinstance(self.expected_num_files, range):
×
2229
                    if len(self.expected_num_files) == 2:
×
2230
                        expected_str = (
×
2231
                            " or ".join(str(n) for n in self.expected_num_files) + " files"
2232
                        )
2233
                    else:
2234
                        expected_str = f"a number of files in the range `{self.expected_num_files}`"
×
2235
                else:
2236
                    expected_str = pluralize(self.expected_num_files, "file")
×
2237
                raise InvalidFieldException(
×
2238
                    f"The {repr(self.alias)} field in target {self.address} must have "
2239
                    f"{expected_str}, but it had {pluralize(num_files, 'file')}."
2240
                )
2241

2242
    @staticmethod
12✔
2243
    def prefix_glob_with_dirpath(dirpath: str, glob: str) -> str:
12✔
2244
        if glob.startswith("!"):
2✔
2245
            return f"!{os.path.join(dirpath, glob[1:])}"
×
2246
        return os.path.join(dirpath, glob)
2✔
2247

2248
    @final
12✔
2249
    def _prefix_glob_with_address(self, glob: str) -> str:
12✔
2250
        return self.prefix_glob_with_dirpath(self.address.spec_path, glob)
2✔
2251

2252
    @final
12✔
2253
    @classmethod
12✔
2254
    def can_generate(
12✔
2255
        cls, output_type: type[SourcesField], union_membership: UnionMembership
2256
    ) -> bool:
2257
        """Can this field be used to generate the output_type?
2258

2259
        Generally, this method does not need to be used. Most call sites can simply use the below,
2260
        and the engine will generate the sources if possible or will return an instance of
2261
        HydratedSources with an empty snapshot if not possible:
2262

2263
            await hydrate_sources(
2264
                HydrateSourcesRequest(
2265
                    sources_field,
2266
                    for_sources_types=[FortranSources],
2267
                    enable_codegen=True,
2268
                ),
2269
                **implicitly(),
2270
            )
2271

2272
        This method is useful when you need to filter targets before hydrating them, such as how
2273
        you may filter targets via `tgt.has_field(MyField)`.
2274
        """
2275
        generate_request_types = union_membership.get(GenerateSourcesRequest)
2✔
2276
        return any(
2✔
2277
            issubclass(cls, generate_request_type.input)
2278
            and issubclass(generate_request_type.output, output_type)
2279
            for generate_request_type in generate_request_types
2280
        )
2281

2282
    @final
12✔
2283
    def path_globs(self, unmatched_build_file_globs: UnmatchedBuildFileGlobs) -> PathGlobs:
12✔
2284
        if not self.globs:
2✔
2285
            return PathGlobs([])
1✔
2286

2287
        # SingleSourceField has str as default type.
2288
        default_globs = (
2✔
2289
            [self.default] if self.default and isinstance(self.default, str) else self.default
2290
        )
2291

2292
        using_default_globs = default_globs and (set(self.globs) == set(default_globs)) or False
2✔
2293

2294
        # Use fields default error behavior if defined, if we use default globs else the provided
2295
        # error behavior.
2296
        error_behavior = (
2✔
2297
            unmatched_build_file_globs.error_behavior
2298
            if not using_default_globs or self.default_glob_match_error_behavior is None
2299
            else self.default_glob_match_error_behavior
2300
        )
2301

2302
        return PathGlobs(
2✔
2303
            (self._prefix_glob_with_address(glob) for glob in self.globs),
2304
            conjunction=GlobExpansionConjunction.any_match,
2305
            glob_match_error_behavior=error_behavior,
2306
            description_of_origin=(
2307
                f"{self.address}'s `{self.alias}` field"
2308
                if error_behavior != GlobMatchErrorBehavior.ignore
2309
                else None
2310
            ),
2311
        )
2312

2313
    @memoized_property
12✔
2314
    def filespec(self) -> Filespec:
12✔
2315
        """The original globs, returned in the Filespec dict format.
2316

2317
        The globs will be relativized to the build root.
2318
        """
2319
        includes = []
×
2320
        excludes = []
×
2321
        for glob in self.globs:
×
2322
            if glob.startswith("!"):
×
2323
                excludes.append(os.path.join(self.address.spec_path, glob[1:]))
×
2324
            else:
2325
                includes.append(os.path.join(self.address.spec_path, glob))
×
2326
        result: Filespec = {"includes": includes}
×
2327
        if excludes:
×
2328
            result["excludes"] = excludes
×
2329
        return result
×
2330

2331
    @memoized_property
12✔
2332
    def filespec_matcher(self) -> FilespecMatcher:
12✔
2333
        # Note: memoized because parsing the globs is expensive:
2334
        # https://github.com/pantsbuild/pants/issues/16122
2335
        return FilespecMatcher(self.filespec["includes"], self.filespec.get("excludes", []))
×
2336

2337

2338
class MultipleSourcesField(SourcesField, StringSequenceField):
12✔
2339
    """The `sources: list[str]` field.
2340

2341
    See the docstring for `SourcesField` for some class properties you can set, such as
2342
    `expected_file_extensions`.
2343

2344
    When you need to get the sources for all targets, use `tgt.get(SourcesField)` rather than
2345
    `tgt.get(MultipleSourcesField)`.
2346
    """
2347

2348
    alias = "sources"
12✔
2349

2350
    ban_subdirectories: ClassVar[bool] = False
12✔
2351

2352
    @property
12✔
2353
    def globs(self) -> tuple[str, ...]:
12✔
2354
        return self.value or ()
3✔
2355

2356
    @classmethod
12✔
2357
    def compute_value(
12✔
2358
        cls, raw_value: Iterable[str] | None, address: Address
2359
    ) -> tuple[str, ...] | None:
2360
        value = super().compute_value(raw_value, address)
9✔
2361
        invalid_globs = [glob for glob in (value or ()) if glob.startswith("../") or "/../" in glob]
9✔
2362
        if invalid_globs:
9✔
2363
            raise InvalidFieldException(
1✔
2364
                softwrap(
2365
                    f"""
2366
                    The {repr(cls.alias)} field in target {address} must not have globs with the
2367
                    pattern `../` because targets can only have sources in the current directory
2368
                    or subdirectories. It was set to: {sorted(value or ())}
2369
                    """
2370
                )
2371
            )
2372
        if cls.ban_subdirectories:
9✔
2373
            invalid_globs = [glob for glob in (value or ()) if "**" in glob or os.path.sep in glob]
2✔
2374
            if invalid_globs:
2✔
2375
                raise InvalidFieldException(
2✔
2376
                    softwrap(
2377
                        f"""
2378
                        The {repr(cls.alias)} field in target {address} must only have globs for
2379
                        the target's directory, i.e. it cannot include values with `**` or
2380
                        `{os.path.sep}`. It was set to: {sorted(value or ())}
2381
                        """
2382
                    )
2383
                )
2384
        return value
9✔
2385

2386

2387
class OptionalSingleSourceField(SourcesField, StringField):
12✔
2388
    """The `source: str` field.
2389

2390
    See the docstring for `SourcesField` for some class properties you can set, such as
2391
    `expected_file_extensions`.
2392

2393
    When you need to get the sources for all targets, use `tgt.get(SourcesField)` rather than
2394
    `tgt.get(OptionalSingleSourceField)`.
2395

2396
    Use `SingleSourceField` if the source must exist.
2397
    """
2398

2399
    alias = "source"
12✔
2400
    help = help_text(
12✔
2401
        """
2402
        A single file that belongs to this target.
2403

2404
        Path is relative to the BUILD file's directory, e.g. `source='example.ext'`.
2405
        """
2406
    )
2407
    required = False
12✔
2408
    default: ClassVar[str | None] = None
12✔
2409
    expected_num_files: ClassVar[int | range] = range(0, 2)
12✔
2410

2411
    @classmethod
12✔
2412
    def compute_value(cls, raw_value: str | None, address: Address) -> str | None:
12✔
2413
        value_or_default = super().compute_value(raw_value, address)
10✔
2414
        if value_or_default is None:
10✔
2415
            return None
3✔
2416
        if value_or_default.startswith("../") or "/../" in value_or_default:
10✔
2417
            raise InvalidFieldException(
1✔
2418
                softwrap(
2419
                    f"""\
2420
                    The {repr(cls.alias)} field in target {address} should not include `../`
2421
                    patterns because targets can only have sources in the current directory or
2422
                    subdirectories. It was set to {value_or_default}. Instead, use a normalized
2423
                    literal file path (relative to the BUILD file).
2424
                    """
2425
                )
2426
            )
2427
        if "*" in value_or_default:
10✔
2428
            raise InvalidFieldException(
1✔
2429
                softwrap(
2430
                    f"""\
2431
                    The {repr(cls.alias)} field in target {address} should not include `*` globs,
2432
                    but was set to {value_or_default}. Instead, use a literal file path (relative
2433
                    to the BUILD file).
2434
                    """
2435
                )
2436
            )
2437
        if value_or_default.startswith("!"):
10✔
2438
            raise InvalidFieldException(
1✔
2439
                softwrap(
2440
                    f"""\
2441
                    The {repr(cls.alias)} field in target {address} should not start with `!`,
2442
                    which is usually used in the `sources` field to exclude certain files. Instead,
2443
                    use a literal file path (relative to the BUILD file).
2444
                    """
2445
                )
2446
            )
2447
        return value_or_default
10✔
2448

2449
    @property
12✔
2450
    def file_path(self) -> str | None:
12✔
2451
        """The path to the file, relative to the build root.
2452

2453
        This works without hydration because we validate that `*` globs and `!` ignores are not
2454
        used. However, consider still hydrating so that you verify the source file actually exists.
2455

2456
        The return type is optional because it's possible to have 0-1 files.
2457
        """
2458
        if self.value is None:
1✔
2459
            return None
1✔
2460
        return os.path.join(self.address.spec_path, self.value)
1✔
2461

2462
    @property
12✔
2463
    def globs(self) -> tuple[str, ...]:
12✔
2464
        if self.value is None:
2✔
2465
            return ()
1✔
2466
        return (self.value,)
2✔
2467

2468

2469
class SingleSourceField(OptionalSingleSourceField):
12✔
2470
    """The `source: str` field.
2471

2472
    Unlike `OptionalSingleSourceField`, the `.value` must be defined, whether by setting the
2473
    `default` or making the field `required`.
2474

2475
    See the docstring for `SourcesField` for some class properties you can set, such as
2476
    `expected_file_extensions`.
2477

2478
    When you need to get the sources for all targets, use `tgt.get(SourcesField)` rather than
2479
    `tgt.get(SingleSourceField)`.
2480
    """
2481

2482
    required = True
12✔
2483
    expected_num_files = 1
12✔
2484
    value: str
12✔
2485

2486
    @property
12✔
2487
    def file_path(self) -> str:
12✔
2488
        result = super().file_path
×
2489
        assert result is not None
×
2490
        return result
×
2491

2492

2493
@dataclass(frozen=True)
12✔
2494
class HydrateSourcesRequest(EngineAwareParameter):
12✔
2495
    field: SourcesField
12✔
2496
    for_sources_types: tuple[type[SourcesField], ...]
12✔
2497
    enable_codegen: bool
12✔
2498

2499
    def __init__(
12✔
2500
        self,
2501
        field: SourcesField,
2502
        *,
2503
        for_sources_types: Iterable[type[SourcesField]] = (SourcesField,),
2504
        enable_codegen: bool = False,
2505
    ) -> None:
2506
        """Convert raw sources globs into an instance of HydratedSources.
2507

2508
        If you only want to handle certain SourcesFields, such as only PythonSources, set
2509
        `for_sources_types`. Any invalid sources will return a `HydratedSources` instance with an
2510
        empty snapshot and `sources_type = None`.
2511

2512
        If `enable_codegen` is set to `True`, any codegen sources will try to be converted to one
2513
        of the `for_sources_types`.
2514
        """
2515
        object.__setattr__(self, "field", field)
11✔
2516
        object.__setattr__(self, "for_sources_types", tuple(for_sources_types))
11✔
2517
        object.__setattr__(self, "enable_codegen", enable_codegen)
11✔
2518

2519
        self.__post_init__()
11✔
2520

2521
    def __post_init__(self) -> None:
12✔
2522
        if self.enable_codegen and self.for_sources_types == (SourcesField,):
11✔
2523
            raise ValueError(
×
2524
                "When setting `enable_codegen=True` on `HydrateSourcesRequest`, you must also "
2525
                "explicitly set `for_source_types`. Why? `for_source_types` is used to "
2526
                "determine which language(s) to try to generate. For example, "
2527
                "`for_source_types=(PythonSources,)` will hydrate `PythonSources` like normal, "
2528
                "and, if it encounters codegen sources that can be converted into Python, it will "
2529
                "generate Python files."
2530
            )
2531

2532
    def debug_hint(self) -> str:
12✔
2533
        return self.field.address.spec
×
2534

2535

2536
@dataclass(frozen=True)
12✔
2537
class HydratedSources:
12✔
2538
    """The result of hydrating a SourcesField.
2539

2540
    The `sources_type` will indicate which of the `HydrateSourcesRequest.for_sources_type` the
2541
    result corresponds to, e.g. if the result comes from `FilesSources` vs. `PythonSources`. If this
2542
    value is None, then the input `SourcesField` was not one of the expected types; or, when codegen
2543
    was enabled in the request, there was no valid code generator to generate the requested language
2544
    from the original input. This property allows for switching on the result, e.g. handling
2545
    hydrated files() sources differently than hydrated Python sources.
2546
    """
2547

2548
    snapshot: Snapshot
12✔
2549
    filespec: Filespec
12✔
2550
    sources_type: type[SourcesField] | None
12✔
2551

2552

2553
@union(in_scope_types=[EnvironmentName])
12✔
2554
@dataclass(frozen=True)
12✔
2555
class GenerateSourcesRequest:
12✔
2556
    """A request to go from protocol sources -> a particular language.
2557

2558
    This should be subclassed for each distinct codegen implementation. The subclasses must define
2559
    the class properties `input` and `output`. The subclass must also be registered via
2560
    `UnionRule(GenerateSourcesRequest, GenerateFortranFromAvroRequest)`, for example.
2561

2562
    The rule to actually implement the codegen should take the subclass as input, and it must
2563
    return `GeneratedSources`.
2564

2565
    The `exportable` attribute disables the use of this codegen by the `export-codegen` goal when
2566
    set to False.
2567

2568
    For example:
2569

2570
        class GenerateFortranFromAvroRequest:
2571
            input = AvroSources
2572
            output = FortranSources
2573

2574
        @rule
2575
        async def generate_fortran_from_avro(request: GenerateFortranFromAvroRequest) -> GeneratedSources:
2576
            ...
2577

2578
        def rules():
2579
            return [
2580
                generate_fortran_from_avro,
2581
                UnionRule(GenerateSourcesRequest, GenerateFortranFromAvroRequest),
2582
            ]
2583
    """
2584

2585
    protocol_sources: Snapshot
12✔
2586
    protocol_target: Target
12✔
2587

2588
    input: ClassVar[type[SourcesField]]
12✔
2589
    output: ClassVar[type[SourcesField]]
12✔
2590

2591
    exportable: ClassVar[bool] = True
12✔
2592

2593

2594
@dataclass(frozen=True)
12✔
2595
class GeneratedSources:
12✔
2596
    snapshot: Snapshot
12✔
2597

2598

2599
@rule(polymorphic=True)
12✔
2600
async def generate_sources(
12✔
2601
    req: GenerateSourcesRequest, env_name: EnvironmentName
2602
) -> GeneratedSources:
2603
    raise NotImplementedError()
×
2604

2605

2606
class SourcesPaths(Paths):
12✔
2607
    """The resolved file names of the `source`/`sources` field.
2608

2609
    This does not consider codegen, and only captures the files from the field.
2610
    """
2611

2612

2613
@dataclass(frozen=True)
12✔
2614
class SourcesPathsRequest(EngineAwareParameter):
12✔
2615
    """A request to resolve the file names of the `source`/`sources` field.
2616

2617
    Use via `await resolve_source_paths(SourcesPathRequest(tgt.get(SourcesField))`.
2618

2619
    This is faster than `await hydrate_sources(HydrateSourcesRequest)` because it does not snapshot
2620
    the files and it only resolves the file names.
2621

2622
    This does not consider codegen, and only captures the files from the field. Use
2623
    `HydrateSourcesRequest` to use codegen.
2624
    """
2625

2626
    field: SourcesField
12✔
2627

2628
    def debug_hint(self) -> str:
12✔
2629
        return self.field.address.spec
×
2630

2631

2632
def targets_with_sources_types(
12✔
2633
    sources_types: Iterable[type[SourcesField]],
2634
    targets: Iterable[Target],
2635
    union_membership: UnionMembership,
2636
) -> tuple[Target, ...]:
2637
    """Return all targets either with the specified sources subclass(es) or which can generate those
2638
    sources."""
2639
    return tuple(
1✔
2640
        tgt
2641
        for tgt in targets
2642
        if any(
2643
            tgt.has_field(sources_type)
2644
            or tgt.get(SourcesField).can_generate(sources_type, union_membership)
2645
            for sources_type in sources_types
2646
        )
2647
    )
2648

2649

2650
# -----------------------------------------------------------------------------------------------
2651
# `Dependencies` field
2652
# -----------------------------------------------------------------------------------------------
2653

2654

2655
class Dependencies(StringSequenceField, AsyncFieldMixin):
12✔
2656
    """The dependencies field.
2657

2658
    To resolve all dependencies—including the results of dependency inference—use either
2659
    `await resolve_dependencies(DependenciesRequest(tgt[Dependencies])` or
2660
    `await resolve_targets(**implicitly(DependenciesRequest(tgt[Dependencies]))`.
2661
    """
2662

2663
    alias = "dependencies"
12✔
2664
    help = help_text(
12✔
2665
        f"""
2666
        Addresses to other targets that this target depends on, e.g.
2667
        `['helloworld/subdir:lib', 'helloworld/main.py:lib', '3rdparty:reqs#django']`.
2668

2669
        This augments any dependencies inferred by Pants, such as by analyzing your imports. Use
2670
        `{bin_name()} dependencies` or `{bin_name()} peek` on this target to get the final
2671
        result.
2672

2673
        See {doc_url("docs/using-pants/key-concepts/targets-and-build-files")} for more about how addresses are formed, including for generated
2674
        targets. You can also run `{bin_name()} list ::` to find all addresses in your project, or
2675
        `{bin_name()} list dir` to find all addresses defined in that directory.
2676

2677
        If the target is in the same BUILD file, you can leave off the BUILD file path, e.g.
2678
        `:tgt` instead of `helloworld/subdir:tgt`. For generated first-party addresses, use
2679
        `./` for the file path, e.g. `./main.py:tgt`; for all other generated targets,
2680
        use `:tgt#generated_name`.
2681

2682
        You may exclude dependencies by prefixing with `!`, e.g.
2683
        `['!helloworld/subdir:lib', '!./sibling.txt']`. Ignores are intended for false positives
2684
        with dependency inference; otherwise, simply leave off the dependency from the BUILD file.
2685
        """
2686
    )
2687
    supports_transitive_excludes = False
12✔
2688

2689
    @memoized_property
12✔
2690
    def unevaluated_transitive_excludes(self) -> UnparsedAddressInputs:
12✔
2691
        val = (
×
2692
            (v[2:] for v in self.value if v.startswith("!!"))
2693
            if self.supports_transitive_excludes and self.value
2694
            else ()
2695
        )
2696
        return UnparsedAddressInputs(
×
2697
            val,
2698
            owning_address=self.address,
2699
            description_of_origin=f"the `{self.alias}` field from the target {self.address}",
2700
        )
2701

2702

2703
@dataclass(frozen=True)
12✔
2704
class DependenciesRequest(EngineAwareParameter):
12✔
2705
    field: Dependencies
12✔
2706
    should_traverse_deps_predicate: ShouldTraverseDepsPredicate = TraverseIfDependenciesField()
12✔
2707

2708
    def debug_hint(self) -> str:
12✔
2709
        return self.field.address.spec
×
2710

2711

2712
# NB: ExplicitlyProvidedDependenciesRequest does not have a predicate unlike DependenciesRequest.
2713
@dataclass(frozen=True)
12✔
2714
class ExplicitlyProvidedDependenciesRequest(EngineAwareParameter):
12✔
2715
    field: Dependencies
12✔
2716

2717
    def debug_hint(self) -> str:
12✔
2718
        return self.field.address.spec
×
2719

2720

2721
@dataclass(frozen=True)
12✔
2722
class ExplicitlyProvidedDependencies:
12✔
2723
    """The literal addresses from a BUILD file `dependencies` field.
2724

2725
    Almost always, you should use `await resolve_dependencies(DependenciesRequest, **implicitly())`
2726
    instead, which will consider dependency inference and apply ignores. However, this type can be
2727
    useful particularly within inference rules to see if a user already explicitly
2728
    provided a dependency.
2729

2730
    Resolve using
2731
    `await determine_explicitly_provided_dependencies(**implicitly(DependenciesRequest))`.
2732

2733
    Note that the `includes` are not filtered based on the `ignores`: this type preserves exactly
2734
    what was in the BUILD file.
2735
    """
2736

2737
    address: Address
12✔
2738
    includes: FrozenOrderedSet[Address]
12✔
2739
    ignores: FrozenOrderedSet[Address]
12✔
2740

2741
    @memoized_method
12✔
2742
    def any_are_covered_by_includes(self, addresses: Iterable[Address]) -> bool:
12✔
2743
        """Return True if every address is in the explicitly provided includes.
2744

2745
        Note that if the input addresses are generated targets, they will still be marked as covered
2746
        if their original target generator is in the explicitly provided includes.
2747
        """
2748
        return any(
1✔
2749
            addr in self.includes or addr.maybe_convert_to_target_generator() in self.includes
2750
            for addr in addresses
2751
        )
2752

2753
    @memoized_method
12✔
2754
    def remaining_after_disambiguation(
12✔
2755
        self, addresses: Iterable[Address], owners_must_be_ancestors: bool
2756
    ) -> frozenset[Address]:
2757
        """All addresses that remain after ineligible candidates are discarded.
2758

2759
        Candidates are removed if they appear as ignores (`!` and `!!)` in the `dependencies`
2760
        field. Note that if the input addresses are generated targets, they will still be marked as
2761
        covered if their original target generator is in the explicitly provided ignores.
2762

2763
        Candidates are also removed if `owners_must_be_ancestors` is True and the targets are not
2764
        ancestors, e.g. `root2:tgt` is not a valid candidate for something defined in `root1`.
2765
        """
2766
        original_addr_path = PurePath(self.address.spec_path)
1✔
2767

2768
        def is_valid(addr: Address) -> bool:
1✔
2769
            is_ignored = (
1✔
2770
                addr in self.ignores or addr.maybe_convert_to_target_generator() in self.ignores
2771
            )
2772
            if owners_must_be_ancestors is False:
1✔
2773
                return not is_ignored
1✔
2774
            # NB: `PurePath.is_relative_to()` was not added until Python 3.9. This emulates it.
2775
            try:
1✔
2776
                original_addr_path.relative_to(addr.spec_path)
1✔
2777
                return not is_ignored
1✔
2778
            except ValueError:
1✔
2779
                return False
1✔
2780

2781
        return frozenset(filter(is_valid, addresses))
1✔
2782

2783
    def maybe_warn_of_ambiguous_dependency_inference(
12✔
2784
        self,
2785
        ambiguous_addresses: Iterable[Address],
2786
        original_address: Address,
2787
        *,
2788
        context: str,
2789
        import_reference: str,
2790
        owners_must_be_ancestors: bool = False,
2791
    ) -> None:
2792
        """If the module is ambiguous and the user did not disambiguate, warn that dependency
2793
        inference will not be used.
2794

2795
        Disambiguation usually happens by using ignores in the `dependencies` field with `!` and
2796
        `!!`. If `owners_must_be_ancestors` is True, any addresses which are not ancestors of the
2797
        target in question will also be ignored.
2798
        """
2799
        if not ambiguous_addresses or self.any_are_covered_by_includes(ambiguous_addresses):
1✔
2800
            return
1✔
2801
        remaining = self.remaining_after_disambiguation(
1✔
2802
            ambiguous_addresses, owners_must_be_ancestors=owners_must_be_ancestors
2803
        )
2804
        if len(remaining) <= 1:
1✔
2805
            return
1✔
2806
        logger.warning(
1✔
2807
            f"{context}, but Pants cannot safely infer a dependency because more than one target "
2808
            f"owns this {import_reference}, so it is ambiguous which to use: "
2809
            f"{sorted(addr.spec for addr in remaining)}."
2810
            f"\n\nPlease explicitly include the dependency you want in the `dependencies` "
2811
            f"field of {original_address}, or ignore the ones you do not want by prefixing "
2812
            f"with `!` or `!!` so that one or no targets are left."
2813
            f"\n\nAlternatively, you can remove the ambiguity by deleting/changing some of the "
2814
            f"targets so that only 1 target owns this {import_reference}. Refer to "
2815
            f"{doc_url('docs/using-pants/troubleshooting-common-issues#import-errors-and-missing-dependencies')}."
2816
        )
2817

2818
    def disambiguated(
12✔
2819
        self, ambiguous_addresses: Iterable[Address], owners_must_be_ancestors: bool = False
2820
    ) -> Address | None:
2821
        """If exactly one of the input addresses remains after disambiguation, return it.
2822

2823
        Disambiguation usually happens by using ignores in the `dependencies` field with `!` and
2824
        `!!`. If `owners_must_be_ancestors` is True, any addresses which are not ancestors of the
2825
        target in question will also be ignored.
2826
        """
2827
        if not ambiguous_addresses or self.any_are_covered_by_includes(ambiguous_addresses):
1✔
2828
            return None
1✔
2829
        remaining_after_ignores = self.remaining_after_disambiguation(
1✔
2830
            ambiguous_addresses, owners_must_be_ancestors=owners_must_be_ancestors
2831
        )
2832
        return list(remaining_after_ignores)[0] if len(remaining_after_ignores) == 1 else None
1✔
2833

2834

2835
FS = TypeVar("FS", bound="FieldSet")
12✔
2836

2837

2838
@union(in_scope_types=[EnvironmentName])
12✔
2839
@dataclass(frozen=True)
12✔
2840
class InferDependenciesRequest(Generic[FS], EngineAwareParameter):
12✔
2841
    """A request to infer dependencies by analyzing source files.
2842

2843
    To set up a new inference implementation, subclass this class. Set the class property
2844
    `infer_from` to the FieldSet subclass you are able to infer from. This will cause the FieldSet
2845
    class, and any subclass, to use your inference implementation.
2846

2847
    Note that there cannot be more than one implementation for a particular `FieldSet` class.
2848

2849
    Register this subclass with `UnionRule(InferDependenciesRequest, InferFortranDependencies)`, for example.
2850

2851
    Then, create a rule that takes the subclass as a parameter and returns `InferredDependencies`.
2852

2853
    For example:
2854

2855
        class InferFortranDependencies(InferDependenciesRequest):
2856
            infer_from = FortranDependenciesInferenceFieldSet
2857

2858
        @rule
2859
        async def infer_fortran_dependencies(request: InferFortranDependencies) -> InferredDependencies:
2860
            hydrated_sources = await hydrate_sources(HydrateSources(request.field_set.sources))
2861
            ...
2862
            return InferredDependencies(...)
2863

2864
        def rules():
2865
            return [
2866
                infer_fortran_dependencies,
2867
                UnionRule(InferDependenciesRequest, InferFortranDependencies),
2868
            ]
2869
    """
2870

2871
    infer_from: ClassVar[type[FS]]  # type: ignore[misc]
12✔
2872

2873
    field_set: FS
12✔
2874

2875

2876
@dataclass(frozen=True)
12✔
2877
class InferredDependencies:
12✔
2878
    include: FrozenOrderedSet[Address]
12✔
2879
    exclude: FrozenOrderedSet[Address]
12✔
2880

2881
    def __init__(
12✔
2882
        self,
2883
        include: Iterable[Address],
2884
        *,
2885
        exclude: Iterable[Address] = (),
2886
    ) -> None:
2887
        """The result of inferring dependencies."""
2888
        object.__setattr__(self, "include", FrozenOrderedSet(sorted(include)))
9✔
2889
        object.__setattr__(self, "exclude", FrozenOrderedSet(sorted(exclude)))
9✔
2890

2891

2892
@union(in_scope_types=[EnvironmentName])
12✔
2893
@dataclass(frozen=True)
12✔
2894
class TransitivelyExcludeDependenciesRequest(Generic[FS], EngineAwareParameter):
12✔
2895
    """A request to transitvely exclude dependencies of a "root" node.
2896

2897
    This is similar to `InferDependenciesRequest`, except the request is only made for "root" nodes
2898
    in the dependency graph.
2899

2900
    This mirrors the public facing "transitive exclude" dependency feature (i.e. `!!<address>`).
2901
    """
2902

2903
    infer_from: ClassVar[type[FS]]  # type: ignore[misc]
12✔
2904

2905
    field_set: FS
12✔
2906

2907

2908
class TransitivelyExcludeDependencies(FrozenOrderedSet[Address]):
12✔
2909
    pass
12✔
2910

2911

2912
@union(in_scope_types=[EnvironmentName])
12✔
2913
@dataclass(frozen=True)
12✔
2914
class ValidateDependenciesRequest(Generic[FS], ABC):
12✔
2915
    """A request to validate dependencies after they have been computed.
2916

2917
    An implementing rule should raise an exception if dependencies are invalid.
2918
    """
2919

2920
    field_set_type: ClassVar[type[FS]]  # type: ignore[misc]
12✔
2921

2922
    field_set: FS
12✔
2923
    dependencies: Addresses
12✔
2924

2925

2926
@dataclass(frozen=True)
12✔
2927
class ValidatedDependencies:
12✔
2928
    pass
12✔
2929

2930

2931
@dataclass(frozen=True)
12✔
2932
class DependenciesRuleApplicationRequest:
12✔
2933
    """A request to return the applicable dependency rule action for each dependency of a target."""
2934

2935
    address: Address
12✔
2936
    dependencies: Addresses
12✔
2937
    description_of_origin: str = dataclasses.field(hash=False, compare=False)
12✔
2938

2939

2940
@dataclass(frozen=True)
12✔
2941
class DependenciesRuleApplication:
12✔
2942
    """Maps all dependencies to their respective dependency rule application of an origin target
2943
    address.
2944

2945
    The `applications` will be empty and the `address` `None` if there is no dependency rule
2946
    implementation.
2947
    """
2948

2949
    address: Address | None = None
12✔
2950
    dependencies_rule: FrozenDict[Address, DependencyRuleApplication] = FrozenDict()
12✔
2951

2952
    def __post_init__(self):
12✔
2953
        if self.dependencies_rule and self.address is None:
×
2954
            raise ValueError(
×
2955
                "The `address` field must not be None when there are `dependencies_rule`s."
2956
            )
2957

2958
    @classmethod
12✔
2959
    @memoized_method
12✔
2960
    def allow_all(cls) -> DependenciesRuleApplication:
12✔
2961
        return cls()
×
2962

2963
    def execute_actions(self) -> None:
12✔
2964
        errors = [
1✔
2965
            action_error.replace("\n", "\n    ")
2966
            for action_error in (rule.execute() for rule in self.dependencies_rule.values())
2967
            if action_error is not None
2968
        ]
2969
        if errors:
1✔
2970
            err_count = len(errors)
1✔
2971
            raise DependencyRuleActionDeniedError(
1✔
2972
                softwrap(
2973
                    f"""
2974
                    {self.address} has {pluralize(err_count, "dependency violation")}:
2975

2976
                    {bullet_list(errors)}
2977
                    """
2978
                )
2979
            )
2980

2981

2982
class SpecialCasedDependencies(StringSequenceField, AsyncFieldMixin):
12✔
2983
    """Subclass this for fields that act similarly to the `dependencies` field, but are handled
2984
    differently than normal dependencies.
2985

2986
    For example, you might have a field for package/binary dependencies, which you will call
2987
    the equivalent of `./pants package` on. While you could put these in the normal
2988
    `dependencies` field, it is often clearer to the user to call out this magic through a
2989
    dedicated field.
2990

2991
    This type will ensure that the dependencies show up in project introspection,
2992
    like `dependencies` and `dependents`, but not show up when you
2993
    `await transitive_targets(TransitiveTargetsRequest(...), **implicitly())` and
2994
    `await resolve_dependencies(DependenciesRequest(...), **implicitly())`.
2995

2996
    To hydrate this field's dependencies, use
2997
    `await resolve_unparsed_address_inputs(tgt.get(MyField).to_unparsed_address_inputs(), **implicitly())`.
2998
    """
2999

3000
    def to_unparsed_address_inputs(self) -> UnparsedAddressInputs:
12✔
3001
        return UnparsedAddressInputs(
×
3002
            self.value or (),
3003
            owning_address=self.address,
3004
            description_of_origin=f"the `{self.alias}` from the target {self.address}",
3005
        )
3006

3007

3008
# -----------------------------------------------------------------------------------------------
3009
# Other common Fields used across most targets
3010
# -----------------------------------------------------------------------------------------------
3011

3012

3013
class Tags(StringSequenceField):
12✔
3014
    alias = "tags"
12✔
3015
    help = help_text(
12✔
3016
        f"""
3017
        Arbitrary strings to describe a target.
3018

3019
        For example, you may tag some test targets with 'integration_test' so that you could run
3020
        `{bin_name()} --tag='integration_test' test ::`  to only run on targets with that tag.
3021
        """
3022
    )
3023

3024

3025
class DescriptionField(StringField):
12✔
3026
    alias = "description"
12✔
3027
    help = help_text(
12✔
3028
        f"""
3029
        A human-readable description of the target.
3030

3031
        Use `{bin_name()} list --documented ::` to see all targets with descriptions.
3032
        """
3033
    )
3034

3035

3036
COMMON_TARGET_FIELDS = (Tags, DescriptionField)
12✔
3037

3038

3039
class OverridesField(AsyncFieldMixin, Field):
12✔
3040
    """A mapping of keys (e.g. target names, source globs) to field names with their overridden
3041
    values.
3042

3043
    This is meant for target generators to reduce boilerplate. It's up to the corresponding target
3044
    generator rule to determine how to implement the field, such as how users specify the key. For
3045
    example, `{"f.ext": {"tags": ['my_tag']}}`.
3046
    """
3047

3048
    alias = "overrides"
12✔
3049
    value: dict[tuple[str, ...], dict[str, Any]] | None
12✔
3050
    default: ClassVar[None] = None  # A default does not make sense for this field.
12✔
3051

3052
    @classmethod
12✔
3053
    def compute_value(
12✔
3054
        cls,
3055
        raw_value: dict[str | tuple[str, ...], dict[str, Any]] | None,
3056
        address: Address,
3057
    ) -> FrozenDict[tuple[str, ...], FrozenDict[str, ImmutableValue]] | None:
3058
        value_or_default = super().compute_value(raw_value, address)
2✔
3059
        if value_or_default is None:
2✔
3060
            return None
2✔
3061

3062
        def invalid_type_exception() -> InvalidFieldException:
2✔
3063
            return InvalidFieldTypeException(
1✔
3064
                address,
3065
                cls.alias,
3066
                raw_value,
3067
                expected_type="dict[str | tuple[str, ...], dict[str, Any]]",
3068
            )
3069

3070
        if not isinstance(value_or_default, collections.abc.Mapping):
2✔
3071
            raise invalid_type_exception()
1✔
3072

3073
        result: dict[tuple[str, ...], FrozenDict[str, ImmutableValue]] = {}
2✔
3074
        for outer_key, nested_value in value_or_default.items():
2✔
3075
            if isinstance(outer_key, str):
2✔
3076
                outer_key = (outer_key,)
1✔
3077
            if not isinstance(outer_key, collections.abc.Sequence) or not all(
2✔
3078
                isinstance(elem, str) for elem in outer_key
3079
            ):
3080
                raise invalid_type_exception()
1✔
3081
            if not isinstance(nested_value, collections.abc.Mapping):
2✔
3082
                raise invalid_type_exception()
1✔
3083
            if not all(isinstance(inner_key, str) for inner_key in nested_value):
2✔
3084
                raise invalid_type_exception()
1✔
3085
            result[tuple(outer_key)] = FrozenDict.deep_freeze(cast(Mapping[str, Any], nested_value))
2✔
3086

3087
        return FrozenDict(result)
2✔
3088

3089
    @classmethod
12✔
3090
    def to_path_globs(
12✔
3091
        cls,
3092
        address: Address,
3093
        overrides_keys: Iterable[str],
3094
        unmatched_build_file_globs: UnmatchedBuildFileGlobs,
3095
    ) -> tuple[PathGlobs, ...]:
3096
        """Create a `PathGlobs` for each key.
3097

3098
        This should only be used if the keys are file globs.
3099
        """
3100

3101
        def relativize_glob(glob: str) -> str:
1✔
3102
            return (
1✔
3103
                f"!{os.path.join(address.spec_path, glob[1:])}"
3104
                if glob.startswith("!")
3105
                else os.path.join(address.spec_path, glob)
3106
            )
3107

3108
        return tuple(
1✔
3109
            PathGlobs(
3110
                [relativize_glob(glob)],
3111
                glob_match_error_behavior=unmatched_build_file_globs.error_behavior,
3112
                description_of_origin=f"the `overrides` field for {address}",
3113
            )
3114
            for glob in overrides_keys
3115
        )
3116

3117
    def flatten(self) -> dict[str, dict[str, Any]]:
12✔
3118
        """Combine all overrides for every key into a single dictionary."""
3119
        result: dict[str, dict[str, Any]] = {}
1✔
3120
        for keys, override in (self.value or {}).items():
1✔
3121
            for key in keys:
1✔
3122
                for field, value in override.items():
1✔
3123
                    if key not in result:
1✔
3124
                        result[key] = {field: value}
1✔
3125
                        continue
1✔
3126
                    if field not in result[key]:
1✔
3127
                        result[key][field] = value
1✔
3128
                        continue
1✔
3129
                    raise InvalidFieldException(
×
3130
                        f"Conflicting overrides in the `{self.alias}` field of "
3131
                        f"`{self.address}` for the key `{key}` for "
3132
                        f"the field `{field}`. You cannot specify the same field name "
3133
                        "multiple times for the same key.\n\n"
3134
                        f"(One override sets the field to `{repr(result[key][field])}` "
3135
                        f"but another sets to `{repr(value)}`.)"
3136
                    )
3137
        return result
1✔
3138

3139
    @classmethod
12✔
3140
    def flatten_paths(
12✔
3141
        cls,
3142
        address: Address,
3143
        paths_and_overrides: Iterable[tuple[Paths, PathGlobs, dict[str, Any]]],
3144
    ) -> dict[str, dict[str, Any]]:
3145
        """Combine all overrides for each file into a single dictionary."""
3146
        result: dict[str, dict[str, Any]] = {}
1✔
3147
        for paths, globs, override in paths_and_overrides:
1✔
3148
            # NB: If some globs did not result in any Paths, we preserve them to ensure that
3149
            # unconsumed overrides trigger errors during generation.
3150
            for path in paths.files or globs.globs:
1✔
3151
                for field, value in override.items():
1✔
3152
                    if path not in result:
1✔
3153
                        result[path] = {field: value}
1✔
3154
                        continue
1✔
3155
                    if field not in result[path]:
1✔
3156
                        result[path][field] = value
1✔
3157
                        continue
1✔
3158
                    relpath = fast_relpath(path, address.spec_path)
1✔
3159
                    raise InvalidFieldException(
1✔
3160
                        f"Conflicting overrides for `{address}` for the relative path "
3161
                        f"`{relpath}` for the field `{field}`. You cannot specify the same field "
3162
                        f"name multiple times for the same path.\n\n"
3163
                        f"(One override sets the field to `{repr(result[path][field])}` "
3164
                        f"but another sets to `{repr(value)}`.)"
3165
                    )
3166
        return result
1✔
3167

3168

3169
def generate_multiple_sources_field_help_message(files_example: str) -> str:
12✔
3170
    return softwrap(
12✔
3171
        """
3172
        A list of files and globs that belong to this target.
3173

3174
        Paths are relative to the BUILD file's directory. You can ignore files/globs by
3175
        prefixing them with `!`.
3176

3177
        """
3178
        + files_example
3179
    )
3180

3181

3182
def generate_file_based_overrides_field_help_message(
12✔
3183
    generated_target_name: str, example: str
3184
) -> str:
3185
    example = textwrap.dedent(example.lstrip("\n"))  # noqa: PNT20
12✔
3186
    example = textwrap.indent(example, " " * 4)
12✔
3187
    return "\n".join(
12✔
3188
        [
3189
            softwrap(
3190
                f"""
3191
                Override the field values for generated `{generated_target_name}` targets.
3192

3193
                Expects a dictionary of relative file paths and globs to a dictionary for the
3194
                overrides. You may either use a string for a single path / glob,
3195
                or a string tuple for multiple paths / globs. Each override is a dictionary of
3196
                field names to the overridden value.
3197

3198
                For example:
3199

3200
                {example}
3201
                """
3202
            ),
3203
            "",
3204
            softwrap(
3205
                f"""
3206
                File paths and globs are relative to the BUILD file's directory. Every overridden file is
3207
                validated to belong to this target's `sources` field.
3208

3209
                If you'd like to override a field's value for every `{generated_target_name}` target
3210
                generated by this target, change the field directly on this target rather than using the
3211
                `overrides` field.
3212

3213
                You can specify the same file name in multiple keys, so long as you don't override the
3214
                same field more than one time for the file.
3215
                """
3216
            ),
3217
        ],
3218
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc