• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fiduswriter / fiduswriter / 26057412189

18 May 2026 08:04PM UTC coverage: 88.7% (-0.09%) from 88.787%
26057412189

push

github

johanneswilm
Update channels_patch to get around fork/forkserver issue in Python 3.14

10675 of 12035 relevant lines covered (88.7%)

5.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.25
fiduswriter/testing/channels_patch.py
1
import asyncio
10✔
2
from functools import partial
10✔
3
import multiprocessing
10✔
4

5
# Python 3.14 changed the Linux default from "fork" to "forkserver".  The
6
# Channels live-server test case relies on the child process inheriting the
7
# parent's Django configuration, which only happens with "fork".  Restore the
8
# old default before any multiprocessing code runs.
9
try:
10✔
10
    multiprocessing.set_start_method("fork", force=True)
10✔
11
except (RuntimeError, AttributeError):
×
12
    pass
×
13

14
from django.test.utils import modify_settings
10✔
15

16
from channels.routing import get_default_application
10✔
17
from channels.testing import (
10✔
18
    ChannelsLiveServerTestCase as ChannelsLiveServerTestCaseBase,
19
)
20
from channels.testing.live import set_database_connection
10✔
21

22
# Modifications to get around https://github.com/django/channels/issues/2208 until fixed.
23

24
_server_command_queue = None
10✔
25

26

27
def clear_contenttype_cache():
10✔
28
    from django.contrib.contenttypes.models import ContentType
9✔
29

30
    ContentType.objects.clear_cache()
9✔
31

32

33
def make_application(*, static_wrapper, commands={}):
10✔
34
    # Module-level function for pickle-ability
35
    application = get_default_application()
10✔
36
    # Wrap the application with our command processing middleware
37
    application = ServerCommandMiddleware(application, commands)
10✔
38
    if static_wrapper is not None:
10✔
39
        application = static_wrapper(application)
10✔
40
    return application
10✔
41

42

43
class ServerCommandMiddleware:
10✔
44
    """
45
    Middleware that processes commands from the test process.
46
    This is automatically added to the ASGI application in test mode.
47
    """
48

49
    def __init__(self, app, commands):
10✔
50
        self.app = app
10✔
51
        self.commands = commands
10✔
52
        self._exception_handler_installed = False
10✔
53

54
    async def __call__(self, scope, receive, send):
10✔
55
        # Install a custom event loop exception handler on first request
56
        # to suppress CancelledError noise from asyncio/asgiref during tests.
57
        # This handles the "CancelledError exception in shielded future"
58
        # messages that occur when Selenium navigates away while the server
59
        # is still processing a request via sync_to_async.
60
        if not self._exception_handler_installed:
10✔
61
            self._install_exception_handler()
10✔
62
            self._exception_handler_installed = True
10✔
63

64
        # Process any pending server commands before handling the request
65
        self.process_server_commands()
10✔
66
        try:
10✔
67
            return await self.app(scope, receive, send)
10✔
68
        except asyncio.CancelledError:
×
69
            # Silently handle cancellation. This happens when the test
70
            # client (Selenium) navigates away or disconnects while the
71
            # server is still processing a previous request.
72
            pass
×
73

74
    def _install_exception_handler(self):
10✔
75
        """
76
        Set a custom asyncio event loop exception handler that suppresses
77
        CancelledError. The default handler logs these to stderr which
78
        produces noisy output in CI and can be mistaken for real errors.
79
        """
80
        try:
10✔
81
            loop = asyncio.get_running_loop()
10✔
82
        except RuntimeError:
×
83
            return
×
84

85
        original_handler = loop.get_exception_handler()
10✔
86

87
        def _custom_exception_handler(loop, context):
10✔
88
            exception = context.get("exception")
×
89
            if isinstance(exception, asyncio.CancelledError):
×
90
                return
×
91
            # For all other exceptions, use the original handler or default
92
            if original_handler is not None:
×
93
                original_handler(loop, context)
×
94
            else:
95
                loop.default_exception_handler(context)
×
96

97
        loop.set_exception_handler(_custom_exception_handler)
10✔
98

99
    def process_server_commands(self):
10✔
100
        global _server_command_queue
101
        if _server_command_queue is None:
10✔
102
            return
×
103

104
        while not _server_command_queue.empty():
10✔
105
            command = _server_command_queue.get_nowait()
9✔
106
            if command in self.commands:
9✔
107
                self.commands[command]()
9✔
108

109

110
class ChannelsLiveServerTestCase(ChannelsLiveServerTestCaseBase):
10✔
111
    commands = {"clear_contenttype_cache": clear_contenttype_cache}
10✔
112

113
    @classmethod
10✔
114
    def setUpClass(cls):
10✔
115
        global _server_command_queue
116

117
        # for connection in connections.all():
118
        #     if cls._is_in_memory_db(connection):
119
        #         raise ImproperlyConfigured(
120
        #             "ChannelLiveServerTestCase can not be used with in memory databases"
121
        #         )
122

123
        super(ChannelsLiveServerTestCaseBase, cls).setUpClass()
10✔
124

125
        cls._live_server_modified_settings = modify_settings(
10✔
126
            ALLOWED_HOSTS={"append": cls.host}
127
        )
128
        cls._live_server_modified_settings.enable()
10✔
129

130
        # Create a command queue for communication with the server process
131
        _server_command_queue = multiprocessing.Queue()
10✔
132
        cls._server_command_queue = _server_command_queue
10✔
133

134
        get_application = partial(
10✔
135
            make_application,
136
            static_wrapper=cls.static_wrapper if cls.serve_static else None,
137
            commands=cls.commands,
138
        )
139
        cls._server_process = cls.ProtocolServerProcess(
10✔
140
            cls.host,
141
            get_application,
142
            setup=set_database_connection,
143
        )
144
        cls._server_process.start()
10✔
145
        while True:
10✔
146
            if not cls._server_process.ready.wait(timeout=1):
10✔
147
                if cls._server_process.is_alive():
×
148
                    continue
×
149
                raise RuntimeError("Server stopped") from None
×
150
            break
10✔
151
        cls._port = cls._server_process.port.value
10✔
152

153
    def setUp(self):
10✔
154
        super().setUp()
9✔
155
        self.run_server_command("clear_contenttype_cache")
9✔
156

157
    def run_server_command(self, command):
10✔
158
        """
159
        Add command to server command queue.
160
        """
161
        if hasattr(self.__class__, "_server_command_queue"):
9✔
162
            self._server_command_queue.put(command)
9✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc