• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 9634469595

23 Jun 2024 03:13PM UTC coverage: 91.579% (+0.08%) from 91.503%
9634469595

Pull #749

github

web-flow
Merge 9c683842b into dc7ba6070
Pull Request #749: Added more process arguments

36 of 37 new or added lines in 4 files covered. (97.3%)

1 existing line in 1 file now uncovered.

4589 of 5011 relevant lines covered (91.58%)

8.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.22
/src/anyio/_backends/_asyncio.py
1
from __future__ import annotations
10✔
2

3
import array
10✔
4
import asyncio
10✔
5
import concurrent.futures
10✔
6
import math
10✔
7
import pathlib
10✔
8
import socket
10✔
9
import sys
10✔
10
import threading
10✔
11
import weakref
10✔
12
from asyncio import (
10✔
13
    AbstractEventLoop,
14
    CancelledError,
15
    all_tasks,
16
    create_task,
17
    current_task,
18
    get_running_loop,
19
    sleep,
20
)
21
from asyncio.base_events import _run_until_complete_cb  # type: ignore[attr-defined]
10✔
22
from collections import OrderedDict, deque
10✔
23
from collections.abc import AsyncIterator, Generator, Iterable
10✔
24
from concurrent.futures import Future
10✔
25
from contextlib import suppress
10✔
26
from contextvars import Context, copy_context
10✔
27
from dataclasses import dataclass
10✔
28
from functools import partial, wraps
10✔
29
from inspect import (
10✔
30
    CORO_RUNNING,
31
    CORO_SUSPENDED,
32
    getcoroutinestate,
33
    iscoroutine,
34
)
35
from io import IOBase
10✔
36
from os import PathLike
10✔
37
from queue import Queue
10✔
38
from signal import Signals
10✔
39
from socket import AddressFamily, SocketKind
10✔
40
from threading import Thread
10✔
41
from types import TracebackType
10✔
42
from typing import (
10✔
43
    IO,
44
    Any,
45
    AsyncGenerator,
46
    Awaitable,
47
    Callable,
48
    Collection,
49
    ContextManager,
50
    Coroutine,
51
    Optional,
52
    Sequence,
53
    Tuple,
54
    TypeVar,
55
    cast,
56
)
57
from weakref import WeakKeyDictionary
10✔
58

59
import sniffio
10✔
60

61
from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
10✔
62
from .._core._eventloop import claim_worker_thread, threadlocals
10✔
63
from .._core._exceptions import (
10✔
64
    BrokenResourceError,
65
    BusyResourceError,
66
    ClosedResourceError,
67
    EndOfStream,
68
    WouldBlock,
69
)
70
from .._core._sockets import convert_ipv6_sockaddr
10✔
71
from .._core._streams import create_memory_object_stream
10✔
72
from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
10✔
73
from .._core._synchronization import Event as BaseEvent
10✔
74
from .._core._synchronization import ResourceGuard
10✔
75
from .._core._tasks import CancelScope as BaseCancelScope
10✔
76
from ..abc import (
10✔
77
    AsyncBackend,
78
    IPSockAddrType,
79
    SocketListener,
80
    UDPPacketType,
81
    UNIXDatagramPacketType,
82
)
83
from ..abc._eventloop import StrOrBytesPath
10✔
84
from ..lowlevel import RunVar
10✔
85
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
10✔
86

87
if sys.version_info >= (3, 10):
10✔
88
    from typing import ParamSpec
6✔
89
else:
90
    from typing_extensions import ParamSpec
4✔
91

92
if sys.version_info >= (3, 11):
10✔
93
    from asyncio import Runner
4✔
94
    from typing import TypeVarTuple, Unpack
4✔
95
else:
96
    import contextvars
6✔
97
    import enum
6✔
98
    import signal
6✔
99
    from asyncio import coroutines, events, exceptions, tasks
6✔
100

101
    from exceptiongroup import BaseExceptionGroup
6✔
102
    from typing_extensions import TypeVarTuple, Unpack
6✔
103

104
    class _State(enum.Enum):
6✔
105
        CREATED = "created"
6✔
106
        INITIALIZED = "initialized"
6✔
107
        CLOSED = "closed"
6✔
108

109
    class Runner:
        """
        Context manager that owns an event loop and runs coroutines in it.

        Copied from CPython 3.11; only defined here when the interpreter is older
        than 3.11 (newer versions import ``asyncio.Runner`` instead).

        :param debug: if not ``None``, passed to ``loop.set_debug()`` when the loop
            is created
        :param loop_factory: callable that creates the event loop; when ``None``,
            ``events.new_event_loop()`` is used and the loop is also installed via
            ``events.set_event_loop()``
        """

        def __init__(
            self,
            *,
            debug: bool | None = None,
            loop_factory: Callable[[], AbstractEventLoop] | None = None,
        ):
            self._state = _State.CREATED
            self._debug = debug
            self._loop_factory = loop_factory
            self._loop: AbstractEventLoop | None = None
            self._context: contextvars.Context | None = None
            # Number of SIGINTs received during the current run() call
            self._interrupt_count = 0
            # True once this runner has called events.set_event_loop()
            self._set_event_loop = False

        def __enter__(self) -> Runner:
            self._lazy_init()
            return self

        def __exit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None:
            self.close()

        def close(self) -> None:
            """Shutdown and close event loop."""
            if self._state is not _State.INITIALIZED:
                return
            try:
                loop = self._loop
                _cancel_all_tasks(loop)
                loop.run_until_complete(loop.shutdown_asyncgens())
                # Use the loop's own implementation when it has one, otherwise
                # fall back to the module-level helper
                if hasattr(loop, "shutdown_default_executor"):
                    loop.run_until_complete(loop.shutdown_default_executor())
                else:
                    loop.run_until_complete(_shutdown_default_executor(loop))
            finally:
                if self._set_event_loop:
                    events.set_event_loop(None)
                loop.close()
                self._loop = None
                self._state = _State.CLOSED

        def get_loop(self) -> AbstractEventLoop:
            """Return embedded event loop."""
            self._lazy_init()
            return self._loop

        def run(
            self,
            coro: Coroutine[Any, Any, T_Retval],
            *,
            context: contextvars.Context | None = None,
        ) -> T_Retval:
            """
            Run a coroutine inside the embedded event loop.

            :param coro: the coroutine object to run
            :param context: context to create the task in; defaults to the context
                copied when the runner was initialized
            :return: the return value of the coroutine
            :raises ValueError: if ``coro`` is not a coroutine object
            :raises RuntimeError: if called from a running event loop, or if the
                runner has been closed
            """
            if not coroutines.iscoroutine(coro):
                raise ValueError(f"a coroutine was expected, got {coro!r}")

            if events._get_running_loop() is not None:
                # fail fast with short traceback
                raise RuntimeError(
                    "Runner.run() cannot be called from a running event loop"
                )

            self._lazy_init()

            if context is None:
                context = self._context
            task = context.run(self._loop.create_task, coro)

            # Only take over SIGINT handling in the main thread, and only when
            # the default handler is still installed
            if (
                threading.current_thread() is threading.main_thread()
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
            ):
                sigint_handler = partial(self._on_sigint, main_task=task)
                try:
                    signal.signal(signal.SIGINT, sigint_handler)
                except ValueError:
                    # `signal.signal` may throw if `threading.main_thread` does
                    # not support signals (e.g. embedded interpreter with signals
                    # not registered - see gh-91880)
                    sigint_handler = None
            else:
                sigint_handler = None

            self._interrupt_count = 0
            try:
                return self._loop.run_until_complete(task)
            except exceptions.CancelledError:
                if self._interrupt_count > 0:
                    # Task.uncancel() may not exist on the task implementation,
                    # hence the getattr() lookup
                    uncancel = getattr(task, "uncancel", None)
                    if uncancel is not None and uncancel() == 0:
                        raise KeyboardInterrupt()
                raise  # CancelledError
            finally:
                # Restore the default handler, but only if ours is still installed
                if (
                    sigint_handler is not None
                    and signal.getsignal(signal.SIGINT) is sigint_handler
                ):
                    signal.signal(signal.SIGINT, signal.default_int_handler)

        def _lazy_init(self) -> None:
            """Create the event loop and context on first use."""
            if self._state is _State.CLOSED:
                raise RuntimeError("Runner is closed")
            if self._state is _State.INITIALIZED:
                return
            if self._loop_factory is None:
                self._loop = events.new_event_loop()
                if not self._set_event_loop:
                    # Call set_event_loop only once to avoid calling
                    # attach_loop multiple times on child watchers
                    events.set_event_loop(self._loop)
                    self._set_event_loop = True
            else:
                self._loop = self._loop_factory()
            if self._debug is not None:
                self._loop.set_debug(self._debug)
            self._context = contextvars.copy_context()
            self._state = _State.INITIALIZED

        def _on_sigint(self, signum, frame, main_task: asyncio.Task) -> None:
            """
            SIGINT handler installed for the duration of :meth:`run`.

            The first interrupt cancels the main task (if it is still running);
            any other invocation raises :exc:`KeyboardInterrupt` directly.
            """
            self._interrupt_count += 1
            if self._interrupt_count == 1 and not main_task.done():
                main_task.cancel()
                # wakeup loop if it is blocked by select() with long timeout
                self._loop.call_soon_threadsafe(lambda: None)
                return
            raise KeyboardInterrupt()
236

237
    def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
6✔
238
        to_cancel = tasks.all_tasks(loop)
6✔
239
        if not to_cancel:
6✔
240
            return
7✔
241

242
        for task in to_cancel:
6✔
243
            task.cancel()
6✔
244

245
        loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
6✔
246

247
        for task in to_cancel:
6✔
248
            if task.cancelled():
6✔
249
                continue
6✔
250
            if task.exception() is not None:
5✔
251
                loop.call_exception_handler(
×
252
                    {
253
                        "message": "unhandled exception during asyncio.run() shutdown",
254
                        "exception": task.exception(),
255
                        "task": task,
256
                    }
257
                )
258

259
    async def _shutdown_default_executor(loop: AbstractEventLoop) -> None:
6✔
260
        """Schedule the shutdown of the default executor."""
261

262
        def _do_shutdown(future: asyncio.futures.Future) -> None:
3✔
263
            try:
3✔
264
                loop._default_executor.shutdown(wait=True)  # type: ignore[attr-defined]
3✔
265
                loop.call_soon_threadsafe(future.set_result, None)
3✔
266
            except Exception as ex:
×
267
                loop.call_soon_threadsafe(future.set_exception, ex)
×
268

269
        loop._executor_shutdown_called = True
3✔
270
        if loop._default_executor is None:
3✔
271
            return
3✔
272
        future = loop.create_future()
3✔
273
        thread = threading.Thread(target=_do_shutdown, args=(future,))
3✔
274
        thread.start()
3✔
275
        try:
3✔
276
            await future
3✔
277
        finally:
278
            thread.join()
3✔
279

280

281
# Generic helpers used in the annotations throughout this module
T_Retval = TypeVar("T_Retval")
T_contra = TypeVar("T_contra", contravariant=True)
PosArgsT = TypeVarTuple("PosArgsT")
P = ParamSpec("P")

# Cache for the task found by find_root_task()
_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")
287

288

289
def find_root_task() -> asyncio.Task:
    """
    Return the task considered to be the root of the current event loop run.

    The lookup order is:

    #. the task cached in the ``_root_task`` run variable, if it has not finished
    #. an unfinished task that has ``_run_until_complete_cb`` (or a callback from
       the ``uvloop.loop`` module) among its done callbacks; this task is then
       cached
    #. the host task of the outermost cancel scope of the current task
    #. the current task itself
    """
    root_task = _root_task.get(None)
    if root_task is not None and not root_task.done():
        return root_task

    # Look for a task that has been started via run_until_complete()
    for task in all_tasks():
        if task._callbacks and not task.done():
            for cb, _context in task._callbacks:
                if (
                    cb is _run_until_complete_cb
                    or getattr(cb, "__module__", None) == "uvloop.loop"
                ):
                    _root_task.set(task)
                    return task

    # Look up the topmost task in the AnyIO task tree, if possible
    task = cast(asyncio.Task, current_task())
    state = _task_states.get(task)
    if state:
        cancel_scope = state.cancel_scope
        while cancel_scope and cancel_scope._parent_scope is not None:
            cancel_scope = cancel_scope._parent_scope

        if cancel_scope is not None:
            return cast(asyncio.Task, cancel_scope._host_task)

    return task
318

319

320
def get_callable_name(func: Callable) -> str:
    """
    Return a dotted name for ``func`` built from its module and qualified name.

    Whichever of ``__module__`` and ``__qualname__`` is missing or falsy is left
    out; if both are, the result is an empty string.
    """
    module = getattr(func, "__module__", None)
    qualname = getattr(func, "__qualname__", None)
    return ".".join([x for x in (module, qualname) if x])
324

325

326
#
327
# Event loop
328
#
329

330
_run_vars: WeakKeyDictionary[asyncio.AbstractEventLoop, Any] = WeakKeyDictionary()
10✔
331

332

333
def _task_started(task: asyncio.Task) -> bool:
10✔
334
    """Return ``True`` if the task has been started and has not finished."""
335
    try:
10✔
336
        return getcoroutinestate(task.get_coro()) in (CORO_RUNNING, CORO_SUSPENDED)
10✔
337
    except AttributeError:
×
338
        # task coro is async_genenerator_asend https://bugs.python.org/issue37771
339
        raise Exception(f"Cannot determine if task {task} has started or not") from None
×
340

341

342
#
343
# Timeouts and cancellation
344
#
345

346

347
class CancelScope(BaseCancelScope):
    """
    asyncio implementation of a cancel scope.

    A scope tracks the tasks directly contained in it (``_tasks``) and its nested
    scopes (``_child_scopes``). Cancelling it calls ``Task.cancel()`` on the
    eligible tasks, repeating on every event loop iteration for as long as
    eligible tasks remain.

    :param deadline: event loop clock value at which the scope is cancelled
        (``math.inf`` means no deadline)
    :param shield: ``True`` to stop cancellation of enclosing scopes from being
        delivered into this scope
    """

    def __new__(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        return object.__new__(cls)

    def __init__(self, deadline: float = math.inf, shield: bool = False):
        self._deadline = deadline
        self._shield = shield
        self._parent_scope: CancelScope | None = None
        self._child_scopes: set[CancelScope] = set()
        self._cancel_called = False
        self._cancelled_caught = False
        self._active = False
        self._timeout_handle: asyncio.TimerHandle | None = None
        self._cancel_handle: asyncio.Handle | None = None
        self._tasks: set[asyncio.Task] = set()
        self._host_task: asyncio.Task | None = None
        # Number of Task.cancel() calls made with this scope as the origin
        self._cancel_calls: int = 0
        # Host task's cancelling() count at the time the scope was entered
        # (only set on Python 3.11+)
        self._cancelling: int | None = None

    def __enter__(self) -> CancelScope:
        if self._active:
            raise RuntimeError(
                "Each CancelScope may only be used for a single 'with' block"
            )

        self._host_task = host_task = cast(asyncio.Task, current_task())
        self._tasks.add(host_task)
        try:
            task_state = _task_states[host_task]
        except KeyError:
            task_state = TaskState(None, self)
            _task_states[host_task] = task_state
        else:
            # Push this scope on top of the task's current scope; the host task
            # now belongs directly to this scope rather than to the parent
            self._parent_scope = task_state.cancel_scope
            task_state.cancel_scope = self
            if self._parent_scope is not None:
                self._parent_scope._child_scopes.add(self)
                self._parent_scope._tasks.remove(host_task)

        self._timeout()
        self._active = True
        if sys.version_info >= (3, 11):
            self._cancelling = self._host_task.cancelling()

        # Start cancelling the host task if the scope was cancelled before entering
        if self._cancel_called:
            self._deliver_cancellation(self)

        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        if not self._active:
            raise RuntimeError("This cancel scope is not active")
        if current_task() is not self._host_task:
            raise RuntimeError(
                "Attempted to exit cancel scope in a different task than it was "
                "entered in"
            )

        assert self._host_task is not None
        host_task_state = _task_states.get(self._host_task)
        if host_task_state is None or host_task_state.cancel_scope is not self:
            raise RuntimeError(
                "Attempted to exit a cancel scope that isn't the current tasks's "
                "current cancel scope"
            )

        self._active = False
        if self._timeout_handle:
            self._timeout_handle.cancel()
            self._timeout_handle = None

        # Hand the host task back to the parent scope
        self._tasks.remove(self._host_task)
        if self._parent_scope is not None:
            self._parent_scope._child_scopes.remove(self)
            self._parent_scope._tasks.add(self._host_task)

        host_task_state.cancel_scope = self._parent_scope

        # Restart the cancellation effort in the closest directly cancelled parent
        # scope if this one was shielded
        self._restart_cancellation_in_parent()

        if self._cancel_called and exc_val is not None:
            # Swallow the exception only if it contains a CancelledError that
            # this scope is responsible for
            for exc in iterate_exceptions(exc_val):
                if isinstance(exc, CancelledError):
                    self._cancelled_caught = self._uncancel(exc)
                    if self._cancelled_caught:
                        break

            return self._cancelled_caught

        return None

    def _uncancel(self, cancelled_exc: CancelledError) -> bool:
        """
        Undo this scope's cancellations of the host task.

        :param cancelled_exc: the cancellation exception raised out of the scope
        :return: ``True`` if the exception is attributed to this scope
        """
        if sys.version_info < (3, 9) or self._host_task is None:
            self._cancel_calls = 0
            return True

        # Undo all cancellations done by this scope
        if self._cancelling is not None:
            while self._cancel_calls:
                self._cancel_calls -= 1
                if self._host_task.uncancel() <= self._cancelling:
                    return True

        self._cancel_calls = 0
        # Otherwise match against the message passed to Task.cancel() in
        # _deliver_cancellation()
        return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args

    def _timeout(self) -> None:
        """Cancel the scope if the deadline has passed, else schedule a recheck."""
        if self._deadline != math.inf:
            loop = get_running_loop()
            if loop.time() >= self._deadline:
                self.cancel()
            else:
                self._timeout_handle = loop.call_at(self._deadline, self._timeout)

    def _deliver_cancellation(self, origin: CancelScope) -> bool:
        """
        Deliver cancellation to directly contained tasks and nested cancel scopes.

        Schedule another run at the end if we still have tasks eligible for
        cancellation.

        :param origin: the cancel scope that originated the cancellation
        :return: ``True`` if the delivery needs to be retried on the next cycle

        """
        should_retry = False
        current = current_task()
        for task in self._tasks:
            if task._must_cancel:  # type: ignore[attr-defined]
                continue

            # The task is eligible for cancellation if it has started
            should_retry = True
            if task is not current and (task is self._host_task or _task_started(task)):
                waiter = task._fut_waiter  # type: ignore[attr-defined]
                if not isinstance(waiter, asyncio.Future) or not waiter.done():
                    origin._cancel_calls += 1
                    if sys.version_info >= (3, 9):
                        task.cancel(f"Cancelled by cancel scope {id(origin):x}")
                    else:
                        task.cancel()

        # Deliver cancellation to child scopes that aren't shielded or running their own
        # cancellation callbacks
        for scope in self._child_scopes:
            if not scope._shield and not scope.cancel_called:
                should_retry = scope._deliver_cancellation(origin) or should_retry

        # Schedule another callback if there are still tasks left
        if origin is self:
            if should_retry:
                self._cancel_handle = get_running_loop().call_soon(
                    self._deliver_cancellation, origin
                )
            else:
                self._cancel_handle = None

        return should_retry

    def _restart_cancellation_in_parent(self) -> None:
        """
        Restart the cancellation effort in the closest directly cancelled parent scope.

        """
        scope = self._parent_scope
        while scope is not None:
            if scope._cancel_called:
                # A non-None handle means a delivery callback is already scheduled
                if scope._cancel_handle is None:
                    scope._deliver_cancellation(scope)

                break

            # No point in looking beyond any shielded scope
            if scope._shield:
                break

            scope = scope._parent_scope

    def _parent_cancelled(self) -> bool:
        """Return ``True`` if a cancelled ancestor is reachable without crossing a shield."""
        # Check whether any parent has been cancelled
        cancel_scope = self._parent_scope
        while cancel_scope is not None and not cancel_scope._shield:
            if cancel_scope._cancel_called:
                return True
            else:
                cancel_scope = cancel_scope._parent_scope

        return False

    def cancel(self) -> None:
        """Mark the scope cancelled and, if it has been entered, start delivery."""
        if not self._cancel_called:
            if self._timeout_handle:
                self._timeout_handle.cancel()
                self._timeout_handle = None

            self._cancel_called = True
            if self._host_task is not None:
                self._deliver_cancellation(self)

    @property
    def deadline(self) -> float:
        return self._deadline

    @deadline.setter
    def deadline(self, value: float) -> None:
        self._deadline = float(value)
        if self._timeout_handle is not None:
            self._timeout_handle.cancel()
            self._timeout_handle = None

        # Re-arm the timer only for an entered scope that is not yet cancelled
        if self._active and not self._cancel_called:
            self._timeout()

    @property
    def cancel_called(self) -> bool:
        return self._cancel_called

    @property
    def cancelled_caught(self) -> bool:
        return self._cancelled_caught

    @property
    def shield(self) -> bool:
        return self._shield

    @shield.setter
    def shield(self, value: bool) -> None:
        if self._shield != value:
            self._shield = value
            # Dropping the shield may expose this scope to a cancelled parent
            if not value:
                self._restart_cancellation_in_parent()
588

589

590
#
591
# Task states
592
#
593

594

595
class TaskState:
    """
    Encapsulates auxiliary task information that cannot be added to the Task instance
    itself because there are no guarantees about its implementation.

    :param parent_id: ``id()`` of the parent task, or ``None``
    :param cancel_scope: the task's current (innermost) cancel scope, or ``None``
    """

    __slots__ = "parent_id", "cancel_scope", "__weakref__"

    def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
        self.parent_id = parent_id
        self.cancel_scope = cancel_scope
606

607

608
_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary()
10✔
609

610

611
#
612
# Task groups
613
#
614

615

616
class _AsyncioTaskStatus(abc.TaskStatus):
    """
    Task status object given to tasks launched via ``TaskGroup.start()``.

    :param future: the future that receives the value passed to :meth:`started`
    :param parent_id: value to record as the task's ``parent_id`` once it has
        signalled that it started
    """

    def __init__(self, future: asyncio.Future, parent_id: int):
        self._future = future
        self._parent_id = parent_id

    def started(self, value: T_contra | None = None) -> None:
        """
        Resolve the future with ``value`` and update the current task's parent.

        :raises RuntimeError: if the future already had a result or exception set
            (a cancelled future is tolerated)
        """
        try:
            self._future.set_result(value)
        except asyncio.InvalidStateError:
            if not self._future.cancelled():
                raise RuntimeError(
                    "called 'started' twice on the same task status"
                ) from None

        task = cast(asyncio.Task, current_task())
        _task_states[task].parent_id = self._parent_id
632

633

634
def iterate_exceptions(
10✔
635
    exception: BaseException,
636
) -> Generator[BaseException, None, None]:
637
    if isinstance(exception, BaseExceptionGroup):
10✔
638
        for exc in exception.exceptions:
9✔
639
            yield from iterate_exceptions(exc)
9✔
640
    else:
641
        yield exception
10✔
642

643

644
class TaskGroup(abc.TaskGroup):
    """
    asyncio implementation of a task group.

    Child tasks share the group's cancel scope. Leaving the ``async with`` block
    waits for all child tasks; exceptions other than ``CancelledError`` raised by
    the block or by child tasks are collected and raised together as a
    ``BaseExceptionGroup``.
    """

    def __init__(self) -> None:
        self.cancel_scope: CancelScope = CancelScope()
        self._active = False
        self._exceptions: list[BaseException] = []
        self._tasks: set[asyncio.Task] = set()

    async def __aenter__(self) -> TaskGroup:
        self.cancel_scope.__enter__()
        self._active = True
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
        if exc_val is not None:
            self.cancel_scope.cancel()
            if not isinstance(exc_val, CancelledError):
                self._exceptions.append(exc_val)

        cancelled_exc_while_waiting_tasks: CancelledError | None = None
        while self._tasks:
            try:
                await asyncio.wait(self._tasks)
            except CancelledError as exc:
                # This task was cancelled natively; reraise the CancelledError later
                # unless this task was already interrupted by another exception
                self.cancel_scope.cancel()
                if cancelled_exc_while_waiting_tasks is None:
                    cancelled_exc_while_waiting_tasks = exc

        self._active = False
        if self._exceptions:
            raise BaseExceptionGroup(
                "unhandled errors in a TaskGroup", self._exceptions
            )

        # Raise the CancelledError received while waiting for child tasks to exit,
        # unless the context manager itself was previously exited with another
        # exception, or if any of the  child tasks raised an exception other than
        # CancelledError
        if cancelled_exc_while_waiting_tasks:
            if exc_val is None or ignore_exception:
                raise cancelled_exc_while_waiting_tasks

        return ignore_exception

    def _spawn(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
        args: tuple[Unpack[PosArgsT]],
        name: object,
        task_status_future: asyncio.Future | None = None,
    ) -> asyncio.Task:
        """
        Create a child task running ``func(*args)`` in this group's cancel scope.

        :param func: callable that must return a coroutine object
        :param args: positional arguments for ``func``
        :param name: task name; ``None`` derives one from ``func``
        :param task_status_future: when given, ``func`` also receives a
            ``task_status`` keyword argument backed by this future
        :return: the new task
        :raises RuntimeError: if the task group is not active
        :raises TypeError: if ``func`` does not return a coroutine object
        """

        def task_done(_task: asyncio.Task) -> None:
            # Done callback: unregister the task and route its outcome
            task_state = _task_states[_task]
            assert task_state.cancel_scope is not None
            assert _task in task_state.cancel_scope._tasks
            task_state.cancel_scope._tasks.remove(_task)
            self._tasks.remove(_task)
            del _task_states[_task]

            try:
                exc = _task.exception()
            except CancelledError as e:
                # Walk down to the innermost CancelledError in the context chain
                while isinstance(e.__context__, CancelledError):
                    e = e.__context__

                exc = e

            if exc is not None:
                # The future can only be in the cancelled state if the host task was
                # cancelled, so return immediately instead of adding one more
                # CancelledError to the exceptions list
                if task_status_future is not None and task_status_future.cancelled():
                    return

                if task_status_future is None or task_status_future.done():
                    if not isinstance(exc, CancelledError):
                        self._exceptions.append(exc)

                    if not self.cancel_scope._parent_cancelled():
                        self.cancel_scope.cancel()
                else:
                    # The task failed before calling started(): pass the error
                    # to whoever is waiting on the future
                    task_status_future.set_exception(exc)
            elif task_status_future is not None and not task_status_future.done():
                task_status_future.set_exception(
                    RuntimeError("Child exited without calling task_status.started()")
                )

        if not self._active:
            raise RuntimeError(
                "This task group is not active; no new tasks can be started."
            )

        kwargs = {}
        if task_status_future is not None:
            # The spawning task is the parent until the child calls started(),
            # which re-parents it to the task group's host task
            parent_id = id(current_task())
            kwargs["task_status"] = _AsyncioTaskStatus(
                task_status_future, id(self.cancel_scope._host_task)
            )
        else:
            parent_id = id(self.cancel_scope._host_task)

        coro = func(*args, **kwargs)
        if not iscoroutine(coro):
            # Not every callable has __qualname__ (e.g. functools.partial
            # objects); fall back to repr() so the intended TypeError is raised
            # instead of an AttributeError
            qualname = getattr(func, "__qualname__", None)
            if qualname is None:
                described = repr(func)
            else:
                prefix = f"{func.__module__}." if hasattr(func, "__module__") else ""
                described = f"{prefix}{qualname}"

            raise TypeError(
                f"Expected {described}() to return a coroutine, but "
                f"the return value ({coro!r}) is not a coroutine object"
            )

        name = get_callable_name(func) if name is None else str(name)
        task = create_task(coro, name=name)
        task.add_done_callback(task_done)

        # Make the spawned task inherit the task group's cancel scope
        _task_states[task] = TaskState(
            parent_id=parent_id, cancel_scope=self.cancel_scope
        )
        self.cancel_scope._tasks.add(task)
        self._tasks.add(task)
        return task

    def start_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> None:
        self._spawn(func, args, name)

    async def start(
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
    ) -> Any:
        future: asyncio.Future = asyncio.Future()
        task = self._spawn(func, args, name, future)

        # If the task raises an exception after sending a start value without a switch
        # point between, the task group is cancelled and this method never proceeds to
        # process the completed future. That's why we have to have a shielded cancel
        # scope here.
        try:
            return await future
        except CancelledError:
            # Cancel the task and wait for it to exit before returning
            task.cancel()
            with CancelScope(shield=True), suppress(CancelledError):
                await task

            raise
799

800

801
#
802
# Threads
803
#
804

805
# A (return value, exception) pair in which either element may be None
_Retval_Queue_Type = Tuple[Optional[T_Retval], Optional[BaseException]]
806

807

808
class WorkerThread(Thread):
    """
    Thread that executes queued callables and reports their outcome to the loop.

    Work items are tuples of ``(context, func, args, future, cancel_scope)`` placed
    on :attr:`queue`; a ``None`` item tells the thread to exit.
    """

    MAX_IDLE_TIME = 10  # seconds

    def __init__(
        self,
        root_task: asyncio.Task,
        workers: set[WorkerThread],
        idle_workers: deque[WorkerThread],
    ):
        """
        :param root_task: task whose event loop receives the results
        :param workers: collection this worker is discarded from when stopped
        :param idle_workers: collection this worker joins whenever it finishes a job
        """
        super().__init__(name="AnyIO worker thread")
        self.root_task = root_task
        self.workers = workers
        self.idle_workers = idle_workers
        self.loop = root_task._loop
        self.queue: Queue[
            tuple[Context, Callable, tuple, asyncio.Future, CancelScope] | None
        ] = Queue(2)
        self.idle_since = AsyncIOBackend.current_time()
        self.stopping = False

    def _report_result(
        self, future: asyncio.Future, result: Any, exc: BaseException | None
    ) -> None:
        """
        Mark this worker idle and deliver the outcome of a job to ``future``.

        Scheduled from :meth:`run` via ``call_soon_threadsafe()``. Nothing is
        delivered if ``future`` has been cancelled in the meantime.
        """
        self.idle_since = AsyncIOBackend.current_time()
        if not self.stopping:
            self.idle_workers.append(self)

        if future.cancelled():
            return

        if exc is None:
            future.set_result(result)
            return

        if isinstance(exc, StopIteration):
            # Hand over a RuntimeError instead, keeping the original as its cause
            wrapped = RuntimeError("coroutine raised StopIteration")
            wrapped.__cause__ = exc
            exc = wrapped

        future.set_exception(exc)

    def run(self) -> None:
        """Process work items from the queue until the shutdown item arrives."""
        with claim_worker_thread(AsyncIOBackend, self.loop):
            while True:
                job = self.queue.get()
                if job is None:
                    # Shutdown command received
                    return

                context, func, args, future, cancel_scope = job
                if future.cancelled():
                    # Nobody is waiting for this job any more; just acknowledge it
                    self.queue.task_done()
                    continue

                outcome = None
                failure: BaseException | None = None
                threadlocals.current_cancel_scope = cancel_scope
                try:
                    outcome = context.run(func, *args)
                except BaseException as exc:
                    failure = exc
                finally:
                    del threadlocals.current_cancel_scope

                if not self.loop.is_closed():
                    self.loop.call_soon_threadsafe(
                        self._report_result, future, outcome, failure
                    )

                self.queue.task_done()

    def stop(self, f: asyncio.Task | None = None) -> None:
        """
        Ask the thread to exit and remove it from the worker collections.

        :param f: unused; accepted so the method can serve as a done callback
        """
        self.stopping = True
        self.queue.put_nowait(None)
        self.workers.discard(self)
        # The worker is not in the idle collection while it is running a job
        with suppress(ValueError):
            self.idle_workers.remove(self)
9✔
881

882

883
# Run variable holding a deque of WorkerThread objects (presumably the ones
# currently waiting for work -- confirm against the code that populates it)
_threadpool_idle_workers: RunVar[deque[WorkerThread]] = RunVar(
    "_threadpool_idle_workers"
)
# Run variable holding the set of WorkerThread objects
_threadpool_workers: RunVar[set[WorkerThread]] = RunVar("_threadpool_workers")
10✔
887

888

889
class BlockingPortal(abc.BlockingPortal):
    """Blocking portal bound to the event loop that was running at creation time."""

    def __new__(cls) -> BlockingPortal:
        # Instantiate directly through object.__new__()
        return object.__new__(cls)

    def __init__(self) -> None:
        super().__init__()
        self._loop = get_running_loop()

    def _spawn_task_from_thread(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        name: object,
        future: Future[T_Retval],
    ) -> None:
        """
        Start ``_call_func(func, args, kwargs, future)`` in the portal's task group.

        The ``start_soon()`` call itself is executed through
        ``AsyncIOBackend.run_sync_from_thread()`` against the portal's event loop.
        """
        spawn = partial(self._task_group.start_soon, name=name)
        spawn_args = (self._call_func, func, args, kwargs, future)
        AsyncIOBackend.run_sync_from_thread(spawn, spawn_args, self._loop)
910

911

912
#
913
# Subprocesses
914
#
915

916

917
@dataclass(eq=False)
class StreamReaderWrapper(abc.ByteReceiveStream):
    """Adapts an :class:`asyncio.StreamReader` to the byte receive stream interface."""

    _stream: asyncio.StreamReader

    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Read up to ``max_bytes`` bytes from the wrapped reader.

        :raises EndOfStream: if the reader returned no data
        """
        chunk = await self._stream.read(max_bytes)
        if not chunk:
            raise EndOfStream

        return chunk

    async def aclose(self) -> None:
        """Feed EOF to the wrapped reader, then run a checkpoint."""
        self._stream.feed_eof()
        await AsyncIOBackend.checkpoint()
9✔
931

932

933
@dataclass(eq=False)
class StreamWriterWrapper(abc.ByteSendStream):
    """Adapts an :class:`asyncio.StreamWriter` to the byte send stream interface."""

    _stream: asyncio.StreamWriter

    async def send(self, item: bytes) -> None:
        """Write ``item`` to the wrapped writer and wait for it to drain."""
        self._stream.write(item)
        await self._stream.drain()

    async def aclose(self) -> None:
        """Close the wrapped writer, then run a checkpoint."""
        self._stream.close()
        await AsyncIOBackend.checkpoint()
9✔
944

945

946
@dataclass(eq=False)
class Process(abc.Process):
    """Wraps an :class:`asyncio.subprocess.Process` and its standard streams."""

    _process: asyncio.subprocess.Process
    # Stream wrappers; each may be None
    _stdin: StreamWriterWrapper | None
    _stdout: StreamReaderWrapper | None
    _stderr: StreamReaderWrapper | None

    async def aclose(self) -> None:
        """
        Close the standard streams and wait for the process to exit.

        If waiting is interrupted by any exception, the process is killed and
        waited for inside a shielded cancel scope before the exception propagates.
        """
        with CancelScope(shield=True):
            for stream in (self._stdin, self._stdout, self._stderr):
                if stream:
                    await stream.aclose()

        try:
            await self.wait()
        except BaseException:
            self.kill()
            with CancelScope(shield=True):
                await self.wait()

            raise

    async def wait(self) -> int:
        """Wait for the underlying process and return the result of its wait()."""
        exit_code = await self._process.wait()
        return exit_code

    def terminate(self) -> None:
        """Forward to ``terminate()`` of the underlying process."""
        self._process.terminate()

    def kill(self) -> None:
        """Forward to ``kill()`` of the underlying process."""
        self._process.kill()

    def send_signal(self, signal: int) -> None:
        """Forward ``signal`` to ``send_signal()`` of the underlying process."""
        self._process.send_signal(signal)

    @property
    def pid(self) -> int:
        """The ``pid`` attribute of the underlying process."""
        return self._process.pid

    @property
    def returncode(self) -> int | None:
        """The ``returncode`` attribute of the underlying process."""
        return self._process.returncode

    @property
    def stdin(self) -> abc.ByteSendStream | None:
        """The wrapper around standard input, if any."""
        return self._stdin

    @property
    def stdout(self) -> abc.ByteReceiveStream | None:
        """The wrapper around standard output, if any."""
        return self._stdout

    @property
    def stderr(self) -> abc.ByteReceiveStream | None:
        """The wrapper around standard error, if any."""
        return self._stderr
9✔
1002

1003

1004
def _forcibly_shutdown_process_pool_on_exit(
    workers: set[Process], _task: object
) -> None:
    """
    Forcibly shut down worker processes belonging to this event loop.

    Works entirely synchronously. On Python versions older than 3.12, the child
    handler of each handled process is also removed from the event loop policy's
    child watcher, when one is available.

    :param workers: the worker processes to go through
    :param _task: unused

    """
    watcher: asyncio.AbstractChildWatcher | None = None
    if sys.version_info < (3, 12):
        with suppress(NotImplementedError):
            watcher = asyncio.get_event_loop_policy().get_child_watcher()

    # Close as much as possible (w/o async/await) to avoid warnings
    for worker in workers:
        # NOTE(review): only workers that already HAVE a return code are handled
        # here; ones still running (returncode is None) are left alone. That looks
        # inverted for a "forcible shutdown" -- confirm the intent.
        if worker.returncode is not None:
            worker._stdin._stream._transport.close()  # type: ignore[union-attr]
            worker._stdout._stream._transport.close()  # type: ignore[union-attr]
            worker._stderr._stream._transport.close()  # type: ignore[union-attr]
            worker.kill()
            if watcher:
                watcher.remove_child_handler(worker.pid)
×
1027

1028

1029
async def _shutdown_process_pool_on_exit(workers: set[abc.Process]) -> None:
    """
    Shut down worker processes belonging to this event loop.

    Sleeps indefinitely; once cancelled, kills every worker that has no return
    code yet and then closes all of the workers.

    NOTE: this only works when the event loop was started using asyncio.run() or
    anyio.run().

    """
    try:
        await sleep(math.inf)
    except asyncio.CancelledError:
        worker: abc.Process
        for worker in workers:
            if worker.returncode is not None:
                continue

            worker.kill()

        for worker in workers:
            await worker.aclose()
9✔
1047

1048

1049
#
1050
# Sockets and networking
1051
#
1052

1053

1054
class StreamProtocol(asyncio.Protocol):
    """Protocol that buffers received chunks and exposes read/write readiness."""

    read_queue: deque[bytes]
    read_event: asyncio.Event
    write_event: asyncio.Event
    exception: Exception | None = None
    is_at_eof: bool = False

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        """Create the queue and events, and zero the write buffer limits."""
        self.read_queue = deque()
        self.read_event = asyncio.Event()
        writable = asyncio.Event()
        writable.set()
        self.write_event = writable
        cast(asyncio.Transport, transport).set_write_buffer_limits(0)

    def connection_lost(self, exc: Exception | None) -> None:
        """Record the failure (if any) and wake up readers and writers."""
        if exc:
            broken = BrokenResourceError()
            broken.__cause__ = exc
            self.exception = broken

        for event in (self.read_event, self.write_event):
            event.set()

    def data_received(self, data: bytes) -> None:
        """Queue the received chunk and signal that data is available."""
        self.read_queue.append(data)
        self.read_event.set()

    def eof_received(self) -> bool | None:
        """Flag end-of-file, wake up readers and return ``True``."""
        self.is_at_eof = True
        self.read_event.set()
        return True

    def pause_writing(self) -> None:
        """Replace the write event with a fresh, unset one."""
        self.write_event = asyncio.Event()

    def resume_writing(self) -> None:
        """Set the current write event."""
        self.write_event.set()
×
1090

1091

1092
class DatagramProtocol(asyncio.DatagramProtocol):
    """Protocol that buffers received datagrams and exposes read/write readiness."""

    read_queue: deque[tuple[bytes, IPSockAddrType]]
    read_event: asyncio.Event
    write_event: asyncio.Event
    exception: Exception | None = None

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        """Create the bounded datagram queue and the read/write events."""
        self.read_queue = deque(maxlen=100)  # arbitrary value
        self.read_event = asyncio.Event()
        writable = asyncio.Event()
        writable.set()
        self.write_event = writable

    def connection_lost(self, exc: Exception | None) -> None:
        """Wake up readers and writers; ``exc`` is not recorded."""
        for event in (self.read_event, self.write_event):
            event.set()

    def datagram_received(self, data: bytes, addr: IPSockAddrType) -> None:
        """Queue the datagram with its converted address and signal readers."""
        self.read_queue.append((data, convert_ipv6_sockaddr(addr)))
        self.read_event.set()

    def error_received(self, exc: Exception) -> None:
        """Remember the reported error."""
        self.exception = exc

    def pause_writing(self) -> None:
        """Clear the write event."""
        self.write_event.clear()

    def resume_writing(self) -> None:
        """Set the write event."""
        self.write_event.set()
×
1121

1122

1123
class SocketStream(abc.SocketStream):
    """Byte stream on top of an asyncio transport and a :class:`StreamProtocol`."""

    def __init__(self, transport: asyncio.Transport, protocol: StreamProtocol):
        self._transport = transport
        self._protocol = protocol
        self._receive_guard = ResourceGuard("reading from")
        self._send_guard = ResourceGuard("writing to")
        self._closed = False

    @property
    def _raw_socket(self) -> socket.socket:
        return self._transport.get_extra_info("socket")

    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Return the next chunk of received data, at most ``max_bytes`` long.

        :raises ClosedResourceError: if no data is buffered and this stream was
            closed locally
        :raises Exception: the exception recorded by the protocol, if no data is
            buffered and the connection was lost with an error
        :raises EndOfStream: if no data is buffered for any other reason
        """
        with self._receive_guard:
            if (
                not self._protocol.read_event.is_set()
                and not self._transport.is_closing()
                and not self._protocol.is_at_eof
            ):
                self._transport.resume_reading()
                await self._protocol.read_event.wait()
                self._transport.pause_reading()
            else:
                await AsyncIOBackend.checkpoint()

            try:
                chunk: bytes | None = self._protocol.read_queue.popleft()
            except IndexError:
                chunk = None

            if chunk is None:
                # Raise outside of the "except IndexError" block so the IndexError
                # never becomes the exception context, without having to resort
                # to "from None". Using "from None" on the protocol's stored
                # exception would reset the __cause__ that connection_lost()
                # attached to it, hiding the underlying transport error here and
                # in any later send() that raises the same exception object.
                if self._closed:
                    raise ClosedResourceError
                elif self._protocol.exception:
                    raise self._protocol.exception
                else:
                    raise EndOfStream

            if len(chunk) > max_bytes:
                # Split the oversized chunk
                chunk, leftover = chunk[:max_bytes], chunk[max_bytes:]
                self._protocol.read_queue.appendleft(leftover)

            # If the read queue is empty, clear the flag so that the next call will
            # block until data is available
            if not self._protocol.read_queue:
                self._protocol.read_event.clear()

        return chunk

    async def send(self, item: bytes) -> None:
        """
        Write ``item`` to the transport and wait until writing is not paused.

        :raises ClosedResourceError: if this stream was closed locally
        :raises Exception: the exception recorded by the protocol, if any
        :raises BrokenResourceError: if the write fails with a ``RuntimeError``
            while the transport is closing
        """
        with self._send_guard:
            await AsyncIOBackend.checkpoint()

            if self._closed:
                raise ClosedResourceError
            elif self._protocol.exception is not None:
                raise self._protocol.exception

            try:
                self._transport.write(item)
            except RuntimeError as exc:
                if self._transport.is_closing():
                    raise BrokenResourceError from exc
                else:
                    raise

            await self._protocol.write_event.wait()

    async def send_eof(self) -> None:
        """Half-close the write side, ignoring ``OSError`` from the transport."""
        try:
            self._transport.write_eof()
        except OSError:
            pass

    async def aclose(self) -> None:
        """
        Close the stream unless the transport is already closing.

        Sends EOF (best effort), closes the transport, yields to the event loop
        once and then aborts the transport.
        """
        if not self._transport.is_closing():
            self._closed = True
            try:
                self._transport.write_eof()
            except OSError:
                pass

            self._transport.close()
            await sleep(0)
            self._transport.abort()
10✔
1206

1207

1208
class _RawSocketMixin:
    """
    Shared plumbing for classes that operate directly on a raw socket.

    Provides futures that complete when the socket becomes readable or writable,
    and an ``aclose()`` that closes the socket and releases those futures.
    """

    # Futures currently being waited on; the class-level None shows through again
    # once the done callback has deleted the instance attribute
    _receive_future: asyncio.Future | None = None
    _send_future: asyncio.Future | None = None
    _closing = False

    def __init__(self, raw_socket: socket.socket):
        self.__raw_socket = raw_socket
        self._receive_guard = ResourceGuard("reading from")
        self._send_guard = ResourceGuard("writing to")

    @property
    def _raw_socket(self) -> socket.socket:
        return self.__raw_socket

    def _wait_until_readable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
        """
        Return a future that completes once the loop reports the socket readable.

        The reader registration and the ``_receive_future`` attribute are removed
        again when the future is done, however it got there.
        """

        def callback(f: object) -> None:
            del self._receive_future
            loop.remove_reader(self.__raw_socket)

        f = self._receive_future = asyncio.Future()
        loop.add_reader(self.__raw_socket, f.set_result, None)
        f.add_done_callback(callback)
        return f

    def _wait_until_writable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
        """
        Return a future that completes once the loop reports the socket writable.

        The writer registration and the ``_send_future`` attribute are removed
        again when the future is done, however it got there.
        """

        def callback(f: object) -> None:
            del self._send_future
            loop.remove_writer(self.__raw_socket)

        f = self._send_future = asyncio.Future()
        loop.add_writer(self.__raw_socket, f.set_result, None)
        f.add_done_callback(callback)
        return f

    async def aclose(self) -> None:
        """
        Close the raw socket and wake up any pending read/write waiters.

        Calling this more than once is a no-op after the first call.
        """
        if not self._closing:
            self._closing = True
            if self.__raw_socket.fileno() != -1:
                self.__raw_socket.close()

            # A wait future may already be done (completed or cancelled) while
            # its done callback, which removes the attribute, has not run yet.
            # Calling set_result() on such a future raises InvalidStateError, so
            # only futures that are still pending are resolved here.
            for pending in (self._receive_future, self._send_future):
                if pending is not None and not pending.done():
                    pending.set_result(None)
×
1252

1253

1254
class UNIXSocketStream(_RawSocketMixin, abc.UNIXSocketStream):
    """Byte stream over a raw UNIX socket, with file descriptor passing."""

    async def send_eof(self) -> None:
        """Shut down the write side of the socket."""
        with self._send_guard:
            self._raw_socket.shutdown(socket.SHUT_WR)

    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive up to ``max_bytes`` bytes, waiting for readability as needed.

        :raises EndOfStream: if the socket returned no data
        :raises ClosedResourceError: on ``OSError`` after this stream was closed
        :raises BrokenResourceError: on any other ``OSError``
        """
        loop = get_running_loop()
        await AsyncIOBackend.checkpoint()
        with self._receive_guard:
            while True:
                try:
                    chunk = self._raw_socket.recv(max_bytes)
                except BlockingIOError:
                    await self._wait_until_readable(loop)
                    continue
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None

                    raise BrokenResourceError from exc

                if chunk:
                    return chunk

                raise EndOfStream

    async def send(self, item: bytes) -> None:
        """
        Send all of ``item``, waiting for writability as needed.

        :raises ClosedResourceError: on ``OSError`` after this stream was closed
        :raises BrokenResourceError: on any other ``OSError``
        """
        loop = get_running_loop()
        await AsyncIOBackend.checkpoint()
        with self._send_guard:
            remaining = memoryview(item)
            while remaining:
                try:
                    sent = self._raw_socket.send(remaining)
                except BlockingIOError:
                    await self._wait_until_writable(loop)
                    continue
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None

                    raise BrokenResourceError from exc

                remaining = remaining[sent:]

    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
        """
        Receive a message together with file descriptors sent alongside it.

        :param msglen: maximum length of the message part (non-negative integer)
        :param maxfds: maximum number of file descriptors to accept (positive
            integer)
        :return: a tuple of (message, list of file descriptors)
        :raises ValueError: if ``msglen`` or ``maxfds`` is out of range or not an
            integer
        :raises EndOfStream: if neither a message nor ancillary data was received
        :raises RuntimeError: if ancillary data other than ``SCM_RIGHTS`` arrived
        :raises ClosedResourceError: on ``OSError`` after this stream was closed
        :raises BrokenResourceError: on any other ``OSError``
        """
        if not isinstance(msglen, int) or msglen < 0:
            raise ValueError("msglen must be a non-negative integer")
        if not isinstance(maxfds, int) or maxfds < 1:
            raise ValueError("maxfds must be a positive integer")

        loop = get_running_loop()
        fds = array.array("i")
        await AsyncIOBackend.checkpoint()
        with self._receive_guard:
            while True:
                try:
                    message, ancdata, flags, addr = self._raw_socket.recvmsg(
                        msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
                    )
                except BlockingIOError:
                    await self._wait_until_readable(loop)
                    continue
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None

                    raise BrokenResourceError from exc

                if not message and not ancdata:
                    raise EndOfStream

                break

        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
                raise RuntimeError(
                    f"Received unexpected ancillary data; message = {message!r}, "
                    f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
                )

            # Only consume whole array items; drop any trailing partial one
            usable = len(cmsg_data) - (len(cmsg_data) % fds.itemsize)
            fds.frombytes(cmsg_data[:usable])

        return message, list(fds)

    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
        """
        Send ``message`` along with the given file descriptors.

        :param message: the (non-empty) message to send
        :param fds: file descriptors, as integers or file objects; entries of any
            other type are skipped
        :raises ValueError: if ``message`` or ``fds`` is empty
        :raises ClosedResourceError: on ``OSError`` after this stream was closed
        :raises BrokenResourceError: on any other ``OSError``
        """
        if not message:
            raise ValueError("message must not be empty")
        if not fds:
            raise ValueError("fds must not be empty")

        loop = get_running_loop()
        filenos: list[int] = [
            fd if isinstance(fd, int) else fd.fileno()
            for fd in fds
            if isinstance(fd, (int, IOBase))
        ]
        fdarray = array.array("i", filenos)
        await AsyncIOBackend.checkpoint()
        with self._send_guard:
            while True:
                try:
                    self._raw_socket.sendmsg(
                        [message], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)]
                    )
                except BlockingIOError:
                    await self._wait_until_writable(loop)
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None

                    raise BrokenResourceError from exc
                else:
                    return
×
1368

1369

1370
class TCPSocketListener(abc.SocketListener):
    """Listener that accepts TCP connections through the event loop."""

    _accept_scope: CancelScope | None = None
    _closed = False

    def __init__(self, raw_socket: socket.socket):
        self.__raw_socket = raw_socket
        self._loop = cast(asyncio.BaseEventLoop, get_running_loop())
        self._accept_guard = ResourceGuard("accepting connections from")

    @property
    def _raw_socket(self) -> socket.socket:
        return self.__raw_socket

    async def accept(self) -> abc.SocketStream:
        """
        Accept one connection and wrap it in a :class:`SocketStream`.

        :raises ClosedResourceError: if the listener is closed, either before the
            call or while it is waiting for a connection
        """
        if self._closed:
            raise ClosedResourceError

        with self._accept_guard:
            await AsyncIOBackend.checkpoint()
            with CancelScope() as self._accept_scope:
                try:
                    connection, _addr = await self._loop.sock_accept(self._raw_socket)
                except asyncio.CancelledError:
                    # Workaround for https://bugs.python.org/issue41317
                    with suppress(ValueError, NotImplementedError):
                        self._loop.remove_reader(self._raw_socket)

                    if self._closed:
                        raise ClosedResourceError from None

                    raise
                finally:
                    self._accept_scope = None

        connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        transport, protocol = await self._loop.connect_accepted_socket(
            StreamProtocol, connection
        )
        return SocketStream(transport, protocol)

    async def aclose(self) -> None:
        """
        Close the listening socket, cancelling an ``accept()`` in progress.

        Does nothing if the listener has already been closed.
        """
        if self._closed:
            return

        self._closed = True
        if self._accept_scope:
            # Workaround for https://bugs.python.org/issue41317
            with suppress(ValueError, NotImplementedError):
                self._loop.remove_reader(self._raw_socket)

            self._accept_scope.cancel()
            await sleep(0)

        self._raw_socket.close()
10✔
1428

1429

1430
class UNIXSocketListener(abc.SocketListener):
    """Listener that accepts connections on a raw UNIX socket."""

    def __init__(self, raw_socket: socket.socket):
        self.__raw_socket = raw_socket
        self._loop = get_running_loop()
        self._accept_guard = ResourceGuard("accepting connections from")
        self._closed = False

    async def accept(self) -> abc.SocketStream:
        """
        Accept one connection and wrap it in a :class:`UNIXSocketStream`.

        While no connection is pending, waits until the event loop reports the
        listening socket as readable and then retries.

        :raises ClosedResourceError: on ``OSError`` after the listener was closed
        :raises BrokenResourceError: on any other ``OSError``
        """
        await AsyncIOBackend.checkpoint()
        with self._accept_guard:
            while True:
                try:
                    connection, _ = self.__raw_socket.accept()
                    connection.setblocking(False)
                    return UNIXSocketStream(connection)
                except BlockingIOError:
                    readable: asyncio.Future = asyncio.Future()

                    def stop_watching(_: object) -> None:
                        self._loop.remove_reader(self.__raw_socket)

                    self._loop.add_reader(
                        self.__raw_socket, readable.set_result, None
                    )
                    readable.add_done_callback(stop_watching)
                    await readable
                except OSError as exc:
                    if self._closed:
                        raise ClosedResourceError from None

                    raise BrokenResourceError from exc

    async def aclose(self) -> None:
        """Mark the listener closed and close the listening socket."""
        self._closed = True
        self.__raw_socket.close()

    @property
    def _raw_socket(self) -> socket.socket:
        return self.__raw_socket
7✔
1465

1466

1467
class UDPSocket(abc.UDPSocket):
    """Unconnected UDP socket on top of an asyncio datagram transport."""

    def __init__(
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
    ):
        self._transport = transport
        self._protocol = protocol
        self._receive_guard = ResourceGuard("reading from")
        self._send_guard = ResourceGuard("writing to")
        self._closed = False

    @property
    def _raw_socket(self) -> socket.socket:
        return self._transport.get_extra_info("socket")

    async def aclose(self) -> None:
        """Close the transport unless it is already closing."""
        if self._transport.is_closing():
            return

        self._closed = True
        self._transport.close()

    async def receive(self) -> tuple[bytes, IPSockAddrType]:
        """
        Return the next buffered datagram as a (data, address) tuple.

        :raises ClosedResourceError: if nothing is buffered and this socket was
            closed locally
        :raises BrokenResourceError: if nothing is buffered for any other reason
        """
        with self._receive_guard:
            await AsyncIOBackend.checkpoint()

            # If the buffer is empty, ask for more data
            need_data = not self._protocol.read_queue
            if need_data and not self._transport.is_closing():
                self._protocol.read_event.clear()
                await self._protocol.read_event.wait()

            try:
                return self._protocol.read_queue.popleft()
            except IndexError:
                if self._closed:
                    raise ClosedResourceError from None

                raise BrokenResourceError from None

    async def send(self, item: UDPPacketType) -> None:
        """
        Send ``item`` by unpacking it into the transport's ``sendto()``.

        :raises ClosedResourceError: if this socket was closed locally
        :raises BrokenResourceError: if the transport is closing otherwise
        """
        with self._send_guard:
            await AsyncIOBackend.checkpoint()
            await self._protocol.write_event.wait()
            if self._closed:
                raise ClosedResourceError

            if self._transport.is_closing():
                raise BrokenResourceError

            self._transport.sendto(*item)
9✔
1513

1514

1515
class ConnectedUDPSocket(abc.ConnectedUDPSocket):
    """Connected UDP socket on top of an asyncio datagram transport."""

    def __init__(
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
    ):
        self._transport = transport
        self._protocol = protocol
        self._receive_guard = ResourceGuard("reading from")
        self._send_guard = ResourceGuard("writing to")
        self._closed = False

    @property
    def _raw_socket(self) -> socket.socket:
        return self._transport.get_extra_info("socket")

    async def aclose(self) -> None:
        """Close the transport unless it is already closing."""
        if self._transport.is_closing():
            return

        self._closed = True
        self._transport.close()

    async def receive(self) -> bytes:
        """
        Return the payload of the next buffered datagram.

        :raises ClosedResourceError: if nothing is buffered and this socket was
            closed locally
        :raises BrokenResourceError: if nothing is buffered for any other reason
        """
        with self._receive_guard:
            await AsyncIOBackend.checkpoint()

            # If the buffer is empty, ask for more data
            need_data = not self._protocol.read_queue
            if need_data and not self._transport.is_closing():
                self._protocol.read_event.clear()
                await self._protocol.read_event.wait()

            try:
                data, _address = self._protocol.read_queue.popleft()
            except IndexError:
                if self._closed:
                    raise ClosedResourceError from None

                raise BrokenResourceError from None

            return data

    async def send(self, item: bytes) -> None:
        """
        Send ``item`` through the transport's ``sendto()``.

        :raises ClosedResourceError: if this socket was closed locally
        :raises BrokenResourceError: if the transport is closing otherwise
        """
        with self._send_guard:
            await AsyncIOBackend.checkpoint()
            await self._protocol.write_event.wait()
            if self._closed:
                raise ClosedResourceError

            if self._transport.is_closing():
                raise BrokenResourceError

            self._transport.sendto(item)
9✔
1563

1564

1565
class UNIXDatagramSocket(_RawSocketMixin, abc.UNIXDatagramSocket):
    """Unconnected datagram socket over a raw UNIX socket."""

    async def receive(self) -> UNIXDatagramPacketType:
        """
        Receive one datagram via ``recvfrom()``, waiting for readability as needed.

        :raises ClosedResourceError: on ``OSError`` after this socket was closed
        :raises BrokenResourceError: on any other ``OSError``
        """
        loop = get_running_loop()
        await AsyncIOBackend.checkpoint()
        with self._receive_guard:
            while True:
                try:
                    packet = self._raw_socket.recvfrom(65536)
                except BlockingIOError:
                    await self._wait_until_readable(loop)
                    continue
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None

                    raise BrokenResourceError from exc

                return packet

    async def send(self, item: UNIXDatagramPacketType) -> None:
        """
        Send ``item`` by unpacking it into ``sendto()``, waiting for writability
        as needed.

        :raises ClosedResourceError: on ``OSError`` after this socket was closed
        :raises BrokenResourceError: on any other ``OSError``
        """
        loop = get_running_loop()
        await AsyncIOBackend.checkpoint()
        with self._send_guard:
            while True:
                try:
                    self._raw_socket.sendto(*item)
                except BlockingIOError:
                    await self._wait_until_writable(loop)
                    continue
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None

                    raise BrokenResourceError from exc

                return
7✔
1599

1600

1601
class ConnectedUNIXDatagramSocket(_RawSocketMixin, abc.ConnectedUNIXDatagramSocket):
    """Connected UNIX datagram socket operating directly on the raw socket."""

    async def receive(self) -> bytes:
        """
        Receive the next datagram from the connected peer.

        :return: the bytes returned by ``recv()`` on the raw socket
        :raises ClosedResourceError: if the read fails while ``self._closing`` is set
        :raises BrokenResourceError: if the read fails with any other ``OSError``

        """
        loop = get_running_loop()
        await AsyncIOBackend.checkpoint()
        with self._receive_guard:
            while True:
                try:
                    payload = self._raw_socket.recv(65536)
                except BlockingIOError:
                    # Nothing to read yet; wait for readability, then retry
                    await self._wait_until_readable(loop)
                    continue
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None

                    raise BrokenResourceError from exc

                return payload

    async def send(self, item: bytes) -> None:
        """
        Send a datagram to the connected peer.

        :param item: the payload passed to ``send()`` on the raw socket
        :raises ClosedResourceError: if the write fails while ``self._closing`` is set
        :raises BrokenResourceError: if the write fails with any other ``OSError``

        """
        loop = get_running_loop()
        await AsyncIOBackend.checkpoint()
        with self._send_guard:
            while True:
                try:
                    self._raw_socket.send(item)
                except BlockingIOError:
                    # The socket cannot take the data yet; wait and retry
                    await self._wait_until_writable(loop)
                    continue
                except OSError as exc:
                    if self._closing:
                        raise ClosedResourceError from None

                    raise BrokenResourceError from exc

                return
7✔
1635

1636

1637
# Registries of the asyncio.Event objects that wait_socket_readable() and
# wait_socket_writable() park on, keyed by the socket being waited for.
# NOTE(review): RunVar presumably scopes each dict to one event loop run - confirm
_read_events: RunVar[dict[Any, asyncio.Event]] = RunVar("read_events")
_write_events: RunVar[dict[Any, asyncio.Event]] = RunVar("write_events")
10✔
1639

1640

1641
#
1642
# Synchronization
1643
#
1644

1645

1646
class Event(BaseEvent):
    """Event implementation that delegates to a wrapped :class:`asyncio.Event`."""

    def __new__(cls) -> Event:
        # Instantiate directly through object.__new__ rather than the base class
        # NOTE(review): presumably BaseEvent.__new__ dispatches to a backend - confirm
        return object.__new__(cls)

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def set(self) -> None:
        """Set the flag of the wrapped event."""
        self._event.set()

    def is_set(self) -> bool:
        """Return ``True`` if the wrapped event's flag has been set."""
        return self._event.is_set()

    async def wait(self) -> None:
        """
        Wait until the flag is set.

        If the flag is already set, a checkpoint is still performed before
        returning.

        """
        if not self.is_set():
            await self._event.wait()
            return

        await AsyncIOBackend.checkpoint()

    def statistics(self) -> EventStatistics:
        """Return statistics built from the wrapped event's private waiter list."""
        waiter_count = len(self._event._waiters)
        return EventStatistics(waiter_count)
9✔
1667

1668

1669
class CapacityLimiter(BaseCapacityLimiter):
    """
    Capacity limiter that tracks borrowers in a set and waiters in a FIFO queue.

    Each waiter parks on its own :class:`asyncio.Event`; setting that event is the
    signal that the waiter may add itself to the set of borrowers.

    """

    _total_tokens: float = 0

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        # Instantiate directly through object.__new__ rather than the base class
        return object.__new__(cls)

    def __init__(self, total_tokens: float):
        self._borrowers: set[Any] = set()
        self._wait_queue: OrderedDict[Any, asyncio.Event] = OrderedDict()
        # Goes through the property setter so the value gets validated
        self.total_tokens = total_tokens

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    @property
    def total_tokens(self) -> float:
        """The total number of tokens available for borrowing."""
        return self._total_tokens

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        if not isinstance(value, int) and not math.isinf(value):
            raise TypeError("total_tokens must be an int or math.inf")
        if value < 1:
            raise ValueError("total_tokens must be >= 1")

        # Only an increase in capacity frees up tokens for the waiters
        waiters_to_notify = max(value - self._total_tokens, 0)
        self._total_tokens = value

        # Notify waiting tasks that they have acquired the limiter
        while self._wait_queue and waiters_to_notify:
            event = self._wait_queue.popitem(last=False)[1]
            event.set()
            waiters_to_notify -= 1

    @property
    def borrowed_tokens(self) -> int:
        """The number of tokens currently borrowed."""
        return len(self._borrowers)

    @property
    def available_tokens(self) -> float:
        """The number of tokens currently not borrowed."""
        return self._total_tokens - len(self._borrowers)

    def acquire_nowait(self) -> None:
        """Acquire a token for the current task without waiting."""
        self.acquire_on_behalf_of_nowait(current_task())

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Acquire a token for ``borrower`` without waiting.

        :param borrower: the entity borrowing a token
        :raises RuntimeError: if ``borrower`` already holds a token
        :raises WouldBlock: if others are already waiting or no token is free

        """
        if borrower in self._borrowers:
            raise RuntimeError(
                "this borrower is already holding one of this CapacityLimiter's "
                "tokens"
            )

        # Refuse to jump ahead of tasks that are already queued
        if self._wait_queue or len(self._borrowers) >= self._total_tokens:
            raise WouldBlock

        self._borrowers.add(borrower)

    async def acquire(self) -> None:
        """Acquire a token for the current task, waiting if necessary."""
        return await self.acquire_on_behalf_of(current_task())

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Acquire a token for ``borrower``, waiting in FIFO order if necessary.

        :param borrower: the entity borrowing a token

        """
        await AsyncIOBackend.checkpoint_if_cancelled()
        try:
            self.acquire_on_behalf_of_nowait(borrower)
        except WouldBlock:
            event = asyncio.Event()
            self._wait_queue[borrower] = event
            try:
                await event.wait()
            except BaseException:
                self._wait_queue.pop(borrower, None)
                # If the event was already set, this waiter had been handed a
                # token it will never take (e.g. it was cancelled before it got
                # to run). Pass the token on so the next waiter is not stranded.
                if event.is_set():
                    self._notify_next_waiter()

                raise

            self._borrowers.add(borrower)
        else:
            try:
                await AsyncIOBackend.cancel_shielded_checkpoint()
            except BaseException:
                # Roll back on behalf of the actual borrower, which is not
                # necessarily the current task
                self.release_on_behalf_of(borrower)
                raise

    def release(self) -> None:
        """Release the token held by the current task."""
        self.release_on_behalf_of(current_task())

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Release the token held by ``borrower``.

        :param borrower: the entity that borrowed the token
        :raises RuntimeError: if ``borrower`` is not holding a token

        """
        try:
            self._borrowers.remove(borrower)
        except KeyError:
            raise RuntimeError(
                "this borrower isn't holding any of this CapacityLimiter's tokens"
            ) from None

        self._notify_next_waiter()

    def _notify_next_waiter(self) -> None:
        # Notify the next task in line if this limiter has free capacity now
        if self._wait_queue and len(self._borrowers) < self._total_tokens:
            event = self._wait_queue.popitem(last=False)[1]
            event.set()

    def statistics(self) -> CapacityLimiterStatistics:
        """Return a snapshot of the borrowed/total tokens, borrowers and waiters."""
        return CapacityLimiterStatistics(
            self.borrowed_tokens,
            self.total_tokens,
            tuple(self._borrowers),
            len(self._wait_queue),
        )
1781

1782

1783
# Holds the CapacityLimiter used as the default limiter for worker threads
# NOTE(review): presumably read by current_default_thread_limiter() - confirm
_default_thread_limiter: RunVar[CapacityLimiter] = RunVar("_default_thread_limiter")
10✔
1784

1785

1786
#
1787
# Operating system signals
1788
#
1789

1790

1791
class _SignalReceiver:
    """
    Context manager and async iterator that yields received signals.

    Handlers are installed on the running event loop on entry and removed on exit.
    Received signals are queued and handed out one at a time by ``__anext__()``.

    """

    def __init__(self, signals: tuple[Signals, ...]):
        self._signals = signals
        self._loop = get_running_loop()
        self._signal_queue: deque[Signals] = deque()
        # Resolved by _deliver() to wake up a pending __anext__() call
        self._future: asyncio.Future = asyncio.Future()
        self._handled_signals: set[Signals] = set()

    def _deliver(self, signum: Signals) -> None:
        # Signal handler callback: queue the signal and wake up the consumer
        self._signal_queue.append(signum)
        if self._future.done():
            return

        self._future.set_result(None)

    def __enter__(self) -> _SignalReceiver:
        # De-duplicate so each signal gets exactly one handler
        unique_signals = set(self._signals)
        for sig in unique_signals:
            self._loop.add_signal_handler(sig, self._deliver, sig)
            self._handled_signals.add(sig)

        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        # Only remove the handlers that were actually installed by __enter__()
        for sig in self._handled_signals:
            self._loop.remove_signal_handler(sig)

        return None

    def __aiter__(self) -> _SignalReceiver:
        return self

    async def __anext__(self) -> Signals:
        await AsyncIOBackend.checkpoint()
        if not self._signal_queue:
            # Nothing queued; park on a fresh future until _deliver() resolves it
            self._future = asyncio.Future()
            await self._future

        return self._signal_queue.popleft()
8✔
1831

1832

1833
#
1834
# Testing and debugging
1835
#
1836

1837

1838
class AsyncIOTaskInfo(TaskInfo):
    """Task information for an :class:`asyncio.Task`, held via a weak reference."""

    def __init__(self, task: asyncio.Task):
        task_state = _task_states.get(task)
        # Tasks without a registered state have no known parent
        parent_id = None if task_state is None else task_state.parent_id

        super().__init__(id(task), parent_id, task.get_name(), task.get_coro())
        self._task = weakref.ref(task)

    def has_pending_cancellation(self) -> bool:
        """
        Return ``True`` if the task has a cancellation pending.

        Checks asyncio's own cancellation state first, then the cancel scope
        recorded for the task (if any).

        """
        task = self._task()
        if not task:
            # If the task isn't around anymore, it won't have a pending cancellation
            return False

        if sys.version_info >= (3, 11):
            if task.cancelling():
                return True
        elif (
            isinstance(task._fut_waiter, asyncio.Future)
            and task._fut_waiter.cancelled()
        ):
            # Before 3.11 there is no cancelling(); inspect the awaited future
            return True

        task_state = _task_states.get(task)
        if task_state:
            cancel_scope = task_state.cancel_scope
            if cancel_scope:
                return cancel_scope.cancel_called or cancel_scope._parent_cancelled()

        return False
10✔
1868

1869

1870
class TestRunner(abc.TestRunner):
    """
    Runs async tests and fixtures inside a single long-lived runner task.

    Coroutines are sent through a memory object stream to the runner task, and
    their outcome is relayed back through a future.

    """

    _send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]

    def __init__(
        self,
        *,
        debug: bool | None = None,
        use_uvloop: bool = False,
        loop_factory: Callable[[], AbstractEventLoop] | None = None,
    ) -> None:
        # An explicit loop factory takes precedence over use_uvloop
        if use_uvloop and loop_factory is None:
            import uvloop

            loop_factory = uvloop.new_event_loop

        self._runner = Runner(debug=debug, loop_factory=loop_factory)
        self._exceptions: list[BaseException] = []
        self._runner_task: asyncio.Task | None = None

    def __enter__(self) -> TestRunner:
        self._runner.__enter__()
        loop = self.get_loop()
        loop.set_exception_handler(self._exception_handler)
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self._runner.__exit__(exc_type, exc_val, exc_tb)

    def get_loop(self) -> AbstractEventLoop:
        """Return the event loop of the underlying runner."""
        return self._runner.get_loop()

    def _exception_handler(
        self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
    ) -> None:
        # Collect Exception instances for later re-raising; defer the rest to
        # the loop's default handler
        exception = context.get("exception")
        if isinstance(exception, Exception):
            self._exceptions.append(exception)
        else:
            loop.default_exception_handler(context)

    def _raise_async_exceptions(self) -> None:
        # Re-raise any exceptions raised in asynchronous callbacks
        if not self._exceptions:
            return

        # Swap the list out so that the same exceptions aren't raised twice
        exceptions, self._exceptions = self._exceptions, []
        if len(exceptions) == 1:
            raise exceptions[0]

        raise BaseExceptionGroup(
            "Multiple exceptions occurred in asynchronous callbacks", exceptions
        )

    async def _run_tests_and_fixtures(
        self,
        receive_stream: MemoryObjectReceiveStream[
            tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
        ],
    ) -> None:
        # Body of the runner task: await each received coroutine and relay its
        # outcome to the paired future unless that future was cancelled
        with receive_stream, self._send_stream:
            async for coro, future in receive_stream:
                try:
                    retval = await coro
                except BaseException as exc:
                    if not future.cancelled():
                        future.set_exception(exc)
                else:
                    if not future.cancelled():
                        future.set_result(retval)

    async def _call_in_runner_task(
        self,
        func: Callable[P, Awaitable[T_Retval]],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> T_Retval:
        # Lazily start the runner task on first use
        if not self._runner_task:
            self._send_stream, receive_stream = create_memory_object_stream[
                Tuple[Awaitable[Any], asyncio.Future]
            ](1)
            runner_coro = self._run_tests_and_fixtures(receive_stream)
            self._runner_task = self.get_loop().create_task(runner_coro)

        coro = func(*args, **kwargs)
        future: asyncio.Future[T_Retval] = self.get_loop().create_future()
        self._send_stream.send_nowait((coro, future))
        return await future

    def run_asyncgen_fixture(
        self,
        fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
        kwargs: dict[str, Any],
    ) -> Iterable[T_Retval]:
        """
        Run an async generator fixture, yielding its single value.

        :param fixture_func: the async generator function of the fixture
        :param kwargs: keyword arguments to call the fixture function with
        :raises RuntimeError: if the generator yields a second time

        """
        asyncgen = fixture_func(**kwargs)
        setup_coro = self._call_in_runner_task(asyncgen.asend, None)
        fixturevalue: T_Retval = self.get_loop().run_until_complete(setup_coro)
        self._raise_async_exceptions()

        yield fixturevalue

        # Resume the generator for teardown; it must finish without yielding again
        try:
            self.get_loop().run_until_complete(
                self._call_in_runner_task(asyncgen.asend, None)
            )
        except StopAsyncIteration:
            self._raise_async_exceptions()
        else:
            self.get_loop().run_until_complete(asyncgen.aclose())
            raise RuntimeError("Async generator fixture did not stop")

    def run_fixture(
        self,
        fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
        kwargs: dict[str, Any],
    ) -> T_Retval:
        """
        Run a coroutine fixture and return its value.

        :param fixture_func: the coroutine function of the fixture
        :param kwargs: keyword arguments to call the fixture function with

        """
        fixture_coro = self._call_in_runner_task(fixture_func, **kwargs)
        retval = self.get_loop().run_until_complete(fixture_coro)
        self._raise_async_exceptions()
        return retval

    def run_test(
        self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
    ) -> None:
        """
        Run an async test function.

        :param test_func: the coroutine function of the test
        :param kwargs: keyword arguments to call the test function with

        """
        try:
            self.get_loop().run_until_complete(
                self._call_in_runner_task(test_func, **kwargs)
            )
        except Exception as exc:
            # Raise it together with any exceptions from asynchronous callbacks
            self._exceptions.append(exc)

        self._raise_async_exceptions()
10✔
2005

2006

2007
class AsyncIOBackend(AsyncBackend):
10✔
2008
    @classmethod
    def run(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        options: dict[str, Any],
    ) -> T_Retval:
        """
        Run the given coroutine function in a new event loop.

        :param func: a coroutine function
        :param args: positional arguments to call ``func`` with
        :param kwargs: keyword arguments to call ``func`` with
        :param options: backend options; ``debug``, ``loop_factory`` and
            ``use_uvloop`` are recognized (``loop_factory`` takes precedence over
            ``use_uvloop``)
        :return: the return value of the coroutine function

        """

        @wraps(func)
        async def wrapper() -> T_Retval:
            task = cast(asyncio.Task, current_task())
            task.set_name(get_callable_name(func))
            # Register a task state for the root task for the duration of the call
            _task_states[task] = TaskState(None, None)

            try:
                # Forward kwargs too; previously they were silently dropped
                return await func(*args, **kwargs)
            finally:
                del _task_states[task]

        debug = options.get("debug")
        loop_factory = options.get("loop_factory")
        if loop_factory is None and options.get("use_uvloop", False):
            import uvloop

            loop_factory = uvloop.new_event_loop

        with Runner(debug=debug, loop_factory=loop_factory) as runner:
            return runner.run(wrapper())
10✔
2036

2037
    @classmethod
    def current_token(cls) -> object:
        """Return the running event loop, which this backend uses as its token."""
        running_loop = get_running_loop()
        return running_loop
10✔
2040

2041
    @classmethod
    def current_time(cls) -> float:
        """Return the current time according to the running event loop's clock."""
        running_loop = get_running_loop()
        return running_loop.time()
10✔
2044

2045
    @classmethod
    def cancelled_exception_class(cls) -> type[BaseException]:
        """Return the exception class used for cancellation (asyncio's own)."""
        exception_class: type[BaseException] = CancelledError
        return exception_class
10✔
2048

2049
    @classmethod
    async def checkpoint(cls) -> None:
        """Yield control to the event loop once via a zero-length sleep."""
        # ``sleep`` here is asyncio.sleep, not the classmethod of the same name
        await sleep(0)
10✔
2052

2053
    @classmethod
    async def checkpoint_if_cancelled(cls) -> None:
        """
        Yield to the event loop only if an enclosing cancel scope was cancelled.

        Walks up the current task's cancel scopes, stopping at the first shielded
        one. Does nothing outside of a task, or in a task with no recorded state.

        """
        task = current_task()
        if task is None:
            return

        try:
            scope = _task_states[task].cancel_scope
        except KeyError:
            return

        while scope:
            if scope.cancel_called:
                # NOTE(review): presumably this await raises the pending
                # cancellation, which is what ends the loop - confirm
                await sleep(0)
                continue

            if scope.shield:
                # A shielded scope hides any cancellation of its ancestors
                break

            scope = scope._parent_scope
10✔
2071

2072
    @classmethod
    async def cancel_shielded_checkpoint(cls) -> None:
        """Yield control to the event loop once inside a shielded cancel scope."""
        shielded_scope = CancelScope(shield=True)
        with shielded_scope:
            await sleep(0)
10✔
2076

2077
    @classmethod
    async def sleep(cls, delay: float) -> None:
        """
        Pause the current task for the given duration.

        :param delay: the delay passed on to :func:`asyncio.sleep`

        """
        # The bare name resolves to the module-level asyncio.sleep import
        await sleep(delay)
10✔
2080

2081
    @classmethod
    def create_cancel_scope(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        """
        Create a new cancel scope.

        :param deadline: passed on to ``CancelScope`` as ``deadline``
        :param shield: passed on to ``CancelScope`` as ``shield``

        """
        new_scope = CancelScope(deadline=deadline, shield=shield)
        return new_scope
10✔
2086

2087
    @classmethod
    def current_effective_deadline(cls) -> float:
        """
        Return the nearest deadline among the cancel scopes affecting this task.

        :return: ``-math.inf`` if a scope in effect has already been cancelled,
            ``math.inf`` if the current task has no recorded state, otherwise the
            smallest deadline found up to and including the first shielded scope

        """
        try:
            scope = _task_states[
                current_task()  # type: ignore[index]
            ].cancel_scope
        except KeyError:
            return math.inf

        deadline = math.inf
        while scope:
            deadline = min(deadline, scope.deadline)
            if scope._cancel_called:
                # An already cancelled scope means the deadline has passed
                return -math.inf

            if scope.shield:
                # Scopes outside a shielded scope don't affect this task
                break

            scope = scope._parent_scope

        return deadline
9✔
2108

2109
    @classmethod
    def create_task_group(cls) -> abc.TaskGroup:
        """Create a new task group for this backend."""
        task_group = TaskGroup()
        return task_group
10✔
2112

2113
    @classmethod
    def create_event(cls) -> abc.Event:
        """Create a new event for this backend."""
        event = Event()
        return event
10✔
2116

2117
    @classmethod
    def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
        """
        Create a new capacity limiter for this backend.

        :param total_tokens: passed on to ``CapacityLimiter``

        """
        limiter = CapacityLimiter(total_tokens)
        return limiter
9✔
2120

2121
    @classmethod
    async def run_sync_in_worker_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        abandon_on_cancel: bool = False,
        limiter: abc.CapacityLimiter | None = None,
    ) -> T_Retval:
        """
        Call the given function in a worker thread and wait for its result.

        :param func: the callable to run in the worker thread
        :param args: positional arguments to call ``func`` with
        :param abandon_on_cancel: if ``False``, the wait for the result is
            shielded from cancellation
        :param limiter: capacity limiter to hold while the call is in progress
            (the default thread limiter is used if omitted)
        :return: the result delivered through the future handed to the worker

        """
        await cls.checkpoint()

        # If this is the first run in this event loop thread, set up the necessary
        # variables
        try:
            idle_workers = _threadpool_idle_workers.get()
            workers = _threadpool_workers.get()
        except LookupError:
            idle_workers = deque()
            workers = set()
            _threadpool_idle_workers.set(idle_workers)
            _threadpool_workers.set(workers)

        async with limiter or cls.current_default_thread_limiter():
            with CancelScope(shield=not abandon_on_cancel) as scope:
                future: asyncio.Future = asyncio.Future()
                root_task = find_root_task()
                if idle_workers:
                    # Reuse the most recently idled worker
                    worker = idle_workers.pop()

                    # Prune any other workers that have been idle for MAX_IDLE_TIME
                    # seconds or longer, starting from the one idle the longest
                    now = cls.current_time()
                    while idle_workers and not (
                        now - idle_workers[0].idle_since < WorkerThread.MAX_IDLE_TIME
                    ):
                        expired_worker = idle_workers.popleft()
                        expired_worker.root_task.remove_done_callback(
                            expired_worker.stop
                        )
                        expired_worker.stop()
                else:
                    # No idle worker available; start one that gets stopped when
                    # the root task finishes
                    worker = WorkerThread(root_task, workers, idle_workers)
                    worker.start()
                    workers.add(worker)
                    root_task.add_done_callback(worker.stop)

                # Run the function in a copy of the current context where sniffio
                # reports no async library
                context = copy_context()
                context.run(sniffio.current_async_library_cvar.set, None)
                use_own_scope = abandon_on_cancel or scope._parent_scope is None
                worker_scope = scope if use_own_scope else scope._parent_scope

                worker.queue.put_nowait((context, func, args, future, worker_scope))
                return await future
10✔
2179

2180
    @classmethod
    def check_cancelled(cls) -> None:
        """
        Raise ``CancelledError`` if a cancel scope in effect has been cancelled.

        Starts from the cancel scope stored in ``threadlocals`` and walks up the
        parent scopes, stopping at the first shielded one.

        """
        scope: CancelScope | None = threadlocals.current_cancel_scope
        while scope is not None:
            if scope.cancel_called:
                raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")

            if scope.shield:
                # A shielded scope hides any cancellation of its ancestors
                break

            scope = scope._parent_scope
10✔
2191

2192
    @classmethod
    def run_async_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Run a coroutine function in the event loop from another thread.

        Blocks the calling thread until the result is available.

        :param func: a coroutine function
        :param args: positional arguments to call ``func`` with
        :param token: the event loop to run the coroutine in
        :return: the return value of the coroutine function

        """

        async def task_wrapper(scope: CancelScope) -> T_Retval:
            __tracebackhide__ = True
            task = cast(asyncio.Task, current_task())
            # Attach the new task to the cancel scope handed over from the thread
            _task_states[task] = TaskState(None, scope)
            scope._tasks.add(task)
            try:
                return await func(*args)
            except CancelledError as exc:
                # Translate to the concurrent.futures flavor of the exception
                raise concurrent.futures.CancelledError(str(exc)) from None
            finally:
                scope._tasks.discard(task)

        loop = cast(AbstractEventLoop, token)
        # Schedule from a copy of this thread's context where sniffio reports
        # "asyncio"
        context = copy_context()
        context.run(sniffio.current_async_library_cvar.set, "asyncio")
        coro = task_wrapper(threadlocals.current_cancel_scope)
        result_future: concurrent.futures.Future[T_Retval] = context.run(
            asyncio.run_coroutine_threadsafe, coro, loop
        )
        return result_future.result()
10✔
2219

2220
    @classmethod
    def run_sync_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        """
        Call a function in the event loop thread from another thread.

        Blocks the calling thread until the result is available.

        :param func: the callable to run in the event loop thread
        :param args: positional arguments to call ``func`` with
        :param token: the event loop to schedule the call on
        :return: the return value of ``func``

        """
        result_future: concurrent.futures.Future[T_Retval] = Future()

        @wraps(func)
        def wrapper() -> None:
            try:
                sniffio.current_async_library_cvar.set("asyncio")
                result_future.set_result(func(*args))
            except BaseException as exc:
                result_future.set_exception(exc)
                # Relay everything to the caller, but additionally let
                # non-Exception errors propagate into the event loop
                if not isinstance(exc, Exception):
                    raise

        loop = cast(AbstractEventLoop, token)
        loop.call_soon_threadsafe(wrapper)
        return result_future.result()
10✔
2241

2242
    @classmethod
    def create_blocking_portal(cls) -> abc.BlockingPortal:
        """Create a new blocking portal for this backend."""
        portal = BlockingPortal()
        return portal
10✔
2245

2246
    @classmethod
    async def open_process(
        cls,
        command: StrOrBytesPath | Sequence[StrOrBytesPath],
        *,
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        **kwargs: Any,
    ) -> Process:
        """
        Start a subprocess.

        :param command: a string, bytes or path-like object to run through the
            shell, or a sequence of arguments to execute directly
        :param stdin: passed on to asyncio as ``stdin``
        :param stdout: passed on to asyncio as ``stdout``
        :param stderr: passed on to asyncio as ``stderr``
        :param kwargs: additional keyword arguments passed on to asyncio
        :return: a process object wrapping the asyncio subprocess and its streams

        """
        from os import fspath

        await cls.checkpoint()
        if isinstance(command, PathLike):
            # os.fspath() handles both str and bytes based path-like objects and
            # leaves the path untouched, unlike a round trip through pathlib.Path
            command = fspath(command)

        if isinstance(command, (str, bytes)):
            process = await asyncio.create_subprocess_shell(
                command,
                stdin=stdin,
                stdout=stdout,
                stderr=stderr,
                **kwargs,
            )
        else:
            process = await asyncio.create_subprocess_exec(
                *command,
                stdin=stdin,
                stdout=stdout,
                stderr=stderr,
                **kwargs,
            )

        # Only wrap the standard streams that were actually opened as pipes
        stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
        stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
        stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
        return Process(process, stdin_stream, stdout_stream, stderr_stream)
9✔
2281

2282
    @classmethod
    def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
        """
        Arrange for the given worker processes to be shut down at exit.

        Starts a shutdown task and registers a forcible shutdown callback on the
        root task.

        :param workers: the set of worker processes to shut down

        """
        shutdown_coro = _shutdown_process_pool_on_exit(workers)
        create_task(shutdown_coro, name="AnyIO process pool shutdown task")

        root_task = find_root_task()
        forced_shutdown = partial(_forcibly_shutdown_process_pool_on_exit, workers)
        root_task.add_done_callback(forced_shutdown)
2291

2292
    @classmethod
    async def connect_tcp(
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
    ) -> abc.SocketStream:
        """
        Connect to the given TCP endpoint.

        :param host: host name or address to connect to
        :param port: port number to connect to
        :param local_address: passed on to asyncio as ``local_addr``
        :return: a socket stream wrapping the new transport and protocol

        """
        loop = get_running_loop()
        connection = await loop.create_connection(
            StreamProtocol, host, port, local_addr=local_address
        )
        transport, protocol = cast(
            Tuple[asyncio.Transport, StreamProtocol], connection
        )
        # The transport is handed over to the stream with reading paused
        transport.pause_reading()
        return SocketStream(transport, protocol)
10✔
2304

2305
    @classmethod
    async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
        """
        Connect to the UNIX socket at the given path.

        The socket is closed if the connection attempt fails for any reason,
        including cancellation while waiting for the socket to become writable.

        :param path: file system path of the socket to connect to
        :return: a stream wrapping the connected socket

        """
        await cls.checkpoint()
        loop = get_running_loop()
        raw_socket = socket.socket(socket.AF_UNIX)
        raw_socket.setblocking(False)
        # The outer try also covers exceptions raised while waiting inside the
        # BlockingIOError handler (e.g. cancellation), which a sibling "except"
        # clause of the inner try would not catch
        try:
            while True:
                try:
                    raw_socket.connect(path)
                except BlockingIOError:
                    # Wait until the socket becomes writable, then retry
                    f: asyncio.Future = asyncio.Future()
                    loop.add_writer(raw_socket, f.set_result, None)
                    f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
                    await f
                else:
                    return UNIXSocketStream(raw_socket)
        except BaseException:
            raw_socket.close()
            raise
7✔
2324

2325
    @classmethod
    def create_tcp_listener(cls, sock: socket.socket) -> SocketListener:
        """
        Wrap the given socket in a TCP listener.

        :param sock: the socket passed on to ``TCPSocketListener``

        """
        listener = TCPSocketListener(sock)
        return listener
10✔
2328

2329
    @classmethod
    def create_unix_listener(cls, sock: socket.socket) -> SocketListener:
        """
        Wrap the given socket in a UNIX socket listener.

        :param sock: the socket passed on to ``UNIXSocketListener``

        """
        listener = UNIXSocketListener(sock)
        return listener
7✔
2332

2333
    @classmethod
    async def create_udp_socket(
        cls,
        family: AddressFamily,
        local_address: IPSockAddrType | None,
        remote_address: IPSockAddrType | None,
        reuse_port: bool,
    ) -> UDPSocket | ConnectedUDPSocket:
        """
        Create a UDP socket.

        :param family: passed on to asyncio as ``family``
        :param local_address: passed on to asyncio as ``local_addr``
        :param remote_address: passed on to asyncio as ``remote_addr``; if given,
            a connected UDP socket is returned
        :param reuse_port: passed on to asyncio as ``reuse_port``
        :return: a connected or unconnected UDP socket

        """
        loop = get_running_loop()
        transport, protocol = await loop.create_datagram_endpoint(
            DatagramProtocol,
            local_addr=local_address,
            remote_addr=remote_address,
            family=family,
            reuse_port=reuse_port,
        )
        # Surface any error the protocol has already recorded
        if protocol.exception:
            transport.close()
            raise protocol.exception

        if remote_address:
            return ConnectedUDPSocket(transport, protocol)

        return UDPSocket(transport, protocol)
9✔
2356

2357
    @classmethod
    async def create_unix_datagram_socket(  # type: ignore[override]
        cls, raw_socket: socket.socket, remote_path: str | bytes | None
    ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
        """
        Wrap the given raw socket in a UNIX datagram socket.

        If a remote path is given, the socket is connected to it first. The socket
        is closed if that fails for any reason, including cancellation while
        waiting for the socket to become writable.

        :param raw_socket: the socket to wrap
        :param remote_path: path of the socket to connect to, if any
        :return: a connected socket if ``remote_path`` was given, otherwise an
            unconnected one

        """
        await cls.checkpoint()
        loop = get_running_loop()

        if not remote_path:
            return UNIXDatagramSocket(raw_socket)

        # The outer try also covers exceptions raised while waiting inside the
        # BlockingIOError handler (e.g. cancellation), which a sibling "except"
        # clause of the inner try would not catch
        try:
            while True:
                try:
                    raw_socket.connect(remote_path)
                except BlockingIOError:
                    # Wait until the socket becomes writable, then retry
                    f: asyncio.Future = asyncio.Future()
                    loop.add_writer(raw_socket, f.set_result, None)
                    f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
                    await f
                else:
                    return ConnectedUNIXDatagramSocket(raw_socket)
        except BaseException:
            raw_socket.close()
            raise
7✔
2380

2381
    @classmethod
    async def getaddrinfo(
        cls,
        host: bytes | str | None,
        port: str | int | None,
        *,
        family: int | AddressFamily = 0,
        type: int | SocketKind = 0,
        proto: int = 0,
        flags: int = 0,
    ) -> list[
        tuple[
            AddressFamily,
            SocketKind,
            int,
            str,
            tuple[str, int] | tuple[str, int, int, int],
        ]
    ]:
        """
        Look up address information through the running event loop.

        All arguments are passed on unchanged to the event loop's
        ``getaddrinfo()`` method, whose result is returned as is.

        """
        loop = get_running_loop()
        return await loop.getaddrinfo(
            host, port, family=family, type=type, proto=proto, flags=flags
        )
2403

2404
    @classmethod
    async def getnameinfo(
        cls, sockaddr: IPSockAddrType, flags: int = 0
    ) -> tuple[str, str]:
        """
        Look up name information through the running event loop.

        :param sockaddr: socket address passed on to the loop's ``getnameinfo()``
        :param flags: flags passed on to the loop's ``getnameinfo()``
        :return: the result of the loop's ``getnameinfo()`` call

        """
        loop = get_running_loop()
        return await loop.getnameinfo(sockaddr, flags)
9✔
2409

2410
    @classmethod
    async def wait_socket_readable(cls, sock: socket.socket) -> None:
        """
        Wait until the event loop reports ``sock`` as readable.

        :param sock: the socket to watch
        :raises BusyResourceError: if a wait for readability is already registered
            for this socket
        :raises ClosedResourceError: if the socket's entry disappeared from the
            registry while waiting (presumably because it was closed elsewhere;
            the code doing that is outside this method)

        """
        await cls.checkpoint()

        # Fetch the registry of pending read waits, creating it on first use
        try:
            pending = _read_events.get()
        except LookupError:
            pending = {}
            _read_events.set(pending)

        if pending.get(sock):
            raise BusyResourceError("reading from") from None

        loop = get_running_loop()
        ready = asyncio.Event()
        pending[sock] = ready
        loop.add_reader(sock, ready.set)
        try:
            await ready.wait()
        finally:
            # Only unregister the reader if our entry is still in the registry
            still_registered = pending.pop(sock, None) is not None
            if still_registered:
                loop.remove_reader(sock)

        if not still_registered:
            raise ClosedResourceError
×
2436

2437
    @classmethod
    async def wait_socket_writable(cls, sock: socket.socket) -> None:
        """
        Wait until the event loop reports ``sock`` as writable.

        :param sock: the socket to watch
        :raises BusyResourceError: if a wait for writability is already registered
            for this socket
        :raises ClosedResourceError: if the socket's entry disappeared from the
            registry while waiting (presumably because it was closed elsewhere;
            the code doing that is outside this method)

        """
        await cls.checkpoint()

        # Fetch the registry of pending write waits, creating it on first use
        try:
            write_events = _write_events.get()
        except LookupError:
            write_events = {}
            _write_events.set(write_events)

        if write_events.get(sock):
            raise BusyResourceError("writing to") from None

        loop = get_running_loop()
        event = write_events[sock] = asyncio.Event()
        # Register with the socket object itself, i.e. the same key that is later
        # passed to remove_writer() and that wait_socket_readable() uses.
        loop.add_writer(sock, event.set)
        try:
            await event.wait()
        finally:
            # Only unregister the writer if our entry is still in the registry
            if write_events.pop(sock, None) is not None:
                loop.remove_writer(sock)
                writable = True
            else:
                writable = False

        if not writable:
            raise ClosedResourceError
×
2463

2464
    @classmethod
    def current_default_thread_limiter(cls) -> CapacityLimiter:
        """
        Return the default thread limiter, creating it on first access.

        If no limiter has been stored in ``_default_thread_limiter`` yet, a new
        ``CapacityLimiter(40)`` is created, stored there and returned.

        """
        try:
            limiter = _default_thread_limiter.get()
        except LookupError:
            limiter = CapacityLimiter(40)
            _default_thread_limiter.set(limiter)

        return limiter
10✔
2472

2473
    @classmethod
10✔
2474
    def open_signal_receiver(
10✔
2475
        cls, *signals: Signals
2476
    ) -> ContextManager[AsyncIterator[Signals]]:
2477
        return _SignalReceiver(signals)
8✔
2478

2479
    @classmethod
    def get_current_task(cls) -> TaskInfo:
        """Return an ``AsyncIOTaskInfo`` wrapping asyncio's current task."""
        running_task = current_task()
        return AsyncIOTaskInfo(running_task)  # type: ignore[arg-type]
10✔
2482

2483
    @classmethod
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
        """
        Return an ``AsyncIOTaskInfo`` for every task that has not finished yet.

        Tasks are taken from ``all_tasks()``; those whose ``done()`` returns true
        are left out.

        """
        unfinished = (task for task in all_tasks() if not task.done())
        return list(map(AsyncIOTaskInfo, unfinished))
10✔
2486

2487
    @classmethod
    async def wait_all_tasks_blocked(cls) -> None:
        """
        Wait until every task other than the caller is waiting on something.

        A task counts as not blocked when its ``_fut_waiter`` is ``None`` or is
        already done. While any such task exists, this sleeps for 0.1 seconds and
        checks all tasks again.

        """
        await cls.checkpoint()
        own_task = current_task()

        def is_runnable(task: asyncio.Task) -> bool:
            # No waiter, or a completed one, means the task is not blocked
            waiter = task._fut_waiter  # type: ignore[attr-defined]
            return waiter is None or waiter.done()

        while any(
            is_runnable(task) for task in all_tasks() if task is not own_task
        ):
            await sleep(0.1)
10✔
2502

2503
    @classmethod
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
        """
        Create a ``TestRunner``.

        :param options: keyword arguments passed to the ``TestRunner`` constructor

        """
        runner = TestRunner(**options)
        return runner
10✔
2506

2507

2508
# Module-level alias for this module's backend implementation class.
# NOTE(review): presumably looked up by name by the backend-loading code elsewhere
# in the package -- confirm against the loader.
backend_class = AsyncIOBackend
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc