• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 11757328248

09 Nov 2024 03:18PM UTC coverage: 91.232%. First build
11757328248

Pull #820

github

web-flow
Merge 6719d6957 into 4d3dd2697
Pull Request #820: Use ThreadSelectorEventLoop on Windows with ProactorEventLoop

135 of 182 new or added lines in 2 files covered. (74.18%)

5005 of 5486 relevant lines covered (91.23%)

8.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.46
/src/anyio/_backends/_selector_thread.py
1
"""Ensure asyncio selector methods (add_reader, etc.) are available.
2
This module runs select in a thread and defines these methods on the running event loop.
3
Originally in tornado.platform.asyncio.
4
Redistributed under license Apache-2.0
5
"""
6

7
from __future__ import annotations
2✔
8

9
import asyncio
2✔
10
import atexit
2✔
11
import errno
2✔
12
import functools
2✔
13
import select
2✔
14
import socket
2✔
15
import threading
2✔
16
import typing
2✔
17
from typing import (
2✔
18
    Any,
19
    Callable,
20
    Union,
21
)
22

23
if typing.TYPE_CHECKING:
    # Typing-only definitions: nothing in this branch executes at runtime,
    # so typing_extensions is only needed by the type checker.
    from typing_extensions import Protocol

    class _HasFileno(Protocol):
        """Structural type for any object exposing a ``fileno()`` method."""

        def fileno(self) -> int: ...

    # Either a raw integer descriptor or an object that can report one.
    _FileDescriptorLike = Union[int, _HasFileno]
×
31

32

33
# Collection of selector thread event loops to shut down on exit.
34
_selector_loops: set[SelectorThread] = set()
2✔
35

36

37
def _atexit_callback() -> None:
    """Stop every registered selector thread at interpreter exit.

    For each entry in ``_selector_loops`` this sets the closing flag, notifies
    the condition the selector thread waits on, writes one byte to the waker
    socket to interrupt a pending ``select`` call, and joins the thread if one
    was started. The registry is emptied afterwards.
    """
    for loop in _selector_loops:
        with loop._select_cond:
            loop._closing_selector = True
            loop._select_cond.notify()
        try:
            loop._waker_w.send(b"a")
        except BlockingIOError:
            # The waker socket is non-blocking; a send that would block is
            # deliberately ignored here (best-effort wake-up).
            pass
        # A SelectorThread registers itself on construction, but its thread is
        # only created once the event loop actually runs. An instance whose
        # loop never started therefore still has ``_thread is None`` and there
        # is nothing to join.
        if loop._thread is not None:
            # If we don't join our (daemon) thread here, we may get a deadlock
            # during interpreter shutdown. I don't really understand why. This
            # deadlock happens every time in CI (both travis and appveyor) but
            # I've never been able to reproduce locally.
            loop._thread.join()
    _selector_loops.clear()
×
53

54

55
atexit.register(_atexit_callback)
2✔
56

57

58
# SelectorThread from tornado 6.4.0
59

60

61
class SelectorThread:
    """Define ``add_reader`` methods to be called in a background select thread.

    Instances of this class start a second thread to run a selector.
    This thread is completely hidden from the user;
    all callbacks are run on the wrapped event loop's thread.

    Typically used via ``AddThreadSelectorEventLoop``,
    but can be attached to a running asyncio loop.
    """

    # Class-level default; close() sets the instance attribute to True once
    # shutdown has finished.
    _closed = False

    def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
        """Set up the selector state and schedule the thread start.

        :param real_loop: the event loop on which the registered callbacks
            are scheduled (via ``call_soon_threadsafe``)
        """
        self._real_loop = real_loop

        # Guards _select_args and _closing_selector, and is used to hand a new
        # set of descriptors to the selector thread.
        self._select_cond = threading.Condition()
        self._select_args: (
            tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None
        ) = None
        self._closing_selector = False
        self._thread: threading.Thread | None = None
        self._thread_manager_handle = self._thread_manager()

        async def thread_manager_anext() -> None:
            # the anext builtin wasn't added until 3.10. We just need to iterate
            # this generator one step.
            await self._thread_manager_handle.__anext__()

        # When the loop starts, start the thread. Not too soon because we can't
        # clean up if we get to this point but the event loop is closed without
        # starting.
        self._real_loop.call_soon(
            lambda: self._real_loop.create_task(thread_manager_anext())
        )

        self._readers: dict[_FileDescriptorLike, Callable] = {}
        self._writers: dict[_FileDescriptorLike, Callable] = {}

        # Writing to _waker_w will wake up the selector thread, which
        # watches for _waker_r to be readable.
        self._waker_r, self._waker_w = socket.socketpair()
        self._waker_r.setblocking(False)
        self._waker_w.setblocking(False)
        _selector_loops.add(self)
        self.add_reader(self._waker_r, self._consume_waker)

    def close(self) -> None:
        """Stop the selector thread and release the waker sockets.

        Calling this more than once is a no-op after the first call.
        """
        if self._closed:
            return
        with self._select_cond:
            self._closing_selector = True
            self._select_cond.notify()
        # Interrupt a select() call that may currently be blocking the thread.
        self._wake_selector()
        if self._thread is not None:
            self._thread.join()
        _selector_loops.discard(self)
        self.remove_reader(self._waker_r)
        self._waker_r.close()
        self._waker_w.close()
        self._closed = True

    async def _thread_manager(self) -> typing.AsyncGenerator[None, None]:
        """Start the selector thread and close it when the generator is closed."""
        # Create a thread to run the select system call. We manage this thread
        # manually so we can trigger a clean shutdown from an atexit hook. Note
        # that due to the order of operations at shutdown, only daemon threads
        # can be shut down in this way (non-daemon threads would require the
        # introduction of a new hook: https://bugs.python.org/issue41962)
        self._thread = threading.Thread(
            name="Tornado selector",
            daemon=True,
            target=self._run_select,
        )
        self._thread.start()
        self._start_select()
        try:
            # The presence of this yield statement means that this coroutine
            # is actually an asynchronous generator, which has a special
            # shutdown protocol. We wait at this yield point until the
            # event loop's shutdown_asyncgens method is called, at which point
            # we will get a GeneratorExit exception and can shut down the
            # selector thread.
            yield
        except GeneratorExit:
            self.close()
            raise

    def _wake_selector(self) -> None:
        """Write one byte to the waker socket unless already closed."""
        if self._closed:
            return
        try:
            self._waker_w.send(b"a")
        except BlockingIOError:
            # Non-blocking send that would block; the wake-up is best-effort.
            pass

    def _consume_waker(self) -> None:
        """Read up to 1024 pending bytes from the waker socket."""
        try:
            self._waker_r.recv(1024)
        except BlockingIOError:
            pass

    def _start_select(self) -> None:
        """Hand the current reader/writer descriptors to the selector thread."""
        # Capture reader and writer sets here in the event loop
        # thread to avoid any problems with concurrent
        # modification while the select loop uses them.
        with self._select_cond:
            assert self._select_args is None
            self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
            self._select_cond.notify()

    def _run_select(self) -> None:
        """Body of the selector thread.

        Repeatedly waits for a descriptor snapshot from ``_start_select``,
        runs ``select.select`` on it and schedules ``_handle_select`` on the
        real loop with the result. Returns once ``_closing_selector`` is set.
        """
        while True:
            with self._select_cond:
                while self._select_args is None and not self._closing_selector:
                    self._select_cond.wait()
                if self._closing_selector:
                    return
                assert self._select_args is not None
                to_read, to_write = self._select_args
                self._select_args = None

            # We use the simpler interface of the select module instead of
            # the more stateful interface in the selectors module because
            # this class is only intended for use on windows, where
            # select.select is the only option. The selector interface
            # does not have well-documented thread-safety semantics that
            # we can rely on so ensuring proper synchronization would be
            # tricky.
            try:
                # On windows, selecting on a socket for write will not
                # return the socket when there is an error (but selecting
                # for reads works). Also select for errors when selecting
                # for writes, and merge the results.
                #
                # This pattern is also used in
                # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
                rs, ws, xs = select.select(to_read, to_write, to_write)
                ws = ws + xs
            except OSError as e:
                # After remove_reader or remove_writer is called, the file
                # descriptor may subsequently be closed on the event loop
                # thread. It's possible that this select thread hasn't
                # gotten into the select system call by the time that
                # happens in which case (at least on macOS), select may
                # raise a "bad file descriptor" error. If we get that
                # error, check and see if we're also being woken up by
                # polling the waker alone. If we are, just return to the
                # event loop and we'll get the updated set of file
                # descriptors on the next iteration. Otherwise, raise the
                # original error.
                if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
                    # Poll the waker socket object itself rather than its
                    # fileno(): the waker is registered in _readers under the
                    # socket object, so only that key lets _handle_select
                    # dispatch _consume_waker for this wake-up.
                    rs, _, _ = select.select([self._waker_r], [], [], 0)
                    if rs:
                        ws = []
                    else:
                        raise
                else:
                    raise

            try:
                self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
            except RuntimeError:
                # "Event loop is closed". Swallow the exception for
                # consistency with PollIOLoop (and logical consistency
                # with the fact that we can't guarantee that an
                # add_callback that completes without error will
                # eventually execute).
                pass
            except AttributeError:
                # ProactorEventLoop may raise this instead of RuntimeError
                # if call_soon_threadsafe races with a call to close().
                # Swallow it too for consistency.
                pass

    def _handle_select(
        self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike]
    ) -> None:
        """Run the callbacks for ready descriptors, then start the next select.

        :param rs: descriptors reported ready for reading
        :param ws: descriptors reported ready for writing (or in error)
        """
        for r in rs:
            self._handle_event(r, self._readers)
        for w in ws:
            self._handle_event(w, self._writers)
        self._start_select()

    def _handle_event(
        self,
        fd: _FileDescriptorLike,
        cb_map: dict[_FileDescriptorLike, Callable],
    ) -> None:
        """Call the callback registered for ``fd`` in ``cb_map``, if any."""
        try:
            callback = cb_map[fd]
        except KeyError:
            # The descriptor was unregistered after the snapshot was taken.
            return
        callback()

    def add_reader(
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
    ) -> None:
        """Register ``callback(*args)`` for read readiness on ``fd``.

        Replaces any callback previously registered for the same ``fd``.
        """
        self._readers[fd] = functools.partial(callback, *args)
        self._wake_selector()

    def add_writer(
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
    ) -> None:
        """Register ``callback(*args)`` for write readiness on ``fd``.

        Replaces any callback previously registered for the same ``fd``.
        """
        self._writers[fd] = functools.partial(callback, *args)
        self._wake_selector()

    def remove_reader(self, fd: _FileDescriptorLike) -> bool:
        """Unregister the read callback for ``fd``.

        :return: ``True`` if a callback was registered, ``False`` otherwise
        """
        try:
            del self._readers[fd]
        except KeyError:
            return False
        self._wake_selector()
        return True

    def remove_writer(self, fd: _FileDescriptorLike) -> bool:
        """Unregister the write callback for ``fd``.

        :return: ``True`` if a callback was registered, ``False`` otherwise
        """
        try:
            del self._writers[fd]
        except KeyError:
            return False
        self._wake_selector()
        return True
×
282

283

284
# AddThreadSelectorEventLoop: unmodified from tornado 6.4.0
285
class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
    """Event loop proxy that supplies the ``add_reader`` family of methods.

    The reader/writer methods are served by a ``SelectorThread`` created for
    the wrapped loop; that helper runs a selector on a second thread, hidden
    from the user, while callbacks still run on the wrapped loop's thread.

    This class is used automatically by Tornado; applications should not need
    to refer to it directly.

    Any event loop may be wrapped, but doing so is only useful for loops that
    lack their own ``add_reader`` family (i.e. ``WindowsProactorEventLoop``).

    Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
    """

    # Names resolved on this object itself. Every other attribute lookup is
    # forwarded to the wrapped loop by __getattribute__.
    MY_ATTRIBUTES = {
        "_real_loop",
        "_selector",
        "add_reader",
        "add_writer",
        "remove_reader",
        "remove_writer",
        "close",
    }

    def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
        """Wrap ``real_loop`` and create its selector thread helper."""
        self._real_loop = real_loop
        self._selector = SelectorThread(real_loop)

    def __getattribute__(self, name: str) -> Any:
        """Resolve ``name`` locally if listed in ``MY_ATTRIBUTES``, else proxy it."""
        if name not in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
            return getattr(self._real_loop, name)
        return super().__getattribute__(name)

    def close(self) -> None:
        """Close the selector helper first, then the wrapped loop."""
        selector = self._selector
        selector.close()
        wrapped = self._real_loop
        wrapped.close()

    def add_reader(  # type: ignore[override]
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
    ) -> None:
        """Forward to the selector helper's ``add_reader``."""
        selector = self._selector
        return selector.add_reader(fd, callback, *args)

    def add_writer(  # type: ignore[override]
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
    ) -> None:
        """Forward to the selector helper's ``add_writer``."""
        selector = self._selector
        return selector.add_writer(fd, callback, *args)

    def remove_reader(self, fd: _FileDescriptorLike) -> bool:
        """Forward to the selector helper's ``remove_reader`` and return its result."""
        selector = self._selector
        return selector.remove_reader(fd)

    def remove_writer(self, fd: _FileDescriptorLike) -> bool:
        """Forward to the selector helper's ``remove_writer`` and return its result."""
        selector = self._selector
        return selector.remove_writer(fd)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc