• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 18252174847

05 Oct 2025 01:36AM UTC coverage: 43.382% (-36.9%) from 80.261%
18252174847

push

github

web-flow
run tests on mac arm (#22717)

Just doing the minimal to pull forward the x86_64 pattern.

ref #20993

25776 of 59416 relevant lines covered (43.38%)

1.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.7
/src/python/pants/engine/rules.py
1
# Copyright 2016 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
3✔
5

6
import functools
3✔
7
import inspect
3✔
8
import sys
3✔
9
from collections.abc import Callable, Coroutine, Iterable, Mapping, Sequence
3✔
10
from dataclasses import dataclass
3✔
11
from enum import Enum
3✔
12
from types import FrameType, ModuleType
3✔
13
from typing import (
3✔
14
    Any,
15
    NotRequired,
16
    Protocol,
17
    TypedDict,
18
    TypeVar,
19
    Unpack,
20
    cast,
21
    get_type_hints,
22
    overload,
23
)
24

25
from typing_extensions import ParamSpec
3✔
26

27
from pants.engine.engine_aware import SideEffecting
3✔
28
from pants.engine.internals.rule_visitor import collect_awaitables
3✔
29
from pants.engine.internals.selectors import AwaitableConstraints, Call
3✔
30
from pants.engine.internals.selectors import Effect as Effect  # noqa: F401
3✔
31
from pants.engine.internals.selectors import Get as Get  # noqa: F401
3✔
32
from pants.engine.internals.selectors import MultiGet as MultiGet  # noqa: F401
3✔
33
from pants.engine.internals.selectors import concurrently as concurrently  # noqa: F401
3✔
34
from pants.engine.unions import UnionRule
3✔
35
from pants.util.frozendict import FrozenDict
3✔
36
from pants.util.logging import LogLevel
3✔
37
from pants.util.ordered_set import FrozenOrderedSet, OrderedSet
3✔
38
from pants.util.strutil import softwrap
3✔
39

40
PANTS_RULES_MODULE_KEY = "__pants_rules__"
3✔
41

42

43
def implicitly(*args) -> dict[str, Any]:
3✔
44
    # NB: This function does not have a `TypedDict` return type, because the `@rule` decorator
45
    # cannot adjust the type of the `@rule` function to include a keyword argument (keyword
46
    # arguments are not supported by PEP-612).
47
    return {"__implicitly": args}
3✔
48

49

50
class RuleType(Enum):
3✔
51
    rule = "rule"
3✔
52
    goal_rule = "goal_rule"
3✔
53
    uncacheable_rule = "_uncacheable_rule"
3✔
54

55

56
P = ParamSpec("P")
3✔
57
R = TypeVar("R")
3✔
58
SyncRuleT = Callable[P, R]
3✔
59
AsyncRuleT = Callable[P, Coroutine[Any, Any, R]]
3✔
60
RuleDecorator = Callable[[SyncRuleT | AsyncRuleT], AsyncRuleT]
3✔
61

62

63
def _rule_call_trampoline(
3✔
64
    rule_id: str, output_type: type[Any], func: Callable[P, R]
65
) -> Callable[P, R]:
66
    @functools.wraps(func)  # type: ignore
3✔
67
    async def wrapper(*args, __implicitly: Sequence[Any] = (), **kwargs):
3✔
68
        call = Call(rule_id, output_type, args, *__implicitly)
3✔
69
        return await call
3✔
70

71
    return cast(Callable[P, R], wrapper)
3✔
72

73

74
def _make_rule(
3✔
75
    func_id: str,
76
    rule_type: RuleType,
77
    return_type: type[Any],
78
    parameter_types: dict[str, type[Any]],
79
    masked_types: Iterable[type[Any]],
80
    *,
81
    cacheable: bool,
82
    polymorphic: bool,
83
    canonical_name: str,
84
    desc: str | None,
85
    level: LogLevel,
86
) -> RuleDecorator:
87
    """A @decorator that declares that a particular static function may be used as a TaskRule.
88

89
    :param rule_type: The specific decorator used to declare the rule.
90
    :param return_type: The return/output type for the Rule. This must be a concrete Python type.
91
    :param parameter_types: A sequence of types that matches the number and order of arguments to
92
                            the decorated function.
93
    :param cacheable: Whether the results of executing the Rule should be cached as keyed by all of
94
                      its inputs.
95
    :param polymorphic: Whether the rule is an abstract base method for polymorphic dispatch via
96
                        a union type.
97
    """
98

99
    is_goal_cls = getattr(return_type, "__goal__", False)
3✔
100
    if rule_type == RuleType.rule and is_goal_cls:
3✔
101
        raise TypeError(
×
102
            "An `@rule` that returns a `Goal` must instead be declared with `@goal_rule`."
103
        )
104
    if rule_type == RuleType.goal_rule and not is_goal_cls:
3✔
105
        raise TypeError("An `@goal_rule` must return a subclass of `engine.goal.Goal`.")
×
106

107
    def wrapper(original_func):
3✔
108
        if not inspect.isfunction(original_func):
3✔
109
            raise ValueError("The @rule decorator must be applied innermost of all decorators.")
×
110

111
        # Set our own custom `__line_number__` dunder so that the engine may visualize the line number.
112
        original_func.__line_number__ = original_func.__code__.co_firstlineno
3✔
113
        original_func.rule_id = canonical_name
3✔
114

115
        awaitables = FrozenOrderedSet(collect_awaitables(original_func))
3✔
116

117
        validate_requirements(func_id, parameter_types, awaitables, cacheable)
3✔
118
        func = _rule_call_trampoline(canonical_name, return_type, original_func)
3✔
119

120
        # NB: The named definition of the rule ends up wrapped in a trampoline to handle memoization
121
        # and implicit arguments for direct by-name calls. But the `TaskRule` takes a reference to
122
        # the original unwrapped function, which avoids the need for a special protocol when the
123
        # engine invokes a @rule under memoization.
124
        func.rule = TaskRule(
3✔
125
            return_type,
126
            FrozenDict(parameter_types),
127
            awaitables,
128
            masked_types,
129
            original_func,
130
            canonical_name=canonical_name,
131
            desc=desc,
132
            level=level,
133
            cacheable=cacheable,
134
            polymorphic=polymorphic,
135
        )
136
        return func
3✔
137

138
    return wrapper
3✔
139

140

141
class InvalidTypeAnnotation(TypeError):
3✔
142
    """Indicates an incorrect type annotation for an `@rule`."""
143

144

145
class UnrecognizedRuleArgument(TypeError):
3✔
146
    """Indicates an unrecognized keyword argument to a `@rule`."""
147

148

149
class MissingTypeAnnotation(TypeError):
3✔
150
    """Indicates a missing type annotation for an `@rule`."""
151

152

153
class MissingReturnTypeAnnotation(InvalidTypeAnnotation):
3✔
154
    """Indicates a missing return type annotation for an `@rule`."""
155

156

157
class MissingParameterTypeAnnotation(InvalidTypeAnnotation):
3✔
158
    """Indicates a missing parameter type annotation for an `@rule`."""
159

160

161
class DuplicateRuleError(TypeError):
3✔
162
    """Invalid to overwrite `@rule`s using the same name in the same module."""
163

164

165
def _ensure_type_annotation(
3✔
166
    *,
167
    type_annotation: type[Any] | None,
168
    name: str,
169
    raise_type: type[InvalidTypeAnnotation],
170
) -> type[Any]:
171
    if type_annotation is None:
3✔
172
        raise raise_type(f"{name} is missing a type annotation.")
×
173
    if not isinstance(type_annotation, type):
3✔
174
        raise raise_type(
×
175
            f"The annotation for {name} must be a type, got {type_annotation} of type {type(type_annotation)}."
176
        )
177
    return type_annotation
3✔
178

179

180
PUBLIC_RULE_DECORATOR_ARGUMENTS = {
3✔
181
    "canonical_name",
182
    "canonical_name_suffix",
183
    "desc",
184
    "level",
185
    "polymorphic",
186
}
187
# We aren't sure if these'll stick around or be removed at some point, so they are "private"
188
# and should only be used in Pants' codebase.
189
PRIVATE_RULE_DECORATOR_ARGUMENTS = {
3✔
190
    # Allows callers to override the type Pants will use for the params listed.
191
    #
192
    # It is assumed (but not enforced) that the provided type is a subclass of the annotated type.
193
    # (We assume but not enforce since this is likely to be used with unions, which has the same
194
    # assumption between the union base and its members).
195
    "_param_type_overrides",
196
    # Allows callers to prevent the given list of types from being included in the identity of
197
    # a @rule. Although the type may be in scope for callers, it will not be consumable in the
198
    # `@rule` which declares the type masked.
199
    "_masked_types",
200
}
201
# We don't want @rule-writers to use 'rule_type' or 'cacheable' as kwargs directly,
202
# but rather set them implicitly based on the rule annotation.
203
# So we leave it out of PUBLIC_RULE_DECORATOR_ARGUMENTS.
204
IMPLICIT_PRIVATE_RULE_DECORATOR_ARGUMENTS = {"rule_type", "cacheable"}
3✔
205

206

207
class RuleDecoratorKwargs(TypedDict):
3✔
208
    """Public-facing @rule kwargs used in the codebase."""
209

210
    canonical_name: NotRequired[str]
3✔
211

212
    canonical_name_suffix: NotRequired[str]
3✔
213

214
    desc: NotRequired[str]
3✔
215
    """The rule's description as it appears in stacktraces/debugging. For goal rules, defaults to the goal name."""
3✔
216

217
    level: NotRequired[LogLevel]
3✔
218
    """The logging level applied to this rule. Defaults to TRACE."""
3✔
219

220
    polymorphic: NotRequired[bool]
3✔
221
    """Whether this rule represents an abstract method for a union.
222

223
    A polymorphic rule can only be called by name, and must have a single input type that is a
224
    union base type (plus other non-union arguments as needed). Execution will be dispatched to the
225
    @rule with the same signature with the union base type replaced by one of its member types.
226

227
    E.g., given
228

229
    ```
230
    @rule(polymorphic=True)
231
    async def base_rule(arg: UnionBase, other_arg: OtherType) -> OutputType
232
        ...
233

234
    @rule(polymorphic=True)
235
    async def derived_rule(arg: UnionMember, other_arg: OtherType) -> OutputType
236
       ...
237

238
    ```
239

240
    And an arg of type UnionMember, then
241

242
    `await base_rule(arg, other_arg)`
243

244
    will invoke `derived_rule(arg, other_arg)`
245

246
    This is the call-by-name equivalent of Get(OutputType, UnionBase, union_member_instance).
247
    """
248

249
    _masked_types: NotRequired[Iterable[type[Any]]]
3✔
250
    """Unstable. Internal Pants usage only."""
3✔
251

252
    _param_type_overrides: NotRequired[dict[str, type[Any]]]
3✔
253
    """Unstable. Internal Pants usage only."""
3✔
254

255

256
class _RuleDecoratorKwargs(RuleDecoratorKwargs):
3✔
257
    """Internal/Implicit @rule kwargs (not for use outside rules.py)"""
258

259
    rule_type: RuleType
3✔
260
    """The decorator used to declare the rule (see rules.py:_make_rule(...))"""
3✔
261

262
    cacheable: bool
3✔
263
    """Whether the results of this rule should be cached.
3✔
264
    Typically true for rules, false for goal_rules (see rules.py:_make_rule(...))
265
    """
266

267

268
def rule_decorator(
3✔
269
    func: SyncRuleT | AsyncRuleT, **kwargs: Unpack[_RuleDecoratorKwargs]
270
) -> AsyncRuleT:
271
    if not inspect.isfunction(func):
3✔
272
        raise ValueError("The @rule decorator expects to be placed on a function.")
×
273

274
    if (
3✔
275
        len(
276
            set(kwargs)
277
            - PUBLIC_RULE_DECORATOR_ARGUMENTS
278
            - PRIVATE_RULE_DECORATOR_ARGUMENTS
279
            - IMPLICIT_PRIVATE_RULE_DECORATOR_ARGUMENTS
280
        )
281
        != 0
282
    ):
283
        raise UnrecognizedRuleArgument(
×
284
            f"`@rule`s and `@goal_rule`s only accept the following keyword arguments: {PUBLIC_RULE_DECORATOR_ARGUMENTS}"
285
        )
286

287
    rule_type = kwargs["rule_type"]
3✔
288
    cacheable = kwargs["cacheable"]
3✔
289
    polymorphic = kwargs.get("polymorphic", False)
3✔
290
    masked_types: tuple[type, ...] = tuple(kwargs.get("_masked_types", ()))
3✔
291
    param_type_overrides: dict[str, type] = kwargs.get("_param_type_overrides", {})
3✔
292

293
    func_id = f"@rule {func.__module__}:{func.__name__}"
3✔
294
    type_hints = get_type_hints(func)
3✔
295
    return_type = _ensure_type_annotation(
3✔
296
        type_annotation=type_hints.get("return"),
297
        name=f"{func_id} return",
298
        raise_type=MissingReturnTypeAnnotation,
299
    )
300

301
    func_params = inspect.signature(func).parameters
3✔
302
    for parameter in param_type_overrides:
3✔
303
        if parameter not in func_params:
3✔
304
            raise ValueError(
×
305
                f"Unknown parameter name in `param_type_overrides`: {parameter}."
306
                + f" Parameter names: '{', '.join(func_params)}'"
307
            )
308

309
    parameter_types = {
3✔
310
        parameter: _ensure_type_annotation(
311
            type_annotation=param_type_overrides.get(parameter, type_hints.get(parameter)),
312
            name=f"{func_id} parameter {parameter}",
313
            raise_type=MissingParameterTypeAnnotation,
314
        )
315
        for parameter in func_params
316
    }
317
    is_goal_cls = getattr(return_type, "__goal__", False)
3✔
318

319
    # Set a default canonical name if one is not explicitly provided to the module and name of the
320
    # function that implements it, plus an optional suffix. This is used as the workunit name.
321
    # The suffix is a convenient way to disambiguate multiple rules registered dynamically from the
322
    # same static code (by overriding the inferred param types in the @rule decorator).
323
    # TODO: It is not yet clear how dynamically registered rules whose names are generated
324
    #  with a suffix will work in practice with the new call-by-name semantics.
325
    #  For now the suffix serves to ensure unique names. Whether they are useful is another matter.
326
    suffix = kwargs.get("canonical_name_suffix", "")
3✔
327
    effective_name = kwargs.get(
3✔
328
        "canonical_name",
329
        f"{func.__module__}.{func.__qualname__}{('_' + suffix) if suffix else ''}".replace(
330
            ".<locals>", ""
331
        ),
332
    )
333

334
    # Set a default description, which is used in the dynamic UI and stacktraces.
335
    effective_desc = kwargs.get("desc")
3✔
336
    if effective_desc is None and is_goal_cls:
3✔
337
        effective_desc = f"`{return_type.name}` goal"
3✔
338

339
    effective_level = kwargs.get("level", LogLevel.TRACE)
3✔
340
    if not isinstance(effective_level, LogLevel):  # type: ignore[unused-ignore]
3✔
341
        raise ValueError(
×
342
            "Expected to receive a value of type LogLevel for the level "
343
            f"argument, but got: {effective_level}"
344
        )
345

346
    module = sys.modules[func.__module__]
3✔
347
    pants_rules = getattr(module, PANTS_RULES_MODULE_KEY, None)
3✔
348
    if pants_rules is None:
3✔
349
        pants_rules = {}
3✔
350
        setattr(module, PANTS_RULES_MODULE_KEY, pants_rules)
3✔
351

352
    if effective_name not in pants_rules:
3✔
353
        pants_rules[effective_name] = func
3✔
354
    else:
355
        prev_func = pants_rules[effective_name]
3✔
356
        if prev_func.__code__ != func.__code__:
3✔
357
            raise DuplicateRuleError(
×
358
                softwrap(
359
                    f"""
360
                    Redeclaring rule {effective_name} with {func} at line
361
                    {func.__code__.co_firstlineno}, previously defined by {prev_func} at line
362
                    {prev_func.__code__.co_firstlineno}.
363
                    """
364
                )
365
            )
366

367
    return _make_rule(
3✔
368
        func_id,
369
        rule_type,
370
        return_type,
371
        parameter_types,
372
        masked_types,
373
        cacheable=cacheable,
374
        polymorphic=polymorphic,
375
        canonical_name=effective_name,
376
        desc=effective_desc,
377
        level=effective_level,
378
    )(func)
379

380

381
def validate_requirements(
3✔
382
    func_id: str,
383
    parameter_types: dict[str, type],
384
    awaitables: tuple[AwaitableConstraints, ...],
385
    cacheable: bool,
386
) -> None:
387
    # TODO: Technically this will also fire for an @_uncacheable_rule, but we don't expose those as
388
    # part of the API, so it's OK for these errors not to mention them.
389
    for ty in parameter_types.values():
3✔
390
        if cacheable and issubclass(ty, SideEffecting):
3✔
391
            raise ValueError(
×
392
                f"A `@rule` that is not a @goal_rule ({func_id}) may not have "
393
                f"a side-effecting parameter: {ty}."
394
            )
395
    for awaitable in awaitables:
3✔
396
        input_type_side_effecting = [
3✔
397
            it for it in awaitable.input_types if issubclass(it, SideEffecting)
398
        ]
399
        if input_type_side_effecting and not awaitable.is_effect:
3✔
400
            raise ValueError(
×
401
                f"A `Get` may not request side-effecting types ({input_type_side_effecting}). "
402
                f"Use `Effect` instead: `{awaitable}`."
403
            )
404
        if not input_type_side_effecting and awaitable.is_effect:
3✔
405
            raise ValueError(
×
406
                f"An `Effect` should not be used with pure types ({awaitable.input_types}). "
407
                f"Use `Get` instead: `{awaitable}`."
408
            )
409
        if cacheable and awaitable.is_effect:
3✔
410
            raise ValueError(
×
411
                f"A `@rule` that is not a @goal_rule ({func_id}) may not use an "
412
                f"Effect: `{awaitable}`."
413
            )
414

415

416
def inner_rule(*args, **kwargs) -> AsyncRuleT | RuleDecorator:
3✔
417
    if len(args) == 1 and inspect.isfunction(args[0]):
3✔
418
        return rule_decorator(*args, **kwargs)
3✔
419
    else:
420

421
        def wrapper(*args):
3✔
422
            return rule_decorator(*args, **kwargs)
3✔
423

424
        return wrapper
3✔
425

426

427
F = TypeVar("F", bound=Callable[..., Any | Coroutine[Any, Any, Any]])
3✔
428

429

430
@overload
3✔
431
def rule(**kwargs: Unpack[RuleDecoratorKwargs]) -> Callable[[F], F]:
3✔
432
    """Handles decorator factories of the form `@rule(foo=..., bar=...)`
433
    https://mypy.readthedocs.io/en/stable/generics.html#decorator-factories.
434

435
    Note: This needs to be the first rule, otherwise MyPy goes nuts
436
    """
437
    ...
438

439

440
@overload
3✔
441
def rule(_func: Callable[P, Coroutine[Any, Any, R]]) -> Callable[P, Coroutine[Any, Any, R]]:
3✔
442
    """Handles bare @rule decorators on async functions.
443

444
    Usage of Coroutine[...] (vs Awaitable[...]) is intentional, as `MultiGet`/`concurrently` use
445
    coroutines directly.
446
    """
447
    ...
448

449

450
@overload
3✔
451
def rule(_func: Callable[P, R]) -> Callable[P, Coroutine[Any, Any, R]]:
3✔
452
    """Handles bare @rule decorators on non-async functions It's debatable whether we should even
453
    have non-async @rule functions, but keeping this to not break the world for plugin authors.
454

455
    Usage of Coroutine[...] (vs Awaitable[...]) is intentional, as `MultiGet`/`concurrently` use
456
    coroutines directly.
457
    """
458
    ...
459

460

461
def rule(*args, **kwargs):
3✔
462
    return inner_rule(*args, **kwargs, rule_type=RuleType.rule, cacheable=True)
3✔
463

464

465
@overload
466
def goal_rule(func: Callable[P, Coroutine[Any, Any, R]]) -> Callable[P, Coroutine[Any, Any, R]]: ...
467

468

469
@overload
470
def goal_rule(func: Callable[P, R]) -> Callable[P, Coroutine[Any, Any, R]]: ...
471

472

473
@overload
474
def goal_rule(
475
    *args, func: None = None, **kwargs: Any
476
) -> Callable[[SyncRuleT | AsyncRuleT], AsyncRuleT]: ...
477

478

479
def goal_rule(*args, **kwargs):
3✔
480
    if "level" not in kwargs:
3✔
481
        kwargs["level"] = LogLevel.DEBUG
3✔
482
    return inner_rule(
3✔
483
        *args,
484
        **kwargs,
485
        rule_type=RuleType.goal_rule,
486
        cacheable=False,
487
    )
488

489

490
@overload
491
def _uncacheable_rule(
492
    func: Callable[P, Coroutine[Any, Any, R]],
493
) -> Callable[P, Coroutine[Any, Any, R]]: ...
494

495

496
@overload
497
def _uncacheable_rule(func: Callable[P, R]) -> Callable[P, Coroutine[Any, Any, R]]: ...
498

499

500
@overload
501
def _uncacheable_rule(
502
    *args, func: None = None, **kwargs: Any
503
) -> Callable[[SyncRuleT | AsyncRuleT], AsyncRuleT]: ...
504

505

506
# This has a "private" name, as we don't (yet?) want it to be part of the rule API, at least
507
# until we figure out the implications, and have a handle on the semantics and use-cases.
508
def _uncacheable_rule(*args, **kwargs):
3✔
509
    return inner_rule(
3✔
510
        *args, **kwargs, rule_type=RuleType.uncacheable_rule, cacheable=False, polymorphic=False
511
    )
512

513

514
class Rule(Protocol):
3✔
515
    """Rules declare how to produce products for the product graph.
516

517
    A rule describes what dependencies must be provided to produce a particular product. They also
518
    act as factories for constructing the nodes within the graph.
519
    """
520

521
    @property
3✔
522
    def output_type(self):
3✔
523
        """An output `type` for the rule."""
524

525

526
def collect_rules(*namespaces: ModuleType | Mapping[str, Any]) -> Iterable[Rule]:
3✔
527
    """Collects all @rules in the given namespaces.
528

529
    If no namespaces are given, collects all the @rules in the caller's module namespace.
530
    """
531

532
    if not namespaces:
3✔
533
        currentframe = inspect.currentframe()
3✔
534
        assert isinstance(currentframe, FrameType)
3✔
535
        caller_frame = currentframe.f_back
3✔
536
        assert isinstance(caller_frame, FrameType)
3✔
537

538
        global_items = caller_frame.f_globals
3✔
539
        namespaces = (global_items,)
3✔
540

541
    def iter_rules():
3✔
542
        for namespace in namespaces:
3✔
543
            mapping = namespace.__dict__ if isinstance(namespace, ModuleType) else namespace
3✔
544
            for item in mapping.values():
3✔
545
                if not callable(item):
3✔
546
                    continue
3✔
547
                rule = getattr(item, "rule", None)
3✔
548
                if isinstance(rule, TaskRule):
3✔
549
                    for input in rule.parameters.values():
3✔
550
                        if getattr(input, "__subsystem__", False):
3✔
551
                            yield from input.rules()
3✔
552
                        if getattr(input, "__subsystem_environment_aware__", False):
3✔
553
                            yield from input.subsystem.rules()
3✔
554
                    if getattr(rule.output_type, "__goal__", False):
3✔
555
                        yield from rule.output_type.subsystem_cls.rules()
3✔
556
                    yield rule
3✔
557

558
    return list(iter_rules())
3✔
559

560

561
@dataclass(frozen=True)
3✔
562
class TaskRule:
3✔
563
    """A Rule that runs a task function when all of its input selectors are satisfied.
564

565
    NB: This API is not meant for direct consumption. To create a `TaskRule` you should always
566
    prefer the `@rule` constructor.
567
    """
568

569
    output_type: type[Any]
3✔
570
    parameters: FrozenDict[str, type[Any]]
3✔
571
    awaitables: tuple[AwaitableConstraints, ...]
3✔
572
    masked_types: tuple[type[Any], ...]
3✔
573
    func: Callable
3✔
574
    canonical_name: str
3✔
575
    desc: str | None = None
3✔
576
    level: LogLevel = LogLevel.TRACE
3✔
577
    cacheable: bool = True
3✔
578
    polymorphic: bool = False
3✔
579

580
    def __str__(self):
3✔
581
        return "(name={}, {}, {!r}, {}, gets={})".format(
×
582
            getattr(self, "name", "<not defined>"),
583
            self.output_type.__name__,
584
            self.parameters.values(),
585
            self.func.__name__,
586
            self.awaitables,
587
        )
588

589

590
@dataclass(frozen=True)
3✔
591
class QueryRule:
3✔
592
    """A QueryRule declares that a given set of Params will be used to request an output type.
593

594
    Every callsite to `Scheduler.product_request` should have a corresponding QueryRule to ensure
595
    that the relevant portions of the RuleGraph are generated.
596
    """
597

598
    output_type: type[Any]
3✔
599
    input_types: tuple[type[Any], ...]
3✔
600

601
    def __init__(self, output_type: type[Any], input_types: Iterable[type[Any]]) -> None:
3✔
602
        object.__setattr__(self, "output_type", output_type)
3✔
603
        object.__setattr__(self, "input_types", tuple(input_types))
3✔
604

605

606
@dataclass(frozen=True)
3✔
607
class RuleIndex:
3✔
608
    """Holds a normalized index of Rules used to instantiate Nodes."""
609

610
    rules: FrozenOrderedSet[TaskRule]
3✔
611
    queries: FrozenOrderedSet[QueryRule]
3✔
612
    union_rules: FrozenOrderedSet[UnionRule]
3✔
613

614
    @classmethod
3✔
615
    def create(cls, rule_entries: Iterable[Rule | UnionRule]) -> RuleIndex:
3✔
616
        """Creates a RuleIndex with tasks indexed by their output type."""
617
        rules: OrderedSet[TaskRule] = OrderedSet()
3✔
618
        queries: OrderedSet[QueryRule] = OrderedSet()
3✔
619
        union_rules: OrderedSet[UnionRule] = OrderedSet()
3✔
620

621
        for entry in rule_entries:
3✔
622
            if isinstance(entry, TaskRule):
3✔
623
                rules.add(entry)
3✔
624
            elif isinstance(entry, UnionRule):
3✔
625
                union_rules.add(entry)
3✔
626
            elif isinstance(entry, QueryRule):
3✔
627
                queries.add(entry)
3✔
628
            elif hasattr(entry, "__call__"):
3✔
629
                rule = getattr(entry, "rule", None)
3✔
630
                if rule is None:
3✔
631
                    raise TypeError(f"Expected function {entry} to be decorated with @rule.")
×
632
                rules.add(rule)
3✔
633
            else:
634
                raise TypeError(
×
635
                    f"Rule entry {entry} had an unexpected type: {type(entry)}. Rules either "
636
                    "extend Rule or UnionRule, or are static functions decorated with @rule."
637
                )
638

639
        return RuleIndex(
3✔
640
            rules=FrozenOrderedSet(rules),
641
            queries=FrozenOrderedSet(queries),
642
            union_rules=FrozenOrderedSet(union_rules),
643
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc