• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10653607058

01 Sep 2024 11:27AM UTC coverage: 91.753% (-0.04%) from 91.788%
10653607058

Pull #752

github

web-flow
Merge 30adfc8d6 into 8a5b34626
Pull Request #752: Fixed feed_data after feed_eof assertion errors on asyncio

4 of 5 new or added lines in 1 file covered. (80.0%)

1 existing line in 1 file now uncovered.

4595 of 5008 relevant lines covered (91.75%)

9.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.19
/src/anyio/_backends/_asyncio.py
1
from __future__ import annotations
11✔
2

3
import array
11✔
4
import asyncio
11✔
5
import concurrent.futures
11✔
6
import math
11✔
7
import socket
11✔
8
import sys
11✔
9
import threading
11✔
10
import weakref
11✔
11
from asyncio import (
11✔
12
    AbstractEventLoop,
13
    CancelledError,
14
    all_tasks,
15
    create_task,
16
    current_task,
17
    get_running_loop,
18
    sleep,
19
)
20
from asyncio.base_events import _run_until_complete_cb  # type: ignore[attr-defined]
11✔
21
from collections import OrderedDict, deque
11✔
22
from collections.abc import AsyncIterator, Iterable
11✔
23
from concurrent.futures import Future
11✔
24
from contextlib import suppress
11✔
25
from contextvars import Context, copy_context
11✔
26
from dataclasses import dataclass, field
11✔
27
from functools import partial, wraps
11✔
28
from inspect import (
11✔
29
    CORO_RUNNING,
30
    CORO_SUSPENDED,
31
    getcoroutinestate,
32
    iscoroutine,
33
)
34
from io import IOBase
11✔
35
from os import PathLike
11✔
36
from queue import Queue
11✔
37
from signal import Signals
11✔
38
from socket import AddressFamily, SocketKind
11✔
39
from threading import Thread
11✔
40
from types import TracebackType
11✔
41
from typing import (
11✔
42
    IO,
43
    Any,
44
    AsyncGenerator,
45
    Awaitable,
46
    Callable,
47
    Collection,
48
    ContextManager,
49
    Coroutine,
50
    Mapping,
51
    Optional,
52
    Sequence,
53
    Tuple,
54
    TypeVar,
55
    cast,
56
)
57
from weakref import WeakKeyDictionary
11✔
58

59
import sniffio
11✔
60

61
from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
11✔
62
from .._core._eventloop import claim_worker_thread, threadlocals
11✔
63
from .._core._exceptions import (
11✔
64
    BrokenResourceError,
65
    BusyResourceError,
66
    ClosedResourceError,
67
    EndOfStream,
68
    WouldBlock,
69
    iterate_exceptions,
70
)
71
from .._core._sockets import convert_ipv6_sockaddr
11✔
72
from .._core._streams import create_memory_object_stream
11✔
73
from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
11✔
74
from .._core._synchronization import Event as BaseEvent
11✔
75
from .._core._synchronization import ResourceGuard
11✔
76
from .._core._tasks import CancelScope as BaseCancelScope
11✔
77
from ..abc import (
11✔
78
    AsyncBackend,
79
    IPSockAddrType,
80
    SocketListener,
81
    UDPPacketType,
82
    UNIXDatagramPacketType,
83
)
84
from ..lowlevel import RunVar
11✔
85
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
11✔
86

87
if sys.version_info >= (3, 10):
11✔
88
    from typing import ParamSpec
7✔
89
else:
90
    from typing_extensions import ParamSpec
4✔
91

92
if sys.version_info >= (3, 11):
11✔
93
    from asyncio import Runner
5✔
94
    from typing import TypeVarTuple, Unpack
5✔
95
else:
96
    import contextvars
6✔
97
    import enum
6✔
98
    import signal
6✔
99
    from asyncio import coroutines, events, exceptions, tasks
6✔
100

101
    from exceptiongroup import BaseExceptionGroup
6✔
102
    from typing_extensions import TypeVarTuple, Unpack
6✔
103

104
    class _State(enum.Enum):
6✔
105
        CREATED = "created"
6✔
106
        INITIALIZED = "initialized"
6✔
107
        CLOSED = "closed"
6✔
108

109
    class Runner:
6✔
110
        # Copied from CPython 3.11
111
        def __init__(
6✔
112
            self,
113
            *,
114
            debug: bool | None = None,
115
            loop_factory: Callable[[], AbstractEventLoop] | None = None,
116
        ):
117
            self._state = _State.CREATED
6✔
118
            self._debug = debug
6✔
119
            self._loop_factory = loop_factory
6✔
120
            self._loop: AbstractEventLoop | None = None
6✔
121
            self._context = None
6✔
122
            self._interrupt_count = 0
6✔
123
            self._set_event_loop = False
6✔
124

125
        def __enter__(self) -> Runner:
6✔
126
            self._lazy_init()
6✔
127
            return self
6✔
128

129
        def __exit__(
6✔
130
            self,
131
            exc_type: type[BaseException],
132
            exc_val: BaseException,
133
            exc_tb: TracebackType,
134
        ) -> None:
135
            self.close()
6✔
136

137
        def close(self) -> None:
6✔
138
            """Shutdown and close event loop."""
139
            if self._state is not _State.INITIALIZED:
6✔
140
                return
×
141
            try:
6✔
142
                loop = self._loop
6✔
143
                _cancel_all_tasks(loop)
6✔
144
                loop.run_until_complete(loop.shutdown_asyncgens())
6✔
145
                if hasattr(loop, "shutdown_default_executor"):
6✔
146
                    loop.run_until_complete(loop.shutdown_default_executor())
5✔
147
                else:
148
                    loop.run_until_complete(_shutdown_default_executor(loop))
3✔
149
            finally:
150
                if self._set_event_loop:
6✔
151
                    events.set_event_loop(None)
6✔
152
                loop.close()
6✔
153
                self._loop = None
6✔
154
                self._state = _State.CLOSED
6✔
155

156
        def get_loop(self) -> AbstractEventLoop:
6✔
157
            """Return embedded event loop."""
158
            self._lazy_init()
6✔
159
            return self._loop
6✔
160

161
        def run(self, coro: Coroutine[T_Retval], *, context=None) -> T_Retval:
6✔
162
            """Run a coroutine inside the embedded event loop."""
163
            if not coroutines.iscoroutine(coro):
6✔
164
                raise ValueError(f"a coroutine was expected, got {coro!r}")
×
165

166
            if events._get_running_loop() is not None:
6✔
167
                # fail fast with short traceback
168
                raise RuntimeError(
×
169
                    "Runner.run() cannot be called from a running event loop"
170
                )
171

172
            self._lazy_init()
6✔
173

174
            if context is None:
6✔
175
                context = self._context
6✔
176
            task = context.run(self._loop.create_task, coro)
6✔
177

178
            if (
6✔
179
                threading.current_thread() is threading.main_thread()
180
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
181
            ):
182
                sigint_handler = partial(self._on_sigint, main_task=task)
6✔
183
                try:
6✔
184
                    signal.signal(signal.SIGINT, sigint_handler)
6✔
185
                except ValueError:
×
186
                    # `signal.signal` may throw if `threading.main_thread` does
187
                    # not support signals (e.g. embedded interpreter with signals
188
                    # not registered - see gh-91880)
189
                    sigint_handler = None
×
190
            else:
191
                sigint_handler = None
6✔
192

193
            self._interrupt_count = 0
6✔
194
            try:
6✔
195
                return self._loop.run_until_complete(task)
6✔
196
            except exceptions.CancelledError:
6✔
197
                if self._interrupt_count > 0:
×
198
                    uncancel = getattr(task, "uncancel", None)
×
199
                    if uncancel is not None and uncancel() == 0:
×
200
                        raise KeyboardInterrupt()
1✔
201
                raise  # CancelledError
×
202
            finally:
203
                if (
6✔
204
                    sigint_handler is not None
205
                    and signal.getsignal(signal.SIGINT) is sigint_handler
206
                ):
207
                    signal.signal(signal.SIGINT, signal.default_int_handler)
6✔
208

209
        def _lazy_init(self) -> None:
6✔
210
            if self._state is _State.CLOSED:
6✔
211
                raise RuntimeError("Runner is closed")
×
212
            if self._state is _State.INITIALIZED:
6✔
213
                return
6✔
214
            if self._loop_factory is None:
6✔
215
                self._loop = events.new_event_loop()
6✔
216
                if not self._set_event_loop:
6✔
217
                    # Call set_event_loop only once to avoid calling
218
                    # attach_loop multiple times on child watchers
219
                    events.set_event_loop(self._loop)
6✔
220
                    self._set_event_loop = True
6✔
221
            else:
222
                self._loop = self._loop_factory()
4✔
223
            if self._debug is not None:
6✔
224
                self._loop.set_debug(self._debug)
6✔
225
            self._context = contextvars.copy_context()
6✔
226
            self._state = _State.INITIALIZED
6✔
227

228
        def _on_sigint(self, signum, frame, main_task: asyncio.Task) -> None:
6✔
229
            self._interrupt_count += 1
×
230
            if self._interrupt_count == 1 and not main_task.done():
×
231
                main_task.cancel()
×
232
                # wakeup loop if it is blocked by select() with long timeout
233
                self._loop.call_soon_threadsafe(lambda: None)
×
234
                return
×
235
            raise KeyboardInterrupt()
×
236

237
    def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
6✔
238
        to_cancel = tasks.all_tasks(loop)
6✔
239
        if not to_cancel:
6✔
240
            return
7✔
241

242
        for task in to_cancel:
6✔
243
            task.cancel()
6✔
244

245
        loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
6✔
246

247
        for task in to_cancel:
6✔
248
            if task.cancelled():
6✔
249
                continue
6✔
250
            if task.exception() is not None:
5✔
251
                loop.call_exception_handler(
×
252
                    {
253
                        "message": "unhandled exception during asyncio.run() shutdown",
254
                        "exception": task.exception(),
255
                        "task": task,
256
                    }
257
                )
258

259
    async def _shutdown_default_executor(loop: AbstractEventLoop) -> None:
6✔
260
        """Schedule the shutdown of the default executor."""
261

262
        def _do_shutdown(future: asyncio.futures.Future) -> None:
3✔
263
            try:
3✔
264
                loop._default_executor.shutdown(wait=True)  # type: ignore[attr-defined]
3✔
265
                loop.call_soon_threadsafe(future.set_result, None)
3✔
266
            except Exception as ex:
×
267
                loop.call_soon_threadsafe(future.set_exception, ex)
×
268

269
        loop._executor_shutdown_called = True
3✔
270
        if loop._default_executor is None:
3✔
271
            return
3✔
272
        future = loop.create_future()
3✔
273
        thread = threading.Thread(target=_do_shutdown, args=(future,))
3✔
274
        thread.start()
3✔
275
        try:
3✔
276
            await future
3✔
277
        finally:
278
            thread.join()
3✔
279

280

281
T_Retval = TypeVar("T_Retval")
11✔
282
T_contra = TypeVar("T_contra", contravariant=True)
11✔
283
PosArgsT = TypeVarTuple("PosArgsT")
11✔
284
P = ParamSpec("P")
11✔
285

286
_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")
11✔
287

288

289
def find_root_task() -> asyncio.Task:
11✔
290
    root_task = _root_task.get(None)
11✔
291
    if root_task is not None and not root_task.done():
11✔
292
        return root_task
11✔
293

294
    # Look for a task that has been started via run_until_complete()
295
    for task in all_tasks():
11✔
296
        if task._callbacks and not task.done():
11✔
297
            callbacks = [cb for cb, context in task._callbacks]
11✔
298
            for cb in callbacks:
11✔
299
                if (
11✔
300
                    cb is _run_until_complete_cb
301
                    or getattr(cb, "__module__", None) == "uvloop.loop"
302
                ):
303
                    _root_task.set(task)
11✔
304
                    return task
11✔
305

306
    # Look up the topmost task in the AnyIO task tree, if possible
307
    task = cast(asyncio.Task, current_task())
10✔
308
    state = _task_states.get(task)
10✔
309
    if state:
10✔
310
        cancel_scope = state.cancel_scope
10✔
311
        while cancel_scope and cancel_scope._parent_scope is not None:
10✔
312
            cancel_scope = cancel_scope._parent_scope
×
313

314
        if cancel_scope is not None:
10✔
315
            return cast(asyncio.Task, cancel_scope._host_task)
10✔
316

317
    return task
×
318

319

320
def get_callable_name(func: Callable) -> str:
11✔
321
    module = getattr(func, "__module__", None)
11✔
322
    qualname = getattr(func, "__qualname__", None)
11✔
323
    return ".".join([x for x in (module, qualname) if x])
11✔
324

325

326
#
327
# Event loop
328
#
329

330
_run_vars: WeakKeyDictionary[asyncio.AbstractEventLoop, Any] = WeakKeyDictionary()
11✔
331

332

333
def _task_started(task: asyncio.Task) -> bool:
11✔
334
    """Return ``True`` if the task has been started and has not finished."""
335
    try:
11✔
336
        return getcoroutinestate(task.get_coro()) in (CORO_RUNNING, CORO_SUSPENDED)
11✔
337
    except AttributeError:
×
338
        # task coro is async_genenerator_asend https://bugs.python.org/issue37771
339
        raise Exception(f"Cannot determine if task {task} has started or not") from None
×
340

341

342
#
343
# Timeouts and cancellation
344
#
345

346

347
class CancelScope(BaseCancelScope):
11✔
348
    def __new__(
11✔
349
        cls, *, deadline: float = math.inf, shield: bool = False
350
    ) -> CancelScope:
351
        return object.__new__(cls)
11✔
352

353
    def __init__(self, deadline: float = math.inf, shield: bool = False):
11✔
354
        self._deadline = deadline
11✔
355
        self._shield = shield
11✔
356
        self._parent_scope: CancelScope | None = None
11✔
357
        self._child_scopes: set[CancelScope] = set()
11✔
358
        self._cancel_called = False
11✔
359
        self._cancelled_caught = False
11✔
360
        self._active = False
11✔
361
        self._timeout_handle: asyncio.TimerHandle | None = None
11✔
362
        self._cancel_handle: asyncio.Handle | None = None
11✔
363
        self._tasks: set[asyncio.Task] = set()
11✔
364
        self._host_task: asyncio.Task | None = None
11✔
365
        self._cancel_calls: int = 0
11✔
366
        self._cancelling: int | None = None
11✔
367

368
    def __enter__(self) -> CancelScope:
11✔
369
        if self._active:
11✔
370
            raise RuntimeError(
×
371
                "Each CancelScope may only be used for a single 'with' block"
372
            )
373

374
        self._host_task = host_task = cast(asyncio.Task, current_task())
11✔
375
        self._tasks.add(host_task)
11✔
376
        try:
11✔
377
            task_state = _task_states[host_task]
11✔
378
        except KeyError:
11✔
379
            task_state = TaskState(None, self)
11✔
380
            _task_states[host_task] = task_state
11✔
381
        else:
382
            self._parent_scope = task_state.cancel_scope
11✔
383
            task_state.cancel_scope = self
11✔
384
            if self._parent_scope is not None:
11✔
385
                self._parent_scope._child_scopes.add(self)
11✔
386
                self._parent_scope._tasks.remove(host_task)
11✔
387

388
        self._timeout()
11✔
389
        self._active = True
11✔
390
        if sys.version_info >= (3, 11):
11✔
391
            self._cancelling = self._host_task.cancelling()
5✔
392

393
        # Start cancelling the host task if the scope was cancelled before entering
394
        if self._cancel_called:
11✔
395
            self._deliver_cancellation(self)
11✔
396

397
        return self
11✔
398

399
    def __exit__(
11✔
400
        self,
401
        exc_type: type[BaseException] | None,
402
        exc_val: BaseException | None,
403
        exc_tb: TracebackType | None,
404
    ) -> bool | None:
405
        if not self._active:
11✔
406
            raise RuntimeError("This cancel scope is not active")
10✔
407
        if current_task() is not self._host_task:
11✔
408
            raise RuntimeError(
10✔
409
                "Attempted to exit cancel scope in a different task than it was "
410
                "entered in"
411
            )
412

413
        assert self._host_task is not None
11✔
414
        host_task_state = _task_states.get(self._host_task)
11✔
415
        if host_task_state is None or host_task_state.cancel_scope is not self:
11✔
416
            raise RuntimeError(
10✔
417
                "Attempted to exit a cancel scope that isn't the current tasks's "
418
                "current cancel scope"
419
            )
420

421
        self._active = False
11✔
422
        if self._timeout_handle:
11✔
423
            self._timeout_handle.cancel()
11✔
424
            self._timeout_handle = None
11✔
425

426
        self._tasks.remove(self._host_task)
11✔
427
        if self._parent_scope is not None:
11✔
428
            self._parent_scope._child_scopes.remove(self)
11✔
429
            self._parent_scope._tasks.add(self._host_task)
11✔
430

431
        host_task_state.cancel_scope = self._parent_scope
11✔
432

433
        # Restart the cancellation effort in the closest directly cancelled parent
434
        # scope if this one was shielded
435
        self._restart_cancellation_in_parent()
11✔
436

437
        if self._cancel_called and exc_val is not None:
11✔
438
            for exc in iterate_exceptions(exc_val):
11✔
439
                if isinstance(exc, CancelledError):
11✔
440
                    self._cancelled_caught = self._uncancel(exc)
11✔
441
                    if self._cancelled_caught:
11✔
442
                        break
11✔
443

444
            return self._cancelled_caught
11✔
445

446
        return None
11✔
447

448
    def _uncancel(self, cancelled_exc: CancelledError) -> bool:
11✔
449
        if sys.version_info < (3, 9) or self._host_task is None:
11✔
450
            self._cancel_calls = 0
3✔
451
            return True
3✔
452

453
        # Undo all cancellations done by this scope
454
        if self._cancelling is not None:
8✔
455
            while self._cancel_calls:
5✔
456
                self._cancel_calls -= 1
5✔
457
                if self._host_task.uncancel() <= self._cancelling:
5✔
458
                    return True
5✔
459

460
        self._cancel_calls = 0
8✔
461
        return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args
8✔
462

463
    def _timeout(self) -> None:
11✔
464
        if self._deadline != math.inf:
11✔
465
            loop = get_running_loop()
11✔
466
            if loop.time() >= self._deadline:
11✔
467
                self.cancel()
11✔
468
            else:
469
                self._timeout_handle = loop.call_at(self._deadline, self._timeout)
11✔
470

471
    def _deliver_cancellation(self, origin: CancelScope) -> bool:
11✔
472
        """
473
        Deliver cancellation to directly contained tasks and nested cancel scopes.
474

475
        Schedule another run at the end if we still have tasks eligible for
476
        cancellation.
477

478
        :param origin: the cancel scope that originated the cancellation
479
        :return: ``True`` if the delivery needs to be retried on the next cycle
480

481
        """
482
        should_retry = False
11✔
483
        current = current_task()
11✔
484
        for task in self._tasks:
11✔
485
            if task._must_cancel:  # type: ignore[attr-defined]
11✔
486
                continue
10✔
487

488
            # The task is eligible for cancellation if it has started
489
            should_retry = True
11✔
490
            if task is not current and (task is self._host_task or _task_started(task)):
11✔
491
                waiter = task._fut_waiter  # type: ignore[attr-defined]
11✔
492
                if not isinstance(waiter, asyncio.Future) or not waiter.done():
11✔
493
                    origin._cancel_calls += 1
11✔
494
                    if sys.version_info >= (3, 9):
11✔
495
                        task.cancel(f"Cancelled by cancel scope {id(origin):x}")
8✔
496
                    else:
497
                        task.cancel()
3✔
498

499
        # Deliver cancellation to child scopes that aren't shielded or running their own
500
        # cancellation callbacks
501
        for scope in self._child_scopes:
11✔
502
            if not scope._shield and not scope.cancel_called:
11✔
503
                should_retry = scope._deliver_cancellation(origin) or should_retry
11✔
504

505
        # Schedule another callback if there are still tasks left
506
        if origin is self:
11✔
507
            if should_retry:
11✔
508
                self._cancel_handle = get_running_loop().call_soon(
11✔
509
                    self._deliver_cancellation, origin
510
                )
511
            else:
512
                self._cancel_handle = None
11✔
513

514
        return should_retry
11✔
515

516
    def _restart_cancellation_in_parent(self) -> None:
11✔
517
        """
518
        Restart the cancellation effort in the closest directly cancelled parent scope.
519

520
        """
521
        scope = self._parent_scope
11✔
522
        while scope is not None:
11✔
523
            if scope._cancel_called:
11✔
524
                if scope._cancel_handle is None:
11✔
525
                    scope._deliver_cancellation(scope)
11✔
526

527
                break
11✔
528

529
            # No point in looking beyond any shielded scope
530
            if scope._shield:
11✔
531
                break
10✔
532

533
            scope = scope._parent_scope
11✔
534

535
    def _parent_cancelled(self) -> bool:
11✔
536
        # Check whether any parent has been cancelled
537
        cancel_scope = self._parent_scope
11✔
538
        while cancel_scope is not None and not cancel_scope._shield:
11✔
539
            if cancel_scope._cancel_called:
11✔
540
                return True
11✔
541
            else:
542
                cancel_scope = cancel_scope._parent_scope
11✔
543

544
        return False
11✔
545

546
    def cancel(self) -> None:
11✔
547
        if not self._cancel_called:
11✔
548
            if self._timeout_handle:
11✔
549
                self._timeout_handle.cancel()
11✔
550
                self._timeout_handle = None
11✔
551

552
            self._cancel_called = True
11✔
553
            if self._host_task is not None:
11✔
554
                self._deliver_cancellation(self)
11✔
555

556
    @property
11✔
557
    def deadline(self) -> float:
11✔
558
        return self._deadline
10✔
559

560
    @deadline.setter
11✔
561
    def deadline(self, value: float) -> None:
11✔
562
        self._deadline = float(value)
10✔
563
        if self._timeout_handle is not None:
10✔
564
            self._timeout_handle.cancel()
10✔
565
            self._timeout_handle = None
10✔
566

567
        if self._active and not self._cancel_called:
10✔
568
            self._timeout()
10✔
569

570
    @property
11✔
571
    def cancel_called(self) -> bool:
11✔
572
        return self._cancel_called
11✔
573

574
    @property
11✔
575
    def cancelled_caught(self) -> bool:
11✔
576
        return self._cancelled_caught
11✔
577

578
    @property
11✔
579
    def shield(self) -> bool:
11✔
580
        return self._shield
11✔
581

582
    @shield.setter
11✔
583
    def shield(self, value: bool) -> None:
11✔
584
        if self._shield != value:
10✔
585
            self._shield = value
10✔
586
            if not value:
10✔
587
                self._restart_cancellation_in_parent()
10✔
588

589

590
#
591
# Task states
592
#
593

594

595
class TaskState:
11✔
596
    """
597
    Encapsulates auxiliary task information that cannot be added to the Task instance
598
    itself because there are no guarantees about its implementation.
599
    """
600

601
    __slots__ = "parent_id", "cancel_scope", "__weakref__"
11✔
602

603
    def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
11✔
604
        self.parent_id = parent_id
11✔
605
        self.cancel_scope = cancel_scope
11✔
606

607

608
_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary()
11✔
609

610

611
#
612
# Task groups
613
#
614

615

616
class _AsyncioTaskStatus(abc.TaskStatus):
11✔
617
    def __init__(self, future: asyncio.Future, parent_id: int):
11✔
618
        self._future = future
11✔
619
        self._parent_id = parent_id
11✔
620

621
    def started(self, value: T_contra | None = None) -> None:
11✔
622
        try:
11✔
623
            self._future.set_result(value)
11✔
624
        except asyncio.InvalidStateError:
10✔
625
            if not self._future.cancelled():
10✔
626
                raise RuntimeError(
10✔
627
                    "called 'started' twice on the same task status"
628
                ) from None
629

630
        task = cast(asyncio.Task, current_task())
11✔
631
        _task_states[task].parent_id = self._parent_id
11✔
632

633

634
class TaskGroup(abc.TaskGroup):
11✔
635
    def __init__(self) -> None:
11✔
636
        self.cancel_scope: CancelScope = CancelScope()
11✔
637
        self._active = False
11✔
638
        self._exceptions: list[BaseException] = []
11✔
639
        self._tasks: set[asyncio.Task] = set()
11✔
640

641
    async def __aenter__(self) -> TaskGroup:
11✔
642
        self.cancel_scope.__enter__()
11✔
643
        self._active = True
11✔
644
        return self
11✔
645

646
    async def __aexit__(
11✔
647
        self,
648
        exc_type: type[BaseException] | None,
649
        exc_val: BaseException | None,
650
        exc_tb: TracebackType | None,
651
    ) -> bool | None:
652
        ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
11✔
653
        if exc_val is not None:
11✔
654
            self.cancel_scope.cancel()
11✔
655
            if not isinstance(exc_val, CancelledError):
11✔
656
                self._exceptions.append(exc_val)
11✔
657

658
        cancelled_exc_while_waiting_tasks: CancelledError | None = None
11✔
659
        while self._tasks:
11✔
660
            try:
11✔
661
                await asyncio.wait(self._tasks)
11✔
662
            except CancelledError as exc:
11✔
663
                # This task was cancelled natively; reraise the CancelledError later
664
                # unless this task was already interrupted by another exception
665
                self.cancel_scope.cancel()
11✔
666
                if cancelled_exc_while_waiting_tasks is None:
11✔
667
                    cancelled_exc_while_waiting_tasks = exc
11✔
668

669
        self._active = False
11✔
670
        if self._exceptions:
11✔
671
            raise BaseExceptionGroup(
11✔
672
                "unhandled errors in a TaskGroup", self._exceptions
673
            )
674

675
        # Raise the CancelledError received while waiting for child tasks to exit,
676
        # unless the context manager itself was previously exited with another
677
        # exception, or if any of the  child tasks raised an exception other than
678
        # CancelledError
679
        if cancelled_exc_while_waiting_tasks:
11✔
680
            if exc_val is None or ignore_exception:
11✔
681
                raise cancelled_exc_while_waiting_tasks
11✔
682

683
        return ignore_exception
11✔
684

685
    def _spawn(
11✔
686
        self,
687
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
688
        args: tuple[Unpack[PosArgsT]],
689
        name: object,
690
        task_status_future: asyncio.Future | None = None,
691
    ) -> asyncio.Task:
692
        def task_done(_task: asyncio.Task) -> None:
11✔
693
            task_state = _task_states[_task]
11✔
694
            assert task_state.cancel_scope is not None
11✔
695
            assert _task in task_state.cancel_scope._tasks
11✔
696
            task_state.cancel_scope._tasks.remove(_task)
11✔
697
            self._tasks.remove(task)
11✔
698
            del _task_states[_task]
11✔
699

700
            try:
11✔
701
                exc = _task.exception()
11✔
702
            except CancelledError as e:
11✔
703
                while isinstance(e.__context__, CancelledError):
11✔
704
                    e = e.__context__
3✔
705

706
                exc = e
11✔
707

708
            if exc is not None:
11✔
709
                # The future can only be in the cancelled state if the host task was
710
                # cancelled, so return immediately instead of adding one more
711
                # CancelledError to the exceptions list
712
                if task_status_future is not None and task_status_future.cancelled():
11✔
713
                    return
10✔
714

715
                if task_status_future is None or task_status_future.done():
11✔
716
                    if not isinstance(exc, CancelledError):
11✔
717
                        self._exceptions.append(exc)
11✔
718

719
                    if not self.cancel_scope._parent_cancelled():
11✔
720
                        self.cancel_scope.cancel()
11✔
721
                else:
722
                    task_status_future.set_exception(exc)
10✔
723
            elif task_status_future is not None and not task_status_future.done():
11✔
724
                task_status_future.set_exception(
10✔
725
                    RuntimeError("Child exited without calling task_status.started()")
726
                )
727

728
        if not self._active:
11✔
729
            raise RuntimeError(
10✔
730
                "This task group is not active; no new tasks can be started."
731
            )
732

733
        kwargs = {}
11✔
734
        if task_status_future:
11✔
735
            parent_id = id(current_task())
11✔
736
            kwargs["task_status"] = _AsyncioTaskStatus(
11✔
737
                task_status_future, id(self.cancel_scope._host_task)
738
            )
739
        else:
740
            parent_id = id(self.cancel_scope._host_task)
11✔
741

742
        coro = func(*args, **kwargs)
11✔
743
        if not iscoroutine(coro):
11✔
744
            prefix = f"{func.__module__}." if hasattr(func, "__module__") else ""
10✔
745
            raise TypeError(
10✔
746
                f"Expected {prefix}{func.__qualname__}() to return a coroutine, but "
747
                f"the return value ({coro!r}) is not a coroutine object"
748
            )
749

750
        name = get_callable_name(func) if name is None else str(name)
11✔
751
        task = create_task(coro, name=name)
11✔
752
        task.add_done_callback(task_done)
11✔
753

754
        # Make the spawned task inherit the task group's cancel scope
755
        _task_states[task] = TaskState(
11✔
756
            parent_id=parent_id, cancel_scope=self.cancel_scope
757
        )
758
        self.cancel_scope._tasks.add(task)
11✔
759
        self._tasks.add(task)
11✔
760
        return task
11✔
761

762
    def start_soon(
11✔
763
        self,
764
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
765
        *args: Unpack[PosArgsT],
766
        name: object = None,
767
    ) -> None:
768
        self._spawn(func, args, name)
11✔
769

770
    async def start(
11✔
771
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
772
    ) -> Any:
773
        future: asyncio.Future = asyncio.Future()
11✔
774
        task = self._spawn(func, args, name, future)
11✔
775

776
        # If the task raises an exception after sending a start value without a switch
777
        # point between, the task group is cancelled and this method never proceeds to
778
        # process the completed future. That's why we have to have a shielded cancel
779
        # scope here.
780
        try:
11✔
781
            return await future
11✔
782
        except CancelledError:
10✔
783
            # Cancel the task and wait for it to exit before returning
784
            task.cancel()
10✔
785
            with CancelScope(shield=True), suppress(CancelledError):
10✔
786
                await task
10✔
787

788
            raise
10✔
789

790

791
#
792
# Threads
793
#
794

795
_Retval_Queue_Type = Tuple[Optional[T_Retval], Optional[BaseException]]
11✔
796

797

798
class WorkerThread(Thread):
11✔
799
    MAX_IDLE_TIME = 10  # seconds
11✔
800

801
    def __init__(
11✔
802
        self,
803
        root_task: asyncio.Task,
804
        workers: set[WorkerThread],
805
        idle_workers: deque[WorkerThread],
806
    ):
807
        super().__init__(name="AnyIO worker thread")
11✔
808
        self.root_task = root_task
11✔
809
        self.workers = workers
11✔
810
        self.idle_workers = idle_workers
11✔
811
        self.loop = root_task._loop
11✔
812
        self.queue: Queue[
11✔
813
            tuple[Context, Callable, tuple, asyncio.Future, CancelScope] | None
814
        ] = Queue(2)
815
        self.idle_since = AsyncIOBackend.current_time()
11✔
816
        self.stopping = False
11✔
817

818
    def _report_result(
11✔
819
        self, future: asyncio.Future, result: Any, exc: BaseException | None
820
    ) -> None:
821
        self.idle_since = AsyncIOBackend.current_time()
11✔
822
        if not self.stopping:
11✔
823
            self.idle_workers.append(self)
11✔
824

825
        if not future.cancelled():
11✔
826
            if exc is not None:
11✔
827
                if isinstance(exc, StopIteration):
11✔
828
                    new_exc = RuntimeError("coroutine raised StopIteration")
10✔
829
                    new_exc.__cause__ = exc
10✔
830
                    exc = new_exc
10✔
831

832
                future.set_exception(exc)
11✔
833
            else:
834
                future.set_result(result)
11✔
835

836
    def run(self) -> None:
11✔
837
        with claim_worker_thread(AsyncIOBackend, self.loop):
11✔
838
            while True:
7✔
839
                item = self.queue.get()
11✔
840
                if item is None:
11✔
841
                    # Shutdown command received
842
                    return
11✔
843

844
                context, func, args, future, cancel_scope = item
11✔
845
                if not future.cancelled():
11✔
846
                    result = None
11✔
847
                    exception: BaseException | None = None
11✔
848
                    threadlocals.current_cancel_scope = cancel_scope
11✔
849
                    try:
11✔
850
                        result = context.run(func, *args)
11✔
851
                    except BaseException as exc:
11✔
852
                        exception = exc
11✔
853
                    finally:
854
                        del threadlocals.current_cancel_scope
11✔
855

856
                    if not self.loop.is_closed():
11✔
857
                        self.loop.call_soon_threadsafe(
11✔
858
                            self._report_result, future, result, exception
859
                        )
860

861
                self.queue.task_done()
11✔
862

863
    def stop(self, f: asyncio.Task | None = None) -> None:
11✔
864
        self.stopping = True
11✔
865
        self.queue.put_nowait(None)
11✔
866
        self.workers.discard(self)
11✔
867
        try:
11✔
868
            self.idle_workers.remove(self)
11✔
869
        except ValueError:
10✔
870
            pass
10✔
871

872

873
_threadpool_idle_workers: RunVar[deque[WorkerThread]] = RunVar(
11✔
874
    "_threadpool_idle_workers"
875
)
876
_threadpool_workers: RunVar[set[WorkerThread]] = RunVar("_threadpool_workers")
11✔
877

878

879
class BlockingPortal(abc.BlockingPortal):
11✔
880
    def __new__(cls) -> BlockingPortal:
11✔
881
        return object.__new__(cls)
11✔
882

883
    def __init__(self) -> None:
11✔
884
        super().__init__()
11✔
885
        self._loop = get_running_loop()
11✔
886

887
    def _spawn_task_from_thread(
11✔
888
        self,
889
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
890
        args: tuple[Unpack[PosArgsT]],
891
        kwargs: dict[str, Any],
892
        name: object,
893
        future: Future[T_Retval],
894
    ) -> None:
895
        AsyncIOBackend.run_sync_from_thread(
11✔
896
            partial(self._task_group.start_soon, name=name),
897
            (self._call_func, func, args, kwargs, future),
898
            self._loop,
899
        )
900

901

902
#
903
# Subprocesses
904
#
905

906

907
@dataclass(eq=False)
11✔
908
class StreamReaderWrapper(abc.ByteReceiveStream):
11✔
909
    _stream: asyncio.StreamReader
11✔
910
    _closed: bool = field(init=False, default=False)
11✔
911

912
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
913
        if self._closed:
10✔
NEW
914
            raise ClosedResourceError
×
915

916
        data = await self._stream.read(max_bytes)
10✔
917
        if data:
10✔
918
            return data
10✔
919
        else:
920
            raise EndOfStream
10✔
921

922
    async def aclose(self) -> None:
11✔
923
        self._closed = True
10✔
924
        await AsyncIOBackend.checkpoint()
10✔
925

926

927
@dataclass(eq=False)
11✔
928
class StreamWriterWrapper(abc.ByteSendStream):
11✔
929
    _stream: asyncio.StreamWriter
11✔
930

931
    async def send(self, item: bytes) -> None:
11✔
932
        self._stream.write(item)
10✔
933
        await self._stream.drain()
10✔
934

935
    async def aclose(self) -> None:
11✔
936
        self._stream.close()
10✔
937
        await AsyncIOBackend.checkpoint()
10✔
938

939

940
@dataclass(eq=False)
11✔
941
class Process(abc.Process):
11✔
942
    _process: asyncio.subprocess.Process
11✔
943
    _stdin: StreamWriterWrapper | None
11✔
944
    _stdout: StreamReaderWrapper | None
11✔
945
    _stderr: StreamReaderWrapper | None
11✔
946

947
    async def aclose(self) -> None:
11✔
948
        with CancelScope(shield=True):
10✔
949
            if self._stdin:
10✔
950
                await self._stdin.aclose()
10✔
951
            if self._stdout:
10✔
952
                await self._stdout.aclose()
10✔
953
            if self._stderr:
10✔
954
                await self._stderr.aclose()
10✔
955

956
        try:
10✔
957
            await self.wait()
10✔
958
        except BaseException:
10✔
959
            self.kill()
10✔
960
            with CancelScope(shield=True):
10✔
961
                await self.wait()
10✔
962

963
            raise
10✔
964

965
    async def wait(self) -> int:
11✔
966
        return await self._process.wait()
10✔
967

968
    def terminate(self) -> None:
11✔
969
        self._process.terminate()
8✔
970

971
    def kill(self) -> None:
11✔
972
        self._process.kill()
10✔
973

974
    def send_signal(self, signal: int) -> None:
11✔
975
        self._process.send_signal(signal)
×
976

977
    @property
11✔
978
    def pid(self) -> int:
11✔
979
        return self._process.pid
×
980

981
    @property
11✔
982
    def returncode(self) -> int | None:
11✔
983
        return self._process.returncode
10✔
984

985
    @property
11✔
986
    def stdin(self) -> abc.ByteSendStream | None:
11✔
987
        return self._stdin
10✔
988

989
    @property
11✔
990
    def stdout(self) -> abc.ByteReceiveStream | None:
11✔
991
        return self._stdout
10✔
992

993
    @property
11✔
994
    def stderr(self) -> abc.ByteReceiveStream | None:
11✔
995
        return self._stderr
10✔
996

997

998
def _forcibly_shutdown_process_pool_on_exit(
11✔
999
    workers: set[Process], _task: object
1000
) -> None:
1001
    """
1002
    Forcibly shuts down worker processes belonging to this event loop."""
1003
    child_watcher: asyncio.AbstractChildWatcher | None = None
10✔
1004
    if sys.version_info < (3, 12):
10✔
1005
        try:
6✔
1006
            child_watcher = asyncio.get_event_loop_policy().get_child_watcher()
6✔
1007
        except NotImplementedError:
1✔
1008
            pass
1✔
1009

1010
    # Close as much as possible (w/o async/await) to avoid warnings
1011
    for process in workers:
10✔
1012
        if process.returncode is None:
10✔
1013
            continue
10✔
1014

1015
        process._stdin._stream._transport.close()  # type: ignore[union-attr]
×
1016
        process._stdout._stream._transport.close()  # type: ignore[union-attr]
×
1017
        process._stderr._stream._transport.close()  # type: ignore[union-attr]
×
1018
        process.kill()
×
1019
        if child_watcher:
×
1020
            child_watcher.remove_child_handler(process.pid)
×
1021

1022

1023
async def _shutdown_process_pool_on_exit(workers: set[abc.Process]) -> None:
11✔
1024
    """
1025
    Shuts down worker processes belonging to this event loop.
1026

1027
    NOTE: this only works when the event loop was started using asyncio.run() or
1028
    anyio.run().
1029

1030
    """
1031
    process: abc.Process
1032
    try:
10✔
1033
        await sleep(math.inf)
10✔
1034
    except asyncio.CancelledError:
10✔
1035
        for process in workers:
10✔
1036
            if process.returncode is None:
10✔
1037
                process.kill()
10✔
1038

1039
        for process in workers:
10✔
1040
            await process.aclose()
10✔
1041

1042

1043
#
1044
# Sockets and networking
1045
#
1046

1047

1048
class StreamProtocol(asyncio.Protocol):
11✔
1049
    read_queue: deque[bytes]
11✔
1050
    read_event: asyncio.Event
11✔
1051
    write_event: asyncio.Event
11✔
1052
    exception: Exception | None = None
11✔
1053
    is_at_eof: bool = False
11✔
1054

1055
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
11✔
1056
        self.read_queue = deque()
11✔
1057
        self.read_event = asyncio.Event()
11✔
1058
        self.write_event = asyncio.Event()
11✔
1059
        self.write_event.set()
11✔
1060
        cast(asyncio.Transport, transport).set_write_buffer_limits(0)
11✔
1061

1062
    def connection_lost(self, exc: Exception | None) -> None:
11✔
1063
        if exc:
11✔
1064
            self.exception = BrokenResourceError()
11✔
1065
            self.exception.__cause__ = exc
11✔
1066

1067
        self.read_event.set()
11✔
1068
        self.write_event.set()
11✔
1069

1070
    def data_received(self, data: bytes) -> None:
11✔
1071
        self.read_queue.append(data)
11✔
1072
        self.read_event.set()
11✔
1073

1074
    def eof_received(self) -> bool | None:
11✔
1075
        self.is_at_eof = True
11✔
1076
        self.read_event.set()
11✔
1077
        return True
11✔
1078

1079
    def pause_writing(self) -> None:
11✔
1080
        self.write_event = asyncio.Event()
11✔
1081

1082
    def resume_writing(self) -> None:
11✔
UNCOV
1083
        self.write_event.set()
×
1084

1085

1086
class DatagramProtocol(asyncio.DatagramProtocol):
11✔
1087
    read_queue: deque[tuple[bytes, IPSockAddrType]]
11✔
1088
    read_event: asyncio.Event
11✔
1089
    write_event: asyncio.Event
11✔
1090
    exception: Exception | None = None
11✔
1091

1092
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
11✔
1093
        self.read_queue = deque(maxlen=100)  # arbitrary value
10✔
1094
        self.read_event = asyncio.Event()
10✔
1095
        self.write_event = asyncio.Event()
10✔
1096
        self.write_event.set()
10✔
1097

1098
    def connection_lost(self, exc: Exception | None) -> None:
11✔
1099
        self.read_event.set()
10✔
1100
        self.write_event.set()
10✔
1101

1102
    def datagram_received(self, data: bytes, addr: IPSockAddrType) -> None:
11✔
1103
        addr = convert_ipv6_sockaddr(addr)
10✔
1104
        self.read_queue.append((data, addr))
10✔
1105
        self.read_event.set()
10✔
1106

1107
    def error_received(self, exc: Exception) -> None:
11✔
1108
        self.exception = exc
×
1109

1110
    def pause_writing(self) -> None:
11✔
1111
        self.write_event.clear()
×
1112

1113
    def resume_writing(self) -> None:
11✔
1114
        self.write_event.set()
×
1115

1116

1117
class SocketStream(abc.SocketStream):
11✔
1118
    def __init__(self, transport: asyncio.Transport, protocol: StreamProtocol):
11✔
1119
        self._transport = transport
11✔
1120
        self._protocol = protocol
11✔
1121
        self._receive_guard = ResourceGuard("reading from")
11✔
1122
        self._send_guard = ResourceGuard("writing to")
11✔
1123
        self._closed = False
11✔
1124

1125
    @property
11✔
1126
    def _raw_socket(self) -> socket.socket:
11✔
1127
        return self._transport.get_extra_info("socket")
11✔
1128

1129
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
1130
        with self._receive_guard:
11✔
1131
            if (
11✔
1132
                not self._protocol.read_event.is_set()
1133
                and not self._transport.is_closing()
1134
                and not self._protocol.is_at_eof
1135
            ):
1136
                self._transport.resume_reading()
11✔
1137
                await self._protocol.read_event.wait()
11✔
1138
                self._transport.pause_reading()
11✔
1139
            else:
1140
                await AsyncIOBackend.checkpoint()
11✔
1141

1142
            try:
11✔
1143
                chunk = self._protocol.read_queue.popleft()
11✔
1144
            except IndexError:
11✔
1145
                if self._closed:
11✔
1146
                    raise ClosedResourceError from None
11✔
1147
                elif self._protocol.exception:
11✔
1148
                    raise self._protocol.exception from None
11✔
1149
                else:
1150
                    raise EndOfStream from None
11✔
1151

1152
            if len(chunk) > max_bytes:
11✔
1153
                # Split the oversized chunk
1154
                chunk, leftover = chunk[:max_bytes], chunk[max_bytes:]
9✔
1155
                self._protocol.read_queue.appendleft(leftover)
9✔
1156

1157
            # If the read queue is empty, clear the flag so that the next call will
1158
            # block until data is available
1159
            if not self._protocol.read_queue:
11✔
1160
                self._protocol.read_event.clear()
11✔
1161

1162
        return chunk
11✔
1163

1164
    async def send(self, item: bytes) -> None:
11✔
1165
        with self._send_guard:
11✔
1166
            await AsyncIOBackend.checkpoint()
11✔
1167

1168
            if self._closed:
11✔
1169
                raise ClosedResourceError
11✔
1170
            elif self._protocol.exception is not None:
11✔
1171
                raise self._protocol.exception
11✔
1172

1173
            try:
11✔
1174
                self._transport.write(item)
11✔
1175
            except RuntimeError as exc:
×
1176
                if self._transport.is_closing():
×
1177
                    raise BrokenResourceError from exc
×
1178
                else:
1179
                    raise
×
1180

1181
            await self._protocol.write_event.wait()
11✔
1182

1183
    async def send_eof(self) -> None:
11✔
1184
        try:
11✔
1185
            self._transport.write_eof()
11✔
1186
        except OSError:
×
1187
            pass
×
1188

1189
    async def aclose(self) -> None:
11✔
1190
        if not self._transport.is_closing():
11✔
1191
            self._closed = True
11✔
1192
            try:
11✔
1193
                self._transport.write_eof()
11✔
1194
            except OSError:
7✔
1195
                pass
7✔
1196

1197
            self._transport.close()
11✔
1198
            await sleep(0)
11✔
1199
            self._transport.abort()
11✔
1200

1201

1202
class _RawSocketMixin:
11✔
1203
    _receive_future: asyncio.Future | None = None
11✔
1204
    _send_future: asyncio.Future | None = None
11✔
1205
    _closing = False
11✔
1206

1207
    def __init__(self, raw_socket: socket.socket):
11✔
1208
        self.__raw_socket = raw_socket
8✔
1209
        self._receive_guard = ResourceGuard("reading from")
8✔
1210
        self._send_guard = ResourceGuard("writing to")
8✔
1211

1212
    @property
11✔
1213
    def _raw_socket(self) -> socket.socket:
11✔
1214
        return self.__raw_socket
8✔
1215

1216
    def _wait_until_readable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
11✔
1217
        def callback(f: object) -> None:
8✔
1218
            del self._receive_future
8✔
1219
            loop.remove_reader(self.__raw_socket)
8✔
1220

1221
        f = self._receive_future = asyncio.Future()
8✔
1222
        loop.add_reader(self.__raw_socket, f.set_result, None)
8✔
1223
        f.add_done_callback(callback)
8✔
1224
        return f
8✔
1225

1226
    def _wait_until_writable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
11✔
1227
        def callback(f: object) -> None:
8✔
1228
            del self._send_future
8✔
1229
            loop.remove_writer(self.__raw_socket)
8✔
1230

1231
        f = self._send_future = asyncio.Future()
8✔
1232
        loop.add_writer(self.__raw_socket, f.set_result, None)
8✔
1233
        f.add_done_callback(callback)
8✔
1234
        return f
8✔
1235

1236
    async def aclose(self) -> None:
11✔
1237
        if not self._closing:
8✔
1238
            self._closing = True
8✔
1239
            if self.__raw_socket.fileno() != -1:
8✔
1240
                self.__raw_socket.close()
8✔
1241

1242
            if self._receive_future:
8✔
1243
                self._receive_future.set_result(None)
8✔
1244
            if self._send_future:
8✔
1245
                self._send_future.set_result(None)
×
1246

1247

1248
class UNIXSocketStream(_RawSocketMixin, abc.UNIXSocketStream):
11✔
1249
    async def send_eof(self) -> None:
11✔
1250
        with self._send_guard:
8✔
1251
            self._raw_socket.shutdown(socket.SHUT_WR)
8✔
1252

1253
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
1254
        loop = get_running_loop()
8✔
1255
        await AsyncIOBackend.checkpoint()
8✔
1256
        with self._receive_guard:
8✔
1257
            while True:
5✔
1258
                try:
8✔
1259
                    data = self._raw_socket.recv(max_bytes)
8✔
1260
                except BlockingIOError:
8✔
1261
                    await self._wait_until_readable(loop)
8✔
1262
                except OSError as exc:
8✔
1263
                    if self._closing:
8✔
1264
                        raise ClosedResourceError from None
8✔
1265
                    else:
1266
                        raise BrokenResourceError from exc
1✔
1267
                else:
1268
                    if not data:
8✔
1269
                        raise EndOfStream
8✔
1270

1271
                    return data
8✔
1272

1273
    async def send(self, item: bytes) -> None:
11✔
1274
        loop = get_running_loop()
8✔
1275
        await AsyncIOBackend.checkpoint()
8✔
1276
        with self._send_guard:
8✔
1277
            view = memoryview(item)
8✔
1278
            while view:
8✔
1279
                try:
8✔
1280
                    bytes_sent = self._raw_socket.send(view)
8✔
1281
                except BlockingIOError:
8✔
1282
                    await self._wait_until_writable(loop)
8✔
1283
                except OSError as exc:
8✔
1284
                    if self._closing:
8✔
1285
                        raise ClosedResourceError from None
8✔
1286
                    else:
1287
                        raise BrokenResourceError from exc
1✔
1288
                else:
1289
                    view = view[bytes_sent:]
8✔
1290

1291
    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
11✔
1292
        if not isinstance(msglen, int) or msglen < 0:
8✔
1293
            raise ValueError("msglen must be a non-negative integer")
8✔
1294
        if not isinstance(maxfds, int) or maxfds < 1:
8✔
1295
            raise ValueError("maxfds must be a positive integer")
8✔
1296

1297
        loop = get_running_loop()
8✔
1298
        fds = array.array("i")
8✔
1299
        await AsyncIOBackend.checkpoint()
8✔
1300
        with self._receive_guard:
8✔
1301
            while True:
5✔
1302
                try:
8✔
1303
                    message, ancdata, flags, addr = self._raw_socket.recvmsg(
8✔
1304
                        msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
1305
                    )
1306
                except BlockingIOError:
8✔
1307
                    await self._wait_until_readable(loop)
8✔
1308
                except OSError as exc:
×
1309
                    if self._closing:
×
1310
                        raise ClosedResourceError from None
×
1311
                    else:
1312
                        raise BrokenResourceError from exc
×
1313
                else:
1314
                    if not message and not ancdata:
8✔
1315
                        raise EndOfStream
×
1316

1317
                    break
5✔
1318

1319
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
8✔
1320
            if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
8✔
1321
                raise RuntimeError(
×
1322
                    f"Received unexpected ancillary data; message = {message!r}, "
1323
                    f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
1324
                )
1325

1326
            fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
8✔
1327

1328
        return message, list(fds)
8✔
1329

1330
    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
11✔
1331
        if not message:
8✔
1332
            raise ValueError("message must not be empty")
8✔
1333
        if not fds:
8✔
1334
            raise ValueError("fds must not be empty")
8✔
1335

1336
        loop = get_running_loop()
8✔
1337
        filenos: list[int] = []
8✔
1338
        for fd in fds:
8✔
1339
            if isinstance(fd, int):
8✔
1340
                filenos.append(fd)
×
1341
            elif isinstance(fd, IOBase):
8✔
1342
                filenos.append(fd.fileno())
8✔
1343

1344
        fdarray = array.array("i", filenos)
8✔
1345
        await AsyncIOBackend.checkpoint()
8✔
1346
        with self._send_guard:
8✔
1347
            while True:
5✔
1348
                try:
8✔
1349
                    # The ignore can be removed after mypy picks up
1350
                    # https://github.com/python/typeshed/pull/5545
1351
                    self._raw_socket.sendmsg(
8✔
1352
                        [message], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)]
1353
                    )
1354
                    break
8✔
1355
                except BlockingIOError:
×
1356
                    await self._wait_until_writable(loop)
×
1357
                except OSError as exc:
×
1358
                    if self._closing:
×
1359
                        raise ClosedResourceError from None
×
1360
                    else:
1361
                        raise BrokenResourceError from exc
×
1362

1363

1364
class TCPSocketListener(abc.SocketListener):
11✔
1365
    _accept_scope: CancelScope | None = None
11✔
1366
    _closed = False
11✔
1367

1368
    def __init__(self, raw_socket: socket.socket):
11✔
1369
        self.__raw_socket = raw_socket
11✔
1370
        self._loop = cast(asyncio.BaseEventLoop, get_running_loop())
11✔
1371
        self._accept_guard = ResourceGuard("accepting connections from")
11✔
1372

1373
    @property
11✔
1374
    def _raw_socket(self) -> socket.socket:
11✔
1375
        return self.__raw_socket
11✔
1376

1377
    async def accept(self) -> abc.SocketStream:
11✔
1378
        if self._closed:
11✔
1379
            raise ClosedResourceError
11✔
1380

1381
        with self._accept_guard:
11✔
1382
            await AsyncIOBackend.checkpoint()
11✔
1383
            with CancelScope() as self._accept_scope:
11✔
1384
                try:
11✔
1385
                    client_sock, _addr = await self._loop.sock_accept(self._raw_socket)
11✔
1386
                except asyncio.CancelledError:
11✔
1387
                    # Workaround for https://bugs.python.org/issue41317
1388
                    try:
11✔
1389
                        self._loop.remove_reader(self._raw_socket)
11✔
1390
                    except (ValueError, NotImplementedError):
2✔
1391
                        pass
2✔
1392

1393
                    if self._closed:
11✔
1394
                        raise ClosedResourceError from None
10✔
1395

1396
                    raise
11✔
1397
                finally:
1398
                    self._accept_scope = None
11✔
1399

1400
        client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
11✔
1401
        transport, protocol = await self._loop.connect_accepted_socket(
11✔
1402
            StreamProtocol, client_sock
1403
        )
1404
        return SocketStream(transport, protocol)
11✔
1405

1406
    async def aclose(self) -> None:
11✔
1407
        if self._closed:
11✔
1408
            return
11✔
1409

1410
        self._closed = True
11✔
1411
        if self._accept_scope:
11✔
1412
            # Workaround for https://bugs.python.org/issue41317
1413
            try:
11✔
1414
                self._loop.remove_reader(self._raw_socket)
11✔
1415
            except (ValueError, NotImplementedError):
2✔
1416
                pass
2✔
1417

1418
            self._accept_scope.cancel()
10✔
1419
            await sleep(0)
10✔
1420

1421
        self._raw_socket.close()
11✔
1422

1423

1424
class UNIXSocketListener(abc.SocketListener):
11✔
1425
    def __init__(self, raw_socket: socket.socket):
11✔
1426
        self.__raw_socket = raw_socket
8✔
1427
        self._loop = get_running_loop()
8✔
1428
        self._accept_guard = ResourceGuard("accepting connections from")
8✔
1429
        self._closed = False
8✔
1430

1431
    async def accept(self) -> abc.SocketStream:
11✔
1432
        await AsyncIOBackend.checkpoint()
8✔
1433
        with self._accept_guard:
8✔
1434
            while True:
5✔
1435
                try:
8✔
1436
                    client_sock, _ = self.__raw_socket.accept()
8✔
1437
                    client_sock.setblocking(False)
8✔
1438
                    return UNIXSocketStream(client_sock)
8✔
1439
                except BlockingIOError:
8✔
1440
                    f: asyncio.Future = asyncio.Future()
8✔
1441
                    self._loop.add_reader(self.__raw_socket, f.set_result, None)
8✔
1442
                    f.add_done_callback(
8✔
1443
                        lambda _: self._loop.remove_reader(self.__raw_socket)
1444
                    )
1445
                    await f
8✔
1446
                except OSError as exc:
×
1447
                    if self._closed:
×
1448
                        raise ClosedResourceError from None
×
1449
                    else:
1450
                        raise BrokenResourceError from exc
1✔
1451

1452
    async def aclose(self) -> None:
11✔
1453
        self._closed = True
8✔
1454
        self.__raw_socket.close()
8✔
1455

1456
    @property
11✔
1457
    def _raw_socket(self) -> socket.socket:
11✔
1458
        return self.__raw_socket
8✔
1459

1460

1461
class UDPSocket(abc.UDPSocket):
11✔
1462
    def __init__(
11✔
1463
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1464
    ):
1465
        self._transport = transport
10✔
1466
        self._protocol = protocol
10✔
1467
        self._receive_guard = ResourceGuard("reading from")
10✔
1468
        self._send_guard = ResourceGuard("writing to")
10✔
1469
        self._closed = False
10✔
1470

1471
    @property
11✔
1472
    def _raw_socket(self) -> socket.socket:
11✔
1473
        return self._transport.get_extra_info("socket")
10✔
1474

1475
    async def aclose(self) -> None:
11✔
1476
        if not self._transport.is_closing():
10✔
1477
            self._closed = True
10✔
1478
            self._transport.close()
10✔
1479

1480
    async def receive(self) -> tuple[bytes, IPSockAddrType]:
11✔
1481
        with self._receive_guard:
10✔
1482
            await AsyncIOBackend.checkpoint()
10✔
1483

1484
            # If the buffer is empty, ask for more data
1485
            if not self._protocol.read_queue and not self._transport.is_closing():
10✔
1486
                self._protocol.read_event.clear()
10✔
1487
                await self._protocol.read_event.wait()
10✔
1488

1489
            try:
10✔
1490
                return self._protocol.read_queue.popleft()
10✔
1491
            except IndexError:
10✔
1492
                if self._closed:
10✔
1493
                    raise ClosedResourceError from None
10✔
1494
                else:
1495
                    raise BrokenResourceError from None
1✔
1496

1497
    async def send(self, item: UDPPacketType) -> None:
11✔
1498
        with self._send_guard:
10✔
1499
            await AsyncIOBackend.checkpoint()
10✔
1500
            await self._protocol.write_event.wait()
10✔
1501
            if self._closed:
10✔
1502
                raise ClosedResourceError
10✔
1503
            elif self._transport.is_closing():
10✔
1504
                raise BrokenResourceError
×
1505
            else:
1506
                self._transport.sendto(*item)
10✔
1507

1508

1509
class ConnectedUDPSocket(abc.ConnectedUDPSocket):
11✔
1510
    def __init__(
11✔
1511
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1512
    ):
1513
        self._transport = transport
10✔
1514
        self._protocol = protocol
10✔
1515
        self._receive_guard = ResourceGuard("reading from")
10✔
1516
        self._send_guard = ResourceGuard("writing to")
10✔
1517
        self._closed = False
10✔
1518

1519
    @property
11✔
1520
    def _raw_socket(self) -> socket.socket:
11✔
1521
        return self._transport.get_extra_info("socket")
10✔
1522

1523
    async def aclose(self) -> None:
11✔
1524
        if not self._transport.is_closing():
10✔
1525
            self._closed = True
10✔
1526
            self._transport.close()
10✔
1527

1528
    async def receive(self) -> bytes:
11✔
1529
        with self._receive_guard:
10✔
1530
            await AsyncIOBackend.checkpoint()
10✔
1531

1532
            # If the buffer is empty, ask for more data
1533
            if not self._protocol.read_queue and not self._transport.is_closing():
10✔
1534
                self._protocol.read_event.clear()
10✔
1535
                await self._protocol.read_event.wait()
10✔
1536

1537
            try:
10✔
1538
                packet = self._protocol.read_queue.popleft()
10✔
1539
            except IndexError:
10✔
1540
                if self._closed:
10✔
1541
                    raise ClosedResourceError from None
10✔
1542
                else:
1543
                    raise BrokenResourceError from None
×
1544

1545
            return packet[0]
10✔
1546

1547
    async def send(self, item: bytes) -> None:
11✔
1548
        with self._send_guard:
10✔
1549
            await AsyncIOBackend.checkpoint()
10✔
1550
            await self._protocol.write_event.wait()
10✔
1551
            if self._closed:
10✔
1552
                raise ClosedResourceError
10✔
1553
            elif self._transport.is_closing():
10✔
1554
                raise BrokenResourceError
×
1555
            else:
1556
                self._transport.sendto(item)
10✔
1557

1558

1559
class UNIXDatagramSocket(_RawSocketMixin, abc.UNIXDatagramSocket):
11✔
1560
    async def receive(self) -> UNIXDatagramPacketType:
11✔
1561
        loop = get_running_loop()
8✔
1562
        await AsyncIOBackend.checkpoint()
8✔
1563
        with self._receive_guard:
8✔
1564
            while True:
5✔
1565
                try:
8✔
1566
                    data = self._raw_socket.recvfrom(65536)
8✔
1567
                except BlockingIOError:
8✔
1568
                    await self._wait_until_readable(loop)
8✔
1569
                except OSError as exc:
8✔
1570
                    if self._closing:
8✔
1571
                        raise ClosedResourceError from None
8✔
1572
                    else:
1573
                        raise BrokenResourceError from exc
1✔
1574
                else:
1575
                    return data
8✔
1576

1577
    async def send(self, item: UNIXDatagramPacketType) -> None:
11✔
1578
        loop = get_running_loop()
8✔
1579
        await AsyncIOBackend.checkpoint()
8✔
1580
        with self._send_guard:
8✔
1581
            while True:
5✔
1582
                try:
8✔
1583
                    self._raw_socket.sendto(*item)
8✔
1584
                except BlockingIOError:
8✔
1585
                    await self._wait_until_writable(loop)
×
1586
                except OSError as exc:
8✔
1587
                    if self._closing:
8✔
1588
                        raise ClosedResourceError from None
8✔
1589
                    else:
1590
                        raise BrokenResourceError from exc
1✔
1591
                else:
1592
                    return
8✔
1593

1594

1595
class ConnectedUNIXDatagramSocket(_RawSocketMixin, abc.ConnectedUNIXDatagramSocket):
11✔
1596
    async def receive(self) -> bytes:
11✔
1597
        loop = get_running_loop()
8✔
1598
        await AsyncIOBackend.checkpoint()
8✔
1599
        with self._receive_guard:
8✔
1600
            while True:
5✔
1601
                try:
8✔
1602
                    data = self._raw_socket.recv(65536)
8✔
1603
                except BlockingIOError:
8✔
1604
                    await self._wait_until_readable(loop)
8✔
1605
                except OSError as exc:
8✔
1606
                    if self._closing:
8✔
1607
                        raise ClosedResourceError from None
8✔
1608
                    else:
1609
                        raise BrokenResourceError from exc
1✔
1610
                else:
1611
                    return data
8✔
1612

1613
    async def send(self, item: bytes) -> None:
11✔
1614
        loop = get_running_loop()
8✔
1615
        await AsyncIOBackend.checkpoint()
8✔
1616
        with self._send_guard:
8✔
1617
            while True:
5✔
1618
                try:
8✔
1619
                    self._raw_socket.send(item)
8✔
1620
                except BlockingIOError:
8✔
1621
                    await self._wait_until_writable(loop)
×
1622
                except OSError as exc:
8✔
1623
                    if self._closing:
8✔
1624
                        raise ClosedResourceError from None
8✔
1625
                    else:
1626
                        raise BrokenResourceError from exc
1✔
1627
                else:
1628
                    return
8✔
1629

1630

1631
_read_events: RunVar[dict[Any, asyncio.Event]] = RunVar("read_events")
11✔
1632
_write_events: RunVar[dict[Any, asyncio.Event]] = RunVar("write_events")
11✔
1633

1634

1635
#
1636
# Synchronization
1637
#
1638

1639

1640
class Event(BaseEvent):
11✔
1641
    def __new__(cls) -> Event:
11✔
1642
        return object.__new__(cls)
11✔
1643

1644
    def __init__(self) -> None:
11✔
1645
        self._event = asyncio.Event()
11✔
1646

1647
    def set(self) -> None:
11✔
1648
        self._event.set()
11✔
1649

1650
    def is_set(self) -> bool:
11✔
1651
        return self._event.is_set()
11✔
1652

1653
    async def wait(self) -> None:
11✔
1654
        if self.is_set():
11✔
1655
            await AsyncIOBackend.checkpoint()
11✔
1656
        else:
1657
            await self._event.wait()
11✔
1658

1659
    def statistics(self) -> EventStatistics:
11✔
1660
        return EventStatistics(len(self._event._waiters))
10✔
1661

1662

1663
class CapacityLimiter(BaseCapacityLimiter):
11✔
1664
    _total_tokens: float = 0
11✔
1665

1666
    def __new__(cls, total_tokens: float) -> CapacityLimiter:
11✔
1667
        return object.__new__(cls)
11✔
1668

1669
    def __init__(self, total_tokens: float):
11✔
1670
        self._borrowers: set[Any] = set()
11✔
1671
        self._wait_queue: OrderedDict[Any, asyncio.Event] = OrderedDict()
11✔
1672
        self.total_tokens = total_tokens
11✔
1673

1674
    async def __aenter__(self) -> None:
11✔
1675
        await self.acquire()
11✔
1676

1677
    async def __aexit__(
11✔
1678
        self,
1679
        exc_type: type[BaseException] | None,
1680
        exc_val: BaseException | None,
1681
        exc_tb: TracebackType | None,
1682
    ) -> None:
1683
        self.release()
11✔
1684

1685
    @property
11✔
1686
    def total_tokens(self) -> float:
11✔
1687
        return self._total_tokens
10✔
1688

1689
    @total_tokens.setter
11✔
1690
    def total_tokens(self, value: float) -> None:
11✔
1691
        if not isinstance(value, int) and not math.isinf(value):
11✔
1692
            raise TypeError("total_tokens must be an int or math.inf")
10✔
1693
        if value < 1:
11✔
1694
            raise ValueError("total_tokens must be >= 1")
10✔
1695

1696
        waiters_to_notify = max(value - self._total_tokens, 0)
11✔
1697
        self._total_tokens = value
11✔
1698

1699
        # Notify waiting tasks that they have acquired the limiter
1700
        while self._wait_queue and waiters_to_notify:
11✔
1701
            event = self._wait_queue.popitem(last=False)[1]
10✔
1702
            event.set()
10✔
1703
            waiters_to_notify -= 1
10✔
1704

1705
    @property
11✔
1706
    def borrowed_tokens(self) -> int:
11✔
1707
        return len(self._borrowers)
10✔
1708

1709
    @property
11✔
1710
    def available_tokens(self) -> float:
11✔
1711
        return self._total_tokens - len(self._borrowers)
10✔
1712

1713
    def acquire_nowait(self) -> None:
11✔
1714
        self.acquire_on_behalf_of_nowait(current_task())
×
1715

1716
    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
11✔
1717
        if borrower in self._borrowers:
11✔
1718
            raise RuntimeError(
10✔
1719
                "this borrower is already holding one of this CapacityLimiter's "
1720
                "tokens"
1721
            )
1722

1723
        if self._wait_queue or len(self._borrowers) >= self._total_tokens:
11✔
1724
            raise WouldBlock
10✔
1725

1726
        self._borrowers.add(borrower)
11✔
1727

1728
    async def acquire(self) -> None:
11✔
1729
        return await self.acquire_on_behalf_of(current_task())
11✔
1730

1731
    async def acquire_on_behalf_of(self, borrower: object) -> None:
11✔
1732
        await AsyncIOBackend.checkpoint_if_cancelled()
11✔
1733
        try:
11✔
1734
            self.acquire_on_behalf_of_nowait(borrower)
11✔
1735
        except WouldBlock:
10✔
1736
            event = asyncio.Event()
10✔
1737
            self._wait_queue[borrower] = event
10✔
1738
            try:
10✔
1739
                await event.wait()
10✔
1740
            except BaseException:
×
1741
                self._wait_queue.pop(borrower, None)
×
1742
                raise
×
1743

1744
            self._borrowers.add(borrower)
10✔
1745
        else:
1746
            try:
11✔
1747
                await AsyncIOBackend.cancel_shielded_checkpoint()
11✔
1748
            except BaseException:
10✔
1749
                self.release()
10✔
1750
                raise
10✔
1751

1752
    def release(self) -> None:
11✔
1753
        self.release_on_behalf_of(current_task())
11✔
1754

1755
    def release_on_behalf_of(self, borrower: object) -> None:
11✔
1756
        try:
11✔
1757
            self._borrowers.remove(borrower)
11✔
1758
        except KeyError:
10✔
1759
            raise RuntimeError(
10✔
1760
                "this borrower isn't holding any of this CapacityLimiter's tokens"
1761
            ) from None
1762

1763
        # Notify the next task in line if this limiter has free capacity now
1764
        if self._wait_queue and len(self._borrowers) < self._total_tokens:
11✔
1765
            event = self._wait_queue.popitem(last=False)[1]
10✔
1766
            event.set()
10✔
1767

1768
    def statistics(self) -> CapacityLimiterStatistics:
11✔
1769
        return CapacityLimiterStatistics(
10✔
1770
            self.borrowed_tokens,
1771
            self.total_tokens,
1772
            tuple(self._borrowers),
1773
            len(self._wait_queue),
1774
        )
1775

1776

1777
_default_thread_limiter: RunVar[CapacityLimiter] = RunVar("_default_thread_limiter")
11✔
1778

1779

1780
#
1781
# Operating system signals
1782
#
1783

1784

1785
class _SignalReceiver:
11✔
1786
    def __init__(self, signals: tuple[Signals, ...]):
11✔
1787
        self._signals = signals
9✔
1788
        self._loop = get_running_loop()
9✔
1789
        self._signal_queue: deque[Signals] = deque()
9✔
1790
        self._future: asyncio.Future = asyncio.Future()
9✔
1791
        self._handled_signals: set[Signals] = set()
9✔
1792

1793
    def _deliver(self, signum: Signals) -> None:
11✔
1794
        self._signal_queue.append(signum)
9✔
1795
        if not self._future.done():
9✔
1796
            self._future.set_result(None)
9✔
1797

1798
    def __enter__(self) -> _SignalReceiver:
11✔
1799
        for sig in set(self._signals):
9✔
1800
            self._loop.add_signal_handler(sig, self._deliver, sig)
9✔
1801
            self._handled_signals.add(sig)
9✔
1802

1803
        return self
9✔
1804

1805
    def __exit__(
11✔
1806
        self,
1807
        exc_type: type[BaseException] | None,
1808
        exc_val: BaseException | None,
1809
        exc_tb: TracebackType | None,
1810
    ) -> bool | None:
1811
        for sig in self._handled_signals:
9✔
1812
            self._loop.remove_signal_handler(sig)
9✔
1813
        return None
9✔
1814

1815
    def __aiter__(self) -> _SignalReceiver:
11✔
1816
        return self
9✔
1817

1818
    async def __anext__(self) -> Signals:
11✔
1819
        await AsyncIOBackend.checkpoint()
9✔
1820
        if not self._signal_queue:
9✔
1821
            self._future = asyncio.Future()
×
1822
            await self._future
×
1823

1824
        return self._signal_queue.popleft()
9✔
1825

1826

1827
#
1828
# Testing and debugging
1829
#
1830

1831

1832
class AsyncIOTaskInfo(TaskInfo):
11✔
1833
    def __init__(self, task: asyncio.Task):
11✔
1834
        task_state = _task_states.get(task)
11✔
1835
        if task_state is None:
11✔
1836
            parent_id = None
11✔
1837
        else:
1838
            parent_id = task_state.parent_id
11✔
1839

1840
        super().__init__(id(task), parent_id, task.get_name(), task.get_coro())
11✔
1841
        self._task = weakref.ref(task)
11✔
1842

1843
    def has_pending_cancellation(self) -> bool:
11✔
1844
        if not (task := self._task()):
11✔
1845
            # If the task isn't around anymore, it won't have a pending cancellation
1846
            return False
×
1847

1848
        if sys.version_info >= (3, 11):
11✔
1849
            if task.cancelling():
5✔
1850
                return True
5✔
1851
        elif (
6✔
1852
            isinstance(task._fut_waiter, asyncio.Future)
1853
            and task._fut_waiter.cancelled()
1854
        ):
1855
            return True
6✔
1856

1857
        if task_state := _task_states.get(task):
11✔
1858
            if cancel_scope := task_state.cancel_scope:
11✔
1859
                return cancel_scope.cancel_called or (
11✔
1860
                    not cancel_scope.shield and cancel_scope._parent_cancelled()
1861
                )
1862

1863
        return False
11✔
1864

1865

1866
class TestRunner(abc.TestRunner):
11✔
1867
    _send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]
11✔
1868

1869
    def __init__(
11✔
1870
        self,
1871
        *,
1872
        debug: bool | None = None,
1873
        use_uvloop: bool = False,
1874
        loop_factory: Callable[[], AbstractEventLoop] | None = None,
1875
    ) -> None:
1876
        if use_uvloop and loop_factory is None:
11✔
1877
            import uvloop
×
1878

1879
            loop_factory = uvloop.new_event_loop
×
1880

1881
        self._runner = Runner(debug=debug, loop_factory=loop_factory)
11✔
1882
        self._exceptions: list[BaseException] = []
11✔
1883
        self._runner_task: asyncio.Task | None = None
11✔
1884

1885
    def __enter__(self) -> TestRunner:
11✔
1886
        self._runner.__enter__()
11✔
1887
        self.get_loop().set_exception_handler(self._exception_handler)
11✔
1888
        return self
11✔
1889

1890
    def __exit__(
11✔
1891
        self,
1892
        exc_type: type[BaseException] | None,
1893
        exc_val: BaseException | None,
1894
        exc_tb: TracebackType | None,
1895
    ) -> None:
1896
        self._runner.__exit__(exc_type, exc_val, exc_tb)
11✔
1897

1898
    def get_loop(self) -> AbstractEventLoop:
11✔
1899
        return self._runner.get_loop()
11✔
1900

1901
    def _exception_handler(
11✔
1902
        self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
1903
    ) -> None:
1904
        if isinstance(context.get("exception"), Exception):
11✔
1905
            self._exceptions.append(context["exception"])
11✔
1906
        else:
1907
            loop.default_exception_handler(context)
11✔
1908

1909
    def _raise_async_exceptions(self) -> None:
11✔
1910
        # Re-raise any exceptions raised in asynchronous callbacks
1911
        if self._exceptions:
11✔
1912
            exceptions, self._exceptions = self._exceptions, []
11✔
1913
            if len(exceptions) == 1:
11✔
1914
                raise exceptions[0]
11✔
1915
            elif exceptions:
×
1916
                raise BaseExceptionGroup(
×
1917
                    "Multiple exceptions occurred in asynchronous callbacks", exceptions
1918
                )
1919

1920
    async def _run_tests_and_fixtures(
11✔
1921
        self,
1922
        receive_stream: MemoryObjectReceiveStream[
1923
            tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
1924
        ],
1925
    ) -> None:
1926
        with receive_stream, self._send_stream:
11✔
1927
            async for coro, future in receive_stream:
11✔
1928
                try:
11✔
1929
                    retval = await coro
11✔
1930
                except BaseException as exc:
11✔
1931
                    if not future.cancelled():
11✔
1932
                        future.set_exception(exc)
11✔
1933
                else:
1934
                    if not future.cancelled():
11✔
1935
                        future.set_result(retval)
11✔
1936

1937
    async def _call_in_runner_task(
11✔
1938
        self,
1939
        func: Callable[P, Awaitable[T_Retval]],
1940
        *args: P.args,
1941
        **kwargs: P.kwargs,
1942
    ) -> T_Retval:
1943
        if not self._runner_task:
11✔
1944
            self._send_stream, receive_stream = create_memory_object_stream[
11✔
1945
                Tuple[Awaitable[Any], asyncio.Future]
1946
            ](1)
1947
            self._runner_task = self.get_loop().create_task(
11✔
1948
                self._run_tests_and_fixtures(receive_stream)
1949
            )
1950

1951
        coro = func(*args, **kwargs)
11✔
1952
        future: asyncio.Future[T_Retval] = self.get_loop().create_future()
11✔
1953
        self._send_stream.send_nowait((coro, future))
11✔
1954
        return await future
11✔
1955

1956
    def run_asyncgen_fixture(
11✔
1957
        self,
1958
        fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
1959
        kwargs: dict[str, Any],
1960
    ) -> Iterable[T_Retval]:
1961
        asyncgen = fixture_func(**kwargs)
11✔
1962
        fixturevalue: T_Retval = self.get_loop().run_until_complete(
11✔
1963
            self._call_in_runner_task(asyncgen.asend, None)
1964
        )
1965
        self._raise_async_exceptions()
11✔
1966

1967
        yield fixturevalue
11✔
1968

1969
        try:
11✔
1970
            self.get_loop().run_until_complete(
11✔
1971
                self._call_in_runner_task(asyncgen.asend, None)
1972
            )
1973
        except StopAsyncIteration:
11✔
1974
            self._raise_async_exceptions()
11✔
1975
        else:
1976
            self.get_loop().run_until_complete(asyncgen.aclose())
×
1977
            raise RuntimeError("Async generator fixture did not stop")
×
1978

1979
    def run_fixture(
11✔
1980
        self,
1981
        fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
1982
        kwargs: dict[str, Any],
1983
    ) -> T_Retval:
1984
        retval = self.get_loop().run_until_complete(
11✔
1985
            self._call_in_runner_task(fixture_func, **kwargs)
1986
        )
1987
        self._raise_async_exceptions()
11✔
1988
        return retval
11✔
1989

1990
    def run_test(
11✔
1991
        self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
1992
    ) -> None:
1993
        try:
11✔
1994
            self.get_loop().run_until_complete(
11✔
1995
                self._call_in_runner_task(test_func, **kwargs)
1996
            )
1997
        except Exception as exc:
11✔
1998
            self._exceptions.append(exc)
11✔
1999

2000
        self._raise_async_exceptions()
11✔
2001

2002

2003
class AsyncIOBackend(AsyncBackend):
11✔
2004
    @classmethod
11✔
2005
    def run(
11✔
2006
        cls,
2007
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2008
        args: tuple[Unpack[PosArgsT]],
2009
        kwargs: dict[str, Any],
2010
        options: dict[str, Any],
2011
    ) -> T_Retval:
2012
        @wraps(func)
11✔
2013
        async def wrapper() -> T_Retval:
11✔
2014
            task = cast(asyncio.Task, current_task())
11✔
2015
            task.set_name(get_callable_name(func))
11✔
2016
            _task_states[task] = TaskState(None, None)
11✔
2017

2018
            try:
11✔
2019
                return await func(*args)
11✔
2020
            finally:
2021
                del _task_states[task]
11✔
2022

2023
        debug = options.get("debug", None)
11✔
2024
        loop_factory = options.get("loop_factory", None)
11✔
2025
        if loop_factory is None and options.get("use_uvloop", False):
11✔
2026
            import uvloop
7✔
2027

2028
            loop_factory = uvloop.new_event_loop
7✔
2029

2030
        with Runner(debug=debug, loop_factory=loop_factory) as runner:
11✔
2031
            return runner.run(wrapper())
11✔
2032

2033
    @classmethod
11✔
2034
    def current_token(cls) -> object:
11✔
2035
        return get_running_loop()
11✔
2036

2037
    @classmethod
11✔
2038
    def current_time(cls) -> float:
11✔
2039
        return get_running_loop().time()
11✔
2040

2041
    @classmethod
11✔
2042
    def cancelled_exception_class(cls) -> type[BaseException]:
11✔
2043
        return CancelledError
11✔
2044

2045
    @classmethod
11✔
2046
    async def checkpoint(cls) -> None:
11✔
2047
        await sleep(0)
11✔
2048

2049
    @classmethod
11✔
2050
    async def checkpoint_if_cancelled(cls) -> None:
11✔
2051
        task = current_task()
11✔
2052
        if task is None:
11✔
2053
            return
×
2054

2055
        try:
11✔
2056
            cancel_scope = _task_states[task].cancel_scope
11✔
2057
        except KeyError:
11✔
2058
            return
11✔
2059

2060
        while cancel_scope:
11✔
2061
            if cancel_scope.cancel_called:
11✔
2062
                await sleep(0)
11✔
2063
            elif cancel_scope.shield:
11✔
2064
                break
10✔
2065
            else:
2066
                cancel_scope = cancel_scope._parent_scope
11✔
2067

2068
    @classmethod
11✔
2069
    async def cancel_shielded_checkpoint(cls) -> None:
11✔
2070
        with CancelScope(shield=True):
11✔
2071
            await sleep(0)
11✔
2072

2073
    @classmethod
11✔
2074
    async def sleep(cls, delay: float) -> None:
11✔
2075
        await sleep(delay)
11✔
2076

2077
    @classmethod
11✔
2078
    def create_cancel_scope(
11✔
2079
        cls, *, deadline: float = math.inf, shield: bool = False
2080
    ) -> CancelScope:
2081
        return CancelScope(deadline=deadline, shield=shield)
11✔
2082

2083
    @classmethod
11✔
2084
    def current_effective_deadline(cls) -> float:
11✔
2085
        try:
10✔
2086
            cancel_scope = _task_states[
10✔
2087
                current_task()  # type: ignore[index]
2088
            ].cancel_scope
2089
        except KeyError:
×
2090
            return math.inf
×
2091

2092
        deadline = math.inf
10✔
2093
        while cancel_scope:
10✔
2094
            deadline = min(deadline, cancel_scope.deadline)
10✔
2095
            if cancel_scope._cancel_called:
10✔
2096
                deadline = -math.inf
10✔
2097
                break
10✔
2098
            elif cancel_scope.shield:
10✔
2099
                break
10✔
2100
            else:
2101
                cancel_scope = cancel_scope._parent_scope
10✔
2102

2103
        return deadline
10✔
2104

2105
    @classmethod
11✔
2106
    def create_task_group(cls) -> abc.TaskGroup:
11✔
2107
        return TaskGroup()
11✔
2108

2109
    @classmethod
11✔
2110
    def create_event(cls) -> abc.Event:
11✔
2111
        return Event()
11✔
2112

2113
    @classmethod
11✔
2114
    def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
11✔
2115
        return CapacityLimiter(total_tokens)
10✔
2116

2117
    @classmethod
11✔
2118
    async def run_sync_in_worker_thread(
11✔
2119
        cls,
2120
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2121
        args: tuple[Unpack[PosArgsT]],
2122
        abandon_on_cancel: bool = False,
2123
        limiter: abc.CapacityLimiter | None = None,
2124
    ) -> T_Retval:
2125
        await cls.checkpoint()
11✔
2126

2127
        # If this is the first run in this event loop thread, set up the necessary
2128
        # variables
2129
        try:
11✔
2130
            idle_workers = _threadpool_idle_workers.get()
11✔
2131
            workers = _threadpool_workers.get()
11✔
2132
        except LookupError:
11✔
2133
            idle_workers = deque()
11✔
2134
            workers = set()
11✔
2135
            _threadpool_idle_workers.set(idle_workers)
11✔
2136
            _threadpool_workers.set(workers)
11✔
2137

2138
        async with limiter or cls.current_default_thread_limiter():
11✔
2139
            with CancelScope(shield=not abandon_on_cancel) as scope:
11✔
2140
                future: asyncio.Future = asyncio.Future()
11✔
2141
                root_task = find_root_task()
11✔
2142
                if not idle_workers:
11✔
2143
                    worker = WorkerThread(root_task, workers, idle_workers)
11✔
2144
                    worker.start()
11✔
2145
                    workers.add(worker)
11✔
2146
                    root_task.add_done_callback(worker.stop)
11✔
2147
                else:
2148
                    worker = idle_workers.pop()
11✔
2149

2150
                    # Prune any other workers that have been idle for MAX_IDLE_TIME
2151
                    # seconds or longer
2152
                    now = cls.current_time()
11✔
2153
                    while idle_workers:
11✔
2154
                        if (
10✔
2155
                            now - idle_workers[0].idle_since
2156
                            < WorkerThread.MAX_IDLE_TIME
2157
                        ):
2158
                            break
10✔
2159

2160
                        expired_worker = idle_workers.popleft()
×
2161
                        expired_worker.root_task.remove_done_callback(
×
2162
                            expired_worker.stop
2163
                        )
2164
                        expired_worker.stop()
×
2165

2166
                context = copy_context()
11✔
2167
                context.run(sniffio.current_async_library_cvar.set, None)
11✔
2168
                if abandon_on_cancel or scope._parent_scope is None:
11✔
2169
                    worker_scope = scope
11✔
2170
                else:
2171
                    worker_scope = scope._parent_scope
11✔
2172

2173
                worker.queue.put_nowait((context, func, args, future, worker_scope))
11✔
2174
                return await future
11✔
2175

2176
    @classmethod
11✔
2177
    def check_cancelled(cls) -> None:
11✔
2178
        scope: CancelScope | None = threadlocals.current_cancel_scope
11✔
2179
        while scope is not None:
11✔
2180
            if scope.cancel_called:
11✔
2181
                raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")
11✔
2182

2183
            if scope.shield:
11✔
2184
                return
×
2185

2186
            scope = scope._parent_scope
11✔
2187

2188
    @classmethod
11✔
2189
    def run_async_from_thread(
11✔
2190
        cls,
2191
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2192
        args: tuple[Unpack[PosArgsT]],
2193
        token: object,
2194
    ) -> T_Retval:
2195
        async def task_wrapper(scope: CancelScope) -> T_Retval:
11✔
2196
            __tracebackhide__ = True
11✔
2197
            task = cast(asyncio.Task, current_task())
11✔
2198
            _task_states[task] = TaskState(None, scope)
11✔
2199
            scope._tasks.add(task)
11✔
2200
            try:
11✔
2201
                return await func(*args)
11✔
2202
            except CancelledError as exc:
11✔
2203
                raise concurrent.futures.CancelledError(str(exc)) from None
11✔
2204
            finally:
2205
                scope._tasks.discard(task)
11✔
2206

2207
        loop = cast(AbstractEventLoop, token)
11✔
2208
        context = copy_context()
11✔
2209
        context.run(sniffio.current_async_library_cvar.set, "asyncio")
11✔
2210
        wrapper = task_wrapper(threadlocals.current_cancel_scope)
11✔
2211
        f: concurrent.futures.Future[T_Retval] = context.run(
11✔
2212
            asyncio.run_coroutine_threadsafe, wrapper, loop
2213
        )
2214
        return f.result()
11✔
2215

2216
    @classmethod
11✔
2217
    def run_sync_from_thread(
11✔
2218
        cls,
2219
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2220
        args: tuple[Unpack[PosArgsT]],
2221
        token: object,
2222
    ) -> T_Retval:
2223
        @wraps(func)
11✔
2224
        def wrapper() -> None:
11✔
2225
            try:
11✔
2226
                sniffio.current_async_library_cvar.set("asyncio")
11✔
2227
                f.set_result(func(*args))
11✔
2228
            except BaseException as exc:
11✔
2229
                f.set_exception(exc)
11✔
2230
                if not isinstance(exc, Exception):
11✔
2231
                    raise
×
2232

2233
        f: concurrent.futures.Future[T_Retval] = Future()
11✔
2234
        loop = cast(AbstractEventLoop, token)
11✔
2235
        loop.call_soon_threadsafe(wrapper)
11✔
2236
        return f.result()
11✔
2237

2238
    @classmethod
11✔
2239
    def create_blocking_portal(cls) -> abc.BlockingPortal:
11✔
2240
        return BlockingPortal()
11✔
2241

2242
    @classmethod
11✔
2243
    async def open_process(
11✔
2244
        cls,
2245
        command: str | bytes | Sequence[str | bytes],
2246
        *,
2247
        shell: bool,
2248
        stdin: int | IO[Any] | None,
2249
        stdout: int | IO[Any] | None,
2250
        stderr: int | IO[Any] | None,
2251
        cwd: str | bytes | PathLike | None = None,
2252
        env: Mapping[str, str] | None = None,
2253
        start_new_session: bool = False,
2254
    ) -> Process:
2255
        await cls.checkpoint()
10✔
2256
        if shell:
10✔
2257
            process = await asyncio.create_subprocess_shell(
10✔
2258
                cast("str | bytes", command),
2259
                stdin=stdin,
2260
                stdout=stdout,
2261
                stderr=stderr,
2262
                cwd=cwd,
2263
                env=env,
2264
                start_new_session=start_new_session,
2265
            )
2266
        else:
2267
            process = await asyncio.create_subprocess_exec(
10✔
2268
                *command,
2269
                stdin=stdin,
2270
                stdout=stdout,
2271
                stderr=stderr,
2272
                cwd=cwd,
2273
                env=env,
2274
                start_new_session=start_new_session,
2275
            )
2276

2277
        stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
10✔
2278
        stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
10✔
2279
        stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
10✔
2280
        return Process(process, stdin_stream, stdout_stream, stderr_stream)
10✔
2281

2282
    @classmethod
11✔
2283
    def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
11✔
2284
        create_task(
10✔
2285
            _shutdown_process_pool_on_exit(workers),
2286
            name="AnyIO process pool shutdown task",
2287
        )
2288
        find_root_task().add_done_callback(
10✔
2289
            partial(_forcibly_shutdown_process_pool_on_exit, workers)  # type:ignore[arg-type]
2290
        )
2291

2292
    @classmethod
11✔
2293
    async def connect_tcp(
11✔
2294
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
2295
    ) -> abc.SocketStream:
2296
        transport, protocol = cast(
11✔
2297
            Tuple[asyncio.Transport, StreamProtocol],
2298
            await get_running_loop().create_connection(
2299
                StreamProtocol, host, port, local_addr=local_address
2300
            ),
2301
        )
2302
        transport.pause_reading()
11✔
2303
        return SocketStream(transport, protocol)
11✔
2304

2305
    @classmethod
11✔
2306
    async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
11✔
2307
        await cls.checkpoint()
8✔
2308
        loop = get_running_loop()
8✔
2309
        raw_socket = socket.socket(socket.AF_UNIX)
8✔
2310
        raw_socket.setblocking(False)
8✔
2311
        while True:
5✔
2312
            try:
8✔
2313
                raw_socket.connect(path)
8✔
2314
            except BlockingIOError:
8✔
2315
                f: asyncio.Future = asyncio.Future()
×
2316
                loop.add_writer(raw_socket, f.set_result, None)
×
2317
                f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
2318
                await f
×
2319
            except BaseException:
8✔
2320
                raw_socket.close()
8✔
2321
                raise
8✔
2322
            else:
2323
                return UNIXSocketStream(raw_socket)
8✔
2324

2325
    @classmethod
11✔
2326
    def create_tcp_listener(cls, sock: socket.socket) -> SocketListener:
11✔
2327
        return TCPSocketListener(sock)
11✔
2328

2329
    @classmethod
11✔
2330
    def create_unix_listener(cls, sock: socket.socket) -> SocketListener:
11✔
2331
        return UNIXSocketListener(sock)
8✔
2332

2333
    @classmethod
11✔
2334
    async def create_udp_socket(
11✔
2335
        cls,
2336
        family: AddressFamily,
2337
        local_address: IPSockAddrType | None,
2338
        remote_address: IPSockAddrType | None,
2339
        reuse_port: bool,
2340
    ) -> UDPSocket | ConnectedUDPSocket:
2341
        transport, protocol = await get_running_loop().create_datagram_endpoint(
10✔
2342
            DatagramProtocol,
2343
            local_addr=local_address,
2344
            remote_addr=remote_address,
2345
            family=family,
2346
            reuse_port=reuse_port,
2347
        )
2348
        if protocol.exception:
10✔
2349
            transport.close()
×
2350
            raise protocol.exception
×
2351

2352
        if not remote_address:
10✔
2353
            return UDPSocket(transport, protocol)
10✔
2354
        else:
2355
            return ConnectedUDPSocket(transport, protocol)
10✔
2356

2357
    @classmethod
11✔
2358
    async def create_unix_datagram_socket(  # type: ignore[override]
11✔
2359
        cls, raw_socket: socket.socket, remote_path: str | bytes | None
2360
    ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
2361
        await cls.checkpoint()
8✔
2362
        loop = get_running_loop()
8✔
2363

2364
        if remote_path:
8✔
2365
            while True:
5✔
2366
                try:
8✔
2367
                    raw_socket.connect(remote_path)
8✔
2368
                except BlockingIOError:
×
2369
                    f: asyncio.Future = asyncio.Future()
×
2370
                    loop.add_writer(raw_socket, f.set_result, None)
×
2371
                    f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
2372
                    await f
×
2373
                except BaseException:
×
2374
                    raw_socket.close()
×
2375
                    raise
×
2376
                else:
2377
                    return ConnectedUNIXDatagramSocket(raw_socket)
8✔
2378
        else:
2379
            return UNIXDatagramSocket(raw_socket)
8✔
2380

2381
    @classmethod
11✔
2382
    async def getaddrinfo(
11✔
2383
        cls,
2384
        host: bytes | str | None,
2385
        port: str | int | None,
2386
        *,
2387
        family: int | AddressFamily = 0,
2388
        type: int | SocketKind = 0,
2389
        proto: int = 0,
2390
        flags: int = 0,
2391
    ) -> list[
2392
        tuple[
2393
            AddressFamily,
2394
            SocketKind,
2395
            int,
2396
            str,
2397
            tuple[str, int] | tuple[str, int, int, int],
2398
        ]
2399
    ]:
2400
        return await get_running_loop().getaddrinfo(
11✔
2401
            host, port, family=family, type=type, proto=proto, flags=flags
2402
        )
2403

2404
    @classmethod
11✔
2405
    async def getnameinfo(
11✔
2406
        cls, sockaddr: IPSockAddrType, flags: int = 0
2407
    ) -> tuple[str, str]:
2408
        return await get_running_loop().getnameinfo(sockaddr, flags)
10✔
2409

2410
    @classmethod
11✔
2411
    async def wait_socket_readable(cls, sock: socket.socket) -> None:
11✔
2412
        await cls.checkpoint()
×
2413
        try:
×
2414
            read_events = _read_events.get()
×
2415
        except LookupError:
×
2416
            read_events = {}
×
2417
            _read_events.set(read_events)
×
2418

2419
        if read_events.get(sock):
×
2420
            raise BusyResourceError("reading from") from None
×
2421

2422
        loop = get_running_loop()
×
2423
        event = read_events[sock] = asyncio.Event()
×
2424
        loop.add_reader(sock, event.set)
×
2425
        try:
×
2426
            await event.wait()
×
2427
        finally:
2428
            if read_events.pop(sock, None) is not None:
×
2429
                loop.remove_reader(sock)
×
2430
                readable = True
×
2431
            else:
2432
                readable = False
×
2433

2434
        if not readable:
×
2435
            raise ClosedResourceError
×
2436

2437
    @classmethod
11✔
2438
    async def wait_socket_writable(cls, sock: socket.socket) -> None:
11✔
2439
        await cls.checkpoint()
×
2440
        try:
×
2441
            write_events = _write_events.get()
×
2442
        except LookupError:
×
2443
            write_events = {}
×
2444
            _write_events.set(write_events)
×
2445

2446
        if write_events.get(sock):
×
2447
            raise BusyResourceError("writing to") from None
×
2448

2449
        loop = get_running_loop()
×
2450
        event = write_events[sock] = asyncio.Event()
×
2451
        loop.add_writer(sock.fileno(), event.set)
×
2452
        try:
×
2453
            await event.wait()
×
2454
        finally:
2455
            if write_events.pop(sock, None) is not None:
×
2456
                loop.remove_writer(sock)
×
2457
                writable = True
×
2458
            else:
2459
                writable = False
×
2460

2461
        if not writable:
×
2462
            raise ClosedResourceError
×
2463

2464
    @classmethod
11✔
2465
    def current_default_thread_limiter(cls) -> CapacityLimiter:
11✔
2466
        try:
11✔
2467
            return _default_thread_limiter.get()
11✔
2468
        except LookupError:
11✔
2469
            limiter = CapacityLimiter(40)
11✔
2470
            _default_thread_limiter.set(limiter)
11✔
2471
            return limiter
11✔
2472

2473
    @classmethod
11✔
2474
    def open_signal_receiver(
11✔
2475
        cls, *signals: Signals
2476
    ) -> ContextManager[AsyncIterator[Signals]]:
2477
        return _SignalReceiver(signals)
9✔
2478

2479
    @classmethod
11✔
2480
    def get_current_task(cls) -> TaskInfo:
11✔
2481
        return AsyncIOTaskInfo(current_task())  # type: ignore[arg-type]
11✔
2482

2483
    @classmethod
11✔
2484
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
11✔
2485
        return [AsyncIOTaskInfo(task) for task in all_tasks() if not task.done()]
11✔
2486

2487
    @classmethod
11✔
2488
    async def wait_all_tasks_blocked(cls) -> None:
11✔
2489
        await cls.checkpoint()
11✔
2490
        this_task = current_task()
11✔
2491
        while True:
7✔
2492
            for task in all_tasks():
11✔
2493
                if task is this_task:
11✔
2494
                    continue
11✔
2495

2496
                waiter = task._fut_waiter  # type: ignore[attr-defined]
11✔
2497
                if waiter is None or waiter.done():
11✔
2498
                    await sleep(0.1)
11✔
2499
                    break
11✔
2500
            else:
2501
                return
11✔
2502

2503
    @classmethod
11✔
2504
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
11✔
2505
        return TestRunner(**options)
11✔
2506

2507

2508
backend_class = AsyncIOBackend
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc