• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fiduswriter / fiduswriter / 26899214681

03 Jun 2026 04:42PM UTC coverage: 88.354% (+0.02%) from 88.33%
26899214681

push

github

johanneswilm
4.1.3

10887 of 12322 relevant lines covered (88.35%)

5.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.77
fiduswriter/testing/live_server.py
1
import asyncio
19✔
2
import multiprocessing
19✔
3
import socket
19✔
4
import threading
19✔
5
import time
19✔
6
import traceback as _traceback
19✔
7
import warnings
19✔
8
from functools import partial
19✔
9
from pathlib import Path
19✔
10
from django.conf import settings
19✔
11

12
# Python 3.14 changed the Linux default from "fork" to "forkserver".  The
13
# live-server test case relies on the child process inheriting the parent's
14
# Django configuration, which only happens with "fork".  Restore the old
15
# default before any multiprocessing code runs.
16
try:
    multiprocessing.set_start_method("fork", force=True)
except (RuntimeError, ValueError, AttributeError):
    # Best effort only.  ValueError is what multiprocessing raises when the
    # platform offers no "fork" start method; in that case keep the
    # platform default instead of failing at import time.
    pass
×
20

21
from django.core.exceptions import ImproperlyConfigured
19✔
22
from django.db import connections
19✔
23
from django.db.backends.base.creation import TEST_DATABASE_PREFIX
19✔
24
from django.contrib.staticfiles.handlers import ASGIStaticFilesHandler
19✔
25
from django.test.testcases import TransactionTestCase
19✔
26
from django.test.utils import modify_settings
19✔
27

28
from channels.routing import get_default_application
19✔
29

30

31
def _find_free_port(host):
19✔
32
    """Bind to port 0, let the OS pick a free port, return it."""
33
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
19✔
34
        s.bind((host, 0))
19✔
35
        return s.getsockname()[1]
19✔
36

37

38
def _wait_for_port(host, port, timeout=10):
19✔
39
    """Poll until the TCP port accepts connections or timeout expires."""
40
    deadline = time.monotonic() + timeout
19✔
41
    while time.monotonic() < deadline:
19✔
42
        try:
19✔
43
            with socket.create_connection((host, port), timeout=0.1):
19✔
44
                return True
19✔
45
        except (ConnectionRefusedError, OSError):
19✔
46
            time.sleep(0.05)
19✔
47
    return False
×
48

49

50
# ---------------------------------------------------------------------------
51
# Reimplementation of channels.testing.live.set_database_connection
52
#
53
# We cannot import from channels.testing at all: channels/testing/__init__.py
54
# unconditionally does `from .live import ChannelsLiveServerTestCase`, and
55
# live.py does `from daphne.testing import DaphneProcess` at module level.
56
# That blows up as soon as daphne is not installed – even if DaphneProcess is
57
# never used.  So we reproduce the tiny helpers we need here instead.
58
# ---------------------------------------------------------------------------
59

60

61
def set_database_connection():
    """
    Point the "default" database alias at its test database.

    Used as the ``setup`` hook that runs inside the server child process.
    An explicit ``TEST["NAME"]`` setting wins; otherwise the configured
    database name is prefixed with ``TEST_DATABASE_PREFIX``.
    """
    from django.conf import settings

    default_db = settings.DATABASES["default"]
    configured_name = default_db["TEST"].get("NAME")
    default_db["NAME"] = configured_name or (
        TEST_DATABASE_PREFIX + default_db["NAME"]
    )
19✔
74

75

76
# ---------------------------------------------------------------------------
77
# GranianProcess – drop-in replacement for daphne.testing.DaphneProcess
78
# ---------------------------------------------------------------------------
79

80

81
class GranianProcess(multiprocessing.Process):
    """
    A ``multiprocessing.Process`` subclass that boots an ASGI application
    using Granian instead of Daphne/Twisted.

    The public interface is intentionally identical to
    ``daphne.testing.DaphneProcess`` so that it can be used as a direct
    replacement on the test-case class::

        class MyLiveTests(ChannelsLiveServerTestCase):
            ProtocolServerProcess = GranianProcess

    Attributes
    ----------
    port : multiprocessing.Value("i", …)
        Holds the TCP port the server listens on.  ``0`` means "not chosen
        yet"; the child picks a free port and writes it here *before*
        starting Granian, so the parent can read it once ``ready`` is set.
    ready : multiprocessing.Event
        Set by the child when the port accepts its first TCP connection, or
        when startup failed (in which case ``errors`` holds the reason).
        The parent waits on this event.
    errors : multiprocessing.Queue
        Receives ``(exception, traceback_string)`` tuples if the child process
        fails to start.  The parent checks it after ``ready`` is set.
    """

    def __init__(
        self,
        host,
        get_application,
        kwargs=None,
        setup=None,
        teardown=None,
        port=None,
    ):
        """
        Parameters
        ----------
        host : str
            Host name or address to serve on; resolved to an IP in ``run``.
        get_application : callable
            Zero-argument callable returning the ASGI application.  Called in
            the child process.
        kwargs : dict, optional
            Accepted for interface compatibility only; stored but not
            forwarded to Granian.
        setup, teardown : callable, optional
            Zero-argument hooks run in the child before startup and on exit.
        port : int, optional
            Fixed port to use.  ``None`` (or ``0``) lets the child pick one.
        """
        super().__init__()
        self.host = host
        self.get_application = get_application
        # kwargs are accepted for interface compatibility but not forwarded to
        # Granian because the two servers have different parameter names.
        self._extra_kwargs = kwargs or {}
        self.setup = setup
        self.teardown = teardown
        self.port = multiprocessing.Value("i", port if port is not None else 0)
        self.ready = multiprocessing.Event()
        self.errors = multiprocessing.Queue()

    def run(self):
        """Entry point of the child process.

        Runs ``setup``, chooses and publishes the port, builds the
        application and serves it with Granian until the process ends.  Any
        exception (including a failed granian import) is reported through
        ``errors`` and ``ready`` is set so the parent stops waiting.
        ``teardown`` always runs on the way out.
        """
        try:
            # Imported inside the try block so that a missing or broken
            # granian installation is reported to the parent via ``errors``
            # instead of killing the child silently.
            from granian import Granian
            from granian.constants import Interfaces

            if self.setup is not None:
                self.setup()

            # Granian requires a raw IP address (not a hostname like
            # "localhost"), so resolve it now.
            resolved_host = socket.gethostbyname(self.host)

            # Pre-select a free port and publish it to shared memory *before*
            # starting the server.  This guarantees the parent always reads a
            # valid port number — even if an exception is raised later.
            # The port is probed on the same address Granian will bind to.
            port = self.port.value or _find_free_port(resolved_host)
            self.port.value = port

            application = self.get_application()

            # Read MEDIA_ROOT after get_application() has fully bootstrapped
            # Django (via asgi.py), then let Granian serve media files directly
            # at the Rust level, avoiding Django's sync FileResponse entirely.
            media_root = getattr(settings, "MEDIA_ROOT", None)
            static_path_mount = [Path(media_root)] if media_root else None
            static_path_route = ["/media"] if media_root else None

            # Daemon thread: probe the TCP port and signal ready as soon as
            # the server accepts its first connection.  This avoids relying on
            # any Granian-internal hooks (_init_shared_socket, _sso, _sfd)
            # that may change across Granian versions.
            def _probe():
                if _wait_for_port(resolved_host, port):
                    self.ready.set()
                elif not self.ready.is_set():
                    # Timed out without the server coming up: report it and
                    # release the waiting parent.
                    self.errors.put(
                        (
                            RuntimeError(
                                f"Granian did not start on {self.host}:{port}"
                            ),
                            "",
                        )
                    )
                    self.ready.set()

            threading.Thread(target=_probe, daemon=True).start()

            Granian(
                target="__live_test_app__",
                address=resolved_host,
                port=port,
                interface=Interfaces.ASGI,
                workers=1,
                log_enabled=False,
                static_path_route=static_path_route,
                static_path_mount=static_path_mount,
            ).serve(target_loader=lambda _: application, wrap_loader=True)

        except BaseException as exc:
            # Queue the error first, then wake the parent, so the parent
            # finds a reason when it looks.
            self.errors.put((exc, _traceback.format_exc()))
            if not self.ready.is_set():
                self.ready.set()
        finally:
            # Teardown is best effort: a failing hook must not mask the
            # original outcome of the server run.
            try:
                if self.teardown is not None:
                    self.teardown()
            except Exception:
                pass
×
197

198

199
# ---------------------------------------------------------------------------
200
# Helpers used by ChannelsLiveServerTestCase
201
# ---------------------------------------------------------------------------
202

203
# Queue of command names sent from the test process to the live server.
# ChannelsLiveServerTestCase.setUpClass() replaces this with a
# multiprocessing.Queue before starting the server process, and
# ServerCommandMiddleware.process_server_commands() drains it.
_server_command_queue = None
19✔
204

205

206
def clear_contenttype_cache():
    """Clear the cache held by the ``ContentType`` model manager."""
    # Imported lazily, at call time, rather than at module import.
    from django.contrib.contenttypes import models as contenttypes_models

    contenttypes_models.ContentType.objects.clear_cache()
13✔
210

211

212
def make_application(*, static_wrapper, commands=None):
    """Build the ASGI application served by the live test server.

    Parameters
    ----------
    static_wrapper : callable or None
        If not ``None``, called with the application and its return value is
        used instead (e.g. a static-files handler).
    commands : dict, optional
        Mapping of command name to zero-argument callable, handed to
        ``ServerCommandMiddleware``.  Defaults to an empty mapping.

    Returns
    -------
    The default Channels application wrapped in ``ServerCommandMiddleware``
    and, optionally, in ``static_wrapper``.
    """
    # Module-level function for pickle-ability
    if commands is None:
        # A fresh dict per call avoids sharing one mutable default.
        commands = {}
    application = get_default_application()
    # Wrap the application with our command processing middleware
    application = ServerCommandMiddleware(application, commands)
    if static_wrapper is not None:
        application = static_wrapper(application)
    return application
19✔
220

221

222
class ServerCommandMiddleware:
    """
    ASGI middleware that processes commands from the test process.

    Before every request it runs the commands queued in the module-level
    ``_server_command_queue`` and then delegates to the wrapped application.
    This is automatically added to the ASGI application in test mode.
    """

    def __init__(self, app, commands):
        """
        Parameters
        ----------
        app : ASGI application
            The application to delegate requests to.
        commands : dict
            Mapping of command name to zero-argument callable.
        """
        self.app = app
        self.commands = commands
        self._exception_handler_installed = False

    async def __call__(self, scope, receive, send):
        """Handle one ASGI call; returns whatever the wrapped app returns,
        or ``None`` if the call was cancelled."""
        # Install a custom event loop exception handler on first request
        # to suppress CancelledError noise from asyncio/asgiref during tests.
        # This handles the "CancelledError exception in shielded future"
        # messages that occur when Selenium navigates away while the server
        # is still processing a request via sync_to_async.
        if not self._exception_handler_installed:
            self._install_exception_handler()
            self._exception_handler_installed = True

        # Process any pending server commands before handling the request
        self.process_server_commands()
        try:
            return await self.app(scope, receive, send)
        except asyncio.CancelledError:
            # Silently handle cancellation. This happens when the test
            # client (Selenium) navigates away or disconnects while the
            # server is still processing a previous request.
            pass

    def _install_exception_handler(self):
        """
        Set a custom asyncio event loop exception handler that suppresses
        CancelledError. The default handler logs these to stderr which
        produces noisy output in CI and can be mistaken for real errors.

        Does nothing when no event loop is running.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return

        original_handler = loop.get_exception_handler()

        def _custom_exception_handler(loop, context):
            exception = context.get("exception")
            if isinstance(exception, asyncio.CancelledError):
                return
            # For all other exceptions, use the original handler or default
            if original_handler is not None:
                original_handler(loop, context)
            else:
                loop.default_exception_handler(context)

        loop.set_exception_handler(_custom_exception_handler)

    def process_server_commands(self):
        """Run every command currently waiting in the command queue.

        Command names without an entry in ``self.commands`` are discarded.
        Does nothing when no queue has been installed.
        """
        from queue import Empty

        command_queue = _server_command_queue
        if command_queue is None:
            return

        # Drain with get_nowait() and stop on Empty rather than trusting
        # empty(), which is only a snapshot and may be stale by the time
        # the item is fetched.
        while True:
            try:
                command = command_queue.get_nowait()
            except Empty:
                return
            if command in self.commands:
                self.commands[command]()
13✔
287

288

289
# ---------------------------------------------------------------------------
290
# ChannelsLiveServerTestCase
291
#
292
# Reimplemented from scratch on top of Django's TransactionTestCase so that
293
# channels.testing (and therefore daphne) is never imported.
294
# ---------------------------------------------------------------------------
295

296

297
class ChannelsLiveServerTestCase(TransactionTestCase):
    """
    Equivalent to ``channels.testing.ChannelsLiveServerTestCase`` but backed
    by Granian instead of Daphne, and with no daphne dependency at import time.

    Class attributes that subclasses may override:

    * ``host`` – host name used for the server, the live URLs and the
      temporary ``ALLOWED_HOSTS`` entry.
    * ``ProtocolServerProcess`` – process class instantiated to run the server.
    * ``static_wrapper`` / ``serve_static`` – wrapper applied to the
      application when ``serve_static`` is true.
    * ``commands`` – mapping of command name to callable that tests can
      trigger in the server process via ``run_server_command``.
    """

    host = "localhost"
    ProtocolServerProcess = GranianProcess
    static_wrapper = ASGIStaticFilesHandler
    serve_static = True
    commands = {"clear_contenttype_cache": clear_contenttype_cache}

    @property
    def live_server_url(self):
        """HTTP base URL of the running live server."""
        return f"http://{self.host}:{self._port}"

    @property
    def live_server_ws_url(self):
        """WebSocket base URL of the running live server."""
        return f"ws://{self.host}:{self._port}"

    @classmethod
    def setUpClass(cls):
        """Start the live server process and wait until it is ready.

        Raises
        ------
        ImproperlyConfigured
            If any configured database is an in-memory SQLite database.
        RuntimeError
            If the server process dies before signalling readiness or
            reports a startup error.
        """
        global _server_command_queue

        for connection in connections.all():
            if cls._is_in_memory_db(connection):
                raise ImproperlyConfigured(
                    "ChannelsLiveServerTestCase cannot be used with "
                    "in-memory databases"
                )

        # Suppress Django's warning about sync StreamingHttpResponse
        # iterators served through the ASGI handler.  This is set before
        # the server child is forked so it is inherited by the child.
        warnings.filterwarnings(
            "ignore",
            message=".*StreamingHttpResponse must consume synchronous iterators.*",
            category=Warning,
        )

        super().setUpClass()

        cls._live_server_modified_settings = modify_settings(
            ALLOWED_HOSTS={"append": cls.host}
        )
        cls._live_server_modified_settings.enable()

        # unittest does not call tearDownClass() when setUpClass() raises, so
        # everything acquired from here on is released explicitly on failure.
        try:
            # Ensure MEDIA_ROOT exists so Granian's static mount doesn't
            # crash.  On a fresh CI checkout the media/ directory is in
            # .gitignore and has never been created.  Create it here as an
            # empty placeholder; tests that upload files will populate it.
            media_root_path = Path(settings.MEDIA_ROOT)
            media_root_path.mkdir(parents=True, exist_ok=True)

            _server_command_queue = multiprocessing.Queue()
            cls._server_command_queue = _server_command_queue

            get_application = partial(
                make_application,
                static_wrapper=cls.static_wrapper if cls.serve_static else None,
                commands=cls.commands,
            )
            cls._server_process = cls.ProtocolServerProcess(
                cls.host,
                get_application,
                setup=set_database_connection,
            )
            cls._server_process.start()
            try:
                # Poll in one-second slices so a child that died without
                # signalling readiness is noticed instead of waiting forever.
                while True:
                    if not cls._server_process.ready.wait(timeout=1):
                        if cls._server_process.is_alive():
                            continue
                        raise RuntimeError(
                            "Server stopped unexpectedly"
                        ) from None
                    break

                # Surface any startup error from the child process
                # immediately, rather than proceeding with port=0 and
                # failing obscurely later.
                if not cls._server_process.errors.empty():
                    _exc, tb = cls._server_process.errors.get()
                    raise RuntimeError(
                        f"Live server process failed to start:\n{tb}"
                    ) from _exc
            except BaseException:
                # Do not leave a half-started server process behind.
                cls._server_process.terminate()
                cls._server_process.join()
                raise

            cls._port = cls._server_process.port.value
        except BaseException:
            cls._live_server_modified_settings.disable()
            raise

    @classmethod
    def tearDownClass(cls):
        """Stop the server process and restore the modified settings."""
        cls._server_process.terminate()
        cls._server_process.join()
        cls._live_server_modified_settings.disable()
        super().tearDownClass()

    @classmethod
    def _is_in_memory_db(cls, connection):
        """Return True if ``connection`` is an in-memory SQLite database."""
        if connection.vendor == "sqlite":
            return connection.is_in_memory_db()
        return False

    def setUp(self):
        """Queue a ContentType cache reset in the server before each test."""
        super().setUp()
        self.run_server_command("clear_contenttype_cache")

    def run_server_command(self, command):
        """Send a command to be executed in the server process.

        The command name is only queued here; the server runs it the next
        time it handles a request.  Does nothing if the class has no
        command queue.
        """
        if hasattr(self.__class__, "_server_command_queue"):
            self._server_command_queue.put(command)
13✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc