• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 11877549041

17 Nov 2024 09:03AM UTC coverage: 91.937% (-0.3%) from 92.228%
11877549041

Pull #820

github

web-flow
Merge 1051fcd91 into e3acd0223
Pull Request #820: Use ThreadSelectorEventLoop on Windows with ProactorEventLoop

142 of 171 new or added lines in 2 files covered. (83.04%)

3 existing lines in 1 file now uncovered.

5051 of 5494 relevant lines covered (91.94%)

8.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.05
/src/anyio/_backends/_selector_thread.py
1
"""Ensure asyncio selector methods (add_reader, etc.) are available.
2
Running select in a thread and defining these methods on the running event loop.
3
Originally in tornado.platform.asyncio.
4
Redistributed under license Apache-2.0
5
"""
6

7
from __future__ import annotations
2✔
8

9
import asyncio
2✔
10
import errno
2✔
11
import functools
2✔
12
import select
2✔
13
import socket
2✔
14
import threading
2✔
15
import typing
2✔
16
from typing import (
2✔
17
    Any,
18
    Callable,
19
    Union,
20
)
21
from weakref import WeakKeyDictionary
2✔
22

23
from ._asyncio import find_root_task
2✔
24

25
if typing.TYPE_CHECKING:
2✔
NEW
26
    from typing_extensions import Protocol
×
27

NEW
28
    class _HasFileno(Protocol):
×
NEW
29
        def fileno(self) -> int:
×
NEW
30
            pass
×
31

NEW
32
    _FileDescriptorLike = Union[int, _HasFileno]
×
33

34

35
# registry of asyncio loop : selector thread
36
_selectors: WeakKeyDictionary = WeakKeyDictionary()
2✔
37

38
# Collection of selector thread event loops to shut down on exit.
39
_selector_loops: set[SelectorThread] = set()
2✔
40

41

42
def _at_loop_close_callback(future: asyncio.Future) -> None:
2✔
43
    for loop in _selector_loops:
2✔
44
        with loop._select_cond:
2✔
45
            loop._closing_selector = True
2✔
46
            loop._select_cond.notify()
2✔
47
        try:
2✔
48
            loop._waker_w.send(b"a")
2✔
NEW
49
        except BlockingIOError:
×
NEW
50
            pass
×
51
        # If we don't join our (daemon) thread here, we may get a deadlock
52
        # during interpreter shutdown. I don't really understand why. This
53
        # deadlock happens every time in CI (both travis and appveyor) but
54
        # I've never been able to reproduce locally.
55
        assert loop._thread is not None
2✔
56
        loop._thread.join()
2✔
57
    _selector_loops.clear()
2✔
58

59

60
# SelectorThread from tornado 6.4.0
61
class SelectorThread:
2✔
62
    """Define ``add_reader`` methods to be called in a background select thread.
63

64
    Instances of this class start a second thread to run a selector.
65
    This thread is completely hidden from the user;
66
    all callbacks are run on the wrapped event loop's thread.
67

68
    Typically used via ``AddThreadSelectorEventLoop``,
69
    but can be attached to a running asyncio loop.
70
    """
71

72
    _closed = False
2✔
73

74
    def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
2✔
75
        self._real_loop = real_loop
2✔
76

77
        self._select_cond = threading.Condition()
2✔
78
        self._select_args: (
2✔
79
            tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None
80
        ) = None
81
        self._closing_selector = False
2✔
82
        self._thread: threading.Thread | None = None
2✔
83

84
        self._readers: dict[_FileDescriptorLike, Callable] = {}
2✔
85
        self._writers: dict[_FileDescriptorLike, Callable] = {}
2✔
86

87
        # Writing to _waker_w will wake up the selector thread, which
88
        # watches for _waker_r to be readable.
89
        self._waker_r, self._waker_w = socket.socketpair()
2✔
90
        self._waker_r.setblocking(False)
2✔
91
        self._waker_w.setblocking(False)
2✔
92
        _selector_loops.add(self)
2✔
93
        self.add_reader(self._waker_r, self._consume_waker)
2✔
94
        self._thread_manager()
2✔
95

96
    def close(self) -> None:
2✔
97
        if self._closed:
2✔
NEW
98
            return
×
99
        with self._select_cond:
2✔
100
            self._closing_selector = True
2✔
101
            self._select_cond.notify()
2✔
102
        self._wake_selector()
2✔
103
        if self._thread is not None:
2✔
104
            self._thread.join()
2✔
105
        _selector_loops.discard(self)
2✔
106
        self.remove_reader(self._waker_r)
2✔
107
        self._waker_r.close()
2✔
108
        self._waker_w.close()
2✔
109
        self._closed = True
2✔
110

111
    def _thread_manager(self) -> None:
2✔
112
        # Create a thread to run the select system call. We manage this thread
113
        # manually so we can trigger a clean shutdown at loop teardown. Note
114
        # that due to the order of operations at shutdown, only daemon threads
115
        # can be shut down in this way (non-daemon threads would require the
116
        # introduction of a new hook: https://bugs.python.org/issue41962)
117
        self._thread = threading.Thread(
2✔
118
            name="AnyIO selector",
119
            daemon=True,
120
            target=self._run_select,
121
        )
122
        self._thread.start()
2✔
123
        self._start_select()
2✔
124

125
    def _wake_selector(self) -> None:
2✔
126
        if self._closed:
2✔
NEW
127
            return
×
128
        try:
2✔
129
            self._waker_w.send(b"a")
2✔
NEW
130
        except BlockingIOError:
×
NEW
131
            pass
×
132

133
    def _consume_waker(self) -> None:
2✔
134
        try:
2✔
135
            self._waker_r.recv(1024)
2✔
NEW
136
        except BlockingIOError:
×
NEW
137
            pass
×
138

139
    def _start_select(self) -> None:
2✔
140
        # Capture reader and writer sets here in the event loop
141
        # thread to avoid any problems with concurrent
142
        # modification while the select loop uses them.
143
        with self._select_cond:
2✔
144
            assert self._select_args is None
2✔
145
            self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
2✔
146
            self._select_cond.notify()
2✔
147

148
    def _run_select(self) -> None:
2✔
149
        while True:
1✔
150
            with self._select_cond:
2✔
151
                while self._select_args is None and not self._closing_selector:
2✔
152
                    self._select_cond.wait()
2✔
153
                if self._closing_selector:
2✔
154
                    return
2✔
155
                assert self._select_args is not None
2✔
156
                to_read, to_write = self._select_args
2✔
157
                self._select_args = None
2✔
158

159
            # We use the simpler interface of the select module instead of
160
            # the more stateful interface in the selectors module because
161
            # this class is only intended for use on windows, where
162
            # select.select is the only option. The selector interface
163
            # does not have well-documented thread-safety semantics that
164
            # we can rely on so ensuring proper synchronization would be
165
            # tricky.
166
            try:
2✔
167
                # On windows, selecting on a socket for write will not
168
                # return the socket when there is an error (but selecting
169
                # for reads works). Also select for errors when selecting
170
                # for writes, and merge the results.
171
                #
172
                # This pattern is also used in
173
                # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
174
                rs, ws, xs = select.select(to_read, to_write, to_write)
2✔
175
                ws = ws + xs
2✔
NEW
176
            except OSError as e:
×
177
                # After remove_reader or remove_writer is called, the file
178
                # descriptor may subsequently be closed on the event loop
179
                # thread. It's possible that this select thread hasn't
180
                # gotten into the select system call by the time that
181
                # happens in which case (at least on macOS), select may
182
                # raise a "bad file descriptor" error. If we get that
183
                # error, check and see if we're also being woken up by
184
                # polling the waker alone. If we are, just return to the
185
                # event loop and we'll get the updated set of file
186
                # descriptors on the next iteration. Otherwise, raise the
187
                # original error.
NEW
188
                if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
×
NEW
189
                    rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
×
NEW
190
                    if rs:
×
NEW
191
                        ws = []
×
192
                    else:
NEW
193
                        raise
×
194
                else:
NEW
195
                    raise
×
196

197
            try:
2✔
198
                self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
2✔
NEW
199
            except RuntimeError:
×
200
                # "Event loop is closed". Swallow the exception for
201
                # consistency with PollIOLoop (and logical consistency
202
                # with the fact that we can't guarantee that an
203
                # add_callback that completes without error will
204
                # eventually execute).
NEW
205
                pass
×
NEW
206
            except AttributeError:
×
207
                # ProactorEventLoop may raise this instead of RuntimeError
208
                # if call_soon_threadsafe races with a call to close().
209
                # Swallow it too for consistency.
NEW
210
                pass
×
211

212
    def _handle_select(
2✔
213
        self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike]
214
    ) -> None:
215
        for r in rs:
2✔
216
            self._handle_event(r, self._readers)
2✔
217
        for w in ws:
2✔
218
            self._handle_event(w, self._writers)
2✔
219
        self._start_select()
2✔
220

221
    def _handle_event(
2✔
222
        self,
223
        fd: _FileDescriptorLike,
224
        cb_map: dict[_FileDescriptorLike, Callable],
225
    ) -> None:
226
        try:
2✔
227
            callback = cb_map[fd]
2✔
228
        except KeyError:
2✔
229
            return
2✔
230
        callback()
2✔
231

232
    def add_reader(
2✔
233
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
234
    ) -> None:
235
        self._readers[fd] = functools.partial(callback, *args)
2✔
236
        self._wake_selector()
2✔
237

238
    def add_writer(
2✔
239
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
240
    ) -> None:
241
        self._writers[fd] = functools.partial(callback, *args)
2✔
242
        self._wake_selector()
2✔
243

244
    def remove_reader(self, fd: _FileDescriptorLike) -> bool:
2✔
245
        try:
2✔
246
            del self._readers[fd]
2✔
NEW
247
        except KeyError:
×
NEW
248
            return False
×
249
        self._wake_selector()
2✔
250
        return True
2✔
251

252
    def remove_writer(self, fd: _FileDescriptorLike) -> bool:
2✔
253
        try:
2✔
254
            del self._writers[fd]
2✔
NEW
255
        except KeyError:
×
NEW
256
            return False
×
257
        self._wake_selector()
2✔
258
        return True
2✔
259

260

261
def _get_selector_windows(
2✔
262
    asyncio_loop: asyncio.AbstractEventLoop,
263
) -> SelectorThread:
264
    """Get selector-compatible loop.
265

266
    Sets ``add_reader`` family of methods on the asyncio loop.
267

268
    Workaround Windows proactor removal of *reader methods.
269
    """
270

271
    if asyncio_loop in _selectors:
2✔
NEW
272
        return _selectors[asyncio_loop]
×
273

274
    find_root_task().add_done_callback(_at_loop_close_callback)
2✔
275
    selector_thread = _selectors[asyncio_loop] = SelectorThread(asyncio_loop)
2✔
276

277
    # patch loop.close to also close the selector thread
278
    loop_close = asyncio_loop.close
2✔
279

280
    def _close_selector_and_loop() -> None:
2✔
281
        # restore original before calling selector.close,
282
        # which in turn calls eventloop.close!
283
        asyncio_loop.close = loop_close  # type: ignore[method-assign]
2✔
284
        _selectors.pop(asyncio_loop, None)
2✔
285
        selector_thread.close()
2✔
286
        asyncio_loop.close()
2✔
287

288
    asyncio_loop.close = _close_selector_and_loop  # type: ignore[method-assign]
2✔
289

290
    return selector_thread
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc