• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10653622634

01 Sep 2024 11:31AM UTC coverage: 91.768% (-0.02%) from 91.788%
10653622634

Pull #777

github

web-flow
Merge 8dfdb99e9 into 8a5b34626
Pull Request #777: Convert bytearray to bytes in `StreamProtocol.receive()`

1 of 1 new or added line in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

4593 of 5005 relevant lines covered (91.77%)

9.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.23
/src/anyio/_backends/_asyncio.py
1
from __future__ import annotations
11✔
2

3
import array
11✔
4
import asyncio
11✔
5
import concurrent.futures
11✔
6
import math
11✔
7
import socket
11✔
8
import sys
11✔
9
import threading
11✔
10
import weakref
11✔
11
from asyncio import (
11✔
12
    AbstractEventLoop,
13
    CancelledError,
14
    all_tasks,
15
    create_task,
16
    current_task,
17
    get_running_loop,
18
    sleep,
19
)
20
from asyncio.base_events import _run_until_complete_cb  # type: ignore[attr-defined]
11✔
21
from collections import OrderedDict, deque
11✔
22
from collections.abc import AsyncIterator, Iterable
11✔
23
from concurrent.futures import Future
11✔
24
from contextlib import suppress
11✔
25
from contextvars import Context, copy_context
11✔
26
from dataclasses import dataclass
11✔
27
from functools import partial, wraps
11✔
28
from inspect import (
11✔
29
    CORO_RUNNING,
30
    CORO_SUSPENDED,
31
    getcoroutinestate,
32
    iscoroutine,
33
)
34
from io import IOBase
11✔
35
from os import PathLike
11✔
36
from queue import Queue
11✔
37
from signal import Signals
11✔
38
from socket import AddressFamily, SocketKind
11✔
39
from threading import Thread
11✔
40
from types import TracebackType
11✔
41
from typing import (
11✔
42
    IO,
43
    Any,
44
    AsyncGenerator,
45
    Awaitable,
46
    Callable,
47
    Collection,
48
    ContextManager,
49
    Coroutine,
50
    Mapping,
51
    Optional,
52
    Sequence,
53
    Tuple,
54
    TypeVar,
55
    cast,
56
)
57
from weakref import WeakKeyDictionary
11✔
58

59
import sniffio
11✔
60

61
from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
11✔
62
from .._core._eventloop import claim_worker_thread, threadlocals
11✔
63
from .._core._exceptions import (
11✔
64
    BrokenResourceError,
65
    BusyResourceError,
66
    ClosedResourceError,
67
    EndOfStream,
68
    WouldBlock,
69
    iterate_exceptions,
70
)
71
from .._core._sockets import convert_ipv6_sockaddr
11✔
72
from .._core._streams import create_memory_object_stream
11✔
73
from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
11✔
74
from .._core._synchronization import Event as BaseEvent
11✔
75
from .._core._synchronization import ResourceGuard
11✔
76
from .._core._tasks import CancelScope as BaseCancelScope
11✔
77
from ..abc import (
11✔
78
    AsyncBackend,
79
    IPSockAddrType,
80
    SocketListener,
81
    UDPPacketType,
82
    UNIXDatagramPacketType,
83
)
84
from ..lowlevel import RunVar
11✔
85
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
11✔
86

87
if sys.version_info >= (3, 10):
11✔
88
    from typing import ParamSpec
7✔
89
else:
90
    from typing_extensions import ParamSpec
4✔
91

92
if sys.version_info >= (3, 11):
11✔
93
    from asyncio import Runner
5✔
94
    from typing import TypeVarTuple, Unpack
5✔
95
else:
96
    import contextvars
6✔
97
    import enum
6✔
98
    import signal
6✔
99
    from asyncio import coroutines, events, exceptions, tasks
6✔
100

101
    from exceptiongroup import BaseExceptionGroup
6✔
102
    from typing_extensions import TypeVarTuple, Unpack
6✔
103

104
    class _State(enum.Enum):
6✔
105
        CREATED = "created"
6✔
106
        INITIALIZED = "initialized"
6✔
107
        CLOSED = "closed"
6✔
108

109
    class Runner:
6✔
110
        # Copied from CPython 3.11
111
        def __init__(
6✔
112
            self,
113
            *,
114
            debug: bool | None = None,
115
            loop_factory: Callable[[], AbstractEventLoop] | None = None,
116
        ):
117
            self._state = _State.CREATED
6✔
118
            self._debug = debug
6✔
119
            self._loop_factory = loop_factory
6✔
120
            self._loop: AbstractEventLoop | None = None
6✔
121
            self._context = None
6✔
122
            self._interrupt_count = 0
6✔
123
            self._set_event_loop = False
6✔
124

125
        def __enter__(self) -> Runner:
6✔
126
            self._lazy_init()
6✔
127
            return self
6✔
128

129
        def __exit__(
6✔
130
            self,
131
            exc_type: type[BaseException],
132
            exc_val: BaseException,
133
            exc_tb: TracebackType,
134
        ) -> None:
135
            self.close()
6✔
136

137
        def close(self) -> None:
6✔
138
            """Shutdown and close event loop."""
139
            if self._state is not _State.INITIALIZED:
6✔
140
                return
×
141
            try:
6✔
142
                loop = self._loop
6✔
143
                _cancel_all_tasks(loop)
6✔
144
                loop.run_until_complete(loop.shutdown_asyncgens())
6✔
145
                if hasattr(loop, "shutdown_default_executor"):
6✔
146
                    loop.run_until_complete(loop.shutdown_default_executor())
5✔
147
                else:
148
                    loop.run_until_complete(_shutdown_default_executor(loop))
3✔
149
            finally:
150
                if self._set_event_loop:
6✔
151
                    events.set_event_loop(None)
6✔
152
                loop.close()
6✔
153
                self._loop = None
6✔
154
                self._state = _State.CLOSED
6✔
155

156
        def get_loop(self) -> AbstractEventLoop:
6✔
157
            """Return embedded event loop."""
158
            self._lazy_init()
6✔
159
            return self._loop
6✔
160

161
        def run(self, coro: Coroutine[T_Retval], *, context=None) -> T_Retval:
6✔
162
            """Run a coroutine inside the embedded event loop."""
163
            if not coroutines.iscoroutine(coro):
6✔
164
                raise ValueError(f"a coroutine was expected, got {coro!r}")
×
165

166
            if events._get_running_loop() is not None:
6✔
167
                # fail fast with short traceback
168
                raise RuntimeError(
×
169
                    "Runner.run() cannot be called from a running event loop"
170
                )
171

172
            self._lazy_init()
6✔
173

174
            if context is None:
6✔
175
                context = self._context
6✔
176
            task = context.run(self._loop.create_task, coro)
6✔
177

178
            if (
6✔
179
                threading.current_thread() is threading.main_thread()
180
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
181
            ):
182
                sigint_handler = partial(self._on_sigint, main_task=task)
6✔
183
                try:
6✔
184
                    signal.signal(signal.SIGINT, sigint_handler)
6✔
185
                except ValueError:
×
186
                    # `signal.signal` may throw if `threading.main_thread` does
187
                    # not support signals (e.g. embedded interpreter with signals
188
                    # not registered - see gh-91880)
189
                    sigint_handler = None
×
190
            else:
191
                sigint_handler = None
6✔
192

193
            self._interrupt_count = 0
6✔
194
            try:
6✔
195
                return self._loop.run_until_complete(task)
6✔
196
            except exceptions.CancelledError:
6✔
197
                if self._interrupt_count > 0:
×
198
                    uncancel = getattr(task, "uncancel", None)
×
199
                    if uncancel is not None and uncancel() == 0:
×
200
                        raise KeyboardInterrupt()
1✔
201
                raise  # CancelledError
×
202
            finally:
203
                if (
6✔
204
                    sigint_handler is not None
205
                    and signal.getsignal(signal.SIGINT) is sigint_handler
206
                ):
207
                    signal.signal(signal.SIGINT, signal.default_int_handler)
6✔
208

209
        def _lazy_init(self) -> None:
6✔
210
            if self._state is _State.CLOSED:
6✔
211
                raise RuntimeError("Runner is closed")
×
212
            if self._state is _State.INITIALIZED:
6✔
213
                return
6✔
214
            if self._loop_factory is None:
6✔
215
                self._loop = events.new_event_loop()
6✔
216
                if not self._set_event_loop:
6✔
217
                    # Call set_event_loop only once to avoid calling
218
                    # attach_loop multiple times on child watchers
219
                    events.set_event_loop(self._loop)
6✔
220
                    self._set_event_loop = True
6✔
221
            else:
222
                self._loop = self._loop_factory()
4✔
223
            if self._debug is not None:
6✔
224
                self._loop.set_debug(self._debug)
6✔
225
            self._context = contextvars.copy_context()
6✔
226
            self._state = _State.INITIALIZED
6✔
227

228
        def _on_sigint(self, signum, frame, main_task: asyncio.Task) -> None:
6✔
229
            self._interrupt_count += 1
×
230
            if self._interrupt_count == 1 and not main_task.done():
×
231
                main_task.cancel()
×
232
                # wakeup loop if it is blocked by select() with long timeout
233
                self._loop.call_soon_threadsafe(lambda: None)
×
234
                return
×
235
            raise KeyboardInterrupt()
×
236

237
    def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
6✔
238
        to_cancel = tasks.all_tasks(loop)
6✔
239
        if not to_cancel:
6✔
240
            return
7✔
241

242
        for task in to_cancel:
6✔
243
            task.cancel()
6✔
244

245
        loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
6✔
246

247
        for task in to_cancel:
6✔
248
            if task.cancelled():
6✔
249
                continue
6✔
250
            if task.exception() is not None:
5✔
251
                loop.call_exception_handler(
×
252
                    {
253
                        "message": "unhandled exception during asyncio.run() shutdown",
254
                        "exception": task.exception(),
255
                        "task": task,
256
                    }
257
                )
258

259
    async def _shutdown_default_executor(loop: AbstractEventLoop) -> None:
6✔
260
        """Schedule the shutdown of the default executor."""
261

262
        def _do_shutdown(future: asyncio.futures.Future) -> None:
3✔
263
            try:
3✔
264
                loop._default_executor.shutdown(wait=True)  # type: ignore[attr-defined]
3✔
265
                loop.call_soon_threadsafe(future.set_result, None)
3✔
266
            except Exception as ex:
×
267
                loop.call_soon_threadsafe(future.set_exception, ex)
×
268

269
        loop._executor_shutdown_called = True
3✔
270
        if loop._default_executor is None:
3✔
271
            return
3✔
272
        future = loop.create_future()
3✔
273
        thread = threading.Thread(target=_do_shutdown, args=(future,))
3✔
274
        thread.start()
3✔
275
        try:
3✔
276
            await future
3✔
277
        finally:
278
            thread.join()
3✔
279

280

281
T_Retval = TypeVar("T_Retval")
11✔
282
T_contra = TypeVar("T_contra", contravariant=True)
11✔
283
PosArgsT = TypeVarTuple("PosArgsT")
11✔
284
P = ParamSpec("P")
11✔
285

286
_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")
11✔
287

288

289
def find_root_task() -> asyncio.Task:
11✔
290
    root_task = _root_task.get(None)
11✔
291
    if root_task is not None and not root_task.done():
11✔
292
        return root_task
11✔
293

294
    # Look for a task that has been started via run_until_complete()
295
    for task in all_tasks():
11✔
296
        if task._callbacks and not task.done():
11✔
297
            callbacks = [cb for cb, context in task._callbacks]
11✔
298
            for cb in callbacks:
11✔
299
                if (
11✔
300
                    cb is _run_until_complete_cb
301
                    or getattr(cb, "__module__", None) == "uvloop.loop"
302
                ):
303
                    _root_task.set(task)
11✔
304
                    return task
11✔
305

306
    # Look up the topmost task in the AnyIO task tree, if possible
307
    task = cast(asyncio.Task, current_task())
10✔
308
    state = _task_states.get(task)
10✔
309
    if state:
10✔
310
        cancel_scope = state.cancel_scope
10✔
311
        while cancel_scope and cancel_scope._parent_scope is not None:
10✔
312
            cancel_scope = cancel_scope._parent_scope
×
313

314
        if cancel_scope is not None:
10✔
315
            return cast(asyncio.Task, cancel_scope._host_task)
10✔
316

317
    return task
×
318

319

320
def get_callable_name(func: Callable) -> str:
11✔
321
    module = getattr(func, "__module__", None)
11✔
322
    qualname = getattr(func, "__qualname__", None)
11✔
323
    return ".".join([x for x in (module, qualname) if x])
11✔
324

325

326
#
327
# Event loop
328
#
329

330
_run_vars: WeakKeyDictionary[asyncio.AbstractEventLoop, Any] = WeakKeyDictionary()
11✔
331

332

333
def _task_started(task: asyncio.Task) -> bool:
11✔
334
    """Return ``True`` if the task has been started and has not finished."""
335
    try:
11✔
336
        return getcoroutinestate(task.get_coro()) in (CORO_RUNNING, CORO_SUSPENDED)
11✔
337
    except AttributeError:
×
338
        # task coro is async_genenerator_asend https://bugs.python.org/issue37771
339
        raise Exception(f"Cannot determine if task {task} has started or not") from None
×
340

341

342
#
343
# Timeouts and cancellation
344
#
345

346

347
class CancelScope(BaseCancelScope):
11✔
348
    def __new__(
11✔
349
        cls, *, deadline: float = math.inf, shield: bool = False
350
    ) -> CancelScope:
351
        return object.__new__(cls)
11✔
352

353
    def __init__(self, deadline: float = math.inf, shield: bool = False):
11✔
354
        self._deadline = deadline
11✔
355
        self._shield = shield
11✔
356
        self._parent_scope: CancelScope | None = None
11✔
357
        self._child_scopes: set[CancelScope] = set()
11✔
358
        self._cancel_called = False
11✔
359
        self._cancelled_caught = False
11✔
360
        self._active = False
11✔
361
        self._timeout_handle: asyncio.TimerHandle | None = None
11✔
362
        self._cancel_handle: asyncio.Handle | None = None
11✔
363
        self._tasks: set[asyncio.Task] = set()
11✔
364
        self._host_task: asyncio.Task | None = None
11✔
365
        self._cancel_calls: int = 0
11✔
366
        self._cancelling: int | None = None
11✔
367

368
    def __enter__(self) -> CancelScope:
11✔
369
        if self._active:
11✔
370
            raise RuntimeError(
×
371
                "Each CancelScope may only be used for a single 'with' block"
372
            )
373

374
        self._host_task = host_task = cast(asyncio.Task, current_task())
11✔
375
        self._tasks.add(host_task)
11✔
376
        try:
11✔
377
            task_state = _task_states[host_task]
11✔
378
        except KeyError:
11✔
379
            task_state = TaskState(None, self)
11✔
380
            _task_states[host_task] = task_state
11✔
381
        else:
382
            self._parent_scope = task_state.cancel_scope
11✔
383
            task_state.cancel_scope = self
11✔
384
            if self._parent_scope is not None:
11✔
385
                self._parent_scope._child_scopes.add(self)
11✔
386
                self._parent_scope._tasks.remove(host_task)
11✔
387

388
        self._timeout()
11✔
389
        self._active = True
11✔
390
        if sys.version_info >= (3, 11):
11✔
391
            self._cancelling = self._host_task.cancelling()
5✔
392

393
        # Start cancelling the host task if the scope was cancelled before entering
394
        if self._cancel_called:
11✔
395
            self._deliver_cancellation(self)
11✔
396

397
        return self
11✔
398

399
    def __exit__(
11✔
400
        self,
401
        exc_type: type[BaseException] | None,
402
        exc_val: BaseException | None,
403
        exc_tb: TracebackType | None,
404
    ) -> bool | None:
405
        if not self._active:
11✔
406
            raise RuntimeError("This cancel scope is not active")
10✔
407
        if current_task() is not self._host_task:
11✔
408
            raise RuntimeError(
10✔
409
                "Attempted to exit cancel scope in a different task than it was "
410
                "entered in"
411
            )
412

413
        assert self._host_task is not None
11✔
414
        host_task_state = _task_states.get(self._host_task)
11✔
415
        if host_task_state is None or host_task_state.cancel_scope is not self:
11✔
416
            raise RuntimeError(
10✔
417
                "Attempted to exit a cancel scope that isn't the current tasks's "
418
                "current cancel scope"
419
            )
420

421
        self._active = False
11✔
422
        if self._timeout_handle:
11✔
423
            self._timeout_handle.cancel()
11✔
424
            self._timeout_handle = None
11✔
425

426
        self._tasks.remove(self._host_task)
11✔
427
        if self._parent_scope is not None:
11✔
428
            self._parent_scope._child_scopes.remove(self)
11✔
429
            self._parent_scope._tasks.add(self._host_task)
11✔
430

431
        host_task_state.cancel_scope = self._parent_scope
11✔
432

433
        # Restart the cancellation effort in the closest directly cancelled parent
434
        # scope if this one was shielded
435
        self._restart_cancellation_in_parent()
11✔
436

437
        if self._cancel_called and exc_val is not None:
11✔
438
            for exc in iterate_exceptions(exc_val):
11✔
439
                if isinstance(exc, CancelledError):
11✔
440
                    self._cancelled_caught = self._uncancel(exc)
11✔
441
                    if self._cancelled_caught:
11✔
442
                        break
11✔
443

444
            return self._cancelled_caught
11✔
445

446
        return None
11✔
447

448
    def _uncancel(self, cancelled_exc: CancelledError) -> bool:
11✔
449
        if sys.version_info < (3, 9) or self._host_task is None:
11✔
450
            self._cancel_calls = 0
3✔
451
            return True
3✔
452

453
        # Undo all cancellations done by this scope
454
        if self._cancelling is not None:
8✔
455
            while self._cancel_calls:
5✔
456
                self._cancel_calls -= 1
5✔
457
                if self._host_task.uncancel() <= self._cancelling:
5✔
458
                    return True
5✔
459

460
        self._cancel_calls = 0
8✔
461
        return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args
8✔
462

463
    def _timeout(self) -> None:
11✔
464
        if self._deadline != math.inf:
11✔
465
            loop = get_running_loop()
11✔
466
            if loop.time() >= self._deadline:
11✔
467
                self.cancel()
11✔
468
            else:
469
                self._timeout_handle = loop.call_at(self._deadline, self._timeout)
11✔
470

471
    def _deliver_cancellation(self, origin: CancelScope) -> bool:
11✔
472
        """
473
        Deliver cancellation to directly contained tasks and nested cancel scopes.
474

475
        Schedule another run at the end if we still have tasks eligible for
476
        cancellation.
477

478
        :param origin: the cancel scope that originated the cancellation
479
        :return: ``True`` if the delivery needs to be retried on the next cycle
480

481
        """
482
        should_retry = False
11✔
483
        current = current_task()
11✔
484
        for task in self._tasks:
11✔
485
            if task._must_cancel:  # type: ignore[attr-defined]
11✔
486
                continue
10✔
487

488
            # The task is eligible for cancellation if it has started
489
            should_retry = True
11✔
490
            if task is not current and (task is self._host_task or _task_started(task)):
11✔
491
                waiter = task._fut_waiter  # type: ignore[attr-defined]
11✔
492
                if not isinstance(waiter, asyncio.Future) or not waiter.done():
11✔
493
                    origin._cancel_calls += 1
11✔
494
                    if sys.version_info >= (3, 9):
11✔
495
                        task.cancel(f"Cancelled by cancel scope {id(origin):x}")
8✔
496
                    else:
497
                        task.cancel()
3✔
498

499
        # Deliver cancellation to child scopes that aren't shielded or running their own
500
        # cancellation callbacks
501
        for scope in self._child_scopes:
11✔
502
            if not scope._shield and not scope.cancel_called:
11✔
503
                should_retry = scope._deliver_cancellation(origin) or should_retry
11✔
504

505
        # Schedule another callback if there are still tasks left
506
        if origin is self:
11✔
507
            if should_retry:
11✔
508
                self._cancel_handle = get_running_loop().call_soon(
11✔
509
                    self._deliver_cancellation, origin
510
                )
511
            else:
512
                self._cancel_handle = None
11✔
513

514
        return should_retry
11✔
515

516
    def _restart_cancellation_in_parent(self) -> None:
11✔
517
        """
518
        Restart the cancellation effort in the closest directly cancelled parent scope.
519

520
        """
521
        scope = self._parent_scope
11✔
522
        while scope is not None:
11✔
523
            if scope._cancel_called:
11✔
524
                if scope._cancel_handle is None:
11✔
525
                    scope._deliver_cancellation(scope)
11✔
526

527
                break
11✔
528

529
            # No point in looking beyond any shielded scope
530
            if scope._shield:
11✔
531
                break
10✔
532

533
            scope = scope._parent_scope
11✔
534

535
    def _parent_cancelled(self) -> bool:
11✔
536
        # Check whether any parent has been cancelled
537
        cancel_scope = self._parent_scope
11✔
538
        while cancel_scope is not None and not cancel_scope._shield:
11✔
539
            if cancel_scope._cancel_called:
11✔
540
                return True
11✔
541
            else:
542
                cancel_scope = cancel_scope._parent_scope
11✔
543

544
        return False
11✔
545

546
    def cancel(self) -> None:
11✔
547
        if not self._cancel_called:
11✔
548
            if self._timeout_handle:
11✔
549
                self._timeout_handle.cancel()
11✔
550
                self._timeout_handle = None
11✔
551

552
            self._cancel_called = True
11✔
553
            if self._host_task is not None:
11✔
554
                self._deliver_cancellation(self)
11✔
555

556
    @property
11✔
557
    def deadline(self) -> float:
11✔
558
        return self._deadline
10✔
559

560
    @deadline.setter
11✔
561
    def deadline(self, value: float) -> None:
11✔
562
        self._deadline = float(value)
10✔
563
        if self._timeout_handle is not None:
10✔
564
            self._timeout_handle.cancel()
10✔
565
            self._timeout_handle = None
10✔
566

567
        if self._active and not self._cancel_called:
10✔
568
            self._timeout()
10✔
569

570
    @property
11✔
571
    def cancel_called(self) -> bool:
11✔
572
        return self._cancel_called
11✔
573

574
    @property
11✔
575
    def cancelled_caught(self) -> bool:
11✔
576
        return self._cancelled_caught
11✔
577

578
    @property
11✔
579
    def shield(self) -> bool:
11✔
580
        return self._shield
11✔
581

582
    @shield.setter
11✔
583
    def shield(self, value: bool) -> None:
11✔
584
        if self._shield != value:
10✔
585
            self._shield = value
10✔
586
            if not value:
10✔
587
                self._restart_cancellation_in_parent()
10✔
588

589

590
#
591
# Task states
592
#
593

594

595
class TaskState:
11✔
596
    """
597
    Encapsulates auxiliary task information that cannot be added to the Task instance
598
    itself because there are no guarantees about its implementation.
599
    """
600

601
    __slots__ = "parent_id", "cancel_scope", "__weakref__"
11✔
602

603
    def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
11✔
604
        self.parent_id = parent_id
11✔
605
        self.cancel_scope = cancel_scope
11✔
606

607

608
_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary()
11✔
609

610

611
#
612
# Task groups
613
#
614

615

616
class _AsyncioTaskStatus(abc.TaskStatus):
11✔
617
    def __init__(self, future: asyncio.Future, parent_id: int):
11✔
618
        self._future = future
11✔
619
        self._parent_id = parent_id
11✔
620

621
    def started(self, value: T_contra | None = None) -> None:
11✔
622
        try:
11✔
623
            self._future.set_result(value)
11✔
624
        except asyncio.InvalidStateError:
10✔
625
            if not self._future.cancelled():
10✔
626
                raise RuntimeError(
10✔
627
                    "called 'started' twice on the same task status"
628
                ) from None
629

630
        task = cast(asyncio.Task, current_task())
11✔
631
        _task_states[task].parent_id = self._parent_id
11✔
632

633

634
class TaskGroup(abc.TaskGroup):
11✔
635
    def __init__(self) -> None:
11✔
636
        self.cancel_scope: CancelScope = CancelScope()
11✔
637
        self._active = False
11✔
638
        self._exceptions: list[BaseException] = []
11✔
639
        self._tasks: set[asyncio.Task] = set()
11✔
640

641
    async def __aenter__(self) -> TaskGroup:
11✔
642
        self.cancel_scope.__enter__()
11✔
643
        self._active = True
11✔
644
        return self
11✔
645

646
    async def __aexit__(
11✔
647
        self,
648
        exc_type: type[BaseException] | None,
649
        exc_val: BaseException | None,
650
        exc_tb: TracebackType | None,
651
    ) -> bool | None:
652
        ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
11✔
653
        if exc_val is not None:
11✔
654
            self.cancel_scope.cancel()
11✔
655
            if not isinstance(exc_val, CancelledError):
11✔
656
                self._exceptions.append(exc_val)
11✔
657

658
        cancelled_exc_while_waiting_tasks: CancelledError | None = None
11✔
659
        while self._tasks:
11✔
660
            try:
11✔
661
                await asyncio.wait(self._tasks)
11✔
662
            except CancelledError as exc:
11✔
663
                # This task was cancelled natively; reraise the CancelledError later
664
                # unless this task was already interrupted by another exception
665
                self.cancel_scope.cancel()
11✔
666
                if cancelled_exc_while_waiting_tasks is None:
11✔
667
                    cancelled_exc_while_waiting_tasks = exc
11✔
668

669
        self._active = False
11✔
670
        if self._exceptions:
11✔
671
            raise BaseExceptionGroup(
11✔
672
                "unhandled errors in a TaskGroup", self._exceptions
673
            )
674

675
        # Raise the CancelledError received while waiting for child tasks to exit,
676
        # unless the context manager itself was previously exited with another
677
        # exception, or if any of the  child tasks raised an exception other than
678
        # CancelledError
679
        if cancelled_exc_while_waiting_tasks:
11✔
680
            if exc_val is None or ignore_exception:
11✔
681
                raise cancelled_exc_while_waiting_tasks
11✔
682

683
        return ignore_exception
11✔
684

685
    def _spawn(
11✔
686
        self,
687
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
688
        args: tuple[Unpack[PosArgsT]],
689
        name: object,
690
        task_status_future: asyncio.Future | None = None,
691
    ) -> asyncio.Task:
692
        def task_done(_task: asyncio.Task) -> None:
11✔
693
            task_state = _task_states[_task]
11✔
694
            assert task_state.cancel_scope is not None
11✔
695
            assert _task in task_state.cancel_scope._tasks
11✔
696
            task_state.cancel_scope._tasks.remove(_task)
11✔
697
            self._tasks.remove(task)
11✔
698
            del _task_states[_task]
11✔
699

700
            try:
11✔
701
                exc = _task.exception()
11✔
702
            except CancelledError as e:
11✔
703
                while isinstance(e.__context__, CancelledError):
11✔
704
                    e = e.__context__
3✔
705

706
                exc = e
11✔
707

708
            if exc is not None:
11✔
709
                # The future can only be in the cancelled state if the host task was
710
                # cancelled, so return immediately instead of adding one more
711
                # CancelledError to the exceptions list
712
                if task_status_future is not None and task_status_future.cancelled():
11✔
713
                    return
10✔
714

715
                if task_status_future is None or task_status_future.done():
11✔
716
                    if not isinstance(exc, CancelledError):
11✔
717
                        self._exceptions.append(exc)
11✔
718

719
                    if not self.cancel_scope._parent_cancelled():
11✔
720
                        self.cancel_scope.cancel()
11✔
721
                else:
722
                    task_status_future.set_exception(exc)
10✔
723
            elif task_status_future is not None and not task_status_future.done():
11✔
724
                task_status_future.set_exception(
10✔
725
                    RuntimeError("Child exited without calling task_status.started()")
726
                )
727

728
        if not self._active:
11✔
729
            raise RuntimeError(
10✔
730
                "This task group is not active; no new tasks can be started."
731
            )
732

733
        kwargs = {}
11✔
734
        if task_status_future:
11✔
735
            parent_id = id(current_task())
11✔
736
            kwargs["task_status"] = _AsyncioTaskStatus(
11✔
737
                task_status_future, id(self.cancel_scope._host_task)
738
            )
739
        else:
740
            parent_id = id(self.cancel_scope._host_task)
11✔
741

742
        coro = func(*args, **kwargs)
11✔
743
        if not iscoroutine(coro):
11✔
744
            prefix = f"{func.__module__}." if hasattr(func, "__module__") else ""
10✔
745
            raise TypeError(
10✔
746
                f"Expected {prefix}{func.__qualname__}() to return a coroutine, but "
747
                f"the return value ({coro!r}) is not a coroutine object"
748
            )
749

750
        name = get_callable_name(func) if name is None else str(name)
11✔
751
        task = create_task(coro, name=name)
11✔
752
        task.add_done_callback(task_done)
11✔
753

754
        # Make the spawned task inherit the task group's cancel scope
755
        _task_states[task] = TaskState(
11✔
756
            parent_id=parent_id, cancel_scope=self.cancel_scope
757
        )
758
        self.cancel_scope._tasks.add(task)
11✔
759
        self._tasks.add(task)
11✔
760
        return task
11✔
761

762
    def start_soon(
11✔
763
        self,
764
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
765
        *args: Unpack[PosArgsT],
766
        name: object = None,
767
    ) -> None:
768
        self._spawn(func, args, name)
11✔
769

770
    async def start(
11✔
771
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
772
    ) -> Any:
773
        future: asyncio.Future = asyncio.Future()
11✔
774
        task = self._spawn(func, args, name, future)
11✔
775

776
        # If the task raises an exception after sending a start value without a switch
777
        # point between, the task group is cancelled and this method never proceeds to
778
        # process the completed future. That's why we have to have a shielded cancel
779
        # scope here.
780
        try:
11✔
781
            return await future
11✔
782
        except CancelledError:
10✔
783
            # Cancel the task and wait for it to exit before returning
784
            task.cancel()
10✔
785
            with CancelScope(shield=True), suppress(CancelledError):
10✔
786
                await task
10✔
787

788
            raise
10✔
789

790

791
#
792
# Threads
793
#
794

795
_Retval_Queue_Type = Tuple[Optional[T_Retval], Optional[BaseException]]
11✔
796

797

798
class WorkerThread(Thread):
11✔
799
    MAX_IDLE_TIME = 10  # seconds
11✔
800

801
    def __init__(
11✔
802
        self,
803
        root_task: asyncio.Task,
804
        workers: set[WorkerThread],
805
        idle_workers: deque[WorkerThread],
806
    ):
807
        super().__init__(name="AnyIO worker thread")
11✔
808
        self.root_task = root_task
11✔
809
        self.workers = workers
11✔
810
        self.idle_workers = idle_workers
11✔
811
        self.loop = root_task._loop
11✔
812
        self.queue: Queue[
11✔
813
            tuple[Context, Callable, tuple, asyncio.Future, CancelScope] | None
814
        ] = Queue(2)
815
        self.idle_since = AsyncIOBackend.current_time()
11✔
816
        self.stopping = False
11✔
817

818
    def _report_result(
11✔
819
        self, future: asyncio.Future, result: Any, exc: BaseException | None
820
    ) -> None:
821
        self.idle_since = AsyncIOBackend.current_time()
11✔
822
        if not self.stopping:
11✔
823
            self.idle_workers.append(self)
11✔
824

825
        if not future.cancelled():
11✔
826
            if exc is not None:
11✔
827
                if isinstance(exc, StopIteration):
11✔
828
                    new_exc = RuntimeError("coroutine raised StopIteration")
10✔
829
                    new_exc.__cause__ = exc
10✔
830
                    exc = new_exc
10✔
831

832
                future.set_exception(exc)
11✔
833
            else:
834
                future.set_result(result)
11✔
835

836
    def run(self) -> None:
11✔
837
        with claim_worker_thread(AsyncIOBackend, self.loop):
11✔
838
            while True:
7✔
839
                item = self.queue.get()
11✔
840
                if item is None:
11✔
841
                    # Shutdown command received
842
                    return
11✔
843

844
                context, func, args, future, cancel_scope = item
11✔
845
                if not future.cancelled():
11✔
846
                    result = None
11✔
847
                    exception: BaseException | None = None
11✔
848
                    threadlocals.current_cancel_scope = cancel_scope
11✔
849
                    try:
11✔
850
                        result = context.run(func, *args)
11✔
851
                    except BaseException as exc:
11✔
852
                        exception = exc
11✔
853
                    finally:
854
                        del threadlocals.current_cancel_scope
11✔
855

856
                    if not self.loop.is_closed():
11✔
857
                        self.loop.call_soon_threadsafe(
11✔
858
                            self._report_result, future, result, exception
859
                        )
860

861
                self.queue.task_done()
11✔
862

863
    def stop(self, f: asyncio.Task | None = None) -> None:
11✔
864
        self.stopping = True
11✔
865
        self.queue.put_nowait(None)
11✔
866
        self.workers.discard(self)
11✔
867
        try:
11✔
868
            self.idle_workers.remove(self)
11✔
869
        except ValueError:
10✔
870
            pass
10✔
871

872

873
_threadpool_idle_workers: RunVar[deque[WorkerThread]] = RunVar(
11✔
874
    "_threadpool_idle_workers"
875
)
876
_threadpool_workers: RunVar[set[WorkerThread]] = RunVar("_threadpool_workers")
11✔
877

878

879
class BlockingPortal(abc.BlockingPortal):
11✔
880
    def __new__(cls) -> BlockingPortal:
11✔
881
        return object.__new__(cls)
11✔
882

883
    def __init__(self) -> None:
11✔
884
        super().__init__()
11✔
885
        self._loop = get_running_loop()
11✔
886

887
    def _spawn_task_from_thread(
11✔
888
        self,
889
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
890
        args: tuple[Unpack[PosArgsT]],
891
        kwargs: dict[str, Any],
892
        name: object,
893
        future: Future[T_Retval],
894
    ) -> None:
895
        AsyncIOBackend.run_sync_from_thread(
11✔
896
            partial(self._task_group.start_soon, name=name),
897
            (self._call_func, func, args, kwargs, future),
898
            self._loop,
899
        )
900

901

902
#
903
# Subprocesses
904
#
905

906

907
@dataclass(eq=False)
11✔
908
class StreamReaderWrapper(abc.ByteReceiveStream):
11✔
909
    _stream: asyncio.StreamReader
11✔
910

911
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
912
        data = await self._stream.read(max_bytes)
10✔
913
        if data:
10✔
914
            return data
10✔
915
        else:
916
            raise EndOfStream
10✔
917

918
    async def aclose(self) -> None:
11✔
919
        self._stream.feed_eof()
10✔
920
        await AsyncIOBackend.checkpoint()
10✔
921

922

923
@dataclass(eq=False)
11✔
924
class StreamWriterWrapper(abc.ByteSendStream):
11✔
925
    _stream: asyncio.StreamWriter
11✔
926

927
    async def send(self, item: bytes) -> None:
11✔
928
        self._stream.write(item)
10✔
929
        await self._stream.drain()
10✔
930

931
    async def aclose(self) -> None:
11✔
932
        self._stream.close()
10✔
933
        await AsyncIOBackend.checkpoint()
10✔
934

935

936
@dataclass(eq=False)
11✔
937
class Process(abc.Process):
11✔
938
    _process: asyncio.subprocess.Process
11✔
939
    _stdin: StreamWriterWrapper | None
11✔
940
    _stdout: StreamReaderWrapper | None
11✔
941
    _stderr: StreamReaderWrapper | None
11✔
942

943
    async def aclose(self) -> None:
11✔
944
        with CancelScope(shield=True):
10✔
945
            if self._stdin:
10✔
946
                await self._stdin.aclose()
10✔
947
            if self._stdout:
10✔
948
                await self._stdout.aclose()
10✔
949
            if self._stderr:
10✔
950
                await self._stderr.aclose()
10✔
951

952
        try:
10✔
953
            await self.wait()
10✔
954
        except BaseException:
10✔
955
            self.kill()
10✔
956
            with CancelScope(shield=True):
10✔
957
                await self.wait()
10✔
958

959
            raise
10✔
960

961
    async def wait(self) -> int:
11✔
962
        return await self._process.wait()
10✔
963

964
    def terminate(self) -> None:
11✔
965
        self._process.terminate()
8✔
966

967
    def kill(self) -> None:
11✔
968
        self._process.kill()
10✔
969

970
    def send_signal(self, signal: int) -> None:
11✔
971
        self._process.send_signal(signal)
×
972

973
    @property
11✔
974
    def pid(self) -> int:
11✔
975
        return self._process.pid
×
976

977
    @property
11✔
978
    def returncode(self) -> int | None:
11✔
979
        return self._process.returncode
10✔
980

981
    @property
11✔
982
    def stdin(self) -> abc.ByteSendStream | None:
11✔
983
        return self._stdin
10✔
984

985
    @property
11✔
986
    def stdout(self) -> abc.ByteReceiveStream | None:
11✔
987
        return self._stdout
10✔
988

989
    @property
11✔
990
    def stderr(self) -> abc.ByteReceiveStream | None:
11✔
991
        return self._stderr
10✔
992

993

994
def _forcibly_shutdown_process_pool_on_exit(
11✔
995
    workers: set[Process], _task: object
996
) -> None:
997
    """
998
    Forcibly shuts down worker processes belonging to this event loop."""
999
    child_watcher: asyncio.AbstractChildWatcher | None = None
10✔
1000
    if sys.version_info < (3, 12):
10✔
1001
        try:
6✔
1002
            child_watcher = asyncio.get_event_loop_policy().get_child_watcher()
6✔
1003
        except NotImplementedError:
1✔
1004
            pass
1✔
1005

1006
    # Close as much as possible (w/o async/await) to avoid warnings
1007
    for process in workers:
10✔
1008
        if process.returncode is None:
10✔
1009
            continue
10✔
1010

1011
        process._stdin._stream._transport.close()  # type: ignore[union-attr]
×
1012
        process._stdout._stream._transport.close()  # type: ignore[union-attr]
×
1013
        process._stderr._stream._transport.close()  # type: ignore[union-attr]
×
1014
        process.kill()
×
1015
        if child_watcher:
×
1016
            child_watcher.remove_child_handler(process.pid)
×
1017

1018

1019
async def _shutdown_process_pool_on_exit(workers: set[abc.Process]) -> None:
11✔
1020
    """
1021
    Shuts down worker processes belonging to this event loop.
1022

1023
    NOTE: this only works when the event loop was started using asyncio.run() or
1024
    anyio.run().
1025

1026
    """
1027
    process: abc.Process
1028
    try:
10✔
1029
        await sleep(math.inf)
10✔
1030
    except asyncio.CancelledError:
10✔
1031
        for process in workers:
10✔
1032
            if process.returncode is None:
10✔
1033
                process.kill()
10✔
1034

1035
        for process in workers:
10✔
1036
            await process.aclose()
10✔
1037

1038

1039
#
1040
# Sockets and networking
1041
#
1042

1043

1044
class StreamProtocol(asyncio.Protocol):
11✔
1045
    read_queue: deque[bytes]
11✔
1046
    read_event: asyncio.Event
11✔
1047
    write_event: asyncio.Event
11✔
1048
    exception: Exception | None = None
11✔
1049
    is_at_eof: bool = False
11✔
1050

1051
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
11✔
1052
        self.read_queue = deque()
11✔
1053
        self.read_event = asyncio.Event()
11✔
1054
        self.write_event = asyncio.Event()
11✔
1055
        self.write_event.set()
11✔
1056
        cast(asyncio.Transport, transport).set_write_buffer_limits(0)
11✔
1057

1058
    def connection_lost(self, exc: Exception | None) -> None:
11✔
1059
        if exc:
11✔
1060
            self.exception = BrokenResourceError()
11✔
1061
            self.exception.__cause__ = exc
11✔
1062

1063
        self.read_event.set()
11✔
1064
        self.write_event.set()
11✔
1065

1066
    def data_received(self, data: bytes) -> None:
11✔
1067
        # ProactorEventloop sometimes sends bytearray instead of bytes
1068
        self.read_queue.append(bytes(data))
11✔
1069
        self.read_event.set()
11✔
1070

1071
    def eof_received(self) -> bool | None:
11✔
1072
        self.is_at_eof = True
11✔
1073
        self.read_event.set()
11✔
1074
        return True
11✔
1075

1076
    def pause_writing(self) -> None:
11✔
1077
        self.write_event = asyncio.Event()
11✔
1078

1079
    def resume_writing(self) -> None:
11✔
UNCOV
1080
        self.write_event.set()
×
1081

1082

1083
class DatagramProtocol(asyncio.DatagramProtocol):
11✔
1084
    read_queue: deque[tuple[bytes, IPSockAddrType]]
11✔
1085
    read_event: asyncio.Event
11✔
1086
    write_event: asyncio.Event
11✔
1087
    exception: Exception | None = None
11✔
1088

1089
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
11✔
1090
        self.read_queue = deque(maxlen=100)  # arbitrary value
10✔
1091
        self.read_event = asyncio.Event()
10✔
1092
        self.write_event = asyncio.Event()
10✔
1093
        self.write_event.set()
10✔
1094

1095
    def connection_lost(self, exc: Exception | None) -> None:
11✔
1096
        self.read_event.set()
10✔
1097
        self.write_event.set()
10✔
1098

1099
    def datagram_received(self, data: bytes, addr: IPSockAddrType) -> None:
11✔
1100
        addr = convert_ipv6_sockaddr(addr)
10✔
1101
        self.read_queue.append((data, addr))
10✔
1102
        self.read_event.set()
10✔
1103

1104
    def error_received(self, exc: Exception) -> None:
11✔
1105
        self.exception = exc
×
1106

1107
    def pause_writing(self) -> None:
11✔
1108
        self.write_event.clear()
×
1109

1110
    def resume_writing(self) -> None:
11✔
1111
        self.write_event.set()
×
1112

1113

1114
class SocketStream(abc.SocketStream):
11✔
1115
    def __init__(self, transport: asyncio.Transport, protocol: StreamProtocol):
11✔
1116
        self._transport = transport
11✔
1117
        self._protocol = protocol
11✔
1118
        self._receive_guard = ResourceGuard("reading from")
11✔
1119
        self._send_guard = ResourceGuard("writing to")
11✔
1120
        self._closed = False
11✔
1121

1122
    @property
11✔
1123
    def _raw_socket(self) -> socket.socket:
11✔
1124
        return self._transport.get_extra_info("socket")
11✔
1125

1126
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
1127
        with self._receive_guard:
11✔
1128
            if (
11✔
1129
                not self._protocol.read_event.is_set()
1130
                and not self._transport.is_closing()
1131
                and not self._protocol.is_at_eof
1132
            ):
1133
                self._transport.resume_reading()
11✔
1134
                await self._protocol.read_event.wait()
11✔
1135
                self._transport.pause_reading()
11✔
1136
            else:
1137
                await AsyncIOBackend.checkpoint()
11✔
1138

1139
            try:
11✔
1140
                chunk = self._protocol.read_queue.popleft()
11✔
1141
            except IndexError:
11✔
1142
                if self._closed:
11✔
1143
                    raise ClosedResourceError from None
11✔
1144
                elif self._protocol.exception:
11✔
1145
                    raise self._protocol.exception from None
11✔
1146
                else:
1147
                    raise EndOfStream from None
11✔
1148

1149
            if len(chunk) > max_bytes:
11✔
1150
                # Split the oversized chunk
1151
                chunk, leftover = chunk[:max_bytes], chunk[max_bytes:]
9✔
1152
                self._protocol.read_queue.appendleft(leftover)
9✔
1153

1154
            # If the read queue is empty, clear the flag so that the next call will
1155
            # block until data is available
1156
            if not self._protocol.read_queue:
11✔
1157
                self._protocol.read_event.clear()
11✔
1158

1159
        return chunk
11✔
1160

1161
    async def send(self, item: bytes) -> None:
11✔
1162
        with self._send_guard:
11✔
1163
            await AsyncIOBackend.checkpoint()
11✔
1164

1165
            if self._closed:
11✔
1166
                raise ClosedResourceError
11✔
1167
            elif self._protocol.exception is not None:
11✔
1168
                raise self._protocol.exception
11✔
1169

1170
            try:
11✔
1171
                self._transport.write(item)
11✔
1172
            except RuntimeError as exc:
×
1173
                if self._transport.is_closing():
×
1174
                    raise BrokenResourceError from exc
×
1175
                else:
1176
                    raise
×
1177

1178
            await self._protocol.write_event.wait()
11✔
1179

1180
    async def send_eof(self) -> None:
11✔
1181
        try:
11✔
1182
            self._transport.write_eof()
11✔
1183
        except OSError:
×
1184
            pass
×
1185

1186
    async def aclose(self) -> None:
11✔
1187
        if not self._transport.is_closing():
11✔
1188
            self._closed = True
11✔
1189
            try:
11✔
1190
                self._transport.write_eof()
11✔
1191
            except OSError:
7✔
1192
                pass
7✔
1193

1194
            self._transport.close()
11✔
1195
            await sleep(0)
11✔
1196
            self._transport.abort()
11✔
1197

1198

1199
class _RawSocketMixin:
11✔
1200
    _receive_future: asyncio.Future | None = None
11✔
1201
    _send_future: asyncio.Future | None = None
11✔
1202
    _closing = False
11✔
1203

1204
    def __init__(self, raw_socket: socket.socket):
11✔
1205
        self.__raw_socket = raw_socket
8✔
1206
        self._receive_guard = ResourceGuard("reading from")
8✔
1207
        self._send_guard = ResourceGuard("writing to")
8✔
1208

1209
    @property
11✔
1210
    def _raw_socket(self) -> socket.socket:
11✔
1211
        return self.__raw_socket
8✔
1212

1213
    def _wait_until_readable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
11✔
1214
        def callback(f: object) -> None:
8✔
1215
            del self._receive_future
8✔
1216
            loop.remove_reader(self.__raw_socket)
8✔
1217

1218
        f = self._receive_future = asyncio.Future()
8✔
1219
        loop.add_reader(self.__raw_socket, f.set_result, None)
8✔
1220
        f.add_done_callback(callback)
8✔
1221
        return f
8✔
1222

1223
    def _wait_until_writable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
11✔
1224
        def callback(f: object) -> None:
8✔
1225
            del self._send_future
8✔
1226
            loop.remove_writer(self.__raw_socket)
8✔
1227

1228
        f = self._send_future = asyncio.Future()
8✔
1229
        loop.add_writer(self.__raw_socket, f.set_result, None)
8✔
1230
        f.add_done_callback(callback)
8✔
1231
        return f
8✔
1232

1233
    async def aclose(self) -> None:
11✔
1234
        if not self._closing:
8✔
1235
            self._closing = True
8✔
1236
            if self.__raw_socket.fileno() != -1:
8✔
1237
                self.__raw_socket.close()
8✔
1238

1239
            if self._receive_future:
8✔
1240
                self._receive_future.set_result(None)
8✔
1241
            if self._send_future:
8✔
1242
                self._send_future.set_result(None)
×
1243

1244

1245
class UNIXSocketStream(_RawSocketMixin, abc.UNIXSocketStream):
11✔
1246
    async def send_eof(self) -> None:
11✔
1247
        with self._send_guard:
8✔
1248
            self._raw_socket.shutdown(socket.SHUT_WR)
8✔
1249

1250
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
1251
        loop = get_running_loop()
8✔
1252
        await AsyncIOBackend.checkpoint()
8✔
1253
        with self._receive_guard:
8✔
1254
            while True:
5✔
1255
                try:
8✔
1256
                    data = self._raw_socket.recv(max_bytes)
8✔
1257
                except BlockingIOError:
8✔
1258
                    await self._wait_until_readable(loop)
8✔
1259
                except OSError as exc:
8✔
1260
                    if self._closing:
8✔
1261
                        raise ClosedResourceError from None
8✔
1262
                    else:
1263
                        raise BrokenResourceError from exc
1✔
1264
                else:
1265
                    if not data:
8✔
1266
                        raise EndOfStream
8✔
1267

1268
                    return data
8✔
1269

1270
    async def send(self, item: bytes) -> None:
11✔
1271
        loop = get_running_loop()
8✔
1272
        await AsyncIOBackend.checkpoint()
8✔
1273
        with self._send_guard:
8✔
1274
            view = memoryview(item)
8✔
1275
            while view:
8✔
1276
                try:
8✔
1277
                    bytes_sent = self._raw_socket.send(view)
8✔
1278
                except BlockingIOError:
8✔
1279
                    await self._wait_until_writable(loop)
8✔
1280
                except OSError as exc:
8✔
1281
                    if self._closing:
8✔
1282
                        raise ClosedResourceError from None
8✔
1283
                    else:
1284
                        raise BrokenResourceError from exc
1✔
1285
                else:
1286
                    view = view[bytes_sent:]
8✔
1287

1288
    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
11✔
1289
        if not isinstance(msglen, int) or msglen < 0:
8✔
1290
            raise ValueError("msglen must be a non-negative integer")
8✔
1291
        if not isinstance(maxfds, int) or maxfds < 1:
8✔
1292
            raise ValueError("maxfds must be a positive integer")
8✔
1293

1294
        loop = get_running_loop()
8✔
1295
        fds = array.array("i")
8✔
1296
        await AsyncIOBackend.checkpoint()
8✔
1297
        with self._receive_guard:
8✔
1298
            while True:
5✔
1299
                try:
8✔
1300
                    message, ancdata, flags, addr = self._raw_socket.recvmsg(
8✔
1301
                        msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
1302
                    )
1303
                except BlockingIOError:
8✔
1304
                    await self._wait_until_readable(loop)
8✔
1305
                except OSError as exc:
×
1306
                    if self._closing:
×
1307
                        raise ClosedResourceError from None
×
1308
                    else:
1309
                        raise BrokenResourceError from exc
×
1310
                else:
1311
                    if not message and not ancdata:
8✔
1312
                        raise EndOfStream
×
1313

1314
                    break
5✔
1315

1316
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
8✔
1317
            if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
8✔
1318
                raise RuntimeError(
×
1319
                    f"Received unexpected ancillary data; message = {message!r}, "
1320
                    f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
1321
                )
1322

1323
            fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
8✔
1324

1325
        return message, list(fds)
8✔
1326

1327
    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
11✔
1328
        if not message:
8✔
1329
            raise ValueError("message must not be empty")
8✔
1330
        if not fds:
8✔
1331
            raise ValueError("fds must not be empty")
8✔
1332

1333
        loop = get_running_loop()
8✔
1334
        filenos: list[int] = []
8✔
1335
        for fd in fds:
8✔
1336
            if isinstance(fd, int):
8✔
1337
                filenos.append(fd)
×
1338
            elif isinstance(fd, IOBase):
8✔
1339
                filenos.append(fd.fileno())
8✔
1340

1341
        fdarray = array.array("i", filenos)
8✔
1342
        await AsyncIOBackend.checkpoint()
8✔
1343
        with self._send_guard:
8✔
1344
            while True:
5✔
1345
                try:
8✔
1346
                    # The ignore can be removed after mypy picks up
1347
                    # https://github.com/python/typeshed/pull/5545
1348
                    self._raw_socket.sendmsg(
8✔
1349
                        [message], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)]
1350
                    )
1351
                    break
8✔
1352
                except BlockingIOError:
×
1353
                    await self._wait_until_writable(loop)
×
1354
                except OSError as exc:
×
1355
                    if self._closing:
×
1356
                        raise ClosedResourceError from None
×
1357
                    else:
1358
                        raise BrokenResourceError from exc
×
1359

1360

1361
class TCPSocketListener(abc.SocketListener):
11✔
1362
    _accept_scope: CancelScope | None = None
11✔
1363
    _closed = False
11✔
1364

1365
    def __init__(self, raw_socket: socket.socket):
11✔
1366
        self.__raw_socket = raw_socket
11✔
1367
        self._loop = cast(asyncio.BaseEventLoop, get_running_loop())
11✔
1368
        self._accept_guard = ResourceGuard("accepting connections from")
11✔
1369

1370
    @property
11✔
1371
    def _raw_socket(self) -> socket.socket:
11✔
1372
        return self.__raw_socket
11✔
1373

1374
    async def accept(self) -> abc.SocketStream:
11✔
1375
        if self._closed:
11✔
1376
            raise ClosedResourceError
11✔
1377

1378
        with self._accept_guard:
11✔
1379
            await AsyncIOBackend.checkpoint()
11✔
1380
            with CancelScope() as self._accept_scope:
11✔
1381
                try:
11✔
1382
                    client_sock, _addr = await self._loop.sock_accept(self._raw_socket)
11✔
1383
                except asyncio.CancelledError:
11✔
1384
                    # Workaround for https://bugs.python.org/issue41317
1385
                    try:
11✔
1386
                        self._loop.remove_reader(self._raw_socket)
11✔
1387
                    except (ValueError, NotImplementedError):
2✔
1388
                        pass
2✔
1389

1390
                    if self._closed:
11✔
1391
                        raise ClosedResourceError from None
10✔
1392

1393
                    raise
11✔
1394
                finally:
1395
                    self._accept_scope = None
11✔
1396

1397
        client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
11✔
1398
        transport, protocol = await self._loop.connect_accepted_socket(
11✔
1399
            StreamProtocol, client_sock
1400
        )
1401
        return SocketStream(transport, protocol)
11✔
1402

1403
    async def aclose(self) -> None:
11✔
1404
        if self._closed:
11✔
1405
            return
11✔
1406

1407
        self._closed = True
11✔
1408
        if self._accept_scope:
11✔
1409
            # Workaround for https://bugs.python.org/issue41317
1410
            try:
11✔
1411
                self._loop.remove_reader(self._raw_socket)
11✔
1412
            except (ValueError, NotImplementedError):
2✔
1413
                pass
2✔
1414

1415
            self._accept_scope.cancel()
10✔
1416
            await sleep(0)
10✔
1417

1418
        self._raw_socket.close()
11✔
1419

1420

1421
class UNIXSocketListener(abc.SocketListener):
11✔
1422
    def __init__(self, raw_socket: socket.socket):
11✔
1423
        self.__raw_socket = raw_socket
8✔
1424
        self._loop = get_running_loop()
8✔
1425
        self._accept_guard = ResourceGuard("accepting connections from")
8✔
1426
        self._closed = False
8✔
1427

1428
    async def accept(self) -> abc.SocketStream:
11✔
1429
        await AsyncIOBackend.checkpoint()
8✔
1430
        with self._accept_guard:
8✔
1431
            while True:
5✔
1432
                try:
8✔
1433
                    client_sock, _ = self.__raw_socket.accept()
8✔
1434
                    client_sock.setblocking(False)
8✔
1435
                    return UNIXSocketStream(client_sock)
8✔
1436
                except BlockingIOError:
8✔
1437
                    f: asyncio.Future = asyncio.Future()
8✔
1438
                    self._loop.add_reader(self.__raw_socket, f.set_result, None)
8✔
1439
                    f.add_done_callback(
8✔
1440
                        lambda _: self._loop.remove_reader(self.__raw_socket)
1441
                    )
1442
                    await f
8✔
1443
                except OSError as exc:
×
1444
                    if self._closed:
×
1445
                        raise ClosedResourceError from None
×
1446
                    else:
1447
                        raise BrokenResourceError from exc
1✔
1448

1449
    async def aclose(self) -> None:
11✔
1450
        self._closed = True
8✔
1451
        self.__raw_socket.close()
8✔
1452

1453
    @property
11✔
1454
    def _raw_socket(self) -> socket.socket:
11✔
1455
        return self.__raw_socket
8✔
1456

1457

1458
class UDPSocket(abc.UDPSocket):
11✔
1459
    def __init__(
11✔
1460
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1461
    ):
1462
        self._transport = transport
10✔
1463
        self._protocol = protocol
10✔
1464
        self._receive_guard = ResourceGuard("reading from")
10✔
1465
        self._send_guard = ResourceGuard("writing to")
10✔
1466
        self._closed = False
10✔
1467

1468
    @property
11✔
1469
    def _raw_socket(self) -> socket.socket:
11✔
1470
        return self._transport.get_extra_info("socket")
10✔
1471

1472
    async def aclose(self) -> None:
11✔
1473
        if not self._transport.is_closing():
10✔
1474
            self._closed = True
10✔
1475
            self._transport.close()
10✔
1476

1477
    async def receive(self) -> tuple[bytes, IPSockAddrType]:
11✔
1478
        with self._receive_guard:
10✔
1479
            await AsyncIOBackend.checkpoint()
10✔
1480

1481
            # If the buffer is empty, ask for more data
1482
            if not self._protocol.read_queue and not self._transport.is_closing():
10✔
1483
                self._protocol.read_event.clear()
10✔
1484
                await self._protocol.read_event.wait()
10✔
1485

1486
            try:
10✔
1487
                return self._protocol.read_queue.popleft()
10✔
1488
            except IndexError:
10✔
1489
                if self._closed:
10✔
1490
                    raise ClosedResourceError from None
10✔
1491
                else:
1492
                    raise BrokenResourceError from None
1✔
1493

1494
    async def send(self, item: UDPPacketType) -> None:
11✔
1495
        with self._send_guard:
10✔
1496
            await AsyncIOBackend.checkpoint()
10✔
1497
            await self._protocol.write_event.wait()
10✔
1498
            if self._closed:
10✔
1499
                raise ClosedResourceError
10✔
1500
            elif self._transport.is_closing():
10✔
1501
                raise BrokenResourceError
×
1502
            else:
1503
                self._transport.sendto(*item)
10✔
1504

1505

1506
class ConnectedUDPSocket(abc.ConnectedUDPSocket):
11✔
1507
    def __init__(
11✔
1508
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1509
    ):
1510
        self._transport = transport
10✔
1511
        self._protocol = protocol
10✔
1512
        self._receive_guard = ResourceGuard("reading from")
10✔
1513
        self._send_guard = ResourceGuard("writing to")
10✔
1514
        self._closed = False
10✔
1515

1516
    @property
11✔
1517
    def _raw_socket(self) -> socket.socket:
11✔
1518
        return self._transport.get_extra_info("socket")
10✔
1519

1520
    async def aclose(self) -> None:
11✔
1521
        if not self._transport.is_closing():
10✔
1522
            self._closed = True
10✔
1523
            self._transport.close()
10✔
1524

1525
    async def receive(self) -> bytes:
11✔
1526
        with self._receive_guard:
10✔
1527
            await AsyncIOBackend.checkpoint()
10✔
1528

1529
            # If the buffer is empty, ask for more data
1530
            if not self._protocol.read_queue and not self._transport.is_closing():
10✔
1531
                self._protocol.read_event.clear()
10✔
1532
                await self._protocol.read_event.wait()
10✔
1533

1534
            try:
10✔
1535
                packet = self._protocol.read_queue.popleft()
10✔
1536
            except IndexError:
10✔
1537
                if self._closed:
10✔
1538
                    raise ClosedResourceError from None
10✔
1539
                else:
1540
                    raise BrokenResourceError from None
×
1541

1542
            return packet[0]
10✔
1543

1544
    async def send(self, item: bytes) -> None:
11✔
1545
        with self._send_guard:
10✔
1546
            await AsyncIOBackend.checkpoint()
10✔
1547
            await self._protocol.write_event.wait()
10✔
1548
            if self._closed:
10✔
1549
                raise ClosedResourceError
10✔
1550
            elif self._transport.is_closing():
10✔
1551
                raise BrokenResourceError
×
1552
            else:
1553
                self._transport.sendto(item)
10✔
1554

1555

1556
class UNIXDatagramSocket(_RawSocketMixin, abc.UNIXDatagramSocket):
11✔
1557
    async def receive(self) -> UNIXDatagramPacketType:
11✔
1558
        loop = get_running_loop()
8✔
1559
        await AsyncIOBackend.checkpoint()
8✔
1560
        with self._receive_guard:
8✔
1561
            while True:
5✔
1562
                try:
8✔
1563
                    data = self._raw_socket.recvfrom(65536)
8✔
1564
                except BlockingIOError:
8✔
1565
                    await self._wait_until_readable(loop)
8✔
1566
                except OSError as exc:
8✔
1567
                    if self._closing:
8✔
1568
                        raise ClosedResourceError from None
8✔
1569
                    else:
1570
                        raise BrokenResourceError from exc
1✔
1571
                else:
1572
                    return data
8✔
1573

1574
    async def send(self, item: UNIXDatagramPacketType) -> None:
11✔
1575
        loop = get_running_loop()
8✔
1576
        await AsyncIOBackend.checkpoint()
8✔
1577
        with self._send_guard:
8✔
1578
            while True:
5✔
1579
                try:
8✔
1580
                    self._raw_socket.sendto(*item)
8✔
1581
                except BlockingIOError:
8✔
1582
                    await self._wait_until_writable(loop)
×
1583
                except OSError as exc:
8✔
1584
                    if self._closing:
8✔
1585
                        raise ClosedResourceError from None
8✔
1586
                    else:
1587
                        raise BrokenResourceError from exc
1✔
1588
                else:
1589
                    return
8✔
1590

1591

1592
class ConnectedUNIXDatagramSocket(_RawSocketMixin, abc.ConnectedUNIXDatagramSocket):
11✔
1593
    async def receive(self) -> bytes:
11✔
1594
        loop = get_running_loop()
8✔
1595
        await AsyncIOBackend.checkpoint()
8✔
1596
        with self._receive_guard:
8✔
1597
            while True:
5✔
1598
                try:
8✔
1599
                    data = self._raw_socket.recv(65536)
8✔
1600
                except BlockingIOError:
8✔
1601
                    await self._wait_until_readable(loop)
8✔
1602
                except OSError as exc:
8✔
1603
                    if self._closing:
8✔
1604
                        raise ClosedResourceError from None
8✔
1605
                    else:
1606
                        raise BrokenResourceError from exc
1✔
1607
                else:
1608
                    return data
8✔
1609

1610
    async def send(self, item: bytes) -> None:
11✔
1611
        loop = get_running_loop()
8✔
1612
        await AsyncIOBackend.checkpoint()
8✔
1613
        with self._send_guard:
8✔
1614
            while True:
5✔
1615
                try:
8✔
1616
                    self._raw_socket.send(item)
8✔
1617
                except BlockingIOError:
8✔
1618
                    await self._wait_until_writable(loop)
×
1619
                except OSError as exc:
8✔
1620
                    if self._closing:
8✔
1621
                        raise ClosedResourceError from None
8✔
1622
                    else:
1623
                        raise BrokenResourceError from exc
1✔
1624
                else:
1625
                    return
8✔
1626

1627

1628
_read_events: RunVar[dict[Any, asyncio.Event]] = RunVar("read_events")
11✔
1629
_write_events: RunVar[dict[Any, asyncio.Event]] = RunVar("write_events")
11✔
1630

1631

1632
#
1633
# Synchronization
1634
#
1635

1636

1637
class Event(BaseEvent):
11✔
1638
    def __new__(cls) -> Event:
11✔
1639
        return object.__new__(cls)
11✔
1640

1641
    def __init__(self) -> None:
11✔
1642
        self._event = asyncio.Event()
11✔
1643

1644
    def set(self) -> None:
11✔
1645
        self._event.set()
11✔
1646

1647
    def is_set(self) -> bool:
11✔
1648
        return self._event.is_set()
11✔
1649

1650
    async def wait(self) -> None:
11✔
1651
        if self.is_set():
11✔
1652
            await AsyncIOBackend.checkpoint()
11✔
1653
        else:
1654
            await self._event.wait()
11✔
1655

1656
    def statistics(self) -> EventStatistics:
11✔
1657
        return EventStatistics(len(self._event._waiters))
10✔
1658

1659

1660
class CapacityLimiter(BaseCapacityLimiter):
11✔
1661
    _total_tokens: float = 0
11✔
1662

1663
    def __new__(cls, total_tokens: float) -> CapacityLimiter:
11✔
1664
        return object.__new__(cls)
11✔
1665

1666
    def __init__(self, total_tokens: float):
11✔
1667
        self._borrowers: set[Any] = set()
11✔
1668
        self._wait_queue: OrderedDict[Any, asyncio.Event] = OrderedDict()
11✔
1669
        self.total_tokens = total_tokens
11✔
1670

1671
    async def __aenter__(self) -> None:
11✔
1672
        await self.acquire()
11✔
1673

1674
    async def __aexit__(
11✔
1675
        self,
1676
        exc_type: type[BaseException] | None,
1677
        exc_val: BaseException | None,
1678
        exc_tb: TracebackType | None,
1679
    ) -> None:
1680
        self.release()
11✔
1681

1682
    @property
11✔
1683
    def total_tokens(self) -> float:
11✔
1684
        return self._total_tokens
10✔
1685

1686
    @total_tokens.setter
11✔
1687
    def total_tokens(self, value: float) -> None:
11✔
1688
        if not isinstance(value, int) and not math.isinf(value):
11✔
1689
            raise TypeError("total_tokens must be an int or math.inf")
10✔
1690
        if value < 1:
11✔
1691
            raise ValueError("total_tokens must be >= 1")
10✔
1692

1693
        waiters_to_notify = max(value - self._total_tokens, 0)
11✔
1694
        self._total_tokens = value
11✔
1695

1696
        # Notify waiting tasks that they have acquired the limiter
1697
        while self._wait_queue and waiters_to_notify:
11✔
1698
            event = self._wait_queue.popitem(last=False)[1]
10✔
1699
            event.set()
10✔
1700
            waiters_to_notify -= 1
10✔
1701

1702
    @property
11✔
1703
    def borrowed_tokens(self) -> int:
11✔
1704
        return len(self._borrowers)
10✔
1705

1706
    @property
11✔
1707
    def available_tokens(self) -> float:
11✔
1708
        return self._total_tokens - len(self._borrowers)
10✔
1709

1710
    def acquire_nowait(self) -> None:
11✔
1711
        self.acquire_on_behalf_of_nowait(current_task())
×
1712

1713
    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
11✔
1714
        if borrower in self._borrowers:
11✔
1715
            raise RuntimeError(
10✔
1716
                "this borrower is already holding one of this CapacityLimiter's "
1717
                "tokens"
1718
            )
1719

1720
        if self._wait_queue or len(self._borrowers) >= self._total_tokens:
11✔
1721
            raise WouldBlock
10✔
1722

1723
        self._borrowers.add(borrower)
11✔
1724

1725
    async def acquire(self) -> None:
11✔
1726
        return await self.acquire_on_behalf_of(current_task())
11✔
1727

1728
    async def acquire_on_behalf_of(self, borrower: object) -> None:
11✔
1729
        await AsyncIOBackend.checkpoint_if_cancelled()
11✔
1730
        try:
11✔
1731
            self.acquire_on_behalf_of_nowait(borrower)
11✔
1732
        except WouldBlock:
10✔
1733
            event = asyncio.Event()
10✔
1734
            self._wait_queue[borrower] = event
10✔
1735
            try:
10✔
1736
                await event.wait()
10✔
1737
            except BaseException:
×
1738
                self._wait_queue.pop(borrower, None)
×
1739
                raise
×
1740

1741
            self._borrowers.add(borrower)
10✔
1742
        else:
1743
            try:
11✔
1744
                await AsyncIOBackend.cancel_shielded_checkpoint()
11✔
1745
            except BaseException:
10✔
1746
                self.release()
10✔
1747
                raise
10✔
1748

1749
    def release(self) -> None:
11✔
1750
        self.release_on_behalf_of(current_task())
11✔
1751

1752
    def release_on_behalf_of(self, borrower: object) -> None:
11✔
1753
        try:
11✔
1754
            self._borrowers.remove(borrower)
11✔
1755
        except KeyError:
10✔
1756
            raise RuntimeError(
10✔
1757
                "this borrower isn't holding any of this CapacityLimiter's tokens"
1758
            ) from None
1759

1760
        # Notify the next task in line if this limiter has free capacity now
1761
        if self._wait_queue and len(self._borrowers) < self._total_tokens:
11✔
1762
            event = self._wait_queue.popitem(last=False)[1]
10✔
1763
            event.set()
10✔
1764

1765
    def statistics(self) -> CapacityLimiterStatistics:
11✔
1766
        return CapacityLimiterStatistics(
10✔
1767
            self.borrowed_tokens,
1768
            self.total_tokens,
1769
            tuple(self._borrowers),
1770
            len(self._wait_queue),
1771
        )
1772

1773

1774
_default_thread_limiter: RunVar[CapacityLimiter] = RunVar("_default_thread_limiter")
11✔
1775

1776

1777
#
1778
# Operating system signals
1779
#
1780

1781

1782
class _SignalReceiver:
11✔
1783
    def __init__(self, signals: tuple[Signals, ...]):
11✔
1784
        self._signals = signals
9✔
1785
        self._loop = get_running_loop()
9✔
1786
        self._signal_queue: deque[Signals] = deque()
9✔
1787
        self._future: asyncio.Future = asyncio.Future()
9✔
1788
        self._handled_signals: set[Signals] = set()
9✔
1789

1790
    def _deliver(self, signum: Signals) -> None:
11✔
1791
        self._signal_queue.append(signum)
9✔
1792
        if not self._future.done():
9✔
1793
            self._future.set_result(None)
9✔
1794

1795
    def __enter__(self) -> _SignalReceiver:
11✔
1796
        for sig in set(self._signals):
9✔
1797
            self._loop.add_signal_handler(sig, self._deliver, sig)
9✔
1798
            self._handled_signals.add(sig)
9✔
1799

1800
        return self
9✔
1801

1802
    def __exit__(
11✔
1803
        self,
1804
        exc_type: type[BaseException] | None,
1805
        exc_val: BaseException | None,
1806
        exc_tb: TracebackType | None,
1807
    ) -> bool | None:
1808
        for sig in self._handled_signals:
9✔
1809
            self._loop.remove_signal_handler(sig)
9✔
1810
        return None
9✔
1811

1812
    def __aiter__(self) -> _SignalReceiver:
11✔
1813
        return self
9✔
1814

1815
    async def __anext__(self) -> Signals:
11✔
1816
        await AsyncIOBackend.checkpoint()
9✔
1817
        if not self._signal_queue:
9✔
1818
            self._future = asyncio.Future()
×
1819
            await self._future
×
1820

1821
        return self._signal_queue.popleft()
9✔
1822

1823

1824
#
1825
# Testing and debugging
1826
#
1827

1828

1829
class AsyncIOTaskInfo(TaskInfo):
11✔
1830
    def __init__(self, task: asyncio.Task):
11✔
1831
        task_state = _task_states.get(task)
11✔
1832
        if task_state is None:
11✔
1833
            parent_id = None
11✔
1834
        else:
1835
            parent_id = task_state.parent_id
11✔
1836

1837
        super().__init__(id(task), parent_id, task.get_name(), task.get_coro())
11✔
1838
        self._task = weakref.ref(task)
11✔
1839

1840
    def has_pending_cancellation(self) -> bool:
11✔
1841
        if not (task := self._task()):
11✔
1842
            # If the task isn't around anymore, it won't have a pending cancellation
1843
            return False
×
1844

1845
        if sys.version_info >= (3, 11):
11✔
1846
            if task.cancelling():
5✔
1847
                return True
5✔
1848
        elif (
6✔
1849
            isinstance(task._fut_waiter, asyncio.Future)
1850
            and task._fut_waiter.cancelled()
1851
        ):
1852
            return True
6✔
1853

1854
        if task_state := _task_states.get(task):
11✔
1855
            if cancel_scope := task_state.cancel_scope:
11✔
1856
                return cancel_scope.cancel_called or (
11✔
1857
                    not cancel_scope.shield and cancel_scope._parent_cancelled()
1858
                )
1859

1860
        return False
11✔
1861

1862

1863
class TestRunner(abc.TestRunner):
11✔
1864
    _send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]
11✔
1865

1866
    def __init__(
11✔
1867
        self,
1868
        *,
1869
        debug: bool | None = None,
1870
        use_uvloop: bool = False,
1871
        loop_factory: Callable[[], AbstractEventLoop] | None = None,
1872
    ) -> None:
1873
        if use_uvloop and loop_factory is None:
11✔
1874
            import uvloop
×
1875

1876
            loop_factory = uvloop.new_event_loop
×
1877

1878
        self._runner = Runner(debug=debug, loop_factory=loop_factory)
11✔
1879
        self._exceptions: list[BaseException] = []
11✔
1880
        self._runner_task: asyncio.Task | None = None
11✔
1881

1882
    def __enter__(self) -> TestRunner:
11✔
1883
        self._runner.__enter__()
11✔
1884
        self.get_loop().set_exception_handler(self._exception_handler)
11✔
1885
        return self
11✔
1886

1887
    def __exit__(
11✔
1888
        self,
1889
        exc_type: type[BaseException] | None,
1890
        exc_val: BaseException | None,
1891
        exc_tb: TracebackType | None,
1892
    ) -> None:
1893
        self._runner.__exit__(exc_type, exc_val, exc_tb)
11✔
1894

1895
    def get_loop(self) -> AbstractEventLoop:
11✔
1896
        return self._runner.get_loop()
11✔
1897

1898
    def _exception_handler(
11✔
1899
        self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
1900
    ) -> None:
1901
        if isinstance(context.get("exception"), Exception):
11✔
1902
            self._exceptions.append(context["exception"])
11✔
1903
        else:
1904
            loop.default_exception_handler(context)
11✔
1905

1906
    def _raise_async_exceptions(self) -> None:
11✔
1907
        # Re-raise any exceptions raised in asynchronous callbacks
1908
        if self._exceptions:
11✔
1909
            exceptions, self._exceptions = self._exceptions, []
11✔
1910
            if len(exceptions) == 1:
11✔
1911
                raise exceptions[0]
11✔
1912
            elif exceptions:
×
1913
                raise BaseExceptionGroup(
×
1914
                    "Multiple exceptions occurred in asynchronous callbacks", exceptions
1915
                )
1916

1917
    async def _run_tests_and_fixtures(
11✔
1918
        self,
1919
        receive_stream: MemoryObjectReceiveStream[
1920
            tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
1921
        ],
1922
    ) -> None:
1923
        with receive_stream, self._send_stream:
11✔
1924
            async for coro, future in receive_stream:
11✔
1925
                try:
11✔
1926
                    retval = await coro
11✔
1927
                except BaseException as exc:
11✔
1928
                    if not future.cancelled():
11✔
1929
                        future.set_exception(exc)
11✔
1930
                else:
1931
                    if not future.cancelled():
11✔
1932
                        future.set_result(retval)
11✔
1933

1934
    async def _call_in_runner_task(
11✔
1935
        self,
1936
        func: Callable[P, Awaitable[T_Retval]],
1937
        *args: P.args,
1938
        **kwargs: P.kwargs,
1939
    ) -> T_Retval:
1940
        if not self._runner_task:
11✔
1941
            self._send_stream, receive_stream = create_memory_object_stream[
11✔
1942
                Tuple[Awaitable[Any], asyncio.Future]
1943
            ](1)
1944
            self._runner_task = self.get_loop().create_task(
11✔
1945
                self._run_tests_and_fixtures(receive_stream)
1946
            )
1947

1948
        coro = func(*args, **kwargs)
11✔
1949
        future: asyncio.Future[T_Retval] = self.get_loop().create_future()
11✔
1950
        self._send_stream.send_nowait((coro, future))
11✔
1951
        return await future
11✔
1952

1953
    def run_asyncgen_fixture(
11✔
1954
        self,
1955
        fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
1956
        kwargs: dict[str, Any],
1957
    ) -> Iterable[T_Retval]:
1958
        asyncgen = fixture_func(**kwargs)
11✔
1959
        fixturevalue: T_Retval = self.get_loop().run_until_complete(
11✔
1960
            self._call_in_runner_task(asyncgen.asend, None)
1961
        )
1962
        self._raise_async_exceptions()
11✔
1963

1964
        yield fixturevalue
11✔
1965

1966
        try:
11✔
1967
            self.get_loop().run_until_complete(
11✔
1968
                self._call_in_runner_task(asyncgen.asend, None)
1969
            )
1970
        except StopAsyncIteration:
11✔
1971
            self._raise_async_exceptions()
11✔
1972
        else:
1973
            self.get_loop().run_until_complete(asyncgen.aclose())
×
1974
            raise RuntimeError("Async generator fixture did not stop")
×
1975

1976
    def run_fixture(
11✔
1977
        self,
1978
        fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
1979
        kwargs: dict[str, Any],
1980
    ) -> T_Retval:
1981
        retval = self.get_loop().run_until_complete(
11✔
1982
            self._call_in_runner_task(fixture_func, **kwargs)
1983
        )
1984
        self._raise_async_exceptions()
11✔
1985
        return retval
11✔
1986

1987
    def run_test(
11✔
1988
        self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
1989
    ) -> None:
1990
        try:
11✔
1991
            self.get_loop().run_until_complete(
11✔
1992
                self._call_in_runner_task(test_func, **kwargs)
1993
            )
1994
        except Exception as exc:
11✔
1995
            self._exceptions.append(exc)
11✔
1996

1997
        self._raise_async_exceptions()
11✔
1998

1999

2000
class AsyncIOBackend(AsyncBackend):
11✔
2001
    @classmethod
11✔
2002
    def run(
11✔
2003
        cls,
2004
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2005
        args: tuple[Unpack[PosArgsT]],
2006
        kwargs: dict[str, Any],
2007
        options: dict[str, Any],
2008
    ) -> T_Retval:
2009
        @wraps(func)
11✔
2010
        async def wrapper() -> T_Retval:
11✔
2011
            task = cast(asyncio.Task, current_task())
11✔
2012
            task.set_name(get_callable_name(func))
11✔
2013
            _task_states[task] = TaskState(None, None)
11✔
2014

2015
            try:
11✔
2016
                return await func(*args)
11✔
2017
            finally:
2018
                del _task_states[task]
11✔
2019

2020
        debug = options.get("debug", None)
11✔
2021
        loop_factory = options.get("loop_factory", None)
11✔
2022
        if loop_factory is None and options.get("use_uvloop", False):
11✔
2023
            import uvloop
7✔
2024

2025
            loop_factory = uvloop.new_event_loop
7✔
2026

2027
        with Runner(debug=debug, loop_factory=loop_factory) as runner:
11✔
2028
            return runner.run(wrapper())
11✔
2029

2030
    @classmethod
11✔
2031
    def current_token(cls) -> object:
11✔
2032
        return get_running_loop()
11✔
2033

2034
    @classmethod
11✔
2035
    def current_time(cls) -> float:
11✔
2036
        return get_running_loop().time()
11✔
2037

2038
    @classmethod
11✔
2039
    def cancelled_exception_class(cls) -> type[BaseException]:
11✔
2040
        return CancelledError
11✔
2041

2042
    @classmethod
11✔
2043
    async def checkpoint(cls) -> None:
11✔
2044
        await sleep(0)
11✔
2045

2046
    @classmethod
11✔
2047
    async def checkpoint_if_cancelled(cls) -> None:
11✔
2048
        task = current_task()
11✔
2049
        if task is None:
11✔
2050
            return
×
2051

2052
        try:
11✔
2053
            cancel_scope = _task_states[task].cancel_scope
11✔
2054
        except KeyError:
11✔
2055
            return
11✔
2056

2057
        while cancel_scope:
11✔
2058
            if cancel_scope.cancel_called:
11✔
2059
                await sleep(0)
11✔
2060
            elif cancel_scope.shield:
11✔
2061
                break
10✔
2062
            else:
2063
                cancel_scope = cancel_scope._parent_scope
11✔
2064

2065
    @classmethod
11✔
2066
    async def cancel_shielded_checkpoint(cls) -> None:
11✔
2067
        with CancelScope(shield=True):
11✔
2068
            await sleep(0)
11✔
2069

2070
    @classmethod
11✔
2071
    async def sleep(cls, delay: float) -> None:
11✔
2072
        await sleep(delay)
11✔
2073

2074
    @classmethod
11✔
2075
    def create_cancel_scope(
11✔
2076
        cls, *, deadline: float = math.inf, shield: bool = False
2077
    ) -> CancelScope:
2078
        return CancelScope(deadline=deadline, shield=shield)
11✔
2079

2080
    @classmethod
11✔
2081
    def current_effective_deadline(cls) -> float:
11✔
2082
        try:
10✔
2083
            cancel_scope = _task_states[
10✔
2084
                current_task()  # type: ignore[index]
2085
            ].cancel_scope
2086
        except KeyError:
×
2087
            return math.inf
×
2088

2089
        deadline = math.inf
10✔
2090
        while cancel_scope:
10✔
2091
            deadline = min(deadline, cancel_scope.deadline)
10✔
2092
            if cancel_scope._cancel_called:
10✔
2093
                deadline = -math.inf
10✔
2094
                break
10✔
2095
            elif cancel_scope.shield:
10✔
2096
                break
10✔
2097
            else:
2098
                cancel_scope = cancel_scope._parent_scope
10✔
2099

2100
        return deadline
10✔
2101

2102
    @classmethod
11✔
2103
    def create_task_group(cls) -> abc.TaskGroup:
11✔
2104
        return TaskGroup()
11✔
2105

2106
    @classmethod
11✔
2107
    def create_event(cls) -> abc.Event:
11✔
2108
        return Event()
11✔
2109

2110
    @classmethod
11✔
2111
    def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
11✔
2112
        return CapacityLimiter(total_tokens)
10✔
2113

2114
    @classmethod
11✔
2115
    async def run_sync_in_worker_thread(
11✔
2116
        cls,
2117
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2118
        args: tuple[Unpack[PosArgsT]],
2119
        abandon_on_cancel: bool = False,
2120
        limiter: abc.CapacityLimiter | None = None,
2121
    ) -> T_Retval:
2122
        await cls.checkpoint()
11✔
2123

2124
        # If this is the first run in this event loop thread, set up the necessary
2125
        # variables
2126
        try:
11✔
2127
            idle_workers = _threadpool_idle_workers.get()
11✔
2128
            workers = _threadpool_workers.get()
11✔
2129
        except LookupError:
11✔
2130
            idle_workers = deque()
11✔
2131
            workers = set()
11✔
2132
            _threadpool_idle_workers.set(idle_workers)
11✔
2133
            _threadpool_workers.set(workers)
11✔
2134

2135
        async with limiter or cls.current_default_thread_limiter():
11✔
2136
            with CancelScope(shield=not abandon_on_cancel) as scope:
11✔
2137
                future: asyncio.Future = asyncio.Future()
11✔
2138
                root_task = find_root_task()
11✔
2139
                if not idle_workers:
11✔
2140
                    worker = WorkerThread(root_task, workers, idle_workers)
11✔
2141
                    worker.start()
11✔
2142
                    workers.add(worker)
11✔
2143
                    root_task.add_done_callback(worker.stop)
11✔
2144
                else:
2145
                    worker = idle_workers.pop()
11✔
2146

2147
                    # Prune any other workers that have been idle for MAX_IDLE_TIME
2148
                    # seconds or longer
2149
                    now = cls.current_time()
11✔
2150
                    while idle_workers:
11✔
2151
                        if (
10✔
2152
                            now - idle_workers[0].idle_since
2153
                            < WorkerThread.MAX_IDLE_TIME
2154
                        ):
2155
                            break
10✔
2156

2157
                        expired_worker = idle_workers.popleft()
×
2158
                        expired_worker.root_task.remove_done_callback(
×
2159
                            expired_worker.stop
2160
                        )
2161
                        expired_worker.stop()
×
2162

2163
                context = copy_context()
11✔
2164
                context.run(sniffio.current_async_library_cvar.set, None)
11✔
2165
                if abandon_on_cancel or scope._parent_scope is None:
11✔
2166
                    worker_scope = scope
11✔
2167
                else:
2168
                    worker_scope = scope._parent_scope
11✔
2169

2170
                worker.queue.put_nowait((context, func, args, future, worker_scope))
11✔
2171
                return await future
11✔
2172

2173
    @classmethod
11✔
2174
    def check_cancelled(cls) -> None:
11✔
2175
        scope: CancelScope | None = threadlocals.current_cancel_scope
11✔
2176
        while scope is not None:
11✔
2177
            if scope.cancel_called:
11✔
2178
                raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")
11✔
2179

2180
            if scope.shield:
11✔
2181
                return
×
2182

2183
            scope = scope._parent_scope
11✔
2184

2185
    @classmethod
11✔
2186
    def run_async_from_thread(
11✔
2187
        cls,
2188
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2189
        args: tuple[Unpack[PosArgsT]],
2190
        token: object,
2191
    ) -> T_Retval:
2192
        async def task_wrapper(scope: CancelScope) -> T_Retval:
11✔
2193
            __tracebackhide__ = True
11✔
2194
            task = cast(asyncio.Task, current_task())
11✔
2195
            _task_states[task] = TaskState(None, scope)
11✔
2196
            scope._tasks.add(task)
11✔
2197
            try:
11✔
2198
                return await func(*args)
11✔
2199
            except CancelledError as exc:
11✔
2200
                raise concurrent.futures.CancelledError(str(exc)) from None
11✔
2201
            finally:
2202
                scope._tasks.discard(task)
11✔
2203

2204
        loop = cast(AbstractEventLoop, token)
11✔
2205
        context = copy_context()
11✔
2206
        context.run(sniffio.current_async_library_cvar.set, "asyncio")
11✔
2207
        wrapper = task_wrapper(threadlocals.current_cancel_scope)
11✔
2208
        f: concurrent.futures.Future[T_Retval] = context.run(
11✔
2209
            asyncio.run_coroutine_threadsafe, wrapper, loop
2210
        )
2211
        return f.result()
11✔
2212

2213
    @classmethod
11✔
2214
    def run_sync_from_thread(
11✔
2215
        cls,
2216
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2217
        args: tuple[Unpack[PosArgsT]],
2218
        token: object,
2219
    ) -> T_Retval:
2220
        @wraps(func)
11✔
2221
        def wrapper() -> None:
11✔
2222
            try:
11✔
2223
                sniffio.current_async_library_cvar.set("asyncio")
11✔
2224
                f.set_result(func(*args))
11✔
2225
            except BaseException as exc:
11✔
2226
                f.set_exception(exc)
11✔
2227
                if not isinstance(exc, Exception):
11✔
2228
                    raise
×
2229

2230
        f: concurrent.futures.Future[T_Retval] = Future()
11✔
2231
        loop = cast(AbstractEventLoop, token)
11✔
2232
        loop.call_soon_threadsafe(wrapper)
11✔
2233
        return f.result()
11✔
2234

2235
    @classmethod
11✔
2236
    def create_blocking_portal(cls) -> abc.BlockingPortal:
11✔
2237
        return BlockingPortal()
11✔
2238

2239
    @classmethod
11✔
2240
    async def open_process(
11✔
2241
        cls,
2242
        command: str | bytes | Sequence[str | bytes],
2243
        *,
2244
        shell: bool,
2245
        stdin: int | IO[Any] | None,
2246
        stdout: int | IO[Any] | None,
2247
        stderr: int | IO[Any] | None,
2248
        cwd: str | bytes | PathLike | None = None,
2249
        env: Mapping[str, str] | None = None,
2250
        start_new_session: bool = False,
2251
    ) -> Process:
2252
        await cls.checkpoint()
10✔
2253
        if shell:
10✔
2254
            process = await asyncio.create_subprocess_shell(
10✔
2255
                cast("str | bytes", command),
2256
                stdin=stdin,
2257
                stdout=stdout,
2258
                stderr=stderr,
2259
                cwd=cwd,
2260
                env=env,
2261
                start_new_session=start_new_session,
2262
            )
2263
        else:
2264
            process = await asyncio.create_subprocess_exec(
10✔
2265
                *command,
2266
                stdin=stdin,
2267
                stdout=stdout,
2268
                stderr=stderr,
2269
                cwd=cwd,
2270
                env=env,
2271
                start_new_session=start_new_session,
2272
            )
2273

2274
        stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
10✔
2275
        stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
10✔
2276
        stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
10✔
2277
        return Process(process, stdin_stream, stdout_stream, stderr_stream)
10✔
2278

2279
    @classmethod
11✔
2280
    def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
11✔
2281
        create_task(
10✔
2282
            _shutdown_process_pool_on_exit(workers),
2283
            name="AnyIO process pool shutdown task",
2284
        )
2285
        find_root_task().add_done_callback(
10✔
2286
            partial(_forcibly_shutdown_process_pool_on_exit, workers)  # type:ignore[arg-type]
2287
        )
2288

2289
    @classmethod
11✔
2290
    async def connect_tcp(
11✔
2291
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
2292
    ) -> abc.SocketStream:
2293
        transport, protocol = cast(
11✔
2294
            Tuple[asyncio.Transport, StreamProtocol],
2295
            await get_running_loop().create_connection(
2296
                StreamProtocol, host, port, local_addr=local_address
2297
            ),
2298
        )
2299
        transport.pause_reading()
11✔
2300
        return SocketStream(transport, protocol)
11✔
2301

2302
    @classmethod
11✔
2303
    async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
11✔
2304
        await cls.checkpoint()
8✔
2305
        loop = get_running_loop()
8✔
2306
        raw_socket = socket.socket(socket.AF_UNIX)
8✔
2307
        raw_socket.setblocking(False)
8✔
2308
        while True:
5✔
2309
            try:
8✔
2310
                raw_socket.connect(path)
8✔
2311
            except BlockingIOError:
8✔
2312
                f: asyncio.Future = asyncio.Future()
×
2313
                loop.add_writer(raw_socket, f.set_result, None)
×
2314
                f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
2315
                await f
×
2316
            except BaseException:
8✔
2317
                raw_socket.close()
8✔
2318
                raise
8✔
2319
            else:
2320
                return UNIXSocketStream(raw_socket)
8✔
2321

2322
    @classmethod
11✔
2323
    def create_tcp_listener(cls, sock: socket.socket) -> SocketListener:
11✔
2324
        return TCPSocketListener(sock)
11✔
2325

2326
    @classmethod
11✔
2327
    def create_unix_listener(cls, sock: socket.socket) -> SocketListener:
11✔
2328
        return UNIXSocketListener(sock)
8✔
2329

2330
    @classmethod
11✔
2331
    async def create_udp_socket(
11✔
2332
        cls,
2333
        family: AddressFamily,
2334
        local_address: IPSockAddrType | None,
2335
        remote_address: IPSockAddrType | None,
2336
        reuse_port: bool,
2337
    ) -> UDPSocket | ConnectedUDPSocket:
2338
        transport, protocol = await get_running_loop().create_datagram_endpoint(
10✔
2339
            DatagramProtocol,
2340
            local_addr=local_address,
2341
            remote_addr=remote_address,
2342
            family=family,
2343
            reuse_port=reuse_port,
2344
        )
2345
        if protocol.exception:
10✔
2346
            transport.close()
×
2347
            raise protocol.exception
×
2348

2349
        if not remote_address:
10✔
2350
            return UDPSocket(transport, protocol)
10✔
2351
        else:
2352
            return ConnectedUDPSocket(transport, protocol)
10✔
2353

2354
    @classmethod
11✔
2355
    async def create_unix_datagram_socket(  # type: ignore[override]
11✔
2356
        cls, raw_socket: socket.socket, remote_path: str | bytes | None
2357
    ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
2358
        await cls.checkpoint()
8✔
2359
        loop = get_running_loop()
8✔
2360

2361
        if remote_path:
8✔
2362
            while True:
5✔
2363
                try:
8✔
2364
                    raw_socket.connect(remote_path)
8✔
2365
                except BlockingIOError:
×
2366
                    f: asyncio.Future = asyncio.Future()
×
2367
                    loop.add_writer(raw_socket, f.set_result, None)
×
2368
                    f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
2369
                    await f
×
2370
                except BaseException:
×
2371
                    raw_socket.close()
×
2372
                    raise
×
2373
                else:
2374
                    return ConnectedUNIXDatagramSocket(raw_socket)
8✔
2375
        else:
2376
            return UNIXDatagramSocket(raw_socket)
8✔
2377

2378
    @classmethod
11✔
2379
    async def getaddrinfo(
11✔
2380
        cls,
2381
        host: bytes | str | None,
2382
        port: str | int | None,
2383
        *,
2384
        family: int | AddressFamily = 0,
2385
        type: int | SocketKind = 0,
2386
        proto: int = 0,
2387
        flags: int = 0,
2388
    ) -> list[
2389
        tuple[
2390
            AddressFamily,
2391
            SocketKind,
2392
            int,
2393
            str,
2394
            tuple[str, int] | tuple[str, int, int, int],
2395
        ]
2396
    ]:
2397
        return await get_running_loop().getaddrinfo(
11✔
2398
            host, port, family=family, type=type, proto=proto, flags=flags
2399
        )
2400

2401
    @classmethod
11✔
2402
    async def getnameinfo(
11✔
2403
        cls, sockaddr: IPSockAddrType, flags: int = 0
2404
    ) -> tuple[str, str]:
2405
        return await get_running_loop().getnameinfo(sockaddr, flags)
10✔
2406

2407
    @classmethod
11✔
2408
    async def wait_socket_readable(cls, sock: socket.socket) -> None:
11✔
2409
        await cls.checkpoint()
×
2410
        try:
×
2411
            read_events = _read_events.get()
×
2412
        except LookupError:
×
2413
            read_events = {}
×
2414
            _read_events.set(read_events)
×
2415

2416
        if read_events.get(sock):
×
2417
            raise BusyResourceError("reading from") from None
×
2418

2419
        loop = get_running_loop()
×
2420
        event = read_events[sock] = asyncio.Event()
×
2421
        loop.add_reader(sock, event.set)
×
2422
        try:
×
2423
            await event.wait()
×
2424
        finally:
2425
            if read_events.pop(sock, None) is not None:
×
2426
                loop.remove_reader(sock)
×
2427
                readable = True
×
2428
            else:
2429
                readable = False
×
2430

2431
        if not readable:
×
2432
            raise ClosedResourceError
×
2433

2434
    @classmethod
11✔
2435
    async def wait_socket_writable(cls, sock: socket.socket) -> None:
11✔
2436
        await cls.checkpoint()
×
2437
        try:
×
2438
            write_events = _write_events.get()
×
2439
        except LookupError:
×
2440
            write_events = {}
×
2441
            _write_events.set(write_events)
×
2442

2443
        if write_events.get(sock):
×
2444
            raise BusyResourceError("writing to") from None
×
2445

2446
        loop = get_running_loop()
×
2447
        event = write_events[sock] = asyncio.Event()
×
2448
        loop.add_writer(sock.fileno(), event.set)
×
2449
        try:
×
2450
            await event.wait()
×
2451
        finally:
2452
            if write_events.pop(sock, None) is not None:
×
2453
                loop.remove_writer(sock)
×
2454
                writable = True
×
2455
            else:
2456
                writable = False
×
2457

2458
        if not writable:
×
2459
            raise ClosedResourceError
×
2460

2461
    @classmethod
11✔
2462
    def current_default_thread_limiter(cls) -> CapacityLimiter:
11✔
2463
        try:
11✔
2464
            return _default_thread_limiter.get()
11✔
2465
        except LookupError:
11✔
2466
            limiter = CapacityLimiter(40)
11✔
2467
            _default_thread_limiter.set(limiter)
11✔
2468
            return limiter
11✔
2469

2470
    @classmethod
11✔
2471
    def open_signal_receiver(
11✔
2472
        cls, *signals: Signals
2473
    ) -> ContextManager[AsyncIterator[Signals]]:
2474
        return _SignalReceiver(signals)
9✔
2475

2476
    @classmethod
11✔
2477
    def get_current_task(cls) -> TaskInfo:
11✔
2478
        return AsyncIOTaskInfo(current_task())  # type: ignore[arg-type]
11✔
2479

2480
    @classmethod
11✔
2481
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
11✔
2482
        return [AsyncIOTaskInfo(task) for task in all_tasks() if not task.done()]
11✔
2483

2484
    @classmethod
11✔
2485
    async def wait_all_tasks_blocked(cls) -> None:
11✔
2486
        await cls.checkpoint()
11✔
2487
        this_task = current_task()
11✔
2488
        while True:
7✔
2489
            for task in all_tasks():
11✔
2490
                if task is this_task:
11✔
2491
                    continue
11✔
2492

2493
                waiter = task._fut_waiter  # type: ignore[attr-defined]
11✔
2494
                if waiter is None or waiter.done():
11✔
2495
                    await sleep(0.1)
11✔
2496
                    break
11✔
2497
            else:
2498
                return
11✔
2499

2500
    @classmethod
11✔
2501
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
11✔
2502
        return TestRunner(**options)
11✔
2503

2504

2505
backend_class = AsyncIOBackend
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc