• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 7242943339

18 Dec 2023 02:41AM UTC coverage: 90.891% (-0.01%) from 90.905%
7242943339

Pull #655

github

web-flow
Merge 36e765177 into f757314f8
Pull Request #655: Path.relative_to() walk_up support

5 of 5 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

4470 of 4918 relevant lines covered (90.89%)

8.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.1
/src/anyio/_backends/_asyncio.py
1
from __future__ import annotations
10✔
2

3
import array
10✔
4
import asyncio
10✔
5
import concurrent.futures
10✔
6
import math
10✔
7
import socket
10✔
8
import sys
10✔
9
import threading
10✔
10
from asyncio import (
10✔
11
    AbstractEventLoop,
12
    CancelledError,
13
    all_tasks,
14
    create_task,
15
    current_task,
16
    get_running_loop,
17
    sleep,
18
)
19
from asyncio.base_events import _run_until_complete_cb  # type: ignore[attr-defined]
10✔
20
from collections import OrderedDict, deque
10✔
21
from collections.abc import AsyncIterator, Generator, Iterable
10✔
22
from concurrent.futures import Future
10✔
23
from contextlib import suppress
10✔
24
from contextvars import Context, copy_context
10✔
25
from dataclasses import dataclass
10✔
26
from functools import partial, wraps
10✔
27
from inspect import (
10✔
28
    CORO_RUNNING,
29
    CORO_SUSPENDED,
30
    getcoroutinestate,
31
    iscoroutine,
32
)
33
from io import IOBase
10✔
34
from os import PathLike
10✔
35
from queue import Queue
10✔
36
from signal import Signals
10✔
37
from socket import AddressFamily, SocketKind
10✔
38
from threading import Thread
10✔
39
from types import TracebackType
10✔
40
from typing import (
10✔
41
    IO,
42
    Any,
43
    AsyncGenerator,
44
    Awaitable,
45
    Callable,
46
    Collection,
47
    ContextManager,
48
    Coroutine,
49
    Mapping,
50
    Optional,
51
    Sequence,
52
    Tuple,
53
    TypeVar,
54
    cast,
55
)
56
from weakref import WeakKeyDictionary
10✔
57

58
import sniffio
10✔
59

60
from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
10✔
61
from .._core._eventloop import claim_worker_thread, threadlocals
10✔
62
from .._core._exceptions import (
10✔
63
    BrokenResourceError,
64
    BusyResourceError,
65
    ClosedResourceError,
66
    EndOfStream,
67
    WouldBlock,
68
)
69
from .._core._sockets import convert_ipv6_sockaddr
10✔
70
from .._core._streams import create_memory_object_stream
10✔
71
from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
10✔
72
from .._core._synchronization import Event as BaseEvent
10✔
73
from .._core._synchronization import ResourceGuard
10✔
74
from .._core._tasks import CancelScope as BaseCancelScope
10✔
75
from ..abc import (
10✔
76
    AsyncBackend,
77
    IPSockAddrType,
78
    SocketListener,
79
    UDPPacketType,
80
    UNIXDatagramPacketType,
81
)
82
from ..lowlevel import RunVar
10✔
83
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
10✔
84

85
if sys.version_info >= (3, 10):
10✔
86
    from typing import ParamSpec
6✔
87
else:
88
    from typing_extensions import ParamSpec
4✔
89

90
if sys.version_info >= (3, 11):
10✔
91
    from asyncio import Runner
4✔
92
    from typing import TypeVarTuple, Unpack
4✔
93
else:
94
    import contextvars
6✔
95
    import enum
6✔
96
    import signal
6✔
97
    from asyncio import coroutines, events, exceptions, tasks
6✔
98

99
    from exceptiongroup import BaseExceptionGroup
6✔
100
    from typing_extensions import TypeVarTuple, Unpack
6✔
101

102
    class _State(enum.Enum):
6✔
103
        CREATED = "created"
6✔
104
        INITIALIZED = "initialized"
6✔
105
        CLOSED = "closed"
6✔
106

107
    class Runner:
6✔
108
        # Copied from CPython 3.11
109
        def __init__(
6✔
110
            self,
111
            *,
112
            debug: bool | None = None,
113
            loop_factory: Callable[[], AbstractEventLoop] | None = None,
114
        ):
115
            self._state = _State.CREATED
6✔
116
            self._debug = debug
6✔
117
            self._loop_factory = loop_factory
6✔
118
            self._loop: AbstractEventLoop | None = None
6✔
119
            self._context = None
6✔
120
            self._interrupt_count = 0
6✔
121
            self._set_event_loop = False
6✔
122

123
        def __enter__(self) -> Runner:
6✔
124
            self._lazy_init()
6✔
125
            return self
6✔
126

127
        def __exit__(
6✔
128
            self,
129
            exc_type: type[BaseException],
130
            exc_val: BaseException,
131
            exc_tb: TracebackType,
132
        ) -> None:
133
            self.close()
6✔
134

135
        def close(self) -> None:
6✔
136
            """Shutdown and close event loop."""
137
            if self._state is not _State.INITIALIZED:
6✔
138
                return
×
139
            try:
6✔
140
                loop = self._loop
6✔
141
                _cancel_all_tasks(loop)
6✔
142
                loop.run_until_complete(loop.shutdown_asyncgens())
6✔
143
                if hasattr(loop, "shutdown_default_executor"):
6✔
144
                    loop.run_until_complete(loop.shutdown_default_executor())
5✔
145
                else:
146
                    loop.run_until_complete(_shutdown_default_executor(loop))
3✔
147
            finally:
148
                if self._set_event_loop:
6✔
149
                    events.set_event_loop(None)
6✔
150
                loop.close()
6✔
151
                self._loop = None
6✔
152
                self._state = _State.CLOSED
6✔
153

154
        def get_loop(self) -> AbstractEventLoop:
6✔
155
            """Return embedded event loop."""
156
            self._lazy_init()
6✔
157
            return self._loop
6✔
158

159
        def run(self, coro: Coroutine[T_Retval], *, context=None) -> T_Retval:
6✔
160
            """Run a coroutine inside the embedded event loop."""
161
            if not coroutines.iscoroutine(coro):
6✔
162
                raise ValueError(f"a coroutine was expected, got {coro!r}")
×
163

164
            if events._get_running_loop() is not None:
6✔
165
                # fail fast with short traceback
166
                raise RuntimeError(
×
167
                    "Runner.run() cannot be called from a running event loop"
168
                )
169

170
            self._lazy_init()
6✔
171

172
            if context is None:
6✔
173
                context = self._context
6✔
174
            task = context.run(self._loop.create_task, coro)
6✔
175

176
            if (
6✔
177
                threading.current_thread() is threading.main_thread()
178
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
179
            ):
180
                sigint_handler = partial(self._on_sigint, main_task=task)
6✔
181
                try:
6✔
182
                    signal.signal(signal.SIGINT, sigint_handler)
6✔
183
                except ValueError:
×
184
                    # `signal.signal` may throw if `threading.main_thread` does
185
                    # not support signals (e.g. embedded interpreter with signals
186
                    # not registered - see gh-91880)
187
                    sigint_handler = None
×
188
            else:
189
                sigint_handler = None
6✔
190

191
            self._interrupt_count = 0
6✔
192
            try:
6✔
193
                return self._loop.run_until_complete(task)
6✔
194
            except exceptions.CancelledError:
6✔
195
                if self._interrupt_count > 0:
×
196
                    uncancel = getattr(task, "uncancel", None)
×
197
                    if uncancel is not None and uncancel() == 0:
×
198
                        raise KeyboardInterrupt()
×
199
                raise  # CancelledError
×
200
            finally:
201
                if (
6✔
202
                    sigint_handler is not None
203
                    and signal.getsignal(signal.SIGINT) is sigint_handler
204
                ):
205
                    signal.signal(signal.SIGINT, signal.default_int_handler)
6✔
206

207
        def _lazy_init(self) -> None:
6✔
208
            if self._state is _State.CLOSED:
6✔
209
                raise RuntimeError("Runner is closed")
×
210
            if self._state is _State.INITIALIZED:
6✔
211
                return
6✔
212
            if self._loop_factory is None:
6✔
213
                self._loop = events.new_event_loop()
6✔
214
                if not self._set_event_loop:
6✔
215
                    # Call set_event_loop only once to avoid calling
216
                    # attach_loop multiple times on child watchers
217
                    events.set_event_loop(self._loop)
6✔
218
                    self._set_event_loop = True
6✔
219
            else:
220
                self._loop = self._loop_factory()
4✔
221
            if self._debug is not None:
6✔
222
                self._loop.set_debug(self._debug)
6✔
223
            self._context = contextvars.copy_context()
6✔
224
            self._state = _State.INITIALIZED
6✔
225

226
        def _on_sigint(self, signum, frame, main_task: asyncio.Task) -> None:
6✔
227
            self._interrupt_count += 1
×
228
            if self._interrupt_count == 1 and not main_task.done():
×
229
                main_task.cancel()
×
230
                # wakeup loop if it is blocked by select() with long timeout
231
                self._loop.call_soon_threadsafe(lambda: None)
×
232
                return
×
233
            raise KeyboardInterrupt()
×
234

235
    def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
6✔
236
        to_cancel = tasks.all_tasks(loop)
6✔
237
        if not to_cancel:
6✔
238
            return
6✔
239

240
        for task in to_cancel:
6✔
241
            task.cancel()
6✔
242

243
        loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
6✔
244

245
        for task in to_cancel:
8✔
246
            if task.cancelled():
6✔
247
                continue
6✔
248
            if task.exception() is not None:
5✔
249
                loop.call_exception_handler(
×
250
                    {
251
                        "message": "unhandled exception during asyncio.run() shutdown",
252
                        "exception": task.exception(),
253
                        "task": task,
254
                    }
255
                )
256

257
    async def _shutdown_default_executor(loop: AbstractEventLoop) -> None:
6✔
258
        """Schedule the shutdown of the default executor."""
259

260
        def _do_shutdown(future: asyncio.futures.Future) -> None:
3✔
261
            try:
3✔
262
                loop._default_executor.shutdown(wait=True)  # type: ignore[attr-defined]
3✔
263
                loop.call_soon_threadsafe(future.set_result, None)
3✔
264
            except Exception as ex:
×
265
                loop.call_soon_threadsafe(future.set_exception, ex)
×
266

267
        loop._executor_shutdown_called = True
3✔
268
        if loop._default_executor is None:
3✔
269
            return
3✔
270
        future = loop.create_future()
3✔
271
        thread = threading.Thread(target=_do_shutdown, args=(future,))
3✔
272
        thread.start()
3✔
273
        try:
3✔
274
            await future
3✔
275
        finally:
276
            thread.join()
3✔
277

278

279
T_Retval = TypeVar("T_Retval")
10✔
280
T_contra = TypeVar("T_contra", contravariant=True)
10✔
281
PosArgsT = TypeVarTuple("PosArgsT")
10✔
282
P = ParamSpec("P")
10✔
283

284
_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")
10✔
285

286

287
def find_root_task() -> asyncio.Task:
10✔
288
    root_task = _root_task.get(None)
10✔
289
    if root_task is not None and not root_task.done():
10✔
290
        return root_task
10✔
291

292
    # Look for a task that has been started via run_until_complete()
293
    for task in all_tasks():
10✔
294
        if task._callbacks and not task.done():
10✔
295
            callbacks = [cb for cb, context in task._callbacks]
10✔
296
            for cb in callbacks:
10✔
297
                if (
10✔
298
                    cb is _run_until_complete_cb
299
                    or getattr(cb, "__module__", None) == "uvloop.loop"
300
                ):
301
                    _root_task.set(task)
10✔
302
                    return task
10✔
303

304
    # Look up the topmost task in the AnyIO task tree, if possible
305
    task = cast(asyncio.Task, current_task())
9✔
306
    state = _task_states.get(task)
9✔
307
    if state:
9✔
308
        cancel_scope = state.cancel_scope
9✔
309
        while cancel_scope and cancel_scope._parent_scope is not None:
9✔
310
            cancel_scope = cancel_scope._parent_scope
×
311

312
        if cancel_scope is not None:
9✔
313
            return cast(asyncio.Task, cancel_scope._host_task)
9✔
314

315
    return task
×
316

317

318
def get_callable_name(func: Callable) -> str:
10✔
319
    module = getattr(func, "__module__", None)
10✔
320
    qualname = getattr(func, "__qualname__", None)
10✔
321
    return ".".join([x for x in (module, qualname) if x])
10✔
322

323

324
#
325
# Event loop
326
#
327

328
_run_vars: WeakKeyDictionary[asyncio.AbstractEventLoop, Any] = WeakKeyDictionary()
10✔
329

330

331
def _task_started(task: asyncio.Task) -> bool:
10✔
332
    """Return ``True`` if the task has been started and has not finished."""
333
    try:
10✔
334
        return getcoroutinestate(task.get_coro()) in (CORO_RUNNING, CORO_SUSPENDED)
10✔
335
    except AttributeError:
×
336
        # task coro is async_genenerator_asend https://bugs.python.org/issue37771
337
        raise Exception(f"Cannot determine if task {task} has started or not") from None
×
338

339

340
#
341
# Timeouts and cancellation
342
#
343

344

345
class CancelScope(BaseCancelScope):
10✔
346
    def __new__(
10✔
347
        cls, *, deadline: float = math.inf, shield: bool = False
348
    ) -> CancelScope:
349
        return object.__new__(cls)
10✔
350

351
    def __init__(self, deadline: float = math.inf, shield: bool = False):
10✔
352
        self._deadline = deadline
10✔
353
        self._shield = shield
10✔
354
        self._parent_scope: CancelScope | None = None
10✔
355
        self._child_scopes: set[CancelScope] = set()
10✔
356
        self._cancel_called = False
10✔
357
        self._cancelled_caught = False
10✔
358
        self._active = False
10✔
359
        self._timeout_handle: asyncio.TimerHandle | None = None
10✔
360
        self._cancel_handle: asyncio.Handle | None = None
10✔
361
        self._tasks: set[asyncio.Task] = set()
10✔
362
        self._host_task: asyncio.Task | None = None
10✔
363
        self._cancel_calls: int = 0
10✔
364
        self._cancelling: int | None = None
10✔
365

366
    def __enter__(self) -> CancelScope:
10✔
367
        if self._active:
10✔
368
            raise RuntimeError(
×
369
                "Each CancelScope may only be used for a single 'with' block"
370
            )
371

372
        self._host_task = host_task = cast(asyncio.Task, current_task())
10✔
373
        self._tasks.add(host_task)
10✔
374
        try:
10✔
375
            task_state = _task_states[host_task]
10✔
376
        except KeyError:
10✔
377
            task_state = TaskState(None, self)
10✔
378
            _task_states[host_task] = task_state
10✔
379
        else:
380
            self._parent_scope = task_state.cancel_scope
10✔
381
            task_state.cancel_scope = self
10✔
382
            if self._parent_scope is not None:
10✔
383
                self._parent_scope._child_scopes.add(self)
10✔
384
                self._parent_scope._tasks.remove(host_task)
10✔
385

386
        self._timeout()
10✔
387
        self._active = True
10✔
388
        if sys.version_info >= (3, 11):
10✔
389
            self._cancelling = self._host_task.cancelling()
4✔
390

391
        # Start cancelling the host task if the scope was cancelled before entering
392
        if self._cancel_called:
10✔
393
            self._deliver_cancellation(self)
10✔
394

395
        return self
10✔
396

397
    def __exit__(
10✔
398
        self,
399
        exc_type: type[BaseException] | None,
400
        exc_val: BaseException | None,
401
        exc_tb: TracebackType | None,
402
    ) -> bool | None:
403
        if not self._active:
10✔
404
            raise RuntimeError("This cancel scope is not active")
9✔
405
        if current_task() is not self._host_task:
10✔
406
            raise RuntimeError(
9✔
407
                "Attempted to exit cancel scope in a different task than it was "
408
                "entered in"
409
            )
410

411
        assert self._host_task is not None
10✔
412
        host_task_state = _task_states.get(self._host_task)
10✔
413
        if host_task_state is None or host_task_state.cancel_scope is not self:
10✔
414
            raise RuntimeError(
9✔
415
                "Attempted to exit a cancel scope that isn't the current tasks's "
416
                "current cancel scope"
417
            )
418

419
        self._active = False
10✔
420
        if self._timeout_handle:
10✔
421
            self._timeout_handle.cancel()
10✔
422
            self._timeout_handle = None
10✔
423

424
        self._tasks.remove(self._host_task)
10✔
425
        if self._parent_scope is not None:
10✔
426
            self._parent_scope._child_scopes.remove(self)
10✔
427
            self._parent_scope._tasks.add(self._host_task)
10✔
428

429
        host_task_state.cancel_scope = self._parent_scope
10✔
430

431
        # Restart the cancellation effort in the closest directly cancelled parent
432
        # scope if this one was shielded
433
        self._restart_cancellation_in_parent()
10✔
434

435
        if self._cancel_called and exc_val is not None:
10✔
436
            for exc in iterate_exceptions(exc_val):
10✔
437
                if isinstance(exc, CancelledError):
10✔
438
                    self._cancelled_caught = self._uncancel(exc)
10✔
439
                    if self._cancelled_caught:
10✔
440
                        break
10✔
441

442
            return self._cancelled_caught
10✔
443

444
        return None
10✔
445

446
    def _uncancel(self, cancelled_exc: CancelledError) -> bool:
10✔
447
        if sys.version_info < (3, 9) or self._host_task is None:
10✔
448
            self._cancel_calls = 0
3✔
449
            return True
3✔
450

451
        # Undo all cancellations done by this scope
452
        if self._cancelling is not None:
7✔
453
            while self._cancel_calls:
4✔
454
                self._cancel_calls -= 1
4✔
455
                if self._host_task.uncancel() <= self._cancelling:
4✔
456
                    return True
4✔
457

458
        self._cancel_calls = 0
7✔
459
        return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args
7✔
460

461
    def _timeout(self) -> None:
10✔
462
        if self._deadline != math.inf:
10✔
463
            loop = get_running_loop()
10✔
464
            if loop.time() >= self._deadline:
10✔
465
                self.cancel()
10✔
466
            else:
467
                self._timeout_handle = loop.call_at(self._deadline, self._timeout)
10✔
468

469
    def _deliver_cancellation(self, origin: CancelScope) -> bool:
10✔
470
        """
471
        Deliver cancellation to directly contained tasks and nested cancel scopes.
472

473
        Schedule another run at the end if we still have tasks eligible for
474
        cancellation.
475

476
        :param origin: the cancel scope that originated the cancellation
477
        :return: ``True`` if the delivery needs to be retried on the next cycle
478

479
        """
480
        should_retry = False
10✔
481
        current = current_task()
10✔
482
        for task in self._tasks:
10✔
483
            if task._must_cancel:  # type: ignore[attr-defined]
10✔
484
                continue
9✔
485

486
            # The task is eligible for cancellation if it has started
487
            should_retry = True
10✔
488
            if task is not current and (task is self._host_task or _task_started(task)):
10✔
489
                waiter = task._fut_waiter  # type: ignore[attr-defined]
10✔
490
                if not isinstance(waiter, asyncio.Future) or not waiter.done():
10✔
491
                    self._cancel_calls += 1
10✔
492
                    if sys.version_info >= (3, 9):
10✔
493
                        task.cancel(f"Cancelled by cancel scope {id(origin):x}")
7✔
494
                    else:
495
                        task.cancel()
3✔
496

497
        # Deliver cancellation to child scopes that aren't shielded or running their own
498
        # cancellation callbacks
499
        for scope in self._child_scopes:
10✔
500
            if not scope._shield and not scope.cancel_called:
10✔
501
                should_retry = scope._deliver_cancellation(origin) or should_retry
10✔
502

503
        # Schedule another callback if there are still tasks left
504
        if origin is self:
10✔
505
            if should_retry:
10✔
506
                self._cancel_handle = get_running_loop().call_soon(
10✔
507
                    self._deliver_cancellation, origin
508
                )
509
            else:
510
                self._cancel_handle = None
10✔
511

512
        return should_retry
10✔
513

514
    def _restart_cancellation_in_parent(self) -> None:
10✔
515
        """
516
        Restart the cancellation effort in the closest directly cancelled parent scope.
517

518
        """
519
        scope = self._parent_scope
10✔
520
        while scope is not None:
10✔
521
            if scope._cancel_called:
10✔
522
                if scope._cancel_handle is None:
10✔
523
                    scope._deliver_cancellation(scope)
10✔
524

525
                break
10✔
526

527
            # No point in looking beyond any shielded scope
528
            if scope._shield:
10✔
529
                break
9✔
530

531
            scope = scope._parent_scope
10✔
532

533
    def _parent_cancelled(self) -> bool:
10✔
534
        # Check whether any parent has been cancelled
535
        cancel_scope = self._parent_scope
10✔
536
        while cancel_scope is not None and not cancel_scope._shield:
10✔
537
            if cancel_scope._cancel_called:
10✔
538
                return True
9✔
539
            else:
540
                cancel_scope = cancel_scope._parent_scope
10✔
541

542
        return False
10✔
543

544
    def cancel(self) -> None:
10✔
545
        if not self._cancel_called:
10✔
546
            if self._timeout_handle:
10✔
547
                self._timeout_handle.cancel()
10✔
548
                self._timeout_handle = None
10✔
549

550
            self._cancel_called = True
10✔
551
            if self._host_task is not None:
10✔
552
                self._deliver_cancellation(self)
10✔
553

554
    @property
10✔
555
    def deadline(self) -> float:
10✔
556
        return self._deadline
9✔
557

558
    @deadline.setter
10✔
559
    def deadline(self, value: float) -> None:
10✔
560
        self._deadline = float(value)
9✔
561
        if self._timeout_handle is not None:
9✔
562
            self._timeout_handle.cancel()
9✔
563
            self._timeout_handle = None
9✔
564

565
        if self._active and not self._cancel_called:
9✔
566
            self._timeout()
9✔
567

568
    @property
10✔
569
    def cancel_called(self) -> bool:
10✔
570
        return self._cancel_called
10✔
571

572
    @property
10✔
573
    def cancelled_caught(self) -> bool:
10✔
574
        return self._cancelled_caught
10✔
575

576
    @property
10✔
577
    def shield(self) -> bool:
10✔
578
        return self._shield
10✔
579

580
    @shield.setter
10✔
581
    def shield(self, value: bool) -> None:
10✔
582
        if self._shield != value:
9✔
583
            self._shield = value
9✔
584
            if not value:
9✔
585
                self._restart_cancellation_in_parent()
9✔
586

587

588
#
589
# Task states
590
#
591

592

593
class TaskState:
10✔
594
    """
595
    Encapsulates auxiliary task information that cannot be added to the Task instance
596
    itself because there are no guarantees about its implementation.
597
    """
598

599
    __slots__ = "parent_id", "cancel_scope"
10✔
600

601
    def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
10✔
602
        self.parent_id = parent_id
10✔
603
        self.cancel_scope = cancel_scope
10✔
604

605

606
_task_states = WeakKeyDictionary()  # type: WeakKeyDictionary[asyncio.Task, TaskState]
10✔
607

608

609
#
610
# Task groups
611
#
612

613

614
class _AsyncioTaskStatus(abc.TaskStatus):
10✔
615
    def __init__(self, future: asyncio.Future, parent_id: int):
10✔
616
        self._future = future
10✔
617
        self._parent_id = parent_id
10✔
618

619
    def started(self, value: T_contra | None = None) -> None:
10✔
620
        try:
10✔
621
            self._future.set_result(value)
10✔
622
        except asyncio.InvalidStateError:
9✔
623
            raise RuntimeError(
9✔
624
                "called 'started' twice on the same task status"
625
            ) from None
626

627
        task = cast(asyncio.Task, current_task())
10✔
628
        _task_states[task].parent_id = self._parent_id
10✔
629

630

631
def iterate_exceptions(
10✔
632
    exception: BaseException,
633
) -> Generator[BaseException, None, None]:
634
    if isinstance(exception, BaseExceptionGroup):
10✔
635
        for exc in exception.exceptions:
9✔
636
            yield from iterate_exceptions(exc)
9✔
637
    else:
638
        yield exception
10✔
639

640

641
class TaskGroup(abc.TaskGroup):
10✔
642
    def __init__(self) -> None:
10✔
643
        self.cancel_scope: CancelScope = CancelScope()
10✔
644
        self._active = False
10✔
645
        self._exceptions: list[BaseException] = []
10✔
646
        self._tasks: set[asyncio.Task] = set()
10✔
647

648
    async def __aenter__(self) -> TaskGroup:
10✔
649
        self.cancel_scope.__enter__()
10✔
650
        self._active = True
10✔
651
        return self
10✔
652

653
    async def __aexit__(
10✔
654
        self,
655
        exc_type: type[BaseException] | None,
656
        exc_val: BaseException | None,
657
        exc_tb: TracebackType | None,
658
    ) -> bool | None:
659
        ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
10✔
660
        if exc_val is not None:
10✔
661
            self.cancel_scope.cancel()
10✔
662
            if not isinstance(exc_val, CancelledError):
10✔
663
                self._exceptions.append(exc_val)
10✔
664

665
        cancelled_exc_while_waiting_tasks: CancelledError | None = None
10✔
666
        while self._tasks:
10✔
667
            try:
10✔
668
                await asyncio.wait(self._tasks)
10✔
669
            except CancelledError as exc:
10✔
670
                # This task was cancelled natively; reraise the CancelledError later
671
                # unless this task was already interrupted by another exception
672
                self.cancel_scope.cancel()
10✔
673
                if cancelled_exc_while_waiting_tasks is None:
10✔
674
                    cancelled_exc_while_waiting_tasks = exc
10✔
675

676
        self._active = False
10✔
677
        if self._exceptions:
10✔
678
            raise BaseExceptionGroup(
10✔
679
                "unhandled errors in a TaskGroup", self._exceptions
680
            )
681

682
        # Raise the CancelledError received while waiting for child tasks to exit,
683
        # unless the context manager itself was previously exited with another
684
        # exception, or if any of the  child tasks raised an exception other than
685
        # CancelledError
686
        if cancelled_exc_while_waiting_tasks:
10✔
687
            if exc_val is None or ignore_exception:
10✔
688
                raise cancelled_exc_while_waiting_tasks
10✔
689

690
        return ignore_exception
10✔
691

692
    def _spawn(
10✔
693
        self,
694
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
695
        args: tuple[Unpack[PosArgsT]],
696
        name: object,
697
        task_status_future: asyncio.Future | None = None,
698
    ) -> asyncio.Task:
699
        def task_done(_task: asyncio.Task) -> None:
10✔
700
            task_state = _task_states[_task]
10✔
701
            assert task_state.cancel_scope is not None
10✔
702
            assert _task in task_state.cancel_scope._tasks
10✔
703
            task_state.cancel_scope._tasks.remove(_task)
10✔
704
            self._tasks.remove(task)
10✔
705
            del _task_states[_task]
10✔
706

707
            try:
10✔
708
                exc = _task.exception()
10✔
709
            except CancelledError as e:
10✔
710
                while isinstance(e.__context__, CancelledError):
10✔
711
                    e = e.__context__
3✔
712

713
                exc = e
10✔
714

715
            if exc is not None:
10✔
716
                if task_status_future is None or task_status_future.done():
10✔
717
                    if not isinstance(exc, CancelledError):
10✔
718
                        self._exceptions.append(exc)
10✔
719

720
                    if not self.cancel_scope._parent_cancelled():
10✔
721
                        self.cancel_scope.cancel()
10✔
722
                else:
723
                    task_status_future.set_exception(exc)
9✔
724
            elif task_status_future is not None and not task_status_future.done():
10✔
725
                task_status_future.set_exception(
9✔
726
                    RuntimeError("Child exited without calling task_status.started()")
727
                )
728

729
        if not self._active:
10✔
730
            raise RuntimeError(
9✔
731
                "This task group is not active; no new tasks can be started."
732
            )
733

734
        kwargs = {}
10✔
735
        if task_status_future:
10✔
736
            parent_id = id(current_task())
10✔
737
            kwargs["task_status"] = _AsyncioTaskStatus(
10✔
738
                task_status_future, id(self.cancel_scope._host_task)
739
            )
740
        else:
741
            parent_id = id(self.cancel_scope._host_task)
10✔
742

743
        coro = func(*args, **kwargs)
10✔
744
        if not iscoroutine(coro):
10✔
745
            prefix = f"{func.__module__}." if hasattr(func, "__module__") else ""
9✔
746
            raise TypeError(
9✔
747
                f"Expected {prefix}{func.__qualname__}() to return a coroutine, but "
748
                f"the return value ({coro!r}) is not a coroutine object"
749
            )
750

751
        name = get_callable_name(func) if name is None else str(name)
10✔
752
        task = create_task(coro, name=name)
10✔
753
        task.add_done_callback(task_done)
10✔
754

755
        # Make the spawned task inherit the task group's cancel scope
756
        _task_states[task] = TaskState(
10✔
757
            parent_id=parent_id, cancel_scope=self.cancel_scope
758
        )
759
        self.cancel_scope._tasks.add(task)
10✔
760
        self._tasks.add(task)
10✔
761
        return task
10✔
762

763
    def start_soon(
10✔
764
        self,
765
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
766
        *args: Unpack[PosArgsT],
767
        name: object = None,
768
    ) -> None:
769
        self._spawn(func, args, name)
10✔
770

771
    async def start(
10✔
772
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
773
    ) -> Any:
774
        future: asyncio.Future = asyncio.Future()
10✔
775
        task = self._spawn(func, args, name, future)
10✔
776

777
        # If the task raises an exception after sending a start value without a switch
778
        # point between, the task group is cancelled and this method never proceeds to
779
        # process the completed future. That's why we have to have a shielded cancel
780
        # scope here.
781
        try:
10✔
782
            return await future
10✔
783
        except CancelledError:
9✔
784
            # Cancel the task and wait for it to exit before returning
785
            task.cancel()
9✔
786
            with CancelScope(shield=True), suppress(CancelledError):
9✔
787
                await task
9✔
788

789
            raise
9✔
790

791

792
#
793
# Threads
794
#
795

796
_Retval_Queue_Type = Tuple[Optional[T_Retval], Optional[BaseException]]
10✔
797

798

799
class WorkerThread(Thread):
10✔
800
    MAX_IDLE_TIME = 10  # seconds
10✔
801

802
    def __init__(
10✔
803
        self,
804
        root_task: asyncio.Task,
805
        workers: set[WorkerThread],
806
        idle_workers: deque[WorkerThread],
807
    ):
808
        super().__init__(name="AnyIO worker thread")
10✔
809
        self.root_task = root_task
10✔
810
        self.workers = workers
10✔
811
        self.idle_workers = idle_workers
10✔
812
        self.loop = root_task._loop
10✔
813
        self.queue: Queue[
10✔
814
            tuple[Context, Callable, tuple, asyncio.Future, CancelScope] | None
815
        ] = Queue(2)
816
        self.idle_since = AsyncIOBackend.current_time()
10✔
817
        self.stopping = False
10✔
818

819
    def _report_result(
10✔
820
        self, future: asyncio.Future, result: Any, exc: BaseException | None
821
    ) -> None:
822
        self.idle_since = AsyncIOBackend.current_time()
10✔
823
        if not self.stopping:
10✔
824
            self.idle_workers.append(self)
10✔
825

826
        if not future.cancelled():
10✔
827
            if exc is not None:
10✔
828
                if isinstance(exc, StopIteration):
10✔
829
                    new_exc = RuntimeError("coroutine raised StopIteration")
9✔
830
                    new_exc.__cause__ = exc
9✔
831
                    exc = new_exc
9✔
832

833
                future.set_exception(exc)
10✔
834
            else:
835
                future.set_result(result)
10✔
836

837
    def run(self) -> None:
10✔
838
        with claim_worker_thread(AsyncIOBackend, self.loop):
10✔
839
            while True:
6✔
840
                item = self.queue.get()
10✔
841
                if item is None:
10✔
842
                    # Shutdown command received
843
                    return
10✔
844

845
                context, func, args, future, cancel_scope = item
10✔
846
                if not future.cancelled():
10✔
847
                    result = None
10✔
848
                    exception: BaseException | None = None
10✔
849
                    threadlocals.current_cancel_scope = cancel_scope
10✔
850
                    try:
10✔
851
                        result = context.run(func, *args)
10✔
852
                    except BaseException as exc:
10✔
853
                        exception = exc
10✔
854
                    finally:
855
                        del threadlocals.current_cancel_scope
10✔
856

857
                    if not self.loop.is_closed():
10✔
858
                        self.loop.call_soon_threadsafe(
10✔
859
                            self._report_result, future, result, exception
860
                        )
861

862
                self.queue.task_done()
10✔
863

864
    def stop(self, f: asyncio.Task | None = None) -> None:
10✔
865
        self.stopping = True
10✔
866
        self.queue.put_nowait(None)
10✔
867
        self.workers.discard(self)
10✔
868
        try:
10✔
869
            self.idle_workers.remove(self)
10✔
870
        except ValueError:
9✔
871
            pass
9✔
872

873

874
_threadpool_idle_workers: RunVar[deque[WorkerThread]] = RunVar(
10✔
875
    "_threadpool_idle_workers"
876
)
877
_threadpool_workers: RunVar[set[WorkerThread]] = RunVar("_threadpool_workers")
10✔
878

879

880
class BlockingPortal(abc.BlockingPortal):
10✔
881
    def __new__(cls) -> BlockingPortal:
10✔
882
        return object.__new__(cls)
10✔
883

884
    def __init__(self) -> None:
10✔
885
        super().__init__()
10✔
886
        self._loop = get_running_loop()
10✔
887

888
    def _spawn_task_from_thread(
10✔
889
        self,
890
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
891
        args: tuple[Unpack[PosArgsT]],
892
        kwargs: dict[str, Any],
893
        name: object,
894
        future: Future[T_Retval],
895
    ) -> None:
896
        AsyncIOBackend.run_sync_from_thread(
10✔
897
            partial(self._task_group.start_soon, name=name),
898
            (self._call_func, func, args, kwargs, future),
899
            self._loop,
900
        )
901

902

903
#
904
# Subprocesses
905
#
906

907

908
@dataclass(eq=False)
10✔
909
class StreamReaderWrapper(abc.ByteReceiveStream):
10✔
910
    _stream: asyncio.StreamReader
10✔
911

912
    async def receive(self, max_bytes: int = 65536) -> bytes:
10✔
913
        data = await self._stream.read(max_bytes)
9✔
914
        if data:
9✔
915
            return data
9✔
916
        else:
917
            raise EndOfStream
9✔
918

919
    async def aclose(self) -> None:
10✔
920
        self._stream.feed_eof()
9✔
921

922

923
@dataclass(eq=False)
10✔
924
class StreamWriterWrapper(abc.ByteSendStream):
10✔
925
    _stream: asyncio.StreamWriter
10✔
926

927
    async def send(self, item: bytes) -> None:
10✔
928
        self._stream.write(item)
9✔
929
        await self._stream.drain()
9✔
930

931
    async def aclose(self) -> None:
10✔
932
        self._stream.close()
9✔
933

934

935
@dataclass(eq=False)
10✔
936
class Process(abc.Process):
10✔
937
    _process: asyncio.subprocess.Process
10✔
938
    _stdin: StreamWriterWrapper | None
10✔
939
    _stdout: StreamReaderWrapper | None
10✔
940
    _stderr: StreamReaderWrapper | None
10✔
941

942
    async def aclose(self) -> None:
10✔
943
        if self._stdin:
9✔
944
            await self._stdin.aclose()
9✔
945
        if self._stdout:
9✔
946
            await self._stdout.aclose()
9✔
947
        if self._stderr:
9✔
948
            await self._stderr.aclose()
9✔
949

950
        await self.wait()
9✔
951

952
    async def wait(self) -> int:
10✔
953
        return await self._process.wait()
9✔
954

955
    def terminate(self) -> None:
10✔
956
        self._process.terminate()
7✔
957

958
    def kill(self) -> None:
10✔
959
        self._process.kill()
9✔
960

961
    def send_signal(self, signal: int) -> None:
10✔
962
        self._process.send_signal(signal)
×
963

964
    @property
10✔
965
    def pid(self) -> int:
10✔
966
        return self._process.pid
×
967

968
    @property
10✔
969
    def returncode(self) -> int | None:
10✔
970
        return self._process.returncode
9✔
971

972
    @property
10✔
973
    def stdin(self) -> abc.ByteSendStream | None:
10✔
974
        return self._stdin
9✔
975

976
    @property
10✔
977
    def stdout(self) -> abc.ByteReceiveStream | None:
10✔
978
        return self._stdout
9✔
979

980
    @property
10✔
981
    def stderr(self) -> abc.ByteReceiveStream | None:
10✔
982
        return self._stderr
9✔
983

984

985
def _forcibly_shutdown_process_pool_on_exit(
10✔
986
    workers: set[Process], _task: object
987
) -> None:
988
    """
989
    Forcibly shuts down worker processes belonging to this event loop."""
990
    child_watcher: asyncio.AbstractChildWatcher | None = None
9✔
991
    if sys.version_info < (3, 12):
9✔
992
        try:
8✔
993
            child_watcher = asyncio.get_event_loop_policy().get_child_watcher()
8✔
994
        except NotImplementedError:
2✔
995
            pass
2✔
996

997
    # Close as much as possible (w/o async/await) to avoid warnings
998
    for process in workers:
9✔
999
        if process.returncode is None:
9✔
1000
            continue
9✔
1001

1002
        process._stdin._stream._transport.close()  # type: ignore[union-attr]
×
1003
        process._stdout._stream._transport.close()  # type: ignore[union-attr]
×
1004
        process._stderr._stream._transport.close()  # type: ignore[union-attr]
×
1005
        process.kill()
×
1006
        if child_watcher:
×
1007
            child_watcher.remove_child_handler(process.pid)
×
1008

1009

1010
async def _shutdown_process_pool_on_exit(workers: set[abc.Process]) -> None:
10✔
1011
    """
1012
    Shuts down worker processes belonging to this event loop.
1013

1014
    NOTE: this only works when the event loop was started using asyncio.run() or
1015
    anyio.run().
1016

1017
    """
1018
    process: abc.Process
1019
    try:
9✔
1020
        await sleep(math.inf)
9✔
1021
    except asyncio.CancelledError:
9✔
1022
        for process in workers:
9✔
1023
            if process.returncode is None:
9✔
1024
                process.kill()
9✔
1025

1026
        for process in workers:
9✔
1027
            await process.aclose()
9✔
1028

1029

1030
#
1031
# Sockets and networking
1032
#
1033

1034

1035
class StreamProtocol(asyncio.Protocol):
10✔
1036
    read_queue: deque[bytes]
10✔
1037
    read_event: asyncio.Event
10✔
1038
    write_event: asyncio.Event
10✔
1039
    exception: Exception | None = None
10✔
1040

1041
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
10✔
1042
        self.read_queue = deque()
10✔
1043
        self.read_event = asyncio.Event()
10✔
1044
        self.write_event = asyncio.Event()
10✔
1045
        self.write_event.set()
10✔
1046
        cast(asyncio.Transport, transport).set_write_buffer_limits(0)
10✔
1047

1048
    def connection_lost(self, exc: Exception | None) -> None:
10✔
1049
        if exc:
10✔
1050
            self.exception = BrokenResourceError()
10✔
1051
            self.exception.__cause__ = exc
10✔
1052

1053
        self.read_event.set()
10✔
1054
        self.write_event.set()
10✔
1055

1056
    def data_received(self, data: bytes) -> None:
10✔
1057
        self.read_queue.append(data)
10✔
1058
        self.read_event.set()
10✔
1059

1060
    def eof_received(self) -> bool | None:
10✔
1061
        self.read_event.set()
10✔
1062
        return True
10✔
1063

1064
    def pause_writing(self) -> None:
10✔
1065
        self.write_event = asyncio.Event()
10✔
1066

1067
    def resume_writing(self) -> None:
10✔
UNCOV
1068
        self.write_event.set()
×
1069

1070

1071
class DatagramProtocol(asyncio.DatagramProtocol):
10✔
1072
    read_queue: deque[tuple[bytes, IPSockAddrType]]
10✔
1073
    read_event: asyncio.Event
10✔
1074
    write_event: asyncio.Event
10✔
1075
    exception: Exception | None = None
10✔
1076

1077
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
10✔
1078
        self.read_queue = deque(maxlen=100)  # arbitrary value
9✔
1079
        self.read_event = asyncio.Event()
9✔
1080
        self.write_event = asyncio.Event()
9✔
1081
        self.write_event.set()
9✔
1082

1083
    def connection_lost(self, exc: Exception | None) -> None:
10✔
1084
        self.read_event.set()
9✔
1085
        self.write_event.set()
9✔
1086

1087
    def datagram_received(self, data: bytes, addr: IPSockAddrType) -> None:
10✔
1088
        addr = convert_ipv6_sockaddr(addr)
9✔
1089
        self.read_queue.append((data, addr))
9✔
1090
        self.read_event.set()
9✔
1091

1092
    def error_received(self, exc: Exception) -> None:
10✔
1093
        self.exception = exc
×
1094

1095
    def pause_writing(self) -> None:
10✔
1096
        self.write_event.clear()
×
1097

1098
    def resume_writing(self) -> None:
10✔
1099
        self.write_event.set()
×
1100

1101

1102
class SocketStream(abc.SocketStream):
10✔
1103
    def __init__(self, transport: asyncio.Transport, protocol: StreamProtocol):
10✔
1104
        self._transport = transport
10✔
1105
        self._protocol = protocol
10✔
1106
        self._receive_guard = ResourceGuard("reading from")
10✔
1107
        self._send_guard = ResourceGuard("writing to")
10✔
1108
        self._closed = False
10✔
1109

1110
    @property
10✔
1111
    def _raw_socket(self) -> socket.socket:
10✔
1112
        return self._transport.get_extra_info("socket")
10✔
1113

1114
    async def receive(self, max_bytes: int = 65536) -> bytes:
10✔
1115
        with self._receive_guard:
10✔
1116
            await AsyncIOBackend.checkpoint()
10✔
1117

1118
            if (
10✔
1119
                not self._protocol.read_event.is_set()
1120
                and not self._transport.is_closing()
1121
            ):
1122
                self._transport.resume_reading()
10✔
1123
                await self._protocol.read_event.wait()
10✔
1124
                self._transport.pause_reading()
10✔
1125

1126
            try:
10✔
1127
                chunk = self._protocol.read_queue.popleft()
10✔
1128
            except IndexError:
10✔
1129
                if self._closed:
10✔
1130
                    raise ClosedResourceError from None
10✔
1131
                elif self._protocol.exception:
10✔
1132
                    raise self._protocol.exception from None
10✔
1133
                else:
1134
                    raise EndOfStream from None
10✔
1135

1136
            if len(chunk) > max_bytes:
10✔
1137
                # Split the oversized chunk
1138
                chunk, leftover = chunk[:max_bytes], chunk[max_bytes:]
8✔
1139
                self._protocol.read_queue.appendleft(leftover)
8✔
1140

1141
            # If the read queue is empty, clear the flag so that the next call will
1142
            # block until data is available
1143
            if not self._protocol.read_queue:
10✔
1144
                self._protocol.read_event.clear()
10✔
1145

1146
        return chunk
10✔
1147

1148
    async def send(self, item: bytes) -> None:
10✔
1149
        with self._send_guard:
10✔
1150
            await AsyncIOBackend.checkpoint()
10✔
1151

1152
            if self._closed:
10✔
1153
                raise ClosedResourceError
10✔
1154
            elif self._protocol.exception is not None:
10✔
1155
                raise self._protocol.exception
10✔
1156

1157
            try:
10✔
1158
                self._transport.write(item)
10✔
1159
            except RuntimeError as exc:
×
1160
                if self._transport.is_closing():
×
1161
                    raise BrokenResourceError from exc
×
1162
                else:
1163
                    raise
×
1164

1165
            await self._protocol.write_event.wait()
10✔
1166

1167
    async def send_eof(self) -> None:
10✔
1168
        try:
10✔
1169
            self._transport.write_eof()
10✔
1170
        except OSError:
×
1171
            pass
×
1172

1173
    async def aclose(self) -> None:
10✔
1174
        if not self._transport.is_closing():
10✔
1175
            self._closed = True
10✔
1176
            try:
10✔
1177
                self._transport.write_eof()
10✔
1178
            except OSError:
5✔
1179
                pass
5✔
1180

1181
            self._transport.close()
10✔
1182
            await sleep(0)
10✔
1183
            self._transport.abort()
10✔
1184

1185

1186
class _RawSocketMixin:
10✔
1187
    _receive_future: asyncio.Future | None = None
10✔
1188
    _send_future: asyncio.Future | None = None
10✔
1189
    _closing = False
10✔
1190

1191
    def __init__(self, raw_socket: socket.socket):
10✔
1192
        self.__raw_socket = raw_socket
7✔
1193
        self._receive_guard = ResourceGuard("reading from")
7✔
1194
        self._send_guard = ResourceGuard("writing to")
7✔
1195

1196
    @property
10✔
1197
    def _raw_socket(self) -> socket.socket:
10✔
1198
        return self.__raw_socket
7✔
1199

1200
    def _wait_until_readable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
10✔
1201
        def callback(f: object) -> None:
7✔
1202
            del self._receive_future
7✔
1203
            loop.remove_reader(self.__raw_socket)
7✔
1204

1205
        f = self._receive_future = asyncio.Future()
7✔
1206
        loop.add_reader(self.__raw_socket, f.set_result, None)
7✔
1207
        f.add_done_callback(callback)
7✔
1208
        return f
7✔
1209

1210
    def _wait_until_writable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
10✔
1211
        def callback(f: object) -> None:
7✔
1212
            del self._send_future
7✔
1213
            loop.remove_writer(self.__raw_socket)
7✔
1214

1215
        f = self._send_future = asyncio.Future()
7✔
1216
        loop.add_writer(self.__raw_socket, f.set_result, None)
7✔
1217
        f.add_done_callback(callback)
7✔
1218
        return f
7✔
1219

1220
    async def aclose(self) -> None:
10✔
1221
        if not self._closing:
7✔
1222
            self._closing = True
7✔
1223
            if self.__raw_socket.fileno() != -1:
7✔
1224
                self.__raw_socket.close()
7✔
1225

1226
            if self._receive_future:
7✔
1227
                self._receive_future.set_result(None)
7✔
1228
            if self._send_future:
7✔
1229
                self._send_future.set_result(None)
×
1230

1231

1232
class UNIXSocketStream(_RawSocketMixin, abc.UNIXSocketStream):
10✔
1233
    async def send_eof(self) -> None:
10✔
1234
        with self._send_guard:
7✔
1235
            self._raw_socket.shutdown(socket.SHUT_WR)
7✔
1236

1237
    async def receive(self, max_bytes: int = 65536) -> bytes:
10✔
1238
        loop = get_running_loop()
7✔
1239
        await AsyncIOBackend.checkpoint()
7✔
1240
        with self._receive_guard:
7✔
1241
            while True:
4✔
1242
                try:
7✔
1243
                    data = self._raw_socket.recv(max_bytes)
7✔
1244
                except BlockingIOError:
7✔
1245
                    await self._wait_until_readable(loop)
7✔
1246
                except OSError as exc:
7✔
1247
                    if self._closing:
7✔
1248
                        raise ClosedResourceError from None
7✔
1249
                    else:
1250
                        raise BrokenResourceError from exc
1✔
1251
                else:
1252
                    if not data:
7✔
1253
                        raise EndOfStream
7✔
1254

1255
                    return data
7✔
1256

1257
    async def send(self, item: bytes) -> None:
10✔
1258
        loop = get_running_loop()
7✔
1259
        await AsyncIOBackend.checkpoint()
7✔
1260
        with self._send_guard:
7✔
1261
            view = memoryview(item)
7✔
1262
            while view:
7✔
1263
                try:
7✔
1264
                    bytes_sent = self._raw_socket.send(view)
7✔
1265
                except BlockingIOError:
7✔
1266
                    await self._wait_until_writable(loop)
7✔
1267
                except OSError as exc:
7✔
1268
                    if self._closing:
7✔
1269
                        raise ClosedResourceError from None
7✔
1270
                    else:
1271
                        raise BrokenResourceError from exc
1✔
1272
                else:
1273
                    view = view[bytes_sent:]
7✔
1274

1275
    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
10✔
1276
        if not isinstance(msglen, int) or msglen < 0:
7✔
1277
            raise ValueError("msglen must be a non-negative integer")
7✔
1278
        if not isinstance(maxfds, int) or maxfds < 1:
7✔
1279
            raise ValueError("maxfds must be a positive integer")
7✔
1280

1281
        loop = get_running_loop()
7✔
1282
        fds = array.array("i")
7✔
1283
        await AsyncIOBackend.checkpoint()
7✔
1284
        with self._receive_guard:
7✔
1285
            while True:
4✔
1286
                try:
7✔
1287
                    message, ancdata, flags, addr = self._raw_socket.recvmsg(
7✔
1288
                        msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
1289
                    )
1290
                except BlockingIOError:
7✔
1291
                    await self._wait_until_readable(loop)
7✔
1292
                except OSError as exc:
×
1293
                    if self._closing:
×
1294
                        raise ClosedResourceError from None
×
1295
                    else:
1296
                        raise BrokenResourceError from exc
×
1297
                else:
1298
                    if not message and not ancdata:
7✔
1299
                        raise EndOfStream
×
1300

1301
                    break
4✔
1302

1303
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
7✔
1304
            if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
7✔
1305
                raise RuntimeError(
×
1306
                    f"Received unexpected ancillary data; message = {message!r}, "
1307
                    f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
1308
                )
1309

1310
            fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
7✔
1311

1312
        return message, list(fds)
7✔
1313

1314
    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
10✔
1315
        if not message:
7✔
1316
            raise ValueError("message must not be empty")
7✔
1317
        if not fds:
7✔
1318
            raise ValueError("fds must not be empty")
7✔
1319

1320
        loop = get_running_loop()
7✔
1321
        filenos: list[int] = []
7✔
1322
        for fd in fds:
7✔
1323
            if isinstance(fd, int):
7✔
1324
                filenos.append(fd)
×
1325
            elif isinstance(fd, IOBase):
7✔
1326
                filenos.append(fd.fileno())
7✔
1327

1328
        fdarray = array.array("i", filenos)
7✔
1329
        await AsyncIOBackend.checkpoint()
7✔
1330
        with self._send_guard:
7✔
1331
            while True:
4✔
1332
                try:
7✔
1333
                    # The ignore can be removed after mypy picks up
1334
                    # https://github.com/python/typeshed/pull/5545
1335
                    self._raw_socket.sendmsg(
7✔
1336
                        [message], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)]
1337
                    )
1338
                    break
7✔
1339
                except BlockingIOError:
×
1340
                    await self._wait_until_writable(loop)
×
1341
                except OSError as exc:
×
1342
                    if self._closing:
×
1343
                        raise ClosedResourceError from None
×
1344
                    else:
1345
                        raise BrokenResourceError from exc
×
1346

1347

1348
class TCPSocketListener(abc.SocketListener):
10✔
1349
    _accept_scope: CancelScope | None = None
10✔
1350
    _closed = False
10✔
1351

1352
    def __init__(self, raw_socket: socket.socket):
10✔
1353
        self.__raw_socket = raw_socket
10✔
1354
        self._loop = cast(asyncio.BaseEventLoop, get_running_loop())
10✔
1355
        self._accept_guard = ResourceGuard("accepting connections from")
10✔
1356

1357
    @property
10✔
1358
    def _raw_socket(self) -> socket.socket:
10✔
1359
        return self.__raw_socket
10✔
1360

1361
    async def accept(self) -> abc.SocketStream:
10✔
1362
        if self._closed:
10✔
1363
            raise ClosedResourceError
10✔
1364

1365
        with self._accept_guard:
10✔
1366
            await AsyncIOBackend.checkpoint()
10✔
1367
            with CancelScope() as self._accept_scope:
10✔
1368
                try:
10✔
1369
                    client_sock, _addr = await self._loop.sock_accept(self._raw_socket)
10✔
1370
                except asyncio.CancelledError:
9✔
1371
                    # Workaround for https://bugs.python.org/issue41317
1372
                    try:
9✔
1373
                        self._loop.remove_reader(self._raw_socket)
9✔
1374
                    except (ValueError, NotImplementedError):
2✔
1375
                        pass
2✔
1376

1377
                    if self._closed:
9✔
1378
                        raise ClosedResourceError from None
9✔
1379

1380
                    raise
9✔
1381
                finally:
1382
                    self._accept_scope = None
10✔
1383

1384
        client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
10✔
1385
        transport, protocol = await self._loop.connect_accepted_socket(
10✔
1386
            StreamProtocol, client_sock
1387
        )
1388
        return SocketStream(transport, protocol)
10✔
1389

1390
    async def aclose(self) -> None:
10✔
1391
        if self._closed:
10✔
1392
            return
10✔
1393

1394
        self._closed = True
10✔
1395
        if self._accept_scope:
10✔
1396
            # Workaround for https://bugs.python.org/issue41317
1397
            try:
10✔
1398
                self._loop.remove_reader(self._raw_socket)
10✔
1399
            except (ValueError, NotImplementedError):
2✔
1400
                pass
2✔
1401

1402
            self._accept_scope.cancel()
9✔
1403
            await sleep(0)
9✔
1404

1405
        self._raw_socket.close()
10✔
1406

1407

1408
class UNIXSocketListener(abc.SocketListener):
10✔
1409
    def __init__(self, raw_socket: socket.socket):
10✔
1410
        self.__raw_socket = raw_socket
7✔
1411
        self._loop = get_running_loop()
7✔
1412
        self._accept_guard = ResourceGuard("accepting connections from")
7✔
1413
        self._closed = False
7✔
1414

1415
    async def accept(self) -> abc.SocketStream:
10✔
1416
        await AsyncIOBackend.checkpoint()
7✔
1417
        with self._accept_guard:
7✔
1418
            while True:
4✔
1419
                try:
7✔
1420
                    client_sock, _ = self.__raw_socket.accept()
7✔
1421
                    client_sock.setblocking(False)
7✔
1422
                    return UNIXSocketStream(client_sock)
7✔
1423
                except BlockingIOError:
7✔
1424
                    f: asyncio.Future = asyncio.Future()
7✔
1425
                    self._loop.add_reader(self.__raw_socket, f.set_result, None)
7✔
1426
                    f.add_done_callback(
7✔
1427
                        lambda _: self._loop.remove_reader(self.__raw_socket)
1428
                    )
1429
                    await f
7✔
1430
                except OSError as exc:
×
1431
                    if self._closed:
×
1432
                        raise ClosedResourceError from None
×
1433
                    else:
1434
                        raise BrokenResourceError from exc
1✔
1435

1436
    async def aclose(self) -> None:
10✔
1437
        self._closed = True
7✔
1438
        self.__raw_socket.close()
7✔
1439

1440
    @property
10✔
1441
    def _raw_socket(self) -> socket.socket:
10✔
1442
        return self.__raw_socket
7✔
1443

1444

1445
class UDPSocket(abc.UDPSocket):
10✔
1446
    def __init__(
10✔
1447
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1448
    ):
1449
        self._transport = transport
9✔
1450
        self._protocol = protocol
9✔
1451
        self._receive_guard = ResourceGuard("reading from")
9✔
1452
        self._send_guard = ResourceGuard("writing to")
9✔
1453
        self._closed = False
9✔
1454

1455
    @property
10✔
1456
    def _raw_socket(self) -> socket.socket:
10✔
1457
        return self._transport.get_extra_info("socket")
9✔
1458

1459
    async def aclose(self) -> None:
10✔
1460
        if not self._transport.is_closing():
9✔
1461
            self._closed = True
9✔
1462
            self._transport.close()
9✔
1463

1464
    async def receive(self) -> tuple[bytes, IPSockAddrType]:
10✔
1465
        with self._receive_guard:
9✔
1466
            await AsyncIOBackend.checkpoint()
9✔
1467

1468
            # If the buffer is empty, ask for more data
1469
            if not self._protocol.read_queue and not self._transport.is_closing():
9✔
1470
                self._protocol.read_event.clear()
9✔
1471
                await self._protocol.read_event.wait()
9✔
1472

1473
            try:
9✔
1474
                return self._protocol.read_queue.popleft()
9✔
1475
            except IndexError:
9✔
1476
                if self._closed:
9✔
1477
                    raise ClosedResourceError from None
9✔
1478
                else:
1479
                    raise BrokenResourceError from None
1✔
1480

1481
    async def send(self, item: UDPPacketType) -> None:
10✔
1482
        with self._send_guard:
9✔
1483
            await AsyncIOBackend.checkpoint()
9✔
1484
            await self._protocol.write_event.wait()
9✔
1485
            if self._closed:
9✔
1486
                raise ClosedResourceError
9✔
1487
            elif self._transport.is_closing():
9✔
1488
                raise BrokenResourceError
×
1489
            else:
1490
                self._transport.sendto(*item)
9✔
1491

1492

1493
class ConnectedUDPSocket(abc.ConnectedUDPSocket):
10✔
1494
    def __init__(
10✔
1495
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1496
    ):
1497
        self._transport = transport
9✔
1498
        self._protocol = protocol
9✔
1499
        self._receive_guard = ResourceGuard("reading from")
9✔
1500
        self._send_guard = ResourceGuard("writing to")
9✔
1501
        self._closed = False
9✔
1502

1503
    @property
10✔
1504
    def _raw_socket(self) -> socket.socket:
10✔
1505
        return self._transport.get_extra_info("socket")
9✔
1506

1507
    async def aclose(self) -> None:
10✔
1508
        if not self._transport.is_closing():
9✔
1509
            self._closed = True
9✔
1510
            self._transport.close()
9✔
1511

1512
    async def receive(self) -> bytes:
10✔
1513
        with self._receive_guard:
9✔
1514
            await AsyncIOBackend.checkpoint()
9✔
1515

1516
            # If the buffer is empty, ask for more data
1517
            if not self._protocol.read_queue and not self._transport.is_closing():
9✔
1518
                self._protocol.read_event.clear()
9✔
1519
                await self._protocol.read_event.wait()
9✔
1520

1521
            try:
9✔
1522
                packet = self._protocol.read_queue.popleft()
9✔
1523
            except IndexError:
9✔
1524
                if self._closed:
9✔
1525
                    raise ClosedResourceError from None
9✔
1526
                else:
1527
                    raise BrokenResourceError from None
×
1528

1529
            return packet[0]
9✔
1530

1531
    async def send(self, item: bytes) -> None:
10✔
1532
        with self._send_guard:
9✔
1533
            await AsyncIOBackend.checkpoint()
9✔
1534
            await self._protocol.write_event.wait()
9✔
1535
            if self._closed:
9✔
1536
                raise ClosedResourceError
9✔
1537
            elif self._transport.is_closing():
9✔
1538
                raise BrokenResourceError
×
1539
            else:
1540
                self._transport.sendto(item)
9✔
1541

1542

1543
class UNIXDatagramSocket(_RawSocketMixin, abc.UNIXDatagramSocket):
10✔
1544
    async def receive(self) -> UNIXDatagramPacketType:
10✔
1545
        loop = get_running_loop()
7✔
1546
        await AsyncIOBackend.checkpoint()
7✔
1547
        with self._receive_guard:
7✔
1548
            while True:
4✔
1549
                try:
7✔
1550
                    data = self._raw_socket.recvfrom(65536)
7✔
1551
                except BlockingIOError:
7✔
1552
                    await self._wait_until_readable(loop)
7✔
1553
                except OSError as exc:
7✔
1554
                    if self._closing:
7✔
1555
                        raise ClosedResourceError from None
7✔
1556
                    else:
1557
                        raise BrokenResourceError from exc
1✔
1558
                else:
1559
                    return data
7✔
1560

1561
    async def send(self, item: UNIXDatagramPacketType) -> None:
10✔
1562
        loop = get_running_loop()
7✔
1563
        await AsyncIOBackend.checkpoint()
7✔
1564
        with self._send_guard:
7✔
1565
            while True:
4✔
1566
                try:
7✔
1567
                    self._raw_socket.sendto(*item)
7✔
1568
                except BlockingIOError:
7✔
1569
                    await self._wait_until_writable(loop)
×
1570
                except OSError as exc:
7✔
1571
                    if self._closing:
7✔
1572
                        raise ClosedResourceError from None
7✔
1573
                    else:
1574
                        raise BrokenResourceError from exc
1✔
1575
                else:
1576
                    return
7✔
1577

1578

1579
class ConnectedUNIXDatagramSocket(_RawSocketMixin, abc.ConnectedUNIXDatagramSocket):
10✔
1580
    async def receive(self) -> bytes:
10✔
1581
        loop = get_running_loop()
7✔
1582
        await AsyncIOBackend.checkpoint()
7✔
1583
        with self._receive_guard:
7✔
1584
            while True:
4✔
1585
                try:
7✔
1586
                    data = self._raw_socket.recv(65536)
7✔
1587
                except BlockingIOError:
7✔
1588
                    await self._wait_until_readable(loop)
7✔
1589
                except OSError as exc:
7✔
1590
                    if self._closing:
7✔
1591
                        raise ClosedResourceError from None
7✔
1592
                    else:
1593
                        raise BrokenResourceError from exc
1✔
1594
                else:
1595
                    return data
7✔
1596

1597
    async def send(self, item: bytes) -> None:
10✔
1598
        loop = get_running_loop()
7✔
1599
        await AsyncIOBackend.checkpoint()
7✔
1600
        with self._send_guard:
7✔
1601
            while True:
4✔
1602
                try:
7✔
1603
                    self._raw_socket.send(item)
7✔
1604
                except BlockingIOError:
7✔
1605
                    await self._wait_until_writable(loop)
×
1606
                except OSError as exc:
7✔
1607
                    if self._closing:
7✔
1608
                        raise ClosedResourceError from None
7✔
1609
                    else:
1610
                        raise BrokenResourceError from exc
1✔
1611
                else:
1612
                    return
7✔
1613

1614

1615
_read_events: RunVar[dict[Any, asyncio.Event]] = RunVar("read_events")
10✔
1616
_write_events: RunVar[dict[Any, asyncio.Event]] = RunVar("write_events")
10✔
1617

1618

1619
#
1620
# Synchronization
1621
#
1622

1623

1624
class Event(BaseEvent):
10✔
1625
    def __new__(cls) -> Event:
10✔
1626
        return object.__new__(cls)
10✔
1627

1628
    def __init__(self) -> None:
10✔
1629
        self._event = asyncio.Event()
10✔
1630

1631
    def set(self) -> None:
10✔
1632
        self._event.set()
10✔
1633

1634
    def is_set(self) -> bool:
10✔
1635
        return self._event.is_set()
10✔
1636

1637
    async def wait(self) -> None:
10✔
1638
        if self.is_set():
10✔
1639
            await AsyncIOBackend.checkpoint()
10✔
1640
        else:
1641
            await self._event.wait()
10✔
1642

1643
    def statistics(self) -> EventStatistics:
10✔
1644
        return EventStatistics(len(self._event._waiters))  # type: ignore[attr-defined]
9✔
1645

1646

1647
class CapacityLimiter(BaseCapacityLimiter):
10✔
1648
    _total_tokens: float = 0
10✔
1649

1650
    def __new__(cls, total_tokens: float) -> CapacityLimiter:
10✔
1651
        return object.__new__(cls)
10✔
1652

1653
    def __init__(self, total_tokens: float):
10✔
1654
        self._borrowers: set[Any] = set()
10✔
1655
        self._wait_queue: OrderedDict[Any, asyncio.Event] = OrderedDict()
10✔
1656
        self.total_tokens = total_tokens
10✔
1657

1658
    async def __aenter__(self) -> None:
10✔
1659
        await self.acquire()
10✔
1660

1661
    async def __aexit__(
10✔
1662
        self,
1663
        exc_type: type[BaseException] | None,
1664
        exc_val: BaseException | None,
1665
        exc_tb: TracebackType | None,
1666
    ) -> None:
1667
        self.release()
10✔
1668

1669
    @property
10✔
1670
    def total_tokens(self) -> float:
10✔
1671
        return self._total_tokens
9✔
1672

1673
    @total_tokens.setter
10✔
1674
    def total_tokens(self, value: float) -> None:
10✔
1675
        if not isinstance(value, int) and not math.isinf(value):
10✔
1676
            raise TypeError("total_tokens must be an int or math.inf")
9✔
1677
        if value < 1:
10✔
1678
            raise ValueError("total_tokens must be >= 1")
9✔
1679

1680
        waiters_to_notify = max(value - self._total_tokens, 0)
10✔
1681
        self._total_tokens = value
10✔
1682

1683
        # Notify waiting tasks that they have acquired the limiter
1684
        while self._wait_queue and waiters_to_notify:
10✔
1685
            event = self._wait_queue.popitem(last=False)[1]
9✔
1686
            event.set()
9✔
1687
            waiters_to_notify -= 1
9✔
1688

1689
    @property
10✔
1690
    def borrowed_tokens(self) -> int:
10✔
1691
        return len(self._borrowers)
9✔
1692

1693
    @property
10✔
1694
    def available_tokens(self) -> float:
10✔
1695
        return self._total_tokens - len(self._borrowers)
9✔
1696

1697
    def acquire_nowait(self) -> None:
10✔
1698
        self.acquire_on_behalf_of_nowait(current_task())
×
1699

1700
    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
10✔
1701
        if borrower in self._borrowers:
10✔
1702
            raise RuntimeError(
9✔
1703
                "this borrower is already holding one of this CapacityLimiter's "
1704
                "tokens"
1705
            )
1706

1707
        if self._wait_queue or len(self._borrowers) >= self._total_tokens:
10✔
1708
            raise WouldBlock
9✔
1709

1710
        self._borrowers.add(borrower)
10✔
1711

1712
    async def acquire(self) -> None:
10✔
1713
        return await self.acquire_on_behalf_of(current_task())
10✔
1714

1715
    async def acquire_on_behalf_of(self, borrower: object) -> None:
10✔
1716
        await AsyncIOBackend.checkpoint_if_cancelled()
10✔
1717
        try:
10✔
1718
            self.acquire_on_behalf_of_nowait(borrower)
10✔
1719
        except WouldBlock:
9✔
1720
            event = asyncio.Event()
9✔
1721
            self._wait_queue[borrower] = event
9✔
1722
            try:
9✔
1723
                await event.wait()
9✔
1724
            except BaseException:
×
1725
                self._wait_queue.pop(borrower, None)
×
1726
                raise
×
1727

1728
            self._borrowers.add(borrower)
9✔
1729
        else:
1730
            try:
10✔
1731
                await AsyncIOBackend.cancel_shielded_checkpoint()
10✔
1732
            except BaseException:
9✔
1733
                self.release()
9✔
1734
                raise
9✔
1735

1736
    def release(self) -> None:
10✔
1737
        self.release_on_behalf_of(current_task())
10✔
1738

1739
    def release_on_behalf_of(self, borrower: object) -> None:
10✔
1740
        try:
10✔
1741
            self._borrowers.remove(borrower)
10✔
1742
        except KeyError:
9✔
1743
            raise RuntimeError(
9✔
1744
                "this borrower isn't holding any of this CapacityLimiter's " "tokens"
1745
            ) from None
1746

1747
        # Notify the next task in line if this limiter has free capacity now
1748
        if self._wait_queue and len(self._borrowers) < self._total_tokens:
10✔
1749
            event = self._wait_queue.popitem(last=False)[1]
9✔
1750
            event.set()
9✔
1751

1752
    def statistics(self) -> CapacityLimiterStatistics:
10✔
1753
        return CapacityLimiterStatistics(
9✔
1754
            self.borrowed_tokens,
1755
            self.total_tokens,
1756
            tuple(self._borrowers),
1757
            len(self._wait_queue),
1758
        )
1759

1760

1761
_default_thread_limiter: RunVar[CapacityLimiter] = RunVar("_default_thread_limiter")
10✔
1762

1763

1764
#
1765
# Operating system signals
1766
#
1767

1768

1769
class _SignalReceiver:
10✔
1770
    def __init__(self, signals: tuple[Signals, ...]):
10✔
1771
        self._signals = signals
8✔
1772
        self._loop = get_running_loop()
8✔
1773
        self._signal_queue: deque[Signals] = deque()
8✔
1774
        self._future: asyncio.Future = asyncio.Future()
8✔
1775
        self._handled_signals: set[Signals] = set()
8✔
1776

1777
    def _deliver(self, signum: Signals) -> None:
10✔
1778
        self._signal_queue.append(signum)
8✔
1779
        if not self._future.done():
8✔
1780
            self._future.set_result(None)
8✔
1781

1782
    def __enter__(self) -> _SignalReceiver:
10✔
1783
        for sig in set(self._signals):
8✔
1784
            self._loop.add_signal_handler(sig, self._deliver, sig)
8✔
1785
            self._handled_signals.add(sig)
8✔
1786

1787
        return self
8✔
1788

1789
    def __exit__(
10✔
1790
        self,
1791
        exc_type: type[BaseException] | None,
1792
        exc_val: BaseException | None,
1793
        exc_tb: TracebackType | None,
1794
    ) -> bool | None:
1795
        for sig in self._handled_signals:
8✔
1796
            self._loop.remove_signal_handler(sig)
8✔
1797
        return None
8✔
1798

1799
    def __aiter__(self) -> _SignalReceiver:
10✔
1800
        return self
8✔
1801

1802
    async def __anext__(self) -> Signals:
10✔
1803
        await AsyncIOBackend.checkpoint()
8✔
1804
        if not self._signal_queue:
8✔
1805
            self._future = asyncio.Future()
×
1806
            await self._future
×
1807

1808
        return self._signal_queue.popleft()
8✔
1809

1810

1811
#
1812
# Testing and debugging
1813
#
1814

1815

1816
def _create_task_info(task: asyncio.Task) -> TaskInfo:
10✔
1817
    task_state = _task_states.get(task)
10✔
1818
    if task_state is None:
10✔
1819
        parent_id = None
10✔
1820
    else:
1821
        parent_id = task_state.parent_id
10✔
1822

1823
    return TaskInfo(id(task), parent_id, task.get_name(), task.get_coro())
10✔
1824

1825

1826
class TestRunner(abc.TestRunner):
10✔
1827
    _send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]
10✔
1828

1829
    def __init__(
10✔
1830
        self,
1831
        *,
1832
        debug: bool | None = None,
1833
        use_uvloop: bool = False,
1834
        loop_factory: Callable[[], AbstractEventLoop] | None = None,
1835
    ) -> None:
1836
        if use_uvloop and loop_factory is None:
10✔
1837
            import uvloop
×
1838

1839
            loop_factory = uvloop.new_event_loop
×
1840

1841
        self._runner = Runner(debug=debug, loop_factory=loop_factory)
10✔
1842
        self._exceptions: list[BaseException] = []
10✔
1843
        self._runner_task: asyncio.Task | None = None
10✔
1844

1845
    def __enter__(self) -> TestRunner:
10✔
1846
        self._runner.__enter__()
10✔
1847
        self.get_loop().set_exception_handler(self._exception_handler)
10✔
1848
        return self
10✔
1849

1850
    def __exit__(
10✔
1851
        self,
1852
        exc_type: type[BaseException] | None,
1853
        exc_val: BaseException | None,
1854
        exc_tb: TracebackType | None,
1855
    ) -> None:
1856
        self._runner.__exit__(exc_type, exc_val, exc_tb)
10✔
1857

1858
    def get_loop(self) -> AbstractEventLoop:
10✔
1859
        return self._runner.get_loop()
10✔
1860

1861
    def _exception_handler(
10✔
1862
        self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
1863
    ) -> None:
1864
        if isinstance(context.get("exception"), Exception):
10✔
1865
            self._exceptions.append(context["exception"])
10✔
1866
        else:
1867
            loop.default_exception_handler(context)
10✔
1868

1869
    def _raise_async_exceptions(self) -> None:
10✔
1870
        # Re-raise any exceptions raised in asynchronous callbacks
1871
        if self._exceptions:
10✔
1872
            exceptions, self._exceptions = self._exceptions, []
10✔
1873
            if len(exceptions) == 1:
10✔
1874
                raise exceptions[0]
10✔
1875
            elif exceptions:
×
1876
                raise BaseExceptionGroup(
×
1877
                    "Multiple exceptions occurred in asynchronous callbacks", exceptions
1878
                )
1879

1880
    @staticmethod
10✔
1881
    async def _run_tests_and_fixtures(
10✔
1882
        receive_stream: MemoryObjectReceiveStream[
1883
            tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
1884
        ],
1885
    ) -> None:
1886
        with receive_stream:
10✔
1887
            async for coro, future in receive_stream:
10✔
1888
                try:
10✔
1889
                    retval = await coro
10✔
1890
                except BaseException as exc:
10✔
1891
                    if not future.cancelled():
10✔
1892
                        future.set_exception(exc)
10✔
1893
                else:
1894
                    if not future.cancelled():
10✔
1895
                        future.set_result(retval)
10✔
1896

1897
    async def _call_in_runner_task(
10✔
1898
        self,
1899
        func: Callable[P, Awaitable[T_Retval]],
1900
        *args: P.args,
1901
        **kwargs: P.kwargs,
1902
    ) -> T_Retval:
1903
        if not self._runner_task:
10✔
1904
            self._send_stream, receive_stream = create_memory_object_stream[
10✔
1905
                Tuple[Awaitable[Any], asyncio.Future]
1906
            ](1)
1907
            self._runner_task = self.get_loop().create_task(
10✔
1908
                self._run_tests_and_fixtures(receive_stream)
1909
            )
1910

1911
        coro = func(*args, **kwargs)
10✔
1912
        future: asyncio.Future[T_Retval] = self.get_loop().create_future()
10✔
1913
        self._send_stream.send_nowait((coro, future))
10✔
1914
        return await future
10✔
1915

1916
    def run_asyncgen_fixture(
10✔
1917
        self,
1918
        fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
1919
        kwargs: dict[str, Any],
1920
    ) -> Iterable[T_Retval]:
1921
        asyncgen = fixture_func(**kwargs)
10✔
1922
        fixturevalue: T_Retval = self.get_loop().run_until_complete(
10✔
1923
            self._call_in_runner_task(asyncgen.asend, None)
1924
        )
1925
        self._raise_async_exceptions()
10✔
1926

1927
        yield fixturevalue
10✔
1928

1929
        try:
10✔
1930
            self.get_loop().run_until_complete(
10✔
1931
                self._call_in_runner_task(asyncgen.asend, None)
1932
            )
1933
        except StopAsyncIteration:
10✔
1934
            self._raise_async_exceptions()
10✔
1935
        else:
1936
            self.get_loop().run_until_complete(asyncgen.aclose())
×
1937
            raise RuntimeError("Async generator fixture did not stop")
×
1938

1939
    def run_fixture(
10✔
1940
        self,
1941
        fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
1942
        kwargs: dict[str, Any],
1943
    ) -> T_Retval:
1944
        retval = self.get_loop().run_until_complete(
10✔
1945
            self._call_in_runner_task(fixture_func, **kwargs)
1946
        )
1947
        self._raise_async_exceptions()
10✔
1948
        return retval
10✔
1949

1950
    def run_test(
10✔
1951
        self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
1952
    ) -> None:
1953
        try:
10✔
1954
            self.get_loop().run_until_complete(
10✔
1955
                self._call_in_runner_task(test_func, **kwargs)
1956
            )
1957
        except Exception as exc:
10✔
1958
            self._exceptions.append(exc)
9✔
1959

1960
        self._raise_async_exceptions()
10✔
1961

1962

1963
class AsyncIOBackend(AsyncBackend):
10✔
1964
    @classmethod
10✔
1965
    def run(
10✔
1966
        cls,
1967
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
1968
        args: tuple[Unpack[PosArgsT]],
1969
        kwargs: dict[str, Any],
1970
        options: dict[str, Any],
1971
    ) -> T_Retval:
1972
        @wraps(func)
10✔
1973
        async def wrapper() -> T_Retval:
10✔
1974
            task = cast(asyncio.Task, current_task())
10✔
1975
            task.set_name(get_callable_name(func))
10✔
1976
            _task_states[task] = TaskState(None, None)
10✔
1977

1978
            try:
10✔
1979
                return await func(*args)
10✔
1980
            finally:
1981
                del _task_states[task]
10✔
1982

1983
        debug = options.get("debug", False)
10✔
1984
        loop_factory = options.get("loop_factory", None)
10✔
1985
        if loop_factory is None and options.get("use_uvloop", False):
10✔
1986
            import uvloop
7✔
1987

1988
            loop_factory = uvloop.new_event_loop
7✔
1989

1990
        with Runner(debug=debug, loop_factory=loop_factory) as runner:
10✔
1991
            return runner.run(wrapper())
10✔
1992

1993
    @classmethod
10✔
1994
    def current_token(cls) -> object:
10✔
1995
        return get_running_loop()
10✔
1996

1997
    @classmethod
10✔
1998
    def current_time(cls) -> float:
10✔
1999
        return get_running_loop().time()
10✔
2000

2001
    @classmethod
10✔
2002
    def cancelled_exception_class(cls) -> type[BaseException]:
10✔
2003
        return CancelledError
10✔
2004

2005
    @classmethod
10✔
2006
    async def checkpoint(cls) -> None:
10✔
2007
        await sleep(0)
10✔
2008

2009
    @classmethod
10✔
2010
    async def checkpoint_if_cancelled(cls) -> None:
10✔
2011
        task = current_task()
10✔
2012
        if task is None:
10✔
2013
            return
×
2014

2015
        try:
10✔
2016
            cancel_scope = _task_states[task].cancel_scope
10✔
2017
        except KeyError:
10✔
2018
            return
10✔
2019

2020
        while cancel_scope:
10✔
2021
            if cancel_scope.cancel_called:
10✔
2022
                await sleep(0)
10✔
2023
            elif cancel_scope.shield:
10✔
2024
                break
9✔
2025
            else:
2026
                cancel_scope = cancel_scope._parent_scope
10✔
2027

2028
    @classmethod
10✔
2029
    async def cancel_shielded_checkpoint(cls) -> None:
10✔
2030
        with CancelScope(shield=True):
10✔
2031
            await sleep(0)
10✔
2032

2033
    @classmethod
10✔
2034
    async def sleep(cls, delay: float) -> None:
10✔
2035
        await sleep(delay)
10✔
2036

2037
    @classmethod
10✔
2038
    def create_cancel_scope(
10✔
2039
        cls, *, deadline: float = math.inf, shield: bool = False
2040
    ) -> CancelScope:
2041
        return CancelScope(deadline=deadline, shield=shield)
10✔
2042

2043
    @classmethod
10✔
2044
    def current_effective_deadline(cls) -> float:
10✔
2045
        try:
9✔
2046
            cancel_scope = _task_states[
9✔
2047
                current_task()  # type: ignore[index]
2048
            ].cancel_scope
2049
        except KeyError:
×
2050
            return math.inf
×
2051

2052
        deadline = math.inf
9✔
2053
        while cancel_scope:
9✔
2054
            deadline = min(deadline, cancel_scope.deadline)
9✔
2055
            if cancel_scope._cancel_called:
9✔
2056
                deadline = -math.inf
9✔
2057
                break
9✔
2058
            elif cancel_scope.shield:
9✔
2059
                break
9✔
2060
            else:
2061
                cancel_scope = cancel_scope._parent_scope
9✔
2062

2063
        return deadline
9✔
2064

2065
    @classmethod
10✔
2066
    def create_task_group(cls) -> abc.TaskGroup:
10✔
2067
        return TaskGroup()
10✔
2068

2069
    @classmethod
10✔
2070
    def create_event(cls) -> abc.Event:
10✔
2071
        return Event()
10✔
2072

2073
    @classmethod
10✔
2074
    def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
10✔
2075
        return CapacityLimiter(total_tokens)
9✔
2076

2077
    @classmethod
10✔
2078
    async def run_sync_in_worker_thread(
10✔
2079
        cls,
2080
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2081
        args: tuple[Unpack[PosArgsT]],
2082
        abandon_on_cancel: bool = False,
2083
        limiter: abc.CapacityLimiter | None = None,
2084
    ) -> T_Retval:
2085
        await cls.checkpoint()
10✔
2086

2087
        # If this is the first run in this event loop thread, set up the necessary
2088
        # variables
2089
        try:
10✔
2090
            idle_workers = _threadpool_idle_workers.get()
10✔
2091
            workers = _threadpool_workers.get()
10✔
2092
        except LookupError:
10✔
2093
            idle_workers = deque()
10✔
2094
            workers = set()
10✔
2095
            _threadpool_idle_workers.set(idle_workers)
10✔
2096
            _threadpool_workers.set(workers)
10✔
2097

2098
        async with limiter or cls.current_default_thread_limiter():
10✔
2099
            with CancelScope(shield=not abandon_on_cancel) as scope:
10✔
2100
                future: asyncio.Future = asyncio.Future()
10✔
2101
                root_task = find_root_task()
10✔
2102
                if not idle_workers:
10✔
2103
                    worker = WorkerThread(root_task, workers, idle_workers)
10✔
2104
                    worker.start()
10✔
2105
                    workers.add(worker)
10✔
2106
                    root_task.add_done_callback(worker.stop)
10✔
2107
                else:
2108
                    worker = idle_workers.pop()
10✔
2109

2110
                    # Prune any other workers that have been idle for MAX_IDLE_TIME
2111
                    # seconds or longer
2112
                    now = cls.current_time()
10✔
2113
                    while idle_workers:
10✔
2114
                        if (
9✔
2115
                            now - idle_workers[0].idle_since
2116
                            < WorkerThread.MAX_IDLE_TIME
2117
                        ):
2118
                            break
9✔
2119

2120
                        expired_worker = idle_workers.popleft()
×
2121
                        expired_worker.root_task.remove_done_callback(
×
2122
                            expired_worker.stop
2123
                        )
2124
                        expired_worker.stop()
×
2125

2126
                context = copy_context()
10✔
2127
                context.run(sniffio.current_async_library_cvar.set, None)
10✔
2128
                if abandon_on_cancel or scope._parent_scope is None:
10✔
2129
                    worker_scope = scope
10✔
2130
                else:
2131
                    worker_scope = scope._parent_scope
10✔
2132

2133
                worker.queue.put_nowait((context, func, args, future, worker_scope))
10✔
2134
                return await future
10✔
2135

2136
    @classmethod
10✔
2137
    def check_cancelled(cls) -> None:
10✔
2138
        scope: CancelScope | None = threadlocals.current_cancel_scope
10✔
2139
        while scope is not None:
10✔
2140
            if scope.cancel_called:
10✔
2141
                raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")
10✔
2142

2143
            if scope.shield:
10✔
2144
                return
×
2145

2146
            scope = scope._parent_scope
10✔
2147

2148
    @classmethod
10✔
2149
    def run_async_from_thread(
10✔
2150
        cls,
2151
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2152
        args: tuple[Unpack[PosArgsT]],
2153
        token: object,
2154
    ) -> T_Retval:
2155
        async def task_wrapper(scope: CancelScope) -> T_Retval:
10✔
2156
            __tracebackhide__ = True
10✔
2157
            task = cast(asyncio.Task, current_task())
10✔
2158
            _task_states[task] = TaskState(None, scope)
10✔
2159
            scope._tasks.add(task)
10✔
2160
            try:
10✔
2161
                return await func(*args)
10✔
2162
            except CancelledError as exc:
10✔
2163
                raise concurrent.futures.CancelledError(str(exc)) from None
10✔
2164
            finally:
2165
                scope._tasks.discard(task)
10✔
2166

2167
        loop = cast(AbstractEventLoop, token)
10✔
2168
        context = copy_context()
10✔
2169
        context.run(sniffio.current_async_library_cvar.set, "asyncio")
10✔
2170
        wrapper = task_wrapper(threadlocals.current_cancel_scope)
10✔
2171
        f: concurrent.futures.Future[T_Retval] = context.run(
10✔
2172
            asyncio.run_coroutine_threadsafe, wrapper, loop
2173
        )
2174
        return f.result()
10✔
2175

2176
    @classmethod
10✔
2177
    def run_sync_from_thread(
10✔
2178
        cls,
2179
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2180
        args: tuple[Unpack[PosArgsT]],
2181
        token: object,
2182
    ) -> T_Retval:
2183
        @wraps(func)
10✔
2184
        def wrapper() -> None:
10✔
2185
            try:
10✔
2186
                sniffio.current_async_library_cvar.set("asyncio")
10✔
2187
                f.set_result(func(*args))
10✔
2188
            except BaseException as exc:
10✔
2189
                f.set_exception(exc)
10✔
2190
                if not isinstance(exc, Exception):
10✔
2191
                    raise
×
2192

2193
        f: concurrent.futures.Future[T_Retval] = Future()
10✔
2194
        loop = cast(AbstractEventLoop, token)
10✔
2195
        loop.call_soon_threadsafe(wrapper)
10✔
2196
        return f.result()
10✔
2197

2198
    @classmethod
10✔
2199
    def create_blocking_portal(cls) -> abc.BlockingPortal:
10✔
2200
        return BlockingPortal()
10✔
2201

2202
    @classmethod
10✔
2203
    async def open_process(
10✔
2204
        cls,
2205
        command: str | bytes | Sequence[str | bytes],
2206
        *,
2207
        shell: bool,
2208
        stdin: int | IO[Any] | None,
2209
        stdout: int | IO[Any] | None,
2210
        stderr: int | IO[Any] | None,
2211
        cwd: str | bytes | PathLike | None = None,
2212
        env: Mapping[str, str] | None = None,
2213
        start_new_session: bool = False,
2214
    ) -> Process:
2215
        await cls.checkpoint()
9✔
2216
        if shell:
9✔
2217
            process = await asyncio.create_subprocess_shell(
9✔
2218
                cast("str | bytes", command),
2219
                stdin=stdin,
2220
                stdout=stdout,
2221
                stderr=stderr,
2222
                cwd=cwd,
2223
                env=env,
2224
                start_new_session=start_new_session,
2225
            )
2226
        else:
2227
            process = await asyncio.create_subprocess_exec(
9✔
2228
                *command,
2229
                stdin=stdin,
2230
                stdout=stdout,
2231
                stderr=stderr,
2232
                cwd=cwd,
2233
                env=env,
2234
                start_new_session=start_new_session,
2235
            )
2236

2237
        stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
9✔
2238
        stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
9✔
2239
        stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
9✔
2240
        return Process(process, stdin_stream, stdout_stream, stderr_stream)
9✔
2241

2242
    @classmethod
10✔
2243
    def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
10✔
2244
        create_task(
9✔
2245
            _shutdown_process_pool_on_exit(workers),
2246
            name="AnyIO process pool shutdown task",
2247
        )
2248
        find_root_task().add_done_callback(
9✔
2249
            partial(_forcibly_shutdown_process_pool_on_exit, workers)
2250
        )
2251

2252
    @classmethod
10✔
2253
    async def connect_tcp(
10✔
2254
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
2255
    ) -> abc.SocketStream:
2256
        transport, protocol = cast(
10✔
2257
            Tuple[asyncio.Transport, StreamProtocol],
2258
            await get_running_loop().create_connection(
2259
                StreamProtocol, host, port, local_addr=local_address
2260
            ),
2261
        )
2262
        transport.pause_reading()
10✔
2263
        return SocketStream(transport, protocol)
10✔
2264

2265
    @classmethod
10✔
2266
    async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
10✔
2267
        await cls.checkpoint()
7✔
2268
        loop = get_running_loop()
7✔
2269
        raw_socket = socket.socket(socket.AF_UNIX)
7✔
2270
        raw_socket.setblocking(False)
7✔
2271
        while True:
4✔
2272
            try:
7✔
2273
                raw_socket.connect(path)
7✔
2274
            except BlockingIOError:
7✔
2275
                f: asyncio.Future = asyncio.Future()
×
2276
                loop.add_writer(raw_socket, f.set_result, None)
×
2277
                f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
2278
                await f
×
2279
            except BaseException:
7✔
2280
                raw_socket.close()
7✔
2281
                raise
7✔
2282
            else:
2283
                return UNIXSocketStream(raw_socket)
7✔
2284

2285
    @classmethod
10✔
2286
    def create_tcp_listener(cls, sock: socket.socket) -> SocketListener:
10✔
2287
        return TCPSocketListener(sock)
10✔
2288

2289
    @classmethod
10✔
2290
    def create_unix_listener(cls, sock: socket.socket) -> SocketListener:
10✔
2291
        return UNIXSocketListener(sock)
7✔
2292

2293
    @classmethod
10✔
2294
    async def create_udp_socket(
10✔
2295
        cls,
2296
        family: AddressFamily,
2297
        local_address: IPSockAddrType | None,
2298
        remote_address: IPSockAddrType | None,
2299
        reuse_port: bool,
2300
    ) -> UDPSocket | ConnectedUDPSocket:
2301
        transport, protocol = await get_running_loop().create_datagram_endpoint(
9✔
2302
            DatagramProtocol,
2303
            local_addr=local_address,
2304
            remote_addr=remote_address,
2305
            family=family,
2306
            reuse_port=reuse_port,
2307
        )
2308
        if protocol.exception:
9✔
2309
            transport.close()
×
2310
            raise protocol.exception
×
2311

2312
        if not remote_address:
9✔
2313
            return UDPSocket(transport, protocol)
9✔
2314
        else:
2315
            return ConnectedUDPSocket(transport, protocol)
9✔
2316

2317
    @classmethod
10✔
2318
    async def create_unix_datagram_socket(  # type: ignore[override]
10✔
2319
        cls, raw_socket: socket.socket, remote_path: str | bytes | None
2320
    ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
2321
        await cls.checkpoint()
7✔
2322
        loop = get_running_loop()
7✔
2323

2324
        if remote_path:
7✔
2325
            while True:
4✔
2326
                try:
7✔
2327
                    raw_socket.connect(remote_path)
7✔
2328
                except BlockingIOError:
×
2329
                    f: asyncio.Future = asyncio.Future()
×
2330
                    loop.add_writer(raw_socket, f.set_result, None)
×
2331
                    f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
2332
                    await f
×
2333
                except BaseException:
×
2334
                    raw_socket.close()
×
2335
                    raise
×
2336
                else:
2337
                    return ConnectedUNIXDatagramSocket(raw_socket)
7✔
2338
        else:
2339
            return UNIXDatagramSocket(raw_socket)
7✔
2340

2341
    @classmethod
10✔
2342
    async def getaddrinfo(
10✔
2343
        cls,
2344
        host: bytes | str | None,
2345
        port: str | int | None,
2346
        *,
2347
        family: int | AddressFamily = 0,
2348
        type: int | SocketKind = 0,
2349
        proto: int = 0,
2350
        flags: int = 0,
2351
    ) -> list[
2352
        tuple[
2353
            AddressFamily,
2354
            SocketKind,
2355
            int,
2356
            str,
2357
            tuple[str, int] | tuple[str, int, int, int],
2358
        ]
2359
    ]:
2360
        return await get_running_loop().getaddrinfo(
10✔
2361
            host, port, family=family, type=type, proto=proto, flags=flags
2362
        )
2363

2364
    @classmethod
10✔
2365
    async def getnameinfo(
10✔
2366
        cls, sockaddr: IPSockAddrType, flags: int = 0
2367
    ) -> tuple[str, str]:
2368
        return await get_running_loop().getnameinfo(sockaddr, flags)
9✔
2369

2370
    @classmethod
10✔
2371
    async def wait_socket_readable(cls, sock: socket.socket) -> None:
10✔
2372
        await cls.checkpoint()
×
2373
        try:
×
2374
            read_events = _read_events.get()
×
2375
        except LookupError:
×
2376
            read_events = {}
×
2377
            _read_events.set(read_events)
×
2378

2379
        if read_events.get(sock):
×
2380
            raise BusyResourceError("reading from") from None
×
2381

2382
        loop = get_running_loop()
×
2383
        event = read_events[sock] = asyncio.Event()
×
2384
        loop.add_reader(sock, event.set)
×
2385
        try:
×
2386
            await event.wait()
×
2387
        finally:
2388
            if read_events.pop(sock, None) is not None:
×
2389
                loop.remove_reader(sock)
×
2390
                readable = True
×
2391
            else:
2392
                readable = False
×
2393

2394
        if not readable:
×
2395
            raise ClosedResourceError
×
2396

2397
    @classmethod
10✔
2398
    async def wait_socket_writable(cls, sock: socket.socket) -> None:
10✔
2399
        await cls.checkpoint()
×
2400
        try:
×
2401
            write_events = _write_events.get()
×
2402
        except LookupError:
×
2403
            write_events = {}
×
2404
            _write_events.set(write_events)
×
2405

2406
        if write_events.get(sock):
×
2407
            raise BusyResourceError("writing to") from None
×
2408

2409
        loop = get_running_loop()
×
2410
        event = write_events[sock] = asyncio.Event()
×
2411
        loop.add_writer(sock.fileno(), event.set)
×
2412
        try:
×
2413
            await event.wait()
×
2414
        finally:
2415
            if write_events.pop(sock, None) is not None:
×
2416
                loop.remove_writer(sock)
×
2417
                writable = True
×
2418
            else:
2419
                writable = False
×
2420

2421
        if not writable:
×
2422
            raise ClosedResourceError
×
2423

2424
    @classmethod
10✔
2425
    def current_default_thread_limiter(cls) -> CapacityLimiter:
10✔
2426
        try:
10✔
2427
            return _default_thread_limiter.get()
10✔
2428
        except LookupError:
10✔
2429
            limiter = CapacityLimiter(40)
10✔
2430
            _default_thread_limiter.set(limiter)
10✔
2431
            return limiter
10✔
2432

2433
    @classmethod
10✔
2434
    def open_signal_receiver(
10✔
2435
        cls, *signals: Signals
2436
    ) -> ContextManager[AsyncIterator[Signals]]:
2437
        return _SignalReceiver(signals)
8✔
2438

2439
    @classmethod
10✔
2440
    def get_current_task(cls) -> TaskInfo:
10✔
2441
        return _create_task_info(current_task())  # type: ignore[arg-type]
10✔
2442

2443
    @classmethod
10✔
2444
    def get_running_tasks(cls) -> list[TaskInfo]:
10✔
2445
        return [_create_task_info(task) for task in all_tasks() if not task.done()]
10✔
2446

2447
    @classmethod
10✔
2448
    async def wait_all_tasks_blocked(cls) -> None:
10✔
2449
        await cls.checkpoint()
10✔
2450
        this_task = current_task()
10✔
2451
        while True:
6✔
2452
            for task in all_tasks():
10✔
2453
                if task is this_task:
10✔
2454
                    continue
10✔
2455

2456
                waiter = task._fut_waiter  # type: ignore[attr-defined]
10✔
2457
                if waiter is None or waiter.done():
10✔
2458
                    await sleep(0.1)
10✔
2459
                    break
10✔
2460
            else:
2461
                return
10✔
2462

2463
    @classmethod
10✔
2464
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
10✔
2465
        return TestRunner(**options)
10✔
2466

2467

2468
backend_class = AsyncIOBackend
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc