• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fiduswriter / fiduswriter / 25809400794

13 May 2026 03:34PM UTC coverage: 89.092% (-0.02%) from 89.114%
25809400794

push

github

johanneswilm
Include encrypted images in get_documentlist_extra

10585 of 11881 relevant lines covered (89.09%)

5.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
fiduswriter/testing/channels_patch.py
1
import asyncio
10✔
2
from functools import partial
10✔
3
import multiprocessing
10✔
4

5
from django.test.utils import modify_settings
10✔
6
from django.contrib.contenttypes.models import ContentType
10✔
7

8
from channels.routing import get_default_application
10✔
9
from channels.testing import (
10✔
10
    ChannelsLiveServerTestCase as ChannelsLiveServerTestCaseBase,
11
)
12
from channels.testing.live import set_database_connection
10✔
13

14
# Modifications to get around https://github.com/django/channels/issues/2208 until fixed.
15

16
# Queue carrying command names from the test process to the server
# application. ChannelsLiveServerTestCase.setUpClass replaces this with a
# multiprocessing.Queue; while it is still None,
# ServerCommandMiddleware.process_server_commands returns without doing
# anything.
_server_command_queue = None
10✔
17

18

19
def clear_contenttype_cache():
    """Clear the cache held by the ``ContentType`` model manager."""
    manager = ContentType.objects
    manager.clear_cache()
21

22

23
def make_application(*, static_wrapper, commands=None):
    """
    Build the ASGI application served during live-server tests.

    Kept as a module-level function for pickle-ability.

    Args:
        static_wrapper: Callable applied to the application as the
            outermost layer, or ``None`` to skip wrapping.
        commands: Mapping of command name to zero-argument callable, handed
            to ``ServerCommandMiddleware``. Defaults to a fresh empty dict.

    Returns:
        The default application wrapped in ``ServerCommandMiddleware`` and,
        if given, in ``static_wrapper``.
    """
    # Use a None sentinel rather than a shared mutable default: the mapping
    # is stored on the middleware instance, so a module-wide ``{}`` default
    # would be shared by every application built without explicit commands.
    if commands is None:
        commands = {}

    application = get_default_application()
    # Wrap the application with our command processing middleware
    application = ServerCommandMiddleware(application, commands)
    if static_wrapper is not None:
        application = static_wrapper(application)
    return application
31

32

33
class ServerCommandMiddleware:
    """
    Middleware that processes commands from the test process.
    This is automatically added to the ASGI application in test mode.
    """

    def __init__(self, app, commands):
        """
        Store the wrapped application and the command table.

        Args:
            app: The ASGI application that requests are delegated to.
            commands: Mapping of command name to zero-argument callable; a
                name read from the server command queue runs its callable.
        """
        self.app = app
        self.commands = commands
        self._exception_handler_installed = False

    async def __call__(self, scope, receive, send):
        """
        Run pending server commands, then delegate to the wrapped app.

        Returns whatever the wrapped application returns, or ``None`` when
        the call is cancelled.
        """
        # Install a custom event loop exception handler on first request
        # to suppress CancelledError noise from asyncio/asgiref during tests.
        # This handles the "CancelledError exception in shielded future"
        # messages that occur when Selenium navigates away while the server
        # is still processing a request via sync_to_async.
        if not self._exception_handler_installed:
            self._install_exception_handler()
            self._exception_handler_installed = True

        # Process any pending server commands before handling the request
        self.process_server_commands()
        try:
            return await self.app(scope, receive, send)
        except asyncio.CancelledError:
            # Silently handle cancellation. This happens when the test
            # client (Selenium) navigates away or disconnects while the
            # server is still processing a previous request.
            return None

    def _install_exception_handler(self):
        """
        Set a custom asyncio event loop exception handler that suppresses
        CancelledError. The default handler logs these to stderr which
        produces noisy output in CI and can be mistaken for real errors.

        Does nothing when there is no running event loop.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return

        # Remember whatever handler was installed before so that everything
        # other than CancelledError is still reported the same way.
        original_handler = loop.get_exception_handler()

        def _custom_exception_handler(loop, context):
            exception = context.get("exception")
            if isinstance(exception, asyncio.CancelledError):
                return
            # For all other exceptions, use the original handler or default
            if original_handler is not None:
                original_handler(loop, context)
            else:
                loop.default_exception_handler(context)

        loop.set_exception_handler(_custom_exception_handler)

    def process_server_commands(self):
        """
        Drain the server command queue and run every known command.

        Command names without an entry in ``self.commands`` are discarded.
        Returns immediately when no queue has been set up.
        """
        # Imported locally so this block does not depend on a file-level
        # import; multiprocessing queues signal exhaustion with queue.Empty.
        from queue import Empty

        if _server_command_queue is None:
            return

        # Drain with try/except instead of checking empty() first:
        # multiprocessing.Queue.empty() is documented as unreliable, and
        # get_nowait() may raise Empty even right after empty() returned
        # False. That must not bubble up into request handling.
        while True:
            try:
                command = _server_command_queue.get_nowait()
            except Empty:
                break
            if command in self.commands:
                self.commands[command]()
98

99

100
class ChannelsLiveServerTestCase(ChannelsLiveServerTestCaseBase):
    """
    Live-server test case that can queue commands for the server application.
    """

    # Command name -> zero-argument callable; passed to make_application.
    commands = {"clear_contenttype_cache": clear_contenttype_cache}

    @classmethod
    def setUpClass(cls):
        """
        Start the server process and wait until it reports ready.

        Raises:
            RuntimeError: If the server process exits before becoming ready.
        """
        global _server_command_queue

        # The in-memory database guard is left commented out here:
        # for connection in connections.all():
        #     if cls._is_in_memory_db(connection):
        #         raise ImproperlyConfigured(
        #             "ChannelLiveServerTestCase can not be used with in memory databases"
        #         )

        # Start the lookup *after* ChannelsLiveServerTestCaseBase so that
        # class's own setUpClass is skipped.
        super(ChannelsLiveServerTestCaseBase, cls).setUpClass()

        host_settings = modify_settings(ALLOWED_HOSTS={"append": cls.host})
        cls._live_server_modified_settings = host_settings
        host_settings.enable()

        # Create a command queue for communication with the server process.
        # The module-level name is what
        # ServerCommandMiddleware.process_server_commands reads.
        # NOTE(review): presumably this relies on the server process
        # inheriting the module global (fork start method) - confirm.
        command_queue = multiprocessing.Queue()
        _server_command_queue = command_queue
        cls._server_command_queue = command_queue

        wrapper = cls.static_wrapper if cls.serve_static else None
        get_application = partial(
            make_application,
            static_wrapper=wrapper,
            commands=cls.commands,
        )
        server = cls.ProtocolServerProcess(
            cls.host,
            get_application,
            setup=set_database_connection,
        )
        cls._server_process = server
        server.start()

        # Poll once per second; give up as soon as the process has died.
        while not server.ready.wait(timeout=1):
            if not server.is_alive():
                raise RuntimeError("Server stopped") from None
        cls._port = server.port.value

    def setUp(self):
        """Queue a content type cache clear on the server for each test."""
        super().setUp()
        self.run_server_command("clear_contenttype_cache")

    def run_server_command(self, command):
        """
        Add command to server command queue.
        """
        if not hasattr(self.__class__, "_server_command_queue"):
            return
        self._server_command_queue.put(command)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc