• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 19015773527

02 Nov 2025 05:33PM UTC coverage: 17.872% (-62.4%) from 80.3%
19015773527

Pull #22816

github

web-flow
Merge a12d75757 into 6c024e162
Pull Request #22816: Update Pants internal Python to 3.14

4 of 5 new or added lines in 3 files covered. (80.0%)

28452 existing lines in 683 files now uncovered.

9831 of 55007 relevant lines covered (17.87%)

0.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.41
/src/python/pants/engine/internals/build_files.py
1
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
1✔
5

6
import ast
1✔
7
import builtins
1✔
8
import itertools
1✔
9
import logging
1✔
10
import os.path
1✔
11
import sys
1✔
12
import typing
1✔
13
from collections import defaultdict
1✔
14
from collections.abc import Coroutine, Sequence
1✔
15
from dataclasses import dataclass
1✔
16
from pathlib import PurePath
1✔
17
from typing import Any, cast
1✔
18

19
import typing_extensions
1✔
20

21
from pants.build_graph.address import (
1✔
22
    Address,
23
    AddressInput,
24
    BuildFileAddress,
25
    BuildFileAddressRequest,
26
    MaybeAddress,
27
    ResolveError,
28
)
29
from pants.core.util_rules.env_vars import environment_vars_subset
1✔
30
from pants.engine.engine_aware import EngineAwareParameter
1✔
31
from pants.engine.env_vars import CompleteEnvironmentVars, EnvironmentVars, EnvironmentVarsRequest
1✔
32
from pants.engine.fs import FileContent, GlobMatchErrorBehavior, PathGlobs
1✔
33
from pants.engine.internals.defaults import BuildFileDefaults, BuildFileDefaultsParserState
1✔
34
from pants.engine.internals.dep_rules import (
1✔
35
    BuildFileDependencyRules,
36
    DependencyRuleApplication,
37
    MaybeBuildFileDependencyRulesImplementation,
38
)
39
from pants.engine.internals.mapper import AddressFamily, AddressMap, DuplicateNameError
1✔
40
from pants.engine.internals.parser import (
1✔
41
    BuildFilePreludeSymbols,
42
    BuildFileSymbolsInfo,
43
    Parser,
44
    error_on_imports,
45
)
46
from pants.engine.internals.selectors import concurrently
1✔
47
from pants.engine.internals.session import SessionValues
1✔
48
from pants.engine.internals.synthetic_targets import (
1✔
49
    SyntheticAddressMapsRequest,
50
    get_synthetic_address_maps,
51
)
52
from pants.engine.internals.target_adaptor import TargetAdaptor, TargetAdaptorRequest
1✔
53
from pants.engine.intrinsics import get_digest_contents, path_globs_to_paths
1✔
54
from pants.engine.rules import QueryRule, collect_rules, implicitly, rule
1✔
55
from pants.engine.target import (
1✔
56
    DependenciesRuleApplication,
57
    DependenciesRuleApplicationRequest,
58
    InvalidTargetException,
59
    RegisteredTargetTypes,
60
)
61
from pants.engine.unions import UnionMembership
1✔
62
from pants.init.bootstrap_scheduler import BootstrapStatus
1✔
63
from pants.option.global_options import GlobalOptions
1✔
64
from pants.util.frozendict import FrozenDict
1✔
65
from pants.util.strutil import softwrap
1✔
66

67
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
1✔
68

69

70
class BuildFileSyntaxError(SyntaxError):
    """An error parsing a BUILD file.

    Behaves like a regular `SyntaxError`, but its string form names the BUILD file and line and,
    when the source text and column are known, shows the offending line with a caret under the
    error position.
    """

    @staticmethod
    def from_syntax_error(error: SyntaxError) -> BuildFileSyntaxError:
        """Create a `BuildFileSyntaxError` carrying the message and location of `error`.

        Declared as a static method so that it works both on the class and on instances.
        """
        return BuildFileSyntaxError(
            error.msg,
            (
                error.filename,
                error.lineno,
                error.offset,
                error.text,
            ),
        )

    def __str__(self) -> str:
        """Render the error as one line, or three lines when source text and offset are set."""
        first_line = f"Error parsing BUILD file {self.filename}:{self.lineno}: {self.msg}"
        # These two fields are optional per the spec, so we can't rely on them being set.
        if self.text is not None and self.offset is not None:
            second_line = f"  {self.text.rstrip()}"
            # `offset` is 1-based, so pad with `offset - 1` spaces to put the caret under it.
            third_line = f"  {' ' * (self.offset - 1)}^"
            return f"{first_line}\n{second_line}\n{third_line}"

        return first_line
×
93

94

95
@dataclass(frozen=True)
class BuildFileOptions:
    """Immutable bundle of the settings used to locate BUILD files and prelude files."""

    # Name patterns of BUILD files; joined onto a directory path when globbing for BUILD files.
    patterns: tuple[str, ...]
    # Patterns to exclude; each is prefixed with `!` when globbing for BUILD files.
    ignores: tuple[str, ...] = ()
    # Globs selecting the prelude files to evaluate.
    prelude_globs: tuple[str, ...] = ()
1✔
100

101

102
@rule
async def extract_build_file_options(
    global_options: GlobalOptions,
    bootstrap_status: BootstrapStatus,
) -> BuildFileOptions:
    """Derive the `BuildFileOptions` from the global options.

    While bootstrapping is in progress no prelude globs are used.
    """
    patterns = global_options.build_patterns
    ignores = global_options.build_ignore
    prelude_globs: tuple[str, ...]
    if bootstrap_status.in_progress:
        prelude_globs = ()
    else:
        prelude_globs = global_options.build_file_prelude_globs
    return BuildFileOptions(patterns=patterns, ignores=ignores, prelude_globs=prelude_globs)
114

115

116
@rule(desc="Expand macros")
async def evaluate_preludes(
    build_file_options: BuildFileOptions,
    parser: Parser,
) -> BuildFilePreludeSymbols:
    """Execute the configured prelude files and collect the symbols they define.

    Every file matched by `build_file_options.prelude_globs` is compiled and executed against one
    shared globals/locals pair. The names bound by the preludes, together with the env var names
    found in their `env()` calls, are returned as a `BuildFilePreludeSymbols`.

    Raises:
        Exception: if decoding, compiling or executing a prelude file fails; the original error is
            attached as the cause.
    """
    prelude_digest_contents = await get_digest_contents(
        **implicitly(
            PathGlobs(
                build_file_options.prelude_globs,
                glob_match_error_behavior=GlobMatchErrorBehavior.ignore,
            )
        )
    )
    # Named `prelude_*` to avoid shadowing the `globals()`/`locals()` builtins.
    prelude_globals: dict[str, Any] = {
        # Later entries have precedence replacing conflicting keys from previous entries, so we
        # start with typing_extensions as the lowest prio source for global values.
        **{name: getattr(typing_extensions, name) for name in typing_extensions.__all__},
        **{name: getattr(typing, name) for name in typing.__all__},
        **{name: getattr(builtins, name) for name in dir(builtins) if name.endswith("Error")},
        # Ensure the globals for each prelude includes the builtin symbols (E.g. `python_sources`)
        # and any build file aliases (e.g. from plugins)
        **parser.symbols,
    }
    prelude_locals: dict[str, Any] = {}
    env_vars: set[str] = set()
    for file_content in prelude_digest_contents:
        try:
            file_content_str = file_content.content.decode()
            content = compile(file_content_str, file_content.path, "exec", dont_inherit=True)
            # NOTE: prelude files are executed as Python code.
            exec(content, prelude_globals, prelude_locals)
        except Exception as e:
            # Chain explicitly so the underlying failure is reported as the direct cause.
            raise Exception(f"Error parsing prelude file {file_content.path}: {e}") from e
        error_on_imports(file_content_str, file_content.path)
        env_vars.update(BUILDFileEnvVarExtractor.get_env_vars(file_content))
    # __builtins__ is a dict, so isn't hashable, and can't be put in a FrozenDict.
    # Fortunately, we don't care about it - preludes should not be able to override builtins, so we just pop it out.
    # TODO: Give a nice error message if a prelude tries to set and expose a non-hashable value.
    prelude_locals.pop("__builtins__", None)
    # Ensure preludes can reference each other by populating the shared globals object with references
    # to the other symbols
    prelude_globals.update(prelude_locals)
    return BuildFilePreludeSymbols.create(prelude_locals, env_vars)
×
158

159

160
@rule
async def get_all_build_file_symbols_info(
    parser: Parser, prelude_symbols: BuildFilePreludeSymbols
) -> BuildFileSymbolsInfo:
    """Build a `BuildFileSymbolsInfo` from the parser's symbol info and the prelude symbols' info."""
    parser_info = parser.symbols_info.info.values()
    prelude_info = prelude_symbols.info.values()
    return BuildFileSymbolsInfo.from_info(parser_info, prelude_info)
167

168

169
@rule
async def maybe_resolve_address(address_input: AddressInput) -> MaybeAddress:
    """Resolve an `AddressInput` to an address, or to a `ResolveError` wrapped in `MaybeAddress`.

    The path component is looked up on disk to decide whether it names a file or a directory; an
    empty path component is treated as the root directory.
    """
    if not address_input.path_component:
        # It is an address in the root directory.
        return MaybeAddress(address_input.dir_to_address())

    # Determine the type of the path_component of the input.
    paths = await path_globs_to_paths(PathGlobs(globs=(address_input.path_component,)))
    if paths.files:
        return MaybeAddress(address_input.file_to_address())
    if paths.dirs:
        return MaybeAddress(address_input.dir_to_address())

    # Neither a file nor a directory matched: report the full spec that could not be resolved.
    spec = address_input.path_component
    if address_input.target_component:
        spec += f":{address_input.target_component}"
    return MaybeAddress(
        ResolveError(
            softwrap(
                f"""
                The file or directory '{address_input.path_component}' does not exist on disk in
                the workspace, so the address '{spec}' from {address_input.description_of_origin}
                cannot be resolved.
                """
            )
        )
    )
197

198

199
@rule
async def resolve_address(maybe_address: MaybeAddress) -> Address:
    """Unwrap a `MaybeAddress`, raising the held `ResolveError` if resolution failed."""
    resolved = maybe_address.val
    if isinstance(resolved, ResolveError):
        raise resolved
    return resolved
×
204

205

206
@dataclass(frozen=True)
class AddressFamilyDir(EngineAwareParameter):
    """A single directory whose addresses should be looked up.

    Only the directory itself is considered; subdirectories are _not_ recursed into.
    """

    path: str

    def debug_hint(self) -> str:
        """Return the directory path as the debug hint for this parameter."""
        hint = self.path
        return hint
×
217

218

219
@dataclass(frozen=True)
class OptionalAddressFamily:
    """The `AddressFamily` found for a directory, or `None` when there is none."""

    path: str
    address_family: AddressFamily | None = None

    def ensure(self) -> AddressFamily:
        """Return the address family, raising `ResolveError` when it is missing."""
        family = self.address_family
        if family is None:
            raise ResolveError(f"Directory '{self.path}' does not contain any BUILD files.")
        return family
×
228

229

230
@rule
async def ensure_address_family(request: OptionalAddressFamily) -> AddressFamily:
    """Return the request's address family; `request.ensure()` raises if there is none."""
    address_family = request.ensure()
    return address_family
×
233

234

235
class BUILDFileEnvVarExtractor(ast.NodeVisitor):
    """AST visitor collecting the variable names passed as first argument to `env(...)` calls."""

    def __init__(self, filename: str):
        """Start with an empty result set; `filename` is only used in warning messages."""
        super().__init__()
        self.env_vars: set[str] = set()
        self.filename = filename

    @classmethod
    def get_env_vars(cls, file_content: FileContent) -> Sequence[str]:
        """Parse `file_content` and return the env var names referenced via `env()`.

        Raises:
            BuildFileSyntaxError: if the content is not valid Python syntax.
        """
        obj = cls(file_content.path)
        try:
            obj.visit(ast.parse(file_content.content, file_content.path))
        except SyntaxError as e:
            raise BuildFileSyntaxError.from_syntax_error(e).with_traceback(e.__traceback__)

        return tuple(obj.env_vars)

    def visit_Call(self, node: ast.Call):
        """Record the name from an `env("NAME", ...)` call, and descend into other calls."""
        is_env = isinstance(node.func, ast.Name) and node.func.id == "env"
        if not is_env:
            # The callee expression may itself contain `env()` calls, e.g.
            # `env("NAME").split(",")`, so it has to be traversed as well.
            self.visit(node.func)

        for arg in node.args:
            if not is_env:
                self.visit(arg)
                continue

            # Only first arg may be checked as env name
            is_env = False

            value = arg.value if isinstance(arg, ast.Constant) else None
            # Only non-empty string constants are usable as variable names; other constants
            # (numbers, bytes, ...) are reported like any other unsupported argument.
            if isinstance(value, str) and value:
                # Found env name in this call, we're done here.
                self.env_vars.add(value)
                return

            logger.warning(
                f"{self.filename}:{arg.lineno}: Only constant string values as variable name to "
                f"`env()` is currently supported. This `env()` call will always result in "
                "the default value only."
            )

        for kwarg in node.keywords:
            self.visit(kwarg)
×
278

279

280
@rule(desc="Search for addresses in BUILD files")
async def parse_address_family(
    directory: AddressFamilyDir,
    parser: Parser,
    bootstrap_status: BootstrapStatus,
    build_file_options: BuildFileOptions,
    prelude_symbols: BuildFilePreludeSymbols,
    registered_target_types: RegisteredTargetTypes,
    union_membership: UnionMembership,
    maybe_build_file_dependency_rules_implementation: MaybeBuildFileDependencyRulesImplementation,
    session_values: SessionValues,
) -> OptionalAddressFamily:
    """Parse the BUILD files and synthetic targets of a single directory.

    Returns an `OptionalAddressFamily` for `directory.path`. Its `address_family` is `None` when
    the directory has neither matching BUILD files nor synthetic address maps; otherwise it holds
    the `AddressFamily` created from the declared and synthetic targets.

    Raises:
        DuplicateNameError: if two declared targets in the directory share a name.
        InvalidTargetException: if a declared target sets `_extend_synthetic` to a true value but
            there is no synthetic target of that name, or it has a different target type.
    """
    digest_contents, all_synthetic_address_maps = await concurrently(
        get_digest_contents(
            **implicitly(
                PathGlobs(
                    globs=(
                        *(os.path.join(directory.path, p) for p in build_file_options.patterns),
                        *(f"!{p}" for p in build_file_options.ignores),
                    )
                )
            ),
        ),
        get_synthetic_address_maps(SyntheticAddressMapsRequest(directory.path), **implicitly()),
    )
    synthetic_address_maps = tuple(all_synthetic_address_maps)
    if not digest_contents and not synthetic_address_maps:
        return OptionalAddressFamily(directory.path)

    # Inherit defaults and dependency rules from the closest parent directory that has an address
    # family (`PurePath.parents` lists the ancestors starting with the immediate parent).
    defaults = BuildFileDefaults({})
    dependents_rules: BuildFileDependencyRules | None = None
    dependencies_rules: BuildFileDependencyRules | None = None
    parent_dirs = tuple(PurePath(directory.path).parents)
    if parent_dirs:
        maybe_parents = await concurrently(
            parse_address_family(AddressFamilyDir(str(parent_dir)), **implicitly())
            for parent_dir in parent_dirs
        )
        for maybe_parent in maybe_parents:
            if maybe_parent.address_family is not None:
                family = maybe_parent.address_family
                defaults = family.defaults
                dependents_rules = family.dependents_rules
                dependencies_rules = family.dependencies_rules
                break

    defaults_parser_state = BuildFileDefaultsParserState.create(
        directory.path, defaults, registered_target_types, union_membership
    )
    build_file_dependency_rules_class = (
        maybe_build_file_dependency_rules_implementation.build_file_dependency_rules_class
    )
    if build_file_dependency_rules_class is not None:
        dependents_rules_parser_state = build_file_dependency_rules_class.create_parser_state(
            directory.path,
            dependents_rules,
        )
        dependencies_rules_parser_state = build_file_dependency_rules_class.create_parser_state(
            directory.path,
            dependencies_rules,
        )
    else:
        dependents_rules_parser_state = None
        dependencies_rules_parser_state = None

    def _extract_env_vars(
        file_content: FileContent, extra_env: Sequence[str], env: CompleteEnvironmentVars
    ) -> Coroutine[Any, Any, EnvironmentVars]:
        """For BUILD file env vars, we only ever consult the local systems env."""
        env_vars = (*BUILDFileEnvVarExtractor.get_env_vars(file_content), *extra_env)
        return environment_vars_subset(EnvironmentVarsRequest(env_vars), env)

    # One `EnvironmentVars` per BUILD file, in the same order as `digest_contents`.
    all_env_vars = await concurrently(
        _extract_env_vars(
            fc, prelude_symbols.referenced_env_vars, session_values[CompleteEnvironmentVars]
        )
        for fc in digest_contents
    )

    declared_address_maps = [
        AddressMap.parse(
            fc.path,
            fc.content.decode(),
            parser,
            prelude_symbols,
            env_vars,
            bootstrap_status.in_progress,
            defaults_parser_state,
            dependents_rules_parser_state,
            dependencies_rules_parser_state,
        )
        for fc, env_vars in zip(digest_contents, all_env_vars)
    ]
    declared_address_maps.sort(key=lambda x: x.path)

    # Freeze defaults and dependency rules
    frozen_defaults = defaults_parser_state.get_frozen_defaults()
    frozen_dependents_rules = cast(
        "BuildFileDependencyRules | None",
        dependents_rules_parser_state
        and dependents_rules_parser_state.get_frozen_dependency_rules(),
    )
    frozen_dependencies_rules = cast(
        "BuildFileDependencyRules | None",
        dependencies_rules_parser_state
        and dependencies_rules_parser_state.get_frozen_dependency_rules(),
    )

    # Process synthetic targets.

    def apply_defaults(tgt: TargetAdaptor) -> TargetAdaptor:
        """Fill in frozen defaults for the target's type; the target's own kwargs win."""
        default_values = frozen_defaults.get(tgt.type_alias)
        if default_values is None:
            return tgt
        return tgt.with_new_kwargs(**{**default_values, **tgt.kwargs})

    name_to_path_and_synthetic_target: dict[str, tuple[str, TargetAdaptor]] = {}
    for synthetic_address_map in synthetic_address_maps:
        for name, target in synthetic_address_map.name_to_target_adaptor.items():
            name_to_path_and_synthetic_target[name] = (
                synthetic_address_map.path,
                apply_defaults(target),
            )

    name_to_path_and_declared_target: dict[str, tuple[str, TargetAdaptor]] = {}
    for declared_address_map in declared_address_maps:
        for name, target in declared_address_map.name_to_target_adaptor.items():
            if name in name_to_path_and_declared_target:
                # This is a duplicate declared name, raise an exception.
                duplicate_path = name_to_path_and_declared_target[name][0]
                raise DuplicateNameError(
                    f"A target already exists at `{duplicate_path}` with name `{name}` and target type "
                    f"`{target.type_alias}`. The `{name}` target in `{declared_address_map.path}` "
                    "cannot use the same name."
                )

            name_to_path_and_declared_target[name] = (declared_address_map.path, target)

    # We copy the dict so we can modify the original in the loop.
    for name, (
        declared_target_path,
        declared_target,
    ) in name_to_path_and_declared_target.copy().items():
        # Pop the synthetic target to let the declared target take precedence.
        synthetic_target_path, synthetic_target = name_to_path_and_synthetic_target.pop(
            name, (None, None)
        )
        if "_extend_synthetic" not in declared_target.kwargs:
            # The explicitly declared target should replace the synthetic one.
            continue

        # The _extend_synthetic kwarg was explicitly provided, so we must strip it.
        declared_target_kwargs = dict(declared_target.kwargs)
        extend_synthetic = declared_target_kwargs.pop("_extend_synthetic")
        if extend_synthetic:
            if synthetic_target is None:
                raise InvalidTargetException(
                    softwrap(
                        f"""
                            The `{declared_target.type_alias}` target {name!r} in {declared_target_path} has
                            `_extend_synthetic=True` but there is no synthetic target to extend.
                            """
                    )
                )

            if synthetic_target.type_alias != declared_target.type_alias:
                raise InvalidTargetException(
                    softwrap(
                        f"""
                        The `{declared_target.type_alias}` target {name!r} in {declared_target_path} is
                        of a different type than the synthetic target
                        `{synthetic_target.type_alias}` from {synthetic_target_path}.

                        When `_extend_synthetic` is true the target types must match, set this to
                        false if you want to replace the synthetic target with the target from your
                        BUILD file.
                        """
                    )
                )

            # Preserve synthetic field values not overridden by the declared target from the BUILD.
            kwargs = {**synthetic_target.kwargs, **declared_target_kwargs}
        else:
            kwargs = declared_target_kwargs
        name_to_path_and_declared_target[name] = (
            declared_target_path,
            declared_target.with_new_kwargs(**kwargs),
        )

    # Now reconstitute into AddressMaps, to pass into AddressFamily.create().
    # We no longer need to distinguish between synthetic and declared AddressMaps.
    # TODO: We might want to move the validation done by AddressFamily.create() to here, since
    #  we're already iterating over the AddressMap data, and simplify AddressFamily.
    path_to_targets = defaultdict(list)
    for name_to_path_and_target in [
        name_to_path_and_declared_target,
        name_to_path_and_synthetic_target,
    ]:
        for path_and_target in name_to_path_and_target.values():
            path_to_targets[path_and_target[0]].append(path_and_target[1])
    address_maps = [AddressMap.create(path, targets) for path, targets in path_to_targets.items()]

    return OptionalAddressFamily(
        directory.path,
        AddressFamily.create(
            spec_path=directory.path,
            address_maps=address_maps,
            defaults=frozen_defaults,
            dependents_rules=frozen_dependents_rules,
            dependencies_rules=frozen_dependencies_rules,
        ),
    )
496

497

498
@rule
async def find_build_file(request: BuildFileAddressRequest) -> BuildFileAddress:
    """Find the `BuildFileAddress` for the requested address.

    For a generated target, the returned address keeps the requested address but uses the
    `rel_path` of the BUILD file address matching its target generator.

    Raises:
        ResolveError: if the owning address is not present in the directory's address family.
    """
    address = request.address
    address_family = await ensure_address_family(**implicitly(AddressFamilyDir(address.spec_path)))
    owning_address = address.maybe_convert_to_target_generator()
    if address_family.get_target_adaptor(owning_address) is None:
        raise ResolveError.did_you_mean(
            owning_address,
            description_of_origin=request.description_of_origin,
            known_names=address_family.target_names,
            namespace=address_family.namespace,
        )
    matching_addresses = (
        candidate
        for candidate in address_family.build_file_addresses
        if candidate.address == owning_address
    )
    bfa = next(matching_addresses)
    if address.is_generated_target:
        return BuildFileAddress(address, bfa.rel_path)
    return bfa
×
516

517

518
def _get_target_adaptor(
    address: Address, address_family: AddressFamily, description_of_origin: str
) -> TargetAdaptor:
    """Look up the `TargetAdaptor` for `address` in `address_family`.

    Raises:
        ResolveError: if the family has no target adaptor for the address.
    """
    found = address_family.get_target_adaptor(address)
    if found is not None:
        return found
    raise ResolveError.did_you_mean(
        address,
        description_of_origin=description_of_origin,
        known_names=address_family.target_names,
        namespace=address_family.namespace,
    )
×
530

531

532
@rule
async def find_target_adaptor(request: TargetAdaptorRequest) -> TargetAdaptor:
    """Hydrate a TargetAdaptor so that it may be converted into the Target API."""
    address = request.address
    # Generated targets have no BUILD file entry of their own, so they cannot be looked up here.
    if address.is_generated_target:
        raise AssertionError(
            "Generated targets are not defined in BUILD files, and so do not have "
            f"TargetAdaptors: {request}"
        )
    address_family = await ensure_address_family(**implicitly(AddressFamilyDir(address.spec_path)))
    return _get_target_adaptor(address, address_family, request.description_of_origin)
×
544

545

546
def _rules_path(address: Address) -> str:
1✔
547
    if address.is_file_target and os.path.sep in address.relative_file_path:  # type: ignore[operator]
×
548
        # The file is in a subdirectory of spec_path
549
        return os.path.dirname(address.filename)
×
550
    else:
551
        return address.spec_path
×
552

553

554
async def _get_target_family_and_adaptor_for_dep_rules(
    *addresses: Address, description_of_origin: str
) -> tuple[tuple[AddressFamily, TargetAdaptor], ...]:
    """Return, per address and in input order, the rules `AddressFamily` and its `TargetAdaptor`.

    The rules family is the one for the directory given by `_rules_path(address)`, falling back
    to the family of the address' `spec_path` when that directory has no address family.
    """
    # Fetch up to 2 sets of address families per address, as we want the rules from the directory
    # the file is in rather than the directory where the target generator was declared, if not the
    # same.
    rules_paths = set(
        itertools.chain.from_iterable(
            {address.spec_path, _rules_path(address)} for address in addresses
        )
    )
    maybe_address_families = await concurrently(
        parse_address_family(AddressFamilyDir(rules_path), **implicitly())
        for rules_path in rules_paths
    )
    maybe_families = {}
    for maybe in maybe_address_families:
        maybe_families[maybe.path] = maybe

    families_and_adaptors = []
    for address in addresses:
        rules_family = (
            maybe_families[_rules_path(address)].address_family
            or maybe_families[address.spec_path].ensure()
        )
        # The adaptor is always looked up in the directory where the target was declared.
        adaptor = _get_target_adaptor(
            address,
            maybe_families[address.spec_path].ensure(),
            description_of_origin,
        )
        families_and_adaptors.append((rules_family, adaptor))
    return tuple(families_and_adaptors)
585

586

587
@rule
async def get_dependencies_rule_application(
    request: DependenciesRuleApplicationRequest,
    maybe_build_file_rules_implementation: MaybeBuildFileDependencyRulesImplementation,
) -> DependenciesRuleApplication:
    """Check every dependency of `request.address` against the BUILD file dependency rules.

    When no dependency rules implementation is available, everything is allowed.
    """
    build_file_dependency_rules_class = (
        maybe_build_file_rules_implementation.build_file_dependency_rules_class
    )
    if build_file_dependency_rules_class is None:
        return DependenciesRuleApplication.allow_all()

    # The first result belongs to the origin address, the rest to the dependencies (same order).
    (
        (
            origin_rules_family,
            origin_target,
        ),
        *dependencies_family_adaptor,
    ) = await _get_target_family_and_adaptor_for_dep_rules(
        request.address,
        *request.dependencies,
        description_of_origin=request.description_of_origin,
    )

    dependencies_rule: dict[Address, DependencyRuleApplication] = {
        dependency_address: build_file_dependency_rules_class.check_dependency_rules(
            origin_address=request.address,
            origin_adaptor=origin_target,
            dependencies_rules=origin_rules_family.dependencies_rules,
            dependency_address=dependency_address,
            dependency_adaptor=dependency_target,
            dependents_rules=dependency_rules_family.dependents_rules,
        )
        for dependency_address, (dependency_rules_family, dependency_target) in zip(
            request.dependencies, dependencies_family_adaptor
        )
    }
    return DependenciesRuleApplication(request.address, FrozenDict(dependencies_rule))
×
625

626

627
def rules():
    """Return this module's collected rules plus the `BuildFileSymbolsInfo` query rule."""
    collected = collect_rules()
    # The `BuildFileSymbolsInfo` is consumed by the `HelpInfoExtracter` and uses the scheduler
    # session `product_request()` directly so we need an explicit QueryRule to provide this type
    # as a valid entrypoint into the rule graph.
    symbols_info_query = QueryRule(BuildFileSymbolsInfo, ())
    return (*collected, symbols_info_query)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc