• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10643958642

31 Aug 2024 08:48AM UTC coverage: 91.75% (+0.005%) from 91.745%
10643958642

Pull #752

github

web-flow
Merge 571cc5324 into e5a8a9314
Pull Request #752: Fixed feed_data after feed_eof assertion errors on asyncio

4 of 5 new or added lines in 1 file covered. (80.0%)

76 existing lines in 1 file now uncovered.

4582 of 4994 relevant lines covered (91.75%)

9.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.28
/src/anyio/_backends/_asyncio.py
1
from __future__ import annotations
11✔
2

3
import array
11✔
4
import asyncio
11✔
5
import concurrent.futures
11✔
6
import math
11✔
7
import socket
11✔
8
import sys
11✔
9
import threading
11✔
10
import weakref
11✔
11
from asyncio import (
11✔
12
    AbstractEventLoop,
13
    CancelledError,
14
    all_tasks,
15
    create_task,
16
    current_task,
17
    get_running_loop,
18
    sleep,
19
)
20
from asyncio.base_events import _run_until_complete_cb  # type: ignore[attr-defined]
11✔
21
from collections import OrderedDict, deque
11✔
22
from collections.abc import AsyncIterator, Generator, Iterable
11✔
23
from concurrent.futures import Future
11✔
24
from contextlib import suppress
11✔
25
from contextvars import Context, copy_context
11✔
26
from dataclasses import dataclass, field
11✔
27
from functools import partial, wraps
11✔
28
from inspect import (
11✔
29
    CORO_RUNNING,
30
    CORO_SUSPENDED,
31
    getcoroutinestate,
32
    iscoroutine,
33
)
34
from io import IOBase
11✔
35
from os import PathLike
11✔
36
from queue import Queue
11✔
37
from signal import Signals
11✔
38
from socket import AddressFamily, SocketKind
11✔
39
from threading import Thread
11✔
40
from types import TracebackType
11✔
41
from typing import (
11✔
42
    IO,
43
    Any,
44
    AsyncGenerator,
45
    Awaitable,
46
    Callable,
47
    Collection,
48
    ContextManager,
49
    Coroutine,
50
    Mapping,
51
    Optional,
52
    Sequence,
53
    Tuple,
54
    TypeVar,
55
    cast,
56
)
57
from weakref import WeakKeyDictionary
11✔
58

59
import sniffio
11✔
60

61
from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
11✔
62
from .._core._eventloop import claim_worker_thread, threadlocals
11✔
63
from .._core._exceptions import (
11✔
64
    BrokenResourceError,
65
    BusyResourceError,
66
    ClosedResourceError,
67
    EndOfStream,
68
    WouldBlock,
69
)
70
from .._core._sockets import convert_ipv6_sockaddr
11✔
71
from .._core._streams import create_memory_object_stream
11✔
72
from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
11✔
73
from .._core._synchronization import Event as BaseEvent
11✔
74
from .._core._synchronization import ResourceGuard
11✔
75
from .._core._tasks import CancelScope as BaseCancelScope
11✔
76
from ..abc import (
11✔
77
    AsyncBackend,
78
    IPSockAddrType,
79
    SocketListener,
80
    UDPPacketType,
81
    UNIXDatagramPacketType,
82
)
83
from ..lowlevel import RunVar
11✔
84
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
11✔
85

86
if sys.version_info >= (3, 10):
11✔
87
    from typing import ParamSpec
7✔
88
else:
89
    from typing_extensions import ParamSpec
4✔
90

91
if sys.version_info >= (3, 11):
11✔
92
    from asyncio import Runner
5✔
93
    from typing import TypeVarTuple, Unpack
5✔
94
else:
95
    import contextvars
6✔
96
    import enum
6✔
97
    import signal
6✔
98
    from asyncio import coroutines, events, exceptions, tasks
6✔
99

100
    from exceptiongroup import BaseExceptionGroup
6✔
101
    from typing_extensions import TypeVarTuple, Unpack
6✔
102

103
    class _State(enum.Enum):
6✔
104
        CREATED = "created"
6✔
105
        INITIALIZED = "initialized"
6✔
106
        CLOSED = "closed"
6✔
107

108
    class Runner:
6✔
109
        # Copied from CPython 3.11
110
        def __init__(
6✔
111
            self,
112
            *,
113
            debug: bool | None = None,
114
            loop_factory: Callable[[], AbstractEventLoop] | None = None,
115
        ):
116
            self._state = _State.CREATED
6✔
117
            self._debug = debug
6✔
118
            self._loop_factory = loop_factory
6✔
119
            self._loop: AbstractEventLoop | None = None
6✔
120
            self._context = None
6✔
121
            self._interrupt_count = 0
6✔
122
            self._set_event_loop = False
6✔
123

124
        def __enter__(self) -> Runner:
6✔
125
            self._lazy_init()
6✔
126
            return self
6✔
127

128
        def __exit__(
6✔
129
            self,
130
            exc_type: type[BaseException],
131
            exc_val: BaseException,
132
            exc_tb: TracebackType,
133
        ) -> None:
134
            self.close()
6✔
135

136
        def close(self) -> None:
6✔
137
            """Shutdown and close event loop."""
138
            if self._state is not _State.INITIALIZED:
6✔
139
                return
×
140
            try:
6✔
141
                loop = self._loop
6✔
142
                _cancel_all_tasks(loop)
6✔
143
                loop.run_until_complete(loop.shutdown_asyncgens())
6✔
144
                if hasattr(loop, "shutdown_default_executor"):
6✔
145
                    loop.run_until_complete(loop.shutdown_default_executor())
5✔
146
                else:
147
                    loop.run_until_complete(_shutdown_default_executor(loop))
3✔
148
            finally:
149
                if self._set_event_loop:
6✔
150
                    events.set_event_loop(None)
6✔
151
                loop.close()
6✔
152
                self._loop = None
6✔
153
                self._state = _State.CLOSED
6✔
154

155
        def get_loop(self) -> AbstractEventLoop:
6✔
156
            """Return embedded event loop."""
157
            self._lazy_init()
6✔
158
            return self._loop
6✔
159

160
        def run(self, coro: Coroutine[T_Retval], *, context=None) -> T_Retval:
6✔
161
            """Run a coroutine inside the embedded event loop."""
162
            if not coroutines.iscoroutine(coro):
6✔
163
                raise ValueError(f"a coroutine was expected, got {coro!r}")
×
164

165
            if events._get_running_loop() is not None:
6✔
166
                # fail fast with short traceback
167
                raise RuntimeError(
×
168
                    "Runner.run() cannot be called from a running event loop"
169
                )
170

171
            self._lazy_init()
6✔
172

173
            if context is None:
6✔
174
                context = self._context
6✔
175
            task = context.run(self._loop.create_task, coro)
6✔
176

177
            if (
6✔
178
                threading.current_thread() is threading.main_thread()
179
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
180
            ):
181
                sigint_handler = partial(self._on_sigint, main_task=task)
6✔
182
                try:
6✔
183
                    signal.signal(signal.SIGINT, sigint_handler)
6✔
184
                except ValueError:
×
185
                    # `signal.signal` may throw if `threading.main_thread` does
186
                    # not support signals (e.g. embedded interpreter with signals
187
                    # not registered - see gh-91880)
188
                    sigint_handler = None
×
189
            else:
190
                sigint_handler = None
6✔
191

192
            self._interrupt_count = 0
6✔
193
            try:
6✔
194
                return self._loop.run_until_complete(task)
6✔
195
            except exceptions.CancelledError:
6✔
196
                if self._interrupt_count > 0:
×
197
                    uncancel = getattr(task, "uncancel", None)
×
198
                    if uncancel is not None and uncancel() == 0:
×
199
                        raise KeyboardInterrupt()
×
200
                raise  # CancelledError
1✔
201
            finally:
202
                if (
6✔
203
                    sigint_handler is not None
204
                    and signal.getsignal(signal.SIGINT) is sigint_handler
205
                ):
206
                    signal.signal(signal.SIGINT, signal.default_int_handler)
6✔
207

208
        def _lazy_init(self) -> None:
6✔
209
            if self._state is _State.CLOSED:
6✔
210
                raise RuntimeError("Runner is closed")
×
211
            if self._state is _State.INITIALIZED:
6✔
212
                return
6✔
213
            if self._loop_factory is None:
6✔
214
                self._loop = events.new_event_loop()
6✔
215
                if not self._set_event_loop:
6✔
216
                    # Call set_event_loop only once to avoid calling
217
                    # attach_loop multiple times on child watchers
218
                    events.set_event_loop(self._loop)
6✔
219
                    self._set_event_loop = True
6✔
220
            else:
221
                self._loop = self._loop_factory()
4✔
222
            if self._debug is not None:
6✔
223
                self._loop.set_debug(self._debug)
6✔
224
            self._context = contextvars.copy_context()
6✔
225
            self._state = _State.INITIALIZED
6✔
226

227
        def _on_sigint(self, signum, frame, main_task: asyncio.Task) -> None:
6✔
228
            self._interrupt_count += 1
×
229
            if self._interrupt_count == 1 and not main_task.done():
×
230
                main_task.cancel()
×
231
                # wakeup loop if it is blocked by select() with long timeout
232
                self._loop.call_soon_threadsafe(lambda: None)
×
233
                return
×
234
            raise KeyboardInterrupt()
×
235

236
    def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
6✔
237
        to_cancel = tasks.all_tasks(loop)
6✔
238
        if not to_cancel:
6✔
239
            return
6✔
240

241
        for task in to_cancel:
6✔
242
            task.cancel()
6✔
243

244
        loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
6✔
245

246
        for task in to_cancel:
6✔
247
            if task.cancelled():
6✔
248
                continue
6✔
249
            if task.exception() is not None:
5✔
250
                loop.call_exception_handler(
×
251
                    {
252
                        "message": "unhandled exception during asyncio.run() shutdown",
253
                        "exception": task.exception(),
254
                        "task": task,
255
                    }
256
                )
257

258
    async def _shutdown_default_executor(loop: AbstractEventLoop) -> None:
6✔
259
        """Schedule the shutdown of the default executor."""
260

261
        def _do_shutdown(future: asyncio.futures.Future) -> None:
3✔
262
            try:
3✔
263
                loop._default_executor.shutdown(wait=True)  # type: ignore[attr-defined]
3✔
264
                loop.call_soon_threadsafe(future.set_result, None)
3✔
265
            except Exception as ex:
×
266
                loop.call_soon_threadsafe(future.set_exception, ex)
×
267

268
        loop._executor_shutdown_called = True
3✔
269
        if loop._default_executor is None:
3✔
270
            return
3✔
271
        future = loop.create_future()
3✔
272
        thread = threading.Thread(target=_do_shutdown, args=(future,))
3✔
273
        thread.start()
3✔
274
        try:
3✔
275
            await future
3✔
276
        finally:
277
            thread.join()
3✔
278

279

280
T_Retval = TypeVar("T_Retval")
11✔
281
T_contra = TypeVar("T_contra", contravariant=True)
11✔
282
PosArgsT = TypeVarTuple("PosArgsT")
11✔
283
P = ParamSpec("P")
11✔
284

285
_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")
11✔
286

287

288
def find_root_task() -> asyncio.Task:
11✔
289
    root_task = _root_task.get(None)
11✔
290
    if root_task is not None and not root_task.done():
11✔
291
        return root_task
11✔
292

293
    # Look for a task that has been started via run_until_complete()
294
    for task in all_tasks():
11✔
295
        if task._callbacks and not task.done():
11✔
296
            callbacks = [cb for cb, context in task._callbacks]
11✔
297
            for cb in callbacks:
11✔
298
                if (
11✔
299
                    cb is _run_until_complete_cb
300
                    or getattr(cb, "__module__", None) == "uvloop.loop"
301
                ):
302
                    _root_task.set(task)
11✔
303
                    return task
11✔
304

305
    # Look up the topmost task in the AnyIO task tree, if possible
306
    task = cast(asyncio.Task, current_task())
10✔
307
    state = _task_states.get(task)
10✔
308
    if state:
10✔
309
        cancel_scope = state.cancel_scope
10✔
310
        while cancel_scope and cancel_scope._parent_scope is not None:
10✔
311
            cancel_scope = cancel_scope._parent_scope
×
312

313
        if cancel_scope is not None:
10✔
314
            return cast(asyncio.Task, cancel_scope._host_task)
10✔
315

316
    return task
×
317

318

319
def get_callable_name(func: Callable) -> str:
11✔
320
    module = getattr(func, "__module__", None)
11✔
321
    qualname = getattr(func, "__qualname__", None)
11✔
322
    return ".".join([x for x in (module, qualname) if x])
11✔
323

324

325
#
326
# Event loop
327
#
328

329
_run_vars: WeakKeyDictionary[asyncio.AbstractEventLoop, Any] = WeakKeyDictionary()
11✔
330

331

332
def _task_started(task: asyncio.Task) -> bool:
11✔
333
    """Return ``True`` if the task has been started and has not finished."""
334
    try:
11✔
335
        return getcoroutinestate(task.get_coro()) in (CORO_RUNNING, CORO_SUSPENDED)
11✔
336
    except AttributeError:
×
337
        # task coro is async_genenerator_asend https://bugs.python.org/issue37771
338
        raise Exception(f"Cannot determine if task {task} has started or not") from None
×
339

340

341
#
342
# Timeouts and cancellation
343
#
344

345

346
class CancelScope(BaseCancelScope):
11✔
347
    def __new__(
11✔
348
        cls, *, deadline: float = math.inf, shield: bool = False
349
    ) -> CancelScope:
350
        return object.__new__(cls)
11✔
351

352
    def __init__(self, deadline: float = math.inf, shield: bool = False):
11✔
353
        self._deadline = deadline
11✔
354
        self._shield = shield
11✔
355
        self._parent_scope: CancelScope | None = None
11✔
356
        self._child_scopes: set[CancelScope] = set()
11✔
357
        self._cancel_called = False
11✔
358
        self._cancelled_caught = False
11✔
359
        self._active = False
11✔
360
        self._timeout_handle: asyncio.TimerHandle | None = None
11✔
361
        self._cancel_handle: asyncio.Handle | None = None
11✔
362
        self._tasks: set[asyncio.Task] = set()
11✔
363
        self._host_task: asyncio.Task | None = None
11✔
364
        self._cancel_calls: int = 0
11✔
365
        self._cancelling: int | None = None
11✔
366

367
    def __enter__(self) -> CancelScope:
11✔
368
        if self._active:
11✔
369
            raise RuntimeError(
×
370
                "Each CancelScope may only be used for a single 'with' block"
371
            )
372

373
        self._host_task = host_task = cast(asyncio.Task, current_task())
11✔
374
        self._tasks.add(host_task)
11✔
375
        try:
11✔
376
            task_state = _task_states[host_task]
11✔
377
        except KeyError:
11✔
378
            task_state = TaskState(None, self)
11✔
379
            _task_states[host_task] = task_state
11✔
380
        else:
381
            self._parent_scope = task_state.cancel_scope
11✔
382
            task_state.cancel_scope = self
11✔
383
            if self._parent_scope is not None:
11✔
384
                self._parent_scope._child_scopes.add(self)
11✔
385
                self._parent_scope._tasks.remove(host_task)
11✔
386

387
        self._timeout()
11✔
388
        self._active = True
11✔
389
        if sys.version_info >= (3, 11):
11✔
390
            self._cancelling = self._host_task.cancelling()
5✔
391

392
        # Start cancelling the host task if the scope was cancelled before entering
393
        if self._cancel_called:
11✔
394
            self._deliver_cancellation(self)
11✔
395

396
        return self
11✔
397

398
    def __exit__(
11✔
399
        self,
400
        exc_type: type[BaseException] | None,
401
        exc_val: BaseException | None,
402
        exc_tb: TracebackType | None,
403
    ) -> bool | None:
404
        if not self._active:
11✔
405
            raise RuntimeError("This cancel scope is not active")
10✔
406
        if current_task() is not self._host_task:
11✔
407
            raise RuntimeError(
10✔
408
                "Attempted to exit cancel scope in a different task than it was "
409
                "entered in"
410
            )
411

412
        assert self._host_task is not None
11✔
413
        host_task_state = _task_states.get(self._host_task)
11✔
414
        if host_task_state is None or host_task_state.cancel_scope is not self:
11✔
415
            raise RuntimeError(
10✔
416
                "Attempted to exit a cancel scope that isn't the current tasks's "
417
                "current cancel scope"
418
            )
419

420
        self._active = False
11✔
421
        if self._timeout_handle:
11✔
422
            self._timeout_handle.cancel()
11✔
423
            self._timeout_handle = None
11✔
424

425
        self._tasks.remove(self._host_task)
11✔
426
        if self._parent_scope is not None:
11✔
427
            self._parent_scope._child_scopes.remove(self)
11✔
428
            self._parent_scope._tasks.add(self._host_task)
11✔
429

430
        host_task_state.cancel_scope = self._parent_scope
11✔
431

432
        # Restart the cancellation effort in the closest directly cancelled parent
433
        # scope if this one was shielded
434
        self._restart_cancellation_in_parent()
11✔
435

436
        if self._cancel_called and exc_val is not None:
11✔
437
            for exc in iterate_exceptions(exc_val):
11✔
438
                if isinstance(exc, CancelledError):
11✔
439
                    self._cancelled_caught = self._uncancel(exc)
11✔
440
                    if self._cancelled_caught:
11✔
441
                        break
11✔
442

443
            return self._cancelled_caught
11✔
444

445
        return None
11✔
446

447
    def _uncancel(self, cancelled_exc: CancelledError) -> bool:
11✔
448
        if sys.version_info < (3, 9) or self._host_task is None:
11✔
449
            self._cancel_calls = 0
3✔
450
            return True
3✔
451

452
        # Undo all cancellations done by this scope
453
        if self._cancelling is not None:
8✔
454
            while self._cancel_calls:
5✔
455
                self._cancel_calls -= 1
5✔
456
                if self._host_task.uncancel() <= self._cancelling:
5✔
457
                    return True
5✔
458

459
        self._cancel_calls = 0
8✔
460
        return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args
8✔
461

462
    def _timeout(self) -> None:
11✔
463
        if self._deadline != math.inf:
11✔
464
            loop = get_running_loop()
11✔
465
            if loop.time() >= self._deadline:
11✔
466
                self.cancel()
11✔
467
            else:
468
                self._timeout_handle = loop.call_at(self._deadline, self._timeout)
11✔
469

470
    def _deliver_cancellation(self, origin: CancelScope) -> bool:
11✔
471
        """
472
        Deliver cancellation to directly contained tasks and nested cancel scopes.
473

474
        Schedule another run at the end if we still have tasks eligible for
475
        cancellation.
476

477
        :param origin: the cancel scope that originated the cancellation
478
        :return: ``True`` if the delivery needs to be retried on the next cycle
479

480
        """
481
        should_retry = False
11✔
482
        current = current_task()
11✔
483
        for task in self._tasks:
11✔
484
            if task._must_cancel:  # type: ignore[attr-defined]
11✔
485
                continue
10✔
486

487
            # The task is eligible for cancellation if it has started
488
            should_retry = True
11✔
489
            if task is not current and (task is self._host_task or _task_started(task)):
11✔
490
                waiter = task._fut_waiter  # type: ignore[attr-defined]
11✔
491
                if not isinstance(waiter, asyncio.Future) or not waiter.done():
11✔
492
                    origin._cancel_calls += 1
11✔
493
                    if sys.version_info >= (3, 9):
11✔
494
                        task.cancel(f"Cancelled by cancel scope {id(origin):x}")
8✔
495
                    else:
496
                        task.cancel()
3✔
497

498
        # Deliver cancellation to child scopes that aren't shielded or running their own
499
        # cancellation callbacks
500
        for scope in self._child_scopes:
11✔
501
            if not scope._shield and not scope.cancel_called:
11✔
502
                should_retry = scope._deliver_cancellation(origin) or should_retry
11✔
503

504
        # Schedule another callback if there are still tasks left
505
        if origin is self:
11✔
506
            if should_retry:
11✔
507
                self._cancel_handle = get_running_loop().call_soon(
11✔
508
                    self._deliver_cancellation, origin
509
                )
510
            else:
511
                self._cancel_handle = None
11✔
512

513
        return should_retry
11✔
514

515
    def _restart_cancellation_in_parent(self) -> None:
11✔
516
        """
517
        Restart the cancellation effort in the closest directly cancelled parent scope.
518

519
        """
520
        scope = self._parent_scope
11✔
521
        while scope is not None:
11✔
522
            if scope._cancel_called:
11✔
523
                if scope._cancel_handle is None:
11✔
524
                    scope._deliver_cancellation(scope)
11✔
525

526
                break
11✔
527

528
            # No point in looking beyond any shielded scope
529
            if scope._shield:
11✔
530
                break
10✔
531

532
            scope = scope._parent_scope
11✔
533

534
    def _parent_cancelled(self) -> bool:
11✔
535
        # Check whether any parent has been cancelled
536
        cancel_scope = self._parent_scope
11✔
537
        while cancel_scope is not None and not cancel_scope._shield:
11✔
538
            if cancel_scope._cancel_called:
11✔
539
                return True
11✔
540
            else:
541
                cancel_scope = cancel_scope._parent_scope
11✔
542

543
        return False
11✔
544

545
    def cancel(self) -> None:
11✔
546
        if not self._cancel_called:
11✔
547
            if self._timeout_handle:
11✔
548
                self._timeout_handle.cancel()
11✔
549
                self._timeout_handle = None
11✔
550

551
            self._cancel_called = True
11✔
552
            if self._host_task is not None:
11✔
553
                self._deliver_cancellation(self)
11✔
554

555
    @property
11✔
556
    def deadline(self) -> float:
11✔
557
        return self._deadline
10✔
558

559
    @deadline.setter
11✔
560
    def deadline(self, value: float) -> None:
11✔
561
        self._deadline = float(value)
10✔
562
        if self._timeout_handle is not None:
10✔
563
            self._timeout_handle.cancel()
10✔
564
            self._timeout_handle = None
10✔
565

566
        if self._active and not self._cancel_called:
10✔
567
            self._timeout()
10✔
568

569
    @property
11✔
570
    def cancel_called(self) -> bool:
11✔
571
        return self._cancel_called
11✔
572

573
    @property
11✔
574
    def cancelled_caught(self) -> bool:
11✔
575
        return self._cancelled_caught
11✔
576

577
    @property
11✔
578
    def shield(self) -> bool:
11✔
579
        return self._shield
11✔
580

581
    @shield.setter
11✔
582
    def shield(self, value: bool) -> None:
11✔
583
        if self._shield != value:
10✔
584
            self._shield = value
10✔
585
            if not value:
10✔
586
                self._restart_cancellation_in_parent()
10✔
587

588

589
#
590
# Task states
591
#
592

593

594
class TaskState:
11✔
595
    """
596
    Encapsulates auxiliary task information that cannot be added to the Task instance
597
    itself because there are no guarantees about its implementation.
598
    """
599

600
    __slots__ = "parent_id", "cancel_scope", "__weakref__"
11✔
601

602
    def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
11✔
603
        self.parent_id = parent_id
11✔
604
        self.cancel_scope = cancel_scope
11✔
605

606

607
_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary()
11✔
608

609

610
#
611
# Task groups
612
#
613

614

615
class _AsyncioTaskStatus(abc.TaskStatus):
11✔
616
    def __init__(self, future: asyncio.Future, parent_id: int):
11✔
617
        self._future = future
11✔
618
        self._parent_id = parent_id
11✔
619

620
    def started(self, value: T_contra | None = None) -> None:
11✔
621
        try:
11✔
622
            self._future.set_result(value)
11✔
623
        except asyncio.InvalidStateError:
10✔
624
            if not self._future.cancelled():
10✔
625
                raise RuntimeError(
10✔
626
                    "called 'started' twice on the same task status"
627
                ) from None
628

629
        task = cast(asyncio.Task, current_task())
11✔
630
        _task_states[task].parent_id = self._parent_id
11✔
631

632

633
def iterate_exceptions(
11✔
634
    exception: BaseException,
635
) -> Generator[BaseException, None, None]:
636
    if isinstance(exception, BaseExceptionGroup):
11✔
637
        for exc in exception.exceptions:
10✔
638
            yield from iterate_exceptions(exc)
10✔
639
    else:
640
        yield exception
11✔
641

642

643
class TaskGroup(abc.TaskGroup):
11✔
644
    def __init__(self) -> None:
11✔
645
        self.cancel_scope: CancelScope = CancelScope()
11✔
646
        self._active = False
11✔
647
        self._exceptions: list[BaseException] = []
11✔
648
        self._tasks: set[asyncio.Task] = set()
11✔
649

650
    async def __aenter__(self) -> TaskGroup:
11✔
651
        self.cancel_scope.__enter__()
11✔
652
        self._active = True
11✔
653
        return self
11✔
654

655
    async def __aexit__(
11✔
656
        self,
657
        exc_type: type[BaseException] | None,
658
        exc_val: BaseException | None,
659
        exc_tb: TracebackType | None,
660
    ) -> bool | None:
661
        ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
11✔
662
        if exc_val is not None:
11✔
663
            self.cancel_scope.cancel()
11✔
664
            if not isinstance(exc_val, CancelledError):
11✔
665
                self._exceptions.append(exc_val)
11✔
666

667
        cancelled_exc_while_waiting_tasks: CancelledError | None = None
11✔
668
        while self._tasks:
11✔
669
            try:
11✔
670
                await asyncio.wait(self._tasks)
11✔
671
            except CancelledError as exc:
11✔
672
                # This task was cancelled natively; reraise the CancelledError later
673
                # unless this task was already interrupted by another exception
674
                self.cancel_scope.cancel()
11✔
675
                if cancelled_exc_while_waiting_tasks is None:
11✔
676
                    cancelled_exc_while_waiting_tasks = exc
11✔
677

678
        self._active = False
11✔
679
        if self._exceptions:
11✔
680
            raise BaseExceptionGroup(
11✔
681
                "unhandled errors in a TaskGroup", self._exceptions
682
            )
683

684
        # Raise the CancelledError received while waiting for child tasks to exit,
685
        # unless the context manager itself was previously exited with another
686
        # exception, or if any of the  child tasks raised an exception other than
687
        # CancelledError
688
        if cancelled_exc_while_waiting_tasks:
11✔
689
            if exc_val is None or ignore_exception:
11✔
690
                raise cancelled_exc_while_waiting_tasks
11✔
691

692
        return ignore_exception
11✔
693

694
    def _spawn(
11✔
695
        self,
696
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
697
        args: tuple[Unpack[PosArgsT]],
698
        name: object,
699
        task_status_future: asyncio.Future | None = None,
700
    ) -> asyncio.Task:
701
        def task_done(_task: asyncio.Task) -> None:
11✔
702
            task_state = _task_states[_task]
11✔
703
            assert task_state.cancel_scope is not None
11✔
704
            assert _task in task_state.cancel_scope._tasks
11✔
705
            task_state.cancel_scope._tasks.remove(_task)
11✔
706
            self._tasks.remove(task)
11✔
707
            del _task_states[_task]
11✔
708

709
            try:
11✔
710
                exc = _task.exception()
11✔
711
            except CancelledError as e:
11✔
712
                while isinstance(e.__context__, CancelledError):
11✔
713
                    e = e.__context__
3✔
714

715
                exc = e
11✔
716

717
            if exc is not None:
11✔
718
                # The future can only be in the cancelled state if the host task was
719
                # cancelled, so return immediately instead of adding one more
720
                # CancelledError to the exceptions list
721
                if task_status_future is not None and task_status_future.cancelled():
11✔
722
                    return
10✔
723

724
                if task_status_future is None or task_status_future.done():
11✔
725
                    if not isinstance(exc, CancelledError):
11✔
726
                        self._exceptions.append(exc)
11✔
727

728
                    if not self.cancel_scope._parent_cancelled():
11✔
729
                        self.cancel_scope.cancel()
11✔
730
                else:
731
                    task_status_future.set_exception(exc)
10✔
732
            elif task_status_future is not None and not task_status_future.done():
11✔
733
                task_status_future.set_exception(
10✔
734
                    RuntimeError("Child exited without calling task_status.started()")
735
                )
736

737
        if not self._active:
11✔
738
            raise RuntimeError(
10✔
739
                "This task group is not active; no new tasks can be started."
740
            )
741

742
        kwargs = {}
11✔
743
        if task_status_future:
11✔
744
            parent_id = id(current_task())
11✔
745
            kwargs["task_status"] = _AsyncioTaskStatus(
11✔
746
                task_status_future, id(self.cancel_scope._host_task)
747
            )
748
        else:
749
            parent_id = id(self.cancel_scope._host_task)
11✔
750

751
        coro = func(*args, **kwargs)
11✔
752
        if not iscoroutine(coro):
11✔
753
            prefix = f"{func.__module__}." if hasattr(func, "__module__") else ""
10✔
754
            raise TypeError(
10✔
755
                f"Expected {prefix}{func.__qualname__}() to return a coroutine, but "
756
                f"the return value ({coro!r}) is not a coroutine object"
757
            )
758

759
        name = get_callable_name(func) if name is None else str(name)
11✔
760
        task = create_task(coro, name=name)
11✔
761
        task.add_done_callback(task_done)
11✔
762

763
        # Make the spawned task inherit the task group's cancel scope
764
        _task_states[task] = TaskState(
11✔
765
            parent_id=parent_id, cancel_scope=self.cancel_scope
766
        )
767
        self.cancel_scope._tasks.add(task)
11✔
768
        self._tasks.add(task)
11✔
769
        return task
11✔
770

771
    def start_soon(
11✔
772
        self,
773
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
774
        *args: Unpack[PosArgsT],
775
        name: object = None,
776
    ) -> None:
777
        self._spawn(func, args, name)
11✔
778

779
    async def start(
11✔
780
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
781
    ) -> Any:
782
        future: asyncio.Future = asyncio.Future()
11✔
783
        task = self._spawn(func, args, name, future)
11✔
784

785
        # If the task raises an exception after sending a start value without a switch
786
        # point between, the task group is cancelled and this method never proceeds to
787
        # process the completed future. That's why we have to have a shielded cancel
788
        # scope here.
789
        try:
11✔
790
            return await future
11✔
791
        except CancelledError:
10✔
792
            # Cancel the task and wait for it to exit before returning
793
            task.cancel()
10✔
794
            with CancelScope(shield=True), suppress(CancelledError):
10✔
795
                await task
10✔
796

797
            raise
10✔
798

799

800
#
801
# Threads
802
#
803

804
_Retval_Queue_Type = Tuple[Optional[T_Retval], Optional[BaseException]]
11✔
805

806

807
class WorkerThread(Thread):
11✔
808
    MAX_IDLE_TIME = 10  # seconds
11✔
809

810
    def __init__(
11✔
811
        self,
812
        root_task: asyncio.Task,
813
        workers: set[WorkerThread],
814
        idle_workers: deque[WorkerThread],
815
    ):
816
        super().__init__(name="AnyIO worker thread")
11✔
817
        self.root_task = root_task
11✔
818
        self.workers = workers
11✔
819
        self.idle_workers = idle_workers
11✔
820
        self.loop = root_task._loop
11✔
821
        self.queue: Queue[
11✔
822
            tuple[Context, Callable, tuple, asyncio.Future, CancelScope] | None
823
        ] = Queue(2)
824
        self.idle_since = AsyncIOBackend.current_time()
11✔
825
        self.stopping = False
11✔
826

827
    def _report_result(
11✔
828
        self, future: asyncio.Future, result: Any, exc: BaseException | None
829
    ) -> None:
830
        self.idle_since = AsyncIOBackend.current_time()
11✔
831
        if not self.stopping:
11✔
832
            self.idle_workers.append(self)
11✔
833

834
        if not future.cancelled():
11✔
835
            if exc is not None:
11✔
836
                if isinstance(exc, StopIteration):
11✔
837
                    new_exc = RuntimeError("coroutine raised StopIteration")
10✔
838
                    new_exc.__cause__ = exc
10✔
839
                    exc = new_exc
10✔
840

841
                future.set_exception(exc)
11✔
842
            else:
843
                future.set_result(result)
11✔
844

845
    def run(self) -> None:
11✔
846
        with claim_worker_thread(AsyncIOBackend, self.loop):
11✔
847
            while True:
7✔
848
                item = self.queue.get()
11✔
849
                if item is None:
11✔
850
                    # Shutdown command received
851
                    return
11✔
852

853
                context, func, args, future, cancel_scope = item
11✔
854
                if not future.cancelled():
11✔
855
                    result = None
11✔
856
                    exception: BaseException | None = None
11✔
857
                    threadlocals.current_cancel_scope = cancel_scope
11✔
858
                    try:
11✔
859
                        result = context.run(func, *args)
11✔
860
                    except BaseException as exc:
11✔
861
                        exception = exc
11✔
862
                    finally:
863
                        del threadlocals.current_cancel_scope
11✔
864

865
                    if not self.loop.is_closed():
11✔
866
                        self.loop.call_soon_threadsafe(
11✔
867
                            self._report_result, future, result, exception
868
                        )
869

870
                self.queue.task_done()
11✔
871

872
    def stop(self, f: asyncio.Task | None = None) -> None:
11✔
873
        self.stopping = True
11✔
874
        self.queue.put_nowait(None)
11✔
875
        self.workers.discard(self)
11✔
876
        try:
11✔
877
            self.idle_workers.remove(self)
11✔
878
        except ValueError:
10✔
879
            pass
10✔
880

881

882
_threadpool_idle_workers: RunVar[deque[WorkerThread]] = RunVar(
11✔
883
    "_threadpool_idle_workers"
884
)
885
_threadpool_workers: RunVar[set[WorkerThread]] = RunVar("_threadpool_workers")
11✔
886

887

888
class BlockingPortal(abc.BlockingPortal):
11✔
889
    def __new__(cls) -> BlockingPortal:
11✔
890
        return object.__new__(cls)
11✔
891

892
    def __init__(self) -> None:
11✔
893
        super().__init__()
11✔
894
        self._loop = get_running_loop()
11✔
895

896
    def _spawn_task_from_thread(
11✔
897
        self,
898
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
899
        args: tuple[Unpack[PosArgsT]],
900
        kwargs: dict[str, Any],
901
        name: object,
902
        future: Future[T_Retval],
903
    ) -> None:
904
        AsyncIOBackend.run_sync_from_thread(
11✔
905
            partial(self._task_group.start_soon, name=name),
906
            (self._call_func, func, args, kwargs, future),
907
            self._loop,
908
        )
909

910

911
#
912
# Subprocesses
913
#
914

915

916
@dataclass(eq=False)
11✔
917
class StreamReaderWrapper(abc.ByteReceiveStream):
11✔
918
    _stream: asyncio.StreamReader
11✔
919
    _closed: bool = field(init=False, default=False)
11✔
920

921
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
922
        if self._closed:
10✔
NEW
UNCOV
923
            raise ClosedResourceError
×
924

925
        data = await self._stream.read(max_bytes)
10✔
926
        if data:
10✔
927
            return data
10✔
928
        else:
929
            raise EndOfStream
10✔
930

931
    async def aclose(self) -> None:
11✔
932
        self._closed = True
10✔
933
        await AsyncIOBackend.checkpoint()
10✔
934

935

936
@dataclass(eq=False)
11✔
937
class StreamWriterWrapper(abc.ByteSendStream):
11✔
938
    _stream: asyncio.StreamWriter
11✔
939

940
    async def send(self, item: bytes) -> None:
11✔
941
        self._stream.write(item)
10✔
942
        await self._stream.drain()
10✔
943

944
    async def aclose(self) -> None:
11✔
945
        self._stream.close()
10✔
946
        await AsyncIOBackend.checkpoint()
10✔
947

948

949
@dataclass(eq=False)
11✔
950
class Process(abc.Process):
11✔
951
    _process: asyncio.subprocess.Process
11✔
952
    _stdin: StreamWriterWrapper | None
11✔
953
    _stdout: StreamReaderWrapper | None
11✔
954
    _stderr: StreamReaderWrapper | None
11✔
955

956
    async def aclose(self) -> None:
11✔
957
        with CancelScope(shield=True):
10✔
958
            if self._stdin:
10✔
959
                await self._stdin.aclose()
10✔
960
            if self._stdout:
10✔
961
                await self._stdout.aclose()
10✔
962
            if self._stderr:
10✔
963
                await self._stderr.aclose()
10✔
964

965
        try:
10✔
966
            await self.wait()
10✔
967
        except BaseException:
10✔
968
            self.kill()
10✔
969
            with CancelScope(shield=True):
10✔
970
                await self.wait()
10✔
971

972
            raise
10✔
973

974
    async def wait(self) -> int:
11✔
975
        return await self._process.wait()
10✔
976

977
    def terminate(self) -> None:
11✔
978
        self._process.terminate()
8✔
979

980
    def kill(self) -> None:
11✔
981
        self._process.kill()
10✔
982

983
    def send_signal(self, signal: int) -> None:
11✔
984
        self._process.send_signal(signal)
×
985

986
    @property
11✔
987
    def pid(self) -> int:
11✔
UNCOV
988
        return self._process.pid
×
989

990
    @property
11✔
991
    def returncode(self) -> int | None:
11✔
992
        return self._process.returncode
10✔
993

994
    @property
11✔
995
    def stdin(self) -> abc.ByteSendStream | None:
11✔
996
        return self._stdin
10✔
997

998
    @property
11✔
999
    def stdout(self) -> abc.ByteReceiveStream | None:
11✔
1000
        return self._stdout
10✔
1001

1002
    @property
11✔
1003
    def stderr(self) -> abc.ByteReceiveStream | None:
11✔
1004
        return self._stderr
10✔
1005

1006

1007
def _forcibly_shutdown_process_pool_on_exit(
11✔
1008
    workers: set[Process], _task: object
1009
) -> None:
1010
    """
1011
    Forcibly shuts down worker processes belonging to this event loop."""
1012
    child_watcher: asyncio.AbstractChildWatcher | None = None
10✔
1013
    if sys.version_info < (3, 12):
10✔
1014
        try:
6✔
1015
            child_watcher = asyncio.get_event_loop_policy().get_child_watcher()
6✔
1016
        except NotImplementedError:
1✔
1017
            pass
1✔
1018

1019
    # Close as much as possible (w/o async/await) to avoid warnings
1020
    for process in workers:
10✔
1021
        if process.returncode is None:
10✔
1022
            continue
10✔
1023

1024
        process._stdin._stream._transport.close()  # type: ignore[union-attr]
×
1025
        process._stdout._stream._transport.close()  # type: ignore[union-attr]
×
UNCOV
1026
        process._stderr._stream._transport.close()  # type: ignore[union-attr]
×
UNCOV
1027
        process.kill()
×
UNCOV
1028
        if child_watcher:
×
UNCOV
1029
            child_watcher.remove_child_handler(process.pid)
×
1030

1031

1032
async def _shutdown_process_pool_on_exit(workers: set[abc.Process]) -> None:
11✔
1033
    """
1034
    Shuts down worker processes belonging to this event loop.
1035

1036
    NOTE: this only works when the event loop was started using asyncio.run() or
1037
    anyio.run().
1038

1039
    """
1040
    process: abc.Process
1041
    try:
10✔
1042
        await sleep(math.inf)
10✔
1043
    except asyncio.CancelledError:
10✔
1044
        for process in workers:
10✔
1045
            if process.returncode is None:
10✔
1046
                process.kill()
10✔
1047

1048
        for process in workers:
10✔
1049
            await process.aclose()
10✔
1050

1051

1052
#
1053
# Sockets and networking
1054
#
1055

1056

1057
class StreamProtocol(asyncio.Protocol):
11✔
1058
    read_queue: deque[bytes]
11✔
1059
    read_event: asyncio.Event
11✔
1060
    write_event: asyncio.Event
11✔
1061
    exception: Exception | None = None
11✔
1062
    is_at_eof: bool = False
11✔
1063

1064
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
11✔
1065
        self.read_queue = deque()
11✔
1066
        self.read_event = asyncio.Event()
11✔
1067
        self.write_event = asyncio.Event()
11✔
1068
        self.write_event.set()
11✔
1069
        cast(asyncio.Transport, transport).set_write_buffer_limits(0)
11✔
1070

1071
    def connection_lost(self, exc: Exception | None) -> None:
11✔
1072
        if exc:
11✔
1073
            self.exception = BrokenResourceError()
11✔
1074
            self.exception.__cause__ = exc
11✔
1075

1076
        self.read_event.set()
11✔
1077
        self.write_event.set()
11✔
1078

1079
    def data_received(self, data: bytes) -> None:
11✔
1080
        self.read_queue.append(data)
11✔
1081
        self.read_event.set()
11✔
1082

1083
    def eof_received(self) -> bool | None:
11✔
1084
        self.is_at_eof = True
11✔
1085
        self.read_event.set()
11✔
1086
        return True
11✔
1087

1088
    def pause_writing(self) -> None:
11✔
1089
        self.write_event = asyncio.Event()
11✔
1090

1091
    def resume_writing(self) -> None:
11✔
1092
        self.write_event.set()
1✔
1093

1094

1095
class DatagramProtocol(asyncio.DatagramProtocol):
11✔
1096
    read_queue: deque[tuple[bytes, IPSockAddrType]]
11✔
1097
    read_event: asyncio.Event
11✔
1098
    write_event: asyncio.Event
11✔
1099
    exception: Exception | None = None
11✔
1100

1101
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
11✔
1102
        self.read_queue = deque(maxlen=100)  # arbitrary value
10✔
1103
        self.read_event = asyncio.Event()
10✔
1104
        self.write_event = asyncio.Event()
10✔
1105
        self.write_event.set()
10✔
1106

1107
    def connection_lost(self, exc: Exception | None) -> None:
11✔
1108
        self.read_event.set()
10✔
1109
        self.write_event.set()
10✔
1110

1111
    def datagram_received(self, data: bytes, addr: IPSockAddrType) -> None:
11✔
1112
        addr = convert_ipv6_sockaddr(addr)
10✔
1113
        self.read_queue.append((data, addr))
10✔
1114
        self.read_event.set()
10✔
1115

1116
    def error_received(self, exc: Exception) -> None:
11✔
UNCOV
1117
        self.exception = exc
×
1118

1119
    def pause_writing(self) -> None:
11✔
UNCOV
1120
        self.write_event.clear()
×
1121

1122
    def resume_writing(self) -> None:
11✔
UNCOV
1123
        self.write_event.set()
×
1124

1125

1126
class SocketStream(abc.SocketStream):
11✔
1127
    def __init__(self, transport: asyncio.Transport, protocol: StreamProtocol):
11✔
1128
        self._transport = transport
11✔
1129
        self._protocol = protocol
11✔
1130
        self._receive_guard = ResourceGuard("reading from")
11✔
1131
        self._send_guard = ResourceGuard("writing to")
11✔
1132
        self._closed = False
11✔
1133

1134
    @property
11✔
1135
    def _raw_socket(self) -> socket.socket:
11✔
1136
        return self._transport.get_extra_info("socket")
11✔
1137

1138
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
1139
        with self._receive_guard:
11✔
1140
            if (
11✔
1141
                not self._protocol.read_event.is_set()
1142
                and not self._transport.is_closing()
1143
                and not self._protocol.is_at_eof
1144
            ):
1145
                self._transport.resume_reading()
11✔
1146
                await self._protocol.read_event.wait()
11✔
1147
                self._transport.pause_reading()
11✔
1148
            else:
1149
                await AsyncIOBackend.checkpoint()
11✔
1150

1151
            try:
11✔
1152
                chunk = self._protocol.read_queue.popleft()
11✔
1153
            except IndexError:
11✔
1154
                if self._closed:
11✔
1155
                    raise ClosedResourceError from None
11✔
1156
                elif self._protocol.exception:
11✔
1157
                    raise self._protocol.exception from None
11✔
1158
                else:
1159
                    raise EndOfStream from None
11✔
1160

1161
            if len(chunk) > max_bytes:
11✔
1162
                # Split the oversized chunk
1163
                chunk, leftover = chunk[:max_bytes], chunk[max_bytes:]
9✔
1164
                self._protocol.read_queue.appendleft(leftover)
9✔
1165

1166
            # If the read queue is empty, clear the flag so that the next call will
1167
            # block until data is available
1168
            if not self._protocol.read_queue:
11✔
1169
                self._protocol.read_event.clear()
11✔
1170

1171
        return chunk
11✔
1172

1173
    async def send(self, item: bytes) -> None:
11✔
1174
        with self._send_guard:
11✔
1175
            await AsyncIOBackend.checkpoint()
11✔
1176

1177
            if self._closed:
11✔
1178
                raise ClosedResourceError
11✔
1179
            elif self._protocol.exception is not None:
11✔
1180
                raise self._protocol.exception
11✔
1181

1182
            try:
11✔
1183
                self._transport.write(item)
11✔
1184
            except RuntimeError as exc:
×
UNCOV
1185
                if self._transport.is_closing():
×
UNCOV
1186
                    raise BrokenResourceError from exc
×
1187
                else:
UNCOV
1188
                    raise
×
1189

1190
            await self._protocol.write_event.wait()
11✔
1191

1192
    async def send_eof(self) -> None:
11✔
1193
        try:
11✔
1194
            self._transport.write_eof()
11✔
UNCOV
1195
        except OSError:
×
UNCOV
1196
            pass
×
1197

1198
    async def aclose(self) -> None:
11✔
1199
        if not self._transport.is_closing():
11✔
1200
            self._closed = True
11✔
1201
            try:
11✔
1202
                self._transport.write_eof()
11✔
1203
            except OSError:
7✔
1204
                pass
7✔
1205

1206
            self._transport.close()
11✔
1207
            await sleep(0)
11✔
1208
            self._transport.abort()
11✔
1209

1210

1211
class _RawSocketMixin:
11✔
1212
    _receive_future: asyncio.Future | None = None
11✔
1213
    _send_future: asyncio.Future | None = None
11✔
1214
    _closing = False
11✔
1215

1216
    def __init__(self, raw_socket: socket.socket):
11✔
1217
        self.__raw_socket = raw_socket
8✔
1218
        self._receive_guard = ResourceGuard("reading from")
8✔
1219
        self._send_guard = ResourceGuard("writing to")
8✔
1220

1221
    @property
11✔
1222
    def _raw_socket(self) -> socket.socket:
11✔
1223
        return self.__raw_socket
8✔
1224

1225
    def _wait_until_readable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
11✔
1226
        def callback(f: object) -> None:
8✔
1227
            del self._receive_future
8✔
1228
            loop.remove_reader(self.__raw_socket)
8✔
1229

1230
        f = self._receive_future = asyncio.Future()
8✔
1231
        loop.add_reader(self.__raw_socket, f.set_result, None)
8✔
1232
        f.add_done_callback(callback)
8✔
1233
        return f
8✔
1234

1235
    def _wait_until_writable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
11✔
1236
        def callback(f: object) -> None:
8✔
1237
            del self._send_future
8✔
1238
            loop.remove_writer(self.__raw_socket)
8✔
1239

1240
        f = self._send_future = asyncio.Future()
8✔
1241
        loop.add_writer(self.__raw_socket, f.set_result, None)
8✔
1242
        f.add_done_callback(callback)
8✔
1243
        return f
8✔
1244

1245
    async def aclose(self) -> None:
11✔
1246
        if not self._closing:
8✔
1247
            self._closing = True
8✔
1248
            if self.__raw_socket.fileno() != -1:
8✔
1249
                self.__raw_socket.close()
8✔
1250

1251
            if self._receive_future:
8✔
1252
                self._receive_future.set_result(None)
8✔
1253
            if self._send_future:
8✔
UNCOV
1254
                self._send_future.set_result(None)
×
1255

1256

1257
class UNIXSocketStream(_RawSocketMixin, abc.UNIXSocketStream):
11✔
1258
    async def send_eof(self) -> None:
11✔
1259
        with self._send_guard:
8✔
1260
            self._raw_socket.shutdown(socket.SHUT_WR)
8✔
1261

1262
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
1263
        loop = get_running_loop()
8✔
1264
        await AsyncIOBackend.checkpoint()
8✔
1265
        with self._receive_guard:
8✔
1266
            while True:
5✔
1267
                try:
8✔
1268
                    data = self._raw_socket.recv(max_bytes)
8✔
1269
                except BlockingIOError:
8✔
1270
                    await self._wait_until_readable(loop)
8✔
1271
                except OSError as exc:
8✔
1272
                    if self._closing:
8✔
1273
                        raise ClosedResourceError from None
8✔
1274
                    else:
1275
                        raise BrokenResourceError from exc
1✔
1276
                else:
1277
                    if not data:
8✔
1278
                        raise EndOfStream
8✔
1279

1280
                    return data
8✔
1281

1282
    async def send(self, item: bytes) -> None:
11✔
1283
        loop = get_running_loop()
8✔
1284
        await AsyncIOBackend.checkpoint()
8✔
1285
        with self._send_guard:
8✔
1286
            view = memoryview(item)
8✔
1287
            while view:
8✔
1288
                try:
8✔
1289
                    bytes_sent = self._raw_socket.send(view)
8✔
1290
                except BlockingIOError:
8✔
1291
                    await self._wait_until_writable(loop)
8✔
1292
                except OSError as exc:
8✔
1293
                    if self._closing:
8✔
1294
                        raise ClosedResourceError from None
8✔
1295
                    else:
1296
                        raise BrokenResourceError from exc
1✔
1297
                else:
1298
                    view = view[bytes_sent:]
8✔
1299

1300
    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
11✔
1301
        if not isinstance(msglen, int) or msglen < 0:
8✔
1302
            raise ValueError("msglen must be a non-negative integer")
8✔
1303
        if not isinstance(maxfds, int) or maxfds < 1:
8✔
1304
            raise ValueError("maxfds must be a positive integer")
8✔
1305

1306
        loop = get_running_loop()
8✔
1307
        fds = array.array("i")
8✔
1308
        await AsyncIOBackend.checkpoint()
8✔
1309
        with self._receive_guard:
8✔
1310
            while True:
5✔
1311
                try:
8✔
1312
                    message, ancdata, flags, addr = self._raw_socket.recvmsg(
8✔
1313
                        msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
1314
                    )
1315
                except BlockingIOError:
8✔
1316
                    await self._wait_until_readable(loop)
8✔
1317
                except OSError as exc:
×
UNCOV
1318
                    if self._closing:
×
UNCOV
1319
                        raise ClosedResourceError from None
×
1320
                    else:
UNCOV
1321
                        raise BrokenResourceError from exc
×
1322
                else:
1323
                    if not message and not ancdata:
8✔
UNCOV
1324
                        raise EndOfStream
×
1325

1326
                    break
5✔
1327

1328
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
8✔
1329
            if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
8✔
UNCOV
1330
                raise RuntimeError(
×
1331
                    f"Received unexpected ancillary data; message = {message!r}, "
1332
                    f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
1333
                )
1334

1335
            fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
8✔
1336

1337
        return message, list(fds)
8✔
1338

1339
    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
11✔
1340
        if not message:
8✔
1341
            raise ValueError("message must not be empty")
8✔
1342
        if not fds:
8✔
1343
            raise ValueError("fds must not be empty")
8✔
1344

1345
        loop = get_running_loop()
8✔
1346
        filenos: list[int] = []
8✔
1347
        for fd in fds:
8✔
1348
            if isinstance(fd, int):
8✔
UNCOV
1349
                filenos.append(fd)
×
1350
            elif isinstance(fd, IOBase):
8✔
1351
                filenos.append(fd.fileno())
8✔
1352

1353
        fdarray = array.array("i", filenos)
8✔
1354
        await AsyncIOBackend.checkpoint()
8✔
1355
        with self._send_guard:
8✔
1356
            while True:
5✔
1357
                try:
8✔
1358
                    # The ignore can be removed after mypy picks up
1359
                    # https://github.com/python/typeshed/pull/5545
1360
                    self._raw_socket.sendmsg(
8✔
1361
                        [message], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)]
1362
                    )
1363
                    break
8✔
1364
                except BlockingIOError:
×
UNCOV
1365
                    await self._wait_until_writable(loop)
×
1366
                except OSError as exc:
×
UNCOV
1367
                    if self._closing:
×
UNCOV
1368
                        raise ClosedResourceError from None
×
1369
                    else:
UNCOV
1370
                        raise BrokenResourceError from exc
×
1371

1372

1373
class TCPSocketListener(abc.SocketListener):
11✔
1374
    _accept_scope: CancelScope | None = None
11✔
1375
    _closed = False
11✔
1376

1377
    def __init__(self, raw_socket: socket.socket):
11✔
1378
        self.__raw_socket = raw_socket
11✔
1379
        self._loop = cast(asyncio.BaseEventLoop, get_running_loop())
11✔
1380
        self._accept_guard = ResourceGuard("accepting connections from")
11✔
1381

1382
    @property
11✔
1383
    def _raw_socket(self) -> socket.socket:
11✔
1384
        return self.__raw_socket
11✔
1385

1386
    async def accept(self) -> abc.SocketStream:
11✔
1387
        if self._closed:
11✔
1388
            raise ClosedResourceError
11✔
1389

1390
        with self._accept_guard:
11✔
1391
            await AsyncIOBackend.checkpoint()
11✔
1392
            with CancelScope() as self._accept_scope:
11✔
1393
                try:
11✔
1394
                    client_sock, _addr = await self._loop.sock_accept(self._raw_socket)
11✔
1395
                except asyncio.CancelledError:
11✔
1396
                    # Workaround for https://bugs.python.org/issue41317
1397
                    try:
11✔
1398
                        self._loop.remove_reader(self._raw_socket)
11✔
1399
                    except (ValueError, NotImplementedError):
2✔
1400
                        pass
2✔
1401

1402
                    if self._closed:
11✔
1403
                        raise ClosedResourceError from None
10✔
1404

1405
                    raise
11✔
1406
                finally:
1407
                    self._accept_scope = None
11✔
1408

1409
        client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
11✔
1410
        transport, protocol = await self._loop.connect_accepted_socket(
11✔
1411
            StreamProtocol, client_sock
1412
        )
1413
        return SocketStream(transport, protocol)
11✔
1414

1415
    async def aclose(self) -> None:
11✔
1416
        if self._closed:
11✔
1417
            return
11✔
1418

1419
        self._closed = True
11✔
1420
        if self._accept_scope:
11✔
1421
            # Workaround for https://bugs.python.org/issue41317
1422
            try:
11✔
1423
                self._loop.remove_reader(self._raw_socket)
11✔
1424
            except (ValueError, NotImplementedError):
2✔
1425
                pass
2✔
1426

1427
            self._accept_scope.cancel()
10✔
1428
            await sleep(0)
10✔
1429

1430
        self._raw_socket.close()
11✔
1431

1432

1433
class UNIXSocketListener(abc.SocketListener):
11✔
1434
    def __init__(self, raw_socket: socket.socket):
11✔
1435
        self.__raw_socket = raw_socket
8✔
1436
        self._loop = get_running_loop()
8✔
1437
        self._accept_guard = ResourceGuard("accepting connections from")
8✔
1438
        self._closed = False
8✔
1439

1440
    async def accept(self) -> abc.SocketStream:
11✔
1441
        await AsyncIOBackend.checkpoint()
8✔
1442
        with self._accept_guard:
8✔
1443
            while True:
5✔
1444
                try:
8✔
1445
                    client_sock, _ = self.__raw_socket.accept()
8✔
1446
                    client_sock.setblocking(False)
8✔
1447
                    return UNIXSocketStream(client_sock)
8✔
1448
                except BlockingIOError:
8✔
1449
                    f: asyncio.Future = asyncio.Future()
8✔
1450
                    self._loop.add_reader(self.__raw_socket, f.set_result, None)
8✔
1451
                    f.add_done_callback(
8✔
1452
                        lambda _: self._loop.remove_reader(self.__raw_socket)
1453
                    )
1454
                    await f
8✔
UNCOV
1455
                except OSError as exc:
×
UNCOV
1456
                    if self._closed:
×
UNCOV
1457
                        raise ClosedResourceError from None
×
1458
                    else:
1459
                        raise BrokenResourceError from exc
1✔
1460

1461
    async def aclose(self) -> None:
11✔
1462
        self._closed = True
8✔
1463
        self.__raw_socket.close()
8✔
1464

1465
    @property
11✔
1466
    def _raw_socket(self) -> socket.socket:
11✔
1467
        return self.__raw_socket
8✔
1468

1469

1470
class UDPSocket(abc.UDPSocket):
11✔
1471
    def __init__(
11✔
1472
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1473
    ):
1474
        self._transport = transport
10✔
1475
        self._protocol = protocol
10✔
1476
        self._receive_guard = ResourceGuard("reading from")
10✔
1477
        self._send_guard = ResourceGuard("writing to")
10✔
1478
        self._closed = False
10✔
1479

1480
    @property
11✔
1481
    def _raw_socket(self) -> socket.socket:
11✔
1482
        return self._transport.get_extra_info("socket")
10✔
1483

1484
    async def aclose(self) -> None:
11✔
1485
        if not self._transport.is_closing():
10✔
1486
            self._closed = True
10✔
1487
            self._transport.close()
10✔
1488

1489
    async def receive(self) -> tuple[bytes, IPSockAddrType]:
11✔
1490
        with self._receive_guard:
10✔
1491
            await AsyncIOBackend.checkpoint()
10✔
1492

1493
            # If the buffer is empty, ask for more data
1494
            if not self._protocol.read_queue and not self._transport.is_closing():
10✔
1495
                self._protocol.read_event.clear()
10✔
1496
                await self._protocol.read_event.wait()
10✔
1497

1498
            try:
10✔
1499
                return self._protocol.read_queue.popleft()
10✔
1500
            except IndexError:
10✔
1501
                if self._closed:
10✔
1502
                    raise ClosedResourceError from None
10✔
1503
                else:
1504
                    raise BrokenResourceError from None
1✔
1505

1506
    async def send(self, item: UDPPacketType) -> None:
11✔
1507
        with self._send_guard:
10✔
1508
            await AsyncIOBackend.checkpoint()
10✔
1509
            await self._protocol.write_event.wait()
10✔
1510
            if self._closed:
10✔
1511
                raise ClosedResourceError
10✔
1512
            elif self._transport.is_closing():
10✔
UNCOV
1513
                raise BrokenResourceError
×
1514
            else:
1515
                self._transport.sendto(*item)
10✔
1516

1517

1518
class ConnectedUDPSocket(abc.ConnectedUDPSocket):
11✔
1519
    def __init__(
11✔
1520
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1521
    ):
1522
        self._transport = transport
10✔
1523
        self._protocol = protocol
10✔
1524
        self._receive_guard = ResourceGuard("reading from")
10✔
1525
        self._send_guard = ResourceGuard("writing to")
10✔
1526
        self._closed = False
10✔
1527

1528
    @property
11✔
1529
    def _raw_socket(self) -> socket.socket:
11✔
1530
        return self._transport.get_extra_info("socket")
10✔
1531

1532
    async def aclose(self) -> None:
11✔
1533
        if not self._transport.is_closing():
10✔
1534
            self._closed = True
10✔
1535
            self._transport.close()
10✔
1536

1537
    async def receive(self) -> bytes:
11✔
1538
        with self._receive_guard:
10✔
1539
            await AsyncIOBackend.checkpoint()
10✔
1540

1541
            # If the buffer is empty, ask for more data
1542
            if not self._protocol.read_queue and not self._transport.is_closing():
10✔
1543
                self._protocol.read_event.clear()
10✔
1544
                await self._protocol.read_event.wait()
10✔
1545

1546
            try:
10✔
1547
                packet = self._protocol.read_queue.popleft()
10✔
1548
            except IndexError:
10✔
1549
                if self._closed:
10✔
1550
                    raise ClosedResourceError from None
10✔
1551
                else:
UNCOV
1552
                    raise BrokenResourceError from None
×
1553

1554
            return packet[0]
10✔
1555

1556
    async def send(self, item: bytes) -> None:
11✔
1557
        with self._send_guard:
10✔
1558
            await AsyncIOBackend.checkpoint()
10✔
1559
            await self._protocol.write_event.wait()
10✔
1560
            if self._closed:
10✔
1561
                raise ClosedResourceError
10✔
1562
            elif self._transport.is_closing():
10✔
UNCOV
1563
                raise BrokenResourceError
×
1564
            else:
1565
                self._transport.sendto(item)
10✔
1566

1567

1568
class UNIXDatagramSocket(_RawSocketMixin, abc.UNIXDatagramSocket):
11✔
1569
    async def receive(self) -> UNIXDatagramPacketType:
11✔
1570
        loop = get_running_loop()
8✔
1571
        await AsyncIOBackend.checkpoint()
8✔
1572
        with self._receive_guard:
8✔
1573
            while True:
5✔
1574
                try:
8✔
1575
                    data = self._raw_socket.recvfrom(65536)
8✔
1576
                except BlockingIOError:
8✔
1577
                    await self._wait_until_readable(loop)
8✔
1578
                except OSError as exc:
8✔
1579
                    if self._closing:
8✔
1580
                        raise ClosedResourceError from None
8✔
1581
                    else:
1582
                        raise BrokenResourceError from exc
1✔
1583
                else:
1584
                    return data
8✔
1585

1586
    async def send(self, item: UNIXDatagramPacketType) -> None:
11✔
1587
        loop = get_running_loop()
8✔
1588
        await AsyncIOBackend.checkpoint()
8✔
1589
        with self._send_guard:
8✔
1590
            while True:
5✔
1591
                try:
8✔
1592
                    self._raw_socket.sendto(*item)
8✔
1593
                except BlockingIOError:
8✔
UNCOV
1594
                    await self._wait_until_writable(loop)
×
1595
                except OSError as exc:
8✔
1596
                    if self._closing:
8✔
1597
                        raise ClosedResourceError from None
8✔
1598
                    else:
1599
                        raise BrokenResourceError from exc
1✔
1600
                else:
1601
                    return
8✔
1602

1603

1604
class ConnectedUNIXDatagramSocket(_RawSocketMixin, abc.ConnectedUNIXDatagramSocket):
11✔
1605
    async def receive(self) -> bytes:
11✔
1606
        loop = get_running_loop()
8✔
1607
        await AsyncIOBackend.checkpoint()
8✔
1608
        with self._receive_guard:
8✔
1609
            while True:
5✔
1610
                try:
8✔
1611
                    data = self._raw_socket.recv(65536)
8✔
1612
                except BlockingIOError:
8✔
1613
                    await self._wait_until_readable(loop)
8✔
1614
                except OSError as exc:
8✔
1615
                    if self._closing:
8✔
1616
                        raise ClosedResourceError from None
8✔
1617
                    else:
1618
                        raise BrokenResourceError from exc
1✔
1619
                else:
1620
                    return data
8✔
1621

1622
    async def send(self, item: bytes) -> None:
11✔
1623
        loop = get_running_loop()
8✔
1624
        await AsyncIOBackend.checkpoint()
8✔
1625
        with self._send_guard:
8✔
1626
            while True:
5✔
1627
                try:
8✔
1628
                    self._raw_socket.send(item)
8✔
1629
                except BlockingIOError:
8✔
UNCOV
1630
                    await self._wait_until_writable(loop)
×
1631
                except OSError as exc:
8✔
1632
                    if self._closing:
8✔
1633
                        raise ClosedResourceError from None
8✔
1634
                    else:
1635
                        raise BrokenResourceError from exc
1✔
1636
                else:
1637
                    return
8✔
1638

1639

1640
_read_events: RunVar[dict[Any, asyncio.Event]] = RunVar("read_events")
11✔
1641
_write_events: RunVar[dict[Any, asyncio.Event]] = RunVar("write_events")
11✔
1642

1643

1644
#
1645
# Synchronization
1646
#
1647

1648

1649
class Event(BaseEvent):
11✔
1650
    def __new__(cls) -> Event:
11✔
1651
        return object.__new__(cls)
11✔
1652

1653
    def __init__(self) -> None:
11✔
1654
        self._event = asyncio.Event()
11✔
1655

1656
    def set(self) -> None:
11✔
1657
        self._event.set()
11✔
1658

1659
    def is_set(self) -> bool:
11✔
1660
        return self._event.is_set()
11✔
1661

1662
    async def wait(self) -> None:
11✔
1663
        if self.is_set():
11✔
1664
            await AsyncIOBackend.checkpoint()
11✔
1665
        else:
1666
            await self._event.wait()
11✔
1667

1668
    def statistics(self) -> EventStatistics:
11✔
1669
        return EventStatistics(len(self._event._waiters))
10✔
1670

1671

1672
class CapacityLimiter(BaseCapacityLimiter):
11✔
1673
    _total_tokens: float = 0
11✔
1674

1675
    def __new__(cls, total_tokens: float) -> CapacityLimiter:
11✔
1676
        return object.__new__(cls)
11✔
1677

1678
    def __init__(self, total_tokens: float):
11✔
1679
        self._borrowers: set[Any] = set()
11✔
1680
        self._wait_queue: OrderedDict[Any, asyncio.Event] = OrderedDict()
11✔
1681
        self.total_tokens = total_tokens
11✔
1682

1683
    async def __aenter__(self) -> None:
11✔
1684
        await self.acquire()
11✔
1685

1686
    async def __aexit__(
11✔
1687
        self,
1688
        exc_type: type[BaseException] | None,
1689
        exc_val: BaseException | None,
1690
        exc_tb: TracebackType | None,
1691
    ) -> None:
1692
        self.release()
11✔
1693

1694
    @property
11✔
1695
    def total_tokens(self) -> float:
11✔
1696
        return self._total_tokens
10✔
1697

1698
    @total_tokens.setter
11✔
1699
    def total_tokens(self, value: float) -> None:
11✔
1700
        if not isinstance(value, int) and not math.isinf(value):
11✔
1701
            raise TypeError("total_tokens must be an int or math.inf")
10✔
1702
        if value < 1:
11✔
1703
            raise ValueError("total_tokens must be >= 1")
10✔
1704

1705
        waiters_to_notify = max(value - self._total_tokens, 0)
11✔
1706
        self._total_tokens = value
11✔
1707

1708
        # Notify waiting tasks that they have acquired the limiter
1709
        while self._wait_queue and waiters_to_notify:
11✔
1710
            event = self._wait_queue.popitem(last=False)[1]
10✔
1711
            event.set()
10✔
1712
            waiters_to_notify -= 1
10✔
1713

1714
    @property
11✔
1715
    def borrowed_tokens(self) -> int:
11✔
1716
        return len(self._borrowers)
10✔
1717

1718
    @property
11✔
1719
    def available_tokens(self) -> float:
11✔
1720
        return self._total_tokens - len(self._borrowers)
10✔
1721

1722
    def acquire_nowait(self) -> None:
11✔
UNCOV
1723
        self.acquire_on_behalf_of_nowait(current_task())
×
1724

1725
    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
11✔
1726
        if borrower in self._borrowers:
11✔
1727
            raise RuntimeError(
10✔
1728
                "this borrower is already holding one of this CapacityLimiter's "
1729
                "tokens"
1730
            )
1731

1732
        if self._wait_queue or len(self._borrowers) >= self._total_tokens:
11✔
1733
            raise WouldBlock
10✔
1734

1735
        self._borrowers.add(borrower)
11✔
1736

1737
    async def acquire(self) -> None:
11✔
1738
        return await self.acquire_on_behalf_of(current_task())
11✔
1739

1740
    async def acquire_on_behalf_of(self, borrower: object) -> None:
11✔
1741
        await AsyncIOBackend.checkpoint_if_cancelled()
11✔
1742
        try:
11✔
1743
            self.acquire_on_behalf_of_nowait(borrower)
11✔
1744
        except WouldBlock:
10✔
1745
            event = asyncio.Event()
10✔
1746
            self._wait_queue[borrower] = event
10✔
1747
            try:
10✔
1748
                await event.wait()
10✔
UNCOV
1749
            except BaseException:
×
UNCOV
1750
                self._wait_queue.pop(borrower, None)
×
UNCOV
1751
                raise
×
1752

1753
            self._borrowers.add(borrower)
10✔
1754
        else:
1755
            try:
11✔
1756
                await AsyncIOBackend.cancel_shielded_checkpoint()
11✔
1757
            except BaseException:
10✔
1758
                self.release()
10✔
1759
                raise
10✔
1760

1761
    def release(self) -> None:
11✔
1762
        self.release_on_behalf_of(current_task())
11✔
1763

1764
    def release_on_behalf_of(self, borrower: object) -> None:
11✔
1765
        try:
11✔
1766
            self._borrowers.remove(borrower)
11✔
1767
        except KeyError:
10✔
1768
            raise RuntimeError(
10✔
1769
                "this borrower isn't holding any of this CapacityLimiter's tokens"
1770
            ) from None
1771

1772
        # Notify the next task in line if this limiter has free capacity now
1773
        if self._wait_queue and len(self._borrowers) < self._total_tokens:
11✔
1774
            event = self._wait_queue.popitem(last=False)[1]
10✔
1775
            event.set()
10✔
1776

1777
    def statistics(self) -> CapacityLimiterStatistics:
11✔
1778
        return CapacityLimiterStatistics(
10✔
1779
            self.borrowed_tokens,
1780
            self.total_tokens,
1781
            tuple(self._borrowers),
1782
            len(self._wait_queue),
1783
        )
1784

1785

1786
_default_thread_limiter: RunVar[CapacityLimiter] = RunVar("_default_thread_limiter")
11✔
1787

1788

1789
#
1790
# Operating system signals
1791
#
1792

1793

1794
class _SignalReceiver:
11✔
1795
    def __init__(self, signals: tuple[Signals, ...]):
11✔
1796
        self._signals = signals
9✔
1797
        self._loop = get_running_loop()
9✔
1798
        self._signal_queue: deque[Signals] = deque()
9✔
1799
        self._future: asyncio.Future = asyncio.Future()
9✔
1800
        self._handled_signals: set[Signals] = set()
9✔
1801

1802
    def _deliver(self, signum: Signals) -> None:
11✔
1803
        self._signal_queue.append(signum)
9✔
1804
        if not self._future.done():
9✔
1805
            self._future.set_result(None)
9✔
1806

1807
    def __enter__(self) -> _SignalReceiver:
11✔
1808
        for sig in set(self._signals):
9✔
1809
            self._loop.add_signal_handler(sig, self._deliver, sig)
9✔
1810
            self._handled_signals.add(sig)
9✔
1811

1812
        return self
9✔
1813

1814
    def __exit__(
11✔
1815
        self,
1816
        exc_type: type[BaseException] | None,
1817
        exc_val: BaseException | None,
1818
        exc_tb: TracebackType | None,
1819
    ) -> bool | None:
1820
        for sig in self._handled_signals:
9✔
1821
            self._loop.remove_signal_handler(sig)
9✔
1822
        return None
9✔
1823

1824
    def __aiter__(self) -> _SignalReceiver:
11✔
1825
        return self
9✔
1826

1827
    async def __anext__(self) -> Signals:
11✔
1828
        await AsyncIOBackend.checkpoint()
9✔
1829
        if not self._signal_queue:
9✔
UNCOV
1830
            self._future = asyncio.Future()
×
UNCOV
1831
            await self._future
×
1832

1833
        return self._signal_queue.popleft()
9✔
1834

1835

1836
#
1837
# Testing and debugging
1838
#
1839

1840

1841
class AsyncIOTaskInfo(TaskInfo):
11✔
1842
    def __init__(self, task: asyncio.Task):
11✔
1843
        task_state = _task_states.get(task)
11✔
1844
        if task_state is None:
11✔
1845
            parent_id = None
11✔
1846
        else:
1847
            parent_id = task_state.parent_id
11✔
1848

1849
        super().__init__(id(task), parent_id, task.get_name(), task.get_coro())
11✔
1850
        self._task = weakref.ref(task)
11✔
1851

1852
    def has_pending_cancellation(self) -> bool:
11✔
1853
        if not (task := self._task()):
11✔
1854
            # If the task isn't around anymore, it won't have a pending cancellation
UNCOV
1855
            return False
×
1856

1857
        if sys.version_info >= (3, 11):
11✔
1858
            if task.cancelling():
5✔
1859
                return True
5✔
1860
        elif (
6✔
1861
            isinstance(task._fut_waiter, asyncio.Future)
1862
            and task._fut_waiter.cancelled()
1863
        ):
1864
            return True
6✔
1865

1866
        if task_state := _task_states.get(task):
11✔
1867
            if cancel_scope := task_state.cancel_scope:
11✔
1868
                return cancel_scope.cancel_called or (
11✔
1869
                    not cancel_scope.shield and cancel_scope._parent_cancelled()
1870
                )
1871

1872
        return False
11✔
1873

1874

1875
class TestRunner(abc.TestRunner):
11✔
1876
    _send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]
11✔
1877

1878
    def __init__(
11✔
1879
        self,
1880
        *,
1881
        debug: bool | None = None,
1882
        use_uvloop: bool = False,
1883
        loop_factory: Callable[[], AbstractEventLoop] | None = None,
1884
    ) -> None:
1885
        if use_uvloop and loop_factory is None:
11✔
UNCOV
1886
            import uvloop
×
1887

UNCOV
1888
            loop_factory = uvloop.new_event_loop
×
1889

1890
        self._runner = Runner(debug=debug, loop_factory=loop_factory)
11✔
1891
        self._exceptions: list[BaseException] = []
11✔
1892
        self._runner_task: asyncio.Task | None = None
11✔
1893

1894
    def __enter__(self) -> TestRunner:
11✔
1895
        self._runner.__enter__()
11✔
1896
        self.get_loop().set_exception_handler(self._exception_handler)
11✔
1897
        return self
11✔
1898

1899
    def __exit__(
11✔
1900
        self,
1901
        exc_type: type[BaseException] | None,
1902
        exc_val: BaseException | None,
1903
        exc_tb: TracebackType | None,
1904
    ) -> None:
1905
        self._runner.__exit__(exc_type, exc_val, exc_tb)
11✔
1906

1907
    def get_loop(self) -> AbstractEventLoop:
11✔
1908
        return self._runner.get_loop()
11✔
1909

1910
    def _exception_handler(
11✔
1911
        self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
1912
    ) -> None:
1913
        if isinstance(context.get("exception"), Exception):
11✔
1914
            self._exceptions.append(context["exception"])
11✔
1915
        else:
1916
            loop.default_exception_handler(context)
11✔
1917

1918
    def _raise_async_exceptions(self) -> None:
11✔
1919
        # Re-raise any exceptions raised in asynchronous callbacks
1920
        if self._exceptions:
11✔
1921
            exceptions, self._exceptions = self._exceptions, []
11✔
1922
            if len(exceptions) == 1:
11✔
1923
                raise exceptions[0]
11✔
UNCOV
1924
            elif exceptions:
×
UNCOV
1925
                raise BaseExceptionGroup(
×
1926
                    "Multiple exceptions occurred in asynchronous callbacks", exceptions
1927
                )
1928

1929
    async def _run_tests_and_fixtures(
11✔
1930
        self,
1931
        receive_stream: MemoryObjectReceiveStream[
1932
            tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
1933
        ],
1934
    ) -> None:
1935
        with receive_stream, self._send_stream:
11✔
1936
            async for coro, future in receive_stream:
11✔
1937
                try:
11✔
1938
                    retval = await coro
11✔
1939
                except BaseException as exc:
11✔
1940
                    if not future.cancelled():
11✔
1941
                        future.set_exception(exc)
11✔
1942
                else:
1943
                    if not future.cancelled():
11✔
1944
                        future.set_result(retval)
11✔
1945

1946
    async def _call_in_runner_task(
11✔
1947
        self,
1948
        func: Callable[P, Awaitable[T_Retval]],
1949
        *args: P.args,
1950
        **kwargs: P.kwargs,
1951
    ) -> T_Retval:
1952
        if not self._runner_task:
11✔
1953
            self._send_stream, receive_stream = create_memory_object_stream[
11✔
1954
                Tuple[Awaitable[Any], asyncio.Future]
1955
            ](1)
1956
            self._runner_task = self.get_loop().create_task(
11✔
1957
                self._run_tests_and_fixtures(receive_stream)
1958
            )
1959

1960
        coro = func(*args, **kwargs)
11✔
1961
        future: asyncio.Future[T_Retval] = self.get_loop().create_future()
11✔
1962
        self._send_stream.send_nowait((coro, future))
11✔
1963
        return await future
11✔
1964

1965
    def run_asyncgen_fixture(
11✔
1966
        self,
1967
        fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
1968
        kwargs: dict[str, Any],
1969
    ) -> Iterable[T_Retval]:
1970
        asyncgen = fixture_func(**kwargs)
11✔
1971
        fixturevalue: T_Retval = self.get_loop().run_until_complete(
11✔
1972
            self._call_in_runner_task(asyncgen.asend, None)
1973
        )
1974
        self._raise_async_exceptions()
11✔
1975

1976
        yield fixturevalue
11✔
1977

1978
        try:
11✔
1979
            self.get_loop().run_until_complete(
11✔
1980
                self._call_in_runner_task(asyncgen.asend, None)
1981
            )
1982
        except StopAsyncIteration:
11✔
1983
            self._raise_async_exceptions()
11✔
1984
        else:
UNCOV
1985
            self.get_loop().run_until_complete(asyncgen.aclose())
×
UNCOV
1986
            raise RuntimeError("Async generator fixture did not stop")
×
1987

1988
    def run_fixture(
11✔
1989
        self,
1990
        fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
1991
        kwargs: dict[str, Any],
1992
    ) -> T_Retval:
1993
        retval = self.get_loop().run_until_complete(
11✔
1994
            self._call_in_runner_task(fixture_func, **kwargs)
1995
        )
1996
        self._raise_async_exceptions()
11✔
1997
        return retval
11✔
1998

1999
    def run_test(
11✔
2000
        self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
2001
    ) -> None:
2002
        try:
11✔
2003
            self.get_loop().run_until_complete(
11✔
2004
                self._call_in_runner_task(test_func, **kwargs)
2005
            )
2006
        except Exception as exc:
11✔
2007
            self._exceptions.append(exc)
10✔
2008

2009
        self._raise_async_exceptions()
11✔
2010

2011

2012
class AsyncIOBackend(AsyncBackend):
11✔
2013
    @classmethod
11✔
2014
    def run(
11✔
2015
        cls,
2016
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2017
        args: tuple[Unpack[PosArgsT]],
2018
        kwargs: dict[str, Any],
2019
        options: dict[str, Any],
2020
    ) -> T_Retval:
2021
        @wraps(func)
11✔
2022
        async def wrapper() -> T_Retval:
11✔
2023
            task = cast(asyncio.Task, current_task())
11✔
2024
            task.set_name(get_callable_name(func))
11✔
2025
            _task_states[task] = TaskState(None, None)
11✔
2026

2027
            try:
11✔
2028
                return await func(*args)
11✔
2029
            finally:
2030
                del _task_states[task]
11✔
2031

2032
        debug = options.get("debug", None)
11✔
2033
        loop_factory = options.get("loop_factory", None)
11✔
2034
        if loop_factory is None and options.get("use_uvloop", False):
11✔
2035
            import uvloop
7✔
2036

2037
            loop_factory = uvloop.new_event_loop
7✔
2038

2039
        with Runner(debug=debug, loop_factory=loop_factory) as runner:
11✔
2040
            return runner.run(wrapper())
11✔
2041

2042
    @classmethod
11✔
2043
    def current_token(cls) -> object:
11✔
2044
        return get_running_loop()
11✔
2045

2046
    @classmethod
11✔
2047
    def current_time(cls) -> float:
11✔
2048
        return get_running_loop().time()
11✔
2049

2050
    @classmethod
11✔
2051
    def cancelled_exception_class(cls) -> type[BaseException]:
11✔
2052
        return CancelledError
11✔
2053

2054
    @classmethod
11✔
2055
    async def checkpoint(cls) -> None:
11✔
2056
        await sleep(0)
11✔
2057

2058
    @classmethod
11✔
2059
    async def checkpoint_if_cancelled(cls) -> None:
11✔
2060
        task = current_task()
11✔
2061
        if task is None:
11✔
UNCOV
2062
            return
×
2063

2064
        try:
11✔
2065
            cancel_scope = _task_states[task].cancel_scope
11✔
2066
        except KeyError:
11✔
2067
            return
11✔
2068

2069
        while cancel_scope:
11✔
2070
            if cancel_scope.cancel_called:
11✔
2071
                await sleep(0)
11✔
2072
            elif cancel_scope.shield:
11✔
2073
                break
10✔
2074
            else:
2075
                cancel_scope = cancel_scope._parent_scope
11✔
2076

2077
    @classmethod
11✔
2078
    async def cancel_shielded_checkpoint(cls) -> None:
11✔
2079
        with CancelScope(shield=True):
11✔
2080
            await sleep(0)
11✔
2081

2082
    @classmethod
11✔
2083
    async def sleep(cls, delay: float) -> None:
11✔
2084
        await sleep(delay)
11✔
2085

2086
    @classmethod
11✔
2087
    def create_cancel_scope(
11✔
2088
        cls, *, deadline: float = math.inf, shield: bool = False
2089
    ) -> CancelScope:
2090
        return CancelScope(deadline=deadline, shield=shield)
11✔
2091

2092
    @classmethod
11✔
2093
    def current_effective_deadline(cls) -> float:
11✔
2094
        try:
10✔
2095
            cancel_scope = _task_states[
10✔
2096
                current_task()  # type: ignore[index]
2097
            ].cancel_scope
UNCOV
2098
        except KeyError:
×
UNCOV
2099
            return math.inf
×
2100

2101
        deadline = math.inf
10✔
2102
        while cancel_scope:
10✔
2103
            deadline = min(deadline, cancel_scope.deadline)
10✔
2104
            if cancel_scope._cancel_called:
10✔
2105
                deadline = -math.inf
10✔
2106
                break
10✔
2107
            elif cancel_scope.shield:
10✔
2108
                break
10✔
2109
            else:
2110
                cancel_scope = cancel_scope._parent_scope
10✔
2111

2112
        return deadline
10✔
2113

2114
    @classmethod
11✔
2115
    def create_task_group(cls) -> abc.TaskGroup:
11✔
2116
        return TaskGroup()
11✔
2117

2118
    @classmethod
11✔
2119
    def create_event(cls) -> abc.Event:
11✔
2120
        return Event()
11✔
2121

2122
    @classmethod
11✔
2123
    def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
11✔
2124
        return CapacityLimiter(total_tokens)
10✔
2125

2126
    @classmethod
11✔
2127
    async def run_sync_in_worker_thread(
11✔
2128
        cls,
2129
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2130
        args: tuple[Unpack[PosArgsT]],
2131
        abandon_on_cancel: bool = False,
2132
        limiter: abc.CapacityLimiter | None = None,
2133
    ) -> T_Retval:
2134
        await cls.checkpoint()
11✔
2135

2136
        # If this is the first run in this event loop thread, set up the necessary
2137
        # variables
2138
        try:
11✔
2139
            idle_workers = _threadpool_idle_workers.get()
11✔
2140
            workers = _threadpool_workers.get()
11✔
2141
        except LookupError:
11✔
2142
            idle_workers = deque()
11✔
2143
            workers = set()
11✔
2144
            _threadpool_idle_workers.set(idle_workers)
11✔
2145
            _threadpool_workers.set(workers)
11✔
2146

2147
        async with limiter or cls.current_default_thread_limiter():
11✔
2148
            with CancelScope(shield=not abandon_on_cancel) as scope:
11✔
2149
                future: asyncio.Future = asyncio.Future()
11✔
2150
                root_task = find_root_task()
11✔
2151
                if not idle_workers:
11✔
2152
                    worker = WorkerThread(root_task, workers, idle_workers)
11✔
2153
                    worker.start()
11✔
2154
                    workers.add(worker)
11✔
2155
                    root_task.add_done_callback(worker.stop)
11✔
2156
                else:
2157
                    worker = idle_workers.pop()
11✔
2158

2159
                    # Prune any other workers that have been idle for MAX_IDLE_TIME
2160
                    # seconds or longer
2161
                    now = cls.current_time()
11✔
2162
                    while idle_workers:
11✔
2163
                        if (
10✔
2164
                            now - idle_workers[0].idle_since
2165
                            < WorkerThread.MAX_IDLE_TIME
2166
                        ):
2167
                            break
10✔
2168

2169
                        expired_worker = idle_workers.popleft()
×
UNCOV
2170
                        expired_worker.root_task.remove_done_callback(
×
2171
                            expired_worker.stop
2172
                        )
UNCOV
2173
                        expired_worker.stop()
×
2174

2175
                context = copy_context()
11✔
2176
                context.run(sniffio.current_async_library_cvar.set, None)
11✔
2177
                if abandon_on_cancel or scope._parent_scope is None:
11✔
2178
                    worker_scope = scope
11✔
2179
                else:
2180
                    worker_scope = scope._parent_scope
11✔
2181

2182
                worker.queue.put_nowait((context, func, args, future, worker_scope))
11✔
2183
                return await future
11✔
2184

2185
    @classmethod
11✔
2186
    def check_cancelled(cls) -> None:
11✔
2187
        scope: CancelScope | None = threadlocals.current_cancel_scope
11✔
2188
        while scope is not None:
11✔
2189
            if scope.cancel_called:
11✔
2190
                raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")
11✔
2191

2192
            if scope.shield:
11✔
UNCOV
2193
                return
×
2194

2195
            scope = scope._parent_scope
11✔
2196

2197
    @classmethod
11✔
2198
    def run_async_from_thread(
11✔
2199
        cls,
2200
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2201
        args: tuple[Unpack[PosArgsT]],
2202
        token: object,
2203
    ) -> T_Retval:
2204
        async def task_wrapper(scope: CancelScope) -> T_Retval:
11✔
2205
            __tracebackhide__ = True
11✔
2206
            task = cast(asyncio.Task, current_task())
11✔
2207
            _task_states[task] = TaskState(None, scope)
11✔
2208
            scope._tasks.add(task)
11✔
2209
            try:
11✔
2210
                return await func(*args)
11✔
2211
            except CancelledError as exc:
11✔
2212
                raise concurrent.futures.CancelledError(str(exc)) from None
11✔
2213
            finally:
2214
                scope._tasks.discard(task)
11✔
2215

2216
        loop = cast(AbstractEventLoop, token)
11✔
2217
        context = copy_context()
11✔
2218
        context.run(sniffio.current_async_library_cvar.set, "asyncio")
11✔
2219
        wrapper = task_wrapper(threadlocals.current_cancel_scope)
11✔
2220
        f: concurrent.futures.Future[T_Retval] = context.run(
11✔
2221
            asyncio.run_coroutine_threadsafe, wrapper, loop
2222
        )
2223
        return f.result()
11✔
2224

2225
    @classmethod
11✔
2226
    def run_sync_from_thread(
11✔
2227
        cls,
2228
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2229
        args: tuple[Unpack[PosArgsT]],
2230
        token: object,
2231
    ) -> T_Retval:
2232
        @wraps(func)
11✔
2233
        def wrapper() -> None:
11✔
2234
            try:
11✔
2235
                sniffio.current_async_library_cvar.set("asyncio")
11✔
2236
                f.set_result(func(*args))
11✔
2237
            except BaseException as exc:
11✔
2238
                f.set_exception(exc)
11✔
2239
                if not isinstance(exc, Exception):
11✔
UNCOV
2240
                    raise
×
2241

2242
        f: concurrent.futures.Future[T_Retval] = Future()
11✔
2243
        loop = cast(AbstractEventLoop, token)
11✔
2244
        loop.call_soon_threadsafe(wrapper)
11✔
2245
        return f.result()
11✔
2246

2247
    @classmethod
11✔
2248
    def create_blocking_portal(cls) -> abc.BlockingPortal:
11✔
2249
        return BlockingPortal()
11✔
2250

2251
    @classmethod
11✔
2252
    async def open_process(
11✔
2253
        cls,
2254
        command: str | bytes | Sequence[str | bytes],
2255
        *,
2256
        shell: bool,
2257
        stdin: int | IO[Any] | None,
2258
        stdout: int | IO[Any] | None,
2259
        stderr: int | IO[Any] | None,
2260
        cwd: str | bytes | PathLike | None = None,
2261
        env: Mapping[str, str] | None = None,
2262
        start_new_session: bool = False,
2263
    ) -> Process:
2264
        await cls.checkpoint()
10✔
2265
        if shell:
10✔
2266
            process = await asyncio.create_subprocess_shell(
10✔
2267
                cast("str | bytes", command),
2268
                stdin=stdin,
2269
                stdout=stdout,
2270
                stderr=stderr,
2271
                cwd=cwd,
2272
                env=env,
2273
                start_new_session=start_new_session,
2274
            )
2275
        else:
2276
            process = await asyncio.create_subprocess_exec(
10✔
2277
                *command,
2278
                stdin=stdin,
2279
                stdout=stdout,
2280
                stderr=stderr,
2281
                cwd=cwd,
2282
                env=env,
2283
                start_new_session=start_new_session,
2284
            )
2285

2286
        stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
10✔
2287
        stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
10✔
2288
        stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
10✔
2289
        return Process(process, stdin_stream, stdout_stream, stderr_stream)
10✔
2290

2291
    @classmethod
11✔
2292
    def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
11✔
2293
        create_task(
10✔
2294
            _shutdown_process_pool_on_exit(workers),
2295
            name="AnyIO process pool shutdown task",
2296
        )
2297
        find_root_task().add_done_callback(
10✔
2298
            partial(_forcibly_shutdown_process_pool_on_exit, workers)  # type:ignore[arg-type]
2299
        )
2300

2301
    @classmethod
11✔
2302
    async def connect_tcp(
11✔
2303
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
2304
    ) -> abc.SocketStream:
2305
        transport, protocol = cast(
11✔
2306
            Tuple[asyncio.Transport, StreamProtocol],
2307
            await get_running_loop().create_connection(
2308
                StreamProtocol, host, port, local_addr=local_address
2309
            ),
2310
        )
2311
        transport.pause_reading()
11✔
2312
        return SocketStream(transport, protocol)
11✔
2313

2314
    @classmethod
11✔
2315
    async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
11✔
2316
        await cls.checkpoint()
8✔
2317
        loop = get_running_loop()
8✔
2318
        raw_socket = socket.socket(socket.AF_UNIX)
8✔
2319
        raw_socket.setblocking(False)
8✔
2320
        while True:
5✔
2321
            try:
8✔
2322
                raw_socket.connect(path)
8✔
2323
            except BlockingIOError:
8✔
UNCOV
2324
                f: asyncio.Future = asyncio.Future()
×
UNCOV
2325
                loop.add_writer(raw_socket, f.set_result, None)
×
UNCOV
2326
                f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
UNCOV
2327
                await f
×
2328
            except BaseException:
8✔
2329
                raw_socket.close()
8✔
2330
                raise
8✔
2331
            else:
2332
                return UNIXSocketStream(raw_socket)
8✔
2333

2334
    @classmethod
11✔
2335
    def create_tcp_listener(cls, sock: socket.socket) -> SocketListener:
11✔
2336
        return TCPSocketListener(sock)
11✔
2337

2338
    @classmethod
11✔
2339
    def create_unix_listener(cls, sock: socket.socket) -> SocketListener:
11✔
2340
        return UNIXSocketListener(sock)
8✔
2341

2342
    @classmethod
11✔
2343
    async def create_udp_socket(
11✔
2344
        cls,
2345
        family: AddressFamily,
2346
        local_address: IPSockAddrType | None,
2347
        remote_address: IPSockAddrType | None,
2348
        reuse_port: bool,
2349
    ) -> UDPSocket | ConnectedUDPSocket:
2350
        transport, protocol = await get_running_loop().create_datagram_endpoint(
10✔
2351
            DatagramProtocol,
2352
            local_addr=local_address,
2353
            remote_addr=remote_address,
2354
            family=family,
2355
            reuse_port=reuse_port,
2356
        )
2357
        if protocol.exception:
10✔
UNCOV
2358
            transport.close()
×
UNCOV
2359
            raise protocol.exception
×
2360

2361
        if not remote_address:
10✔
2362
            return UDPSocket(transport, protocol)
10✔
2363
        else:
2364
            return ConnectedUDPSocket(transport, protocol)
10✔
2365

2366
    @classmethod
11✔
2367
    async def create_unix_datagram_socket(  # type: ignore[override]
11✔
2368
        cls, raw_socket: socket.socket, remote_path: str | bytes | None
2369
    ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
2370
        await cls.checkpoint()
8✔
2371
        loop = get_running_loop()
8✔
2372

2373
        if remote_path:
8✔
2374
            while True:
5✔
2375
                try:
8✔
2376
                    raw_socket.connect(remote_path)
8✔
2377
                except BlockingIOError:
×
2378
                    f: asyncio.Future = asyncio.Future()
×
2379
                    loop.add_writer(raw_socket, f.set_result, None)
×
2380
                    f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
UNCOV
2381
                    await f
×
UNCOV
2382
                except BaseException:
×
UNCOV
2383
                    raw_socket.close()
×
UNCOV
2384
                    raise
×
2385
                else:
2386
                    return ConnectedUNIXDatagramSocket(raw_socket)
8✔
2387
        else:
2388
            return UNIXDatagramSocket(raw_socket)
8✔
2389

2390
    @classmethod
11✔
2391
    async def getaddrinfo(
11✔
2392
        cls,
2393
        host: bytes | str | None,
2394
        port: str | int | None,
2395
        *,
2396
        family: int | AddressFamily = 0,
2397
        type: int | SocketKind = 0,
2398
        proto: int = 0,
2399
        flags: int = 0,
2400
    ) -> list[
2401
        tuple[
2402
            AddressFamily,
2403
            SocketKind,
2404
            int,
2405
            str,
2406
            tuple[str, int] | tuple[str, int, int, int],
2407
        ]
2408
    ]:
2409
        return await get_running_loop().getaddrinfo(
11✔
2410
            host, port, family=family, type=type, proto=proto, flags=flags
2411
        )
2412

2413
    @classmethod
11✔
2414
    async def getnameinfo(
11✔
2415
        cls, sockaddr: IPSockAddrType, flags: int = 0
2416
    ) -> tuple[str, str]:
2417
        return await get_running_loop().getnameinfo(sockaddr, flags)
10✔
2418

2419
    @classmethod
11✔
2420
    async def wait_socket_readable(cls, sock: socket.socket) -> None:
11✔
2421
        await cls.checkpoint()
×
2422
        try:
×
UNCOV
2423
            read_events = _read_events.get()
×
2424
        except LookupError:
×
2425
            read_events = {}
×
UNCOV
2426
            _read_events.set(read_events)
×
2427

2428
        if read_events.get(sock):
×
2429
            raise BusyResourceError("reading from") from None
×
2430

2431
        loop = get_running_loop()
×
UNCOV
2432
        event = read_events[sock] = asyncio.Event()
×
2433
        loop.add_reader(sock, event.set)
×
2434
        try:
×
2435
            await event.wait()
×
2436
        finally:
2437
            if read_events.pop(sock, None) is not None:
×
UNCOV
2438
                loop.remove_reader(sock)
×
2439
                readable = True
×
2440
            else:
UNCOV
2441
                readable = False
×
2442

UNCOV
2443
        if not readable:
×
2444
            raise ClosedResourceError
×
2445

2446
    @classmethod
11✔
2447
    async def wait_socket_writable(cls, sock: socket.socket) -> None:
11✔
2448
        await cls.checkpoint()
×
2449
        try:
×
UNCOV
2450
            write_events = _write_events.get()
×
2451
        except LookupError:
×
2452
            write_events = {}
×
UNCOV
2453
            _write_events.set(write_events)
×
2454

2455
        if write_events.get(sock):
×
2456
            raise BusyResourceError("writing to") from None
×
2457

2458
        loop = get_running_loop()
×
UNCOV
2459
        event = write_events[sock] = asyncio.Event()
×
2460
        loop.add_writer(sock.fileno(), event.set)
×
2461
        try:
×
2462
            await event.wait()
×
2463
        finally:
2464
            if write_events.pop(sock, None) is not None:
×
UNCOV
2465
                loop.remove_writer(sock)
×
2466
                writable = True
×
2467
            else:
UNCOV
2468
                writable = False
×
2469

UNCOV
2470
        if not writable:
×
UNCOV
2471
            raise ClosedResourceError
×
2472

2473
    @classmethod
11✔
2474
    def current_default_thread_limiter(cls) -> CapacityLimiter:
11✔
2475
        try:
11✔
2476
            return _default_thread_limiter.get()
11✔
2477
        except LookupError:
11✔
2478
            limiter = CapacityLimiter(40)
11✔
2479
            _default_thread_limiter.set(limiter)
11✔
2480
            return limiter
11✔
2481

2482
    @classmethod
11✔
2483
    def open_signal_receiver(
11✔
2484
        cls, *signals: Signals
2485
    ) -> ContextManager[AsyncIterator[Signals]]:
2486
        return _SignalReceiver(signals)
9✔
2487

2488
    @classmethod
11✔
2489
    def get_current_task(cls) -> TaskInfo:
11✔
2490
        return AsyncIOTaskInfo(current_task())  # type: ignore[arg-type]
11✔
2491

2492
    @classmethod
11✔
2493
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
11✔
2494
        return [AsyncIOTaskInfo(task) for task in all_tasks() if not task.done()]
11✔
2495

2496
    @classmethod
11✔
2497
    async def wait_all_tasks_blocked(cls) -> None:
11✔
2498
        await cls.checkpoint()
11✔
2499
        this_task = current_task()
11✔
2500
        while True:
7✔
2501
            for task in all_tasks():
11✔
2502
                if task is this_task:
11✔
2503
                    continue
11✔
2504

2505
                waiter = task._fut_waiter  # type: ignore[attr-defined]
11✔
2506
                if waiter is None or waiter.done():
11✔
2507
                    await sleep(0.1)
11✔
2508
                    break
11✔
2509
            else:
2510
                return
11✔
2511

2512
    @classmethod
11✔
2513
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
11✔
2514
        return TestRunner(**options)
11✔
2515

2516

2517
backend_class = AsyncIOBackend
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc