• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 21552830208

31 Jan 2026 11:40PM UTC coverage: 80.277% (-0.05%) from 80.324%
21552830208

Pull #23062

github

web-flow
Merge 808a9786c into 2c4dcf9cf
Pull Request #23062: Remove support for Get

18 of 25 new or added lines in 4 files covered. (72.0%)

17119 existing lines in 541 files now uncovered.

78278 of 97510 relevant lines covered (80.28%)

3.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.52
/src/python/pants/engine/rules.py
1
# Copyright 2016 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
from __future__ import annotations
12✔
5

6
import functools
12✔
7
import inspect
12✔
8
import sys
12✔
9
from collections.abc import Callable, Coroutine, Iterable, Mapping, Sequence
12✔
10
from dataclasses import dataclass
12✔
11
from enum import Enum
12✔
12
from types import FrameType, ModuleType
12✔
13
from typing import (
12✔
14
    Any,
15
    NotRequired,
16
    Protocol,
17
    TypedDict,
18
    TypeVar,
19
    Unpack,
20
    cast,
21
    get_type_hints,
22
    overload,
23
)
24

25
from typing_extensions import ParamSpec
12✔
26

27
from pants.engine.engine_aware import SideEffecting
12✔
28
from pants.engine.internals.rule_visitor import collect_awaitables
12✔
29
from pants.engine.internals.selectors import AwaitableConstraints, Call
12✔
30
from pants.engine.internals.selectors import concurrently as concurrently  # noqa: F401
12✔
31
from pants.engine.unions import UnionRule
12✔
32
from pants.util.frozendict import FrozenDict
12✔
33
from pants.util.logging import LogLevel
12✔
34
from pants.util.ordered_set import FrozenOrderedSet, OrderedSet
12✔
35
from pants.util.strutil import softwrap
12✔
36

37
PANTS_RULES_MODULE_KEY = "__pants_rules__"
12✔
38

39

40
def implicitly(*args) -> dict[str, Any]:
12✔
41
    # NB: This function does not have a `TypedDict` return type, because the `@rule` decorator
42
    # cannot adjust the type of the `@rule` function to include a keyword argument (keyword
43
    # arguments are not supported by PEP-612).
44
    return {"__implicitly": args}
9✔
45

46

47
class RuleType(Enum):
12✔
48
    rule = "rule"
12✔
49
    goal_rule = "goal_rule"
12✔
50
    uncacheable_rule = "_uncacheable_rule"
12✔
51

52

53
P = ParamSpec("P")
12✔
54
R = TypeVar("R")
12✔
55
SyncRuleT = Callable[P, R]
12✔
56
AsyncRuleT = Callable[P, Coroutine[Any, Any, R]]
12✔
57
RuleDecorator = Callable[[SyncRuleT | AsyncRuleT], AsyncRuleT]
12✔
58

59

60
def _rule_call_trampoline(
12✔
61
    rule_id: str, output_type: type[Any], func: Callable[P, R]
62
) -> Callable[P, R]:
63
    @functools.wraps(func)  # type: ignore
12✔
64
    async def wrapper(*args, __implicitly: Sequence[Any] = (), **kwargs):
12✔
65
        call = Call(rule_id, output_type, args, *__implicitly)
12✔
66
        return await call
12✔
67

68
    return cast(Callable[P, R], wrapper)
12✔
69

70

71
def _make_rule(
12✔
72
    func_id: str,
73
    rule_type: RuleType,
74
    return_type: type[Any],
75
    parameter_types: dict[str, type[Any]],
76
    masked_types: Iterable[type[Any]],
77
    *,
78
    cacheable: bool,
79
    polymorphic: bool,
80
    canonical_name: str,
81
    desc: str | None,
82
    level: LogLevel,
83
) -> RuleDecorator:
84
    """A @decorator that declares that a particular static function may be used as a TaskRule.
85

86
    :param rule_type: The specific decorator used to declare the rule.
87
    :param return_type: The return/output type for the Rule. This must be a concrete Python type.
88
    :param parameter_types: A sequence of types that matches the number and order of arguments to
89
                            the decorated function.
90
    :param cacheable: Whether the results of executing the Rule should be cached as keyed by all of
91
                      its inputs.
92
    :param polymorphic: Whether the rule is an abstract base method for polymorphic dispatch via
93
                        a union type.
94
    """
95

96
    is_goal_cls = getattr(return_type, "__goal__", False)
12✔
97
    if rule_type == RuleType.rule and is_goal_cls:
12✔
UNCOV
98
        raise TypeError(
1✔
99
            "An `@rule` that returns a `Goal` must instead be declared with `@goal_rule`."
100
        )
101
    if rule_type == RuleType.goal_rule and not is_goal_cls:
12✔
UNCOV
102
        raise TypeError("An `@goal_rule` must return a subclass of `engine.goal.Goal`.")
1✔
103

104
    def wrapper(original_func):
12✔
105
        if not inspect.isfunction(original_func):
12✔
106
            raise ValueError("The @rule decorator must be applied innermost of all decorators.")
×
107

108
        # Set our own custom `__line_number__` dunder so that the engine may visualize the line number.
109
        original_func.__line_number__ = original_func.__code__.co_firstlineno
12✔
110
        original_func.rule_id = canonical_name
12✔
111

112
        awaitables = FrozenOrderedSet(collect_awaitables(original_func))
12✔
113

114
        validate_requirements(func_id, parameter_types, awaitables, cacheable)
12✔
115
        func = _rule_call_trampoline(canonical_name, return_type, original_func)
12✔
116

117
        # NB: The named definition of the rule ends up wrapped in a trampoline to handle memoization
118
        # and implicit arguments for direct by-name calls. But the `TaskRule` takes a reference to
119
        # the original unwrapped function, which avoids the need for a special protocol when the
120
        # engine invokes a @rule under memoization.
121
        func.rule = TaskRule(
12✔
122
            return_type,
123
            FrozenDict(parameter_types),
124
            awaitables,
125
            masked_types,
126
            original_func,
127
            canonical_name=canonical_name,
128
            desc=desc,
129
            level=level,
130
            cacheable=cacheable,
131
            polymorphic=polymorphic,
132
        )
133
        return func
12✔
134

135
    return wrapper
12✔
136

137

138
class InvalidTypeAnnotation(TypeError):
12✔
139
    """Indicates an incorrect type annotation for an `@rule`."""
140

141

142
class UnrecognizedRuleArgument(TypeError):
12✔
143
    """Indicates an unrecognized keyword argument to a `@rule`."""
144

145

146
class MissingTypeAnnotation(TypeError):
12✔
147
    """Indicates a missing type annotation for an `@rule`."""
148

149

150
class MissingReturnTypeAnnotation(InvalidTypeAnnotation):
12✔
151
    """Indicates a missing return type annotation for an `@rule`."""
152

153

154
class MissingParameterTypeAnnotation(InvalidTypeAnnotation):
12✔
155
    """Indicates a missing parameter type annotation for an `@rule`."""
156

157

158
class DuplicateRuleError(TypeError):
12✔
159
    """Invalid to overwrite `@rule`s using the same name in the same module."""
160

161

162
def _ensure_type_annotation(
12✔
163
    *,
164
    type_annotation: type[Any] | None,
165
    name: str,
166
    raise_type: type[InvalidTypeAnnotation],
167
) -> type[Any]:
168
    if type_annotation is None:
12✔
UNCOV
169
        raise raise_type(f"{name} is missing a type annotation.")
1✔
170
    if not isinstance(type_annotation, type):
12✔
UNCOV
171
        raise raise_type(
1✔
172
            f"The annotation for {name} must be a type, got {type_annotation} of type {type(type_annotation)}."
173
        )
174
    return type_annotation
12✔
175

176

177
PUBLIC_RULE_DECORATOR_ARGUMENTS = {
12✔
178
    "canonical_name",
179
    "canonical_name_suffix",
180
    "desc",
181
    "level",
182
    "polymorphic",
183
}
184
# We aren't sure if these'll stick around or be removed at some point, so they are "private"
185
# and should only be used in Pants' codebase.
186
PRIVATE_RULE_DECORATOR_ARGUMENTS = {
12✔
187
    # Allows callers to override the type Pants will use for the params listed.
188
    #
189
    # It is assumed (but not enforced) that the provided type is a subclass of the annotated type.
190
    # (We assume but not enforce since this is likely to be used with unions, which has the same
191
    # assumption between the union base and its members).
192
    "_param_type_overrides",
193
    # Allows callers to prevent the given list of types from being included in the identity of
194
    # a @rule. Although the type may be in scope for callers, it will not be consumable in the
195
    # `@rule` which declares the type masked.
196
    "_masked_types",
197
}
198
# We don't want @rule-writers to use 'rule_type' or 'cacheable' as kwargs directly,
199
# but rather set them implicitly based on the rule annotation.
200
# So we leave it out of PUBLIC_RULE_DECORATOR_ARGUMENTS.
201
IMPLICIT_PRIVATE_RULE_DECORATOR_ARGUMENTS = {"rule_type", "cacheable"}
12✔
202

203

204
class RuleDecoratorKwargs(TypedDict):
12✔
205
    """Public-facing @rule kwargs used in the codebase."""
206

207
    canonical_name: NotRequired[str]
12✔
208

209
    canonical_name_suffix: NotRequired[str]
12✔
210

211
    desc: NotRequired[str]
12✔
212
    """The rule's description as it appears in stacktraces/debugging. For goal rules, defaults to the goal name."""
12✔
213

214
    level: NotRequired[LogLevel]
12✔
215
    """The logging level applied to this rule. Defaults to TRACE."""
12✔
216

217
    polymorphic: NotRequired[bool]
12✔
218
    """Whether this rule represents an abstract method for a union.
219

220
    A polymorphic rule can only be called by name, and must have a single input type that is a
221
    union base type (plus other non-union arguments as needed). Execution will be dispatched to the
222
    @rule with the same signature with the union base type replaced by one of its member types.
223

224
    E.g., given
225

226
    ```
227
    @rule(polymorphic=True)
228
    async def base_rule(arg: UnionBase, other_arg: OtherType) -> OutputType
229
        ...
230

231
    @rule(polymorphic=True)
232
    async def derived_rule(arg: UnionMember, other_arg: OtherType) -> OutputType
233
       ...
234

235
    ```
236

237
    And an arg of type UnionMember, then
238

239
    `await base_rule(arg, other_arg)`
240

241
    will invoke `derived_rule(arg, other_arg)`
242

243
    This is the call-by-name equivalent of Get(OutputType, UnionBase, union_member_instance).
244
    """
245

246
    _masked_types: NotRequired[Iterable[type[Any]]]
12✔
247
    """Unstable. Internal Pants usage only."""
12✔
248

249
    _param_type_overrides: NotRequired[dict[str, type[Any]]]
12✔
250
    """Unstable. Internal Pants usage only."""
12✔
251

252

253
class _RuleDecoratorKwargs(RuleDecoratorKwargs):
12✔
254
    """Internal/Implicit @rule kwargs (not for use outside rules.py)"""
255

256
    rule_type: RuleType
12✔
257
    """The decorator used to declare the rule (see rules.py:_make_rule(...))"""
12✔
258

259
    cacheable: bool
12✔
260
    """Whether the results of this rule should be cached.
12✔
261
    Typically true for rules, false for goal_rules (see rules.py:_make_rule(...))
262
    """
263

264

265
def rule_decorator(
12✔
266
    func: SyncRuleT | AsyncRuleT, **kwargs: Unpack[_RuleDecoratorKwargs]
267
) -> AsyncRuleT:
268
    if not inspect.isfunction(func):
12✔
269
        raise ValueError("The @rule decorator expects to be placed on a function.")
×
270

271
    if (
12✔
272
        len(
273
            set(kwargs)
274
            - PUBLIC_RULE_DECORATOR_ARGUMENTS
275
            - PRIVATE_RULE_DECORATOR_ARGUMENTS
276
            - IMPLICIT_PRIVATE_RULE_DECORATOR_ARGUMENTS
277
        )
278
        != 0
279
    ):
UNCOV
280
        raise UnrecognizedRuleArgument(
1✔
281
            f"`@rule`s and `@goal_rule`s only accept the following keyword arguments: {PUBLIC_RULE_DECORATOR_ARGUMENTS}"
282
        )
283

284
    rule_type = kwargs["rule_type"]
12✔
285
    cacheable = kwargs["cacheable"]
12✔
286
    polymorphic = kwargs.get("polymorphic", False)
12✔
287
    masked_types: tuple[type, ...] = tuple(kwargs.get("_masked_types", ()))
12✔
288
    param_type_overrides: dict[str, type] = kwargs.get("_param_type_overrides", {})
12✔
289

290
    func_id = f"@rule {func.__module__}:{func.__name__}"
12✔
291
    type_hints = get_type_hints(func)
12✔
292
    return_type = _ensure_type_annotation(
12✔
293
        type_annotation=type_hints.get("return"),
294
        name=f"{func_id} return",
295
        raise_type=MissingReturnTypeAnnotation,
296
    )
297

298
    func_params = inspect.signature(func).parameters
12✔
299
    for parameter in param_type_overrides:
12✔
300
        if parameter not in func_params:
12✔
UNCOV
301
            raise ValueError(
1✔
302
                f"Unknown parameter name in `param_type_overrides`: {parameter}."
303
                + f" Parameter names: '{', '.join(func_params)}'"
304
            )
305

306
    parameter_types = {
12✔
307
        parameter: _ensure_type_annotation(
308
            type_annotation=param_type_overrides.get(parameter, type_hints.get(parameter)),
309
            name=f"{func_id} parameter {parameter}",
310
            raise_type=MissingParameterTypeAnnotation,
311
        )
312
        for parameter in func_params
313
    }
314
    is_goal_cls = getattr(return_type, "__goal__", False)
12✔
315

316
    # Set a default canonical name if one is not explicitly provided to the module and name of the
317
    # function that implements it, plus an optional suffix. This is used as the workunit name.
318
    # The suffix is a convenient way to disambiguate multiple rules registered dynamically from the
319
    # same static code (by overriding the inferred param types in the @rule decorator).
320
    # TODO: It is not yet clear how dynamically registered rules whose names are generated
321
    #  with a suffix will work in practice with the new call-by-name semantics.
322
    #  For now the suffix serves to ensure unique names. Whether they are useful is another matter.
323
    suffix = kwargs.get("canonical_name_suffix", "")
12✔
324
    effective_name = kwargs.get(
12✔
325
        "canonical_name",
326
        f"{func.__module__}.{func.__qualname__}{('_' + suffix) if suffix else ''}".replace(
327
            ".<locals>", ""
328
        ),
329
    )
330

331
    # Set a default description, which is used in the dynamic UI and stacktraces.
332
    effective_desc = kwargs.get("desc")
12✔
333
    if effective_desc is None and is_goal_cls:
12✔
334
        effective_desc = f"`{return_type.name}` goal"
12✔
335

336
    effective_level = kwargs.get("level", LogLevel.TRACE)
12✔
337
    if not isinstance(effective_level, LogLevel):  # type: ignore[unused-ignore]
12✔
338
        raise ValueError(
×
339
            "Expected to receive a value of type LogLevel for the level "
340
            f"argument, but got: {effective_level}"
341
        )
342

343
    module = sys.modules[func.__module__]
12✔
344
    pants_rules = getattr(module, PANTS_RULES_MODULE_KEY, None)
12✔
345
    if pants_rules is None:
12✔
346
        pants_rules = {}
12✔
347
        setattr(module, PANTS_RULES_MODULE_KEY, pants_rules)
12✔
348

349
    if effective_name not in pants_rules:
12✔
350
        pants_rules[effective_name] = func
12✔
351
    else:
352
        prev_func = pants_rules[effective_name]
12✔
353
        if prev_func.__code__ != func.__code__:
12✔
UNCOV
354
            raise DuplicateRuleError(
1✔
355
                softwrap(
356
                    f"""
357
                    Redeclaring rule {effective_name} with {func} at line
358
                    {func.__code__.co_firstlineno}, previously defined by {prev_func} at line
359
                    {prev_func.__code__.co_firstlineno}.
360
                    """
361
                )
362
            )
363

364
    return _make_rule(
12✔
365
        func_id,
366
        rule_type,
367
        return_type,
368
        parameter_types,
369
        masked_types,
370
        cacheable=cacheable,
371
        polymorphic=polymorphic,
372
        canonical_name=effective_name,
373
        desc=effective_desc,
374
        level=effective_level,
375
    )(func)
376

377

378
def validate_requirements(
12✔
379
    func_id: str,
380
    parameter_types: dict[str, type],
381
    awaitables: tuple[AwaitableConstraints, ...],
382
    cacheable: bool,
383
) -> None:
384
    # TODO: Technically this will also fire for an @_uncacheable_rule, but we don't expose those as
385
    # part of the API, so it's OK for these errors not to mention them.
386
    for ty in parameter_types.values():
12✔
387
        if cacheable and issubclass(ty, SideEffecting):
12✔
UNCOV
388
            raise ValueError(
1✔
389
                f"A `@rule` that is not a @goal_rule ({func_id}) may not have "
390
                f"a side-effecting parameter: {ty}."
391
            )
392
    for awaitable in awaitables:
12✔
393
        input_type_side_effecting = [
12✔
394
            it for it in awaitable.input_types if issubclass(it, SideEffecting)
395
        ]
396
        if input_type_side_effecting and not awaitable.is_effect:
12✔
397
            raise ValueError(
×
398
                f"A `Get` may not request side-effecting types ({input_type_side_effecting}). "
399
                f"Use `Effect` instead: `{awaitable}`."
400
            )
401
        if not input_type_side_effecting and awaitable.is_effect:
12✔
402
            raise ValueError(
×
403
                f"An `Effect` should not be used with pure types ({awaitable.input_types}). "
404
                f"Use `Get` instead: `{awaitable}`."
405
            )
406
        if cacheable and awaitable.is_effect:
12✔
407
            raise ValueError(
×
408
                f"A `@rule` that is not a @goal_rule ({func_id}) may not use an "
409
                f"Effect: `{awaitable}`."
410
            )
411

412

413
def inner_rule(*args, **kwargs) -> AsyncRuleT | RuleDecorator:
12✔
414
    if len(args) == 1 and inspect.isfunction(args[0]):
12✔
415
        return rule_decorator(*args, **kwargs)
12✔
416
    else:
417

418
        def wrapper(*args):
12✔
419
            return rule_decorator(*args, **kwargs)
12✔
420

421
        return wrapper
12✔
422

423

424
F = TypeVar("F", bound=Callable[..., Any | Coroutine[Any, Any, Any]])
12✔
425

426

427
@overload
12✔
428
def rule(**kwargs: Unpack[RuleDecoratorKwargs]) -> Callable[[F], F]:
12✔
429
    """Handles decorator factories of the form `@rule(foo=..., bar=...)`
430
    https://mypy.readthedocs.io/en/stable/generics.html#decorator-factories.
431

432
    Note: This needs to be the first rule, otherwise MyPy goes nuts
433
    """
434
    ...
435

436

437
@overload
12✔
438
def rule(_func: Callable[P, Coroutine[Any, Any, R]]) -> Callable[P, Coroutine[Any, Any, R]]:
12✔
439
    """Handles bare @rule decorators on async functions.
440

441
    Usage of Coroutine[...] (vs Awaitable[...]) is intentional, as `concurrently` uses coroutines
442
    directly.
443
    """
444
    ...
445

446

447
@overload
12✔
448
def rule(_func: Callable[P, R]) -> Callable[P, Coroutine[Any, Any, R]]:
12✔
449
    """Handles bare @rule decorators on non-async functions It's debatable whether we should even
450
    have non-async @rule functions, but keeping this to not break the world for plugin authors.
451

452
    Usage of Coroutine[...] (vs Awaitable[...]) is intentional, as `concurrently` uses coroutines
453
    directly.
454
    """
455
    ...
456

457

458
def rule(*args, **kwargs):
12✔
459
    return inner_rule(*args, **kwargs, rule_type=RuleType.rule, cacheable=True)
12✔
460

461

462
@overload
463
def goal_rule(func: Callable[P, Coroutine[Any, Any, R]]) -> Callable[P, Coroutine[Any, Any, R]]: ...
464

465

466
@overload
467
def goal_rule(func: Callable[P, R]) -> Callable[P, Coroutine[Any, Any, R]]: ...
468

469

470
@overload
471
def goal_rule(
472
    *args, func: None = None, **kwargs: Any
473
) -> Callable[[SyncRuleT | AsyncRuleT], AsyncRuleT]: ...
474

475

476
def goal_rule(*args, **kwargs):
12✔
477
    if "level" not in kwargs:
12✔
478
        kwargs["level"] = LogLevel.DEBUG
12✔
479
    return inner_rule(
12✔
480
        *args,
481
        **kwargs,
482
        rule_type=RuleType.goal_rule,
483
        cacheable=False,
484
    )
485

486

487
@overload
488
def _uncacheable_rule(
489
    func: Callable[P, Coroutine[Any, Any, R]],
490
) -> Callable[P, Coroutine[Any, Any, R]]: ...
491

492

493
@overload
494
def _uncacheable_rule(func: Callable[P, R]) -> Callable[P, Coroutine[Any, Any, R]]: ...
495

496

497
@overload
498
def _uncacheable_rule(
499
    *args, func: None = None, **kwargs: Any
500
) -> Callable[[SyncRuleT | AsyncRuleT], AsyncRuleT]: ...
501

502

503
# This has a "private" name, as we don't (yet?) want it to be part of the rule API, at least
504
# until we figure out the implications, and have a handle on the semantics and use-cases.
505
def _uncacheable_rule(*args, **kwargs):
12✔
506
    return inner_rule(
12✔
507
        *args, **kwargs, rule_type=RuleType.uncacheable_rule, cacheable=False, polymorphic=False
508
    )
509

510

511
class Rule(Protocol):
12✔
512
    """Rules declare how to produce products for the product graph.
513

514
    A rule describes what dependencies must be provided to produce a particular product. They also
515
    act as factories for constructing the nodes within the graph.
516
    """
517

518
    @property
12✔
519
    def output_type(self):
12✔
520
        """An output `type` for the rule."""
521

522

523
def collect_rules(*namespaces: ModuleType | Mapping[str, Any]) -> Iterable[Rule]:
12✔
524
    """Collects all @rules in the given namespaces.
525

526
    If no namespaces are given, collects all the @rules in the caller's module namespace.
527
    """
528

529
    if not namespaces:
12✔
530
        currentframe = inspect.currentframe()
12✔
531
        assert isinstance(currentframe, FrameType)
12✔
532
        caller_frame = currentframe.f_back
12✔
533
        assert isinstance(caller_frame, FrameType)
12✔
534

535
        global_items = caller_frame.f_globals
12✔
536
        namespaces = (global_items,)
12✔
537

538
    def iter_rules():
12✔
539
        for namespace in namespaces:
12✔
540
            mapping = namespace.__dict__ if isinstance(namespace, ModuleType) else namespace
12✔
541
            for item in mapping.values():
12✔
542
                if not callable(item):
12✔
543
                    continue
12✔
544
                rule = getattr(item, "rule", None)
12✔
545
                if isinstance(rule, TaskRule):
12✔
546
                    for input in rule.parameters.values():
12✔
547
                        if getattr(input, "__subsystem__", False):
12✔
548
                            yield from input.rules()
12✔
549
                        if getattr(input, "__subsystem_environment_aware__", False):
12✔
550
                            yield from input.subsystem.rules()
12✔
551
                    if getattr(rule.output_type, "__goal__", False):
12✔
552
                        yield from rule.output_type.subsystem_cls.rules()
12✔
553
                    yield rule
12✔
554

555
    return list(iter_rules())
12✔
556

557

558
@dataclass(frozen=True)
12✔
559
class TaskRule:
12✔
560
    """A Rule that runs a task function when all of its input selectors are satisfied.
561

562
    NB: This API is not meant for direct consumption. To create a `TaskRule` you should always
563
    prefer the `@rule` constructor.
564
    """
565

566
    output_type: type[Any]
12✔
567
    parameters: FrozenDict[str, type[Any]]
12✔
568
    awaitables: tuple[AwaitableConstraints, ...]
12✔
569
    masked_types: tuple[type[Any], ...]
12✔
570
    func: Callable
12✔
571
    canonical_name: str
12✔
572
    desc: str | None = None
12✔
573
    level: LogLevel = LogLevel.TRACE
12✔
574
    cacheable: bool = True
12✔
575
    polymorphic: bool = False
12✔
576

577
    def __str__(self):
12✔
578
        return "(name={}, {}, {!r}, {}, gets={})".format(
×
579
            getattr(self, "name", "<not defined>"),
580
            self.output_type.__name__,
581
            self.parameters.values(),
582
            self.func.__name__,
583
            self.awaitables,
584
        )
585

586

587
@dataclass(frozen=True)
12✔
588
class QueryRule:
12✔
589
    """A QueryRule declares that a given set of Params will be used to request an output type.
590

591
    Every callsite to `Scheduler.product_request` should have a corresponding QueryRule to ensure
592
    that the relevant portions of the RuleGraph are generated.
593
    """
594

595
    output_type: type[Any]
12✔
596
    input_types: tuple[type[Any], ...]
12✔
597

598
    def __init__(self, output_type: type[Any], input_types: Iterable[type[Any]]) -> None:
12✔
599
        object.__setattr__(self, "output_type", output_type)
12✔
600
        object.__setattr__(self, "input_types", tuple(input_types))
12✔
601

602

603
@dataclass(frozen=True)
12✔
604
class RuleIndex:
12✔
605
    """Holds a normalized index of Rules used to instantiate Nodes."""
606

607
    rules: FrozenOrderedSet[TaskRule]
12✔
608
    queries: FrozenOrderedSet[QueryRule]
12✔
609
    union_rules: FrozenOrderedSet[UnionRule]
12✔
610

611
    @classmethod
12✔
612
    def create(cls, rule_entries: Iterable[Rule | UnionRule]) -> RuleIndex:
12✔
613
        """Creates a RuleIndex with tasks indexed by their output type."""
614
        rules: OrderedSet[TaskRule] = OrderedSet()
12✔
615
        queries: OrderedSet[QueryRule] = OrderedSet()
12✔
616
        union_rules: OrderedSet[UnionRule] = OrderedSet()
12✔
617

618
        for entry in rule_entries:
12✔
619
            if isinstance(entry, TaskRule):
12✔
620
                rules.add(entry)
12✔
621
            elif isinstance(entry, UnionRule):
12✔
622
                union_rules.add(entry)
12✔
623
            elif isinstance(entry, QueryRule):
12✔
624
                queries.add(entry)
12✔
625
            elif hasattr(entry, "__call__"):
12✔
626
                rule = getattr(entry, "rule", None)
12✔
627
                if rule is None:
12✔
628
                    raise TypeError(f"Expected function {entry} to be decorated with @rule.")
×
629
                rules.add(rule)
12✔
630
            else:
UNCOV
631
                raise TypeError(
1✔
632
                    f"Rule entry {entry} had an unexpected type: {type(entry)}. Rules either "
633
                    "extend Rule or UnionRule, or are static functions decorated with @rule."
634
                )
635

636
        return RuleIndex(
12✔
637
            rules=FrozenOrderedSet(rules),
638
            queries=FrozenOrderedSet(queries),
639
            union_rules=FrozenOrderedSet(union_rules),
640
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc