• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 11890040966

18 Nov 2024 10:01AM UTC coverage: 92.023% (-0.3%) from 92.285%
11890040966

Pull #820

github

web-flow
Merge 872329a27 into 6224525ed
Pull Request #820: Use ThreadSelectorEventLoop on Windows with ProactorEventLoop

141 of 167 new or added lines in 2 files covered. (84.43%)

1 existing line in 1 file now uncovered.

5053 of 5491 relevant lines covered (92.02%)

8.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.67
/src/anyio/_backends/_selector_thread.py
1
"""Ensure asyncio selector methods (add_reader, etc.) are available.
2
Running select in a thread and defining these methods on the running event loop.
3
Originally in tornado.platform.asyncio.
4
Redistributed under license Apache-2.0
5
"""
6

7
from __future__ import annotations
2✔
8

9
import asyncio
2✔
10
import errno
2✔
11
import functools
2✔
12
import select
2✔
13
import socket
2✔
14
import threading
2✔
15
import typing
2✔
16
from typing import (
2✔
17
    TYPE_CHECKING,
18
    Any,
19
    Callable,
20
    Union,
21
)
22
from weakref import WeakKeyDictionary
2✔
23

24
from ._asyncio import find_root_task
2✔
25

26
if TYPE_CHECKING:
2✔
NEW
27
    from _typeshed import HasFileno
×
28

NEW
29
    _FileDescriptorLike = HasFileno | int
×
30

31

32
# Registry mapping each asyncio event loop to the SelectorThread serving it.
# Keys are held weakly, so an entry does not keep its loop alive.
_selectors: WeakKeyDictionary[asyncio.AbstractEventLoop, SelectorThread] = (
    WeakKeyDictionary()
)

# Selector threads that have been started and not yet shut down. Instances add
# themselves on construction and are removed again when they are closed.
_selector_loops: set[SelectorThread] = set()
37

38

39
def _at_loop_close_callback(future: asyncio.Future) -> None:
    """Stop the selector thread(s) serving the loop whose root task finished.

    Registered as a done-callback on the root task. Only selector threads
    attached to the finished task's own event loop are stopped; selector
    threads that serve other, still running loops are left untouched.

    :param future: the finished root task (or future) that triggered the callback
    """
    finished_loop = future.get_loop()

    # Take a snapshot first, because the set is modified inside the loop below.
    owned = [
        selector for selector in _selector_loops if selector._real_loop is finished_loop
    ]
    for selector in owned:
        # Ask the thread to exit and wake it if it is waiting on the condition.
        with selector._select_cond:
            selector._closing_selector = True
            selector._select_cond.notify()

        # Wake the thread if it is blocked inside select() instead.
        try:
            selector._waker_w.send(b"a")
        except BlockingIOError:
            # The waker buffer is already full, so a wake-up is pending anyway.
            pass

        # If we don't join our (daemon) thread here, we may get a deadlock
        # during interpreter shutdown (observed in CI, not reproduced locally).
        assert selector._thread is not None
        selector._thread.join()
        _selector_loops.discard(selector)
55

56

57
# SelectorThread from tornado 6.4.0
58
class SelectorThread:
    """Watch file descriptors with ``select()`` in a background thread.

    Each instance owns one daemon thread that blocks in ``select.select``.
    Readiness results are handed back to the wrapped asyncio loop with
    ``call_soon_threadsafe``, so registered callbacks are invoked on the
    wrapped loop rather than on the selector thread.

    Instances are normally obtained through ``_get_selector_windows``.

    Adapted from ``SelectorThread`` in tornado 6.4.0.
    """

    # Class-level default; set to True on the instance once close() completes.
    _closed = False

    def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
        """Create the waker socket pair and start the selector thread.

        :param real_loop: the event loop on which callbacks will be scheduled
        """
        self._real_loop = real_loop

        # Guards _select_args and _closing_selector, and is used to hand work
        # over to the selector thread.
        self._select_cond = threading.Condition()
        # (readers, writers) to pass to the next select() call, or None when
        # no request is pending.
        self._select_args: (
            tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None
        ) = None
        self._closing_selector = False
        self._thread: threading.Thread | None = None

        # Registered callbacks, keyed by the object passed to add_reader /
        # add_writer.
        self._readers: dict[_FileDescriptorLike, Callable] = {}
        self._writers: dict[_FileDescriptorLike, Callable] = {}

        # Writing to _waker_w will wake up the selector thread, which
        # watches for _waker_r to be readable.
        self._waker_r, self._waker_w = socket.socketpair()
        self._waker_r.setblocking(False)
        self._waker_w.setblocking(False)
        _selector_loops.add(self)
        self.add_reader(self._waker_r, self._consume_waker)
        self._thread_manager()

    def close(self) -> None:
        """Stop the selector thread and release the waker sockets.

        Calling this again after it has completed is a no-op.
        """
        if self._closed:
            return
        # Request shutdown, then wake the thread whether it is waiting on the
        # condition or blocked inside select().
        with self._select_cond:
            self._closing_selector = True
            self._select_cond.notify()
        self._wake_selector()
        if self._thread is not None:
            self._thread.join()
        _selector_loops.discard(self)
        self.remove_reader(self._waker_r)
        self._waker_r.close()
        self._waker_w.close()
        self._closed = True

    def _thread_manager(self) -> None:
        """Start the daemon thread and queue the first select request."""
        # Create a thread to run the select system call. We manage this thread
        # manually so we can trigger a clean shutdown at loop teardown. Note
        # that due to the order of operations at shutdown, only daemon threads
        # can be shut down in this way (non-daemon threads would require the
        # introduction of a new hook: https://bugs.python.org/issue41962)
        self._thread = threading.Thread(
            name="AnyIO selector",
            daemon=True,
            target=self._run_select,
        )
        self._thread.start()
        self._start_select()

    def _wake_selector(self) -> None:
        """Interrupt a blocking ``select()`` by writing one byte to the waker."""
        if self._closed:
            return
        try:
            self._waker_w.send(b"a")
        except BlockingIOError:
            # The send buffer is full, so unread wake-up bytes are already
            # queued and the selector will wake regardless.
            pass

    def _consume_waker(self) -> None:
        """Drain pending wake-up bytes from the read end of the waker."""
        try:
            self._waker_r.recv(1024)
        except BlockingIOError:
            # Nothing to read.
            pass

    def _start_select(self) -> None:
        """Publish the current reader/writer sets as the next select request."""
        # Capture reader and writer sets here in the event loop
        # thread to avoid any problems with concurrent
        # modification while the select loop uses them.
        with self._select_cond:
            assert self._select_args is None
            self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
            self._select_cond.notify()

    def _run_select(self) -> None:
        """Body of the selector thread.

        Repeatedly waits for a select request, runs ``select.select`` on it
        and schedules ``_handle_select`` on the wrapped loop with the result.
        Returns once ``_closing_selector`` is set.
        """
        while True:
            with self._select_cond:
                while self._select_args is None and not self._closing_selector:
                    self._select_cond.wait()
                if self._closing_selector:
                    return
                assert self._select_args is not None
                to_read, to_write = self._select_args
                self._select_args = None

            # We use the simpler interface of the select module instead of
            # the more stateful interface in the selectors module because
            # this class is only intended for use on windows, where
            # select.select is the only option. The selector interface
            # does not have well-documented thread-safety semantics that
            # we can rely on so ensuring proper synchronization would be
            # tricky.
            try:
                # On windows, selecting on a socket for write will not
                # return the socket when there is an error (but selecting
                # for reads works). Also select for errors when selecting
                # for writes, and merge the results.
                #
                # This pattern is also used in
                # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
                rs, ws, xs = select.select(to_read, to_write, to_write)
                ws = ws + xs
            except OSError as e:
                # After remove_reader or remove_writer is called, the file
                # descriptor may subsequently be closed on the event loop
                # thread. It's possible that this select thread hasn't
                # gotten into the select system call by the time that
                # happens in which case (at least on macOS), select may
                # raise a "bad file descriptor" error. If we get that
                # error, check and see if we're also being woken up by
                # polling the waker alone. If we are, just return to the
                # event loop and we'll get the updated set of file
                # descriptors on the next iteration. Otherwise, raise the
                # original error.
                if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
                    rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
                    if rs:
                        ws = []
                    else:
                        raise
                else:
                    raise

            try:
                self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
            except RuntimeError:
                # "Event loop is closed". Swallow the exception for
                # consistency with PollIOLoop (and logical consistency
                # with the fact that we can't guarantee that an
                # add_callback that completes without error will
                # eventually execute).
                pass
            except AttributeError:
                # ProactorEventLoop may raise this instead of RuntimeError
                # if call_soon_threadsafe races with a call to close().
                # Swallow it too for consistency.
                pass

    def _handle_select(
        self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike]
    ) -> None:
        """Dispatch callbacks for ready descriptors, then re-arm the selector.

        :param rs: descriptors reported as readable
        :param ws: descriptors reported as writable (or in an error state)
        """
        try:
            for r in rs:
                self._handle_event(r, self._readers)
            for w in ws:
                self._handle_event(w, self._writers)
        finally:
            # Always queue the next select request, even when a callback
            # raised. Otherwise the selector thread would wait on the
            # condition forever and no further readiness would be reported.
            self._start_select()

    def _handle_event(
        self,
        fd: _FileDescriptorLike,
        cb_map: dict[_FileDescriptorLike, Callable],
    ) -> None:
        """Invoke the callback registered for ``fd`` in ``cb_map``, if any.

        A missing entry is ignored: the descriptor may have been unregistered
        after the select request that reported it was issued.
        """
        try:
            callback = cb_map[fd]
        except KeyError:
            return
        callback()

    def add_reader(
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
    ) -> None:
        """Register ``callback(*args)`` to be called when ``fd`` is readable.

        Replaces any callback previously registered for reading on ``fd``.
        """
        self._readers[fd] = functools.partial(callback, *args)
        self._wake_selector()

    def add_writer(
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
    ) -> None:
        """Register ``callback(*args)`` to be called when ``fd`` is writable.

        Replaces any callback previously registered for writing on ``fd``.
        """
        self._writers[fd] = functools.partial(callback, *args)
        self._wake_selector()

    def remove_reader(self, fd: _FileDescriptorLike) -> bool:
        """Unregister the read callback for ``fd``.

        :return: ``True`` if a callback was removed, ``False`` if none was
            registered
        """
        try:
            del self._readers[fd]
        except KeyError:
            return False
        self._wake_selector()
        return True

    def remove_writer(self, fd: _FileDescriptorLike) -> bool:
        """Unregister the write callback for ``fd``.

        :return: ``True`` if a callback was removed, ``False`` if none was
            registered
        """
        try:
            del self._writers[fd]
        except KeyError:
            return False
        self._wake_selector()
        return True
256

257

258
def _get_selector_windows(
    asyncio_loop: asyncio.AbstractEventLoop,
) -> SelectorThread:
    """Return the selector thread serving ``asyncio_loop``, creating it if needed.

    Workaround for the Windows proactor event loop lacking the ``add_reader``
    family of methods: the returned ``SelectorThread`` provides them instead.

    On first use for a given loop this also:

    * registers ``_at_loop_close_callback`` as a done-callback on the root task
    * patches ``asyncio_loop.close`` so that closing the loop also closes the
      selector thread

    :param asyncio_loop: the event loop that needs selector functionality
    :return: the (possibly newly created) selector thread for that loop
    """

    if asyncio_loop in _selectors:
        return _selectors[asyncio_loop]

    find_root_task().add_done_callback(_at_loop_close_callback)
    selector_thread = _selectors[asyncio_loop] = SelectorThread(asyncio_loop)

    # patch loop.close to also close the selector thread
    loop_close = asyncio_loop.close

    def _close_selector_and_loop() -> None:
        # Restore the original method first so that the asyncio_loop.close()
        # call below reaches the real implementation instead of recursing.
        asyncio_loop.close = loop_close  # type: ignore[method-assign]
        _selectors.pop(asyncio_loop, None)
        try:
            selector_thread.close()
        finally:
            # Close the real loop even if shutting down the selector failed.
            asyncio_loop.close()

    asyncio_loop.close = _close_selector_and_loop  # type: ignore[method-assign]

    return selector_thread
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc