• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 26080722777

19 May 2026 06:37AM UTC coverage: 52.106% (-11.5%) from 63.597%
26080722777

Pull #23250

github

web-flow
Merge 63ec06323 into 2693df832
Pull Request #23250: Feature: Add generic option to docker image

12 of 50 new or added lines in 3 files covered. (24.0%)

5382 existing lines in 201 files now uncovered.

32053 of 61515 relevant lines covered (52.11%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.41
/src/python/pants/backend/python/dependency_inference/rules.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
2✔
5

6
import itertools
2✔
7
import logging
2✔
8
from collections.abc import Iterable
2✔
9
from dataclasses import dataclass
2✔
10
from enum import Enum
2✔
11
from pathlib import PurePath
2✔
12

13
from pants.backend.python.dependency_inference import module_mapper, parse_python_dependencies
2✔
14
from pants.backend.python.dependency_inference.default_unowned_dependencies import (
2✔
15
    DEFAULT_UNOWNED_DEPENDENCIES,
16
)
17
from pants.backend.python.dependency_inference.module_mapper import (
2✔
18
    PythonModuleOwners,
19
    PythonModuleOwnersRequest,
20
    ResolveName,
21
    map_module_to_address,
22
    module_from_stripped_path,
23
)
24
from pants.backend.python.dependency_inference.parse_python_dependencies import (
2✔
25
    ParsedPythonAssetPaths,
26
    ParsedPythonImports,
27
    ParsePythonDependenciesRequest,
28
    PythonFileDependencies,
29
)
30
from pants.backend.python.dependency_inference.parse_python_dependencies import (
2✔
31
    parse_python_dependencies as parse_python_dependencies_get,
32
)
33
from pants.backend.python.dependency_inference.subsystem import (
2✔
34
    AmbiguityResolution,
35
    InitFilesInference,
36
    PythonInferSubsystem,
37
)
38
from pants.backend.python.subsystems.setup import PythonSetup
2✔
39
from pants.backend.python.target_types import (
2✔
40
    InterpreterConstraintsField,
41
    PythonDependenciesField,
42
    PythonResolveField,
43
    PythonSourceField,
44
    PythonTestSourceField,
45
)
46
from pants.backend.python.util_rules import ancestor_files, pex
2✔
47
from pants.backend.python.util_rules.ancestor_files import AncestorFilesRequest, find_ancestor_files
2✔
48
from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior
2✔
49
from pants.core import target_types
2✔
50
from pants.core.target_types import AllAssetTargetsByPath, map_assets_by_path
2✔
51
from pants.core.util_rules import stripped_source_files
2✔
52
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
2✔
53
from pants.core.util_rules.unowned_dependency_behavior import (
2✔
54
    UnownedDependencyError,
55
    UnownedDependencyUsage,
56
)
57
from pants.engine.addresses import Address, Addresses
2✔
58
from pants.engine.internals.build_files import DELETED_ADDRESS
2✔
59
from pants.engine.internals.graph import (
2✔
60
    OwnersRequest,
61
    determine_explicitly_provided_dependencies,
62
    find_owners,
63
    resolve_targets,
64
)
65
from pants.engine.rules import concurrently, implicitly, rule
2✔
66
from pants.engine.target import (
2✔
67
    DependenciesRequest,
68
    ExplicitlyProvidedDependencies,
69
    FieldSet,
70
    InferDependenciesRequest,
71
    InferredDependencies,
72
)
73
from pants.engine.unions import UnionRule
2✔
74
from pants.source.source_root import (
2✔
75
    SourceRootRequest,
76
    SourceRootsRequest,
77
    get_optional_source_roots,
78
    get_source_root,
79
)
80
from pants.util.docutil import doc_url
2✔
81
from pants.util.strutil import bullet_list, softwrap
2✔
82
from pants.vcs.changed import DeletedFiles, get_deleted_files
2✔
83

84
logger = logging.getLogger(__name__)
2✔
85

86

87
@dataclass(frozen=True)
class PythonImportDependenciesInferenceFieldSet(FieldSet):
    """Field set used by `InferPythonImportDependencies` (see its `infer_from`)."""

    # Field types a target must provide for this field set.
    required_fields = (
        PythonSourceField,
        PythonDependenciesField,
        PythonResolveField,
        InterpreterConstraintsField,
    )

    # The source file that is parsed for imports/assets (see `_exec_parse_deps`).
    source: PythonSourceField
    # Used to look up explicitly provided dependencies (see `resolve_parsed_dependencies`).
    dependencies: PythonDependenciesField
    # Normalized into the resolve name in `infer_python_dependencies_via_source`.
    resolve: PythonResolveField
    # Not read by any code in this module.
    interpreter_constraints: InterpreterConstraintsField
100

101

102
class InferPythonImportDependencies(InferDependenciesRequest):
    """Inference request handled by the `infer_python_dependencies_via_source` rule."""

    infer_from = PythonImportDependenciesInferenceFieldSet
104

105

106
def _get_inferred_asset_deps(
2✔
107
    address: Address,
108
    request_file_path: str,
109
    assets_by_path: AllAssetTargetsByPath,
110
    assets: ParsedPythonAssetPaths,
111
    explicitly_provided_deps: ExplicitlyProvidedDependencies,
112
) -> dict[str, ImportResolveResult]:
113
    def _resolve_single_asset(filepath) -> ImportResolveResult:
×
114
        # NB: Resources in Python's ecosystem are loaded relative to a package, so we only try and
115
        # query for a resource relative to requesting module's path
116
        # (I.e. we assume the user is doing something like `pkgutil.get_data(__file__, "foo/bar")`)
117
        # See https://docs.python.org/3/library/pkgutil.html#pkgutil.get_data
118
        # and Pants' own docs on resources.
119
        #
120
        # Files in Pants are always loaded relative to the build root without any source root
121
        # stripping, so we use the full filepath to query for files.
122
        # (I.e. we assume the user is doing something like `open("src/python/configs/prod.json")`)
123
        #
124
        # In either case we could also try and query based on the others' key, however this will
125
        # almost always lead to a false positive.
126
        resource_path = PurePath(request_file_path).parent / filepath
×
127
        file_path = PurePath(filepath)
×
128

129
        inferred_resource_tgts = assets_by_path.resources.get(resource_path, frozenset())
×
130
        inferred_file_tgts = assets_by_path.files.get(file_path, frozenset())
×
131
        inferred_tgts = inferred_resource_tgts | inferred_file_tgts
×
132

133
        if inferred_tgts:
×
134
            possible_addresses = tuple(tgt.address for tgt in inferred_tgts)
×
135
            if len(possible_addresses) == 1:
×
136
                return ImportResolveResult(ImportOwnerStatus.unambiguous, possible_addresses)
×
137

138
            explicitly_provided_deps.maybe_warn_of_ambiguous_dependency_inference(
×
139
                possible_addresses,
140
                address,
141
                import_reference="asset",
142
                context=f"The target {address} uses `{filepath}`",
143
            )
144
            maybe_disambiguated = explicitly_provided_deps.disambiguated(possible_addresses)
×
145
            if maybe_disambiguated:
×
146
                return ImportResolveResult(ImportOwnerStatus.disambiguated, (maybe_disambiguated,))
×
147
            else:
148
                return ImportResolveResult(ImportOwnerStatus.ambiguous)
×
149
        else:
150
            return ImportResolveResult(ImportOwnerStatus.unowned)
×
151

152
    return {filepath: _resolve_single_asset(filepath) for filepath in assets}
×
153

154

155
class ImportOwnerStatus(Enum):
    """Outcome of trying to find the owning target(s) of an import or asset."""

    # Exactly one owner was found.
    unambiguous = "unambiguous"
    # Several candidates were found and `ExplicitlyProvidedDependencies.disambiguated`
    # selected one of them.
    disambiguated = "disambiguated"
    # Several candidates were found and none could be selected (produced by
    # `_get_inferred_asset_deps`).
    ambiguous = "ambiguous"
    # No owner was found.
    unowned = "unowned"
    # No owner was found, but the parsed import is flagged `weak` (see `_get_imports_info`).
    weak_ignore = "weak_ignore"
    # No owner was found and the import's top-level package is listed in
    # `DEFAULT_UNOWNED_DEPENDENCIES` (see `_get_imports_info`).
    unownable = "unownable"
162

163

164
@dataclass(frozen=True)
class ImportResolveResult:
    """The resolution status of one import/asset plus any owning addresses that were found."""

    status: ImportOwnerStatus
    # Despite the singular name this is a tuple of addresses; it stays empty for statuses
    # that carry no owner.
    address: tuple[Address, ...] = ()
168

169

170
def _get_imports_info(
2✔
171
    address: Address,
172
    owners_per_import: Iterable[PythonModuleOwners],
173
    parsed_imports: ParsedPythonImports,
174
    explicitly_provided_deps: ExplicitlyProvidedDependencies,
175
) -> dict[str, ImportResolveResult]:
176
    def _resolve_single_import(owners, import_name) -> ImportResolveResult:
2✔
177
        if owners.unambiguous:
2✔
UNCOV
178
            return ImportResolveResult(ImportOwnerStatus.unambiguous, owners.unambiguous)
×
179

180
        explicitly_provided_deps.maybe_warn_of_ambiguous_dependency_inference(
2✔
181
            owners.ambiguous,
182
            address,
183
            import_reference="module",
184
            context=f"The target {address} imports `{import_name}`",
185
        )
186
        maybe_disambiguated = explicitly_provided_deps.disambiguated(owners.ambiguous)
2✔
187
        if maybe_disambiguated:
2✔
188
            return ImportResolveResult(ImportOwnerStatus.disambiguated, (maybe_disambiguated,))
×
189
        elif import_name.split(".")[0] in DEFAULT_UNOWNED_DEPENDENCIES:
2✔
190
            return ImportResolveResult(ImportOwnerStatus.unownable)
2✔
191
        elif parsed_imports[import_name].weak:
2✔
192
            return ImportResolveResult(ImportOwnerStatus.weak_ignore)
×
193
        else:
194
            return ImportResolveResult(ImportOwnerStatus.unowned)
2✔
195

196
    return {
2✔
197
        imp: _resolve_single_import(owners, imp)
198
        for owners, (imp, inf) in zip(owners_per_import, parsed_imports.items())
199
    }
200

201

202
def _collect_imports_info(
2✔
203
    resolve_result: dict[str, ImportResolveResult],
204
) -> tuple[frozenset[Address], frozenset[str]]:
205
    """Collect import resolution results into:
206

207
    - imports (direct and disambiguated)
208
    - unowned
209
    """
210

211
    return frozenset(
2✔
212
        addr
213
        for dep in resolve_result.values()
214
        for addr in dep.address
215
        if (
216
            dep.status == ImportOwnerStatus.unambiguous
217
            or dep.status == ImportOwnerStatus.disambiguated
218
        )
219
    ), frozenset(
220
        imp for imp, dep in resolve_result.items() if dep.status == ImportOwnerStatus.unowned
221
    )
222

223

224
def _remove_ignored_imports(
2✔
225
    unowned_imports: frozenset[str], ignored_paths: tuple[str, ...]
226
) -> frozenset[str]:
227
    """Remove unowned imports given a list of paths to ignore.
228

229
    E.g. having
230
    ```
231
    import foo.bar
232
    from foo.bar import baz
233
    import foo.barley
234
    ```
235

236
    and passing `ignored-paths=["foo.bar"]`, only `foo.bar` and `foo.bar.baz` will be ignored.
237
    """
238
    if not ignored_paths:
2✔
239
        return unowned_imports
2✔
240

241
    unowned_imports_filtered = set()
×
242
    for unowned_import in unowned_imports:
×
243
        if not any(
×
244
            unowned_import == ignored_path or unowned_import.startswith(f"{ignored_path}.")
245
            for ignored_path in ignored_paths
246
        ):
247
            unowned_imports_filtered.add(unowned_import)
×
248
    return frozenset(unowned_imports_filtered)
×
249

250

251
@dataclass(frozen=True)
class UnownedImportsPossibleOwnersRequest:
    """A request to find possible owners for several imports originating in a resolve."""

    # The import names for which no owner was found.
    unowned_imports: frozenset[str]
    # The resolve of the target the imports came from; owners in this same resolve are
    # excluded by `find_other_owners_for_unowned_import`.
    original_resolve: str
257

258

259
@dataclass(frozen=True)
class UnownedImportPossibleOwnerRequest:
    """A request to find possible owners, in other resolves, for a single unowned import."""

    unowned_import: str
    # Owners whose resolve equals this value are excluded from the result.
    original_resolve: str
263

264

265
@dataclass(frozen=True)
class UnownedImportsPossibleOwners:
    """Possible owners in other resolves, keyed by import name."""

    # import name -> list of (owner address, that owner's resolve name).
    value: dict[str, list[tuple[Address, ResolveName]]]
268

269

270
@dataclass(frozen=True)
class UnownedImportPossibleOwners:
    """Possible owners in other resolves for a single import."""

    # List of (owner address, that owner's resolve name).
    value: list[tuple[Address, ResolveName]]
273

274

275
async def _find_other_owners_for_unowned_imports(
    req: UnownedImportsPossibleOwnersRequest,
) -> UnownedImportsPossibleOwners:
    """Look up possible owners in other resolves for each unowned import in `req`.

    Imports for which the lookup yields no owners are omitted from the returned mapping.
    """
    # One `find_other_owners_for_unowned_import` call per import, awaited together.
    individual_possible_owners = await concurrently(
        find_other_owners_for_unowned_import(
            UnownedImportPossibleOwnerRequest(r, req.original_resolve), **implicitly()
        )
        for r in req.unowned_imports
    )

    # The zip pairs each import with its result by iterating `req.unowned_imports` a second
    # time; this relies on `concurrently` returning results in request order.
    return UnownedImportsPossibleOwners(
        {
            imported_module: possible_owners.value
            for imported_module, possible_owners in zip(
                req.unowned_imports, individual_possible_owners
            )
            if possible_owners.value
        }
    )
294

295

296
@rule
async def find_other_owners_for_unowned_import(
    req: UnownedImportPossibleOwnerRequest,
    python_setup: PythonSetup,
) -> UnownedImportPossibleOwners:
    """Find owners of `req.unowned_import` whose resolve differs from `req.original_resolve`.

    Returns the matching owners as `(address, resolve name)` pairs.
    """
    # NOTE(review): `resolve=None, locality=None` presumably removes the resolve/locality
    # restriction from the module lookup -- confirm against `PythonModuleOwnersRequest`.
    other_owner_from_other_resolves = await map_module_to_address(
        PythonModuleOwnersRequest(req.unowned_import, resolve=None, locality=None), **implicitly()
    )

    owners = other_owner_from_other_resolves
    # Both unambiguous and ambiguous owners are candidates here.
    other_owners_as_targets = await resolve_targets(
        **implicitly(Addresses(owners.unambiguous + owners.ambiguous))
    )

    other_owners = []

    for t in other_owners_as_targets:
        other_owner_resolve = t[PythonResolveField].normalized_value(python_setup)
        # Keep only owners that live in a different resolve than the requesting target.
        if other_owner_resolve != req.original_resolve:
            other_owners.append((t.address, other_owner_resolve))
    return UnownedImportPossibleOwners(other_owners)
317

318

319
async def _handle_unowned_imports(
    address: Address,
    unowned_dependency_behavior: UnownedDependencyUsage,
    python_setup: PythonSetup,
    unowned_imports: frozenset[str],
    parsed_imports: ParsedPythonImports,
    resolve: str,
) -> None:
    """Warn or error about imports of `address` for which no owner could be inferred.

    Does nothing when there are no unowned imports or when the behavior is `DoNothing`.
    When more than one resolve is configured, the message also lists owners of those imports
    that were found in other resolves.

    Raises:
        UnownedDependencyError: if there are unowned imports and the behavior is neither
            `DoNothing` nor `LogWarning`.
    """
    if not unowned_imports or unowned_dependency_behavior is UnownedDependencyUsage.DoNothing:
        return

    other_resolves_snippet = ""
    # Looking in other resolves only makes sense when more than one resolve exists.
    if len(python_setup.resolves) > 1:
        imports_to_other_owners = (
            await _find_other_owners_for_unowned_imports(
                UnownedImportsPossibleOwnersRequest(unowned_imports, resolve),
            )
        ).value

        if imports_to_other_owners:
            other_resolves_lines = []
            # Sorted so that the message is deterministic.
            for import_module, other_owners in sorted(imports_to_other_owners.items()):
                owners_txt = ", ".join(
                    f"'{other_resolve}' from {addr}" for addr, other_resolve in sorted(other_owners)
                )
                other_resolves_lines.append(f"{import_module}: {owners_txt}")
            other_resolves_snippet = "\n\n" + softwrap(
                f"""
                These imports are not in the resolve used by the target (`{resolve}`), but they
                were present in other resolves:

                {bullet_list(other_resolves_lines)}\n\n
                """
            )

    # Each unowned import is reported with the line number recorded for it when parsed.
    unowned_imports_with_lines = [
        f"{module_name} (line: {parsed_imports[module_name].lineno})"
        for module_name in sorted(unowned_imports)
    ]

    msg = softwrap(
        f"""
        Pants cannot infer owners for the following imports in the target {address}:

        {bullet_list(unowned_imports_with_lines)}{other_resolves_snippet}

        If you do not expect an import to be inferable, add `# pants: no-infer-dep` to the
        import line. Otherwise, see
        {doc_url("docs/using-pants/troubleshooting-common-issues#import-errors-and-missing-dependencies")} for common problems.
        """
    )
    if unowned_dependency_behavior is UnownedDependencyUsage.LogWarning:
        logger.warning(msg)
    else:
        raise UnownedDependencyError(msg)
374

375

376
async def _exec_parse_deps(
    field_set: PythonImportDependenciesInferenceFieldSet,
    python_setup: PythonSetup,
) -> PythonFileDependencies:
    """Parse the dependencies of the field set's single source file.

    `python_setup` is accepted but not used in this function body.
    """
    source = await determine_source_files(SourceFilesRequest([field_set.source]))
    resp = await parse_python_dependencies_get(
        ParsePythonDependenciesRequest(
            source,
        ),
        **implicitly(),
    )
    # Exactly one file was requested, so exactly one entry is expected back.
    assert len(resp.path_to_deps) == 1
    return next(iter(resp.path_to_deps.values()))
389

390

391
@dataclass(frozen=True)
class ResolvedParsedPythonDependenciesRequest:
    """Input to the `resolve_parsed_dependencies` rule."""

    field_set: PythonImportDependenciesInferenceFieldSet
    # The already-parsed imports and assets of the field set's source file.
    parsed_dependencies: PythonFileDependencies
    # Passed through to `PythonModuleOwnersRequest` when looking up module owners.
    resolve: str | None
396

397

398
@dataclass(frozen=True)
class ResolvedParsedPythonDependencies:
    """Output of the `resolve_parsed_dependencies` rule."""

    # import name -> resolution result.
    resolve_results: dict[str, ImportResolveResult]
    # asset path -> resolution result.
    assets: dict[str, ImportResolveResult]
    # The explicitly provided dependencies that were used for disambiguation.
    explicit: ExplicitlyProvidedDependencies
403

404

405
@rule
async def resolve_parsed_dependencies(
    request: ResolvedParsedPythonDependenciesRequest,
    python_infer_subsystem: PythonInferSubsystem,
) -> ResolvedParsedPythonDependencies:
    """Find the owning targets for the parsed dependencies.

    Imports are resolved through `map_module_to_address`, assets through the asset targets
    returned by `map_assets_by_path`. Either mapping in the result is empty when there is
    nothing of that kind to resolve.
    """

    parsed_imports = request.parsed_dependencies.imports
    parsed_assets = request.parsed_dependencies.assets
    # With import inference turned off, behave as if the file had no imports.
    if not python_infer_subsystem.imports:
        parsed_imports = ParsedPythonImports([])

    explicitly_provided_deps = await determine_explicitly_provided_dependencies(
        **implicitly(DependenciesRequest(request.field_set.dependencies))
    )

    # Only set locality if needed, to avoid unnecessary rule graph memoization misses.
    # When set, use the source root, which is useful in practice, but incurs fewer memoization
    # misses than using the full spec_path.
    locality = None
    if python_infer_subsystem.ambiguity_resolution == AmbiguityResolution.by_source_root:
        source_root = await get_source_root(
            SourceRootRequest.for_address(request.field_set.address)
        )
        locality = source_root.path

    if parsed_imports:
        # One owner lookup per imported module, awaited together; `_get_imports_info` pairs
        # the results with `parsed_imports` positionally.
        owners_per_import = await concurrently(
            map_module_to_address(
                PythonModuleOwnersRequest(imported_module, request.resolve, locality),
                **implicitly(),
            )
            for imported_module in parsed_imports
        )
        resolve_results = _get_imports_info(
            address=request.field_set.address,
            owners_per_import=owners_per_import,
            parsed_imports=parsed_imports,
            explicitly_provided_deps=explicitly_provided_deps,
        )
    else:
        resolve_results = {}

    if parsed_assets:
        # The asset mapping is only requested when the file references at least one asset.
        assets_by_path = await map_assets_by_path(**implicitly())
        asset_deps = _get_inferred_asset_deps(
            request.field_set.address,
            request.field_set.source.file_path,
            assets_by_path,
            parsed_assets,
            explicitly_provided_deps,
        )
    else:
        asset_deps = {}

    return ResolvedParsedPythonDependencies(
        resolve_results=resolve_results,
        assets=asset_deps,
        explicit=explicitly_provided_deps,
    )
465

466

467
@rule(desc="Inferring Python dependencies by analyzing source")
async def infer_python_dependencies_via_source(
    request: InferPythonImportDependencies,
    python_infer_subsystem: PythonInferSubsystem,
    python_setup: PythonSetup,
) -> InferredDependencies:
    """Infer dependencies of a Python source file from its parsed imports and assets.

    Returns no dependencies when both import and asset inference are disabled. Otherwise the
    file is parsed, owners are resolved, unowned imports are reported via
    `_handle_unowned_imports`, and the sorted owning addresses are returned. If an unowned
    import matches a deleted Python file, `DELETED_ADDRESS` is added as well.

    Raises:
        UnownedDependencyError: propagated from `_handle_unowned_imports`.
    """
    if not python_infer_subsystem.imports and not python_infer_subsystem.assets:
        return InferredDependencies([])

    parsed_dependencies = await _exec_parse_deps(request.field_set, python_setup)

    resolve = request.field_set.resolve.normalized_value(python_setup)

    resolved_dependencies = await resolve_parsed_dependencies(
        ResolvedParsedPythonDependenciesRequest(request.field_set, parsed_dependencies, resolve),
        **implicitly(),
    )
    import_deps, unowned_imports = _collect_imports_info(resolved_dependencies.resolve_results)
    unowned_imports = _remove_ignored_imports(
        unowned_imports, python_infer_subsystem.ignored_unowned_imports
    )

    # Unowned assets are not reported; only the owned asset addresses are used.
    asset_deps, _unowned_assets = _collect_imports_info(resolved_dependencies.assets)

    inferred_deps = import_deps | asset_deps

    await _handle_unowned_imports(
        request.field_set.address,
        python_infer_subsystem.unowned_dependency_behavior,
        python_setup,
        unowned_imports,
        parsed_dependencies.imports,
        resolve=resolve,
    )

    if unowned_imports:
        # Let's see if any of the unowned imports were provided by a deleted file.
        deleted_files: DeletedFiles = await get_deleted_files(**implicitly())
        deleted_python_files = tuple(f for f in deleted_files.paths if f.endswith((".py", ".pyi")))
        if deleted_python_files:
            source_roots_result = await get_optional_source_roots(
                SourceRootsRequest.for_files(deleted_python_files)
            )
            # We drop files not under a source root, since they can't be used for import anyway.
            stripped_deleted_python_files = tuple(
                f.relative_to(opt_root.source_root.path)
                for f, opt_root in source_roots_result.path_to_optional_root.items()
                if opt_root.source_root
            )
            deleted_modules = tuple(
                module_from_stripped_path(f) for f in stripped_deleted_python_files
            )
            # One match is enough, so stop scanning at the first unowned import that is a
            # deleted module or a dotted descendant of one.
            if any(
                impt == mod or impt.startswith(f"{mod}.")
                for impt in unowned_imports
                for mod in deleted_modules
            ):
                # At least one unowned import was provided by a deleted file, so we
                # inject a dep on the DeletedTarget pseudo-target.
                inferred_deps = frozenset(inferred_deps | {DELETED_ADDRESS})

    return InferredDependencies(sorted(inferred_deps))
528

529

530
@dataclass(frozen=True)
class InitDependenciesInferenceFieldSet(FieldSet):
    """Field set used by `InferInitDependencies` (see its `infer_from`)."""

    required_fields = (PythonSourceField, PythonResolveField)

    # The file whose ancestor `__init__.py` / `__init__.pyi` files are looked up.
    source: PythonSourceField
    # Owners are only kept when their resolve matches this one.
    resolve: PythonResolveField
536

537

538
class InferInitDependencies(InferDependenciesRequest):
    """Inference request handled by the `infer_python_init_dependencies` rule."""

    infer_from = InitDependenciesInferenceFieldSet
540

541

542
@rule(desc="Inferring dependencies on `__init__.py` files")
async def infer_python_init_dependencies(
    request: InferInitDependencies,
    python_infer_subsystem: PythonInferSubsystem,
    python_setup: PythonSetup,
) -> InferredDependencies:
    """Infer dependencies on the owners of ancestor `__init__.py` / `__init__.pyi` files.

    Returns no dependencies when `init_files` inference is set to `never`. Only owners that
    have a `PythonSourceField` and share the requesting target's resolve are returned.
    """
    if python_infer_subsystem.init_files is InitFilesInference.never:
        return InferredDependencies([])

    # In `content_only` mode, empty init files are ignored by the ancestor lookup.
    ignore_empty_files = python_infer_subsystem.init_files is InitFilesInference.content_only
    fp = request.field_set.source.file_path
    assert fp is not None
    init_files = await find_ancestor_files(
        AncestorFilesRequest(
            input_files=(fp,),
            requested=("__init__.py", "__init__.pyi"),
            ignore_empty_files=ignore_empty_files,
        )
    )
    # One owners lookup per init file found.
    owners = await concurrently(
        find_owners(OwnersRequest((f,)), **implicitly()) for f in init_files.snapshot.files
    )

    owner_tgts = await resolve_targets(
        **implicitly(Addresses(itertools.chain.from_iterable(owners)))
    )
    resolve = request.field_set.resolve.normalized_value(python_setup)
    # Keep only Python source targets in the same resolve as the requesting target.
    python_owners = [
        tgt.address
        for tgt in owner_tgts
        if (
            tgt.has_field(PythonSourceField)
            and tgt[PythonResolveField].normalized_value(python_setup) == resolve
        )
    ]
    return InferredDependencies(python_owners)
578

579

580
@dataclass(frozen=True)
class ConftestDependenciesInferenceFieldSet(FieldSet):
    """Field set used by `InferConftestDependencies` (see its `infer_from`)."""

    required_fields = (PythonTestSourceField, PythonResolveField)

    # The test file whose ancestor `conftest.py` files are looked up.
    source: PythonTestSourceField
    # Owners are only kept when their resolve matches this one.
    resolve: PythonResolveField
586

587

588
class InferConftestDependencies(InferDependenciesRequest):
    """Inference request handled by the `infer_python_conftest_dependencies` rule."""

    infer_from = ConftestDependenciesInferenceFieldSet
590

591

592
@rule(desc="Inferring dependencies on `conftest.py` files")
async def infer_python_conftest_dependencies(
    request: InferConftestDependencies,
    python_infer_subsystem: PythonInferSubsystem,
    python_setup: PythonSetup,
) -> InferredDependencies:
    """Infer dependencies on the owners of ancestor `conftest.py` files.

    Returns no dependencies when conftest inference is disabled. Only owners that have a
    `PythonSourceField` and share the requesting target's resolve are returned.
    """
    if not python_infer_subsystem.conftests:
        return InferredDependencies([])

    fp = request.field_set.source.file_path
    assert fp is not None
    conftest_files = await find_ancestor_files(
        AncestorFilesRequest(input_files=(fp,), requested=("conftest.py",))
    )
    owners = await concurrently(
        # NB: Because conftest.py files effectively always have content, we require an
        # owning target.
        find_owners(
            OwnersRequest((f,), owners_not_found_behavior=GlobMatchErrorBehavior.error),
            **implicitly(),
        )
        for f in conftest_files.snapshot.files
    )

    owner_tgts = await resolve_targets(
        **implicitly(Addresses(itertools.chain.from_iterable(owners)))
    )
    resolve = request.field_set.resolve.normalized_value(python_setup)
    # Keep only Python source targets in the same resolve as the requesting target.
    python_owners = [
        tgt.address
        for tgt in owner_tgts
        if (
            tgt.has_field(PythonSourceField)
            and tgt[PythonResolveField].normalized_value(python_setup) == resolve
        )
    ]
    return InferredDependencies(python_owners)
629

630

631
# This is a separate function to facilitate tests registering import inference.
632
def import_rules():
    """Return the rules needed for import-based dependency inference.

    The list holds this module's import-inference rules, then the rules of each supporting
    module/subsystem, then the union registration for `InferPythonImportDependencies`.
    """
    registered = [
        resolve_parsed_dependencies,
        find_other_owners_for_unowned_import,
        infer_python_dependencies_via_source,
    ]
    # Appended in the same order as they are listed here.
    rule_providers = (
        pex,
        parse_python_dependencies,
        module_mapper,
        stripped_source_files,
        target_types,
        PythonInferSubsystem,
        PythonSetup,
    )
    for provider in rule_providers:
        registered.extend(provider.rules())
    registered.append(UnionRule(InferDependenciesRequest, InferPythonImportDependencies))
    return registered
646

647

648
def rules():
    """Return every rule of this module: import, `__init__.py` and `conftest.py` inference."""
    collected = list(import_rules())
    collected.append(infer_python_init_dependencies)
    collected.append(infer_python_conftest_dependencies)
    collected.extend(ancestor_files.rules())
    collected.append(UnionRule(InferDependenciesRequest, InferInitDependencies))
    collected.append(UnionRule(InferDependenciesRequest, InferConftestDependencies))
    return collected
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc