• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fiduswriter / fiduswriter / 26837595127

02 Jun 2026 05:45PM UTC coverage: 88.314% (-0.1%) from 88.452%
26837595127

push

github

johanneswilm
translation update

10860 of 12297 relevant lines covered (88.31%)

5.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.11
fiduswriter/testing/live_server.py
1
import asyncio
19✔
2
import multiprocessing
19✔
3
import socket
19✔
4
import threading
19✔
5
import time
19✔
6
import traceback as _traceback
19✔
7
import warnings
19✔
8
from functools import partial
19✔
9
from pathlib import Path
19✔
10
from django.conf import settings
19✔
11

12
# Python 3.14 changed the Linux default start method from "fork" to
# "forkserver".  The live-server test case relies on the child process
# inheriting the parent's Django configuration, which only happens with
# "fork", so restore the old default before any multiprocessing code runs.
try:
    multiprocessing.set_start_method("fork", force=True)
except (RuntimeError, ValueError, AttributeError):
    # ValueError is what multiprocessing raises when the platform offers no
    # "fork" start method at all; keep the platform default in that case
    # instead of failing at import time.
    pass
×
20

21
from django.core.exceptions import ImproperlyConfigured
19✔
22
from django.db import connections
19✔
23
from django.db.backends.base.creation import TEST_DATABASE_PREFIX
19✔
24
from django.test.testcases import TransactionTestCase
19✔
25
from django.test.utils import modify_settings
19✔
26

27
from channels.routing import get_default_application
19✔
28

29

30
def _find_free_port(host):
19✔
31
    """Bind to port 0, let the OS pick a free port, return it."""
32
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
19✔
33
        s.bind((host, 0))
19✔
34
        return s.getsockname()[1]
19✔
35

36

37
def _wait_for_port(host, port, timeout=10):
19✔
38
    """Poll until the TCP port accepts connections or timeout expires."""
39
    deadline = time.monotonic() + timeout
19✔
40
    while time.monotonic() < deadline:
19✔
41
        try:
19✔
42
            with socket.create_connection((host, port), timeout=0.1):
19✔
43
                return True
19✔
44
        except (ConnectionRefusedError, OSError):
19✔
45
            time.sleep(0.05)
19✔
46
    return False
×
47

48

49
# ---------------------------------------------------------------------------
50
# Reimplementation of channels.testing.live.set_database_connection
51
#
52
# We cannot import from channels.testing at all: channels/testing/__init__.py
53
# unconditionally does `from .live import ChannelsLiveServerTestCase`, and
54
# live.py does `from daphne.testing import DaphneProcess` at module level.
55
# That blows up as soon as daphne is not installed – even if DaphneProcess is
56
# never used.  So we reproduce the tiny helpers we need here instead.
57
# ---------------------------------------------------------------------------
58

59

60
def set_database_connection():
    """
    Point the ``default`` database alias at the test database.

    Passed as the ``setup`` hook of the server process by
    ``ChannelsLiveServerTestCase.setUpClass``.
    """
    from django.conf import settings

    default_db = settings.DATABASES["default"]
    # Prefer an explicitly configured TEST["NAME"]; otherwise derive the
    # name by prefixing the regular database name.
    explicit_name = default_db["TEST"].get("NAME")
    default_db["NAME"] = explicit_name or (
        TEST_DATABASE_PREFIX + default_db["NAME"]
    )
19✔
73

74

75
# ---------------------------------------------------------------------------
76
# GranianProcess – drop-in replacement for daphne.testing.DaphneProcess
77
# ---------------------------------------------------------------------------
78

79

80
class GranianProcess(multiprocessing.Process):
    """
    A ``multiprocessing.Process`` subclass that boots an ASGI application
    using Granian instead of Daphne/Twisted.

    The public interface is intentionally identical to
    ``daphne.testing.DaphneProcess`` so that it can be used as a direct
    replacement::

        class MyLiveServerTestCase(ChannelsLiveServerTestCase):
            ProtocolServerProcess = GranianProcess

    Attributes
    ----------
    port : multiprocessing.Value("i", …)
        Holds the TCP port of the server.  Initialised with the requested
        port (``0`` when none was given) and overwritten by the child
        process with the port it actually uses, before the server is
        started; readable by the parent.
    ready : multiprocessing.Event
        Set by the child once the port accepts a TCP connection, and also
        when startup failed.  The parent waits on this event and then
        inspects ``errors``.
    errors : multiprocessing.Queue
        Receives ``(exception, traceback_string)`` tuples if the child process
        fails to start.  The parent can check it after the ``ready`` wait.
    """

    def __init__(
        self,
        host,
        get_application,
        kwargs=None,
        setup=None,
        teardown=None,
        port=None,
    ):
        """
        Parameters
        ----------
        host
            Host name or address to serve on; resolved in the child process.
        get_application
            Zero-argument callable returning the ASGI application.
        kwargs
            Accepted for interface compatibility only.
        setup, teardown
            Optional zero-argument callables run in the child process before
            the server is configured and after it stopped, respectively.
        port
            Fixed port to use; ``None`` lets the child pick a free one.
        """
        super().__init__()
        self.host = host
        self.get_application = get_application
        # kwargs are accepted for interface compatibility but not forwarded to
        # Granian because the two servers have different parameter names.
        self._extra_kwargs = kwargs or {}
        self.setup = setup
        self.teardown = teardown
        self.port = multiprocessing.Value("i", port if port is not None else 0)
        self.ready = multiprocessing.Event()
        self.errors = multiprocessing.Queue()

    @staticmethod
    def _portable_exception(exc):
        """Return *exc* if it survives a pickle round trip, else a stand-in.

        ``multiprocessing.Queue`` pickles items in a background thread, so an
        exception that cannot be pickled would be dropped without the parent
        ever seeing it.  Such exceptions are replaced by a ``RuntimeError``
        carrying the original type name and message.
        """
        import pickle

        try:
            pickle.loads(pickle.dumps(exc))
        except Exception:
            # Any failure here (PicklingError, TypeError, errors raised by
            # a custom __reduce__/__init__, ...) means the object cannot
            # safely cross the process boundary.
            return RuntimeError(f"{type(exc).__name__}: {exc}")
        return exc

    def _report_error(self, exc, traceback_text):
        """Queue ``(exception, traceback_string)`` for the parent process."""
        self.errors.put((self._portable_exception(exc), traceback_text))

    def run(self):
        """Entry point of the child process."""
        from granian import Granian
        from granian.constants import Interfaces

        try:
            if self.setup is not None:
                self.setup()

            # Granian requires a raw IP address (not a hostname like
            # "localhost"), so resolve it now.
            resolved_host = socket.gethostbyname(self.host)

            # Pre-select a free port and publish it to shared memory *before*
            # starting the server.  This guarantees the parent always reads a
            # valid port number — even if an exception is raised later.
            # The port is probed on the same address Granian will bind to.
            port = self.port.value or _find_free_port(resolved_host)
            self.port.value = port

            application = self.get_application()

            # Read MEDIA_ROOT after get_application() has fully bootstrapped
            # Django (via asgi.py), then let Granian serve media files directly
            # at the Rust level, avoiding Django's sync FileResponse entirely.
            media_root = getattr(settings, "MEDIA_ROOT", None)
            static_path_mount = [Path(media_root)] if media_root else None
            static_path_route = ["/media"] if media_root else None

            # Daemon thread: probe the TCP port and signal ready as soon as
            # the server accepts its first connection.  This avoids relying on
            # any Granian-internal hooks (_init_shared_socket, _sso, _sfd)
            # that may change across Granian versions.
            def _probe():
                if _wait_for_port(resolved_host, port):
                    self.ready.set()
                elif not self.ready.is_set():
                    self._report_error(
                        RuntimeError(
                            f"Granian did not start on {self.host}:{port}"
                        ),
                        "",
                    )
                    self.ready.set()

            threading.Thread(target=_probe, daemon=True).start()

            Granian(
                target="__live_test_app__",
                address=resolved_host,
                port=port,
                interface=Interfaces.ASGI,
                workers=1,
                log_enabled=False,
                static_path_route=static_path_route,
                static_path_mount=static_path_mount,
            ).serve(target_loader=lambda _: application, wrap_loader=True)

        except BaseException as exc:
            # Report the failure first, then wake the parent so it finds the
            # error instead of waiting for a server that will never come up.
            self._report_error(exc, _traceback.format_exc())
            if not self.ready.is_set():
                self.ready.set()
        finally:
            # Teardown is best effort: a failing hook must not mask the
            # outcome of the server run itself.
            try:
                if self.teardown is not None:
                    self.teardown()
            except Exception:
                pass
×
196

197

198
# ---------------------------------------------------------------------------
199
# Helpers used by ChannelsLiveServerTestCase
200
# ---------------------------------------------------------------------------
201

202
# Queue of command names sent from the test process to the server process.
# Stays ``None`` until ``ChannelsLiveServerTestCase.setUpClass`` replaces it
# with a ``multiprocessing.Queue``; ``ServerCommandMiddleware`` drains it.
_server_command_queue = None
19✔
203

204

205
def clear_contenttype_cache():
    """Clear the ``ContentType`` manager's cache in the current process."""
    # Imported lazily, at call time rather than at module import.
    from django.contrib.contenttypes import models as contenttype_models

    contenttype_models.ContentType.objects.clear_cache()
13✔
209

210

211
def make_application(*, static_wrapper, commands=None):
    """Build the ASGI application served by the live test server.

    Parameters
    ----------
    static_wrapper
        Optional callable that wraps the finished application; ``None``
        leaves the application unwrapped.
    commands
        Mapping of command name to zero-argument callable, handed to
        ``ServerCommandMiddleware``.  ``None`` means no commands.

    Returns
    -------
    The default Channels application wrapped in ``ServerCommandMiddleware``
    and, if given, in ``static_wrapper``.
    """
    # Module-level function for pickle-ability
    if commands is None:
        # A fresh dict per call instead of one shared mutable default.
        commands = {}
    # Wrap the application with our command processing middleware
    application = ServerCommandMiddleware(get_default_application(), commands)
    if static_wrapper is not None:
        application = static_wrapper(application)
    return application
19✔
219

220

221
class ServerCommandMiddleware:
    """
    Middleware that processes commands from the test process.
    This is automatically added to the ASGI application in test mode.
    """

    def __init__(self, app, commands):
        """
        Parameters
        ----------
        app
            The wrapped ASGI application.
        commands
            Mapping of command name to zero-argument callable.
        """
        self.app = app
        self.commands = commands
        self._exception_handler_installed = False

    async def __call__(self, scope, receive, send):
        """Run pending server commands, then delegate to the wrapped app."""
        # Install a custom event loop exception handler on first request
        # to suppress CancelledError noise from asyncio/asgiref during tests.
        # This handles the "CancelledError exception in shielded future"
        # messages that occur when Selenium navigates away while the server
        # is still processing a request via sync_to_async.
        if not self._exception_handler_installed:
            self._install_exception_handler()
            self._exception_handler_installed = True

        # Process any pending server commands before handling the request
        self.process_server_commands()
        try:
            return await self.app(scope, receive, send)
        except asyncio.CancelledError:
            # Silently handle cancellation. This happens when the test
            # client (Selenium) navigates away or disconnects while the
            # server is still processing a previous request.
            pass

    def _install_exception_handler(self):
        """
        Set a custom asyncio event loop exception handler that suppresses
        CancelledError. The default handler logs these to stderr which
        produces noisy output in CI and can be mistaken for real errors.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: nothing to install the handler on.
            return

        original_handler = loop.get_exception_handler()

        def _custom_exception_handler(loop, context):
            exception = context.get("exception")
            if isinstance(exception, asyncio.CancelledError):
                return
            # For all other exceptions, use the original handler or default
            if original_handler is not None:
                original_handler(loop, context)
            else:
                loop.default_exception_handler(context)

        loop.set_exception_handler(_custom_exception_handler)

    def process_server_commands(self):
        """Drain the module-level command queue and run each known command.

        Does nothing while no queue has been installed.  Command names
        without an entry in ``self.commands`` are discarded.
        """
        from queue import Empty

        pending = _server_command_queue
        if pending is None:
            return

        while True:
            # Ask for the next item directly instead of checking empty()
            # first: empty() is documented as unreliable for multiprocessing
            # queues, and get_nowait() reports emptiness by raising.
            try:
                command = pending.get_nowait()
            except Empty:
                return
            if command in self.commands:
                self.commands[command]()
13✔
286

287

288
# ---------------------------------------------------------------------------
289
# ChannelsLiveServerTestCase
290
#
291
# Reimplemented from scratch on top of Django's TransactionTestCase so that
292
# channels.testing (and therefore daphne) is never imported.
293
# ---------------------------------------------------------------------------
294

295

296
class ChannelsLiveServerTestCase(TransactionTestCase):
    """
    Equivalent to ``channels.testing.ChannelsLiveServerTestCase`` but backed
    by Granian instead of Daphne, and with no daphne dependency at import time.
    """

    host = "localhost"
    ProtocolServerProcess = GranianProcess
    static_wrapper = None
    serve_static = False
    commands = {"clear_contenttype_cache": clear_contenttype_cache}

    @property
    def live_server_url(self):
        """HTTP base URL of the running live server."""
        return f"http://{self.host}:{self._port}"

    @property
    def live_server_ws_url(self):
        """WebSocket base URL of the running live server."""
        return f"ws://{self.host}:{self._port}"

    @classmethod
    def setUpClass(cls):
        """Start the server process and wait until it is ready.

        Raises
        ------
        ImproperlyConfigured
            If any configured database is an in-memory SQLite database.
        RuntimeError
            If the server process dies or reports a startup error.  In that
            case the process is stopped and the ``ALLOWED_HOSTS`` override
            is reverted before the exception propagates, because unittest
            does not call ``tearDownClass`` after a failed ``setUpClass``.
        """
        global _server_command_queue

        for connection in connections.all():
            if cls._is_in_memory_db(connection):
                raise ImproperlyConfigured(
                    "ChannelsLiveServerTestCase cannot be used with "
                    "in-memory databases"
                )

        # Suppress Django's warning about sync StreamingHttpResponse
        # iterators served through the ASGI handler.  This is set before
        # the server child is forked so it is inherited by the child.
        warnings.filterwarnings(
            "ignore",
            message=".*StreamingHttpResponse must consume synchronous iterators.*",
            category=Warning,
        )

        super().setUpClass()

        cls._live_server_modified_settings = modify_settings(
            ALLOWED_HOSTS={"append": cls.host}
        )
        cls._live_server_modified_settings.enable()

        server_process = None
        try:
            # Ensure MEDIA_ROOT exists so Granian's static mount doesn't
            # crash.  On a fresh CI checkout the media/ directory is in
            # .gitignore and has never been created.  Create it here as an
            # empty placeholder; tests that upload files will populate it.
            # MEDIA_ROOT itself is used rather than a temporary directory,
            # which reportedly would break image serving.
            media_root_path = Path(settings.MEDIA_ROOT)
            media_root_path.mkdir(parents=True, exist_ok=True)

            _server_command_queue = multiprocessing.Queue()
            cls._server_command_queue = _server_command_queue

            get_application = partial(
                make_application,
                static_wrapper=cls.static_wrapper if cls.serve_static else None,
                commands=cls.commands,
            )
            server_process = cls.ProtocolServerProcess(
                cls.host,
                get_application,
                setup=set_database_connection,
            )
            cls._server_process = server_process
            server_process.start()

            # Wait in one-second slices so a child that died without
            # signalling readiness is noticed instead of blocking forever.
            while not server_process.ready.wait(timeout=1):
                if not server_process.is_alive():
                    raise RuntimeError("Server stopped unexpectedly") from None

            # Surface any startup error from the child process immediately,
            # rather than proceeding with port=0 and failing obscurely later.
            if not server_process.errors.empty():
                _exc, tb = server_process.errors.get()
                raise RuntimeError(
                    f"Live server process failed to start:\n{tb}"
                ) from _exc

            cls._port = server_process.port.value
        except BaseException:
            # Undo what this method set up itself; pid is None until the
            # process has actually been started.
            if server_process is not None and server_process.pid is not None:
                server_process.terminate()
                server_process.join()
            cls._live_server_modified_settings.disable()
            raise

    @classmethod
    def tearDownClass(cls):
        """Stop the server process and revert the settings override."""
        cls._server_process.terminate()
        cls._server_process.join()
        cls._live_server_modified_settings.disable()
        super().tearDownClass()

    @classmethod
    def _is_in_memory_db(cls, connection):
        """Return whether *connection* is an in-memory SQLite database."""
        if connection.vendor == "sqlite":
            return connection.is_in_memory_db()
        return False

    def setUp(self):
        """Ask the server process to clear its content type cache."""
        super().setUp()
        self.run_server_command("clear_contenttype_cache")

    def run_server_command(self, command):
        """Send a command to be executed in the server process."""
        if hasattr(self.__class__, "_server_command_queue"):
            self._server_command_queue.put(command)
13✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc