• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10653597381

01 Sep 2024 11:25AM UTC coverage: 91.671% (-0.1%) from 91.788%
10653597381

Pull #761

github

web-flow
Merge 6d0f355bd into 8a5b34626
Pull Request #761: Delegated the implementations of Lock and Semaphore to the async backend class

229 of 250 new or added lines in 4 files covered. (91.6%)

2 existing lines in 2 files now uncovered.

4744 of 5175 relevant lines covered (91.67%)

9.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.72
/src/anyio/_backends/_asyncio.py
1
from __future__ import annotations
11✔
2

3
import array
11✔
4
import asyncio
11✔
5
import concurrent.futures
11✔
6
import math
11✔
7
import socket
11✔
8
import sys
11✔
9
import threading
11✔
10
import weakref
11✔
11
from asyncio import (
11✔
12
    AbstractEventLoop,
13
    CancelledError,
14
    all_tasks,
15
    create_task,
16
    current_task,
17
    get_running_loop,
18
    sleep,
19
)
20
from asyncio.base_events import _run_until_complete_cb  # type: ignore[attr-defined]
11✔
21
from collections import OrderedDict, deque
11✔
22
from collections.abc import AsyncIterator, Iterable
11✔
23
from concurrent.futures import Future
11✔
24
from contextlib import suppress
11✔
25
from contextvars import Context, copy_context
11✔
26
from dataclasses import dataclass
11✔
27
from functools import partial, wraps
11✔
28
from inspect import (
11✔
29
    CORO_RUNNING,
30
    CORO_SUSPENDED,
31
    getcoroutinestate,
32
    iscoroutine,
33
)
34
from io import IOBase
11✔
35
from os import PathLike
11✔
36
from queue import Queue
11✔
37
from signal import Signals
11✔
38
from socket import AddressFamily, SocketKind
11✔
39
from threading import Thread
11✔
40
from types import TracebackType
11✔
41
from typing import (
11✔
42
    IO,
43
    Any,
44
    AsyncGenerator,
45
    Awaitable,
46
    Callable,
47
    Collection,
48
    ContextManager,
49
    Coroutine,
50
    Mapping,
51
    Optional,
52
    Sequence,
53
    Tuple,
54
    TypeVar,
55
    cast,
56
)
57
from weakref import WeakKeyDictionary
11✔
58

59
import sniffio
11✔
60

61
from .. import (
11✔
62
    CapacityLimiterStatistics,
63
    EventStatistics,
64
    LockStatistics,
65
    TaskInfo,
66
    abc,
67
)
68
from .._core._eventloop import claim_worker_thread, threadlocals
11✔
69
from .._core._exceptions import (
11✔
70
    BrokenResourceError,
71
    BusyResourceError,
72
    ClosedResourceError,
73
    EndOfStream,
74
    WouldBlock,
75
    iterate_exceptions,
76
)
77
from .._core._sockets import convert_ipv6_sockaddr
11✔
78
from .._core._streams import create_memory_object_stream
11✔
79
from .._core._synchronization import (
11✔
80
    CapacityLimiter as BaseCapacityLimiter,
81
)
82
from .._core._synchronization import Event as BaseEvent
11✔
83
from .._core._synchronization import Lock as BaseLock
11✔
84
from .._core._synchronization import (
11✔
85
    ResourceGuard,
86
    SemaphoreStatistics,
87
)
88
from .._core._synchronization import Semaphore as BaseSemaphore
11✔
89
from .._core._tasks import CancelScope as BaseCancelScope
11✔
90
from ..abc import (
11✔
91
    AsyncBackend,
92
    IPSockAddrType,
93
    SocketListener,
94
    UDPPacketType,
95
    UNIXDatagramPacketType,
96
)
97
from ..lowlevel import RunVar
11✔
98
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
11✔
99

100
if sys.version_info >= (3, 10):
11✔
101
    from typing import ParamSpec
7✔
102
else:
103
    from typing_extensions import ParamSpec
4✔
104

105
if sys.version_info >= (3, 11):
11✔
106
    from asyncio import Runner
5✔
107
    from typing import TypeVarTuple, Unpack
5✔
108
else:
109
    import contextvars
6✔
110
    import enum
6✔
111
    import signal
6✔
112
    from asyncio import coroutines, events, exceptions, tasks
6✔
113

114
    from exceptiongroup import BaseExceptionGroup
6✔
115
    from typing_extensions import TypeVarTuple, Unpack
6✔
116

117
    class _State(enum.Enum):
6✔
118
        CREATED = "created"
6✔
119
        INITIALIZED = "initialized"
6✔
120
        CLOSED = "closed"
6✔
121

122
    class Runner:
6✔
123
        # Copied from CPython 3.11
124
        def __init__(
6✔
125
            self,
126
            *,
127
            debug: bool | None = None,
128
            loop_factory: Callable[[], AbstractEventLoop] | None = None,
129
        ):
130
            self._state = _State.CREATED
6✔
131
            self._debug = debug
6✔
132
            self._loop_factory = loop_factory
6✔
133
            self._loop: AbstractEventLoop | None = None
6✔
134
            self._context = None
6✔
135
            self._interrupt_count = 0
6✔
136
            self._set_event_loop = False
6✔
137

138
        def __enter__(self) -> Runner:
6✔
139
            self._lazy_init()
6✔
140
            return self
6✔
141

142
        def __exit__(
6✔
143
            self,
144
            exc_type: type[BaseException],
145
            exc_val: BaseException,
146
            exc_tb: TracebackType,
147
        ) -> None:
148
            self.close()
6✔
149

150
        def close(self) -> None:
6✔
151
            """Shutdown and close event loop."""
152
            if self._state is not _State.INITIALIZED:
6✔
153
                return
×
154
            try:
6✔
155
                loop = self._loop
6✔
156
                _cancel_all_tasks(loop)
6✔
157
                loop.run_until_complete(loop.shutdown_asyncgens())
6✔
158
                if hasattr(loop, "shutdown_default_executor"):
6✔
159
                    loop.run_until_complete(loop.shutdown_default_executor())
5✔
160
                else:
161
                    loop.run_until_complete(_shutdown_default_executor(loop))
3✔
162
            finally:
163
                if self._set_event_loop:
6✔
164
                    events.set_event_loop(None)
6✔
165
                loop.close()
6✔
166
                self._loop = None
6✔
167
                self._state = _State.CLOSED
6✔
168

169
        def get_loop(self) -> AbstractEventLoop:
6✔
170
            """Return embedded event loop."""
171
            self._lazy_init()
6✔
172
            return self._loop
6✔
173

174
        def run(self, coro: Coroutine[T_Retval], *, context=None) -> T_Retval:
6✔
175
            """Run a coroutine inside the embedded event loop."""
176
            if not coroutines.iscoroutine(coro):
6✔
177
                raise ValueError(f"a coroutine was expected, got {coro!r}")
×
178

179
            if events._get_running_loop() is not None:
6✔
180
                # fail fast with short traceback
181
                raise RuntimeError(
×
182
                    "Runner.run() cannot be called from a running event loop"
183
                )
184

185
            self._lazy_init()
6✔
186

187
            if context is None:
6✔
188
                context = self._context
6✔
189
            task = context.run(self._loop.create_task, coro)
6✔
190

191
            if (
6✔
192
                threading.current_thread() is threading.main_thread()
193
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
194
            ):
195
                sigint_handler = partial(self._on_sigint, main_task=task)
6✔
196
                try:
6✔
197
                    signal.signal(signal.SIGINT, sigint_handler)
6✔
198
                except ValueError:
×
199
                    # `signal.signal` may throw if `threading.main_thread` does
200
                    # not support signals (e.g. embedded interpreter with signals
201
                    # not registered - see gh-91880)
202
                    sigint_handler = None
×
203
            else:
204
                sigint_handler = None
6✔
205

206
            self._interrupt_count = 0
6✔
207
            try:
6✔
208
                return self._loop.run_until_complete(task)
6✔
209
            except exceptions.CancelledError:
6✔
210
                if self._interrupt_count > 0:
×
211
                    uncancel = getattr(task, "uncancel", None)
×
212
                    if uncancel is not None and uncancel() == 0:
×
UNCOV
213
                        raise KeyboardInterrupt()
×
214
                raise  # CancelledError
×
215
            finally:
216
                if (
6✔
217
                    sigint_handler is not None
218
                    and signal.getsignal(signal.SIGINT) is sigint_handler
219
                ):
220
                    signal.signal(signal.SIGINT, signal.default_int_handler)
6✔
221

222
        def _lazy_init(self) -> None:
6✔
223
            if self._state is _State.CLOSED:
6✔
224
                raise RuntimeError("Runner is closed")
×
225
            if self._state is _State.INITIALIZED:
6✔
226
                return
6✔
227
            if self._loop_factory is None:
6✔
228
                self._loop = events.new_event_loop()
6✔
229
                if not self._set_event_loop:
6✔
230
                    # Call set_event_loop only once to avoid calling
231
                    # attach_loop multiple times on child watchers
232
                    events.set_event_loop(self._loop)
6✔
233
                    self._set_event_loop = True
6✔
234
            else:
235
                self._loop = self._loop_factory()
4✔
236
            if self._debug is not None:
6✔
237
                self._loop.set_debug(self._debug)
6✔
238
            self._context = contextvars.copy_context()
6✔
239
            self._state = _State.INITIALIZED
6✔
240

241
        def _on_sigint(self, signum, frame, main_task: asyncio.Task) -> None:
6✔
242
            self._interrupt_count += 1
×
243
            if self._interrupt_count == 1 and not main_task.done():
×
244
                main_task.cancel()
×
245
                # wakeup loop if it is blocked by select() with long timeout
246
                self._loop.call_soon_threadsafe(lambda: None)
×
247
                return
×
248
            raise KeyboardInterrupt()
×
249

250
    def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
6✔
251
        to_cancel = tasks.all_tasks(loop)
6✔
252
        if not to_cancel:
6✔
253
            return
6✔
254

255
        for task in to_cancel:
6✔
256
            task.cancel()
6✔
257

258
        loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
6✔
259

260
        for task in to_cancel:
6✔
261
            if task.cancelled():
6✔
262
                continue
6✔
263
            if task.exception() is not None:
5✔
264
                loop.call_exception_handler(
×
265
                    {
266
                        "message": "unhandled exception during asyncio.run() shutdown",
267
                        "exception": task.exception(),
268
                        "task": task,
269
                    }
270
                )
271

272
    async def _shutdown_default_executor(loop: AbstractEventLoop) -> None:
6✔
273
        """Schedule the shutdown of the default executor."""
274

275
        def _do_shutdown(future: asyncio.futures.Future) -> None:
3✔
276
            try:
3✔
277
                loop._default_executor.shutdown(wait=True)  # type: ignore[attr-defined]
3✔
278
                loop.call_soon_threadsafe(future.set_result, None)
3✔
279
            except Exception as ex:
×
280
                loop.call_soon_threadsafe(future.set_exception, ex)
×
281

282
        loop._executor_shutdown_called = True
3✔
283
        if loop._default_executor is None:
3✔
284
            return
3✔
285
        future = loop.create_future()
3✔
286
        thread = threading.Thread(target=_do_shutdown, args=(future,))
3✔
287
        thread.start()
3✔
288
        try:
3✔
289
            await future
3✔
290
        finally:
291
            thread.join()
3✔
292

293

294
T_Retval = TypeVar("T_Retval")
11✔
295
T_contra = TypeVar("T_contra", contravariant=True)
11✔
296
PosArgsT = TypeVarTuple("PosArgsT")
11✔
297
P = ParamSpec("P")
11✔
298

299
_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")
11✔
300

301

302
def find_root_task() -> asyncio.Task:
11✔
303
    root_task = _root_task.get(None)
11✔
304
    if root_task is not None and not root_task.done():
11✔
305
        return root_task
11✔
306

307
    # Look for a task that has been started via run_until_complete()
308
    for task in all_tasks():
11✔
309
        if task._callbacks and not task.done():
11✔
310
            callbacks = [cb for cb, context in task._callbacks]
11✔
311
            for cb in callbacks:
11✔
312
                if (
11✔
313
                    cb is _run_until_complete_cb
314
                    or getattr(cb, "__module__", None) == "uvloop.loop"
315
                ):
316
                    _root_task.set(task)
11✔
317
                    return task
11✔
318

319
    # Look up the topmost task in the AnyIO task tree, if possible
320
    task = cast(asyncio.Task, current_task())
10✔
321
    state = _task_states.get(task)
10✔
322
    if state:
10✔
323
        cancel_scope = state.cancel_scope
10✔
324
        while cancel_scope and cancel_scope._parent_scope is not None:
10✔
325
            cancel_scope = cancel_scope._parent_scope
×
326

327
        if cancel_scope is not None:
10✔
328
            return cast(asyncio.Task, cancel_scope._host_task)
10✔
329

330
    return task
×
331

332

333
def get_callable_name(func: Callable) -> str:
11✔
334
    module = getattr(func, "__module__", None)
11✔
335
    qualname = getattr(func, "__qualname__", None)
11✔
336
    return ".".join([x for x in (module, qualname) if x])
11✔
337

338

339
#
340
# Event loop
341
#
342

343
_run_vars: WeakKeyDictionary[asyncio.AbstractEventLoop, Any] = WeakKeyDictionary()
11✔
344

345

346
def _task_started(task: asyncio.Task) -> bool:
11✔
347
    """Return ``True`` if the task has been started and has not finished."""
348
    try:
11✔
349
        return getcoroutinestate(task.get_coro()) in (CORO_RUNNING, CORO_SUSPENDED)
11✔
350
    except AttributeError:
×
351
        # task coro is async_genenerator_asend https://bugs.python.org/issue37771
352
        raise Exception(f"Cannot determine if task {task} has started or not") from None
×
353

354

355
#
356
# Timeouts and cancellation
357
#
358

359

360
class CancelScope(BaseCancelScope):
11✔
361
    def __new__(
11✔
362
        cls, *, deadline: float = math.inf, shield: bool = False
363
    ) -> CancelScope:
364
        return object.__new__(cls)
11✔
365

366
    def __init__(self, deadline: float = math.inf, shield: bool = False):
11✔
367
        self._deadline = deadline
11✔
368
        self._shield = shield
11✔
369
        self._parent_scope: CancelScope | None = None
11✔
370
        self._child_scopes: set[CancelScope] = set()
11✔
371
        self._cancel_called = False
11✔
372
        self._cancelled_caught = False
11✔
373
        self._active = False
11✔
374
        self._timeout_handle: asyncio.TimerHandle | None = None
11✔
375
        self._cancel_handle: asyncio.Handle | None = None
11✔
376
        self._tasks: set[asyncio.Task] = set()
11✔
377
        self._host_task: asyncio.Task | None = None
11✔
378
        self._cancel_calls: int = 0
11✔
379
        self._cancelling: int | None = None
11✔
380

381
    def __enter__(self) -> CancelScope:
11✔
382
        if self._active:
11✔
383
            raise RuntimeError(
×
384
                "Each CancelScope may only be used for a single 'with' block"
385
            )
386

387
        self._host_task = host_task = cast(asyncio.Task, current_task())
11✔
388
        self._tasks.add(host_task)
11✔
389
        try:
11✔
390
            task_state = _task_states[host_task]
11✔
391
        except KeyError:
11✔
392
            task_state = TaskState(None, self)
11✔
393
            _task_states[host_task] = task_state
11✔
394
        else:
395
            self._parent_scope = task_state.cancel_scope
11✔
396
            task_state.cancel_scope = self
11✔
397
            if self._parent_scope is not None:
11✔
398
                self._parent_scope._child_scopes.add(self)
11✔
399
                self._parent_scope._tasks.remove(host_task)
11✔
400

401
        self._timeout()
11✔
402
        self._active = True
11✔
403
        if sys.version_info >= (3, 11):
11✔
404
            self._cancelling = self._host_task.cancelling()
5✔
405

406
        # Start cancelling the host task if the scope was cancelled before entering
407
        if self._cancel_called:
11✔
408
            self._deliver_cancellation(self)
11✔
409

410
        return self
11✔
411

412
    def __exit__(
11✔
413
        self,
414
        exc_type: type[BaseException] | None,
415
        exc_val: BaseException | None,
416
        exc_tb: TracebackType | None,
417
    ) -> bool | None:
418
        if not self._active:
11✔
419
            raise RuntimeError("This cancel scope is not active")
10✔
420
        if current_task() is not self._host_task:
11✔
421
            raise RuntimeError(
10✔
422
                "Attempted to exit cancel scope in a different task than it was "
423
                "entered in"
424
            )
425

426
        assert self._host_task is not None
11✔
427
        host_task_state = _task_states.get(self._host_task)
11✔
428
        if host_task_state is None or host_task_state.cancel_scope is not self:
11✔
429
            raise RuntimeError(
10✔
430
                "Attempted to exit a cancel scope that isn't the current tasks's "
431
                "current cancel scope"
432
            )
433

434
        self._active = False
11✔
435
        if self._timeout_handle:
11✔
436
            self._timeout_handle.cancel()
11✔
437
            self._timeout_handle = None
11✔
438

439
        self._tasks.remove(self._host_task)
11✔
440
        if self._parent_scope is not None:
11✔
441
            self._parent_scope._child_scopes.remove(self)
11✔
442
            self._parent_scope._tasks.add(self._host_task)
11✔
443

444
        host_task_state.cancel_scope = self._parent_scope
11✔
445

446
        # Restart the cancellation effort in the closest directly cancelled parent
447
        # scope if this one was shielded
448
        self._restart_cancellation_in_parent()
11✔
449

450
        if self._cancel_called and exc_val is not None:
11✔
451
            for exc in iterate_exceptions(exc_val):
11✔
452
                if isinstance(exc, CancelledError):
11✔
453
                    self._cancelled_caught = self._uncancel(exc)
11✔
454
                    if self._cancelled_caught:
11✔
455
                        break
11✔
456

457
            return self._cancelled_caught
11✔
458

459
        return None
11✔
460

461
    def _uncancel(self, cancelled_exc: CancelledError) -> bool:
11✔
462
        if sys.version_info < (3, 9) or self._host_task is None:
11✔
463
            self._cancel_calls = 0
3✔
464
            return True
3✔
465

466
        # Undo all cancellations done by this scope
467
        if self._cancelling is not None:
8✔
468
            while self._cancel_calls:
5✔
469
                self._cancel_calls -= 1
5✔
470
                if self._host_task.uncancel() <= self._cancelling:
5✔
471
                    return True
5✔
472

473
        self._cancel_calls = 0
8✔
474
        return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args
8✔
475

476
    def _timeout(self) -> None:
11✔
477
        if self._deadline != math.inf:
11✔
478
            loop = get_running_loop()
11✔
479
            if loop.time() >= self._deadline:
11✔
480
                self.cancel()
11✔
481
            else:
482
                self._timeout_handle = loop.call_at(self._deadline, self._timeout)
11✔
483

484
    def _deliver_cancellation(self, origin: CancelScope) -> bool:
11✔
485
        """
486
        Deliver cancellation to directly contained tasks and nested cancel scopes.
487

488
        Schedule another run at the end if we still have tasks eligible for
489
        cancellation.
490

491
        :param origin: the cancel scope that originated the cancellation
492
        :return: ``True`` if the delivery needs to be retried on the next cycle
493

494
        """
495
        should_retry = False
11✔
496
        current = current_task()
11✔
497
        for task in self._tasks:
11✔
498
            if task._must_cancel:  # type: ignore[attr-defined]
11✔
499
                continue
10✔
500

501
            # The task is eligible for cancellation if it has started
502
            should_retry = True
11✔
503
            if task is not current and (task is self._host_task or _task_started(task)):
11✔
504
                waiter = task._fut_waiter  # type: ignore[attr-defined]
11✔
505
                if not isinstance(waiter, asyncio.Future) or not waiter.done():
11✔
506
                    origin._cancel_calls += 1
11✔
507
                    if sys.version_info >= (3, 9):
11✔
508
                        task.cancel(f"Cancelled by cancel scope {id(origin):x}")
8✔
509
                    else:
510
                        task.cancel()
3✔
511

512
        # Deliver cancellation to child scopes that aren't shielded or running their own
513
        # cancellation callbacks
514
        for scope in self._child_scopes:
11✔
515
            if not scope._shield and not scope.cancel_called:
11✔
516
                should_retry = scope._deliver_cancellation(origin) or should_retry
11✔
517

518
        # Schedule another callback if there are still tasks left
519
        if origin is self:
11✔
520
            if should_retry:
11✔
521
                self._cancel_handle = get_running_loop().call_soon(
11✔
522
                    self._deliver_cancellation, origin
523
                )
524
            else:
525
                self._cancel_handle = None
11✔
526

527
        return should_retry
11✔
528

529
    def _restart_cancellation_in_parent(self) -> None:
11✔
530
        """
531
        Restart the cancellation effort in the closest directly cancelled parent scope.
532

533
        """
534
        scope = self._parent_scope
11✔
535
        while scope is not None:
11✔
536
            if scope._cancel_called:
11✔
537
                if scope._cancel_handle is None:
11✔
538
                    scope._deliver_cancellation(scope)
11✔
539

540
                break
11✔
541

542
            # No point in looking beyond any shielded scope
543
            if scope._shield:
11✔
544
                break
10✔
545

546
            scope = scope._parent_scope
11✔
547

548
    def _parent_cancelled(self) -> bool:
11✔
549
        # Check whether any parent has been cancelled
550
        cancel_scope = self._parent_scope
11✔
551
        while cancel_scope is not None and not cancel_scope._shield:
11✔
552
            if cancel_scope._cancel_called:
11✔
553
                return True
11✔
554
            else:
555
                cancel_scope = cancel_scope._parent_scope
11✔
556

557
        return False
11✔
558

559
    def cancel(self) -> None:
11✔
560
        if not self._cancel_called:
11✔
561
            if self._timeout_handle:
11✔
562
                self._timeout_handle.cancel()
11✔
563
                self._timeout_handle = None
11✔
564

565
            self._cancel_called = True
11✔
566
            if self._host_task is not None:
11✔
567
                self._deliver_cancellation(self)
11✔
568

569
    @property
11✔
570
    def deadline(self) -> float:
11✔
571
        return self._deadline
10✔
572

573
    @deadline.setter
11✔
574
    def deadline(self, value: float) -> None:
11✔
575
        self._deadline = float(value)
10✔
576
        if self._timeout_handle is not None:
10✔
577
            self._timeout_handle.cancel()
10✔
578
            self._timeout_handle = None
10✔
579

580
        if self._active and not self._cancel_called:
10✔
581
            self._timeout()
10✔
582

583
    @property
11✔
584
    def cancel_called(self) -> bool:
11✔
585
        return self._cancel_called
11✔
586

587
    @property
11✔
588
    def cancelled_caught(self) -> bool:
11✔
589
        return self._cancelled_caught
11✔
590

591
    @property
11✔
592
    def shield(self) -> bool:
11✔
593
        return self._shield
11✔
594

595
    @shield.setter
11✔
596
    def shield(self, value: bool) -> None:
11✔
597
        if self._shield != value:
10✔
598
            self._shield = value
10✔
599
            if not value:
10✔
600
                self._restart_cancellation_in_parent()
10✔
601

602

603
#
604
# Task states
605
#
606

607

608
class TaskState:
11✔
609
    """
610
    Encapsulates auxiliary task information that cannot be added to the Task instance
611
    itself because there are no guarantees about its implementation.
612
    """
613

614
    __slots__ = "parent_id", "cancel_scope", "__weakref__"
11✔
615

616
    def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
11✔
617
        self.parent_id = parent_id
11✔
618
        self.cancel_scope = cancel_scope
11✔
619

620

621
_task_states: WeakKeyDictionary[asyncio.Task, TaskState] = WeakKeyDictionary()
11✔
622

623

624
#
625
# Task groups
626
#
627

628

629
class _AsyncioTaskStatus(abc.TaskStatus):
11✔
630
    def __init__(self, future: asyncio.Future, parent_id: int):
11✔
631
        self._future = future
11✔
632
        self._parent_id = parent_id
11✔
633

634
    def started(self, value: T_contra | None = None) -> None:
11✔
635
        try:
11✔
636
            self._future.set_result(value)
11✔
637
        except asyncio.InvalidStateError:
10✔
638
            if not self._future.cancelled():
10✔
639
                raise RuntimeError(
10✔
640
                    "called 'started' twice on the same task status"
641
                ) from None
642

643
        task = cast(asyncio.Task, current_task())
11✔
644
        _task_states[task].parent_id = self._parent_id
11✔
645

646

647
class TaskGroup(abc.TaskGroup):
11✔
648
    def __init__(self) -> None:
11✔
649
        self.cancel_scope: CancelScope = CancelScope()
11✔
650
        self._active = False
11✔
651
        self._exceptions: list[BaseException] = []
11✔
652
        self._tasks: set[asyncio.Task] = set()
11✔
653

654
    async def __aenter__(self) -> TaskGroup:
11✔
655
        self.cancel_scope.__enter__()
11✔
656
        self._active = True
11✔
657
        return self
11✔
658

659
    async def __aexit__(
11✔
660
        self,
661
        exc_type: type[BaseException] | None,
662
        exc_val: BaseException | None,
663
        exc_tb: TracebackType | None,
664
    ) -> bool | None:
665
        ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
11✔
666
        if exc_val is not None:
11✔
667
            self.cancel_scope.cancel()
11✔
668
            if not isinstance(exc_val, CancelledError):
11✔
669
                self._exceptions.append(exc_val)
11✔
670

671
        cancelled_exc_while_waiting_tasks: CancelledError | None = None
11✔
672
        while self._tasks:
11✔
673
            try:
11✔
674
                await asyncio.wait(self._tasks)
11✔
675
            except CancelledError as exc:
11✔
676
                # This task was cancelled natively; reraise the CancelledError later
677
                # unless this task was already interrupted by another exception
678
                self.cancel_scope.cancel()
11✔
679
                if cancelled_exc_while_waiting_tasks is None:
11✔
680
                    cancelled_exc_while_waiting_tasks = exc
11✔
681

682
        self._active = False
11✔
683
        if self._exceptions:
11✔
684
            raise BaseExceptionGroup(
11✔
685
                "unhandled errors in a TaskGroup", self._exceptions
686
            )
687

688
        # Raise the CancelledError received while waiting for child tasks to exit,
689
        # unless the context manager itself was previously exited with another
690
        # exception, or if any of the  child tasks raised an exception other than
691
        # CancelledError
692
        if cancelled_exc_while_waiting_tasks:
11✔
693
            if exc_val is None or ignore_exception:
11✔
694
                raise cancelled_exc_while_waiting_tasks
11✔
695

696
        return ignore_exception
11✔
697

698
    def _spawn(
11✔
699
        self,
700
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
701
        args: tuple[Unpack[PosArgsT]],
702
        name: object,
703
        task_status_future: asyncio.Future | None = None,
704
    ) -> asyncio.Task:
705
        def task_done(_task: asyncio.Task) -> None:
11✔
706
            task_state = _task_states[_task]
11✔
707
            assert task_state.cancel_scope is not None
11✔
708
            assert _task in task_state.cancel_scope._tasks
11✔
709
            task_state.cancel_scope._tasks.remove(_task)
11✔
710
            self._tasks.remove(task)
11✔
711
            del _task_states[_task]
11✔
712

713
            try:
11✔
714
                exc = _task.exception()
11✔
715
            except CancelledError as e:
11✔
716
                while isinstance(e.__context__, CancelledError):
11✔
717
                    e = e.__context__
3✔
718

719
                exc = e
11✔
720

721
            if exc is not None:
11✔
722
                # The future can only be in the cancelled state if the host task was
723
                # cancelled, so return immediately instead of adding one more
724
                # CancelledError to the exceptions list
725
                if task_status_future is not None and task_status_future.cancelled():
11✔
726
                    return
10✔
727

728
                if task_status_future is None or task_status_future.done():
11✔
729
                    if not isinstance(exc, CancelledError):
11✔
730
                        self._exceptions.append(exc)
11✔
731

732
                    if not self.cancel_scope._parent_cancelled():
11✔
733
                        self.cancel_scope.cancel()
11✔
734
                else:
735
                    task_status_future.set_exception(exc)
10✔
736
            elif task_status_future is not None and not task_status_future.done():
11✔
737
                task_status_future.set_exception(
10✔
738
                    RuntimeError("Child exited without calling task_status.started()")
739
                )
740

741
        if not self._active:
11✔
742
            raise RuntimeError(
10✔
743
                "This task group is not active; no new tasks can be started."
744
            )
745

746
        kwargs = {}
11✔
747
        if task_status_future:
11✔
748
            parent_id = id(current_task())
11✔
749
            kwargs["task_status"] = _AsyncioTaskStatus(
11✔
750
                task_status_future, id(self.cancel_scope._host_task)
751
            )
752
        else:
753
            parent_id = id(self.cancel_scope._host_task)
11✔
754

755
        coro = func(*args, **kwargs)
11✔
756
        if not iscoroutine(coro):
11✔
757
            prefix = f"{func.__module__}." if hasattr(func, "__module__") else ""
10✔
758
            raise TypeError(
10✔
759
                f"Expected {prefix}{func.__qualname__}() to return a coroutine, but "
760
                f"the return value ({coro!r}) is not a coroutine object"
761
            )
762

763
        name = get_callable_name(func) if name is None else str(name)
11✔
764
        task = create_task(coro, name=name)
11✔
765
        task.add_done_callback(task_done)
11✔
766

767
        # Make the spawned task inherit the task group's cancel scope
768
        _task_states[task] = TaskState(
11✔
769
            parent_id=parent_id, cancel_scope=self.cancel_scope
770
        )
771
        self.cancel_scope._tasks.add(task)
11✔
772
        self._tasks.add(task)
11✔
773
        return task
11✔
774

775
    def start_soon(
11✔
776
        self,
777
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
778
        *args: Unpack[PosArgsT],
779
        name: object = None,
780
    ) -> None:
781
        self._spawn(func, args, name)
11✔
782

783
    async def start(
11✔
784
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
785
    ) -> Any:
786
        future: asyncio.Future = asyncio.Future()
11✔
787
        task = self._spawn(func, args, name, future)
11✔
788

789
        # If the task raises an exception after sending a start value without a switch
790
        # point between, the task group is cancelled and this method never proceeds to
791
        # process the completed future. That's why we have to have a shielded cancel
792
        # scope here.
793
        try:
11✔
794
            return await future
11✔
795
        except CancelledError:
10✔
796
            # Cancel the task and wait for it to exit before returning
797
            task.cancel()
10✔
798
            with CancelScope(shield=True), suppress(CancelledError):
10✔
799
                await task
10✔
800

801
            raise
10✔
802

803

804
#
805
# Threads
806
#
807

808
_Retval_Queue_Type = Tuple[Optional[T_Retval], Optional[BaseException]]
11✔
809

810

811
class WorkerThread(Thread):
11✔
812
    MAX_IDLE_TIME = 10  # seconds
11✔
813

814
    def __init__(
11✔
815
        self,
816
        root_task: asyncio.Task,
817
        workers: set[WorkerThread],
818
        idle_workers: deque[WorkerThread],
819
    ):
820
        super().__init__(name="AnyIO worker thread")
11✔
821
        self.root_task = root_task
11✔
822
        self.workers = workers
11✔
823
        self.idle_workers = idle_workers
11✔
824
        self.loop = root_task._loop
11✔
825
        self.queue: Queue[
11✔
826
            tuple[Context, Callable, tuple, asyncio.Future, CancelScope] | None
827
        ] = Queue(2)
828
        self.idle_since = AsyncIOBackend.current_time()
11✔
829
        self.stopping = False
11✔
830

831
    def _report_result(
11✔
832
        self, future: asyncio.Future, result: Any, exc: BaseException | None
833
    ) -> None:
834
        self.idle_since = AsyncIOBackend.current_time()
11✔
835
        if not self.stopping:
11✔
836
            self.idle_workers.append(self)
11✔
837

838
        if not future.cancelled():
11✔
839
            if exc is not None:
11✔
840
                if isinstance(exc, StopIteration):
11✔
841
                    new_exc = RuntimeError("coroutine raised StopIteration")
10✔
842
                    new_exc.__cause__ = exc
10✔
843
                    exc = new_exc
10✔
844

845
                future.set_exception(exc)
11✔
846
            else:
847
                future.set_result(result)
11✔
848

849
    def run(self) -> None:
11✔
850
        with claim_worker_thread(AsyncIOBackend, self.loop):
11✔
851
            while True:
7✔
852
                item = self.queue.get()
11✔
853
                if item is None:
11✔
854
                    # Shutdown command received
855
                    return
11✔
856

857
                context, func, args, future, cancel_scope = item
11✔
858
                if not future.cancelled():
11✔
859
                    result = None
11✔
860
                    exception: BaseException | None = None
11✔
861
                    threadlocals.current_cancel_scope = cancel_scope
11✔
862
                    try:
11✔
863
                        result = context.run(func, *args)
11✔
864
                    except BaseException as exc:
11✔
865
                        exception = exc
11✔
866
                    finally:
867
                        del threadlocals.current_cancel_scope
11✔
868

869
                    if not self.loop.is_closed():
11✔
870
                        self.loop.call_soon_threadsafe(
11✔
871
                            self._report_result, future, result, exception
872
                        )
873

874
                self.queue.task_done()
11✔
875

876
    def stop(self, f: asyncio.Task | None = None) -> None:
11✔
877
        self.stopping = True
11✔
878
        self.queue.put_nowait(None)
11✔
879
        self.workers.discard(self)
11✔
880
        try:
11✔
881
            self.idle_workers.remove(self)
11✔
882
        except ValueError:
10✔
883
            pass
10✔
884

885

886
_threadpool_idle_workers: RunVar[deque[WorkerThread]] = RunVar(
11✔
887
    "_threadpool_idle_workers"
888
)
889
_threadpool_workers: RunVar[set[WorkerThread]] = RunVar("_threadpool_workers")
11✔
890

891

892
class BlockingPortal(abc.BlockingPortal):
11✔
893
    def __new__(cls) -> BlockingPortal:
11✔
894
        return object.__new__(cls)
11✔
895

896
    def __init__(self) -> None:
11✔
897
        super().__init__()
11✔
898
        self._loop = get_running_loop()
11✔
899

900
    def _spawn_task_from_thread(
11✔
901
        self,
902
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
903
        args: tuple[Unpack[PosArgsT]],
904
        kwargs: dict[str, Any],
905
        name: object,
906
        future: Future[T_Retval],
907
    ) -> None:
908
        AsyncIOBackend.run_sync_from_thread(
11✔
909
            partial(self._task_group.start_soon, name=name),
910
            (self._call_func, func, args, kwargs, future),
911
            self._loop,
912
        )
913

914

915
#
916
# Subprocesses
917
#
918

919

920
@dataclass(eq=False)
11✔
921
class StreamReaderWrapper(abc.ByteReceiveStream):
11✔
922
    _stream: asyncio.StreamReader
11✔
923

924
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
925
        data = await self._stream.read(max_bytes)
10✔
926
        if data:
10✔
927
            return data
10✔
928
        else:
929
            raise EndOfStream
10✔
930

931
    async def aclose(self) -> None:
11✔
932
        self._stream.feed_eof()
10✔
933
        await AsyncIOBackend.checkpoint()
10✔
934

935

936
@dataclass(eq=False)
11✔
937
class StreamWriterWrapper(abc.ByteSendStream):
11✔
938
    _stream: asyncio.StreamWriter
11✔
939

940
    async def send(self, item: bytes) -> None:
11✔
941
        self._stream.write(item)
10✔
942
        await self._stream.drain()
10✔
943

944
    async def aclose(self) -> None:
11✔
945
        self._stream.close()
10✔
946
        await AsyncIOBackend.checkpoint()
10✔
947

948

949
@dataclass(eq=False)
11✔
950
class Process(abc.Process):
11✔
951
    _process: asyncio.subprocess.Process
11✔
952
    _stdin: StreamWriterWrapper | None
11✔
953
    _stdout: StreamReaderWrapper | None
11✔
954
    _stderr: StreamReaderWrapper | None
11✔
955

956
    async def aclose(self) -> None:
11✔
957
        with CancelScope(shield=True):
10✔
958
            if self._stdin:
10✔
959
                await self._stdin.aclose()
10✔
960
            if self._stdout:
10✔
961
                await self._stdout.aclose()
10✔
962
            if self._stderr:
10✔
963
                await self._stderr.aclose()
10✔
964

965
        try:
10✔
966
            await self.wait()
10✔
967
        except BaseException:
10✔
968
            self.kill()
10✔
969
            with CancelScope(shield=True):
10✔
970
                await self.wait()
10✔
971

972
            raise
10✔
973

974
    async def wait(self) -> int:
11✔
975
        return await self._process.wait()
10✔
976

977
    def terminate(self) -> None:
11✔
978
        self._process.terminate()
8✔
979

980
    def kill(self) -> None:
11✔
981
        self._process.kill()
10✔
982

983
    def send_signal(self, signal: int) -> None:
11✔
984
        self._process.send_signal(signal)
×
985

986
    @property
11✔
987
    def pid(self) -> int:
11✔
988
        return self._process.pid
×
989

990
    @property
11✔
991
    def returncode(self) -> int | None:
11✔
992
        return self._process.returncode
10✔
993

994
    @property
11✔
995
    def stdin(self) -> abc.ByteSendStream | None:
11✔
996
        return self._stdin
10✔
997

998
    @property
11✔
999
    def stdout(self) -> abc.ByteReceiveStream | None:
11✔
1000
        return self._stdout
10✔
1001

1002
    @property
11✔
1003
    def stderr(self) -> abc.ByteReceiveStream | None:
11✔
1004
        return self._stderr
10✔
1005

1006

1007
def _forcibly_shutdown_process_pool_on_exit(
11✔
1008
    workers: set[Process], _task: object
1009
) -> None:
1010
    """
1011
    Forcibly shuts down worker processes belonging to this event loop."""
1012
    child_watcher: asyncio.AbstractChildWatcher | None = None
10✔
1013
    if sys.version_info < (3, 12):
10✔
1014
        try:
6✔
1015
            child_watcher = asyncio.get_event_loop_policy().get_child_watcher()
6✔
1016
        except NotImplementedError:
1✔
1017
            pass
1✔
1018

1019
    # Close as much as possible (w/o async/await) to avoid warnings
1020
    for process in workers:
10✔
1021
        if process.returncode is None:
10✔
1022
            continue
10✔
1023

1024
        process._stdin._stream._transport.close()  # type: ignore[union-attr]
×
1025
        process._stdout._stream._transport.close()  # type: ignore[union-attr]
×
1026
        process._stderr._stream._transport.close()  # type: ignore[union-attr]
×
1027
        process.kill()
×
1028
        if child_watcher:
×
1029
            child_watcher.remove_child_handler(process.pid)
×
1030

1031

1032
async def _shutdown_process_pool_on_exit(workers: set[abc.Process]) -> None:
11✔
1033
    """
1034
    Shuts down worker processes belonging to this event loop.
1035

1036
    NOTE: this only works when the event loop was started using asyncio.run() or
1037
    anyio.run().
1038

1039
    """
1040
    process: abc.Process
1041
    try:
10✔
1042
        await sleep(math.inf)
10✔
1043
    except asyncio.CancelledError:
10✔
1044
        for process in workers:
10✔
1045
            if process.returncode is None:
10✔
1046
                process.kill()
10✔
1047

1048
        for process in workers:
10✔
1049
            await process.aclose()
10✔
1050

1051

1052
#
1053
# Sockets and networking
1054
#
1055

1056

1057
class StreamProtocol(asyncio.Protocol):
11✔
1058
    read_queue: deque[bytes]
11✔
1059
    read_event: asyncio.Event
11✔
1060
    write_event: asyncio.Event
11✔
1061
    exception: Exception | None = None
11✔
1062
    is_at_eof: bool = False
11✔
1063

1064
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
11✔
1065
        self.read_queue = deque()
11✔
1066
        self.read_event = asyncio.Event()
11✔
1067
        self.write_event = asyncio.Event()
11✔
1068
        self.write_event.set()
11✔
1069
        cast(asyncio.Transport, transport).set_write_buffer_limits(0)
11✔
1070

1071
    def connection_lost(self, exc: Exception | None) -> None:
11✔
1072
        if exc:
11✔
1073
            self.exception = BrokenResourceError()
11✔
1074
            self.exception.__cause__ = exc
11✔
1075

1076
        self.read_event.set()
11✔
1077
        self.write_event.set()
11✔
1078

1079
    def data_received(self, data: bytes) -> None:
11✔
1080
        self.read_queue.append(data)
11✔
1081
        self.read_event.set()
11✔
1082

1083
    def eof_received(self) -> bool | None:
11✔
1084
        self.is_at_eof = True
11✔
1085
        self.read_event.set()
11✔
1086
        return True
11✔
1087

1088
    def pause_writing(self) -> None:
11✔
1089
        self.write_event = asyncio.Event()
11✔
1090

1091
    def resume_writing(self) -> None:
11✔
1092
        self.write_event.set()
1✔
1093

1094

1095
class DatagramProtocol(asyncio.DatagramProtocol):
11✔
1096
    read_queue: deque[tuple[bytes, IPSockAddrType]]
11✔
1097
    read_event: asyncio.Event
11✔
1098
    write_event: asyncio.Event
11✔
1099
    exception: Exception | None = None
11✔
1100

1101
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
11✔
1102
        self.read_queue = deque(maxlen=100)  # arbitrary value
10✔
1103
        self.read_event = asyncio.Event()
10✔
1104
        self.write_event = asyncio.Event()
10✔
1105
        self.write_event.set()
10✔
1106

1107
    def connection_lost(self, exc: Exception | None) -> None:
11✔
1108
        self.read_event.set()
10✔
1109
        self.write_event.set()
10✔
1110

1111
    def datagram_received(self, data: bytes, addr: IPSockAddrType) -> None:
11✔
1112
        addr = convert_ipv6_sockaddr(addr)
10✔
1113
        self.read_queue.append((data, addr))
10✔
1114
        self.read_event.set()
10✔
1115

1116
    def error_received(self, exc: Exception) -> None:
11✔
1117
        self.exception = exc
×
1118

1119
    def pause_writing(self) -> None:
11✔
1120
        self.write_event.clear()
×
1121

1122
    def resume_writing(self) -> None:
11✔
1123
        self.write_event.set()
×
1124

1125

1126
class SocketStream(abc.SocketStream):
11✔
1127
    def __init__(self, transport: asyncio.Transport, protocol: StreamProtocol):
11✔
1128
        self._transport = transport
11✔
1129
        self._protocol = protocol
11✔
1130
        self._receive_guard = ResourceGuard("reading from")
11✔
1131
        self._send_guard = ResourceGuard("writing to")
11✔
1132
        self._closed = False
11✔
1133

1134
    @property
11✔
1135
    def _raw_socket(self) -> socket.socket:
11✔
1136
        return self._transport.get_extra_info("socket")
11✔
1137

1138
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
1139
        with self._receive_guard:
11✔
1140
            if (
11✔
1141
                not self._protocol.read_event.is_set()
1142
                and not self._transport.is_closing()
1143
                and not self._protocol.is_at_eof
1144
            ):
1145
                self._transport.resume_reading()
11✔
1146
                await self._protocol.read_event.wait()
11✔
1147
                self._transport.pause_reading()
11✔
1148
            else:
1149
                await AsyncIOBackend.checkpoint()
11✔
1150

1151
            try:
11✔
1152
                chunk = self._protocol.read_queue.popleft()
11✔
1153
            except IndexError:
11✔
1154
                if self._closed:
11✔
1155
                    raise ClosedResourceError from None
11✔
1156
                elif self._protocol.exception:
11✔
1157
                    raise self._protocol.exception from None
11✔
1158
                else:
1159
                    raise EndOfStream from None
11✔
1160

1161
            if len(chunk) > max_bytes:
11✔
1162
                # Split the oversized chunk
1163
                chunk, leftover = chunk[:max_bytes], chunk[max_bytes:]
9✔
1164
                self._protocol.read_queue.appendleft(leftover)
9✔
1165

1166
            # If the read queue is empty, clear the flag so that the next call will
1167
            # block until data is available
1168
            if not self._protocol.read_queue:
11✔
1169
                self._protocol.read_event.clear()
11✔
1170

1171
        return chunk
11✔
1172

1173
    async def send(self, item: bytes) -> None:
11✔
1174
        with self._send_guard:
11✔
1175
            await AsyncIOBackend.checkpoint()
11✔
1176

1177
            if self._closed:
11✔
1178
                raise ClosedResourceError
11✔
1179
            elif self._protocol.exception is not None:
11✔
1180
                raise self._protocol.exception
11✔
1181

1182
            try:
11✔
1183
                self._transport.write(item)
11✔
1184
            except RuntimeError as exc:
×
1185
                if self._transport.is_closing():
×
1186
                    raise BrokenResourceError from exc
×
1187
                else:
1188
                    raise
×
1189

1190
            await self._protocol.write_event.wait()
11✔
1191

1192
    async def send_eof(self) -> None:
11✔
1193
        try:
11✔
1194
            self._transport.write_eof()
11✔
1195
        except OSError:
×
1196
            pass
×
1197

1198
    async def aclose(self) -> None:
11✔
1199
        if not self._transport.is_closing():
11✔
1200
            self._closed = True
11✔
1201
            try:
11✔
1202
                self._transport.write_eof()
11✔
1203
            except OSError:
7✔
1204
                pass
7✔
1205

1206
            self._transport.close()
11✔
1207
            await sleep(0)
11✔
1208
            self._transport.abort()
11✔
1209

1210

1211
class _RawSocketMixin:
11✔
1212
    _receive_future: asyncio.Future | None = None
11✔
1213
    _send_future: asyncio.Future | None = None
11✔
1214
    _closing = False
11✔
1215

1216
    def __init__(self, raw_socket: socket.socket):
11✔
1217
        self.__raw_socket = raw_socket
8✔
1218
        self._receive_guard = ResourceGuard("reading from")
8✔
1219
        self._send_guard = ResourceGuard("writing to")
8✔
1220

1221
    @property
11✔
1222
    def _raw_socket(self) -> socket.socket:
11✔
1223
        return self.__raw_socket
8✔
1224

1225
    def _wait_until_readable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
11✔
1226
        def callback(f: object) -> None:
8✔
1227
            del self._receive_future
8✔
1228
            loop.remove_reader(self.__raw_socket)
8✔
1229

1230
        f = self._receive_future = asyncio.Future()
8✔
1231
        loop.add_reader(self.__raw_socket, f.set_result, None)
8✔
1232
        f.add_done_callback(callback)
8✔
1233
        return f
8✔
1234

1235
    def _wait_until_writable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
11✔
1236
        def callback(f: object) -> None:
8✔
1237
            del self._send_future
8✔
1238
            loop.remove_writer(self.__raw_socket)
8✔
1239

1240
        f = self._send_future = asyncio.Future()
8✔
1241
        loop.add_writer(self.__raw_socket, f.set_result, None)
8✔
1242
        f.add_done_callback(callback)
8✔
1243
        return f
8✔
1244

1245
    async def aclose(self) -> None:
11✔
1246
        if not self._closing:
8✔
1247
            self._closing = True
8✔
1248
            if self.__raw_socket.fileno() != -1:
8✔
1249
                self.__raw_socket.close()
8✔
1250

1251
            if self._receive_future:
8✔
1252
                self._receive_future.set_result(None)
8✔
1253
            if self._send_future:
8✔
1254
                self._send_future.set_result(None)
×
1255

1256

1257
class UNIXSocketStream(_RawSocketMixin, abc.UNIXSocketStream):
11✔
1258
    async def send_eof(self) -> None:
11✔
1259
        with self._send_guard:
8✔
1260
            self._raw_socket.shutdown(socket.SHUT_WR)
8✔
1261

1262
    async def receive(self, max_bytes: int = 65536) -> bytes:
11✔
1263
        loop = get_running_loop()
8✔
1264
        await AsyncIOBackend.checkpoint()
8✔
1265
        with self._receive_guard:
8✔
1266
            while True:
5✔
1267
                try:
8✔
1268
                    data = self._raw_socket.recv(max_bytes)
8✔
1269
                except BlockingIOError:
8✔
1270
                    await self._wait_until_readable(loop)
8✔
1271
                except OSError as exc:
8✔
1272
                    if self._closing:
8✔
1273
                        raise ClosedResourceError from None
8✔
1274
                    else:
1275
                        raise BrokenResourceError from exc
1✔
1276
                else:
1277
                    if not data:
8✔
1278
                        raise EndOfStream
8✔
1279

1280
                    return data
8✔
1281

1282
    async def send(self, item: bytes) -> None:
11✔
1283
        loop = get_running_loop()
8✔
1284
        await AsyncIOBackend.checkpoint()
8✔
1285
        with self._send_guard:
8✔
1286
            view = memoryview(item)
8✔
1287
            while view:
8✔
1288
                try:
8✔
1289
                    bytes_sent = self._raw_socket.send(view)
8✔
1290
                except BlockingIOError:
8✔
1291
                    await self._wait_until_writable(loop)
8✔
1292
                except OSError as exc:
8✔
1293
                    if self._closing:
8✔
1294
                        raise ClosedResourceError from None
8✔
1295
                    else:
1296
                        raise BrokenResourceError from exc
1✔
1297
                else:
1298
                    view = view[bytes_sent:]
8✔
1299

1300
    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
11✔
1301
        if not isinstance(msglen, int) or msglen < 0:
8✔
1302
            raise ValueError("msglen must be a non-negative integer")
8✔
1303
        if not isinstance(maxfds, int) or maxfds < 1:
8✔
1304
            raise ValueError("maxfds must be a positive integer")
8✔
1305

1306
        loop = get_running_loop()
8✔
1307
        fds = array.array("i")
8✔
1308
        await AsyncIOBackend.checkpoint()
8✔
1309
        with self._receive_guard:
8✔
1310
            while True:
5✔
1311
                try:
8✔
1312
                    message, ancdata, flags, addr = self._raw_socket.recvmsg(
8✔
1313
                        msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
1314
                    )
1315
                except BlockingIOError:
8✔
1316
                    await self._wait_until_readable(loop)
8✔
1317
                except OSError as exc:
×
1318
                    if self._closing:
×
1319
                        raise ClosedResourceError from None
×
1320
                    else:
1321
                        raise BrokenResourceError from exc
×
1322
                else:
1323
                    if not message and not ancdata:
8✔
1324
                        raise EndOfStream
×
1325

1326
                    break
5✔
1327

1328
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
8✔
1329
            if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
8✔
1330
                raise RuntimeError(
×
1331
                    f"Received unexpected ancillary data; message = {message!r}, "
1332
                    f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
1333
                )
1334

1335
            fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
8✔
1336

1337
        return message, list(fds)
8✔
1338

1339
    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
11✔
1340
        if not message:
8✔
1341
            raise ValueError("message must not be empty")
8✔
1342
        if not fds:
8✔
1343
            raise ValueError("fds must not be empty")
8✔
1344

1345
        loop = get_running_loop()
8✔
1346
        filenos: list[int] = []
8✔
1347
        for fd in fds:
8✔
1348
            if isinstance(fd, int):
8✔
1349
                filenos.append(fd)
×
1350
            elif isinstance(fd, IOBase):
8✔
1351
                filenos.append(fd.fileno())
8✔
1352

1353
        fdarray = array.array("i", filenos)
8✔
1354
        await AsyncIOBackend.checkpoint()
8✔
1355
        with self._send_guard:
8✔
1356
            while True:
5✔
1357
                try:
8✔
1358
                    # The ignore can be removed after mypy picks up
1359
                    # https://github.com/python/typeshed/pull/5545
1360
                    self._raw_socket.sendmsg(
8✔
1361
                        [message], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)]
1362
                    )
1363
                    break
8✔
1364
                except BlockingIOError:
×
1365
                    await self._wait_until_writable(loop)
×
1366
                except OSError as exc:
×
1367
                    if self._closing:
×
1368
                        raise ClosedResourceError from None
×
1369
                    else:
1370
                        raise BrokenResourceError from exc
×
1371

1372

1373
class TCPSocketListener(abc.SocketListener):
11✔
1374
    _accept_scope: CancelScope | None = None
11✔
1375
    _closed = False
11✔
1376

1377
    def __init__(self, raw_socket: socket.socket):
11✔
1378
        self.__raw_socket = raw_socket
11✔
1379
        self._loop = cast(asyncio.BaseEventLoop, get_running_loop())
11✔
1380
        self._accept_guard = ResourceGuard("accepting connections from")
11✔
1381

1382
    @property
11✔
1383
    def _raw_socket(self) -> socket.socket:
11✔
1384
        return self.__raw_socket
11✔
1385

1386
    async def accept(self) -> abc.SocketStream:
11✔
1387
        if self._closed:
11✔
1388
            raise ClosedResourceError
11✔
1389

1390
        with self._accept_guard:
11✔
1391
            await AsyncIOBackend.checkpoint()
11✔
1392
            with CancelScope() as self._accept_scope:
11✔
1393
                try:
11✔
1394
                    client_sock, _addr = await self._loop.sock_accept(self._raw_socket)
11✔
1395
                except asyncio.CancelledError:
11✔
1396
                    # Workaround for https://bugs.python.org/issue41317
1397
                    try:
11✔
1398
                        self._loop.remove_reader(self._raw_socket)
11✔
1399
                    except (ValueError, NotImplementedError):
2✔
1400
                        pass
2✔
1401

1402
                    if self._closed:
11✔
1403
                        raise ClosedResourceError from None
10✔
1404

1405
                    raise
11✔
1406
                finally:
1407
                    self._accept_scope = None
11✔
1408

1409
        client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
11✔
1410
        transport, protocol = await self._loop.connect_accepted_socket(
11✔
1411
            StreamProtocol, client_sock
1412
        )
1413
        return SocketStream(transport, protocol)
11✔
1414

1415
    async def aclose(self) -> None:
11✔
1416
        if self._closed:
11✔
1417
            return
11✔
1418

1419
        self._closed = True
11✔
1420
        if self._accept_scope:
11✔
1421
            # Workaround for https://bugs.python.org/issue41317
1422
            try:
11✔
1423
                self._loop.remove_reader(self._raw_socket)
11✔
1424
            except (ValueError, NotImplementedError):
2✔
1425
                pass
2✔
1426

1427
            self._accept_scope.cancel()
10✔
1428
            await sleep(0)
10✔
1429

1430
        self._raw_socket.close()
11✔
1431

1432

1433
class UNIXSocketListener(abc.SocketListener):
11✔
1434
    def __init__(self, raw_socket: socket.socket):
11✔
1435
        self.__raw_socket = raw_socket
8✔
1436
        self._loop = get_running_loop()
8✔
1437
        self._accept_guard = ResourceGuard("accepting connections from")
8✔
1438
        self._closed = False
8✔
1439

1440
    async def accept(self) -> abc.SocketStream:
11✔
1441
        await AsyncIOBackend.checkpoint()
8✔
1442
        with self._accept_guard:
8✔
1443
            while True:
5✔
1444
                try:
8✔
1445
                    client_sock, _ = self.__raw_socket.accept()
8✔
1446
                    client_sock.setblocking(False)
8✔
1447
                    return UNIXSocketStream(client_sock)
8✔
1448
                except BlockingIOError:
8✔
1449
                    f: asyncio.Future = asyncio.Future()
8✔
1450
                    self._loop.add_reader(self.__raw_socket, f.set_result, None)
8✔
1451
                    f.add_done_callback(
8✔
1452
                        lambda _: self._loop.remove_reader(self.__raw_socket)
1453
                    )
1454
                    await f
8✔
1455
                except OSError as exc:
×
1456
                    if self._closed:
×
1457
                        raise ClosedResourceError from None
×
1458
                    else:
1459
                        raise BrokenResourceError from exc
1✔
1460

1461
    async def aclose(self) -> None:
11✔
1462
        self._closed = True
8✔
1463
        self.__raw_socket.close()
8✔
1464

1465
    @property
11✔
1466
    def _raw_socket(self) -> socket.socket:
11✔
1467
        return self.__raw_socket
8✔
1468

1469

1470
class UDPSocket(abc.UDPSocket):
11✔
1471
    def __init__(
11✔
1472
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1473
    ):
1474
        self._transport = transport
10✔
1475
        self._protocol = protocol
10✔
1476
        self._receive_guard = ResourceGuard("reading from")
10✔
1477
        self._send_guard = ResourceGuard("writing to")
10✔
1478
        self._closed = False
10✔
1479

1480
    @property
11✔
1481
    def _raw_socket(self) -> socket.socket:
11✔
1482
        return self._transport.get_extra_info("socket")
10✔
1483

1484
    async def aclose(self) -> None:
11✔
1485
        if not self._transport.is_closing():
10✔
1486
            self._closed = True
10✔
1487
            self._transport.close()
10✔
1488

1489
    async def receive(self) -> tuple[bytes, IPSockAddrType]:
11✔
1490
        with self._receive_guard:
10✔
1491
            await AsyncIOBackend.checkpoint()
10✔
1492

1493
            # If the buffer is empty, ask for more data
1494
            if not self._protocol.read_queue and not self._transport.is_closing():
10✔
1495
                self._protocol.read_event.clear()
10✔
1496
                await self._protocol.read_event.wait()
10✔
1497

1498
            try:
10✔
1499
                return self._protocol.read_queue.popleft()
10✔
1500
            except IndexError:
10✔
1501
                if self._closed:
10✔
1502
                    raise ClosedResourceError from None
10✔
1503
                else:
1504
                    raise BrokenResourceError from None
1✔
1505

1506
    async def send(self, item: UDPPacketType) -> None:
11✔
1507
        with self._send_guard:
10✔
1508
            await AsyncIOBackend.checkpoint()
10✔
1509
            await self._protocol.write_event.wait()
10✔
1510
            if self._closed:
10✔
1511
                raise ClosedResourceError
10✔
1512
            elif self._transport.is_closing():
10✔
1513
                raise BrokenResourceError
×
1514
            else:
1515
                self._transport.sendto(*item)
10✔
1516

1517

1518
class ConnectedUDPSocket(abc.ConnectedUDPSocket):
11✔
1519
    def __init__(
11✔
1520
        self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
1521
    ):
1522
        self._transport = transport
10✔
1523
        self._protocol = protocol
10✔
1524
        self._receive_guard = ResourceGuard("reading from")
10✔
1525
        self._send_guard = ResourceGuard("writing to")
10✔
1526
        self._closed = False
10✔
1527

1528
    @property
11✔
1529
    def _raw_socket(self) -> socket.socket:
11✔
1530
        return self._transport.get_extra_info("socket")
10✔
1531

1532
    async def aclose(self) -> None:
11✔
1533
        if not self._transport.is_closing():
10✔
1534
            self._closed = True
10✔
1535
            self._transport.close()
10✔
1536

1537
    async def receive(self) -> bytes:
11✔
1538
        with self._receive_guard:
10✔
1539
            await AsyncIOBackend.checkpoint()
10✔
1540

1541
            # If the buffer is empty, ask for more data
1542
            if not self._protocol.read_queue and not self._transport.is_closing():
10✔
1543
                self._protocol.read_event.clear()
10✔
1544
                await self._protocol.read_event.wait()
10✔
1545

1546
            try:
10✔
1547
                packet = self._protocol.read_queue.popleft()
10✔
1548
            except IndexError:
10✔
1549
                if self._closed:
10✔
1550
                    raise ClosedResourceError from None
10✔
1551
                else:
1552
                    raise BrokenResourceError from None
×
1553

1554
            return packet[0]
10✔
1555

1556
    async def send(self, item: bytes) -> None:
11✔
1557
        with self._send_guard:
10✔
1558
            await AsyncIOBackend.checkpoint()
10✔
1559
            await self._protocol.write_event.wait()
10✔
1560
            if self._closed:
10✔
1561
                raise ClosedResourceError
10✔
1562
            elif self._transport.is_closing():
10✔
1563
                raise BrokenResourceError
×
1564
            else:
1565
                self._transport.sendto(item)
10✔
1566

1567

1568
class UNIXDatagramSocket(_RawSocketMixin, abc.UNIXDatagramSocket):
11✔
1569
    async def receive(self) -> UNIXDatagramPacketType:
11✔
1570
        loop = get_running_loop()
8✔
1571
        await AsyncIOBackend.checkpoint()
8✔
1572
        with self._receive_guard:
8✔
1573
            while True:
5✔
1574
                try:
8✔
1575
                    data = self._raw_socket.recvfrom(65536)
8✔
1576
                except BlockingIOError:
8✔
1577
                    await self._wait_until_readable(loop)
8✔
1578
                except OSError as exc:
8✔
1579
                    if self._closing:
8✔
1580
                        raise ClosedResourceError from None
8✔
1581
                    else:
1582
                        raise BrokenResourceError from exc
1✔
1583
                else:
1584
                    return data
8✔
1585

1586
    async def send(self, item: UNIXDatagramPacketType) -> None:
11✔
1587
        loop = get_running_loop()
8✔
1588
        await AsyncIOBackend.checkpoint()
8✔
1589
        with self._send_guard:
8✔
1590
            while True:
5✔
1591
                try:
8✔
1592
                    self._raw_socket.sendto(*item)
8✔
1593
                except BlockingIOError:
8✔
1594
                    await self._wait_until_writable(loop)
×
1595
                except OSError as exc:
8✔
1596
                    if self._closing:
8✔
1597
                        raise ClosedResourceError from None
8✔
1598
                    else:
1599
                        raise BrokenResourceError from exc
1✔
1600
                else:
1601
                    return
8✔
1602

1603

1604
class ConnectedUNIXDatagramSocket(_RawSocketMixin, abc.ConnectedUNIXDatagramSocket):
11✔
1605
    async def receive(self) -> bytes:
11✔
1606
        loop = get_running_loop()
8✔
1607
        await AsyncIOBackend.checkpoint()
8✔
1608
        with self._receive_guard:
8✔
1609
            while True:
5✔
1610
                try:
8✔
1611
                    data = self._raw_socket.recv(65536)
8✔
1612
                except BlockingIOError:
8✔
1613
                    await self._wait_until_readable(loop)
8✔
1614
                except OSError as exc:
8✔
1615
                    if self._closing:
8✔
1616
                        raise ClosedResourceError from None
8✔
1617
                    else:
1618
                        raise BrokenResourceError from exc
1✔
1619
                else:
1620
                    return data
8✔
1621

1622
    async def send(self, item: bytes) -> None:
11✔
1623
        loop = get_running_loop()
8✔
1624
        await AsyncIOBackend.checkpoint()
8✔
1625
        with self._send_guard:
8✔
1626
            while True:
5✔
1627
                try:
8✔
1628
                    self._raw_socket.send(item)
8✔
1629
                except BlockingIOError:
8✔
1630
                    await self._wait_until_writable(loop)
×
1631
                except OSError as exc:
8✔
1632
                    if self._closing:
8✔
1633
                        raise ClosedResourceError from None
8✔
1634
                    else:
1635
                        raise BrokenResourceError from exc
1✔
1636
                else:
1637
                    return
8✔
1638

1639

1640
_read_events: RunVar[dict[Any, asyncio.Event]] = RunVar("read_events")
11✔
1641
_write_events: RunVar[dict[Any, asyncio.Event]] = RunVar("write_events")
11✔
1642

1643

1644
#
1645
# Synchronization
1646
#
1647

1648

1649
class Event(BaseEvent):
11✔
1650
    def __new__(cls) -> Event:
11✔
1651
        return object.__new__(cls)
11✔
1652

1653
    def __init__(self) -> None:
11✔
1654
        self._event = asyncio.Event()
11✔
1655

1656
    def set(self) -> None:
11✔
1657
        self._event.set()
11✔
1658

1659
    def is_set(self) -> bool:
11✔
1660
        return self._event.is_set()
11✔
1661

1662
    async def wait(self) -> None:
11✔
1663
        if self.is_set():
11✔
1664
            await AsyncIOBackend.checkpoint()
11✔
1665
        else:
1666
            await self._event.wait()
11✔
1667

1668
    def statistics(self) -> EventStatistics:
11✔
1669
        return EventStatistics(len(self._event._waiters))
10✔
1670

1671

1672
class Lock(BaseLock):
11✔
1673
    def __new__(cls) -> Lock:
11✔
1674
        return object.__new__(cls)
10✔
1675

1676
    def __init__(self) -> None:
11✔
1677
        self._owner_task: asyncio.Task | None = None
10✔
1678
        self._waiters: deque[tuple[asyncio.Task, asyncio.Future]] = deque()
10✔
1679

1680
    async def acquire(self) -> None:
11✔
1681
        if self._owner_task is None and not self._waiters:
10✔
1682
            await AsyncIOBackend.checkpoint_if_cancelled()
10✔
1683
            self._owner_task = current_task()
10✔
1684
            try:
10✔
1685
                await AsyncIOBackend.cancel_shielded_checkpoint()
10✔
1686
            except CancelledError:
10✔
1687
                self.release()
10✔
1688
                raise
10✔
1689

1690
            return
10✔
1691

1692
        task = cast(asyncio.Task, current_task())
10✔
1693
        fut: asyncio.Future[None] = asyncio.Future()
10✔
1694
        item = task, fut
10✔
1695
        self._waiters.append(item)
10✔
1696
        try:
10✔
1697
            await fut
10✔
1698
        except CancelledError:
10✔
1699
            self._waiters.remove(item)
10✔
1700
            if self._owner_task is task:
10✔
1701
                self.release()
10✔
1702

1703
            raise
10✔
1704

1705
        self._waiters.remove(item)
10✔
1706

1707
    def acquire_nowait(self) -> None:
11✔
1708
        if self._owner_task is None and not self._waiters:
10✔
1709
            self._owner_task = current_task()
10✔
1710
            return
10✔
1711

1712
        raise WouldBlock
10✔
1713

1714
    def locked(self) -> bool:
11✔
1715
        return self._owner_task is not None
10✔
1716

1717
    def release(self) -> None:
11✔
1718
        if self._owner_task != current_task():
10✔
NEW
1719
            raise RuntimeError("The current task is not holding this lock")
×
1720

1721
        for task, fut in self._waiters:
10✔
1722
            if not fut.cancelled():
10✔
1723
                self._owner_task = task
10✔
1724
                fut.set_result(None)
10✔
1725
                return
10✔
1726

1727
        self._owner_task = None
10✔
1728

1729
    def statistics(self) -> LockStatistics:
11✔
1730
        task_info = AsyncIOTaskInfo(self._owner_task) if self._owner_task else None
10✔
1731
        return LockStatistics(self.locked(), task_info, len(self._waiters))
10✔
1732

1733

1734
class Semaphore(BaseSemaphore):
11✔
1735
    def __new__(cls, initial_value: int, *, max_value: int | None = None) -> Semaphore:
11✔
1736
        return object.__new__(cls)
10✔
1737

1738
    def __init__(self, initial_value: int, *, max_value: int | None = None):
11✔
1739
        super().__init__(initial_value, max_value=max_value)
10✔
1740
        self._value = initial_value
10✔
1741
        self._max_value = max_value
10✔
1742
        self._waiters: deque[asyncio.Future[None]] = deque()
10✔
1743

1744
    async def acquire(self) -> None:
11✔
1745
        if self._value > 0 and not self._waiters:
10✔
1746
            await AsyncIOBackend.checkpoint_if_cancelled()
10✔
1747
            self._value -= 1
10✔
1748
            try:
10✔
1749
                await AsyncIOBackend.cancel_shielded_checkpoint()
10✔
1750
            except CancelledError:
10✔
1751
                self.release()
10✔
1752
                raise
10✔
1753

1754
            return
10✔
1755

1756
        fut: asyncio.Future[None] = asyncio.Future()
10✔
1757
        self._waiters.append(fut)
10✔
1758
        try:
10✔
1759
            await fut
10✔
1760
        except CancelledError:
10✔
1761
            try:
10✔
1762
                self._waiters.remove(fut)
10✔
1763
            except ValueError:
10✔
1764
                self.release()
10✔
1765

1766
            raise
10✔
1767

1768
    def acquire_nowait(self) -> None:
11✔
1769
        if self._value == 0:
10✔
1770
            raise WouldBlock
10✔
1771

1772
        self._value -= 1
10✔
1773

1774
    def release(self) -> None:
11✔
1775
        if self._max_value is not None and self._value == self._max_value:
10✔
1776
            raise ValueError("semaphore released too many times")
10✔
1777

1778
        for fut in self._waiters:
10✔
1779
            if not fut.cancelled():
10✔
1780
                fut.set_result(None)
10✔
1781
                self._waiters.remove(fut)
10✔
1782
                return
10✔
1783

1784
        self._value += 1
10✔
1785

1786
    @property
11✔
1787
    def value(self) -> int:
11✔
1788
        return self._value
10✔
1789

1790
    @property
11✔
1791
    def max_value(self) -> int | None:
11✔
1792
        return self._max_value
10✔
1793

1794
    def statistics(self) -> SemaphoreStatistics:
11✔
1795
        return SemaphoreStatistics(len(self._waiters))
10✔
1796

1797

1798
class CapacityLimiter(BaseCapacityLimiter):
11✔
1799
    _total_tokens: float = 0
11✔
1800

1801
    def __new__(cls, total_tokens: float) -> CapacityLimiter:
11✔
1802
        return object.__new__(cls)
11✔
1803

1804
    def __init__(self, total_tokens: float):
11✔
1805
        self._borrowers: set[Any] = set()
11✔
1806
        self._wait_queue: OrderedDict[Any, asyncio.Event] = OrderedDict()
11✔
1807
        self.total_tokens = total_tokens
11✔
1808

1809
    async def __aenter__(self) -> None:
11✔
1810
        await self.acquire()
11✔
1811

1812
    async def __aexit__(
11✔
1813
        self,
1814
        exc_type: type[BaseException] | None,
1815
        exc_val: BaseException | None,
1816
        exc_tb: TracebackType | None,
1817
    ) -> None:
1818
        self.release()
11✔
1819

1820
    @property
11✔
1821
    def total_tokens(self) -> float:
11✔
1822
        return self._total_tokens
10✔
1823

1824
    @total_tokens.setter
11✔
1825
    def total_tokens(self, value: float) -> None:
11✔
1826
        if not isinstance(value, int) and not math.isinf(value):
11✔
1827
            raise TypeError("total_tokens must be an int or math.inf")
10✔
1828
        if value < 1:
11✔
1829
            raise ValueError("total_tokens must be >= 1")
10✔
1830

1831
        waiters_to_notify = max(value - self._total_tokens, 0)
11✔
1832
        self._total_tokens = value
11✔
1833

1834
        # Notify waiting tasks that they have acquired the limiter
1835
        while self._wait_queue and waiters_to_notify:
11✔
1836
            event = self._wait_queue.popitem(last=False)[1]
10✔
1837
            event.set()
10✔
1838
            waiters_to_notify -= 1
10✔
1839

1840
    @property
11✔
1841
    def borrowed_tokens(self) -> int:
11✔
1842
        return len(self._borrowers)
10✔
1843

1844
    @property
11✔
1845
    def available_tokens(self) -> float:
11✔
1846
        return self._total_tokens - len(self._borrowers)
10✔
1847

1848
    def acquire_nowait(self) -> None:
11✔
1849
        self.acquire_on_behalf_of_nowait(current_task())
×
1850

1851
    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
11✔
1852
        if borrower in self._borrowers:
11✔
1853
            raise RuntimeError(
10✔
1854
                "this borrower is already holding one of this CapacityLimiter's "
1855
                "tokens"
1856
            )
1857

1858
        if self._wait_queue or len(self._borrowers) >= self._total_tokens:
11✔
1859
            raise WouldBlock
10✔
1860

1861
        self._borrowers.add(borrower)
11✔
1862

1863
    async def acquire(self) -> None:
11✔
1864
        return await self.acquire_on_behalf_of(current_task())
11✔
1865

1866
    async def acquire_on_behalf_of(self, borrower: object) -> None:
11✔
1867
        await AsyncIOBackend.checkpoint_if_cancelled()
11✔
1868
        try:
11✔
1869
            self.acquire_on_behalf_of_nowait(borrower)
11✔
1870
        except WouldBlock:
10✔
1871
            event = asyncio.Event()
10✔
1872
            self._wait_queue[borrower] = event
10✔
1873
            try:
10✔
1874
                await event.wait()
10✔
1875
            except BaseException:
×
1876
                self._wait_queue.pop(borrower, None)
×
1877
                raise
×
1878

1879
            self._borrowers.add(borrower)
10✔
1880
        else:
1881
            try:
11✔
1882
                await AsyncIOBackend.cancel_shielded_checkpoint()
11✔
1883
            except BaseException:
10✔
1884
                self.release()
10✔
1885
                raise
10✔
1886

1887
    def release(self) -> None:
11✔
1888
        self.release_on_behalf_of(current_task())
11✔
1889

1890
    def release_on_behalf_of(self, borrower: object) -> None:
11✔
1891
        try:
11✔
1892
            self._borrowers.remove(borrower)
11✔
1893
        except KeyError:
10✔
1894
            raise RuntimeError(
10✔
1895
                "this borrower isn't holding any of this CapacityLimiter's tokens"
1896
            ) from None
1897

1898
        # Notify the next task in line if this limiter has free capacity now
1899
        if self._wait_queue and len(self._borrowers) < self._total_tokens:
11✔
1900
            event = self._wait_queue.popitem(last=False)[1]
10✔
1901
            event.set()
10✔
1902

1903
    def statistics(self) -> CapacityLimiterStatistics:
11✔
1904
        return CapacityLimiterStatistics(
10✔
1905
            self.borrowed_tokens,
1906
            self.total_tokens,
1907
            tuple(self._borrowers),
1908
            len(self._wait_queue),
1909
        )
1910

1911

1912
_default_thread_limiter: RunVar[CapacityLimiter] = RunVar("_default_thread_limiter")
11✔
1913

1914

1915
#
1916
# Operating system signals
1917
#
1918

1919

1920
class _SignalReceiver:
11✔
1921
    def __init__(self, signals: tuple[Signals, ...]):
11✔
1922
        self._signals = signals
9✔
1923
        self._loop = get_running_loop()
9✔
1924
        self._signal_queue: deque[Signals] = deque()
9✔
1925
        self._future: asyncio.Future = asyncio.Future()
9✔
1926
        self._handled_signals: set[Signals] = set()
9✔
1927

1928
    def _deliver(self, signum: Signals) -> None:
11✔
1929
        self._signal_queue.append(signum)
9✔
1930
        if not self._future.done():
9✔
1931
            self._future.set_result(None)
9✔
1932

1933
    def __enter__(self) -> _SignalReceiver:
11✔
1934
        for sig in set(self._signals):
9✔
1935
            self._loop.add_signal_handler(sig, self._deliver, sig)
9✔
1936
            self._handled_signals.add(sig)
9✔
1937

1938
        return self
9✔
1939

1940
    def __exit__(
11✔
1941
        self,
1942
        exc_type: type[BaseException] | None,
1943
        exc_val: BaseException | None,
1944
        exc_tb: TracebackType | None,
1945
    ) -> bool | None:
1946
        for sig in self._handled_signals:
9✔
1947
            self._loop.remove_signal_handler(sig)
9✔
1948
        return None
9✔
1949

1950
    def __aiter__(self) -> _SignalReceiver:
11✔
1951
        return self
9✔
1952

1953
    async def __anext__(self) -> Signals:
11✔
1954
        await AsyncIOBackend.checkpoint()
9✔
1955
        if not self._signal_queue:
9✔
1956
            self._future = asyncio.Future()
×
1957
            await self._future
×
1958

1959
        return self._signal_queue.popleft()
9✔
1960

1961

1962
#
1963
# Testing and debugging
1964
#
1965

1966

1967
class AsyncIOTaskInfo(TaskInfo):
11✔
1968
    def __init__(self, task: asyncio.Task):
11✔
1969
        task_state = _task_states.get(task)
11✔
1970
        if task_state is None:
11✔
1971
            parent_id = None
11✔
1972
        else:
1973
            parent_id = task_state.parent_id
11✔
1974

1975
        super().__init__(id(task), parent_id, task.get_name(), task.get_coro())
11✔
1976
        self._task = weakref.ref(task)
11✔
1977

1978
    def has_pending_cancellation(self) -> bool:
11✔
1979
        if not (task := self._task()):
11✔
1980
            # If the task isn't around anymore, it won't have a pending cancellation
1981
            return False
×
1982

1983
        if sys.version_info >= (3, 11):
11✔
1984
            if task.cancelling():
5✔
1985
                return True
5✔
1986
        elif (
6✔
1987
            isinstance(task._fut_waiter, asyncio.Future)
1988
            and task._fut_waiter.cancelled()
1989
        ):
1990
            return True
6✔
1991

1992
        if task_state := _task_states.get(task):
11✔
1993
            if cancel_scope := task_state.cancel_scope:
11✔
1994
                return cancel_scope.cancel_called or (
11✔
1995
                    not cancel_scope.shield and cancel_scope._parent_cancelled()
1996
                )
1997

1998
        return False
11✔
1999

2000

2001
class TestRunner(abc.TestRunner):
11✔
2002
    _send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]
11✔
2003

2004
    def __init__(
11✔
2005
        self,
2006
        *,
2007
        debug: bool | None = None,
2008
        use_uvloop: bool = False,
2009
        loop_factory: Callable[[], AbstractEventLoop] | None = None,
2010
    ) -> None:
2011
        if use_uvloop and loop_factory is None:
11✔
2012
            import uvloop
×
2013

2014
            loop_factory = uvloop.new_event_loop
×
2015

2016
        self._runner = Runner(debug=debug, loop_factory=loop_factory)
11✔
2017
        self._exceptions: list[BaseException] = []
11✔
2018
        self._runner_task: asyncio.Task | None = None
11✔
2019

2020
    def __enter__(self) -> TestRunner:
11✔
2021
        self._runner.__enter__()
11✔
2022
        self.get_loop().set_exception_handler(self._exception_handler)
11✔
2023
        return self
11✔
2024

2025
    def __exit__(
11✔
2026
        self,
2027
        exc_type: type[BaseException] | None,
2028
        exc_val: BaseException | None,
2029
        exc_tb: TracebackType | None,
2030
    ) -> None:
2031
        self._runner.__exit__(exc_type, exc_val, exc_tb)
11✔
2032

2033
    def get_loop(self) -> AbstractEventLoop:
11✔
2034
        return self._runner.get_loop()
11✔
2035

2036
    def _exception_handler(
11✔
2037
        self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
2038
    ) -> None:
2039
        if isinstance(context.get("exception"), Exception):
11✔
2040
            self._exceptions.append(context["exception"])
11✔
2041
        else:
2042
            loop.default_exception_handler(context)
11✔
2043

2044
    def _raise_async_exceptions(self) -> None:
11✔
2045
        # Re-raise any exceptions raised in asynchronous callbacks
2046
        if self._exceptions:
11✔
2047
            exceptions, self._exceptions = self._exceptions, []
11✔
2048
            if len(exceptions) == 1:
11✔
2049
                raise exceptions[0]
11✔
2050
            elif exceptions:
×
2051
                raise BaseExceptionGroup(
×
2052
                    "Multiple exceptions occurred in asynchronous callbacks", exceptions
2053
                )
2054

2055
    async def _run_tests_and_fixtures(
11✔
2056
        self,
2057
        receive_stream: MemoryObjectReceiveStream[
2058
            tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
2059
        ],
2060
    ) -> None:
2061
        with receive_stream, self._send_stream:
11✔
2062
            async for coro, future in receive_stream:
11✔
2063
                try:
11✔
2064
                    retval = await coro
11✔
2065
                except BaseException as exc:
11✔
2066
                    if not future.cancelled():
11✔
2067
                        future.set_exception(exc)
11✔
2068
                else:
2069
                    if not future.cancelled():
11✔
2070
                        future.set_result(retval)
11✔
2071

2072
    async def _call_in_runner_task(
11✔
2073
        self,
2074
        func: Callable[P, Awaitable[T_Retval]],
2075
        *args: P.args,
2076
        **kwargs: P.kwargs,
2077
    ) -> T_Retval:
2078
        if not self._runner_task:
11✔
2079
            self._send_stream, receive_stream = create_memory_object_stream[
11✔
2080
                Tuple[Awaitable[Any], asyncio.Future]
2081
            ](1)
2082
            self._runner_task = self.get_loop().create_task(
11✔
2083
                self._run_tests_and_fixtures(receive_stream)
2084
            )
2085

2086
        coro = func(*args, **kwargs)
11✔
2087
        future: asyncio.Future[T_Retval] = self.get_loop().create_future()
11✔
2088
        self._send_stream.send_nowait((coro, future))
11✔
2089
        return await future
11✔
2090

2091
    def run_asyncgen_fixture(
11✔
2092
        self,
2093
        fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
2094
        kwargs: dict[str, Any],
2095
    ) -> Iterable[T_Retval]:
2096
        asyncgen = fixture_func(**kwargs)
11✔
2097
        fixturevalue: T_Retval = self.get_loop().run_until_complete(
11✔
2098
            self._call_in_runner_task(asyncgen.asend, None)
2099
        )
2100
        self._raise_async_exceptions()
11✔
2101

2102
        yield fixturevalue
11✔
2103

2104
        try:
11✔
2105
            self.get_loop().run_until_complete(
11✔
2106
                self._call_in_runner_task(asyncgen.asend, None)
2107
            )
2108
        except StopAsyncIteration:
11✔
2109
            self._raise_async_exceptions()
11✔
2110
        else:
2111
            self.get_loop().run_until_complete(asyncgen.aclose())
×
2112
            raise RuntimeError("Async generator fixture did not stop")
×
2113

2114
    def run_fixture(
11✔
2115
        self,
2116
        fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
2117
        kwargs: dict[str, Any],
2118
    ) -> T_Retval:
2119
        retval = self.get_loop().run_until_complete(
11✔
2120
            self._call_in_runner_task(fixture_func, **kwargs)
2121
        )
2122
        self._raise_async_exceptions()
11✔
2123
        return retval
11✔
2124

2125
    def run_test(
11✔
2126
        self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
2127
    ) -> None:
2128
        try:
11✔
2129
            self.get_loop().run_until_complete(
11✔
2130
                self._call_in_runner_task(test_func, **kwargs)
2131
            )
2132
        except Exception as exc:
11✔
2133
            self._exceptions.append(exc)
11✔
2134

2135
        self._raise_async_exceptions()
11✔
2136

2137

2138
class AsyncIOBackend(AsyncBackend):
11✔
2139
    @classmethod
11✔
2140
    def run(
11✔
2141
        cls,
2142
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2143
        args: tuple[Unpack[PosArgsT]],
2144
        kwargs: dict[str, Any],
2145
        options: dict[str, Any],
2146
    ) -> T_Retval:
2147
        @wraps(func)
11✔
2148
        async def wrapper() -> T_Retval:
11✔
2149
            task = cast(asyncio.Task, current_task())
11✔
2150
            task.set_name(get_callable_name(func))
11✔
2151
            _task_states[task] = TaskState(None, None)
11✔
2152

2153
            try:
11✔
2154
                return await func(*args)
11✔
2155
            finally:
2156
                del _task_states[task]
11✔
2157

2158
        debug = options.get("debug", None)
11✔
2159
        loop_factory = options.get("loop_factory", None)
11✔
2160
        if loop_factory is None and options.get("use_uvloop", False):
11✔
2161
            import uvloop
7✔
2162

2163
            loop_factory = uvloop.new_event_loop
7✔
2164

2165
        with Runner(debug=debug, loop_factory=loop_factory) as runner:
11✔
2166
            return runner.run(wrapper())
11✔
2167

2168
    @classmethod
11✔
2169
    def current_token(cls) -> object:
11✔
2170
        return get_running_loop()
11✔
2171

2172
    @classmethod
11✔
2173
    def current_time(cls) -> float:
11✔
2174
        return get_running_loop().time()
11✔
2175

2176
    @classmethod
11✔
2177
    def cancelled_exception_class(cls) -> type[BaseException]:
11✔
2178
        return CancelledError
11✔
2179

2180
    @classmethod
11✔
2181
    async def checkpoint(cls) -> None:
11✔
2182
        await sleep(0)
11✔
2183

2184
    @classmethod
11✔
2185
    async def checkpoint_if_cancelled(cls) -> None:
11✔
2186
        task = current_task()
11✔
2187
        if task is None:
11✔
2188
            return
×
2189

2190
        try:
11✔
2191
            cancel_scope = _task_states[task].cancel_scope
11✔
2192
        except KeyError:
11✔
2193
            return
11✔
2194

2195
        while cancel_scope:
11✔
2196
            if cancel_scope.cancel_called:
11✔
2197
                await sleep(0)
11✔
2198
            elif cancel_scope.shield:
11✔
2199
                break
10✔
2200
            else:
2201
                cancel_scope = cancel_scope._parent_scope
11✔
2202

2203
    @classmethod
11✔
2204
    async def cancel_shielded_checkpoint(cls) -> None:
11✔
2205
        with CancelScope(shield=True):
11✔
2206
            await sleep(0)
11✔
2207

2208
    @classmethod
11✔
2209
    async def sleep(cls, delay: float) -> None:
11✔
2210
        await sleep(delay)
11✔
2211

2212
    @classmethod
11✔
2213
    def create_cancel_scope(
11✔
2214
        cls, *, deadline: float = math.inf, shield: bool = False
2215
    ) -> CancelScope:
2216
        return CancelScope(deadline=deadline, shield=shield)
11✔
2217

2218
    @classmethod
11✔
2219
    def current_effective_deadline(cls) -> float:
11✔
2220
        try:
10✔
2221
            cancel_scope = _task_states[
10✔
2222
                current_task()  # type: ignore[index]
2223
            ].cancel_scope
2224
        except KeyError:
×
2225
            return math.inf
×
2226

2227
        deadline = math.inf
10✔
2228
        while cancel_scope:
10✔
2229
            deadline = min(deadline, cancel_scope.deadline)
10✔
2230
            if cancel_scope._cancel_called:
10✔
2231
                deadline = -math.inf
10✔
2232
                break
10✔
2233
            elif cancel_scope.shield:
10✔
2234
                break
10✔
2235
            else:
2236
                cancel_scope = cancel_scope._parent_scope
10✔
2237

2238
        return deadline
10✔
2239

2240
    @classmethod
11✔
2241
    def create_task_group(cls) -> abc.TaskGroup:
11✔
2242
        return TaskGroup()
11✔
2243

2244
    @classmethod
11✔
2245
    def create_event(cls) -> abc.Event:
11✔
2246
        return Event()
11✔
2247

2248
    @classmethod
11✔
2249
    def create_lock(cls) -> abc.Lock:
11✔
2250
        return Lock()
10✔
2251

2252
    @classmethod
11✔
2253
    def create_semaphore(
11✔
2254
        cls, initial_value: int, *, max_value: int | None = None
2255
    ) -> abc.Semaphore:
2256
        return Semaphore(initial_value, max_value=max_value)
10✔
2257

2258
    @classmethod
11✔
2259
    def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
11✔
2260
        return CapacityLimiter(total_tokens)
10✔
2261

2262
    @classmethod
11✔
2263
    async def run_sync_in_worker_thread(
11✔
2264
        cls,
2265
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2266
        args: tuple[Unpack[PosArgsT]],
2267
        abandon_on_cancel: bool = False,
2268
        limiter: abc.CapacityLimiter | None = None,
2269
    ) -> T_Retval:
2270
        await cls.checkpoint()
11✔
2271

2272
        # If this is the first run in this event loop thread, set up the necessary
2273
        # variables
2274
        try:
11✔
2275
            idle_workers = _threadpool_idle_workers.get()
11✔
2276
            workers = _threadpool_workers.get()
11✔
2277
        except LookupError:
11✔
2278
            idle_workers = deque()
11✔
2279
            workers = set()
11✔
2280
            _threadpool_idle_workers.set(idle_workers)
11✔
2281
            _threadpool_workers.set(workers)
11✔
2282

2283
        async with limiter or cls.current_default_thread_limiter():
11✔
2284
            with CancelScope(shield=not abandon_on_cancel) as scope:
11✔
2285
                future: asyncio.Future = asyncio.Future()
11✔
2286
                root_task = find_root_task()
11✔
2287
                if not idle_workers:
11✔
2288
                    worker = WorkerThread(root_task, workers, idle_workers)
11✔
2289
                    worker.start()
11✔
2290
                    workers.add(worker)
11✔
2291
                    root_task.add_done_callback(worker.stop)
11✔
2292
                else:
2293
                    worker = idle_workers.pop()
11✔
2294

2295
                    # Prune any other workers that have been idle for MAX_IDLE_TIME
2296
                    # seconds or longer
2297
                    now = cls.current_time()
11✔
2298
                    while idle_workers:
11✔
2299
                        if (
10✔
2300
                            now - idle_workers[0].idle_since
2301
                            < WorkerThread.MAX_IDLE_TIME
2302
                        ):
2303
                            break
10✔
2304

2305
                        expired_worker = idle_workers.popleft()
×
2306
                        expired_worker.root_task.remove_done_callback(
×
2307
                            expired_worker.stop
2308
                        )
2309
                        expired_worker.stop()
×
2310

2311
                context = copy_context()
11✔
2312
                context.run(sniffio.current_async_library_cvar.set, None)
11✔
2313
                if abandon_on_cancel or scope._parent_scope is None:
11✔
2314
                    worker_scope = scope
11✔
2315
                else:
2316
                    worker_scope = scope._parent_scope
11✔
2317

2318
                worker.queue.put_nowait((context, func, args, future, worker_scope))
11✔
2319
                return await future
11✔
2320

2321
    @classmethod
11✔
2322
    def check_cancelled(cls) -> None:
11✔
2323
        scope: CancelScope | None = threadlocals.current_cancel_scope
11✔
2324
        while scope is not None:
11✔
2325
            if scope.cancel_called:
11✔
2326
                raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")
11✔
2327

2328
            if scope.shield:
11✔
2329
                return
×
2330

2331
            scope = scope._parent_scope
11✔
2332

2333
    @classmethod
11✔
2334
    def run_async_from_thread(
11✔
2335
        cls,
2336
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
2337
        args: tuple[Unpack[PosArgsT]],
2338
        token: object,
2339
    ) -> T_Retval:
2340
        async def task_wrapper(scope: CancelScope) -> T_Retval:
11✔
2341
            __tracebackhide__ = True
11✔
2342
            task = cast(asyncio.Task, current_task())
11✔
2343
            _task_states[task] = TaskState(None, scope)
11✔
2344
            scope._tasks.add(task)
11✔
2345
            try:
11✔
2346
                return await func(*args)
11✔
2347
            except CancelledError as exc:
11✔
2348
                raise concurrent.futures.CancelledError(str(exc)) from None
11✔
2349
            finally:
2350
                scope._tasks.discard(task)
11✔
2351

2352
        loop = cast(AbstractEventLoop, token)
11✔
2353
        context = copy_context()
11✔
2354
        context.run(sniffio.current_async_library_cvar.set, "asyncio")
11✔
2355
        wrapper = task_wrapper(threadlocals.current_cancel_scope)
11✔
2356
        f: concurrent.futures.Future[T_Retval] = context.run(
11✔
2357
            asyncio.run_coroutine_threadsafe, wrapper, loop
2358
        )
2359
        return f.result()
11✔
2360

2361
    @classmethod
11✔
2362
    def run_sync_from_thread(
11✔
2363
        cls,
2364
        func: Callable[[Unpack[PosArgsT]], T_Retval],
2365
        args: tuple[Unpack[PosArgsT]],
2366
        token: object,
2367
    ) -> T_Retval:
2368
        @wraps(func)
11✔
2369
        def wrapper() -> None:
11✔
2370
            try:
11✔
2371
                sniffio.current_async_library_cvar.set("asyncio")
11✔
2372
                f.set_result(func(*args))
11✔
2373
            except BaseException as exc:
11✔
2374
                f.set_exception(exc)
11✔
2375
                if not isinstance(exc, Exception):
11✔
2376
                    raise
×
2377

2378
        f: concurrent.futures.Future[T_Retval] = Future()
11✔
2379
        loop = cast(AbstractEventLoop, token)
11✔
2380
        loop.call_soon_threadsafe(wrapper)
11✔
2381
        return f.result()
11✔
2382

2383
    @classmethod
11✔
2384
    def create_blocking_portal(cls) -> abc.BlockingPortal:
11✔
2385
        return BlockingPortal()
11✔
2386

2387
    @classmethod
11✔
2388
    async def open_process(
11✔
2389
        cls,
2390
        command: str | bytes | Sequence[str | bytes],
2391
        *,
2392
        shell: bool,
2393
        stdin: int | IO[Any] | None,
2394
        stdout: int | IO[Any] | None,
2395
        stderr: int | IO[Any] | None,
2396
        cwd: str | bytes | PathLike | None = None,
2397
        env: Mapping[str, str] | None = None,
2398
        start_new_session: bool = False,
2399
    ) -> Process:
2400
        await cls.checkpoint()
10✔
2401
        if shell:
10✔
2402
            process = await asyncio.create_subprocess_shell(
10✔
2403
                cast("str | bytes", command),
2404
                stdin=stdin,
2405
                stdout=stdout,
2406
                stderr=stderr,
2407
                cwd=cwd,
2408
                env=env,
2409
                start_new_session=start_new_session,
2410
            )
2411
        else:
2412
            process = await asyncio.create_subprocess_exec(
10✔
2413
                *command,
2414
                stdin=stdin,
2415
                stdout=stdout,
2416
                stderr=stderr,
2417
                cwd=cwd,
2418
                env=env,
2419
                start_new_session=start_new_session,
2420
            )
2421

2422
        stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
10✔
2423
        stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
10✔
2424
        stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
10✔
2425
        return Process(process, stdin_stream, stdout_stream, stderr_stream)
10✔
2426

2427
    @classmethod
11✔
2428
    def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
11✔
2429
        create_task(
10✔
2430
            _shutdown_process_pool_on_exit(workers),
2431
            name="AnyIO process pool shutdown task",
2432
        )
2433
        find_root_task().add_done_callback(
10✔
2434
            partial(_forcibly_shutdown_process_pool_on_exit, workers)  # type:ignore[arg-type]
2435
        )
2436

2437
    @classmethod
11✔
2438
    async def connect_tcp(
11✔
2439
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
2440
    ) -> abc.SocketStream:
2441
        transport, protocol = cast(
11✔
2442
            Tuple[asyncio.Transport, StreamProtocol],
2443
            await get_running_loop().create_connection(
2444
                StreamProtocol, host, port, local_addr=local_address
2445
            ),
2446
        )
2447
        transport.pause_reading()
11✔
2448
        return SocketStream(transport, protocol)
11✔
2449

2450
    @classmethod
11✔
2451
    async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
11✔
2452
        await cls.checkpoint()
8✔
2453
        loop = get_running_loop()
8✔
2454
        raw_socket = socket.socket(socket.AF_UNIX)
8✔
2455
        raw_socket.setblocking(False)
8✔
2456
        while True:
5✔
2457
            try:
8✔
2458
                raw_socket.connect(path)
8✔
2459
            except BlockingIOError:
8✔
2460
                f: asyncio.Future = asyncio.Future()
×
2461
                loop.add_writer(raw_socket, f.set_result, None)
×
2462
                f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
2463
                await f
×
2464
            except BaseException:
8✔
2465
                raw_socket.close()
8✔
2466
                raise
8✔
2467
            else:
2468
                return UNIXSocketStream(raw_socket)
8✔
2469

2470
    @classmethod
11✔
2471
    def create_tcp_listener(cls, sock: socket.socket) -> SocketListener:
11✔
2472
        return TCPSocketListener(sock)
11✔
2473

2474
    @classmethod
11✔
2475
    def create_unix_listener(cls, sock: socket.socket) -> SocketListener:
11✔
2476
        return UNIXSocketListener(sock)
8✔
2477

2478
    @classmethod
11✔
2479
    async def create_udp_socket(
11✔
2480
        cls,
2481
        family: AddressFamily,
2482
        local_address: IPSockAddrType | None,
2483
        remote_address: IPSockAddrType | None,
2484
        reuse_port: bool,
2485
    ) -> UDPSocket | ConnectedUDPSocket:
2486
        transport, protocol = await get_running_loop().create_datagram_endpoint(
10✔
2487
            DatagramProtocol,
2488
            local_addr=local_address,
2489
            remote_addr=remote_address,
2490
            family=family,
2491
            reuse_port=reuse_port,
2492
        )
2493
        if protocol.exception:
10✔
2494
            transport.close()
×
2495
            raise protocol.exception
×
2496

2497
        if not remote_address:
10✔
2498
            return UDPSocket(transport, protocol)
10✔
2499
        else:
2500
            return ConnectedUDPSocket(transport, protocol)
10✔
2501

2502
    @classmethod
11✔
2503
    async def create_unix_datagram_socket(  # type: ignore[override]
11✔
2504
        cls, raw_socket: socket.socket, remote_path: str | bytes | None
2505
    ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
2506
        await cls.checkpoint()
8✔
2507
        loop = get_running_loop()
8✔
2508

2509
        if remote_path:
8✔
2510
            while True:
5✔
2511
                try:
8✔
2512
                    raw_socket.connect(remote_path)
8✔
2513
                except BlockingIOError:
×
2514
                    f: asyncio.Future = asyncio.Future()
×
2515
                    loop.add_writer(raw_socket, f.set_result, None)
×
2516
                    f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
×
2517
                    await f
×
2518
                except BaseException:
×
2519
                    raw_socket.close()
×
2520
                    raise
×
2521
                else:
2522
                    return ConnectedUNIXDatagramSocket(raw_socket)
8✔
2523
        else:
2524
            return UNIXDatagramSocket(raw_socket)
8✔
2525

2526
    @classmethod
11✔
2527
    async def getaddrinfo(
11✔
2528
        cls,
2529
        host: bytes | str | None,
2530
        port: str | int | None,
2531
        *,
2532
        family: int | AddressFamily = 0,
2533
        type: int | SocketKind = 0,
2534
        proto: int = 0,
2535
        flags: int = 0,
2536
    ) -> list[
2537
        tuple[
2538
            AddressFamily,
2539
            SocketKind,
2540
            int,
2541
            str,
2542
            tuple[str, int] | tuple[str, int, int, int],
2543
        ]
2544
    ]:
2545
        return await get_running_loop().getaddrinfo(
11✔
2546
            host, port, family=family, type=type, proto=proto, flags=flags
2547
        )
2548

2549
    @classmethod
11✔
2550
    async def getnameinfo(
11✔
2551
        cls, sockaddr: IPSockAddrType, flags: int = 0
2552
    ) -> tuple[str, str]:
2553
        return await get_running_loop().getnameinfo(sockaddr, flags)
10✔
2554

2555
    @classmethod
11✔
2556
    async def wait_socket_readable(cls, sock: socket.socket) -> None:
11✔
2557
        await cls.checkpoint()
×
2558
        try:
×
2559
            read_events = _read_events.get()
×
2560
        except LookupError:
×
2561
            read_events = {}
×
2562
            _read_events.set(read_events)
×
2563

2564
        if read_events.get(sock):
×
2565
            raise BusyResourceError("reading from") from None
×
2566

2567
        loop = get_running_loop()
×
2568
        event = read_events[sock] = asyncio.Event()
×
2569
        loop.add_reader(sock, event.set)
×
2570
        try:
×
2571
            await event.wait()
×
2572
        finally:
2573
            if read_events.pop(sock, None) is not None:
×
2574
                loop.remove_reader(sock)
×
2575
                readable = True
×
2576
            else:
2577
                readable = False
×
2578

2579
        if not readable:
×
2580
            raise ClosedResourceError
×
2581

2582
    @classmethod
11✔
2583
    async def wait_socket_writable(cls, sock: socket.socket) -> None:
11✔
2584
        await cls.checkpoint()
×
2585
        try:
×
2586
            write_events = _write_events.get()
×
2587
        except LookupError:
×
2588
            write_events = {}
×
2589
            _write_events.set(write_events)
×
2590

2591
        if write_events.get(sock):
×
2592
            raise BusyResourceError("writing to") from None
×
2593

2594
        loop = get_running_loop()
×
2595
        event = write_events[sock] = asyncio.Event()
×
2596
        loop.add_writer(sock.fileno(), event.set)
×
2597
        try:
×
2598
            await event.wait()
×
2599
        finally:
2600
            if write_events.pop(sock, None) is not None:
×
2601
                loop.remove_writer(sock)
×
2602
                writable = True
×
2603
            else:
2604
                writable = False
×
2605

2606
        if not writable:
×
2607
            raise ClosedResourceError
×
2608

2609
    @classmethod
11✔
2610
    def current_default_thread_limiter(cls) -> CapacityLimiter:
11✔
2611
        try:
11✔
2612
            return _default_thread_limiter.get()
11✔
2613
        except LookupError:
11✔
2614
            limiter = CapacityLimiter(40)
11✔
2615
            _default_thread_limiter.set(limiter)
11✔
2616
            return limiter
11✔
2617

2618
    @classmethod
11✔
2619
    def open_signal_receiver(
11✔
2620
        cls, *signals: Signals
2621
    ) -> ContextManager[AsyncIterator[Signals]]:
2622
        return _SignalReceiver(signals)
9✔
2623

2624
    @classmethod
11✔
2625
    def get_current_task(cls) -> TaskInfo:
11✔
2626
        return AsyncIOTaskInfo(current_task())  # type: ignore[arg-type]
11✔
2627

2628
    @classmethod
11✔
2629
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
11✔
2630
        return [AsyncIOTaskInfo(task) for task in all_tasks() if not task.done()]
11✔
2631

2632
    @classmethod
11✔
2633
    async def wait_all_tasks_blocked(cls) -> None:
11✔
2634
        await cls.checkpoint()
11✔
2635
        this_task = current_task()
11✔
2636
        while True:
7✔
2637
            for task in all_tasks():
11✔
2638
                if task is this_task:
11✔
2639
                    continue
11✔
2640

2641
                waiter = task._fut_waiter  # type: ignore[attr-defined]
11✔
2642
                if waiter is None or waiter.done():
11✔
2643
                    await sleep(0.1)
11✔
2644
                    break
11✔
2645
            else:
2646
                return
11✔
2647

2648
    @classmethod
11✔
2649
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
11✔
2650
        return TestRunner(**options)
11✔
2651

2652

2653
backend_class = AsyncIOBackend
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc