• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fiduswriter / fiduswriter / 24372686465

13 Apr 2026 11:42PM UTC coverage: 86.712% (+0.1%) from 86.612%
24372686465

Pull #1379

github

web-flow
Merge 2fde3a9ba into 729e9926f
Pull Request #1379: Bump pillow from 12.1.1 to 12.2.0 in /fiduswriter

7785 of 8978 relevant lines covered (86.71%)

5.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.82
fiduswriter/testing/channels_patch.py
1
import asyncio
5✔
2
from functools import partial
5✔
3
import multiprocessing
5✔
4

5
from django.test.utils import modify_settings
5✔
6
from django.contrib.contenttypes.models import ContentType
5✔
7

8
from channels.routing import get_default_application
5✔
9
from channels.testing import (
5✔
10
    ChannelsLiveServerTestCase as ChannelsLiveServerTestCaseBase,
11
)
12
from channels.testing.live import set_database_connection
5✔
13

14
# Modifications to get around https://github.com/django/channels/issues/2208 until fixed.
15

16
# Module-level handle on the queue of pending server command names.
# It stays None until ChannelsLiveServerTestCase.setUpClass assigns a
# multiprocessing.Queue to it; ServerCommandMiddleware.process_server_commands
# reads it and returns early while it is still None.
# NOTE(review): the server process presumably sees the assigned queue because
# it is started after the assignment -- confirm for the start method in use.
_server_command_queue = None
5✔
17

18

19
def clear_contenttype_cache():
    """Drop the cache held by the ``ContentType`` model manager."""
    content_type_manager = ContentType.objects
    content_type_manager.clear_cache()
4✔
21

22

23
def make_application(*, static_wrapper, commands=None):
    """
    Build the ASGI application that the live test server should run.

    Kept as a module-level function so that it stays picklable.

    Args:
        static_wrapper: Callable applied to the application as the outermost
            wrapper, or ``None`` to skip that wrapping step.
        commands: Mapping of command name to zero-argument callable handed to
            ``ServerCommandMiddleware``. ``None`` (the default) means no
            commands; a fresh empty dict is used so that no dict object is
            shared between calls.

    Returns:
        The default application wrapped in ``ServerCommandMiddleware`` and,
        when ``static_wrapper`` is given, additionally in ``static_wrapper``.
    """
    if commands is None:
        commands = {}

    application = get_default_application()
    # Wrap the application with our command processing middleware
    application = ServerCommandMiddleware(application, commands)
    if static_wrapper is not None:
        application = static_wrapper(application)
    return application
5✔
31

32

33
class ServerCommandMiddleware:
    """
    Middleware that processes commands from the test process.

    On every call it first drains the module-level ``_server_command_queue``
    and runs the matching callables from ``commands``, then delegates to the
    wrapped ASGI application. ``make_application`` adds it around the default
    application.
    """

    def __init__(self, app, commands):
        """
        Store the wrapped application and the command table.

        Args:
            app: The ASGI application to delegate to.
            commands: Mapping of command name to zero-argument callable.
        """
        self.app = app
        self.commands = commands
        self._exception_handler_installed = False

    async def __call__(self, scope, receive, send):
        """
        Run pending server commands, then call the wrapped application.

        Returns:
            Whatever the wrapped application returns, or ``None`` when the
            call was cancelled.
        """
        # Install a custom event loop exception handler on first request
        # to suppress CancelledError noise from asyncio/asgiref during tests.
        # This handles the "CancelledError exception in shielded future"
        # messages that occur when Selenium navigates away while the server
        # is still processing a request via sync_to_async.
        if not self._exception_handler_installed:
            self._install_exception_handler()
            self._exception_handler_installed = True

        # Process any pending server commands before handling the request
        self.process_server_commands()
        try:
            return await self.app(scope, receive, send)
        except asyncio.CancelledError:
            # Deliberately swallowed. This happens when the test client
            # (Selenium) navigates away or disconnects while the server is
            # still processing a previous request.
            return None

    def _install_exception_handler(self):
        """
        Set a custom asyncio event loop exception handler that suppresses
        CancelledError. The default handler logs these to stderr which
        produces noisy output in CI and can be mistaken for real errors.

        Does nothing when no event loop is running.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return

        # Remember whatever handler was installed before so that everything
        # except CancelledError keeps being reported the same way.
        original_handler = loop.get_exception_handler()

        def _custom_exception_handler(loop, context):
            exception = context.get("exception")
            if isinstance(exception, asyncio.CancelledError):
                return
            # For all other exceptions, use the original handler or default
            if original_handler is not None:
                original_handler(loop, context)
            else:
                loop.default_exception_handler(context)

        loop.set_exception_handler(_custom_exception_handler)

    def process_server_commands(self):
        """
        Drain the command queue and run every command that is registered.

        Returns immediately when no queue has been set up. Command names
        that are not keys of ``self.commands`` are discarded.
        """
        # Imported here so the method does not depend on a module-level
        # ``import queue``.
        from queue import Empty

        pending = _server_command_queue
        if pending is None:
            return

        # Take items until the queue reports Empty instead of checking
        # ``empty()`` first: that check is not reliable for multiprocessing
        # queues, and ``get_nowait()`` may raise ``Empty`` regardless.
        while True:
            try:
                command = pending.get_nowait()
            except Empty:
                break
            if command in self.commands:
                self.commands[command]()
4✔
98

99

100
class ChannelsLiveServerTestCase(ChannelsLiveServerTestCaseBase):
    """
    Live-server test case whose server process runs the application built by
    ``make_application`` and accepts commands through a multiprocessing queue.
    """

    # Command names the server process understands, mapped to the callables
    # that ServerCommandMiddleware runs for them.
    commands = {"clear_contenttype_cache": clear_contenttype_cache}

    @classmethod
    def setUpClass(cls):
        """
        Create the command queue, start the server process and wait for it.

        Raises:
            RuntimeError: If the server process dies before it signals that
                it is ready.
        """
        global _server_command_queue

        # The in-memory database guard below is left disabled.
        # for connection in connections.all():
        #     if cls._is_in_memory_db(connection):
        #         raise ImproperlyConfigured(
        #             "ChannelLiveServerTestCase can not be used with in memory databases"
        #         )

        # Start the MRO lookup *after* ChannelsLiveServerTestCaseBase, so the
        # base class's own setUpClass is skipped and replaced by this one.
        super(ChannelsLiveServerTestCaseBase, cls).setUpClass()

        settings_override = modify_settings(ALLOWED_HOSTS={"append": cls.host})
        cls._live_server_modified_settings = settings_override
        settings_override.enable()

        # Create a command queue for communication with the server process.
        # It is published both as the module global (read by the middleware)
        # and as a class attribute (used by run_server_command).
        command_queue = multiprocessing.Queue()
        _server_command_queue = command_queue
        cls._server_command_queue = command_queue

        wrapper = cls.static_wrapper if cls.serve_static else None
        get_application = partial(
            make_application,
            static_wrapper=wrapper,
            commands=cls.commands,
        )
        server = cls.ProtocolServerProcess(
            cls.host,
            get_application,
            setup=set_database_connection,
        )
        cls._server_process = server
        server.start()

        # Poll the ready event once per second; give up as soon as the
        # process is found dead instead of waiting forever.
        while not server.ready.wait(timeout=1):
            if not server.is_alive():
                raise RuntimeError("Server stopped") from None

        cls._port = server.port.value

    def setUp(self):
        """Run the parent setUp, then queue a ContentType cache reset."""
        super().setUp()
        self.run_server_command("clear_contenttype_cache")

    def run_server_command(self, command):
        """
        Add command to server command queue.

        Does nothing when the class has no ``_server_command_queue``.
        """
        if not hasattr(self.__class__, "_server_command_queue"):
            return
        self._server_command_queue.put(command)
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc