• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/python/pants/backend/python/target_types_rules.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
"""Rules for the core Python target types.
5

6
This is a separate module to avoid circular dependencies. Note that all types used by call sites are
7
defined in `target_types.py`.
8
"""
9

UNCOV
10
import dataclasses
×
UNCOV
11
import logging
×
UNCOV
12
import os.path
×
UNCOV
13
from collections import defaultdict
×
UNCOV
14
from collections.abc import Callable, Generator
×
UNCOV
15
from dataclasses import dataclass
×
UNCOV
16
from itertools import chain
×
UNCOV
17
from typing import DefaultDict, cast
×
18

UNCOV
19
from pants.backend.python.dependency_inference.module_mapper import (
×
20
    PythonModuleOwners,
21
    PythonModuleOwnersRequest,
22
    map_module_to_address,
23
)
UNCOV
24
from pants.backend.python.dependency_inference.rules import import_rules
×
UNCOV
25
from pants.backend.python.dependency_inference.subsystem import (
×
26
    AmbiguityResolution,
27
    PythonInferSubsystem,
28
)
UNCOV
29
from pants.backend.python.subsystems.setup import PythonSetup
×
UNCOV
30
from pants.backend.python.target_types import (
×
31
    EntryPoint,
32
    InterpreterConstraintsField,
33
    PexBinariesGeneratorTarget,
34
    PexBinary,
35
    PexBinaryDependenciesField,
36
    PexEntryPointField,
37
    PexEntryPointsField,
38
    PythonDistributionDependenciesField,
39
    PythonDistributionEntryPoint,
40
    PythonDistributionEntryPointsField,
41
    PythonFilesGeneratorSettingsRequest,
42
    PythonProvidesField,
43
    PythonResolveField,
44
    ResolvedPexEntryPoint,
45
    ResolvedPythonDistributionEntryPoints,
46
    ResolvePexEntryPointRequest,
47
    ResolvePythonDistributionEntryPointsRequest,
48
)
UNCOV
49
from pants.backend.python.util_rules.interpreter_constraints import interpreter_constraints_contains
×
UNCOV
50
from pants.core.util_rules.unowned_dependency_behavior import (
×
51
    UnownedDependencyError,
52
    UnownedDependencyUsage,
53
)
UNCOV
54
from pants.engine.addresses import Address, Addresses, UnparsedAddressInputs
×
UNCOV
55
from pants.engine.fs import GlobMatchErrorBehavior, PathGlobs
×
UNCOV
56
from pants.engine.internals.graph import (
×
57
    determine_explicitly_provided_dependencies,
58
    resolve_target,
59
    resolve_targets,
60
    resolve_unparsed_address_inputs,
61
)
UNCOV
62
from pants.engine.intrinsics import path_globs_to_paths
×
UNCOV
63
from pants.engine.rules import collect_rules, concurrently, implicitly, rule
×
UNCOV
64
from pants.engine.target import (
×
65
    DependenciesRequest,
66
    ExplicitlyProvidedDependencies,
67
    FieldDefaultFactoryRequest,
68
    FieldDefaultFactoryResult,
69
    FieldSet,
70
    GeneratedTargets,
71
    GenerateTargetsRequest,
72
    InferDependenciesRequest,
73
    InferredDependencies,
74
    InvalidFieldException,
75
    TargetFilesGeneratorSettings,
76
    TargetFilesGeneratorSettingsRequest,
77
    ValidatedDependencies,
78
    ValidateDependenciesRequest,
79
    WrappedTargetRequest,
80
)
UNCOV
81
from pants.engine.unions import UnionMembership, UnionRule
×
UNCOV
82
from pants.source.source_root import SourceRootRequest, get_source_root
×
UNCOV
83
from pants.util.docutil import doc_url
×
UNCOV
84
from pants.util.frozendict import FrozenDict
×
UNCOV
85
from pants.util.ordered_set import OrderedSet
×
UNCOV
86
from pants.util.strutil import bullet_list, softwrap
×
87

UNCOV
88
logger = logging.getLogger(__name__)
×
89

90

UNCOV
91
class SetupPyError(Exception):
    """Base error whose message always ends with a link to the distribution-building docs."""

    def __init__(self, msg: str):
        # Every message is suffixed with the same docs pointer so callers need not repeat it.
        docs_link = doc_url("docs/python/overview/building-distributions")
        super().__init__(f"{msg} See {docs_link}.")
×
94

95

UNCOV
96
class InvalidEntryPoint(SetupPyError, InvalidFieldException):
    """Raised when an entry point specified for a binary is not valid.

    Being a `SetupPyError`, its message is suffixed with the distribution-building docs link.
    """
98

99

UNCOV
100
@rule
async def python_files_generator_settings(
    _: PythonFilesGeneratorSettingsRequest,
    python_infer: PythonInferSubsystem,
) -> TargetFilesGeneratorSettings:
    """Compute the file-generator settings used for Python targets.

    Dependencies on all sibling targets are requested only when `python_infer.imports` is falsy.
    """
    imports_inferred = python_infer.imports
    return TargetFilesGeneratorSettings(
        add_dependencies_on_all_siblings=not imports_inferred,
    )
×
106

107

108
# -----------------------------------------------------------------------------------------------
109
# `pex_binary` target generation rules
110
# -----------------------------------------------------------------------------------------------
111

112

UNCOV
113
class GenerateTargetsFromPexBinaries(GenerateTargetsRequest):
    """Target-generation request whose generator is `PexBinariesGeneratorTarget`."""

    # TODO: This can be deprecated in favor of `parametrize`.
    generate_from = PexBinariesGeneratorTarget
×
116

117

UNCOV
118
@rule
async def generate_targets_from_pex_binaries(
    request: GenerateTargetsFromPexBinaries,
    union_membership: UnionMembership,
) -> GeneratedTargets:
    """Generate one `PexBinary` target per value of the generator's entry points field.

    Each generated target combines the entry point, the request's template fields and any
    `overrides` entry keyed by that entry point.

    Raises:
        InvalidFieldException: if `overrides` has keys left over after every entry point has
            consumed its own entry.
    """
    template_address = request.template_address
    entry_point_specs = request.generator[PexEntryPointsField].value or []
    remaining_overrides = request.require_unparametrized_overrides()

    # Note that we don't check for overlap because it seems unlikely to be a problem.
    # If it does, we should add this check. (E.g. `path.to.app` and `path/to/app.py`)

    pex_binaries = []
    for spec in entry_point_specs:
        field_values = {
            PexEntryPointField.alias: spec,
            **request.template,
            # Overrides are unpacked last so that they win over the template. Popping them
            # leaves behind only the override keys that matched no entry point.
            **remaining_overrides.pop(spec, {}),
        }
        pex_binaries.append(
            PexBinary(
                field_values,
                # ":" is a forbidden character in target names
                template_address.create_generated(spec.replace(":", "-")),
                union_membership,
                residence_dir=template_address.spec_path,
            )
        )

    if remaining_overrides:
        raise InvalidFieldException(
            softwrap(
                f"""
                Unused key in the `overrides` field for {template_address}:
                {sorted(remaining_overrides)}

                Tip: if you'd like to override a field's value for every `{PexBinary.alias}` target
                generated by this target, change the field directly on this target rather than using
                the `overrides` field.
                """
            )
        )

    return GeneratedTargets(request.generator, pex_binaries)
×
161

162

163
# -----------------------------------------------------------------------------------------------
164
# `pex_binary` rules
165
# -----------------------------------------------------------------------------------------------
166

167

UNCOV
168
@rule(desc="Determining the entry point for a `pex_binary` target")
async def resolve_pex_entry_point(request: ResolvePexEntryPointRequest) -> ResolvedPexEntryPoint:
    """Resolve a `pex_binary` entry point field into a module-based entry point.

    Supported forms of the field value:
      1) `path.to.module` => preserved exactly.
      2) `path.to.module:func` => preserved exactly.
      3) `app.py` => converted into `path.to.app`.
      4) `app.py:func` => converted into `path.to.app:func`.

    The result's `file_name_used` is True only for the file-name forms (#3 and #4).

    Raises:
        InvalidFieldException: if the file name matches a number of files other than one.
    """
    field = request.entry_point_field
    ep = field.value
    if ep is None:
        return ResolvedPexEntryPoint(None, file_name_used=False)
    address = field.address

    # Anything not ending in `.py` is already a module (cases #1 and #2) and is used as-is.
    given_as_file_name = ep.module.endswith(".py")
    if not given_as_file_name:
        return ResolvedPexEntryPoint(ep, file_name_used=False)

    # Use the engine to validate that the file exists and that it resolves to only one file.
    matched = await path_globs_to_paths(
        PathGlobs(
            [os.path.join(address.spec_path, ep.module)],
            glob_match_error_behavior=GlobMatchErrorBehavior.error,
            description_of_origin=f"{address}'s `{field.alias}` field",
        )
    )

    # We will have already raised if the glob did not match, i.e. if there were no files. But
    # we need to check if they used a file glob (`*` or `**`) that resolved to >1 file.
    matched_files = matched.files
    if len(matched_files) != 1:
        raise InvalidFieldException(
            softwrap(
                f"""
                Multiple files matched for the `{field.alias}`
                {ep.spec!r} for the target {address}, but only one file expected. Are you using
                a glob, rather than a file name?

                All matching files: {list(matched_files)}.
                """
            )
        )

    # Convert the file path (relative to its source root) into a dotted module path.
    file_path = matched_files[0]
    source_root = await get_source_root(SourceRootRequest.for_file(file_path))
    path_from_root = os.path.relpath(file_path, source_root.path)
    module_name = os.path.splitext(path_from_root)[0].replace(os.path.sep, ".")
    return ResolvedPexEntryPoint(dataclasses.replace(ep, module=module_name), file_name_used=True)
218

219

UNCOV
220
@dataclass(frozen=True)
class PexBinaryEntryPointDependencyInferenceFieldSet(FieldSet):
    """Fields read when inferring a dependency from a `pex_binary`'s entry point."""

    required_fields = (
        PexBinaryDependenciesField,
        PexEntryPointField,
        PythonResolveField,
    )

    # Used to look up the explicitly provided dependencies.
    dependencies: PexBinaryDependenciesField
    # The entry point whose owning module is inferred.
    entry_point: PexEntryPointField
    # The resolve used when mapping the module to its owners.
    resolve: PythonResolveField
227

228

UNCOV
229
class InferPexBinaryEntryPointDependency(InferDependenciesRequest):
    """Dependency-inference request over `PexBinaryEntryPointDependencyInferenceFieldSet`."""

    infer_from = PexBinaryEntryPointDependencyInferenceFieldSet
×
231

232

UNCOV
233
@rule(desc="Inferring dependency from the pex_binary `entry_point` field")
async def infer_pex_binary_entry_point_dependency(
    request: InferPexBinaryEntryPointDependency,
    python_infer_subsystem: PythonInferSubsystem,
    python_setup: PythonSetup,
) -> InferredDependencies:
    """Infer the owner(s) of the module named by a `pex_binary`'s `entry_point` field.

    Nothing is inferred when entry-point inference is disabled, when the field is unset, or when
    the resolved entry point has no value. Ambiguous owners are first disambiguated against the
    explicitly provided dependencies; if that fails, the outcome is governed by
    `python_infer_subsystem.unowned_dependency_behavior`.
    """
    if not python_infer_subsystem.entry_points:
        return InferredDependencies([])

    entry_point_field = request.field_set.entry_point
    if entry_point_field.value is None:
        return InferredDependencies([])
    # The spec string is what user-facing messages should show, never the `EntryPoint` object.
    entry_point_spec = entry_point_field.value.spec

    explicitly_provided_deps, entry_point = await concurrently(
        determine_explicitly_provided_dependencies(
            **implicitly(DependenciesRequest(request.field_set.dependencies))
        ),
        resolve_pex_entry_point(ResolvePexEntryPointRequest(entry_point_field)),
    )
    if entry_point.val is None:
        return InferredDependencies([])

    # Only set locality if needed, to avoid unnecessary rule graph memoization misses.
    # When set, use the source root, which is useful in practice, but incurs fewer memoization
    # misses than using the full spec_path.
    locality = None
    if python_infer_subsystem.ambiguity_resolution == AmbiguityResolution.by_source_root:
        source_root = await get_source_root(
            SourceRootRequest.for_address(request.field_set.address)
        )
        locality = source_root.path

    owners = await map_module_to_address(
        PythonModuleOwnersRequest(
            entry_point.val.module,
            resolve=request.field_set.resolve.normalized_value(python_setup),
            locality=locality,
        ),
        **implicitly(),
    )
    address = request.field_set.address
    explicitly_provided_deps.maybe_warn_of_ambiguous_dependency_inference(
        owners.ambiguous,
        address,
        # If the entry point was specified as a file, like `app.py`, we know the module must
        # live in the pex_binary's directory or subdirectory, so the owners must be ancestors.
        owners_must_be_ancestors=entry_point.file_name_used,
        import_reference="module",
        context=softwrap(
            f"""
            The pex_binary target {address} has the field
            `entry_point={repr(entry_point_spec)}`, which
            maps to the Python module `{entry_point.val.module}`
            """
        ),
    )
    maybe_disambiguated = explicitly_provided_deps.disambiguated(
        owners.ambiguous, owners_must_be_ancestors=entry_point.file_name_used
    )

    unambiguous_owners = _determine_entry_point_owner(
        maybe_disambiguated,
        owners,
        # `_handle_unresolved_pex_entrypoint` declares `entry_point: str` and interpolates it
        # into its message, so hand it the spec string rather than the `EntryPoint` value.
        unresolved_ambiguity_handler=lambda: _handle_unresolved_pex_entrypoint(
            address,
            entry_point_spec,
            owners.ambiguous,
            python_infer_subsystem.unowned_dependency_behavior,
        ),
    )
    return InferredDependencies(unambiguous_owners)
×
304

305

UNCOV
306
def _determine_entry_point_owner(
×
307
    maybe_disambiguated: Address | None,
308
    owners: PythonModuleOwners,
309
    unresolved_ambiguity_handler: Callable[[], None],
310
) -> tuple[Address, ...]:
311
    """Determine what should be the unambiguous owner for a PEX's entrypoint.
312

313
    This might be empty.
314
    """
315
    if owners.unambiguous:
×
316
        return owners.unambiguous
×
317
    elif maybe_disambiguated:
×
318
        return (maybe_disambiguated,)
×
319
    elif owners.ambiguous and not maybe_disambiguated:
×
320
        unresolved_ambiguity_handler()
×
321
        return ()
×
322
    else:
323
        return ()
×
324

325

UNCOV
326
def _handle_unresolved_pex_entrypoint(
    address: Address,
    entry_point: str,
    ambiguous_owners: tuple[Address, ...],
    unowned_dependency_behavior: UnownedDependencyUsage,
) -> None:
    """Report a PEX entrypoint whose owner could not be disambiguated.

    What happens depends on `unowned_dependency_behavior`: `DoNothing` is silent, `LogWarning`
    logs the message as a warning, and any other value raises.

    Raises:
        UnownedDependencyError: when the behavior is neither `DoNothing` nor `LogWarning`.
    """
    candidates = bullet_list(o.spec for o in ambiguous_owners)
    msg = softwrap(
        f"""
        Pants cannot resolve the entrypoint for the target {address}.
        The entrypoint {entry_point} might refer to the following:

        {candidates}
        """
    )
    if unowned_dependency_behavior is UnownedDependencyUsage.DoNothing:
        return
    if unowned_dependency_behavior is UnownedDependencyUsage.LogWarning:
        logger.warning(msg)
        return
    raise UnownedDependencyError(msg)
×
347

348

349
# -----------------------------------------------------------------------------------------------
350
# `python_distribution` rules
351
# -----------------------------------------------------------------------------------------------
352

353

UNCOV
354
_EntryPointsDictType = dict[str, dict[str, str]]
×
355

356

UNCOV
357
def _classify_entry_points(
×
358
    all_entry_points: _EntryPointsDictType,
359
) -> Generator[tuple[bool, str, str, str], None, None]:
360
    """Looks at each entry point to see if it is a target address or not.
361

362
    Yields tuples: is_target, category, name, entry_point_str.
363
    """
364
    for category, entry_points in all_entry_points.items():
×
365
        for name, entry_point_str in entry_points.items():
×
366
            yield (
×
367
                entry_point_str.startswith(":") or "/" in entry_point_str,
368
                category,
369
                name,
370
                entry_point_str,
371
            )
372

373

UNCOV
374
@rule(desc="Determining the entry points for a `python_distribution` target")
async def resolve_python_distribution_entry_points(
    request: ResolvePythonDistributionEntryPointsRequest,
) -> ResolvedPythonDistributionEntryPoints:
    """Resolve the entry points of a `python_distribution` target.

    The entry points come from `request.entry_points_field` when it is set, otherwise from the
    `entry_points` keyword of `request.provides_field`. With neither, or with no entry points,
    the result is empty. Entry point strings that look like target addresses are replaced by
    the resolved entry point of the referenced target; all others are parsed as module entry
    points.

    Raises:
        InvalidEntryPoint: if a referenced target has no `PexEntryPointField`, or if an entry
            point in `console_scripts` / `gui_scripts` does not name a function.
    """
    if request.entry_points_field:
        if request.entry_points_field.value is None:
            return ResolvedPythonDistributionEntryPoints()
        address = request.entry_points_field.address
        all_entry_points = cast(_EntryPointsDictType, request.entry_points_field.value)
        description_of_origin = (
            f"the `{request.entry_points_field.alias}` field from the target {address}"
        )

    elif request.provides_field:
        address = request.provides_field.address
        provides_field_value = cast(
            _EntryPointsDictType, request.provides_field.value.kwargs.get("entry_points") or {}
        )
        if not provides_field_value:
            return ResolvedPythonDistributionEntryPoints()

        all_entry_points = provides_field_value
        description_of_origin = softwrap(
            f"""
            the `entry_points` argument from the `{request.provides_field.alias}` field from
            the target {address}
            """
        )

    else:
        return ResolvedPythonDistributionEntryPoints()

    classified_entry_points = list(_classify_entry_points(all_entry_points))

    # Pick out all target addresses up front, so we can use concurrently later.
    #
    # This calls for a bit of trickery however (using the "y_by_x" mapping dicts), so we keep track
    # of which address belongs to which entry point. I.e. the `address_by_ref` and
    # `binary_entry_point_by_address` variables.

    target_refs = [
        entry_point_str for is_target, _, _, entry_point_str in classified_entry_points if is_target
    ]

    # Intermediate step, as resolve_targets() returns a deduplicated set which breaks in case of
    # multiple input refs that map to the same target.
    target_addresses = await resolve_unparsed_address_inputs(
        UnparsedAddressInputs(
            target_refs,
            owning_address=address,
            description_of_origin=description_of_origin,
        ),
        **implicitly(),
    )
    address_by_ref = dict(zip(target_refs, target_addresses))
    targets = await resolve_targets(**implicitly(target_addresses))

    # Check that we only have targets with a pex entry_point field.
    for target in targets:
        if not target.has_field(PexEntryPointField):
            # No docs link here: `SetupPyError.__init__` (a base of `InvalidEntryPoint`)
            # already appends it, so including it again would print the link twice.
            raise InvalidEntryPoint(
                softwrap(
                    f"""
                    All target addresses in the entry_points field must be for pex_binary targets,
                    but the target {address} includes the value {target.address}, which has the
                    target type {target.alias}.

                    Alternatively, you can use a module like "project.app:main".
                    """
                )
            )

    binary_entry_points = await concurrently(
        resolve_pex_entry_point(ResolvePexEntryPointRequest(target[PexEntryPointField]))
        for target in targets
    )
    binary_entry_point_by_address = {
        target.address: entry_point for target, entry_point in zip(targets, binary_entry_points)
    }

    entry_points: DefaultDict[str, dict[str, PythonDistributionEntryPoint]] = defaultdict(dict)

    # Parse refs/replace with resolved pex entry point, and validate console entry points have function.
    for is_target, category, name, ref in classified_entry_points:
        owner: Address | None = None
        if is_target:
            owner = address_by_ref[ref]
            entry_point = binary_entry_point_by_address[owner].val
            if entry_point is None:
                logger.warning(
                    softwrap(
                        f"""
                        The entry point {name} in {category} references a pex_binary target {ref}
                        which does not set `entry_point`. Skipping.
                        """
                    )
                )
                continue
        else:
            entry_point = EntryPoint.parse(ref, f"{name} for {address} {category}")

        if category in ("console_scripts", "gui_scripts") and not entry_point.function:
            url = "https://python-packaging.readthedocs.io/en/latest/command-line-scripts.html#the-console-scripts-entry-point"
            # The example below must be a well-formed dict literal: the closing quote after
            # `:main` and the tight `}}` pair were previously malformed.
            raise InvalidEntryPoint(
                softwrap(
                    f"""
                    Every entry point in `{category}` for {address} must end in the format
                    `:my_func`, but {name} set it to {entry_point.spec!r}. For example, set
                    `entry_points={{"{category}": {{"{name}": "{entry_point.module}:main"}}}}`. See
                    {url}.
                    """
                )
            )

        entry_points[category][name] = PythonDistributionEntryPoint(entry_point, owner)

    return ResolvedPythonDistributionEntryPoints(
        FrozenDict(
            {category: FrozenDict(entry_points) for category, entry_points in entry_points.items()}
        )
    )
496

497

UNCOV
498
@dataclass(frozen=True)
class PythonDistributionDependenciesInferenceFieldSet(FieldSet):
    """Fields read when inferring dependencies from a distribution's entry points."""

    required_fields = (
        PythonDistributionDependenciesField,
        PythonDistributionEntryPointsField,
        PythonProvidesField,
    )

    # Used to look up the explicitly provided dependencies.
    dependencies: PythonDistributionDependenciesField
    # Entry points declared through the dedicated entry points field.
    entry_points: PythonDistributionEntryPointsField
    # Entry points may also be declared through this field's `entry_points` argument.
    provides: PythonProvidesField
509

510

UNCOV
511
class InferPythonDistributionDependencies(InferDependenciesRequest):
    """Dependency-inference request over `PythonDistributionDependenciesInferenceFieldSet`."""

    infer_from = PythonDistributionDependenciesInferenceFieldSet
×
513

514

UNCOV
515
def get_python_distribution_entry_point_unambiguous_module_owners(
    address: Address,
    entry_point_group: str,  # group is the pypa term; aka category or namespace
    entry_point_name: str,
    entry_point: EntryPoint,
    explicitly_provided_deps: ExplicitlyProvidedDependencies,
    owners: PythonModuleOwners,
) -> tuple[Address, ...]:
    """Return the unambiguous owners of the module behind one distribution entry point.

    `explicitly_provided_deps` is first given the chance to warn about ambiguous owners. The
    result is `owners.unambiguous` when non-empty; otherwise the single address obtained by
    disambiguating against the explicitly provided dependencies, or an empty tuple.
    """
    entry_points_repr = repr({entry_point_group: {entry_point_name: entry_point.spec}})
    warning_context = softwrap(
        f"""
            The python_distribution target {address} has the field
            `entry_points={entry_points_repr}`, which maps to the Python module
            `{entry_point.module}`
            """
    )
    explicitly_provided_deps.maybe_warn_of_ambiguous_dependency_inference(
        owners.ambiguous,
        address,
        import_reference="module",
        context=warning_context,
    )
    maybe_disambiguated = explicitly_provided_deps.disambiguated(owners.ambiguous)
    if owners.unambiguous:
        return owners.unambiguous
    return (maybe_disambiguated,) if maybe_disambiguated else ()
×
541

542

UNCOV
543
@rule
async def infer_python_distribution_dependencies(
    request: InferPythonDistributionDependencies, python_infer_subsystem: PythonInferSubsystem
) -> InferredDependencies:
    """Infer a distribution's dependencies from its entry points.

    Entry points are gathered from both the entry points field and the provides field. The
    result combines the unambiguous owners of every explicit module entry point with the
    `pex_binary` addresses referenced by either source. Nothing is inferred when entry-point
    inference is disabled.
    """
    if not python_infer_subsystem.entry_points:
        return InferredDependencies([])

    explicitly_provided_deps, distribution_entry_points, provides_entry_points = await concurrently(
        determine_explicitly_provided_dependencies(
            **implicitly(DependenciesRequest(request.field_set.dependencies))
        ),
        resolve_python_distribution_entry_points(
            ResolvePythonDistributionEntryPointsRequest(
                entry_points_field=request.field_set.entry_points
            )
        ),
        resolve_python_distribution_entry_points(
            ResolvePythonDistributionEntryPointsRequest(provides_field=request.field_set.provides)
        ),
    )

    address = request.field_set.address

    # Flatten both sources into (category, name, entry_point) triples, field entries first.
    module_entry_points = []
    for resolved in (distribution_entry_points, provides_entry_points):
        for category, named_entry_points in resolved.explicit_modules.items():
            for name, entry_point in named_entry_points.items():
                module_entry_points.append((category, name, entry_point))

    # One owners lookup per triple; results come back in the same order as the triples.
    owners_per_entry_point = await concurrently(
        map_module_to_address(
            PythonModuleOwnersRequest(entry_point.module, resolve=None), **implicitly()
        )
        for _, _, entry_point in module_entry_points
    )

    inferred_owners: OrderedSet[Address] = OrderedSet()
    for (category, name, entry_point), owners in zip(module_entry_points, owners_per_entry_point):
        inferred_owners.update(
            get_python_distribution_entry_point_unambiguous_module_owners(
                address, category, name, entry_point, explicitly_provided_deps, owners
            )
        )

    return InferredDependencies(
        Addresses(inferred_owners)
        + distribution_entry_points.pex_binary_addresses
        + provides_entry_points.pex_binary_addresses
    )
595

596

597
# -----------------------------------------------------------------------------------------------
598
# Dynamic Field defaults
599
# -----------------------------------------------------------------------------------------------
600

601

UNCOV
602
class PythonResolveFieldDefaultFactoryRequest(FieldDefaultFactoryRequest):
    """Default-factory request for fields of type `PythonResolveField`."""

    field_type = PythonResolveField
×
604

605

UNCOV
606
@rule
async def python_resolve_field_default_factory(
    request: PythonResolveFieldDefaultFactoryRequest,
    python_setup: PythonSetup,
) -> FieldDefaultFactoryResult:
    """Build the default factory for `PythonResolveField`.

    The returned factory maps a field to `field.normalized_value(python_setup)`.
    """

    def normalized_resolve(field):
        return field.normalized_value(python_setup)

    return FieldDefaultFactoryResult(normalized_resolve)
×
612

613

614
# -----------------------------------------------------------------------------------------------
615
# Dependency validation
616
# -----------------------------------------------------------------------------------------------
617

618

UNCOV
619
@dataclass(frozen=True)
class DependencyValidationFieldSet(FieldSet):
    """Fields read when validating the interpreter constraints of a target's dependencies."""

    required_fields = (InterpreterConstraintsField,)

    # The constraints that every dependency is checked against.
    interpreter_constraints: InterpreterConstraintsField
    # Not a required field; defaults to None.
    resolve: PythonResolveField | None = None
×
625

626

UNCOV
627
class PythonValidateDependenciesRequest(ValidateDependenciesRequest):
    """Dependency-validation request over `DependencyValidationFieldSet`."""

    field_set_type = DependencyValidationFieldSet
×
629

630

UNCOV
631
@rule
async def validate_python_dependencies(
    request: PythonValidateDependenciesRequest,
    python_setup: PythonSetup,
) -> ValidatedDependencies:
    """Check that the interpreter constraints of a target's dependencies are compatible.

    Dependencies without an `InterpreterConstraintsField` are ignored. Each remaining dependency
    is checked with `interpreter_constraints_contains` against the target's own constraints.

    Raises:
        InvalidFieldException: listing every dependency that fails the check.
    """
    wrapped_dependencies = await concurrently(
        resolve_target(
            WrappedTargetRequest(
                d, description_of_origin=f"the dependencies of {request.field_set.address}"
            ),
            **implicitly(),
        )
        for d in request.dependencies
    )

    # Validate that the ICs for dependencies are all compatible with our own.
    target_ics = request.field_set.interpreter_constraints.value_or_configured_default(
        python_setup, request.field_set.resolve
    )
    incompatible = []
    for wrapped in wrapped_dependencies:
        dep = wrapped.target
        if not dep.has_field(InterpreterConstraintsField):
            continue
        dep_resolve = dep[PythonResolveField] if dep.has_field(PythonResolveField) else None
        dep_ics = dep[InterpreterConstraintsField].value_or_configured_default(
            python_setup, dep_resolve
        )
        if interpreter_constraints_contains(
            dep_ics, target_ics, python_setup.interpreter_versions_universe
        ):
            continue
        incompatible.append(f"{dep_ics}: {dep.address}")

    if incompatible:
        raise InvalidFieldException(
            softwrap(
                f"""
            The target {request.field_set.address} has the `interpreter_constraints` {target_ics},
            which are not a subset of the `interpreter_constraints` of some of its dependencies:

            {bullet_list(sorted(incompatible))}

            To fix this, you should likely adjust {request.field_set.address}'s
            `interpreter_constraints` to match the narrowest range in the above list.
            """
            )
        )

    return ValidatedDependencies()
×
679

680

UNCOV
681
def rules():
    """Return this module's rules, the imported inference rules and the union registrations."""
    module_rules = collect_rules()
    dependency_inference_rules = import_rules()
    union_rules = (
        UnionRule(FieldDefaultFactoryRequest, PythonResolveFieldDefaultFactoryRequest),
        UnionRule(TargetFilesGeneratorSettingsRequest, PythonFilesGeneratorSettingsRequest),
        UnionRule(GenerateTargetsRequest, GenerateTargetsFromPexBinaries),
        UnionRule(InferDependenciesRequest, InferPexBinaryEntryPointDependency),
        UnionRule(InferDependenciesRequest, InferPythonDistributionDependencies),
        UnionRule(ValidateDependenciesRequest, PythonValidateDependenciesRequest),
    )
    return (*module_rules, *dependency_inference_rules, *union_rules)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc