• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MrThearMan / undine / 16081423623

04 Jul 2025 09:57PM UTC coverage: 97.685%. First build
16081423623

Pull #33

github

web-flow
Merge 6eb57167c into 784a68391
Pull Request #33: Add Subscriptions

1798 of 1841 branches covered (97.66%)

Branch coverage included in aggregate %.

1009 of 1176 new or added lines in 36 files covered. (85.8%)

26853 of 27489 relevant lines covered (97.69%)

8.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.36
/undine/resolvers/subscription.py
1
from __future__ import annotations
9✔
2

3
import dataclasses
9✔
4
from collections.abc import AsyncGenerator
9✔
5
from contextlib import aclosing, nullcontext
9✔
6
from inspect import isawaitable
9✔
7
from typing import TYPE_CHECKING, Any
9✔
8

9
from graphql import GraphQLError, located_error
9✔
10

11
from undine.exceptions import GraphQLErrorGroup
9✔
12
from undine.utils.graphql.utils import pre_evaluate_request_user
9✔
13
from undine.utils.reflection import get_root_and_info_params
9✔
14

15
if TYPE_CHECKING:
16
    from collections.abc import AsyncIterable, Callable, Coroutine
17

18
    from undine import Entrypoint, GQLInfo
19
    from undine.subscription import SubscriptionType
20

21

22
# Public names of this module, i.e. what `from <module> import *` exposes.
__all__ = [
    "FunctionSubscriptionResolver",
    "SubscriptionTypeResolver",
    "SubscriptionValueResolver",
]
27

28

29
@dataclasses.dataclass(frozen=True, slots=True)
9✔
30
class SubscriptionValueResolver:
9✔
31
    """Resolves a value for a subscription."""
32

33
    def __call__(self, root: Any, info: GQLInfo, **kwargs: Any) -> Any:
9✔
34
        return root
9✔
35

36

37
@dataclasses.dataclass(frozen=True, slots=True)
class FunctionSubscriptionResolver:
    """
    Subscription resolver for an `Entrypoint` that is backed by a function.

    Calling the resolver returns the async generator created by `gen`, which
    calls `func`, awaits the result first if it is awaitable, and then relays
    the items of the resulting async iterable.
    """

    func: Callable[..., AsyncGenerator[Any, None] | Coroutine[Any, Any, AsyncIterable[Any]]]
    entrypoint: Entrypoint

    # Names of the keyword arguments through which `func` receives the root value
    # and the resolve info. Both are filled in by `__post_init__`. When a name is
    # `None`, the corresponding value is not passed to `func`.
    root_param: str | None = dataclasses.field(default=None, init=False)
    info_param: str | None = dataclasses.field(default=None, init=False)

    def __post_init__(self) -> None:
        """Store the root and info parameter names reported for `func`."""
        params = get_root_and_info_params(self.func)
        # The dataclass is frozen, so the fields must be set via `object.__setattr__`.
        for name in ("root_param", "info_param"):
            object.__setattr__(self, name, getattr(params, name))

    def __call__(self, root: Any, info: GQLInfo, **kwargs: Any) -> AsyncIterable[Any]:
        """Return the async generator created by `gen` for the given arguments."""
        return self.gen(root, info, **kwargs)

    async def gen(self, root: Any, info: GQLInfo, **kwargs: Any) -> AsyncIterable[Any]:
        """
        Call `func` and relay the items of the stream it produces.

        Each item of the stream is handled as follows:

        - A `GraphQLError` is yielded as a located error (not raised).
        - A `GraphQLErrorGroup` is yielded after calling its `located` method (not raised).
        - Anything else is passed to the entrypoint's `permissions_func`,
          if one is set, and then yielded unchanged.

        Exceptions raised while iterating (including ones raised by the
        permission check) are re-raised as located errors.
        """
        # Fetch the user eagerly so that it's available e.g. for permission checks
        # in synchronous parts of the code.
        await pre_evaluate_request_user(info)

        if self.root_param is not None:
            kwargs[self.root_param] = root
        if self.info_param is not None:
            kwargs[self.info_param] = info

        stream = self.func(**kwargs)
        if isawaitable(stream):
            stream = await stream

        # Only async generators are wrapped in `aclosing`; for any other stream
        # the context manager is a no-op.
        if isinstance(stream, AsyncGenerator):
            closer = aclosing(stream)
        else:
            closer = nullcontext(stream)

        async with closer:
            try:
                async for item in stream:
                    if isinstance(item, GraphQLError):
                        yield located_error(item, nodes=info.field_nodes, path=info.path.as_list())
                    elif isinstance(item, GraphQLErrorGroup):
                        yield item.located(path=info.path.as_list())
                    else:
                        if self.entrypoint.permissions_func is not None:
                            self.entrypoint.permissions_func(root, info, item)
                        yield item

            except GraphQLErrorGroup as error:
                raise error.located(path=info.path.as_list()) from error

            except Exception as error:
                raise located_error(error, nodes=info.field_nodes, path=info.path.as_list()) from error
9✔
90

91

92
@dataclasses.dataclass(frozen=True, slots=True)
class SubscriptionTypeResolver:
    """
    Subscription resolver for an `Entrypoint` that is backed by a `SubscriptionType`.

    Calling the resolver returns the async generator created by `gen`, which
    instantiates `subscription_type` with the field arguments, calls its
    `__run__` method, and relays the items of the resulting async iterable.
    """

    subscription_type: type[SubscriptionType]
    entrypoint: Entrypoint

    def __call__(self, root: Any, info: GQLInfo, **kwargs: Any) -> AsyncIterable[Any]:
        """Return the async generator created by `gen` for the given arguments."""
        return self.gen(root, info, **kwargs)

    async def gen(self, root: Any, info: GQLInfo, **kwargs: Any) -> AsyncIterable[Any]:
        """
        Run the subscription type and relay the items of the stream it produces.

        Each item of the stream is handled as follows:

        - A `GraphQLError` is yielded as a located error (not raised).
        - A `GraphQLErrorGroup` is yielded after calling its `located` method (not raised).
        - Anything else is passed to the entrypoint's `permissions_func`,
          if one is set, and then yielded unchanged.

        Exceptions raised while iterating (including ones raised by the
        permission check) are re-raised as located errors.
        """
        # Fetch the user eagerly so that it's available e.g. for permission checks
        # in synchronous parts of the code.
        await pre_evaluate_request_user(info)

        # The field arguments become the constructor arguments of the subscription type.
        instance = self.subscription_type(**kwargs)

        stream = instance.__run__(root, info)
        if isawaitable(stream):
            stream = await stream

        # Only async generators are wrapped in `aclosing`; for any other stream
        # the context manager is a no-op.
        if isinstance(stream, AsyncGenerator):
            closer = aclosing(stream)
        else:
            closer = nullcontext(stream)

        async with closer:
            try:
                async for item in stream:
                    if isinstance(item, GraphQLError):
                        yield located_error(item, nodes=info.field_nodes, path=info.path.as_list())
                    elif isinstance(item, GraphQLErrorGroup):
                        yield item.located(path=info.path.as_list())
                    else:
                        if self.entrypoint.permissions_func is not None:
                            self.entrypoint.permissions_func(root, info, item)
                        yield item

            except GraphQLErrorGroup as error:
                raise error.located(path=info.path.as_list()) from error

            except Exception as error:
                raise located_error(error, nodes=info.field_nodes, path=info.path.as_list()) from error
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc