• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 9723180092

29 Jun 2024 09:30AM UTC coverage: 91.488% (-0.04%) from 91.523%
9723180092

Pull #752

github

web-flow
Merge 8d9a0243c into 3ecc42273
Pull Request #752: Fixed feed_data after feed_eof assertion errors on asyncio

4 of 5 new or added lines in 1 file covered. (80.0%)

1 existing line in 1 file now uncovered.

4568 of 4993 relevant lines covered (91.49%)

8.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.22
/src/anyio/_backends/_asyncio.py
1
from __future__ import annotations
10✔
2

3
import array
10✔
4
import asyncio
10✔
5
import concurrent.futures
10✔
6
import math
10✔
7
import socket
10✔
8
import sys
10✔
9
import threading
10✔
10
import weakref
10✔
11
from asyncio import (
10✔
12
    AbstractEventLoop,
13
    CancelledError,
14
    all_tasks,
15
    create_task,
16
    current_task,
17
    get_running_loop,
18
    sleep,
19
)
20
from asyncio.base_events import _run_until_complete_cb  # type: ignore[attr-defined]
10✔
21
from collections import OrderedDict, deque
10✔
22
from collections.abc import AsyncIterator, Generator, Iterable
10✔
23
from concurrent.futures import Future
10✔
24
from contextlib import suppress
10✔
25
from contextvars import Context, copy_context
10✔
26
from dataclasses import dataclass, field
10✔
27
from functools import partial, wraps
10✔
28
from inspect import (
10✔
29
    CORO_RUNNING,
30
    CORO_SUSPENDED,
31
    getcoroutinestate,
32
    iscoroutine,
33
)
34
from io import IOBase
10✔
35
from os import PathLike
10✔
36
from queue import Queue
10✔
37
from signal import Signals
10✔
38
from socket import AddressFamily, SocketKind
10✔
39
from threading import Thread
10✔
40
from types import TracebackType
10✔
41
from typing import (
10✔
42
    IO,
43
    Any,
44
    AsyncGenerator,
45
    Awaitable,
46
    Callable,
47
    Collection,
48
    ContextManager,
49
    Coroutine,
50
    Mapping,
51
    Optional,
52
    Sequence,
53
    Tuple,
54
    TypeVar,
55
    cast,
56
)
57
from weakref import WeakKeyDictionary
10✔
58

59
import sniffio
10✔
60

61
from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
10✔
62
from .._core._eventloop import claim_worker_thread, threadlocals
10✔
63
from .._core._exceptions import (
10✔
64
    BrokenResourceError,
65
    BusyResourceError,
66
    ClosedResourceError,
67
    EndOfStream,
68
    WouldBlock,
69
)
70
from .._core._sockets import convert_ipv6_sockaddr
10✔
71
from .._core._streams import create_memory_object_stream
10✔
72
from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
10✔
73
from .._core._synchronization import Event as BaseEvent
10✔
74
from .._core._synchronization import ResourceGuard
10✔
75
from .._core._tasks import CancelScope as BaseCancelScope
10✔
76
from ..abc import (
10✔
77
    AsyncBackend,
78
    IPSockAddrType,
79
    SocketListener,
80
    UDPPacketType,
81
    UNIXDatagramPacketType,
82
)
83
from ..lowlevel import RunVar
10✔
84
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
10✔
85

86
if sys.version_info >= (3, 10):
10✔
87
    from typing import ParamSpec
6✔
88
else:
89
    from typing_extensions import ParamSpec
4✔
90

91
if sys.version_info >= (3, 11):
10✔
92
    from asyncio import Runner
4✔
93
    from typing import TypeVarTuple, Unpack
4✔
94
else:
95
    import contextvars
6✔
96
    import enum
6✔
97
    import signal
6✔
98
    from asyncio import coroutines, events, exceptions, tasks
6✔
99

100
    from exceptiongroup import BaseExceptionGroup
6✔
101
    from typing_extensions import TypeVarTuple, Unpack
6✔
102

103
    class _State(enum.Enum):
6✔
104
        CREATED = "created"
6✔
105
        INITIALIZED = "initialized"
6✔
106
        CLOSED = "closed"
6✔
107

108
    class Runner:
6✔
109
        # Copied from CPython 3.11
110
        def __init__(
6✔
111
            self,
112
            *,
113
            debug: bool | None = None,
114
            loop_factory: Callable[[], AbstractEventLoop] | None = None,
115
        ):
116
            self._state = _State.CREATED
6✔
117
            self._debug = debug
6✔
118
            self._loop_factory = loop_factory
6✔
119
            self._loop: AbstractEventLoop | None = None
6✔
120
            self._context = None
6✔
121
            self._interrupt_count = 0
6✔
122
            self._set_event_loop = False
6✔
123

124
        def __enter__(self) -> Runner:
6✔
125
            self._lazy_init()
6✔
126
            return self
6✔
127

128
        def __exit__(
6✔
129
            self,
130
            exc_type: type[BaseException],
131
            exc_val: BaseException,
132
            exc_tb: TracebackType,
133
        ) -> None:
134
            self.close()
6✔
135

136
        def close(self) -> None:
6✔
137
            """Shutdown and close event loop."""
138
            if self._state is not _State.INITIALIZED:
6✔
139
                return
×
140
            try:
6✔
141
                loop = self._loop
6✔
142
                _cancel_all_tasks(loop)
6✔
143
                loop.run_until_complete(loop.shutdown_asyncgens())
6✔
144
                if hasattr(loop, "shutdown_default_executor"):
6✔
145
                    loop.run_until_complete(loop.shutdown_default_executor())
5✔
146
                else:
147
                    loop.run_until_complete(_shutdown_default_executor(loop))
3✔
148
            finally:
149
                if self._set_event_loop:
6✔
150
                    events.set_event_loop(None)
6✔
151
                loop.close()
6✔
152
                self._loop = None
6✔
153
                self._state = _State.CLOSED
6✔
154

155
        def get_loop(self) -> AbstractEventLoop:
6✔
156
            """Return embedded event loop."""
157
            self._lazy_init()
6✔
158
            return self._loop
6✔
159

160
        def run(self, coro: Coroutine[T_Retval], *, context=None) -> T_Retval:
6✔
161
            """Run a coroutine inside the embedded event loop."""
162
            if not coroutines.iscoroutine(coro):
6✔
163
                raise ValueError(f"a coroutine was expected, got {coro!r}")
×
164

165
            if events._get_running_loop() is not None:
6✔
166
                # fail fast with short traceback
167
                raise RuntimeError(
×
168
                    "Runner.run() cannot be called from a running event loop"
169
                )
170

171
            self._lazy_init()
6✔
172

173
            if context is None:
6✔
174
                context = self._context
6✔
175
            task = context.run(self._loop.create_task, coro)
6✔
176

177
            if (
6✔
178
                threading.current_thread() is threading.main_thread()
179
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
180
            ):
181
                sigint_handler = partial(self._on_sigint, main_task=task)
6✔
182
                try:
6✔
183
                    signal.signal(signal.SIGINT, sigint_handler)
6✔
184
                except ValueError:
×
185
                    # `signal.signal` may throw if `threading.main_thread` does
186
                    # not support signals (e.g. embedded interpreter with signals
187
                    # not registered - see gh-91880)
188
                    sigint_handler = None
×
189
            else:
190
                sigint_handler = None
6✔
191

192
            self._interrupt_count = 0
6✔
193
            try:
6✔
194
                return self._loop.run_until_complete(task)
6✔
195
            except exceptions.CancelledError:
6✔
196
                if self._interrupt_count > 0:
×
197
                    uncancel = getattr(task, "uncancel", None)
×
198
                    if uncancel is not None and uncancel() == 0:
×
199
                        raise KeyboardInterrupt()
×
200
                raise  # CancelledError
1✔
201
            finally:
202
                if (
6✔
203
                    sigint_handler is not None
204
                    and signal.getsignal(signal.SIGINT) is sigint_handler
205
                ):
206
                    signal.signal(signal.SIGINT, signal.default_int_handler)
6✔
207

208
        def _lazy_init(self) -> None:
6✔
209
            if self._state is _State.CLOSED:
6✔
210
                raise RuntimeError("Runner is closed")
×
211
            if self._state is _State.INITIALIZED:
6✔
212
                return
6✔
213
            if self._loop_factory is None:
6✔
214
                self._loop = events.new_event_loop()
6✔
215
                if not self._set_event_loop:
6✔
216
                    # Call set_event_loop only once to avoid calling
217
                    # attach_loop multiple times on child watchers
218
                    events.set_event_loop(self._loop)
6✔
219
                    self._set_event_loop = True
6✔
220
            else:
221
                self._loop = self._loop_factory()
4✔
222
            if self._debug is not None:
6✔
223
                self._loop.set_debug(self._debug)
6✔
224
            self._context = contextvars.copy_context()
6✔
225
            self._state = _State.INITIALIZED
6✔
226

227
        def _on_sigint(self, signum, frame, main_task: asyncio.Task) -> None:
6✔
228
            self._interrupt_count += 1
×
229
            if self._interrupt_count == 1 and not main_task.done():
×
230
                main_task.cancel()
×
231
                # wakeup loop if it is blocked by select() with long timeout
232
                self._loop.call_soon_threadsafe(lambda: None)
×
233
                return
×
234
            raise KeyboardInterrupt()
×
235

236
    def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
6✔
237
        to_cancel = tasks.all_tasks(loop)
6✔
238
        if not to_cancel:
6✔
239
            return
6✔
240

241
        for task in to_cancel:
6✔
242
            task.cancel()
6✔
243

244
        loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
6✔
245

246
        for task in to_cancel:
6✔
247
            if task.cancelled():
6✔
248
                continue
6✔
249
            if task.exception() is not None:
5✔
250
                loop.call_exception_handler(
×
251
                    {
252
                        "message": "unhandled exception during asyncio.run() shutdown",
253
                        "exception": task.exception(),
254
                        "task": task,
255
                    }
256
                )
257

258
    async def _shutdown_default_executor(loop: AbstractEventLoop) -> None:
6✔
259
        """Schedule the shutdown of the default executor."""
260

261
        def _do_shutdown(future: asyncio.futures.Future) -> None:
3✔
262
            try:
3✔
263
                loop._default_executor.shutdown(wait=True)  # type: ignore[attr-defined]
3✔
264
                loop.call_soon_threadsafe(future.set_result, None)
3✔
265
            except Exception as ex:
×
266
                loop.call_soon_threadsafe(future.set_exception, ex)
×
267

268
        loop._executor_shutdown_called = True
3✔
269
        if loop._default_executor is None:
3✔
270
            return
3✔
271
        future = loop.create_future()
3✔
272
        thread = threading.Thread(target=_do_shutdown, args=(future,))
3✔
273
        thread.start()
3✔
274
        try:
3✔
275
            await future
3✔
276
        finally:
277
            thread.join()
3✔
278

279

280
T_Retval = TypeVar("T_Retval")
10✔
281
T_contra = TypeVar("T_contra", contravariant=True)
10✔
282
PosArgsT = TypeVarTuple("PosArgsT")
10✔
283
P = ParamSpec("P")
10✔
284

285
_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")
10✔
286

287

288
def find_root_task() -> asyncio.Task:
10✔
289
    root_task = _root_task.get(None)
10✔
290
    if root_task is not None and not root_task.done():
10✔
291
        return root_task
10✔
292

293
    # Look for a task that has been started via run_until_complete()
294
    for task in all_tasks():
10✔
295
        if task._callbacks and not task.done():
10✔
296
            callbacks = [cb for cb, context in task._callbacks]
10✔
297
            for cb in callbacks:
10✔
298
                if (
10✔
299
                    cb is _run_until_complete_cb
300
                    or getattr(cb, "__module__", None) == "uvloop.loop"
301
                ):
302
                    _root_task.set(task)
10✔
303
                    return task
10✔
304

305
    # Look up the topmost task in the AnyIO task tree, if possible
306
    task = cast(asyncio.Task, current_task())
9✔
307
    state = _task_states.get(task)
9✔
308
    if state:
9✔
309
        cancel_scope = state.cancel_scope
9✔
310
        while cancel_scope and cancel_scope._parent_scope is not None:
9✔
311
            cancel_scope = cancel_scope._parent_scope
×
312

313
        if cancel_scope is not None:
9✔
314
            return cast(asyncio.Task, cancel_scope._host_task)
9✔
315

316
    return task
×
317

318

319
def get_callable_name(func: Callable) -> str:
10✔
320
    module = getattr(func, "__module__", None)
10✔
321
    qualname = getattr(func, "__qualname__", None)
10✔
322
    return ".".join([x for x in (module, qualname) if x])
10✔
323

324

325
#
326
# Event loop
327
#
328

329
_run_vars: WeakKeyDictionary[asyncio.AbstractEventLoop, Any] = WeakKeyDictionary()
10✔
330

331

332
def _task_started(task: asyncio.Task) -> bool:
10✔
333
    """Return ``True`` if the task has been started and has not finished."""
334
    try:
10✔
335
        return getcoroutinestate(task.get_coro()) in (CORO_RUNNING, CORO_SUSPENDED)
10✔
336
    except AttributeError:
×
337
        # task coro is async_genenerator_asend https://bugs.python.org/issue37771
338
        raise Exception(f"Cannot determine if task {task} has started or not") from None
×
339

340

341
#
342
# Timeouts and cancellation
343
#
344

345

346
class CancelScope(BaseCancelScope):
10✔
347
    def __new__(
10✔
348
        cls, *, deadline: float = math.inf, shield: bool = False
349
    ) -> CancelScope:
350
        return object.__new__(cls)
10✔
351

352
    def __init__(self, deadline: float = math.inf, shield: bool = False):
10✔
353
        self._deadline = deadline
10✔
354
        self._shield = shield
10✔
355
        self._parent_scope: CancelScope | None = None
10✔
356
        self._child_scopes: set[CancelScope] = set()
10✔
357
        self._cancel_called = False
10✔
358
        self._cancelled_caught = False
10✔
359
        self._active = False
10✔
360
        self._timeout_handle: asyncio.TimerHandle | None = None
10✔
361
        self._cancel_handle: asyncio.Handle | None = None
10✔
362
        self._tasks: set[asyncio.Task] = set()
10✔
363
        self._host_task: asyncio.Task | None = None
10✔
364
        self._cancel_calls: int = 0
10✔
365
        self._cancelling: int | None = None
10✔
366

367
    def __enter__(self) -> CancelScope:
10✔
368
        if self._active:
10✔
369
            raise RuntimeError(
×
370
                "Each CancelScope may only be used for a single 'with' block"
371
            )
372

373
        self._host_task = host_task = cast(asyncio.Task, current_task())
10✔
374
        self._tasks.add(host_task)
10✔
375
        try:
10✔
376
            task_state = _task_states[host_task]
10✔
377
        except KeyError:
10✔
378
            task_state = TaskState(None, self)
10✔
379
            _task_states[host_task] = task_state
10✔
380
        else:
381
            self._parent_scope = task_state.cancel_scope
10✔
382
            task_state.cancel_scope = self
10✔
383
            if self._parent_scope is not None:
10✔
384
                self._parent_scope._child_scopes.add(self)
10✔
385
                self._parent_scope._tasks.remove(host_task)
10✔
386

387
        self._timeout()
10✔
388
        self._active = True
10✔
389
        if sys.version_info >= (3, 11):
10✔
390
            self._cancelling = self._host_task.cancelling()
4✔
391

392
        # Start cancelling the host task if the scope was cancelled before entering
393
        if self._cancel_called:
10✔
394
            self._deliver_cancellation(self)
10✔
395

396
        return self
10✔
397

398
    def __exit__(
10✔
399
        self,
400
        exc_type: type[BaseException] | None,
401
        exc_val: BaseException | None,
402
        exc_tb: TracebackType | None,
403
    ) -> bool | None:
404
        if not self._active:
10✔
405
            raise RuntimeError("This cancel scope is not active")
9✔
406
        if current_task() is not self._host_task:
10✔
407
            raise RuntimeError(
9✔
408
                "Attempted to exit cancel scope in a different task than it was "
409
                "entered in"
410
            )
411

412
        assert self._host_task is not None
10✔
413
        host_task_state = _task_states.get(self._host_task)
10✔
414
        if host_task_state is None or host_task_state.cancel_scope is not self:
10✔
415
            raise RuntimeError(
9✔
416
                "Attempted to exit a cancel scope that isn't the current tasks's "
417
                "current cancel scope"
418
            )
419

420
        self._active = False
10✔
421
        if self._timeout_handle:
10✔
422
            self._timeout_handle.cancel()
10✔
423
            self._timeout_handle = None
10✔
424

425
        self._tasks.remove(self._host_task)
10✔
426
        if self._parent_scope is not None:
10✔
427
            self._parent_scope._child_scopes.remove(self)
10✔
428
            self._parent_scope._tasks.add(self._host_task)
10✔
429

430
        host_task_state.cancel_scope = self._parent_scope
10✔
431

432
        # Restart the cancellation effort in the closest directly cancelled parent
433
        # scope if this one was shielded
434
        self._restart_cancellation_in_parent()
10✔
435

436
        if self._cancel_called and exc_val is not None:
10✔
437
            for exc in iterate_exceptions(exc_val):
10✔
438
                if isinstance(exc, CancelledError):
10✔
439
                    self._cancelled_caught = self._uncancel(exc)
10✔
440
                    if self._cancelled_caught:
10✔
441
                        break
10✔
442

443
            return self._cancelled_caught
10✔
444

445
        return None
10✔
446

447
    def _uncancel(self, cancelled_exc: CancelledError) -> bool:
10✔
448
        if sys.version_info < (3, 9) or self._host_task is None:
10✔
449
            self._cancel_calls = 0
3✔
450
            return True
3✔
451

452
        # Undo all cancellations done by this scope
453
        if self._cancelling is not None:
7✔
454
            while self._cancel_calls:
4✔
455
                self._cancel_calls -= 1
4✔
456
                if self._host_task.uncancel() <= self._cancelling:
4✔
457
                    return True
4✔
458

459
        self._cancel_calls = 0
7✔
460
        return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args
7✔
461

462
    def _timeout(self) -> None:
10✔
463
        if self._deadline != math.inf:
10✔
464
            loop = get_running_loop()
10✔
465
            if loop.time() >= self._deadline:
10✔
466
                self.cancel()
10✔
467
            else:
468
                self._timeout_handle = loop.call_at(self._deadline, self._timeout)
10✔
469

470
    def _deliver_cancellation(self, origin: CancelScope) -> bool:
10✔
471
        """
472
        Deliver cancellation to directly contained tasks and nested cancel scopes.
473

474
        Schedule another run at the end if we still have tasks eligible for
475
        cancellation.
476

477
        :param origin: the cancel scope that originated the cancellation
478
        :return: ``True`` if the delivery needs to be retried on the next cycle
479

480
        """
481
        should_retry = False
10✔
482
        current = current_task()
10✔
483
        for task in self._tasks:
10✔
484
            if task._must_cancel:  # type: ignore[attr-defined]
10✔
485
                continue
9✔
486

487
            # The task is eligible for cancellation if it has started
488
            should_retry = True
10✔
489
            if task is not current and (task is self._host_task or _task_started(task)):
10✔
490
                waiter = task._fut_waiter  # type: ignore[attr-defined]
10✔
491
                if not isinstance(waiter, asyncio.Future) or not waiter.done():
10✔
492
                    origin._cancel_calls += 1
10✔
493
                    if sys.version_info >= (3, 9):
10✔
494
                        task.cancel(f"Cancelled by cancel scope {id(origin):x}")
7✔
495
                    else:
496
                        task.cancel()
3✔
497

498
        # Deliver cancellation to child scopes that aren't shielded or running their own
499
        # cancellation callbacks
500
        for scope in self._child_scopes:
10✔
501
            if not scope._shield and not scope.cancel_called:
10✔
502
                should_retry = scope._deliver_cancellation(origin) or should_retry
10✔
503

504
        # Schedule another callback if there are still tasks left
505
        if origin is self:
10✔
506
            if should_retry:
10✔
507
                self._cancel_handle = get_running_loop().call_soon(
10✔
508
                    self._deliver_cancellation, origin
509
                )
510
            else:
511
                self._cancel_handle = None
10✔
512

513
        return should_retry
10✔
514

515
    def _restart_cancellation_in_parent(self) -> None:
10✔
516
        """
517
        Restart the cancellation effort in the closest directly cancelled parent scope.
518

519
        """
520
        scope = self._parent_scope
10✔
521
        while scope is not None:
10✔
522
            if scope._cancel_called:
10✔
523
                if scope._cancel_handle is None:
10✔
524
                    scope._deliver_cancellation(scope)
10✔
525

526
                break
10✔
527

528
            # No point in looking beyond any shielded scope
529
            if scope._shield:
10✔
530
                break
9✔
531

532
            scope = scope._parent_scope
10✔
533

534
    def _parent_cancelled(self) -> bool:
10✔
535
        # Check whether any parent has been cancelled
536
        cancel_scope = self._parent_scope
10✔
537
        while cancel_scope is not None and not cancel_scope._shield:
10✔
538
            if cancel_scope._cancel_called:
10✔
539
                return True
10✔
540
            else:
541
                cancel_scope = cancel_scope._parent_scope
10✔
542

543
        return False
10✔
544

545
    def cancel(self) -> None:
10✔
546
        if not self._cancel_called:
10✔
547
            if self._timeout_handle:
10✔
548
                self._timeout_handle.cancel()
10✔
549
                self._timeout_handle = None
10✔
550

551
            self._cancel_called = True
10✔
552
            if self._host_task is not None:
10✔
553
                self._deliver_cancellation(self)
10✔
554

555
    @property
10✔
556
    def deadline(self) -> float:
10✔
557
        return self._deadline
9✔
558

559
    @deadline.setter
10✔
560
    def deadline(self, value: float) -> None:
10✔
561
        self._deadline = float(value)
9✔
562
        if self._timeout_handle is not None:
9✔
563
            self._timeout_handle.cancel()
9✔
564
            self._timeout_handle = None
9✔
565

566
        if self._active and not self._cancel_called:
9✔
567
            self._timeout()
9✔
568

569
    @property
10✔
570
    def cancel_called(self) -> bool:
10✔
571
        return self._cancel_called
10✔
572

573
    @property
10✔
574
    def cancelled_caught(self) -> bool:
10✔
575
        return self._cancelled_caught
10✔
576

577
    @property
10✔
578
    def shield(self) -> bool:
10✔
579
        return self._shield
10✔
580

581
    @shield.setter
10✔
582
    def shield(self, value: bool) -> None:
10✔
583
        if self._shield != value:
9✔
584
            self._shield = value
9✔
585
            if not value:
9✔
586
                self._restart_cancellation_in_parent()
9✔
587

588

589
#
590
# Task states
591
#
592

593

594
class TaskState:
10✔
595
    """
596
    Encapsulates auxiliary task information that cannot be added to the Task instance
597
    itself because there are no guarantees about its implementation.
598
    """
599

600
    __slots__ = "parent_id", "cancel_scope", "__weakref__"
10✔
601

602
    def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
10✔
603
        self.parent_id = parent_id
10✔
604
        self.cancel_scope = cancel_scope
10✔
605

606

607
_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary()
10✔
608

609

610
#
611
# Task groups
612
#
613

614

615
class _AsyncioTaskStatus(abc.TaskStatus):
10✔
616
    def __init__(self, future: asyncio.Future, parent_id: int):
10✔
617
        self._future = future
10✔
618
        self._parent_id = parent_id
10✔
619

620
    def started(self, value: T_contra | None = None) -> None:
10✔
621
        try:
10✔
622
            self._future.set_result(value)
10✔
623
        except asyncio.InvalidStateError:
9✔
624
            if not self._future.cancelled():
9✔
625
                raise RuntimeError(
9✔
626
                    "called 'started' twice on the same task status"
627
                ) from None
628

629
        task = cast(asyncio.Task, current_task())
10✔
630
        _task_states[task].parent_id = self._parent_id
10✔
631

632

633
def iterate_exceptions(
10✔
634
    exception: BaseException,
635
) -> Generator[BaseException, None, None]:
636
    if isinstance(exception, BaseExceptionGroup):
10✔
637
        for exc in exception.exceptions:
9✔
638
            yield from iterate_exceptions(exc)
9✔
639
    else:
640
        yield exception
10✔
641

642

643
class TaskGroup(abc.TaskGroup):
10✔
644
    def __init__(self) -> None:
10✔
645
        self.cancel_scope: CancelScope = CancelScope()
10✔
646
        self._active = False
10✔
647
        self._exceptions: list[BaseException] = []
10✔
648
        self._tasks: set[asyncio.Task] = set()
10✔
649

650
    async def __aenter__(self) -> TaskGroup:
10✔
651
        self.cancel_scope.__enter__()
10✔
652
        self._active = True
10✔
653
        return self
10✔
654

655
    async def __aexit__(
10✔
656
        self,
657
        exc_type: type[BaseException] | None,
658
        exc_val: BaseException | None,
659
        exc_tb: TracebackType | None,
660
    ) -> bool | None:
661
        ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
10✔
662
        if exc_val is not None:
10✔
663
            self.cancel_scope.cancel()
10✔
664
            if not isinstance(exc_val, CancelledError):
10✔
665
                self._exceptions.append(exc_val)
10✔
666

667
        cancelled_exc_while_waiting_tasks: CancelledError | None = None
10✔
668
        while self._tasks:
10✔
669
            try:
10✔
670
                await asyncio.wait(self._tasks)
10✔
671
            except CancelledError as exc:
10✔
672
                # This task was cancelled natively; reraise the CancelledError later
673
                # unless this task was already interrupted by another exception
674
                self.cancel_scope.cancel()
10✔
675
                if cancelled_exc_while_waiting_tasks is None:
10✔
676
                    cancelled_exc_while_waiting_tasks = exc
10✔
677

678
        self._active = False
10✔
679
        if self._exceptions:
10✔
680
            raise BaseExceptionGroup(
10✔
681
                "unhandled errors in a TaskGroup", self._exceptions
682
            )
683

684
        # Raise the CancelledError received while waiting for child tasks to exit,
685
        # unless the context manager itself was previously exited with another
686
        # exception, or if any of the  child tasks raised an exception other than
687
        # CancelledError
688
        if cancelled_exc_while_waiting_tasks:
10✔
689
            if exc_val is None or ignore_exception:
10✔
690
                raise cancelled_exc_while_waiting_tasks
10✔
691

692
        return ignore_exception
10✔
693

694
    def _spawn(
10✔
695
        self,
696
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
697
        args: tuple[Unpack[PosArgsT]],
698
        name: object,
699
        task_status_future: asyncio.Future | None = None,
700
    ) -> asyncio.Task:
701
        def task_done(_task: asyncio.Task) -> None:
10✔
702
            task_state = _task_states[_task]
10✔
703
            assert task_state.cancel_scope is not None
10✔
704
            assert _task in task_state.cancel_scope._tasks
10✔
705
            task_state.cancel_scope._tasks.remove(_task)
10✔
706
            self._tasks.remove(task)
10✔
707
            del _task_states[_task]
10✔
708

709
            try:
10✔
710
                exc = _task.exception()
10✔
711
            except CancelledError as e:
10✔
712
                while isinstance(e.__context__, CancelledError):
10✔
713
                    e = e.__context__
3✔
714

715
                exc = e
10✔
716

717
            if exc is not None:
10✔
718
                # The future can only be in the cancelled state if the host task was
719
                # cancelled, so return immediately instead of adding one more
720
                # CancelledError to the exceptions list
721
                if task_status_future is not None and task_status_future.cancelled():
10✔
722
                    return
9✔
723

724
                if task_status_future is None or task_status_future.done():
10✔
725
                    if not isinstance(exc, CancelledError):
10✔
726
                        self._exceptions.append(exc)
10✔
727

728
                    if not self.cancel_scope._parent_cancelled():
10✔
729
                        self.cancel_scope.cancel()
10✔
730
                else:
731
                    task_status_future.set_exception(exc)
9✔
732
            elif task_status_future is not None and not task_status_future.done():
10✔
733
                task_status_future.set_exception(
9✔
734
                    RuntimeError("Child exited without calling task_status.started()")
735
                )
736

737
        if not self._active:
10✔
738
            raise RuntimeError(
10✔
739
                "This task group is not active; no new tasks can be started."
740
            )
741

742
        kwargs = {}
10✔
743
        if task_status_future:
10✔
744
            parent_id = id(current_task())
10✔
745
            kwargs["task_status"] = _AsyncioTaskStatus(
10✔
746
                task_status_future, id(self.cancel_scope._host_task)
747
            )
748
        else:
749
            parent_id = id(self.cancel_scope._host_task)
10✔
750

751
        coro = func(*args, **kwargs)
10✔
752
        if not iscoroutine(coro):
10✔
753
            prefix = f"{func.__module__}." if hasattr(func, "__module__") else ""
9✔
754
            raise TypeError(
9✔
755
                f"Expected {prefix}{func.__qualname__}() to return a coroutine, but "
756
                f"the return value ({coro!r}) is not a coroutine object"
757
            )
758

759
        name = get_callable_name(func) if name is None else str(name)
10✔
760
        task = create_task(coro, name=name)
10✔
761
        task.add_done_callback(task_done)
10✔
762

763
        # Make the spawned task inherit the task group's cancel scope
764
        _task_states[task] = TaskState(
10✔
765
            parent_id=parent_id, cancel_scope=self.cancel_scope
766
        )
767
        self.cancel_scope._tasks.add(task)
10✔
768
        self._tasks.add(task)
10✔
769
        return task
10✔
770

771
    def start_soon(
10✔
772
        self,
773
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
774
        *args: Unpack[PosArgsT],
775
        name: object = None,
776
    ) -> None:
777
        self._spawn(func, args, name)
10✔
778

779
    async def start(
10✔
780
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
781
    ) -> Any:
782
        future: asyncio.Future = asyncio.Future()
10✔
783
        task = self._spawn(func, args, name, future)
10✔
784

785
        # If the task raises an exception after sending a start value without a switch
786
        # point between, the task group is cancelled and this method never proceeds to
787
        # process the completed future. That's why we have to have a shielded cancel
788
        # scope here.
789
        try:
10✔
790
            return await future
10✔
791
        except CancelledError:
9✔
792
            # Cancel the task and wait for it to exit before returning
793
            task.cancel()
9✔
794
            with CancelScope(shield=True), suppress(CancelledError):
9✔
795
                await task
9✔
796

797
            raise
9✔
798

799

800
#
801
# Threads
802
#
803

804
_Retval_Queue_Type = Tuple[Optional[T_Retval], Optional[BaseException]]
10✔
805

806

807
class WorkerThread(Thread):
10✔
808
    MAX_IDLE_TIME = 10  # seconds
10✔
809

810
    def __init__(
10✔
811
        self,
812
        root_task: asyncio.Task,
813
        workers: set[WorkerThread],
814
        idle_workers: deque[WorkerThread],
815
    ):
816
        super().__init__(name="AnyIO worker thread")
10✔
817
        self.root_task = root_task
10✔
818
        self.workers = workers
10✔
819
        self.idle_workers = idle_workers
10✔
820
        self.loop = root_task._loop
10✔
821
        self.queue: Queue[
10✔
822
            tuple[Context, Callable, tuple, asyncio.Future, CancelScope] | None
823
        ] = Queue(2)
824
        self.idle_since = AsyncIOBackend.current_time()
10✔
825
        self.stopping = False
10✔
826

827
    def _report_result(
10✔
828
        self, future: asyncio.Future, result: Any, exc: BaseException | None
829
    ) -> None:
830
        self.idle_since = AsyncIOBackend.current_time()
10✔
831
        if not self.stopping:
10✔
832
            self.idle_workers.append(self)
10✔
833

834
        if not future.cancelled():
10✔
835
            if exc is not None:
10✔
836
                if isinstance(exc, StopIteration):
10✔
837
                    new_exc = RuntimeError("coroutine raised StopIteration")
9✔
838
                    new_exc.__cause__ = exc
9✔
839
                    exc = new_exc
9✔
840

841
                future.set_exception(exc)
10✔
842
            else:
843
                future.set_result(result)
10✔
844

845
    def run(self) -> None:
10✔
846
        with claim_worker_thread(AsyncIOBackend, self.loop):
10✔
847
            while True:
6✔
848
                item = self.queue.get()
10✔
849
                if item is None:
10✔
850
                    # Shutdown command received
851
                    return
10✔
852

853
                context, func, args, future, cancel_scope = item
10✔
854
                if not future.cancelled():
10✔
855
                    result = None
10✔
856
                    exception: BaseException | None = None
10✔
857
                    threadlocals.current_cancel_scope = cancel_scope
10✔
858
                    try:
10✔
859
                        result = context.run(func, *args)
10✔
860
                    except BaseException as exc:
10✔
861
                        exception = exc
10✔
862
                    finally:
863
                        del threadlocals.current_cancel_scope
10✔
864

865
                    if not self.loop.is_closed():
10✔
866
                        self.loop.call_soon_threadsafe(
10✔
867
                            self._report_result, future, result, exception
868
                        )
869

870
                self.queue.task_done()
10✔
871

872
    def stop(self, f: asyncio.Task | None = None) -> None:
10✔
873
        self.stopping = True
10✔
874
        self.queue.put_nowait(None)
10✔
875
        self.workers.discard(self)
10✔
876
        try:
10✔
877
            self.idle_workers.remove(self)
10✔
878
        except ValueError:
9✔
879
            pass
9✔
880

881

882
_threadpool_idle_workers: RunVar[deque[WorkerThread]] = RunVar(
10✔
883
    "_threadpool_idle_workers"
884
)
885
_threadpool_workers: RunVar[set[WorkerThread]] = RunVar("_threadpool_workers")
10✔
886

887

888
class BlockingPortal(abc.BlockingPortal):
10✔
889
    def __new__(cls) -> BlockingPortal:
10✔
890
        return object.__new__(cls)
10✔
891

892
    def __init__(self) -> None:
10✔
893
        super().__init__()
10✔
894
        self._loop = get_running_loop()
10✔
895

896
    def _spawn_task_from_thread(
10✔
897
        self,
898
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
899
        args: tuple[Unpack[PosArgsT]],
900
        kwargs: dict[str, Any],
901
        name: object,
902
        future: Future[T_Retval],
903
    ) -> None:
904
        AsyncIOBackend.run_sync_from_thread(
10✔
905
            partial(self._task_group.start_soon, name=name),
906
            (self._call_func, func, args, kwargs, future),
907
            self._loop,
908
        )
909

910

911
#
912
# Subprocesses
913
#
914

915

916
@dataclass(eq=False)
10✔
917
class StreamReaderWrapper(abc.ByteReceiveStream):
10✔
918
    _stream: asyncio.StreamReader
10✔
919
    _closed: bool = field(init=False, default=False)
10✔
920

921
    async def receive(self, max_bytes: int = 65536) -> bytes:
10✔
922
        if self._closed:
9✔
NEW
923
            raise ClosedResourceError
×
924

925
        data = await self._stream.read(max_bytes)
9✔
926
        if data:
9✔
927
            return data
9✔
928
        else:
929
            raise EndOfStream
9✔
930

931
    async def aclose(self) -> None:
10✔
932
        self._closed = True
9✔
933
        await AsyncIOBackend.checkpoint()
9✔
934

935

936
@dataclass(eq=False)
10✔
937
class StreamWriterWrapper(abc.ByteSendStream):
10✔
938
    _stream: asyncio.StreamWriter
10✔
939

940
    async def send(self, item: bytes) -> None:
10✔
941
        self._stream.write(item)
9✔
942
        await self._stream.drain()
9✔
943

944
    async def aclose(self) -> None:
10✔
945
        self._stream.close()
9✔
946
        await AsyncIOBackend.checkpoint()
9✔
947

948

949
@dataclass(eq=False)
10✔
950
class Process(abc.Process):
10✔
951
    _process: asyncio.subprocess.Process
10✔
952
    _stdin: StreamWriterWrapper | None
10✔
953
    _stdout: StreamReaderWrapper | None
10✔
954
    _stderr: StreamReaderWrapper | None
10✔
955

956
    async def aclose(self) -> None:
10✔
957
        with CancelScope(shield=True):
9✔
958
            if self._stdin:
9✔
959
                await self._stdin.aclose()
9✔
960
            if self._stdout:
9✔
961
                await self._stdout.aclose()
9✔
962
            if self._stderr:
9✔
963
                await self._stderr.aclose()
9✔
964

965
        try:
9✔
966
            await self.wait()
9✔
967
        except BaseException:
9✔
968
            self.kill()
9✔
969
            with CancelScope(shield=True):
9✔
970
                await self.wait()
9✔
971

972
            raise
9✔
973

974
    async def wait(self) -> int:
10✔
975
        return await self._process.wait()
9✔
976

977
    def terminate(self) -> None:
10✔
978
        self._process.terminate()
7✔
979

980
    def kill(self) -> None:
10✔
981
        self._process.kill()
9✔
982

983
    def send_signal(self, signal: int) -> None:
10✔
984
        self._process.send_signal(signal)
×
985

986
    @property
10✔
987
    def pid(self) -> int:
10✔
988
        return self._process.pid
×
989

990
    @property
10✔
991
    def returncode(self) -> int | None:
10✔
992
        return self._process.returncode
9✔
993

994
    @property
10✔
995
    def stdin(self) -> abc.ByteSendStream | None:
10✔
996
        return self._stdin
9✔
997

998
    @property
10✔
999
    def stdout(self) -> abc.ByteReceiveStream | None:
10✔
1000
        return self._stdout
9✔
1001

1002
    @property
10✔
1003
    def stderr(self) -> abc.ByteReceiveStream | None:
10✔
1004
        return self._stderr
9✔
1005

1006

1007
def _forcibly_shutdown_process_pool_on_exit(
10✔
1008
    workers: set[Process], _task: object
1009
) -> None:
1010
    """
1011
    Forcibly shuts down worker processes belonging to this event loop."""
1012
    child_watcher: asyncio.AbstractChildWatcher | None = None
9✔
1013
    if sys.version_info < (3, 12):
9✔
1014
        try:
6✔
1015
            child_watcher = asyncio.get_event_loop_policy().get_child_watcher()
6✔
1016
        except NotImplementedError:
1✔
1017
            pass
1✔
1018

1019
    # Close as much as possible (w/o async/await) to avoid warnings
1020
    for process in workers:
9✔
1021
        if process.returncode is None:
9✔
1022
            continue
9✔
1023

1024
        process._stdin._stream._transport.close()  # type: ignore[union-attr]
×
1025
        process._stdout._stream._transport.close()  # type: ignore[union-attr]
×
1026
        process._stderr._stream._transport.close()  # type: ignore[union-attr]
×
1027
        process.kill()
×
1028
        if child_watcher:
×
1029
            child_watcher.remove_child_handler(process.pid)
×
1030

1031

1032
async def _shutdown_process_pool_on_exit(workers: set[abc.Process]) -> None:
10✔
1033
    """
1034
    Shuts down worker processes belonging to this event loop.
1035

1036
    NOTE: this only works when the event loop was started using asyncio.run() or
1037
    anyio.run().
1038

1039
    """
1040
    process: abc.Process
1041
    try:
9✔
1042
        await sleep(math.inf)
9✔
1043
    except asyncio.CancelledError:
9✔
1044
        for process in workers:
9✔
1045
            if process.returncode is None:
9✔
1046
                process.kill()
9✔
1047

1048
        for process in workers:
9✔
1049
            await process.aclose()
9✔
1050

1051

1052
#
1053
# Sockets and networking
1054
#
1055

1056

1057
class StreamProtocol(asyncio.Protocol):
10✔
1058
    read_queue: deque[bytes]
10✔
1059
    read_event: asyncio.Event
10✔
1060
    write_event: asyncio.Event
10✔
1061
    exception: Exception | None = None
10✔
1062
    is_at_eof: bool = False
10✔
1063

1064
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
10✔
1065
        self.read_queue = deque()
10✔
1066
        self.read_event = asyncio.Event()
10✔
1067
        self.write_event = asyncio.Event()
10✔
1068
        self.write_event.set()
10✔
1069
        cast(asyncio.Transport, transport).set_write_buffer_limits(0)
10✔
1070

1071
    def connection_lost(self, exc: Exception | None) -> None:
10✔
1072
        if exc:
10✔
1073
            self.exception = BrokenResourceError()
10✔
1074
            self.exception.__cause__ = exc
10✔
1075

1076
        self.read_event.set()
10✔
1077
        self.write_event.set()
10✔
1078

1079
    def data_received(self, data: bytes) -> None:
10✔
1080
        self.read_queue.append(data)
10✔
1081
        self.read_event.set()
10✔
1082

1083
    def eof_received(self) -> bool | None:
10✔
1084
        self.is_at_eof = True
10✔
1085
        self.read_event.set()
10✔
1086
        return True
10✔
1087

1088
    def pause_writing(self) -> None:
10✔
1089
        self.write_event = asyncio.Event()
10✔
1090

1091
    def resume_writing(self) -> None:
10✔
UNCOV
1092
        self.write_event.set()
×
1093

1094

1095
class DatagramProtocol(asyncio.DatagramProtocol):
10✔
1096
    read_queue: deque[tuple[bytes, IPSockAddrType]]
10✔
1097
    read_event: asyncio.Event
10✔
1098
    write_event: asyncio.Event
10✔
1099
    exception: Exception | None = None
10✔
1100

1101
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
10✔
1102
        self.read_queue = deque(maxlen=100)  # arbitrary value
9✔
1103
        self.read_event = asyncio.Event()
9✔
1104
        self.write_event = asyncio.Event()
9✔
1105
        self.write_event.set()
9✔
1106

1107
    def connection_lost(self, exc: Exception | None) -> None:
10✔
1108
        self.read_event.set()
9✔
1109
        self.write_event.set()
9✔
1110

1111
    def datagram_received(self, data: bytes, addr: IPSockAddrType) -> None:
10✔
1112
        addr = convert_ipv6_sockaddr(addr)
9✔
1113
        self.read_queue.append((data, addr))
9✔
1114
        self.read_event.set()
9✔
1115

1116
    def error_received(self, exc: Exception) -> None:
10✔
1117
        self.exception = exc
×
1118

1119
    def pause_writing(self) -> None:
10✔
1120
        self.write_event.clear()
×
1121

1122
    def resume_writing(self) -> None:
10✔
1123
        self.write_event.set()
×
1124

1125

1126
class SocketStream(abc.SocketStream):
10✔
1127
    def __init__(self, transport: asyncio.Transport, protocol: StreamProtocol):
10✔
1128
        self._transport = transport
10✔
1129
        self._protocol = protocol
10✔
1130
        self._receive_guard = ResourceGuard("reading from")
10✔
1131
        self._send_guard = ResourceGuard("writing to")
10✔
1132
        self._closed = False
10✔
1133

1134
    @property
10✔
1135
    def _raw_socket(self) -> socket.socket:
10✔
1136
        return self._transport.get_extra_info("socket")
10✔
1137

1138
    async def receive(self, max_bytes: int = 65536) -> bytes:
10✔
1139
        with self._receive_guard:
10✔
1140
            if (
10✔
1141
                not self._protocol.read_event.is_set()
1142
                and not self._transport.is_closing()
1143
                and not self._protocol.is_at_eof
1144
            ):
1145
                self._transport.resume_reading()
10✔
1146
                await self._protocol.read_event.wait()
10✔
1147
                self._transport.pause_reading()
10✔
1148
            else:
1149
                await AsyncIOBackend.checkpoint()
10✔
1150

1151
            try:
10✔
1152
                chunk = self._protocol.read_queue.popleft()
10✔
1153
            except IndexError:
10✔
1154
                if self._closed:
10✔
1155
                    raise ClosedResourceError from None
10✔
1156
                elif self._protocol.exception:
10✔
1157
                    raise self._protocol.exception from None
10✔
1158
                else:
1159
                    raise EndOfStream from None
10✔
1160

1161
            if len(chunk) > max_bytes:
10✔
1162
                # Split the oversized chunk
1163
                chunk, leftover = chunk[:max_bytes], chunk[max_bytes:]
8✔
1164
                self._protocol.read_queue.appendleft(leftover)
8✔
1165

1166
            # If the read queue is empty, clear the flag so that the next call will
1167
            # block until data is available
1168
            if not self._protocol.read_queue:
10✔
1169
                self._protocol.read_event.clear()
10✔
1170

1171
        return chunk
10✔
1172

1173
    async def send(self, item: bytes) -> None:
10✔
1174
        with self._send_guard:
10✔
1175
            await AsyncIOBackend.checkpoint()
10✔
1176

1177
            if self._closed:
10✔
1178
                raise ClosedResourceError
10✔
1179
            elif self._protocol.exception is not None:
10✔
1180
                raise self._protocol.exception
10✔
1181

1182
            try:
10✔
1183
                self._transport.write(item)
10✔
1184
            except RuntimeError as exc:
×
1185
                if self._transport.is_closing():
×
1186
                    raise BrokenResourceError from exc
×
1187
                else:
1188
                    raise
×
1189

1190
            await self._protocol.write_event.wait()
10✔
1191

1192
    async def send_eof(self) -> None:
10✔
1193
        try:
10✔
1194
            self._transport.write_eof()
10✔
1195
        except OSError:
×
1196
            pass
×
1197

1198
    async def aclose(self) -> None:
10✔
1199
        if not self._transport.is_closing():
10✔
1200
            self._closed = True
10✔
1201
            try:
10✔
1202
                self._transport.write_eof()
10✔
1203
            except OSError:
6✔
1204
                pass
6✔
1205

1206
            self._transport.close()
10✔
1207
            await sleep(0)
10✔
1208
            self._transport.abort()
10✔
1209

1210

1211
class _RawSocketMixin:
10✔
1212
    _receive_future: asyncio.Future | None = None
10✔
1213
    _send_future: asyncio.Future | None = None
10✔
1214
    _closing = False
10✔
1215

1216
    def __init__(self, raw_socket: socket.socket):
10✔
1217
        self.__raw_socket = raw_socket
7✔
1218
        self._receive_guard = ResourceGuard("reading from")
7✔
1219
        self._send_guard = ResourceGuard("writing to")
7✔
1220

1221
    @property
10✔
1222
    def _raw_socket(self) -> socket.socket:
10✔
1223
        return self.__raw_socket
7✔
1224

1225
    def _wait_until_readable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
10✔
1226
        def callback(f: object) -> None:
7✔
1227
            del self._receive_future
7✔
1228
            loop.remove_reader(self.__raw_socket)
7✔
1229

1230
        f = self._receive_future = asyncio.Future()
7✔
1231
        loop.add_reader(self.__raw_socket, f.set_result, None)
7✔
1232
        f.add_done_callback(callback)
7✔
1233
        return f
7✔
1234

1235
    def _wait_until_writable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
10✔
1236
        def callback(f: object) -> None:
7✔
1237
            del self._send_future
7✔
1238
            loop.remove_writer(self.__raw_socket)
7✔
1239

1240
        f = self._send_future = asyncio.Future()
7✔
1241
        loop.add_writer(self.__raw_socket, f.set_result, None)
7✔
1242
        f.add_done_callback(callback)
7✔
1243
        return f
7✔
1244

1245
    async def aclose(self) -> None:
10✔
1246
        if not self._closing:
7✔
1247
            self._closing = True
7✔
1248
            if self.__raw_socket.fileno() != -1:
7✔
1249
                self.__raw_socket.close()
7✔
1250

1251
            if self._receive_future:
7✔
1252
                self._receive_future.set_result(None)
7✔
1253
            if self._send_future:
7✔
1254
                self._send_future.set_result(None)
×
1255

1256

1257
class UNIXSocketStream(_RawSocketMixin, abc.UNIXSocketStream):
10✔
1258
    async def send_eof(self) -> None:
10✔
1259
        with self._send_guard:
7✔
1260
            self._raw_socket.shutdown(socket.SHUT_WR)
7✔
1261

1262
    async def receive(self, max_bytes: int = 65536) -> bytes:
10✔
1263
        loop = get_running_loop()
7✔
1264
        await AsyncIOBackend.checkpoint()
7✔
1265
        with self._receive_guard:
7✔
1266
            while True:
4✔
1267
                try:
7✔
1268
                    data = self._raw_socket.recv(max_bytes)
7✔
1269
                except BlockingIOError:
7✔
1270
                    await self._wait_until_readable(loop)
7✔
1271
                except OSError as exc:
7✔
1272
                    if self._closing:
7✔
1273
                        raise ClosedResourceError from None
7✔
1274
                    else:
1275
                        raise BrokenResourceError from exc
1✔
1276
                else:
1277
                    if not data:
7✔
1278
                        raise EndOfStream
7✔
1279

1280
                    return data
7✔
1281

1282
    async def send(self, item: bytes) -> None:
10✔
1283
        loop = get_running_loop()
7✔
1284
        await AsyncIOBackend.checkpoint()
7✔
1285
        with self._send_guard:
7✔
1286
            view = memoryview(item)
7✔
1287
            while view:
7✔
1288
                try:
7✔
1289
                    bytes_sent = self._raw_socket.send(view)
7✔
1290
                except BlockingIOError:
7✔
1291
                    await self._wait_until_writable(loop)
7✔
1292
                except OSError as exc:
7✔
1293
                    if self._closing:
7✔
1294
                        raise ClosedResourceError from None
7✔
1295
                    else:
1296
                        raise BrokenResourceError from exc
1✔
1297
                else:
1298
                    view = view[bytes_sent:]
7✔
1299

1300
    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
10✔
1301
        if not isinstance(msglen, int) or msglen < 0:
7✔
1302
            raise ValueError("msglen must be a non-negative integer")
7✔
1303
        if not isinstance(maxfds, int) or maxfds < 1:
7✔
1304
            raise ValueError("maxfds must be a positive integer")
7✔
1305

1306
        loop = get_running_loop()
7✔
1307
        fds = array.array("i")
7✔
1308
        await AsyncIOBackend.checkpoint()
7✔
1309
        with self._receive_guard:
7✔
1310
            while True:
4✔
1311
                try:
7✔
1312
                    message, ancdata, flags, addr = self._raw_socket.recvmsg(
7✔
1313
                        msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
1314
                    )
1315
                except BlockingIOError:
7✔
1316
                    await self._wait_until_readable(loop)
7✔
1317
                except OSError as exc:
×
1318
                    if self._closing:
×
1319
                        raise ClosedResourceError from None
×
1320
                    else:
1321
                        raise BrokenResourceError from exc
×
1322
                else:
1323
                    if not message and not ancdata:
7✔
1324
                        raise EndOfStream
×
1325

1326
                    break
4✔
1327

1328
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
7✔
1329
            if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
7✔
1330
                raise RuntimeError(
×
1331
                    f"Received unexpected ancillary data; message = {message!r}, "
1332
                    f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
1333
                )
1334

1335
            fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
7✔
1336

1337
        return message, list(fds)
7✔
1338

1339
    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
10✔
1340
        if not message:
7✔
1341
            raise ValueError("message must not be empty")
7✔
1342
        if not fds:
7✔
1343
            raise ValueError("fds must not be empty")
7✔
1344

1345
        loop = get_running_loop()
7✔
1346
        filenos: list[int] = []
7✔
1347
        for fd in fds:
7✔
1348
            if isinstance(fd, int):
7✔
1349
                filenos.append(fd)
×
1350
            elif isinstance(fd, IOBase):
7✔
1351
                filenos.append(fd.fileno())
7✔
1352

1353
        fdarray = array.array("i", filenos)
7✔
1354
        await AsyncIOBackend.checkpoint()
7✔
1355
        with self._send_guard:
7✔
1356
            while True:
4✔
1357
                try:
7✔
1358
                    # The ignore can be removed after mypy picks up
1359
                    # https://github.com/python/typeshed/pull/5545
1360
                    self._raw_socket.sendmsg(
7✔
1361
                        [message], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)]
1362
                    )
1363
                    break
7✔
1364
                except BlockingIOError:
×
1365
                    await self._wait_until_writable(loop)
×
1366
                except OSError as exc:
×
1367
                    if self._closing:
×
1368
                        raise ClosedResourceError from None
×
1369
                    else:
1370
                        raise BrokenResourceError from exc
×
1371

1372

1373
class TCPSocketListener(abc.SocketListener):
10✔
1374
    _accept_scope: CancelScope | None = None
10✔
1375
    _closed = False
10✔
1376

1377
    def __init__(self, raw_socket: socket.socket):
10✔
1378
        self.__raw_socket = raw_socket
10✔
1379
        self._loop = cast(asyncio.BaseEventLoop, get_running_loop())
10✔
1380
        self._accept_guard = ResourceGuard("accepting connections from")
10✔
1381

1382
    @property
10✔
1383
    def _raw_socket(self) -> socket.socket:
10✔
1384
        return self.__raw_socket
10✔
1385

1386
    async def accept(self) -> abc.SocketStream:
10✔
1387
        if self._closed:
10✔
1388
            raise ClosedResourceError
10✔
1389

1390
        with self._accept_guard:
10✔
1391
            await AsyncIOBackend.checkpoint()
10✔
1392
            with CancelScope() as self._accept_scope:
10✔
1393
                try:
10✔
1394
                    client_sock, _addr = await self._loop.sock_accept(self._raw_socket)
10✔
1395
                except asyncio.CancelledError:
10✔
1396
                    # Workaround for https://bugs.python.org/issue41317
1397
                    try:
10✔
1398
                        self._loop.remove_reader(self._raw_socket)
10✔
1399
                    except (ValueError, NotImplementedError):
2✔
1400
                        pass
2✔
1401

1402
                    if self._closed:
10✔
1403
                        raise ClosedResourceError from None
9✔
1404

1405
                    raise
10✔
1406
                finally:
1407
                    self._accept_scope = None
10✔
1408

1409
        client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
10✔
1410
        transport, protocol = await self._loop.connect_accepted_socket(
10✔
1411
            StreamProtocol, client_sock
1412
        )
1413
        return SocketStream(transport, protocol)
10✔
1414

1415
    async def aclose(self) -> None:
10✔
1416
        if self._closed:
10✔
1417
            return
10✔
1418

1419
        self._closed = True
10✔
1420
        if self._accept_scope:
10✔
1421
            # Workaround for https://bugs.python.org/issue41317
1422
            try:
10✔
1423
                self._loop.remove_reader(self._raw_socket)
10✔
1424
            except (ValueError, NotImplementedError):
2✔
1425
                pass
2✔
1426

1427
            self._accept_scope.cancel()
9✔
1428
            await sleep(0)
9✔
1429

1430
        self._raw_socket.close()
10✔
1431

1432

1433
class UNIXSocketListener(abc.SocketListener):
10✔
1434
    def __init__(self, raw_socket: socket.socket):
10✔
1435
        self.__raw_socket = raw_socket
7✔
1436
        self._loop = get_running_loop()
7✔
1437
        self._accept_guard = ResourceGuard("accepting connections from")
7✔
1438
        self._closed = False
7✔
1439

1440
    async def accept(self) -> abc.SocketStream:
10✔
1441
        await AsyncIOBackend.checkpoint()
7✔
1442
        with self._accept_guard:
7✔
1443
            while True:
4✔
1444
                try:
7✔
1445
                    client_sock, _ = self.__raw_socket.accept()
7✔
1446
                    client_sock.setblocking(False)
7✔
1447
                    return UNIXSocketStream(client_sock)
7✔
1448
                except BlockingIOError:
7✔
1449
                    f: asyncio.Future = asyncio.Future()
7✔
1450
                    self._loop.add_reader(self.__raw_socket, f.set_result, None)
7✔
1451
                    f.add_done_callback(
7✔
1452
                        lambda _: self._loop.remove_reader(self.__raw_socket)
1453
                    )
1454
                    await f
7✔
1455
                except OSError as exc:
×
1456
                    if self._closed:
×
1457
                        raise ClosedResourceError from None
×
1458
                    else:
1459
                        raise BrokenResourceError from exc
1✔
1460

1461
    async def aclose(self) -> None:
10✔
1462
        self._closed = True
7✔
1463
        self.__raw_socket.close()
7✔
1464

1465
    @property
10✔
1466
    def _raw_socket(self) -> socket.socket:
10✔
1467
        return self.__raw_socket
7✔
1468

1469

1470
class UDPSocket(abc.UDPSocket):
10✔
1471
    def __init__(
10✔
1472
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1473
    ):
1474
        self._transport = transport
9✔
1475
        self._protocol = protocol
9✔
1476
        self._receive_guard = ResourceGuard("reading from")
9✔
1477
        self._send_guard = ResourceGuard("writing to")
9✔
1478
        self._closed = False
9✔
1479

1480
    @property
10✔
1481
    def _raw_socket(self) -> socket.socket:
10✔
1482
        return self._transport.get_extra_info("socket")
9✔
1483

1484
    async def aclose(self) -> None:
10✔
1485
        if not self._transport.is_closing():
9✔
1486
            self._closed = True
9✔
1487
            self._transport.close()
9✔
1488

1489
    async def receive(self) -> tuple[bytes, IPSockAddrType]:
10✔
1490
        with self._receive_guard:
9✔
1491
            await AsyncIOBackend.checkpoint()
9✔
1492

1493
            # If the buffer is empty, ask for more data
1494
            if not self._protocol.read_queue and not self._transport.is_closing():
9✔
1495
                self._protocol.read_event.clear()
9✔
1496
                await self._protocol.read_event.wait()
9✔
1497

1498
            try:
9✔
1499
                return self._protocol.read_queue.popleft()
9✔
1500
            except IndexError:
9✔
1501
                if self._closed:
9✔
1502
                    raise ClosedResourceError from None
9✔
1503
                else:
1504
                    raise BrokenResourceError from None
1✔
1505

1506
    async def send(self, item: UDPPacketType) -> None:
10✔
1507
        with self._send_guard:
9✔
1508
            await AsyncIOBackend.checkpoint()
9✔
1509
            await self._protocol.write_event.wait()
9✔
1510
            if self._closed:
9✔
1511
                raise ClosedResourceError
9✔
1512
            elif self._transport.is_closing():
9✔
1513
                raise BrokenResourceError
×
1514
            else:
1515
                self._transport.sendto(*item)
9✔
1516

1517

1518
class ConnectedUDPSocket(abc.ConnectedUDPSocket):
10✔
1519
    def __init__(
10✔
1520
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1521
    ):
1522
        self._transport = transport
9✔
1523
        self._protocol = protocol
9✔
1524
        self._receive_guard = ResourceGuard("reading from")
9✔
1525
        self._send_guard = ResourceGuard("writing to")
9✔
1526
        self._closed = False
9✔
1527

1528
    @property
10✔
1529
    def _raw_socket(self) -> socket.socket:
10✔
1530
        return self._transport.get_extra_info("socket")
9✔
1531

1532
    async def aclose(self) -> None:
10✔
1533
        if not self._transport.is_closing():
9✔
1534
            self._closed = True
9✔
1535
            self._transport.close()
9✔
1536

1537
    async def receive(self) -> bytes:
10✔
1538
        with self._receive_guard:
9✔
1539
            await AsyncIOBackend.checkpoint()
9✔
1540

1541
            # If the buffer is empty, ask for more data
1542
            if not self._protocol.read_queue and not self._transport.is_closing():
9✔
1543
                self._protocol.read_event.clear()
9✔
1544
                await self._protocol.read_event.wait()
9✔
1545

1546
            try:
9✔
1547
                packet = self._protocol.read_queue.popleft()
9✔
1548
            except IndexError:
9✔
1549
                if self._closed:
9✔
1550
                    raise ClosedResourceError from None
9✔
1551
                else:
1552
                    raise BrokenResourceError from None
×
1553

1554
            return packet[0]
9✔
1555

1556
    async def send(self, item: bytes) -> None:
10✔
1557
        with self._send_guard:
9✔
1558
            await AsyncIOBackend.checkpoint()
9✔
1559
            await self._protocol.write_event.wait()
9✔
1560
            if self._closed:
9✔
1561
                raise ClosedResourceError
9✔
1562
            elif self._transport.is_closing():
9✔
1563
                raise BrokenResourceError
×
1564
            else:
1565
                self._transport.sendto(item)
9✔
1566

1567

1568
class UNIXDatagramSocket(_RawSocketMixin, abc.UNIXDatagramSocket):
10✔
1569
    async def receive(self) -> UNIXDatagramPacketType:
10✔
1570
        loop = get_running_loop()
7✔
1571
        await AsyncIOBackend.checkpoint()
7✔
1572
        with self._receive_guard:
7✔
1573
            while True:
4✔
1574
                try:
7✔
1575
                    data = self._raw_socket.recvfrom(65536)
7✔
1576
                except BlockingIOError:
7✔
1577
                    await self._wait_until_readable(loop)
7✔
1578
                except OSError as exc:
7✔
1579
                    if self._closing:
7✔
1580
                        raise ClosedResourceError from None
7✔
1581
                    else:
1582
                        raise BrokenResourceError from exc
1✔
1583
                else:
1584
                    return data
7✔
1585

1586
    async def send(self, item: UNIXDatagramPacketType) -> None:
10✔
1587
        loop = get_running_loop()
7✔
1588
        await AsyncIOBackend.checkpoint()
7✔
1589
        with self._send_guard:
7✔
1590
            while True:
4✔
1591
                try:
7✔
1592
                    self._raw_socket.sendto(*item)
7✔
1593
                except BlockingIOError:
7✔
1594
                    await self._wait_until_writable(loop)
×
1595
                except OSError as exc:
7✔
1596
                    if self._closing:
7✔
1597
                        raise ClosedResourceError from None
7✔
1598
                    else:
1599
                        raise BrokenResourceError from exc
1✔
1600
                else:
1601
                    return
7✔
1602

1603

1604
class ConnectedUNIXDatagramSocket(_RawSocketMixin, abc.ConnectedUNIXDatagramSocket):
10✔
1605
    async def receive(self) -> bytes:
10✔
1606
        loop = get_running_loop()
7✔
1607
        await AsyncIOBackend.checkpoint()
7✔
1608
        with self._receive_guard:
7✔
1609
            while True:
4✔
1610
                try:
7✔
1611
                    data = self._raw_socket.recv(65536)
7✔
1612
                except BlockingIOError:
7✔
1613
                    await self._wait_until_readable(loop)
7✔
1614
                except OSError as exc:
7✔
1615
                    if self._closing:
7✔
1616
                        raise ClosedResourceError from None
7✔
1617
                    else:
1618
                        raise BrokenResourceError from exc
1✔
1619
                else:
1620
                    return data
7✔
1621

1622
    async def send(self, item: bytes) -> None:
10✔
1623
        loop = get_running_loop()
7✔
1624
        await AsyncIOBackend.checkpoint()
7✔
1625
        with self._send_guard:
7✔
1626
            while True:
4✔
1627
                try:
7✔
1628
                    self._raw_socket.send(item)
7✔
1629
                except BlockingIOError:
7✔
1630
                    await self._wait_until_writable(loop)
×
1631
                except OSError as exc:
7✔
1632
                    if self._closing:
7✔
1633
                        raise ClosedResourceError from None
7✔
1634
                    else:
1635
                        raise BrokenResourceError from exc
1✔
1636
                else:
1637
                    return
7✔
1638

1639

1640
_read_events: RunVar[dict[Any, asyncio.Event]] = RunVar("read_events")
10✔
1641
_write_events: RunVar[dict[Any, asyncio.Event]] = RunVar("write_events")
10✔
1642

1643

1644
#
1645
# Synchronization
1646
#
1647

1648

1649
class Event(BaseEvent):
10✔
1650
    def __new__(cls) -> Event:
10✔
1651
        return object.__new__(cls)
10✔
1652

1653
    def __init__(self) -> None:
10✔
1654
        self._event = asyncio.Event()
10✔
1655

1656
    def set(self) -> None:
10✔
1657
        self._event.set()
10✔
1658

1659
    def is_set(self) -> bool:
10✔
1660
        return self._event.is_set()
10✔
1661

1662
    async def wait(self) -> None:
10✔
1663
        if self.is_set():
10✔
1664
            await AsyncIOBackend.checkpoint()
10✔
1665
        else:
1666
            await self._event.wait()
10✔
1667

1668
    def statistics(self) -> EventStatistics:
10✔
1669
        return EventStatistics(len(self._event._waiters))
9✔
1670

1671

1672
class CapacityLimiter(BaseCapacityLimiter):
10✔
1673
    _total_tokens: float = 0
10✔
1674

1675
    def __new__(cls, total_tokens: float) -> CapacityLimiter:
10✔
1676
        return object.__new__(cls)
10✔
1677

1678
    def __init__(self, total_tokens: float):
10✔
1679
        self._borrowers: set[Any] = set()
10✔
1680
        self._wait_queue: OrderedDict[Any, asyncio.Event] = OrderedDict()
10✔
1681
        self.total_tokens = total_tokens
10✔
1682

1683
    async def __aenter__(self) -> None:
10✔
1684
        await self.acquire()
10✔
1685

1686
    async def __aexit__(
10✔
1687
        self,
1688
        exc_type: type[BaseException] | None,
1689
        exc_val: BaseException | None,
1690
        exc_tb: TracebackType | None,
1691
    ) -> None:
1692
        self.release()
10✔
1693

1694
    @property
10✔
1695
    def total_tokens(self) -> float:
10✔
1696
        return self._total_tokens
9✔
1697

1698
    @total_tokens.setter
10✔
1699
    def total_tokens(self, value: float) -> None:
10✔
1700
        if not isinstance(value, int) and not math.isinf(value):
10✔
1701
            raise TypeError("total_tokens must be an int or math.inf")
9✔
1702
        if value < 1:
10✔
1703
            raise ValueError("total_tokens must be >= 1")
9✔
1704

1705
        waiters_to_notify = max(value - self._total_tokens, 0)
10✔
1706
        self._total_tokens = value
10✔
1707

1708
        # Notify waiting tasks that they have acquired the limiter
1709
        while self._wait_queue and waiters_to_notify:
10✔
1710
            event = self._wait_queue.popitem(last=False)[1]
9✔
1711
            event.set()
9✔
1712
            waiters_to_notify -= 1
9✔
1713

1714
    @property
10✔
1715
    def borrowed_tokens(self) -> int:
10✔
1716
        return len(self._borrowers)
9✔
1717

1718
    @property
10✔
1719
    def available_tokens(self) -> float:
10✔
1720
        return self._total_tokens - len(self._borrowers)
9✔
1721

1722
    def acquire_nowait(self) -> None:
10✔
1723
        self.acquire_on_behalf_of_nowait(current_task())
×
1724

1725
    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
10✔
1726
        if borrower in self._borrowers:
10✔
1727
            raise RuntimeError(
9✔
1728
                "this borrower is already holding one of this CapacityLimiter's "
1729
                "tokens"
1730
            )
1731

1732
        if self._wait_queue or len(self._borrowers) >= self._total_tokens:
10✔
1733
            raise WouldBlock
9✔
1734

1735
        self._borrowers.add(borrower)
10✔
1736

1737
    async def acquire(self) -> None:
10✔
1738
        return await self.acquire_on_behalf_of(current_task())
10✔
1739

1740
    async def acquire_on_behalf_of(self, borrower: object) -> None:
10✔
1741
        await AsyncIOBackend.checkpoint_if_cancelled()
10✔
1742
        try:
10✔
1743
            self.acquire_on_behalf_of_nowait(borrower)
10✔
1744
        except WouldBlock:
9✔
1745
            event = asyncio.Event()
9✔
1746
            self._wait_queue[borrower] = event
9✔
1747
            try:
9✔
1748
                await event.wait()
9✔
1749
            except BaseException:
×
1750
                self._wait_queue.pop(borrower, None)
×
1751
                raise
×
1752

1753
            self._borrowers.add(borrower)
9✔
1754
        else:
1755
            try:
10✔
1756
                await AsyncIOBackend.cancel_shielded_checkpoint()
10✔
1757
            except BaseException:
9✔
1758
                self.release()
9✔
1759
                raise
9✔
1760

1761
    def release(self) -> None:
10✔
1762
        self.release_on_behalf_of(current_task())
10✔
1763

1764
    def release_on_behalf_of(self, borrower: object) -> None:
10✔
1765
        try:
10✔
1766
            self._borrowers.remove(borrower)
10✔
1767
        except KeyError:
9✔
1768
            raise RuntimeError(
9✔
1769
                "this borrower isn't holding any of this CapacityLimiter's tokens"
1770
            ) from None
1771

1772
        # Notify the next task in line if this limiter has free capacity now
1773
        if self._wait_queue and len(self._borrowers) < self._total_tokens:
10✔
1774
            event = self._wait_queue.popitem(last=False)[1]
9✔
1775
            event.set()
9✔
1776

1777
    def statistics(self) -> CapacityLimiterStatistics:
10✔
1778
        return CapacityLimiterStatistics(
9✔
1779
            self.borrowed_tokens,
1780
            self.total_tokens,
1781
            tuple(self._borrowers),
1782
            len(self._wait_queue),
1783
        )
1784

1785

1786
_default_thread_limiter: RunVar[CapacityLimiter] = RunVar("_default_thread_limiter")
10✔
1787

1788

1789
#
1790
# Operating system signals
1791
#
1792

1793

1794
class _SignalReceiver:
10✔
1795
    def __init__(self, signals: tuple[Signals, ...]):
10✔
1796
        self._signals = signals
8✔
1797
        self._loop = get_running_loop()
8✔
1798
        self._signal_queue: deque[Signals] = deque()
8✔
1799
        self._future: asyncio.Future = asyncio.Future()
8✔
1800
        self._handled_signals: set[Signals] = set()
8✔
1801

1802
    def _deliver(self, signum: Signals) -> None:
10✔
1803
        self._signal_queue.append(signum)
8✔
1804
        if not self._future.done():
8✔
1805
            self._future.set_result(None)
8✔
1806

1807
    def __enter__(self) -> _SignalReceiver:
10✔
1808
        for sig in set(self._signals):
8✔
1809
            self._loop.add_signal_handler(sig, self._deliver, sig)
8✔
1810
            self._handled_signals.add(sig)
8✔
1811

1812
        return self
8✔
1813

1814
    def __exit__(
10✔
1815
        self,
1816
        exc_type: type[BaseException] | None,
1817
        exc_val: BaseException | None,
1818
        exc_tb: TracebackType | None,
1819
    ) -> bool | None:
1820
        for sig in self._handled_signals:
8✔
1821
            self._loop.remove_signal_handler(sig)
8✔
1822
        return None
8✔
1823

1824
    def __aiter__(self) -> _SignalReceiver:
10✔
1825
        return self
8✔
1826

1827
    async def __anext__(self) -> Signals:
10✔
1828
        await AsyncIOBackend.checkpoint()
8✔
1829
        if not self._signal_queue:
8✔
1830
            self._future = asyncio.Future()
×
1831
            await self._future
×
1832

1833
        return self._signal_queue.popleft()
8✔
1834

1835

1836
#
1837
# Testing and debugging
1838
#
1839

1840

1841
class AsyncIOTaskInfo(TaskInfo):
10✔
1842
    def __init__(self, task: asyncio.Task):
10✔
1843
        task_state = _task_states.get(task)
10✔
1844
        if task_state is None:
10✔
1845
            parent_id = None
10✔
1846
        else:
1847
            parent_id = task_state.parent_id
10✔
1848

1849
        super().__init__(id(task), parent_id, task.get_name(), task.get_coro())
10✔
1850
        self._task = weakref.ref(task)
10✔
1851

1852
    def has_pending_cancellation(self) -> bool:
10✔
1853
        if not (task := self._task()):
10✔
1854
            # If the task isn't around anymore, it won't have a pending cancellation
1855
            return False
×
1856

1857
        if sys.version_info >= (3, 11):
10✔
1858
            if task.cancelling():
4✔
1859
                return True
4✔
1860
        elif (
6✔
1861
            isinstance(task._fut_waiter, asyncio.Future)
1862
            and task._fut_waiter.cancelled()
1863
        ):
1864
            return True
6✔
1865

1866
        if task_state := _task_states.get(task):
10✔
1867
            if cancel_scope := task_state.cancel_scope:
10✔
1868
                return cancel_scope.cancel_called or cancel_scope._parent_cancelled()
10✔
1869

1870
        return False
10✔
1871

1872

1873
class TestRunner(abc.TestRunner):
10✔
1874
    _send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]
10✔
1875

1876
    def __init__(
10✔
1877
        self,
1878
        *,
1879
        debug: bool | None = None,
1880
        use_uvloop: bool = False,
1881
        loop_factory: Callable[[], AbstractEventLoop] | None = None,
1882
    ) -> None:
1883
        if use_uvloop and loop_factory is None:
10✔
1884
            import uvloop
×
1885

1886
            loop_factory = uvloop.new_event_loop
×
1887

1888
        self._runner = Runner(debug=debug, loop_factory=loop_factory)
10✔
1889
        self._exceptions: list[BaseException] = []
10✔
1890
        self._runner_task: asyncio.Task | None = None
10✔
1891

1892
    def __enter__(self) -> TestRunner:
10✔
1893
        self._runner.__enter__()
10✔
1894
        self.get_loop().set_exception_handler(self._exception_handler)
10✔
1895
        return self
10✔
1896

1897
    def __exit__(
10✔
1898
        self,
1899
        exc_type: type[BaseException] | None,
1900
        exc_val: BaseException | None,
1901
        exc_tb: TracebackType | None,
1902
    ) -> None:
1903
        self._runner.__exit__(exc_type, exc_val, exc_tb)
10✔
1904

1905
    def get_loop(self) -> AbstractEventLoop:
10✔
1906
        return self._runner.get_loop()
10✔
1907

1908
    def _exception_handler(
10✔
1909
        self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
1910
    ) -> None:
1911
        if isinstance(context.get("exception"), Exception):
10✔
1912
            self._exceptions.append(context["exception"])
10✔
1913
        else:
1914
            loop.default_exception_handler(context)
10✔
1915

1916
    def _raise_async_exceptions(self) -> None:
10✔
1917
        # Re-raise any exceptions raised in asynchronous callbacks
1918
        if self._exceptions:
10✔
1919
            exceptions, self._exceptions = self._exceptions, []
10✔
1920
            if len(exceptions) == 1:
10✔
1921
                raise exceptions[0]
10✔
1922
            elif exceptions:
×
1923
                raise BaseExceptionGroup(
×
1924
                    "Multiple exceptions occurred in asynchronous callbacks", exceptions
1925
                )
1926

1927
    async def _run_tests_and_fixtures(
10✔
1928
        self,
1929
        receive_stream: MemoryObjectReceiveStream[
1930
            tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
1931
        ],
1932
    ) -> None:
1933
        with receive_stream, self._send_stream:
10✔
1934
            async for coro, future in receive_stream:
10✔
1935
                try:
10✔
1936
                    retval = await coro
10✔
1937
                except BaseException as exc:
10✔
1938
                    if not future.cancelled():
10✔
1939
                        future.set_exception(exc)
10✔
1940
                else:
1941
                    if not future.cancelled():
10✔
1942
                        future.set_result(retval)
10✔
1943

1944
    async def _call_in_runner_task(
10✔
1945
        self,
1946
        func: Callable[P, Awaitable[T_Retval]],
1947
        *args: P.args,
1948
        **kwargs: P.kwargs,
1949
    ) -> T_Retval:
1950
        if not self._runner_task:
10✔
1951
            self._send_stream, receive_stream = create_memory_object_stream[
10✔
1952
                Tuple[Awaitable[Any], asyncio.Future]
1953
            ](1)
1954
            self._runner_task = self.get_loop().create_task(
10✔
1955
                self._run_tests_and_fixtures(receive_stream)
1956
            )
1957

1958
        coro = func(*args, **kwargs)
10✔
1959
        future: asyncio.Future[T_Retval] = self.get_loop().create_future()
10✔
1960
        self._send_stream.send_nowait((coro, future))
10✔
1961
        return await future
10✔
1962

1963
    def run_asyncgen_fixture(
10✔
1964
        self,
1965
        fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
1966
        kwargs: dict[str, Any],
1967
    ) -> Iterable[T_Retval]:
1968
        asyncgen = fixture_func(**kwargs)
10✔
1969
        fixturevalue: T_Retval = self.get_loop().run_until_complete(
10✔
1970
            self._call_in_runner_task(asyncgen.asend, None)
1971
        )
1972
        self._raise_async_exceptions()
10✔
1973

1974
        yield fixturevalue
10✔
1975

1976
        try:
10✔
1977
            self.get_loop().run_until_complete(
10✔
1978
                self._call_in_runner_task(asyncgen.asend, None)
1979
            )
1980
        except StopAsyncIteration:
10✔
1981
            self._raise_async_exceptions()
10✔
1982
        else:
1983
            self.get_loop().run_until_complete(asyncgen.aclose())
×
1984
            raise RuntimeError("Async generator fixture did not stop")
×
1985

1986
    def run_fixture(
10✔
1987
        self,
1988
        fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
1989
        kwargs: dict[str, Any],
1990
    ) -> T_Retval:
1991
        retval = self.get_loop().run_until_complete(
10✔
1992
            self._call_in_runner_task(fixture_func, **kwargs)
1993
        )
1994
        self._raise_async_exceptions()
10✔
1995
        return retval
10✔
1996

1997
    def run_test(
10✔
1998
        self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
1999
    ) -> None:
2000
        try:
10✔
2001
            self.get_loop().run_until_complete(
10✔
2002
                self._call_in_runner_task(test_func, **kwargs)
2003
            )
2004
        except Exception as exc:
10✔
2005
            self._exceptions.append(exc)
9✔
2006

2007
        self._raise_async_exceptions()
10✔
2008

2009

2010
class AsyncIOBackend(AsyncBackend):
10✔
2011
    @classmethod
10✔
2012
    def run(
10✔
2013
        cls,
2014
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2015
        args: tuple[Unpack[PosArgsT]],
2016
        kwargs: dict[str, Any],
2017
        options: dict[str, Any],
2018
    ) -> T_Retval:
2019
        @wraps(func)
10✔
2020
        async def wrapper() -> T_Retval:
10✔
2021
            task = cast(asyncio.Task, current_task())
10✔
2022
            task.set_name(get_callable_name(func))
10✔
2023
            _task_states[task] = TaskState(None, None)
10✔
2024

2025
            try:
10✔
2026
                return await func(*args)
10✔
2027
            finally:
2028
                del _task_states[task]
10✔
2029

2030
        debug = options.get("debug", None)
10✔
2031
        loop_factory = options.get("loop_factory", None)
10✔
2032
        if loop_factory is None and options.get("use_uvloop", False):
10✔
2033
            import uvloop
7✔
2034

2035
            loop_factory = uvloop.new_event_loop
7✔
2036

2037
        with Runner(debug=debug, loop_factory=loop_factory) as runner:
10✔
2038
            return runner.run(wrapper())
10✔
2039

2040
    @classmethod
10✔
2041
    def current_token(cls) -> object:
10✔
2042
        return get_running_loop()
10✔
2043

2044
    @classmethod
10✔
2045
    def current_time(cls) -> float:
10✔
2046
        return get_running_loop().time()
10✔
2047

2048
    @classmethod
10✔
2049
    def cancelled_exception_class(cls) -> type[BaseException]:
10✔
2050
        return CancelledError
10✔
2051

2052
    @classmethod
10✔
2053
    async def checkpoint(cls) -> None:
10✔
2054
        await sleep(0)
10✔
2055

2056
    @classmethod
10✔
2057
    async def checkpoint_if_cancelled(cls) -> None:
10✔
2058
        task = current_task()
10✔
2059
        if task is None:
10✔
2060
            return
×
2061

2062
        try:
10✔
2063
            cancel_scope = _task_states[task].cancel_scope
10✔
2064
        except KeyError:
10✔
2065
            return
10✔
2066

2067
        while cancel_scope:
10✔
2068
            if cancel_scope.cancel_called:
10✔
2069
                await sleep(0)
10✔
2070
            elif cancel_scope.shield:
10✔
2071
                break
9✔
2072
            else:
2073
                cancel_scope = cancel_scope._parent_scope
10✔
2074

2075
    @classmethod
10✔
2076
    async def cancel_shielded_checkpoint(cls) -> None:
10✔
2077
        with CancelScope(shield=True):
10✔
2078
            await sleep(0)
10✔
2079

2080
    @classmethod
10✔
2081
    async def sleep(cls, delay: float) -> None:
10✔
2082
        await sleep(delay)
10✔
2083

2084
    @classmethod
10✔
2085
    def create_cancel_scope(
10✔
2086
        cls, *, deadline: float = math.inf, shield: bool = False
2087
    ) -> CancelScope:
2088
        return CancelScope(deadline=deadline, shield=shield)
10✔
2089

2090
    @classmethod
10✔
2091
    def current_effective_deadline(cls) -> float:
10✔
2092
        try:
9✔
2093
            cancel_scope = _task_states[
9✔
2094
                current_task()  # type: ignore[index]
2095
            ].cancel_scope
2096
        except KeyError:
×
2097
            return math.inf
×
2098

2099
        deadline = math.inf
9✔
2100
        while cancel_scope:
9✔
2101
            deadline = min(deadline, cancel_scope.deadline)
9✔
2102
            if cancel_scope._cancel_called:
9✔
2103
                deadline = -math.inf
9✔
2104
                break
9✔
2105
            elif cancel_scope.shield:
9✔
2106
                break
9✔
2107
            else:
2108
                cancel_scope = cancel_scope._parent_scope
9✔
2109

2110
        return deadline
9✔
2111

2112
    @classmethod
10✔
2113
    def create_task_group(cls) -> abc.TaskGroup:
10✔
2114
        return TaskGroup()
10✔
2115

2116
    @classmethod
10✔
2117
    def create_event(cls) -> abc.Event:
10✔
2118
        return Event()
10✔
2119

2120
    @classmethod
10✔
2121
    def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
10✔
2122
        return CapacityLimiter(total_tokens)
9✔
2123

2124
    @classmethod
10✔
2125
    async def run_sync_in_worker_thread(
10✔
2126
        cls,
2127
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2128
        args: tuple[Unpack[PosArgsT]],
2129
        abandon_on_cancel: bool = False,
2130
        limiter: abc.CapacityLimiter | None = None,
2131
    ) -> T_Retval:
2132
        await cls.checkpoint()
10✔
2133

2134
        # If this is the first run in this event loop thread, set up the necessary
2135
        # variables
2136
        try:
10✔
2137
            idle_workers = _threadpool_idle_workers.get()
10✔
2138
            workers = _threadpool_workers.get()
10✔
2139
        except LookupError:
10✔
2140
            idle_workers = deque()
10✔
2141
            workers = set()
10✔
2142
            _threadpool_idle_workers.set(idle_workers)
10✔
2143
            _threadpool_workers.set(workers)
10✔
2144

2145
        async with limiter or cls.current_default_thread_limiter():
10✔
2146
            with CancelScope(shield=not abandon_on_cancel) as scope:
10✔
2147
                future: asyncio.Future = asyncio.Future()
10✔
2148
                root_task = find_root_task()
10✔
2149
                if not idle_workers:
10✔
2150
                    worker = WorkerThread(root_task, workers, idle_workers)
10✔
2151
                    worker.start()
10✔
2152
                    workers.add(worker)
10✔
2153
                    root_task.add_done_callback(worker.stop)
10✔
2154
                else:
2155
                    worker = idle_workers.pop()
10✔
2156

2157
                    # Prune any other workers that have been idle for MAX_IDLE_TIME
2158
                    # seconds or longer
2159
                    now = cls.current_time()
10✔
2160
                    while idle_workers:
10✔
2161
                        if (
9✔
2162
                            now - idle_workers[0].idle_since
2163
                            < WorkerThread.MAX_IDLE_TIME
2164
                        ):
2165
                            break
9✔
2166

2167
                        expired_worker = idle_workers.popleft()
×
2168
                        expired_worker.root_task.remove_done_callback(
×
2169
                            expired_worker.stop
2170
                        )
2171
                        expired_worker.stop()
×
2172

2173
                context = copy_context()
10✔
2174
                context.run(sniffio.current_async_library_cvar.set, None)
10✔
2175
                if abandon_on_cancel or scope._parent_scope is None:
10✔
2176
                    worker_scope = scope
10✔
2177
                else:
2178
                    worker_scope = scope._parent_scope
10✔
2179

2180
                worker.queue.put_nowait((context, func, args, future, worker_scope))
10✔
2181
                return await future
10✔
2182

2183
    @classmethod
10✔
2184
    def check_cancelled(cls) -> None:
10✔
2185
        scope: CancelScope | None = threadlocals.current_cancel_scope
10✔
2186
        while scope is not None:
10✔
2187
            if scope.cancel_called:
10✔
2188
                raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")
10✔
2189

2190
            if scope.shield:
10✔
2191
                return
×
2192

2193
            scope = scope._parent_scope
10✔
2194

2195
    @classmethod
10✔
2196
    def run_async_from_thread(
10✔
2197
        cls,
2198
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2199
        args: tuple[Unpack[PosArgsT]],
2200
        token: object,
2201
    ) -> T_Retval:
2202
        async def task_wrapper(scope: CancelScope) -> T_Retval:
10✔
2203
            __tracebackhide__ = True
10✔
2204
            task = cast(asyncio.Task, current_task())
10✔
2205
            _task_states[task] = TaskState(None, scope)
10✔
2206
            scope._tasks.add(task)
10✔
2207
            try:
10✔
2208
                return await func(*args)
10✔
2209
            except CancelledError as exc:
10✔
2210
                raise concurrent.futures.CancelledError(str(exc)) from None
10✔
2211
            finally:
2212
                scope._tasks.discard(task)
10✔
2213

2214
        loop = cast(AbstractEventLoop, token)
10✔
2215
        context = copy_context()
10✔
2216
        context.run(sniffio.current_async_library_cvar.set, "asyncio")
10✔
2217
        wrapper = task_wrapper(threadlocals.current_cancel_scope)
10✔
2218
        f: concurrent.futures.Future[T_Retval] = context.run(
10✔
2219
            asyncio.run_coroutine_threadsafe, wrapper, loop
2220
        )
2221
        return f.result()
10✔
2222

2223
    @classmethod
10✔
2224
    def run_sync_from_thread(
10✔
2225
        cls,
2226
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2227
        args: tuple[Unpack[PosArgsT]],
2228
        token: object,
2229
    ) -> T_Retval:
2230
        @wraps(func)
10✔
2231
        def wrapper() -> None:
10✔
2232
            try:
10✔
2233
                sniffio.current_async_library_cvar.set("asyncio")
10✔
2234
                f.set_result(func(*args))
10✔
2235
            except BaseException as exc:
10✔
2236
                f.set_exception(exc)
10✔
2237
                if not isinstance(exc, Exception):
10✔
2238
                    raise
×
2239

2240
        f: concurrent.futures.Future[T_Retval] = Future()
10✔
2241
        loop = cast(AbstractEventLoop, token)
10✔
2242
        loop.call_soon_threadsafe(wrapper)
10✔
2243
        return f.result()
10✔
2244

2245
    @classmethod
10✔
2246
    def create_blocking_portal(cls) -> abc.BlockingPortal:
10✔
2247
        return BlockingPortal()
10✔
2248

2249
    @classmethod
10✔
2250
    async def open_process(
10✔
2251
        cls,
2252
        command: str | bytes | Sequence[str | bytes],
2253
        *,
2254
        shell: bool,
2255
        stdin: int | IO[Any] | None,
2256
        stdout: int | IO[Any] | None,
2257
        stderr: int | IO[Any] | None,
2258
        cwd: str | bytes | PathLike | None = None,
2259
        env: Mapping[str, str] | None = None,
2260
        start_new_session: bool = False,
2261
    ) -> Process:
2262
        await cls.checkpoint()
9✔
2263
        if shell:
9✔
2264
            process = await asyncio.create_subprocess_shell(
9✔
2265
                cast("str | bytes", command),
2266
                stdin=stdin,
2267
                stdout=stdout,
2268
                stderr=stderr,
2269
                cwd=cwd,
2270
                env=env,
2271
                start_new_session=start_new_session,
2272
            )
2273
        else:
2274
            process = await asyncio.create_subprocess_exec(
9✔
2275
                *command,
2276
                stdin=stdin,
2277
                stdout=stdout,
2278
                stderr=stderr,
2279
                cwd=cwd,
2280
                env=env,
2281
                start_new_session=start_new_session,
2282
            )
2283

2284
        stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
9✔
2285
        stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
9✔
2286
        stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
9✔
2287
        return Process(process, stdin_stream, stdout_stream, stderr_stream)
9✔
2288

2289
    @classmethod
10✔
2290
    def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
10✔
2291
        create_task(
9✔
2292
            _shutdown_process_pool_on_exit(workers),
2293
            name="AnyIO process pool shutdown task",
2294
        )
2295
        find_root_task().add_done_callback(
9✔
2296
            partial(_forcibly_shutdown_process_pool_on_exit, workers)
2297
        )
2298

2299
    @classmethod
10✔
2300
    async def connect_tcp(
10✔
2301
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
2302
    ) -> abc.SocketStream:
2303
        transport, protocol = cast(
10✔
2304
            Tuple[asyncio.Transport, StreamProtocol],
2305
            await get_running_loop().create_connection(
2306
                StreamProtocol, host, port, local_addr=local_address
2307
            ),
2308
        )
2309
        transport.pause_reading()
10✔
2310
        return SocketStream(transport, protocol)
10✔
2311

2312
    @classmethod
10✔
2313
    async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
10✔
2314
        await cls.checkpoint()
7✔
2315
        loop = get_running_loop()
7✔
2316
        raw_socket = socket.socket(socket.AF_UNIX)
7✔
2317
        raw_socket.setblocking(False)
7✔
2318
        while True:
4✔
2319
            try:
7✔
2320
                raw_socket.connect(path)
7✔
2321
            except BlockingIOError:
7✔
2322
                f: asyncio.Future = asyncio.Future()
×
2323
                loop.add_writer(raw_socket, f.set_result, None)
×
2324
                f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
2325
                await f
×
2326
            except BaseException:
7✔
2327
                raw_socket.close()
7✔
2328
                raise
7✔
2329
            else:
2330
                return UNIXSocketStream(raw_socket)
7✔
2331

2332
    @classmethod
10✔
2333
    def create_tcp_listener(cls, sock: socket.socket) -> SocketListener:
10✔
2334
        return TCPSocketListener(sock)
10✔
2335

2336
    @classmethod
10✔
2337
    def create_unix_listener(cls, sock: socket.socket) -> SocketListener:
10✔
2338
        return UNIXSocketListener(sock)
7✔
2339

2340
    @classmethod
10✔
2341
    async def create_udp_socket(
10✔
2342
        cls,
2343
        family: AddressFamily,
2344
        local_address: IPSockAddrType | None,
2345
        remote_address: IPSockAddrType | None,
2346
        reuse_port: bool,
2347
    ) -> UDPSocket | ConnectedUDPSocket:
2348
        transport, protocol = await get_running_loop().create_datagram_endpoint(
9✔
2349
            DatagramProtocol,
2350
            local_addr=local_address,
2351
            remote_addr=remote_address,
2352
            family=family,
2353
            reuse_port=reuse_port,
2354
        )
2355
        if protocol.exception:
9✔
2356
            transport.close()
×
2357
            raise protocol.exception
×
2358

2359
        if not remote_address:
9✔
2360
            return UDPSocket(transport, protocol)
9✔
2361
        else:
2362
            return ConnectedUDPSocket(transport, protocol)
9✔
2363

2364
    @classmethod
10✔
2365
    async def create_unix_datagram_socket(  # type: ignore[override]
10✔
2366
        cls, raw_socket: socket.socket, remote_path: str | bytes | None
2367
    ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
2368
        await cls.checkpoint()
7✔
2369
        loop = get_running_loop()
7✔
2370

2371
        if remote_path:
7✔
2372
            while True:
4✔
2373
                try:
7✔
2374
                    raw_socket.connect(remote_path)
7✔
2375
                except BlockingIOError:
×
2376
                    f: asyncio.Future = asyncio.Future()
×
2377
                    loop.add_writer(raw_socket, f.set_result, None)
×
2378
                    f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
2379
                    await f
×
2380
                except BaseException:
×
2381
                    raw_socket.close()
×
2382
                    raise
×
2383
                else:
2384
                    return ConnectedUNIXDatagramSocket(raw_socket)
7✔
2385
        else:
2386
            return UNIXDatagramSocket(raw_socket)
7✔
2387

2388
    @classmethod
10✔
2389
    async def getaddrinfo(
10✔
2390
        cls,
2391
        host: bytes | str | None,
2392
        port: str | int | None,
2393
        *,
2394
        family: int | AddressFamily = 0,
2395
        type: int | SocketKind = 0,
2396
        proto: int = 0,
2397
        flags: int = 0,
2398
    ) -> list[
2399
        tuple[
2400
            AddressFamily,
2401
            SocketKind,
2402
            int,
2403
            str,
2404
            tuple[str, int] | tuple[str, int, int, int],
2405
        ]
2406
    ]:
2407
        return await get_running_loop().getaddrinfo(
10✔
2408
            host, port, family=family, type=type, proto=proto, flags=flags
2409
        )
2410

2411
    @classmethod
10✔
2412
    async def getnameinfo(
10✔
2413
        cls, sockaddr: IPSockAddrType, flags: int = 0
2414
    ) -> tuple[str, str]:
2415
        return await get_running_loop().getnameinfo(sockaddr, flags)
9✔
2416

2417
    @classmethod
10✔
2418
    async def wait_socket_readable(cls, sock: socket.socket) -> None:
10✔
2419
        await cls.checkpoint()
×
2420
        try:
×
2421
            read_events = _read_events.get()
×
2422
        except LookupError:
×
2423
            read_events = {}
×
2424
            _read_events.set(read_events)
×
2425

2426
        if read_events.get(sock):
×
2427
            raise BusyResourceError("reading from") from None
×
2428

2429
        loop = get_running_loop()
×
2430
        event = read_events[sock] = asyncio.Event()
×
2431
        loop.add_reader(sock, event.set)
×
2432
        try:
×
2433
            await event.wait()
×
2434
        finally:
2435
            if read_events.pop(sock, None) is not None:
×
2436
                loop.remove_reader(sock)
×
2437
                readable = True
×
2438
            else:
2439
                readable = False
×
2440

2441
        if not readable:
×
2442
            raise ClosedResourceError
×
2443

2444
    @classmethod
10✔
2445
    async def wait_socket_writable(cls, sock: socket.socket) -> None:
10✔
2446
        await cls.checkpoint()
×
2447
        try:
×
2448
            write_events = _write_events.get()
×
2449
        except LookupError:
×
2450
            write_events = {}
×
2451
            _write_events.set(write_events)
×
2452

2453
        if write_events.get(sock):
×
2454
            raise BusyResourceError("writing to") from None
×
2455

2456
        loop = get_running_loop()
×
2457
        event = write_events[sock] = asyncio.Event()
×
2458
        loop.add_writer(sock.fileno(), event.set)
×
2459
        try:
×
2460
            await event.wait()
×
2461
        finally:
2462
            if write_events.pop(sock, None) is not None:
×
2463
                loop.remove_writer(sock)
×
2464
                writable = True
×
2465
            else:
2466
                writable = False
×
2467

2468
        if not writable:
×
2469
            raise ClosedResourceError
×
2470

2471
    @classmethod
10✔
2472
    def current_default_thread_limiter(cls) -> CapacityLimiter:
10✔
2473
        try:
10✔
2474
            return _default_thread_limiter.get()
10✔
2475
        except LookupError:
10✔
2476
            limiter = CapacityLimiter(40)
10✔
2477
            _default_thread_limiter.set(limiter)
10✔
2478
            return limiter
10✔
2479

2480
    @classmethod
10✔
2481
    def open_signal_receiver(
10✔
2482
        cls, *signals: Signals
2483
    ) -> ContextManager[AsyncIterator[Signals]]:
2484
        return _SignalReceiver(signals)
8✔
2485

2486
    @classmethod
10✔
2487
    def get_current_task(cls) -> TaskInfo:
10✔
2488
        return AsyncIOTaskInfo(current_task())  # type: ignore[arg-type]
10✔
2489

2490
    @classmethod
10✔
2491
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
10✔
2492
        return [AsyncIOTaskInfo(task) for task in all_tasks() if not task.done()]
10✔
2493

2494
    @classmethod
10✔
2495
    async def wait_all_tasks_blocked(cls) -> None:
10✔
2496
        await cls.checkpoint()
10✔
2497
        this_task = current_task()
10✔
2498
        while True:
6✔
2499
            for task in all_tasks():
10✔
2500
                if task is this_task:
10✔
2501
                    continue
10✔
2502

2503
                waiter = task._fut_waiter  # type: ignore[attr-defined]
10✔
2504
                if waiter is None or waiter.done():
10✔
2505
                    await sleep(0.1)
10✔
2506
                    break
10✔
2507
            else:
2508
                return
10✔
2509

2510
    @classmethod
10✔
2511
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
10✔
2512
        return TestRunner(**options)
10✔
2513

2514

2515
backend_class = AsyncIOBackend
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc