• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 25259185675

02 May 2026 06:47PM UTC coverage: 92.141% (-0.8%) from 92.955%
25259185675

push

github

web-flow
Fix the dynamic UI. (#23306)

In #23114 we upgraded to indicatif 0.18.4,
which included a fix to respect TERM, and 
display nothing if it's unset.

Since we did not pass TERM through pantsd, the
dynamic ui is now not shown. 

This change fixes that, and also passes NO_COLOR
through, since indicatif inspects it too.

88773 of 96345 relevant lines covered (92.14%)

3.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.89
/src/python/pants/engine/internals/specs_rules.py
1
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
11✔
5

6
import dataclasses
11✔
7
import itertools
11✔
8
import logging
11✔
9
import os
11✔
10
from collections import defaultdict
11✔
11
from collections.abc import Iterable
11✔
12

13
# TODO: This is a very fishy import. `internals` should not be importing from a backend.
14
from pants.backend.project_info.filter_targets import FilterSubsystem
11✔
15
from pants.base.specs import (
11✔
16
    AddressLiteralSpec,
17
    AncestorGlobSpec,
18
    DirGlobSpec,
19
    DirLiteralSpec,
20
    RawSpecs,
21
    RawSpecsWithOnlyFileOwners,
22
    RawSpecsWithoutFileOwners,
23
    RecursiveGlobSpec,
24
    Specs,
25
)
26
from pants.engine.addresses import Address, Addresses, AddressInput
11✔
27
from pants.engine.environment import ChosenLocalEnvironmentName, EnvironmentName
11✔
28
from pants.engine.fs import SpecsPaths
11✔
29
from pants.engine.internals.build_files import resolve_address
11✔
30
from pants.engine.internals.graph import (
11✔
31
    Owners,
32
    OwnersRequest,
33
    address_families_from_raw_specs_without_file_owners,
34
    filter_targets,
35
    find_owners,
36
    find_valid_field_sets,
37
    resolve_source_paths,
38
    resolve_target,
39
    resolve_target_parametrizations,
40
    resolve_targets,
41
)
42
from pants.engine.internals.mapper import SpecsFilter
11✔
43
from pants.engine.internals.parametrize import _TargetParametrizationsRequest
11✔
44
from pants.engine.internals.selectors import concurrently
11✔
45
from pants.engine.intrinsics import path_globs_to_paths
11✔
46
from pants.engine.rules import collect_rules, implicitly, rule
11✔
47
from pants.engine.target import (
11✔
48
    FieldSet,
49
    FieldSetsPerTargetRequest,
50
    NoApplicableTargetsBehavior,
51
    RegisteredTargetTypes,
52
    SourcesField,
53
    SourcesPathsRequest,
54
    Target,
55
    TargetGenerator,
56
    TargetRootsToFieldSets,
57
    TargetRootsToFieldSetsRequest,
58
    WrappedTarget,
59
    WrappedTargetRequest,
60
)
61
from pants.engine.unions import UnionMembership
11✔
62
from pants.option.global_options import GlobalOptions
11✔
63
from pants.util.dirutil import recursive_dirname
11✔
64
from pants.util.docutil import bin_name
11✔
65
from pants.util.logging import LogLevel
11✔
66
from pants.util.ordered_set import FrozenOrderedSet, OrderedSet
11✔
67
from pants.util.strutil import bullet_list
11✔
68

69
logger = logging.getLogger(__name__)
11✔
70

71

72
# -----------------------------------------------------------------------------------------------
73
# RawSpecsWithoutFileOwners -> Targets
74
# -----------------------------------------------------------------------------------------------
75

76

77
async def _determine_literal_addresses_from_raw_specs(
    literal_specs: tuple[AddressLiteralSpec, ...],
    local_environment_name: ChosenLocalEnvironmentName,
    *,
    description_of_origin: str,
) -> tuple[WrappedTarget, ...]:
    """Resolve address literal specs into the `WrappedTarget`s they refer to.

    Each spec is first resolved to an address. References to parametrized target templates are
    then replaced with all of the targets created from them, e.g.:

      - dir:tgt -> (dir:tgt@k=v1, dir:tgt@k=v2)
      - dir:tgt@k=v -> (dir:tgt@k=v,another=a, dir:tgt@k=v,another=b), but not anything
           where @k=v is not true.

    Finally, every candidate address is resolved to its `WrappedTarget`.
    """
    resolved_addresses = await concurrently(
        resolve_address(
            **implicitly(
                AddressInput(
                    str(literal),
                    literal.path_component,
                    description_of_origin=description_of_origin,
                    target_component=literal.target_component,
                    generated_component=literal.generated_component,
                    parameters=dict(literal.parameters),
                )
            )
        )
        for literal in literal_specs
    )

    # Look up the parametrizations of each address's target generator (or of the address itself
    # when no conversion applies), in the chosen local environment.
    parametrizations_per_address = await concurrently(
        resolve_target_parametrizations(
            **implicitly(
                {
                    _TargetParametrizationsRequest(
                        resolved.maybe_convert_to_target_generator(),
                        description_of_origin=description_of_origin,
                    ): _TargetParametrizationsRequest,
                    local_environment_name.val: EnvironmentName,
                }
            )
        )
        for resolved in resolved_addresses
    )

    # When an address has no superset targets in its parametrizations, keep the address itself
    # as the candidate. That lets the `WrappedTarget` lookup below report it as invalid.
    candidate_addresses = (
        candidate
        for resolved, parametrizations in zip(resolved_addresses, parametrizations_per_address)
        for candidate in (list(parametrizations.get_all_superset_targets(resolved)) or [resolved])
    )

    # Resolving the `WrappedTarget` eagerly validates that every final address actually exists,
    # such as with generated target addresses.
    return await concurrently(
        resolve_target(
            WrappedTargetRequest(candidate, description_of_origin=description_of_origin),
            **implicitly(),
        )
        for candidate in candidate_addresses
    )
134

135

136
@rule(_masked_types=[EnvironmentName])
async def addresses_from_raw_specs_without_file_owners(
    specs: RawSpecsWithoutFileOwners,
    specs_filter: SpecsFilter,
    local_environment_name: ChosenLocalEnvironmentName,
) -> Addresses:
    """Resolve address literals and glob specs (but no file-owner specs) into addresses.

    Targets are dropped when they do not match `specs_filter`, unless
    `specs.filter_by_global_options` is `False`. Glob specs additionally skip target generators
    when the spec's `matches_target_generators` is falsy.
    """
    skip_filtering = specs.filter_by_global_options is False

    def passes_filter(tgt: Target) -> bool:
        return skip_filtering or specs_filter.matches(tgt)

    # Address literals.
    wrapped_literals = await _determine_literal_addresses_from_raw_specs(
        specs.address_literals,
        local_environment_name,
        description_of_origin=specs.description_of_origin,
    )
    result: OrderedSet[Address] = OrderedSet()
    for wrapped in wrapped_literals:
        if passes_filter(wrapped.target):
            result.add(wrapped.target.address)

    # Glob specs.
    address_families = await address_families_from_raw_specs_without_file_owners(
        specs, **implicitly()
    )
    if not address_families:
        return Addresses(result)

    parametrizations_per_base_address = await concurrently(
        resolve_target_parametrizations(
            **implicitly(
                {
                    _TargetParametrizationsRequest(
                        base_address, description_of_origin=specs.description_of_origin
                    ): _TargetParametrizationsRequest,
                    local_environment_name.val: EnvironmentName,
                }
            )
        )
        for base_address in address_families.addresses()
    )

    # Group every parametrized target by the directory it resides in, so that each glob spec
    # can be matched against directories rather than individual targets.
    targets_by_residence_dir = defaultdict(list)
    for parametrizations in parametrizations_per_base_address:
        for tgt in parametrizations.all:
            targets_by_residence_dir[tgt.residence_dir].append(tgt)

    def is_selected(
        tgt: Target, spec: DirLiteralSpec | DirGlobSpec | RecursiveGlobSpec | AncestorGlobSpec
    ) -> bool:
        return (
            spec.matches_target_generators or not isinstance(tgt, TargetGenerator)
        ) and passes_filter(tgt)

    for glob_spec in specs.glob_specs():
        for residence_dir, dir_targets in targets_by_residence_dir.items():
            if glob_spec.matches_target_residence_dir(residence_dir):
                result.update(
                    tgt.address for tgt in dir_targets if is_selected(tgt, glob_spec)
                )

    return Addresses(sorted(result))
11✔
200

201

202
# -----------------------------------------------------------------------------------------------
203
# RawSpecsWithOnlyFileOwners -> Targets
204
# -----------------------------------------------------------------------------------------------
205

206

207
@rule(_masked_types=[EnvironmentName])
async def addresses_from_raw_specs_with_only_file_owners(
    specs: RawSpecsWithOnlyFileOwners,
) -> Owners:
    """Expand every spec to the files it matches, then find the owners of those files."""
    paths_per_spec = await concurrently(
        path_globs_to_paths(specs.path_globs_for_spec(spec)) for spec in specs.all_specs()
    )
    matched_files = tuple(f for paths in paths_per_spec for f in paths.files)
    return await find_owners(
        OwnersRequest(
            matched_files,
            filter_by_global_options=specs.filter_by_global_options,
            # Specifying a BUILD file should not expand to all the targets it defines.
            match_if_owning_build_file_included_in_sources=False,
        ),
        **implicitly(),
    )
11✔
226

227

228
@rule(_masked_types=[EnvironmentName])
async def addresses_from_owners(
    owners: Owners,
) -> Addresses:
    """Convert the given owners into a sorted `Addresses` collection."""
    return Addresses(sorted(owners))
11✔
231

232

233
# -----------------------------------------------------------------------------------------------
234
# RawSpecs & Specs -> Targets
235
# -----------------------------------------------------------------------------------------------
236

237

238
@rule(_masked_types=[EnvironmentName])
async def resolve_addresses_from_raw_specs(specs: RawSpecs) -> Addresses:
    """Resolve raw specs into a sorted, de-duplicated collection of addresses.

    The specs are split into the part without file owners and the part with only file owners;
    both parts are resolved concurrently and then merged.
    """
    from_non_file_specs, from_file_owners = await concurrently(
        addresses_from_raw_specs_without_file_owners(
            RawSpecsWithoutFileOwners.from_raw_specs(specs), **implicitly()
        ),
        addresses_from_owners(**implicitly(RawSpecsWithOnlyFileOwners.from_raw_specs(specs))),
    )
    # The same address may be matched by both kinds of spec, so merge through a set.
    return Addresses(sorted(set(from_non_file_specs).union(from_file_owners)))
11✔
248

249

250
@rule(desc="Find targets from input specs", level=LogLevel.DEBUG, _masked_types=[EnvironmentName])
async def resolve_addresses_from_specs(specs: Specs) -> Addresses:
    """Resolve the include specs to addresses, minus anything matched by the ignore specs."""
    included, ignored = await concurrently(
        resolve_addresses_from_raw_specs(specs.includes),
        resolve_addresses_from_raw_specs(specs.ignores),
    )
    # Ignores always take precedence over includes. This avoids "specificity wars" and keeps
    # the semantics simple and predictable.
    return Addresses(FrozenOrderedSet(included).difference(FrozenOrderedSet(ignored)))
10✔
259

260

261
@rule
async def setup_specs_filter(
    global_options: GlobalOptions,
    filter_subsystem: FilterSubsystem,
    registered_target_types: RegisteredTargetTypes,
) -> SpecsFilter:
    """Create the `SpecsFilter` from the filter subsystem, target types and global `tag` option."""
    return SpecsFilter.create(
        filter_subsystem,
        registered_target_types,
        tags=global_options.tag,
    )
11✔
268

269

270
# -----------------------------------------------------------------------------------------------
271
# SpecsPaths
272
# -----------------------------------------------------------------------------------------------
273

274

275
@rule(desc="Find all sources from input specs", level=LogLevel.DEBUG)
async def resolve_specs_paths(specs: Specs) -> SpecsPaths:
    """Resolve all files, and their ancestor directories, matching the given specs.

    Every matched target contributes the files of its `sources` field. Specs that resolve to
    paths directly (such as FileLiteralSpec) also contribute those files, whether or not a
    target owns them.

    Ignores take precedence over includes:

    * Ignored paths are removed from the result.
    * The `sources` of ignored targets are removed from the result.
    * When include paths were given, files owned by a target that was filtered out (e.g. via
      `--tag`) are removed too. See https://github.com/pantsbuild/pants/issues/15478.
    """
    all_include_targets, ignore_targets, include_paths, ignore_paths = await concurrently(
        resolve_targets(
            **implicitly(
                {dataclasses.replace(specs.includes, filter_by_global_options=False): RawSpecs}
            )
        ),
        resolve_targets(**implicitly(specs.ignores)),
        path_globs_to_paths(specs.includes.to_specs_paths_path_globs()),
        path_globs_to_paths(specs.ignores.to_specs_paths_path_globs()),
    )

    # The include targets were resolved with filtering disabled; apply the filters separately
    # so that the filtered-out targets can be identified below.
    kept_include_targets = await filter_targets(all_include_targets, **implicitly())

    kept_sources = await concurrently(
        resolve_source_paths(SourcesPathsRequest(tgt[SourcesField]), **implicitly())
        for tgt in kept_include_targets
        if tgt.has_field(SourcesField)
    )

    ignored_sources = await concurrently(
        resolve_source_paths(SourcesPathsRequest(tgt[SourcesField]), **implicitly())
        for tgt in ignore_targets
        if tgt.has_field(SourcesField)
    )

    # Includes: target sources first, then the directly matched paths.
    result_paths = OrderedSet(
        itertools.chain(
            (f for source_paths in kept_sources for f in source_paths.files),
            include_paths.files,
        )
    )
    # Ignores: the sources of ignored targets, then the directly ignored paths.
    result_paths.difference_update(
        f for source_paths in ignored_sources for f in source_paths.files
    )
    result_paths.difference_update(ignore_paths.files)

    # If include paths were given, we need to also remove any paths from filtered out targets
    # (e.g. via `--tag`), per https://github.com/pantsbuild/pants/issues/15478.
    if include_paths.files:
        dropped_include_targets = FrozenOrderedSet(all_include_targets).difference(
            FrozenOrderedSet(kept_include_targets)
        )
        dropped_sources = await concurrently(
            resolve_source_paths(SourcesPathsRequest(tgt[SourcesField]), **implicitly())
            for tgt in dropped_include_targets
            if tgt.has_field(SourcesField)
        )
        result_paths.difference_update(
            f for source_paths in dropped_sources for f in source_paths.files
        )

    # Collect each remaining file's directory and all of its ancestors, skipping the empty
    # string.
    ancestor_dirs = {
        ancestor
        for path in result_paths
        for ancestor in recursive_dirname(os.path.dirname(path))
        if ancestor != ""
    }
    return SpecsPaths(tuple(sorted(result_paths)), tuple(sorted(ancestor_dirs)))
8✔
345

346

347
# -----------------------------------------------------------------------------------------------
348
# RawSpecs -> FieldSets
349
# -----------------------------------------------------------------------------------------------
350

351

352
class NoApplicableTargetsException(Exception):
    """Error explaining that none of the specified targets work with a goal.

    The message lists the target types the goal works with, the target types that were actually
    specified (when there were any), and a command for finding applicable targets.
    """

    def __init__(
        self,
        targets: Iterable[Target],
        specs: Specs,
        union_membership: UnionMembership,
        *,
        applicable_target_types: Iterable[type[Target]],
        goal_description: str,
    ) -> None:
        """Build the error message.

        `targets` are the targets that were specified but are not applicable; only their aliases
        are used. `specs` is consulted for a description of the arguments provided.
        `applicable_target_types` are the target types the goal works with, and
        `goal_description` is the text that starts the "works with these target types" sentence.
        """
        # The applicable types are consulted twice below (for their aliases and for the
        # `filedeps` check). Materialize them so that a one-shot iterator is not exhausted by
        # the first pass, which would silently produce the wrong remedy.
        applicable_target_types = tuple(applicable_target_types)

        applicable_target_aliases = sorted(
            {target_type.alias for target_type in applicable_target_types}
        )
        inapplicable_target_aliases = sorted({tgt.alias for tgt in targets})
        msg = (
            "No applicable files or targets matched."
            if inapplicable_target_aliases
            else "No files or targets specified."
        )
        msg += (
            f" {goal_description.capitalize()} works "
            f"with these target types:\n\n"
            f"{bullet_list(applicable_target_aliases)}\n\n"
        )

        # Explain what was specified, if relevant.
        if inapplicable_target_aliases:
            specs_description = specs.arguments_provided_description() or ""
            if specs_description:
                specs_description = f" {specs_description} with"
            msg += (
                f"However, you only specified{specs_description} these target types:\n\n"
                f"{bullet_list(inapplicable_target_aliases)}\n\n"
            )

        # Add a remedy.
        #
        # We sometimes suggest using the `filedeps` goal to find applicable files. However, that
        # suggestion is only made if at least one of the applicable target types has a
        # SourcesField field.
        filedeps_goal_works = any(
            target_type.class_has_field(SourcesField, union_membership)
            for target_type in applicable_target_types
        )
        pants_filter_command = (
            f"{bin_name()} --filter-target-type={','.join(applicable_target_aliases)}"
        )
        remedy = (
            f"Please specify relevant file and/or target arguments. Run `{pants_filter_command} "
            f"list ::` to find all applicable targets in your project"
        )
        if filedeps_goal_works:
            remedy += f", or run `{pants_filter_command} filedeps ::` to find all applicable files."
        else:
            remedy += "."
        msg += remedy
        super().__init__(msg)

    @classmethod
    def create_from_field_sets(
        cls,
        targets: Iterable[Target],
        specs: Specs,
        union_membership: UnionMembership,
        registered_target_types: RegisteredTargetTypes,
        *,
        field_set_types: Iterable[type[FieldSet]],
        goal_description: str,
    ) -> NoApplicableTargetsException:
        """Create the exception, deriving the applicable target types from field set types.

        The applicable target types are the union of what each field set type reports via
        `applicable_target_types()` for the registered target types.
        """
        applicable_target_types = {
            target_type
            for field_set_type in field_set_types
            for target_type in field_set_type.applicable_target_types(
                registered_target_types.types, union_membership
            )
        }
        return cls(
            targets,
            specs,
            union_membership,
            applicable_target_types=applicable_target_types,
            goal_description=goal_description,
        )
433

434

435
class TooManyTargetsException(Exception):
    """Error for when several valid targets were given but only one is supported."""

    def __init__(self, targets: Iterable[Target], *, goal_description: str) -> None:
        """Build a message listing the sorted address specs of `targets`."""
        address_specs = [tgt.address.spec for tgt in targets]
        address_specs.sort()
        message = (
            f"{goal_description.capitalize()} only works with one valid target, but was given "
            f"multiple valid targets:\n\n{bullet_list(address_specs)}\n\n"
            "Please select one of these targets to run."
        )
        super().__init__(message)
443

444

445
class AmbiguousImplementationsException(Exception):
    """A target has multiple valid FieldSets, but a goal expects there to be one FieldSet."""

    def __init__(
        self,
        target: Target,
        field_sets: Iterable[FieldSet],
        *,
        goal_description: str,
    ) -> None:
        """Build a message naming the target and the sorted class names of `field_sets`."""
        # TODO: improve this error message. A better error message would explain to users how they
        #  can resolve the issue.
        implementation_names = sorted(
            field_set.__class__.__name__ for field_set in field_sets
        )
        message = (
            f"Multiple of the registered implementations for {goal_description} work for "
            f"{target.address} (target type {target.alias!r}). It is ambiguous which "
            "implementation to use.\n\nPossible implementations:\n\n"
            f"{bullet_list(implementation_names)}"
        )
        super().__init__(message)
464

465

466
@rule
async def find_valid_field_sets_for_target_roots(
    request: TargetRootsToFieldSetsRequest,
    specs: Specs,
    union_membership: UnionMembership,
    registered_target_types: RegisteredTargetTypes,
) -> TargetRootsToFieldSets:
    """Map each target matched by the specs to the field sets that apply to it.

    Targets without any applicable field set are left out. When nothing is applicable, the
    request's `no_applicable_targets_behavior` decides whether to raise
    `NoApplicableTargetsException`, log it as a warning, or stay silent. When
    `request.num_shards` is positive, only the targets in the request's shard are returned.
    """
    # NB: The targets must be requested with an `await` in the rule body, rather than through
    # the rule signature, to avoid a rule graph issue.
    targets = await filter_targets(**implicitly({specs: Specs}))
    field_sets_per_target = await find_valid_field_sets(
        FieldSetsPerTargetRequest(request.field_set_superclass, targets), **implicitly()
    )
    applicable_field_sets_by_target = {
        tgt: field_sets
        for tgt, field_sets in zip(targets, field_sets_per_target.collection)
        if field_sets
    }

    # Possibly warn or error if no targets were applicable.
    if not applicable_field_sets_by_target:
        no_applicable_exception = NoApplicableTargetsException.create_from_field_sets(
            targets,
            specs,
            union_membership,
            registered_target_types,
            field_set_types=union_membership.get(request.field_set_superclass),
            goal_description=request.goal_description,
        )
        if request.no_applicable_targets_behavior == NoApplicableTargetsBehavior.error:
            raise no_applicable_exception

        # We squelch the warning if the specs came from change detection or only from globs,
        # since in that case we interpret the user's intent as "if there are relevant matching
        # targets, act on them". But we still want to warn if the specs were literal, or empty.
        #
        # No need to check `specs.ignores` here, as change detection will not set that. Likewise,
        # we don't want an ignore spec to trigger this warning, even if it was a literal.
        includes_are_globs_only = specs.includes.from_change_detection or (
            specs.includes
            and not (specs.includes.address_literals or specs.includes.file_literals)
        )
        warning_requested = (
            request.no_applicable_targets_behavior == NoApplicableTargetsBehavior.warn
        )
        if warning_requested and not includes_are_globs_only:
            logger.warning(str(no_applicable_exception))

    if not request.num_shards > 0:
        return TargetRootsToFieldSets(applicable_field_sets_by_target)

    # Sharding was requested: keep only the targets that fall into this request's shard.
    return TargetRootsToFieldSets(
        {
            tgt: field_sets
            for tgt, field_sets in applicable_field_sets_by_target.items()
            if request.is_in_shard(tgt.address.spec)
        }
    )
4✔
522

523

524
def rules():
    """Return the rules collected by `collect_rules()` for this module."""
    return collect_rules()
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc