• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MrThearMan / undine / 16081423623

04 Jul 2025 09:57PM UTC coverage: 97.685%. First build
16081423623

Pull #33

github

web-flow
Merge 6eb57167c into 784a68391
Pull Request #33: Add Subscriptions

1798 of 1841 branches covered (97.66%)

Branch coverage included in aggregate %.

1009 of 1176 new or added lines in 36 files covered. (85.8%)

26853 of 27489 relevant lines covered (97.69%)

8.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/tests/test_end_to_end/test_graphql_over_websocket.py
1
from __future__ import annotations
9✔
2

3
import asyncio
9✔
4
from typing import AsyncGenerator
9✔
5

6
import pytest
9✔
7
from graphql import FormattedExecutionResult, GraphQLError, GraphQLFormattedError
9✔
8

9
from undine import Entrypoint, GQLInfo, RootType, create_schema
9✔
10
from undine.exceptions import GraphQLErrorGroup, GraphQLPermissionError
9✔
11

12
pytestmark = [
9✔
13
    pytest.mark.asyncio,
14
    pytest.mark.django_db(transaction=True),  # For sessions
15
]
16

17

18
async def test_graphql_over_websocket(graphql, undine_settings) -> None:
9✔
19
    class Query(RootType):
9✔
20
        @Entrypoint
9✔
21
        def test(self) -> str:
9✔
22
            return "Hello, World!"
9✔
23

24
    undine_settings.SCHEMA = create_schema(query=Query)
9✔
25

26
    query = "query { test }"
9✔
27

28
    async for response in graphql.over_websocket(query):
9✔
29
        assert response.data == {"test": "Hello, World!"}
9✔
30

31

32
async def test_graphql_over_websocket__subscription(graphql, undine_settings) -> None:
9✔
33
    undine_settings.ASYNC = True
9✔
34

35
    class Query(RootType):
9✔
36
        @Entrypoint
9✔
37
        def test(self) -> str:
9✔
NEW
38
            return "Hello, World!"
×
39

40
    class Subscription(RootType):
9✔
41
        @Entrypoint
9✔
42
        async def countdown(self) -> AsyncGenerator[int, None]:
9✔
43
            for i in range(3, 0, -1):
9✔
44
                await asyncio.sleep(0)
9✔
45
                yield i
9✔
46

47
    undine_settings.SCHEMA = create_schema(query=Query, subscription=Subscription)
9✔
48

49
    query = "subscription { countdown }"
9✔
50

51
    responses = [response.json async for response in graphql.over_websocket(query)]
9✔
52
    expected = [
9✔
53
        FormattedExecutionResult(data={"countdown": 3}),
54
        FormattedExecutionResult(data={"countdown": 2}),
55
        FormattedExecutionResult(data={"countdown": 1}),
56
    ]
57

58
    assert responses == expected
9✔
59

60

61
async def test_graphql_over_websocket__subscription__error(graphql, undine_settings) -> None:
9✔
62
    undine_settings.ASYNC = True
9✔
63

64
    class Query(RootType):
9✔
65
        @Entrypoint
9✔
66
        def test(self) -> str:
9✔
NEW
67
            return "Hello, World!"
×
68

69
    class Subscription(RootType):
9✔
70
        @Entrypoint
9✔
71
        async def countdown(self) -> AsyncGenerator[int, None]:
9✔
72
            for i in range(3, 0, -1):
9✔
73
                if i == 2:
9✔
74
                    msg = "Test error"
9✔
75
                    raise GraphQLError(msg)
9✔
76
                yield i
9✔
77

78
    undine_settings.SCHEMA = create_schema(query=Query, subscription=Subscription)
9✔
79

80
    query = "subscription { countdown }"
9✔
81

82
    responses = [response.json async for response in graphql.over_websocket(query)]
9✔
83
    expected = [
9✔
84
        FormattedExecutionResult(data={"countdown": 3}),
85
        FormattedExecutionResult(
86
            data=None,
87
            errors=[
88
                GraphQLFormattedError(message="Test error", path=["countdown"]),
89
            ],
90
        ),
91
    ]
92

93
    assert responses == expected
9✔
94

95

96
async def test_graphql_over_websocket__subscription__error__as_value(graphql, undine_settings) -> None:
9✔
97
    undine_settings.ASYNC = True
9✔
98

99
    class Query(RootType):
9✔
100
        @Entrypoint
9✔
101
        def test(self) -> str:
9✔
NEW
102
            return "Hello, World!"
×
103

104
    class Subscription(RootType):
9✔
105
        @Entrypoint
9✔
106
        async def countdown(self) -> AsyncGenerator[int | GraphQLError, None]:
9✔
107
            for i in range(3, 0, -1):
9✔
108
                if i == 2:
9✔
109
                    msg = "Test error"
9✔
110
                    yield GraphQLError(msg)
9✔
111
                else:
112
                    yield i
9✔
113

114
    undine_settings.SCHEMA = create_schema(query=Query, subscription=Subscription)
9✔
115

116
    query = "subscription { countdown }"
9✔
117

118
    responses = [response.json async for response in graphql.over_websocket(query)]
9✔
119
    expected = [
9✔
120
        FormattedExecutionResult(data={"countdown": 3}),
121
        FormattedExecutionResult(
122
            data=None,
123
            errors=[
124
                GraphQLFormattedError(message="Test error", path=["countdown"]),
125
            ],
126
        ),
127
        FormattedExecutionResult(data={"countdown": 1}),
128
    ]
129

130
    assert responses == expected
9✔
131

132

133
async def test_graphql_over_websocket__subscription__error_group(graphql, undine_settings) -> None:
9✔
134
    undine_settings.ASYNC = True
9✔
135

136
    class Query(RootType):
9✔
137
        @Entrypoint
9✔
138
        def test(self) -> str:
9✔
NEW
139
            return "Hello, World!"
×
140

141
    class Subscription(RootType):
9✔
142
        @Entrypoint
9✔
143
        async def countdown(self) -> AsyncGenerator[int, None]:
9✔
144
            for i in range(3, 0, -1):
9✔
145
                if i == 2:
9✔
146
                    msg_1 = "Test error"
9✔
147
                    msg_2 = "Real error"
9✔
148
                    error_1 = GraphQLError(msg_1)
9✔
149
                    error_2 = GraphQLError(msg_2)
9✔
150
                    raise GraphQLErrorGroup([error_1, error_2])
9✔
151
                yield i
9✔
152

153
    undine_settings.SCHEMA = create_schema(query=Query, subscription=Subscription)
9✔
154

155
    query = "subscription { countdown }"
9✔
156

157
    responses = [response.json async for response in graphql.over_websocket(query)]
9✔
158
    expected = [
9✔
159
        FormattedExecutionResult(data={"countdown": 3}),
160
        FormattedExecutionResult(
161
            data=None,
162
            errors=[
163
                GraphQLFormattedError(message="Test error", path=["countdown"]),
164
                GraphQLFormattedError(message="Real error", path=["countdown"]),
165
            ],
166
        ),
167
    ]
168

169
    assert responses == expected
9✔
170

171

172
async def test_graphql_over_websocket__subscription__error_group__as_value(graphql, undine_settings) -> None:
9✔
173
    undine_settings.ASYNC = True
9✔
174

175
    class Query(RootType):
9✔
176
        @Entrypoint
9✔
177
        def test(self) -> str:
9✔
NEW
178
            return "Hello, World!"
×
179

180
    class Subscription(RootType):
9✔
181
        @Entrypoint
9✔
182
        async def countdown(self) -> AsyncGenerator[int | GraphQLErrorGroup, None]:
9✔
183
            for i in range(3, 0, -1):
9✔
184
                if i == 2:
9✔
185
                    msg_1 = "Test error"
9✔
186
                    msg_2 = "Real error"
9✔
187
                    error_1 = GraphQLError(msg_1)
9✔
188
                    error_2 = GraphQLError(msg_2)
9✔
189
                    yield GraphQLErrorGroup([error_1, error_2])
9✔
190
                else:
191
                    yield i
9✔
192

193
    undine_settings.SCHEMA = create_schema(query=Query, subscription=Subscription)
9✔
194

195
    query = "subscription { countdown }"
9✔
196

197
    responses = [response.json async for response in graphql.over_websocket(query)]
9✔
198
    expected = [
9✔
199
        FormattedExecutionResult(data={"countdown": 3}),
200
        FormattedExecutionResult(
201
            data=None,
202
            errors=[
203
                GraphQLFormattedError(message="Test error", path=["countdown"]),
204
                GraphQLFormattedError(message="Real error", path=["countdown"]),
205
            ],
206
        ),
207
        FormattedExecutionResult(data={"countdown": 1}),
208
    ]
209

210
    assert responses == expected
9✔
211

212

213
async def test_graphql_over_websocket__subscription__error__permissions(graphql, undine_settings) -> None:
9✔
214
    undine_settings.ASYNC = True
9✔
215

216
    class Query(RootType):
9✔
217
        @Entrypoint
9✔
218
        def test(self) -> str:
9✔
NEW
219
            return "Hello, World!"
×
220

221
    class Subscription(RootType):
9✔
222
        @Entrypoint
9✔
223
        async def countdown(self) -> AsyncGenerator[int, None]:
9✔
224
            for i in range(3, 0, -1):
9✔
225
                yield i
9✔
226

227
        @countdown.permissions
9✔
228
        def countdown_permissions(self, info: GQLInfo, value: int) -> None:
9✔
229
            raise GraphQLPermissionError
9✔
230

231
    undine_settings.SCHEMA = create_schema(query=Query, subscription=Subscription)
9✔
232

233
    query = "subscription { countdown }"
9✔
234

235
    responses = [response.json async for response in graphql.over_websocket(query)]
9✔
236
    expected = [
9✔
237
        FormattedExecutionResult(
238
            data=None,
239
            errors=[
240
                GraphQLFormattedError(
241
                    message="Permission denied.",
242
                    path=["countdown"],
243
                    extensions={
244
                        "status_code": 403,
245
                        "error_code": "PERMISSION_DENIED",
246
                    },
247
                ),
248
            ],
249
        ),
250
    ]
251

252
    assert responses == expected
9✔
253

254

255
async def test_graphql_over_websocket__subscription__unsubscribe(graphql, undine_settings) -> None:
9✔
256
    undine_settings.ASYNC = True
9✔
257

258
    class Query(RootType):
9✔
259
        @Entrypoint
9✔
260
        def test(self) -> str:
9✔
NEW
261
            return "Hello, World!"
×
262

263
    counted: list[int] = []
9✔
264

265
    class Subscription(RootType):
9✔
266
        @Entrypoint
9✔
267
        async def countdown(self) -> AsyncGenerator[int, None]:
9✔
268
            nonlocal counted
269
            for i in range(100, 0, -1):
9✔
270
                await asyncio.sleep(0.001)
9✔
271
                counted.append(i)
9✔
272
                yield i
9✔
273

274
    undine_settings.SCHEMA = create_schema(query=Query, subscription=Subscription)
9✔
275

276
    operation_id = "1"
9✔
277
    body = {"query": "subscription { countdown }"}
9✔
278

279
    async with graphql.websocket() as websocket:
9✔
280
        await websocket.connection_init()
9✔
281

282
        # Subscribe
283
        result = await websocket.subscribe(body, operation_id=operation_id)
9✔
284
        assert result["type"] == "next"
9✔
285
        assert result["payload"] == FormattedExecutionResult(data={"countdown": 100})
9✔
286
        assert operation_id in websocket.consumer.handler.operations
9✔
287

288
        # Unsubscribe and wait for it to take effect
289
        await websocket.unsubscribe(operation_id=operation_id)
9✔
290
        with pytest.raises(asyncio.CancelledError):
9✔
291
            await websocket.consumer.handler.operations[operation_id].task
9✔
292
        assert operation_id not in websocket.consumer.handler.operations
9✔
293

294
    # It's not guaranteed at which point the unsubscribe will complete (usually within 1-2 messages
295
    # from the subscription), but it should be completed before the subscription is completed.
296
    assert len(counted) < 100
9✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc