• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 25441711719

06 May 2026 02:31PM UTC coverage: 92.915%. Remained the same
25441711719

push

github

web-flow
use sha pin (with comment) format for generated actions (#23312)

Per the GitHub Action best practices we recently enabled at #23249, we
should pin each action to a SHA so that the reference is actually
immutable.

This will -- I hope -- knock out a large chunk of the 421 alerts we
currently get from zizmor. The next follow-up would then be upgrades and
harmonizing the generated and non-generated pins.

Notice: This idea was suggested by Claude while going over pinact output,
and I was surprised to see that post-processing the YAML wasn't too
gross.

92206 of 99237 relevant lines covered (92.91%)

4.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.67
/src/python/pants/backend/python/dependency_inference/rules.py
1
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import itertools
12✔
7
import logging
12✔
8
from collections.abc import Iterable
12✔
9
from dataclasses import dataclass
12✔
10
from enum import Enum
12✔
11
from pathlib import PurePath
12✔
12

13
from pants.backend.python.dependency_inference import module_mapper, parse_python_dependencies
12✔
14
from pants.backend.python.dependency_inference.default_unowned_dependencies import (
12✔
15
    DEFAULT_UNOWNED_DEPENDENCIES,
16
)
17
from pants.backend.python.dependency_inference.module_mapper import (
12✔
18
    PythonModuleOwners,
19
    PythonModuleOwnersRequest,
20
    ResolveName,
21
    map_module_to_address,
22
    module_from_stripped_path,
23
)
24
from pants.backend.python.dependency_inference.parse_python_dependencies import (
12✔
25
    ParsedPythonAssetPaths,
26
    ParsedPythonImports,
27
    ParsePythonDependenciesRequest,
28
    PythonFileDependencies,
29
)
30
from pants.backend.python.dependency_inference.parse_python_dependencies import (
12✔
31
    parse_python_dependencies as parse_python_dependencies_get,
32
)
33
from pants.backend.python.dependency_inference.subsystem import (
12✔
34
    AmbiguityResolution,
35
    InitFilesInference,
36
    PythonInferSubsystem,
37
)
38
from pants.backend.python.subsystems.setup import PythonSetup
12✔
39
from pants.backend.python.target_types import (
12✔
40
    InterpreterConstraintsField,
41
    PythonDependenciesField,
42
    PythonResolveField,
43
    PythonSourceField,
44
    PythonTestSourceField,
45
)
46
from pants.backend.python.util_rules import ancestor_files, pex
12✔
47
from pants.backend.python.util_rules.ancestor_files import AncestorFilesRequest, find_ancestor_files
12✔
48
from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior
12✔
49
from pants.core import target_types
12✔
50
from pants.core.target_types import AllAssetTargetsByPath, map_assets_by_path
12✔
51
from pants.core.util_rules import stripped_source_files
12✔
52
from pants.core.util_rules.source_files import SourceFilesRequest, determine_source_files
12✔
53
from pants.core.util_rules.unowned_dependency_behavior import (
12✔
54
    UnownedDependencyError,
55
    UnownedDependencyUsage,
56
)
57
from pants.engine.addresses import Address, Addresses
12✔
58
from pants.engine.internals.build_files import DELETED_ADDRESS
12✔
59
from pants.engine.internals.graph import (
12✔
60
    OwnersRequest,
61
    determine_explicitly_provided_dependencies,
62
    find_owners,
63
    resolve_targets,
64
)
65
from pants.engine.rules import concurrently, implicitly, rule
12✔
66
from pants.engine.target import (
12✔
67
    DependenciesRequest,
68
    ExplicitlyProvidedDependencies,
69
    FieldSet,
70
    InferDependenciesRequest,
71
    InferredDependencies,
72
)
73
from pants.engine.unions import UnionRule
12✔
74
from pants.source.source_root import (
12✔
75
    SourceRootRequest,
76
    SourceRootsRequest,
77
    get_optional_source_roots,
78
    get_source_root,
79
)
80
from pants.util.docutil import doc_url
12✔
81
from pants.util.strutil import bullet_list, softwrap
12✔
82
from pants.vcs.changed import DeletedFiles, get_deleted_files
12✔
83

84
logger = logging.getLogger(__name__)
12✔
85

86

87
@dataclass(frozen=True)
class PythonImportDependenciesInferenceFieldSet(FieldSet):
    """The target fields consumed when inferring dependencies from Python source.

    This is the `infer_from` field set of `InferPythonImportDependencies`.
    """

    # Field types declared as required by this field set.
    required_fields = (
        PythonSourceField,
        PythonDependenciesField,
        PythonResolveField,
        InterpreterConstraintsField,
    )

    source: PythonSourceField
    dependencies: PythonDependenciesField
    resolve: PythonResolveField
    interpreter_constraints: InterpreterConstraintsField
12✔
100

101

102
class InferPythonImportDependencies(InferDependenciesRequest):
    """Dependency inference request handled by `infer_python_dependencies_via_source`."""

    infer_from = PythonImportDependenciesInferenceFieldSet
12✔
104

105

106
def _get_inferred_asset_deps(
12✔
107
    address: Address,
108
    request_file_path: str,
109
    assets_by_path: AllAssetTargetsByPath,
110
    assets: ParsedPythonAssetPaths,
111
    explicitly_provided_deps: ExplicitlyProvidedDependencies,
112
) -> dict[str, ImportResolveResult]:
113
    def _resolve_single_asset(filepath) -> ImportResolveResult:
2✔
114
        # NB: Resources in Python's ecosystem are loaded relative to a package, so we only try and
115
        # query for a resource relative to requesting module's path
116
        # (I.e. we assume the user is doing something like `pkgutil.get_data(__file__, "foo/bar")`)
117
        # See https://docs.python.org/3/library/pkgutil.html#pkgutil.get_data
118
        # and Pants' own docs on resources.
119
        #
120
        # Files in Pants are always loaded relative to the build root without any source root
121
        # stripping, so we use the full filepath to query for files.
122
        # (I.e. we assume the user is doing something like `open("src/python/configs/prod.json")`)
123
        #
124
        # In either case we could also try and query based on the others' key, however this will
125
        # almost always lead to a false positive.
126
        resource_path = PurePath(request_file_path).parent / filepath
2✔
127
        file_path = PurePath(filepath)
2✔
128

129
        inferred_resource_tgts = assets_by_path.resources.get(resource_path, frozenset())
2✔
130
        inferred_file_tgts = assets_by_path.files.get(file_path, frozenset())
2✔
131
        inferred_tgts = inferred_resource_tgts | inferred_file_tgts
2✔
132

133
        if inferred_tgts:
2✔
134
            possible_addresses = tuple(tgt.address for tgt in inferred_tgts)
2✔
135
            if len(possible_addresses) == 1:
2✔
136
                return ImportResolveResult(ImportOwnerStatus.unambiguous, possible_addresses)
2✔
137

138
            explicitly_provided_deps.maybe_warn_of_ambiguous_dependency_inference(
1✔
139
                possible_addresses,
140
                address,
141
                import_reference="asset",
142
                context=f"The target {address} uses `{filepath}`",
143
            )
144
            maybe_disambiguated = explicitly_provided_deps.disambiguated(possible_addresses)
1✔
145
            if maybe_disambiguated:
1✔
146
                return ImportResolveResult(ImportOwnerStatus.disambiguated, (maybe_disambiguated,))
1✔
147
            else:
148
                return ImportResolveResult(ImportOwnerStatus.ambiguous)
1✔
149
        else:
150
            return ImportResolveResult(ImportOwnerStatus.unowned)
×
151

152
    return {filepath: _resolve_single_asset(filepath) for filepath in assets}
2✔
153

154

155
class ImportOwnerStatus(Enum):
    """Outcome of looking up the owner of a single import or asset."""

    # Owner(s) were found without any ambiguity.
    unambiguous = "unambiguous"
    # Several candidates existed; explicitly provided dependencies picked one.
    disambiguated = "disambiguated"
    # Several candidates existed and none could be picked.
    ambiguous = "ambiguous"
    # No owner could be selected.
    unowned = "unowned"
    # No owner could be selected, but the import was parsed as weak.
    weak_ignore = "weak_ignore"
    # No owner could be selected and the top-level module is in DEFAULT_UNOWNED_DEPENDENCIES.
    unownable = "unownable"
12✔
162

163

164
@dataclass(frozen=True)
class ImportResolveResult:
    """The status of one import/asset lookup plus any owning addresses."""

    status: ImportOwnerStatus
    # Defaults to empty; populated by callers for unambiguous/disambiguated results.
    address: tuple[Address, ...] = ()
12✔
168

169

170
def _get_imports_info(
12✔
171
    address: Address,
172
    owners_per_import: Iterable[PythonModuleOwners],
173
    parsed_imports: ParsedPythonImports,
174
    explicitly_provided_deps: ExplicitlyProvidedDependencies,
175
) -> dict[str, ImportResolveResult]:
176
    def _resolve_single_import(owners, import_name) -> ImportResolveResult:
11✔
177
        if owners.unambiguous:
11✔
178
            return ImportResolveResult(ImportOwnerStatus.unambiguous, owners.unambiguous)
8✔
179

180
        explicitly_provided_deps.maybe_warn_of_ambiguous_dependency_inference(
8✔
181
            owners.ambiguous,
182
            address,
183
            import_reference="module",
184
            context=f"The target {address} imports `{import_name}`",
185
        )
186
        maybe_disambiguated = explicitly_provided_deps.disambiguated(owners.ambiguous)
8✔
187
        if maybe_disambiguated:
8✔
188
            return ImportResolveResult(ImportOwnerStatus.disambiguated, (maybe_disambiguated,))
1✔
189
        elif import_name.split(".")[0] in DEFAULT_UNOWNED_DEPENDENCIES:
8✔
190
            return ImportResolveResult(ImportOwnerStatus.unownable)
8✔
191
        elif parsed_imports[import_name].weak:
7✔
192
            return ImportResolveResult(ImportOwnerStatus.weak_ignore)
2✔
193
        else:
194
            return ImportResolveResult(ImportOwnerStatus.unowned)
7✔
195

196
    return {
11✔
197
        imp: _resolve_single_import(owners, imp)
198
        for owners, (imp, inf) in zip(owners_per_import, parsed_imports.items())
199
    }
200

201

202
def _collect_imports_info(
12✔
203
    resolve_result: dict[str, ImportResolveResult],
204
) -> tuple[frozenset[Address], frozenset[str]]:
205
    """Collect import resolution results into:
206

207
    - imports (direct and disambiguated)
208
    - unowned
209
    """
210

211
    return frozenset(
11✔
212
        addr
213
        for dep in resolve_result.values()
214
        for addr in dep.address
215
        if (
216
            dep.status == ImportOwnerStatus.unambiguous
217
            or dep.status == ImportOwnerStatus.disambiguated
218
        )
219
    ), frozenset(
220
        imp for imp, dep in resolve_result.items() if dep.status == ImportOwnerStatus.unowned
221
    )
222

223

224
def _remove_ignored_imports(
12✔
225
    unowned_imports: frozenset[str], ignored_paths: tuple[str, ...]
226
) -> frozenset[str]:
227
    """Remove unowned imports given a list of paths to ignore.
228

229
    E.g. having
230
    ```
231
    import foo.bar
232
    from foo.bar import baz
233
    import foo.barley
234
    ```
235

236
    and passing `ignored-paths=["foo.bar"]`, only `foo.bar` and `foo.bar.baz` will be ignored.
237
    """
238
    if not ignored_paths:
11✔
239
        return unowned_imports
11✔
240

241
    unowned_imports_filtered = set()
1✔
242
    for unowned_import in unowned_imports:
1✔
243
        if not any(
1✔
244
            unowned_import == ignored_path or unowned_import.startswith(f"{ignored_path}.")
245
            for ignored_path in ignored_paths
246
        ):
247
            unowned_imports_filtered.add(unowned_import)
1✔
248
    return frozenset(unowned_imports_filtered)
1✔
249

250

251
@dataclass(frozen=True)
class UnownedImportsPossibleOwnersRequest:
    """Ask for the possible owners of a batch of unowned imports coming from one resolve."""

    unowned_imports: frozenset[str]
    # Name of the resolve the imports originate from.
    original_resolve: str
12✔
257

258

259
@dataclass(frozen=True)
class UnownedImportPossibleOwnerRequest:
    """Ask for the possible owners of a single unowned import coming from one resolve."""

    unowned_import: str
    # Name of the resolve the import originates from.
    original_resolve: str
12✔
263

264

265
@dataclass(frozen=True)
class UnownedImportsPossibleOwners:
    """Possible owners for several unowned imports, keyed by imported module name."""

    # Each entry pairs an owner's address with the resolve that owner belongs to.
    value: dict[str, list[tuple[Address, ResolveName]]]
12✔
268

269

270
@dataclass(frozen=True)
class UnownedImportPossibleOwners:
    """Possible owners for one unowned import."""

    # Each entry pairs an owner's address with the resolve that owner belongs to.
    value: list[tuple[Address, ResolveName]]
12✔
273

274

275
async def _find_other_owners_for_unowned_imports(
    req: UnownedImportsPossibleOwnersRequest,
) -> UnownedImportsPossibleOwners:
    """Find, for each unowned import, the possible owners reported by the per-import rule.

    Imports for which that lookup yields no owners are left out of the result.
    """
    # One lookup per import, run concurrently. The results are paired back with the imports by
    # iterating `req.unowned_imports` a second time.
    individual_possible_owners = await concurrently(
        find_other_owners_for_unowned_import(
            UnownedImportPossibleOwnerRequest(r, req.original_resolve), **implicitly()
        )
        for r in req.unowned_imports
    )

    owners_by_import = {}
    for imported_module, possible_owners in zip(req.unowned_imports, individual_possible_owners):
        if not possible_owners.value:
            continue
        owners_by_import[imported_module] = possible_owners.value
    return UnownedImportsPossibleOwners(owners_by_import)
294

295

296
@rule
async def find_other_owners_for_unowned_import(
    req: UnownedImportPossibleOwnerRequest,
    python_setup: PythonSetup,
) -> UnownedImportPossibleOwners:
    """Find targets owning the import whose resolve differs from the requesting resolve."""
    # Query without restricting the resolve or the locality.
    owners_in_any_resolve = await map_module_to_address(
        PythonModuleOwnersRequest(req.unowned_import, resolve=None, locality=None), **implicitly()
    )
    candidate_targets = await resolve_targets(
        **implicitly(
            Addresses(owners_in_any_resolve.unambiguous + owners_in_any_resolve.ambiguous)
        )
    )

    addresses_with_resolves = (
        (tgt.address, tgt[PythonResolveField].normalized_value(python_setup))
        for tgt in candidate_targets
    )
    # Keep only owners living in a resolve other than the one that made the request.
    other_owners = [
        (owner_address, owner_resolve)
        for owner_address, owner_resolve in addresses_with_resolves
        if owner_resolve != req.original_resolve
    ]
    return UnownedImportPossibleOwners(other_owners)
3✔
317

318

319
async def _handle_unowned_imports(
12✔
320
    address: Address,
321
    unowned_dependency_behavior: UnownedDependencyUsage,
322
    python_setup: PythonSetup,
323
    unowned_imports: frozenset[str],
324
    parsed_imports: ParsedPythonImports,
325
    resolve: str,
326
) -> None:
327
    if not unowned_imports or unowned_dependency_behavior is UnownedDependencyUsage.DoNothing:
11✔
328
        return
11✔
329

330
    other_resolves_snippet = ""
6✔
331
    if len(python_setup.resolves) > 1:
6✔
332
        imports_to_other_owners = (
2✔
333
            await _find_other_owners_for_unowned_imports(
334
                UnownedImportsPossibleOwnersRequest(unowned_imports, resolve),
335
            )
336
        ).value
337

338
        if imports_to_other_owners:
2✔
339
            other_resolves_lines = []
1✔
340
            for import_module, other_owners in sorted(imports_to_other_owners.items()):
1✔
341
                owners_txt = ", ".join(
1✔
342
                    f"'{other_resolve}' from {addr}" for addr, other_resolve in sorted(other_owners)
343
                )
344
                other_resolves_lines.append(f"{import_module}: {owners_txt}")
1✔
345
            other_resolves_snippet = "\n\n" + softwrap(
1✔
346
                f"""
347
                These imports are not in the resolve used by the target (`{resolve}`), but they
348
                were present in other resolves:
349

350
                {bullet_list(other_resolves_lines)}\n\n
351
                """
352
            )
353

354
    unowned_imports_with_lines = [
6✔
355
        f"{module_name} (line: {parsed_imports[module_name].lineno})"
356
        for module_name in sorted(unowned_imports)
357
    ]
358

359
    msg = softwrap(
6✔
360
        f"""
361
        Pants cannot infer owners for the following imports in the target {address}:
362

363
        {bullet_list(unowned_imports_with_lines)}{other_resolves_snippet}
364

365
        If you do not expect an import to be inferable, add `# pants: no-infer-dep` to the
366
        import line. Otherwise, see
367
        {doc_url("docs/using-pants/troubleshooting-common-issues#import-errors-and-missing-dependencies")} for common problems.
368
        """
369
    )
370
    if unowned_dependency_behavior is UnownedDependencyUsage.LogWarning:
6✔
371
        logger.warning(msg)
6✔
372
    else:
373
        raise UnownedDependencyError(msg)
1✔
374

375

376
async def _exec_parse_deps(
    field_set: PythonImportDependenciesInferenceFieldSet,
    python_setup: PythonSetup,
) -> PythonFileDependencies:
    """Parse the dependencies of the field set's source and return the single file's result.

    `python_setup` is accepted but not used by this function.
    """
    source = await determine_source_files(SourceFilesRequest([field_set.source]))
    parsed = await parse_python_dependencies_get(
        ParsePythonDependenciesRequest(
            source,
        ),
        **implicitly(),
    )
    deps_by_path = parsed.path_to_deps
    # Exactly one entry is expected in the parse result.
    assert len(deps_by_path) == 1
    return next(iter(deps_by_path.values()))
11✔
389

390

391
@dataclass(frozen=True)
class ResolvedParsedPythonDependenciesRequest:
    """Input to `resolve_parsed_dependencies`: a field set plus its parsed dependencies."""

    field_set: PythonImportDependenciesInferenceFieldSet
    parsed_dependencies: PythonFileDependencies
    # Passed on to the module owner lookup; may be None.
    resolve: str | None
12✔
396

397

398
@dataclass(frozen=True)
class ResolvedParsedPythonDependencies:
    """Output of `resolve_parsed_dependencies`."""

    # Resolution result per parsed import name.
    resolve_results: dict[str, ImportResolveResult]
    # Resolution result per parsed asset path.
    assets: dict[str, ImportResolveResult]
    # The explicitly provided dependencies that were consulted during resolution.
    explicit: ExplicitlyProvidedDependencies
12✔
403

404

405
@rule
async def resolve_parsed_dependencies(
    request: ResolvedParsedPythonDependenciesRequest,
    python_infer_subsystem: PythonInferSubsystem,
) -> ResolvedParsedPythonDependencies:
    """Find the owning targets for the parsed dependencies.

    Imports are discarded up front when import inference is switched off in the subsystem.
    """
    parsed_imports = request.parsed_dependencies.imports
    parsed_assets = request.parsed_dependencies.assets
    if not python_infer_subsystem.imports:
        parsed_imports = ParsedPythonImports([])

    explicitly_provided_deps = await determine_explicitly_provided_dependencies(
        **implicitly(DependenciesRequest(request.field_set.dependencies))
    )

    # Only set locality if needed, to avoid unnecessary rule graph memoization misses.
    # When set, use the source root, which is useful in practice, but incurs fewer memoization
    # misses than using the full spec_path.
    locality = None
    if python_infer_subsystem.ambiguity_resolution == AmbiguityResolution.by_source_root:
        source_root = await get_source_root(
            SourceRootRequest.for_address(request.field_set.address)
        )
        locality = source_root.path

    resolve_results: dict[str, ImportResolveResult] = {}
    if parsed_imports:
        owners_per_import = await concurrently(
            map_module_to_address(
                PythonModuleOwnersRequest(imported_module, request.resolve, locality),
                **implicitly(),
            )
            for imported_module in parsed_imports
        )
        resolve_results = _get_imports_info(
            address=request.field_set.address,
            owners_per_import=owners_per_import,
            parsed_imports=parsed_imports,
            explicitly_provided_deps=explicitly_provided_deps,
        )

    asset_deps: dict[str, ImportResolveResult] = {}
    if parsed_assets:
        assets_by_path = await map_assets_by_path(**implicitly())
        asset_deps = _get_inferred_asset_deps(
            request.field_set.address,
            request.field_set.source.file_path,
            assets_by_path,
            parsed_assets,
            explicitly_provided_deps,
        )

    return ResolvedParsedPythonDependencies(
        resolve_results=resolve_results,
        assets=asset_deps,
        explicit=explicitly_provided_deps,
    )
465

466

467
@rule(desc="Inferring Python dependencies by analyzing source")
async def infer_python_dependencies_via_source(
    request: InferPythonImportDependencies,
    python_infer_subsystem: PythonInferSubsystem,
    python_setup: PythonSetup,
) -> InferredDependencies:
    """Infer a target's dependencies from the imports and assets parsed out of its source.

    Returns no dependencies when both import and asset inference are disabled. Unowned imports
    are reported through `_handle_unowned_imports`, which may raise. If an unowned import
    matches a deleted Python file, `DELETED_ADDRESS` is added to the result.
    """
    if not (python_infer_subsystem.imports or python_infer_subsystem.assets):
        return InferredDependencies([])

    parsed_dependencies = await _exec_parse_deps(request.field_set, python_setup)

    resolve = request.field_set.resolve.normalized_value(python_setup)

    resolved_dependencies = await resolve_parsed_dependencies(
        ResolvedParsedPythonDependenciesRequest(request.field_set, parsed_dependencies, resolve),
        **implicitly(),
    )
    import_deps, unowned_imports = _collect_imports_info(resolved_dependencies.resolve_results)
    unowned_imports = _remove_ignored_imports(
        unowned_imports, python_infer_subsystem.ignored_unowned_imports
    )

    # The unowned assets are computed but not used any further here.
    asset_deps, _unowned_assets = _collect_imports_info(resolved_dependencies.assets)

    inferred_deps = import_deps | asset_deps

    await _handle_unowned_imports(
        request.field_set.address,
        python_infer_subsystem.unowned_dependency_behavior,
        python_setup,
        unowned_imports,
        parsed_dependencies.imports,
        resolve=resolve,
    )

    if unowned_imports:
        # Let's see if any of the unowned imports were provided by a deleted file.
        deleted_files: DeletedFiles = await get_deleted_files(**implicitly())
        deleted_python_files = tuple(f for f in deleted_files.paths if f.endswith((".py", ".pyi")))
        if deleted_python_files:
            source_roots_result = await get_optional_source_roots(
                SourceRootsRequest.for_files(deleted_python_files)
            )
            # We drop files not under a source root, since they can't be used for import anyway.
            stripped_deleted_python_files = tuple(
                f.relative_to(opt_root.source_root.path)
                for f, opt_root in source_roots_result.path_to_optional_root.items()
                if opt_root.source_root
            )
            deleted_modules = tuple(
                module_from_stripped_path(f) for f in stripped_deleted_python_files
            )
            # An import matches a deleted module when it is that module or something below it.
            provided_by_deleted_file = any(
                impt == mod or impt.startswith(f"{mod}.")
                for impt in unowned_imports
                for mod in deleted_modules
            )
            if provided_by_deleted_file:
                # At least one unowned import was provided by a deleted file, so we
                # inject a dep on the DeletedTarget pseudo-target.
                inferred_deps = frozenset(inferred_deps | {DELETED_ADDRESS})

    return InferredDependencies(sorted(inferred_deps))
11✔
528

529

530
@dataclass(frozen=True)
class InitDependenciesInferenceFieldSet(FieldSet):
    """The target fields consumed when inferring dependencies on `__init__.py` files.

    This is the `infer_from` field set of `InferInitDependencies`.
    """

    # Field types declared as required by this field set.
    required_fields = (PythonSourceField, PythonResolveField)

    source: PythonSourceField
    resolve: PythonResolveField
12✔
536

537

538
class InferInitDependencies(InferDependenciesRequest):
    """Dependency inference request handled by `infer_python_init_dependencies`."""

    infer_from = InitDependenciesInferenceFieldSet
12✔
540

541

542
@rule(desc="Inferring dependencies on `__init__.py` files")
async def infer_python_init_dependencies(
    request: InferInitDependencies,
    python_infer_subsystem: PythonInferSubsystem,
    python_setup: PythonSetup,
) -> InferredDependencies:
    """Infer dependencies on the ancestor `__init__.py`/`__init__.pyi` files of a source file.

    Only owners that have a `PythonSourceField` and share the requesting target's resolve are
    returned. Nothing is inferred when the subsystem's `init_files` option is `never`.
    """
    if python_infer_subsystem.init_files is InitFilesInference.never:
        return InferredDependencies([])

    # In `content_only` mode, empty init files are ignored by the ancestor file lookup.
    ignore_empty_files = python_infer_subsystem.init_files is InitFilesInference.content_only
    fp = request.field_set.source.file_path
    assert fp is not None
    init_files = await find_ancestor_files(
        AncestorFilesRequest(
            input_files=(fp,),
            requested=("__init__.py", "__init__.pyi"),
            ignore_empty_files=ignore_empty_files,
        )
    )
    owners = await concurrently(
        find_owners(OwnersRequest((f,)), **implicitly()) for f in init_files.snapshot.files
    )

    owner_tgts = await resolve_targets(
        **implicitly(Addresses(itertools.chain.from_iterable(owners)))
    )
    resolve = request.field_set.resolve.normalized_value(python_setup)

    python_owners = []
    for tgt in owner_tgts:
        if not tgt.has_field(PythonSourceField):
            continue
        if tgt[PythonResolveField].normalized_value(python_setup) == resolve:
            python_owners.append(tgt.address)
    return InferredDependencies(python_owners)
7✔
578

579

580
@dataclass(frozen=True)
class ConftestDependenciesInferenceFieldSet(FieldSet):
    """The target fields consumed when inferring dependencies on `conftest.py` files.

    This is the `infer_from` field set of `InferConftestDependencies`.
    """

    # Field types declared as required by this field set.
    required_fields = (PythonTestSourceField, PythonResolveField)

    source: PythonTestSourceField
    resolve: PythonResolveField
12✔
586

587

588
class InferConftestDependencies(InferDependenciesRequest):
    """Dependency inference request handled by `infer_python_conftest_dependencies`."""

    infer_from = ConftestDependenciesInferenceFieldSet
12✔
590

591

592
@rule(desc="Inferring dependencies on `conftest.py` files")
async def infer_python_conftest_dependencies(
    request: InferConftestDependencies,
    python_infer_subsystem: PythonInferSubsystem,
    python_setup: PythonSetup,
) -> InferredDependencies:
    """Infer dependencies on the ancestor `conftest.py` files of a test source file.

    Only owners that have a `PythonSourceField` and share the requesting target's resolve are
    returned. Nothing is inferred when the subsystem's `conftests` option is off.
    """
    if not python_infer_subsystem.conftests:
        return InferredDependencies([])

    fp = request.field_set.source.file_path
    assert fp is not None
    conftest_files = await find_ancestor_files(
        AncestorFilesRequest(input_files=(fp,), requested=("conftest.py",))
    )
    owners = await concurrently(
        # NB: Because conftest.py files effectively always have content, we require an
        # owning target.
        find_owners(
            OwnersRequest((f,), owners_not_found_behavior=GlobMatchErrorBehavior.error),
            **implicitly(),
        )
        for f in conftest_files.snapshot.files
    )

    owner_tgts = await resolve_targets(
        **implicitly(Addresses(itertools.chain.from_iterable(owners)))
    )
    resolve = request.field_set.resolve.normalized_value(python_setup)

    python_owners = []
    for tgt in owner_tgts:
        if not tgt.has_field(PythonSourceField):
            continue
        if tgt[PythonResolveField].normalized_value(python_setup) == resolve:
            python_owners.append(tgt.address)
    return InferredDependencies(python_owners)
4✔
629

630

631
def import_rules():
    """Return the rules and union registration for import-based dependency inference.

    This is a separate function to facilitate tests registering import inference.
    """
    return [
        resolve_parsed_dependencies,
        find_other_owners_for_unowned_import,
        infer_python_dependencies_via_source,
        # Rules from the modules and subsystems this inference builds on.
        *itertools.chain(
            pex.rules(),
            parse_python_dependencies.rules(),
            module_mapper.rules(),
            stripped_source_files.rules(),
            target_types.rules(),
            PythonInferSubsystem.rules(),
            PythonSetup.rules(),
        ),
        UnionRule(InferDependenciesRequest, InferPythonImportDependencies),
    ]
646

647

648
def rules():
    """Return every rule of this module: import, `__init__.py` and `conftest.py` inference."""
    collected = list(import_rules())
    collected.append(infer_python_init_dependencies)
    collected.append(infer_python_conftest_dependencies)
    collected.extend(ancestor_files.rules())
    collected.append(UnionRule(InferDependenciesRequest, InferInitDependencies))
    collected.append(UnionRule(InferDependenciesRequest, InferConftestDependencies))
    return collected
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc