• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10708046317

04 Sep 2024 07:12PM UTC coverage: 91.676% (-0.2%) from 91.834%
10708046317

Pull #761

github

web-flow
Merge fa137ec93 into ee8165b55
Pull Request #761: Delegated the implementations of Lock and Semaphore to the async backend class

249 of 274 new or added lines in 4 files covered. (90.88%)

2 existing lines in 2 files now uncovered.

4791 of 5226 relevant lines covered (91.68%)

9.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.51
/src/anyio/_backends/_trio.py
1
from __future__ import annotations
11✔
2

3
import array
11✔
4
import math
11✔
5
import os
11✔
6
import socket
11✔
7
import sys
11✔
8
import types
11✔
9
import weakref
11✔
10
from collections.abc import AsyncIterator, Iterable
11✔
11
from concurrent.futures import Future
11✔
12
from dataclasses import dataclass
11✔
13
from functools import partial
11✔
14
from io import IOBase
11✔
15
from os import PathLike
11✔
16
from signal import Signals
11✔
17
from socket import AddressFamily, SocketKind
11✔
18
from types import TracebackType
11✔
19
from typing import (
11✔
20
    IO,
21
    Any,
22
    AsyncGenerator,
23
    Awaitable,
24
    Callable,
25
    Collection,
26
    ContextManager,
27
    Coroutine,
28
    Generic,
29
    NoReturn,
30
    Sequence,
31
    TypeVar,
32
    cast,
33
    overload,
34
)
35

36
import trio.from_thread
11✔
37
import trio.lowlevel
11✔
38
from outcome import Error, Outcome, Value
11✔
39
from trio.lowlevel import (
11✔
40
    current_root_task,
41
    current_task,
42
    wait_readable,
43
    wait_writable,
44
)
45
from trio.socket import SocketType as TrioSocketType
11✔
46
from trio.to_thread import run_sync
11✔
47

48
from .. import (
11✔
49
    CapacityLimiterStatistics,
50
    EventStatistics,
51
    LockStatistics,
52
    TaskInfo,
53
    WouldBlock,
54
    abc,
55
)
56
from .._core._eventloop import claim_worker_thread
11✔
57
from .._core._exceptions import (
11✔
58
    BrokenResourceError,
59
    BusyResourceError,
60
    ClosedResourceError,
61
    EndOfStream,
62
)
63
from .._core._sockets import convert_ipv6_sockaddr
11✔
64
from .._core._streams import create_memory_object_stream
11✔
65
from .._core._synchronization import (
11✔
66
    CapacityLimiter as BaseCapacityLimiter,
67
)
68
from .._core._synchronization import Event as BaseEvent
11✔
69
from .._core._synchronization import Lock as BaseLock
11✔
70
from .._core._synchronization import (
11✔
71
    ResourceGuard,
72
    SemaphoreStatistics,
73
)
74
from .._core._synchronization import Semaphore as BaseSemaphore
11✔
75
from .._core._tasks import CancelScope as BaseCancelScope
11✔
76
from ..abc import IPSockAddrType, UDPPacketType, UNIXDatagramPacketType
11✔
77
from ..abc._eventloop import AsyncBackend, StrOrBytesPath
11✔
78
from ..streams.memory import MemoryObjectSendStream
11✔
79

80
if sys.version_info >= (3, 10):
11✔
81
    from typing import ParamSpec
7✔
82
else:
83
    from typing_extensions import ParamSpec
4✔
84

85
if sys.version_info >= (3, 11):
11✔
86
    from typing import TypeVarTuple, Unpack
5✔
87
else:
88
    from exceptiongroup import BaseExceptionGroup
6✔
89
    from typing_extensions import TypeVarTuple, Unpack
6✔
90

91
T = TypeVar("T")
11✔
92
T_Retval = TypeVar("T_Retval")
11✔
93
T_SockAddr = TypeVar("T_SockAddr", str, IPSockAddrType)
11✔
94
PosArgsT = TypeVarTuple("PosArgsT")
11✔
95
P = ParamSpec("P")
11✔
96

97

98
#
99
# Event loop
100
#
101

102
RunVar = trio.lowlevel.RunVar
11✔
103

104

105
#
106
# Timeouts and cancellation
107
#
108

109

110
class CancelScope(BaseCancelScope):
    """Cancel scope that delegates every operation to a ``trio.CancelScope``."""

    def __new__(
        cls, original: trio.CancelScope | None = None, **kwargs: object
    ) -> CancelScope:
        # Allocate through object.__new__ directly, so any __new__ defined on
        # the base class takes no part in creating the instance.
        return object.__new__(cls)

    def __init__(self, original: trio.CancelScope | None = None, **kwargs: Any) -> None:
        """
        Wrap ``original`` when one is given; otherwise create a new
        ``trio.CancelScope`` from ``kwargs``.
        """
        if original:
            self.__original = original
        else:
            self.__original = trio.CancelScope(**kwargs)

    def __enter__(self) -> CancelScope:
        # Enter the wrapped scope, but hand back this wrapper to the caller.
        self.__original.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        # https://github.com/python-trio/trio-typing/pull/79
        suppress = self.__original.__exit__(exc_type, exc_val, exc_tb)
        return suppress

    def cancel(self) -> None:
        """Cancel the wrapped trio scope."""
        self.__original.cancel()

    @property
    def deadline(self) -> float:
        """The deadline of the wrapped trio scope."""
        return self.__original.deadline

    @deadline.setter
    def deadline(self, value: float) -> None:
        self.__original.deadline = value

    @property
    def cancel_called(self) -> bool:
        """The ``cancel_called`` flag of the wrapped trio scope."""
        return self.__original.cancel_called

    @property
    def cancelled_caught(self) -> bool:
        """The ``cancelled_caught`` flag of the wrapped trio scope."""
        return self.__original.cancelled_caught

    @property
    def shield(self) -> bool:
        """The ``shield`` flag of the wrapped trio scope."""
        return self.__original.shield

    @shield.setter
    def shield(self, value: bool) -> None:
        self.__original.shield = value
158

159

160
#
161
# Task groups
162
#
163

164

165
class TaskGroup(abc.TaskGroup):
    """Task group backed by a trio nursery (``strict_exception_groups=True``)."""

    def __init__(self) -> None:
        self._active = False
        self._nursery_manager = trio.open_nursery(strict_exception_groups=True)
        # Replaced with a real CancelScope wrapper once the group is entered.
        self.cancel_scope = None  # type: ignore[assignment]

    async def __aenter__(self) -> TaskGroup:
        """Open the nursery and expose its cancel scope through ``cancel_scope``."""
        self._active = True
        self._nursery = await self._nursery_manager.__aenter__()
        self.cancel_scope = CancelScope(self._nursery.cancel_scope)
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        """
        Close the nursery.

        If the nursery raises an exception group made up solely of
        ``trio.Cancelled`` exceptions, a single new ``trio.Cancelled`` is raised
        from it instead; any other exception group propagates unchanged. The
        group is marked inactive in all cases.
        """
        try:
            return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb)
        except BaseExceptionGroup as exc:
            non_cancellation = exc.split(trio.Cancelled)[1]
            if non_cancellation:
                # Something other than cancellation is in there: keep the group
                raise

            raise trio.Cancelled._create() from exc
        finally:
            self._active = False

    def start_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> None:
        """
        Schedule ``func(*args)`` in the nursery.

        :raises RuntimeError: if the task group is not active
        """
        if self._active:
            self._nursery.start_soon(func, *args, name=name)
            return

        raise RuntimeError(
            "This task group is not active; no new tasks can be started."
        )

    async def start(
        self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
    ) -> Any:
        """
        Start ``func(*args)`` through the nursery's ``start()`` and return its result.

        :raises RuntimeError: if the task group is not active
        """
        if self._active:
            return await self._nursery.start(func, *args, name=name)

        raise RuntimeError(
            "This task group is not active; no new tasks can be started."
        )
217

218

219
#
220
# Threads
221
#
222

223

224
class BlockingPortal(abc.BlockingPortal):
    """Blocking portal that re-enters trio through a captured trio token."""

    def __new__(cls) -> BlockingPortal:
        # Allocate through object.__new__ directly, so any __new__ defined on
        # the base class takes no part in creating the instance.
        return object.__new__(cls)

    def __init__(self) -> None:
        super().__init__()
        # Token of the trio run that is current at construction time
        self._token = trio.lowlevel.current_trio_token()

    def _spawn_task_from_thread(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        name: object,
        future: Future[T_Retval],
    ) -> None:
        """
        Ask the trio run identified by the stored token to start
        ``self._call_func(func, args, kwargs, future)`` in the portal's task
        group under the given task name.
        """
        start_named_task = partial(self._task_group.start_soon, name=name)
        trio.from_thread.run_sync(
            start_named_task,
            self._call_func,
            func,
            args,
            kwargs,
            future,
            trio_token=self._token,
        )
249

250

251
#
252
# Subprocesses
253
#
254

255

256
@dataclass(eq=False)
class ReceiveStreamWrapper(abc.ByteReceiveStream):
    """Byte receive stream that reads from a wrapped ``trio.abc.ReceiveStream``."""

    # The trio stream all operations are forwarded to
    _stream: trio.abc.ReceiveStream

    async def receive(self, max_bytes: int | None = None) -> bytes:
        """
        Read a chunk via ``receive_some(max_bytes)`` on the wrapped stream.

        :raises ClosedResourceError: if trio raises its ``ClosedResourceError``
        :raises BrokenResourceError: if trio raises its ``BrokenResourceError``
        :raises EndOfStream: if the wrapped stream returns no data
        """
        try:
            chunk = await self._stream.receive_some(max_bytes)
        except trio.ClosedResourceError as exc:
            raise ClosedResourceError from exc.__cause__
        except trio.BrokenResourceError as exc:
            raise BrokenResourceError from exc.__cause__

        if not chunk:
            raise EndOfStream

        return chunk

    async def aclose(self) -> None:
        """Close the wrapped trio stream."""
        await self._stream.aclose()
275

276

277
@dataclass(eq=False)
class SendStreamWrapper(abc.ByteSendStream):
    """Byte send stream that writes to a wrapped ``trio.abc.SendStream``."""

    # The trio stream all operations are forwarded to
    _stream: trio.abc.SendStream

    async def send(self, item: bytes) -> None:
        """
        Write ``item`` via ``send_all()`` on the wrapped stream.

        :raises ClosedResourceError: if trio raises its ``ClosedResourceError``
        :raises BrokenResourceError: if trio raises its ``BrokenResourceError``
        """
        try:
            await self._stream.send_all(item)
        except trio.ClosedResourceError as closed_exc:
            raise ClosedResourceError from closed_exc.__cause__
        except trio.BrokenResourceError as broken_exc:
            raise BrokenResourceError from broken_exc.__cause__

    async def aclose(self) -> None:
        """Close the wrapped trio stream."""
        await self._stream.aclose()
291

292

293
@dataclass(eq=False)
class Process(abc.Process):
    """Process handle that forwards to a ``trio.Process`` and its wrapped streams."""

    # The underlying trio process, followed by the optional standard streams
    _process: trio.Process
    _stdin: abc.ByteSendStream | None
    _stdout: abc.ByteReceiveStream | None
    _stderr: abc.ByteReceiveStream | None

    async def aclose(self) -> None:
        """
        Close the standard streams, then wait for the process to exit.

        The streams are closed inside a shielded cancel scope. If waiting for
        the process is interrupted by any exception, the process is killed,
        waited on inside a shielded scope, and the exception is re-raised.
        """
        with CancelScope(shield=True):
            # Same order as the fields: stdin, stdout, stderr
            for stream in (self._stdin, self._stdout, self._stderr):
                if stream:
                    await stream.aclose()

        try:
            await self.wait()
        except BaseException:
            self.kill()
            with CancelScope(shield=True):
                await self.wait()

            raise

    async def wait(self) -> int:
        """Wait for the trio process and return the result of its ``wait()``."""
        exit_code = await self._process.wait()
        return exit_code

    def terminate(self) -> None:
        """Call ``terminate()`` on the trio process."""
        self._process.terminate()

    def kill(self) -> None:
        """Call ``kill()`` on the trio process."""
        self._process.kill()

    def send_signal(self, signal: Signals) -> None:
        """Call ``send_signal(signal)`` on the trio process."""
        self._process.send_signal(signal)

    @property
    def pid(self) -> int:
        """The ``pid`` of the trio process."""
        return self._process.pid

    @property
    def returncode(self) -> int | None:
        """The ``returncode`` of the trio process."""
        return self._process.returncode

    @property
    def stdin(self) -> abc.ByteSendStream | None:
        """The stream given for standard input, if any."""
        return self._stdin

    @property
    def stdout(self) -> abc.ByteReceiveStream | None:
        """The stream given for standard output, if any."""
        return self._stdout

    @property
    def stderr(self) -> abc.ByteReceiveStream | None:
        """The stream given for standard error, if any."""
        return self._stderr
348

349

350
class _ProcessPoolShutdownInstrument(trio.abc.Instrument):
    """Trio instrument whose ``after_run`` hook only defers to the parent class."""

    def after_run(self) -> None:
        # No extra work is done here; the hook simply chains to the superclass.
        super().after_run()
353

354

355
current_default_worker_process_limiter: trio.lowlevel.RunVar = RunVar(
11✔
356
    "current_default_worker_process_limiter"
357
)
358

359

360
async def _shutdown_process_pool(workers: set[abc.Process]) -> None:
    """
    Sleep until cancelled, then kill and close every process in ``workers``.

    On ``trio.Cancelled``, each process without a return code is killed, and
    all processes are then closed inside a shielded cancel scope. The
    cancellation exception is not re-raised.
    """
    try:
        await trio.sleep(math.inf)
    except trio.Cancelled:
        for worker in workers:
            if worker.returncode is None:
                worker.kill()

        # Shielded so that closing the workers is not itself cancelled
        with CancelScope(shield=True):
            for worker in workers:
                await worker.aclose()
371

372

373
#
374
# Sockets and networking
375
#
376

377

378
class _TrioSocketMixin(Generic[T_SockAddr]):
    """Shared close tracking and error translation for trio socket wrappers."""

    def __init__(self, trio_socket: TrioSocketType) -> None:
        self._trio_socket = trio_socket
        # Set by aclose(); used to tell a deliberate close from a broken socket
        self._closed = False

    def _check_closed(self) -> None:
        """
        Raise if the socket is no longer usable.

        :raises ClosedResourceError: if ``aclose()`` has closed this socket
        :raises BrokenResourceError: if the socket's file descriptor is negative
        """
        if self._closed:
            raise ClosedResourceError

        descriptor = self._trio_socket.fileno()
        if descriptor < 0:
            raise BrokenResourceError

    @property
    def _raw_socket(self) -> socket.socket:
        """The ``_sock`` attribute of the wrapped trio socket."""
        return self._trio_socket._sock  # type: ignore[attr-defined]

    async def aclose(self) -> None:
        """Close the trio socket if its file descriptor is still non-negative."""
        if self._trio_socket.fileno() < 0:
            return

        self._closed = True
        self._trio_socket.close()

    def _convert_socket_error(self, exc: BaseException) -> NoReturn:
        """
        Re-raise ``exc`` as the matching anyio exception; this never returns.

        :raises ClosedResourceError: if ``exc`` is trio's ``ClosedResourceError``,
            or if this socket was closed via ``aclose()`` and has a negative
            file descriptor
        :raises BrokenResourceError: if ``exc`` is any other :exc:`OSError`
        """
        if isinstance(exc, trio.ClosedResourceError):
            raise ClosedResourceError from exc

        if self._trio_socket.fileno() < 0 and self._closed:
            raise ClosedResourceError from None

        if isinstance(exc, OSError):
            raise BrokenResourceError from exc

        # Anything else (including cancellation) passes through untouched
        raise exc
407

408

409
class SocketStream(_TrioSocketMixin, abc.SocketStream):
    """Byte stream over a trio socket, with separate guards for reads and writes."""

    def __init__(self, trio_socket: TrioSocketType) -> None:
        super().__init__(trio_socket)
        self._receive_guard = ResourceGuard("reading from")
        self._send_guard = ResourceGuard("writing to")

    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Receive up to ``max_bytes`` bytes from the socket.

        :raises EndOfStream: if the socket returns no data
        """
        with self._receive_guard:
            try:
                chunk = await self._trio_socket.recv(max_bytes)
            except BaseException as exc:
                self._convert_socket_error(exc)

            if not chunk:
                raise EndOfStream

            return chunk

    async def send(self, item: bytes) -> None:
        """Send all of ``item``, repeating ``send()`` until nothing is left."""
        with self._send_guard:
            remaining = memoryview(item)
            while remaining:
                try:
                    sent_count = await self._trio_socket.send(remaining)
                except BaseException as exc:
                    self._convert_socket_error(exc)

                # Drop the part that was accepted by the socket
                remaining = remaining[sent_count:]

    async def send_eof(self) -> None:
        """Shut down the write side of the socket (``SHUT_WR``)."""
        self._trio_socket.shutdown(socket.SHUT_WR)
440

441

442
class UNIXSocketStream(SocketStream, abc.UNIXSocketStream):
    """Socket stream that can also pass file descriptors as ancillary data."""

    async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
        """
        Receive a message together with any file descriptors sent along with it.

        :param msglen: buffer size passed to ``recvmsg()`` for the message
        :param maxfds: number of descriptors to reserve ancillary space for
        :return: a tuple of (message, list of file descriptors)
        :raises ValueError: if ``msglen`` is not a non-negative integer or
            ``maxfds`` is not a positive integer
        :raises EndOfStream: if neither message data nor ancillary data arrives
        :raises RuntimeError: if ancillary data other than
            ``SOL_SOCKET``/``SCM_RIGHTS`` is received
        """
        if not isinstance(msglen, int) or msglen < 0:
            raise ValueError("msglen must be a non-negative integer")
        if not isinstance(maxfds, int) or maxfds < 1:
            raise ValueError("maxfds must be a positive integer")

        fds = array.array("i")
        ancillary_size = socket.CMSG_LEN(maxfds * fds.itemsize)
        await trio.lowlevel.checkpoint()
        with self._receive_guard:
            while True:
                try:
                    message, ancdata, _flags, _addr = await self._trio_socket.recvmsg(
                        msglen, ancillary_size
                    )
                except BaseException as exc:
                    self._convert_socket_error(exc)
                else:
                    if message or ancdata:
                        break

                    raise EndOfStream

        for level, kind, payload in ancdata:
            if level != socket.SOL_SOCKET or kind != socket.SCM_RIGHTS:
                raise RuntimeError(
                    f"Received unexpected ancillary data; message = {message!r}, "
                    f"cmsg_level = {level}, cmsg_type = {kind}"
                )

            # Only feed whole items to the array; drop any trailing partial one
            usable_length = len(payload) - (len(payload) % fds.itemsize)
            fds.frombytes(payload[:usable_length])

        return message, list(fds)

    async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
        """
        Send ``message`` with the given file descriptors attached.

        Integers are used as-is and ``IOBase`` objects contribute their
        ``fileno()``; entries of any other type are left out.

        :raises ValueError: if ``message`` or ``fds`` is empty
        """
        if not message:
            raise ValueError("message must not be empty")
        if not fds:
            raise ValueError("fds must not be empty")

        filenos: list[int] = [
            fd if isinstance(fd, int) else fd.fileno()
            for fd in fds
            if isinstance(fd, (int, IOBase))
        ]
        fdarray = array.array("i", filenos)
        rights = (socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)
        await trio.lowlevel.checkpoint()
        with self._send_guard:
            while True:
                try:
                    await self._trio_socket.sendmsg([message], [rights])
                except BaseException as exc:
                    self._convert_socket_error(exc)
                else:
                    break
507

508

509
class TCPSocketListener(_TrioSocketMixin, abc.SocketListener):
    """Listener that accepts TCP connections on a stdlib socket wrapped by trio."""

    def __init__(self, raw_socket: socket.socket) -> None:
        wrapped = trio.socket.from_stdlib_socket(raw_socket)
        super().__init__(wrapped)
        self._accept_guard = ResourceGuard("accepting connections from")

    async def accept(self) -> SocketStream:
        """Accept one connection, enable ``TCP_NODELAY`` on it and wrap it."""
        with self._accept_guard:
            try:
                client_socket, _peer = await self._trio_socket.accept()
            except BaseException as exc:
                self._convert_socket_error(exc)

        client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        return SocketStream(client_socket)
523

524

525
class UNIXSocketListener(_TrioSocketMixin, abc.SocketListener):
    """Listener that accepts connections and wraps them as UNIX socket streams."""

    def __init__(self, raw_socket: socket.socket) -> None:
        wrapped = trio.socket.from_stdlib_socket(raw_socket)
        super().__init__(wrapped)
        self._accept_guard = ResourceGuard("accepting connections from")

    async def accept(self) -> UNIXSocketStream:
        """Accept one connection and wrap it in a :class:`UNIXSocketStream`."""
        with self._accept_guard:
            try:
                client_socket, _peer = await self._trio_socket.accept()
            except BaseException as exc:
                self._convert_socket_error(exc)

        return UNIXSocketStream(client_socket)
538

539

540
class UDPSocket(_TrioSocketMixin[IPSockAddrType], abc.UDPSocket):
    """Unconnected datagram socket exchanging (payload, address) pairs."""

    def __init__(self, trio_socket: TrioSocketType) -> None:
        super().__init__(trio_socket)
        self._receive_guard = ResourceGuard("reading from")
        self._send_guard = ResourceGuard("writing to")

    async def receive(self) -> tuple[bytes, IPSockAddrType]:
        """
        Receive one datagram (buffer size 65536) and return it with the sender
        address passed through ``convert_ipv6_sockaddr()``.
        """
        with self._receive_guard:
            try:
                payload, sender = await self._trio_socket.recvfrom(65536)
                return payload, convert_ipv6_sockaddr(sender)
            except BaseException as exc:
                self._convert_socket_error(exc)

    async def send(self, item: UDPPacketType) -> None:
        """Send ``item``, unpacked as the arguments of ``sendto()``."""
        with self._send_guard:
            try:
                await self._trio_socket.sendto(*item)
            except BaseException as exc:
                self._convert_socket_error(exc)
560

561

562
class ConnectedUDPSocket(_TrioSocketMixin[IPSockAddrType], abc.ConnectedUDPSocket):
    """Datagram socket that sends and receives bare payloads via send()/recv()."""

    def __init__(self, trio_socket: TrioSocketType) -> None:
        super().__init__(trio_socket)
        self._receive_guard = ResourceGuard("reading from")
        self._send_guard = ResourceGuard("writing to")

    async def receive(self) -> bytes:
        """Receive one datagram using a buffer size of 65536."""
        with self._receive_guard:
            try:
                return await self._trio_socket.recv(65536)
            except BaseException as recv_exc:
                self._convert_socket_error(recv_exc)

    async def send(self, item: bytes) -> None:
        """Send ``item`` with a single ``send()`` call on the socket."""
        with self._send_guard:
            try:
                await self._trio_socket.send(item)
            except BaseException as send_exc:
                self._convert_socket_error(send_exc)
581

582

583
class UNIXDatagramSocket(_TrioSocketMixin[str], abc.UNIXDatagramSocket):
    """Unconnected UNIX datagram socket exchanging (payload, address) pairs."""

    def __init__(self, trio_socket: TrioSocketType) -> None:
        super().__init__(trio_socket)
        self._receive_guard = ResourceGuard("reading from")
        self._send_guard = ResourceGuard("writing to")

    async def receive(self) -> UNIXDatagramPacketType:
        """Receive one datagram (buffer size 65536) with its sender address."""
        with self._receive_guard:
            try:
                payload, sender = await self._trio_socket.recvfrom(65536)
                return payload, sender
            except BaseException as exc:
                self._convert_socket_error(exc)

    async def send(self, item: UNIXDatagramPacketType) -> None:
        """Send ``item``, unpacked as the arguments of ``sendto()``."""
        with self._send_guard:
            try:
                await self._trio_socket.sendto(*item)
            except BaseException as exc:
                self._convert_socket_error(exc)
603

604

605
class ConnectedUNIXDatagramSocket(
    _TrioSocketMixin[str], abc.ConnectedUNIXDatagramSocket
):
    """UNIX datagram socket that sends and receives bare payloads."""

    def __init__(self, trio_socket: TrioSocketType) -> None:
        super().__init__(trio_socket)
        self._receive_guard = ResourceGuard("reading from")
        self._send_guard = ResourceGuard("writing to")

    async def receive(self) -> bytes:
        """Receive one datagram using a buffer size of 65536."""
        with self._receive_guard:
            try:
                return await self._trio_socket.recv(65536)
            except BaseException as recv_exc:
                self._convert_socket_error(recv_exc)

    async def send(self, item: bytes) -> None:
        """Send ``item`` with a single ``send()`` call on the socket."""
        with self._send_guard:
            try:
                await self._trio_socket.send(item)
            except BaseException as send_exc:
                self._convert_socket_error(send_exc)
626

627

628
#
629
# Synchronization
630
#
631

632

633
class Event(BaseEvent):
    """Event that delegates to a ``trio.Event``."""

    def __new__(cls) -> Event:
        # Allocate through object.__new__ directly, so any __new__ defined on
        # the base class takes no part in creating the instance.
        return object.__new__(cls)

    def __init__(self) -> None:
        self.__original = trio.Event()

    def is_set(self) -> bool:
        """Return the wrapped event's ``is_set()`` result."""
        return self.__original.is_set()

    async def wait(self) -> None:
        """Wait on the wrapped event."""
        return await self.__original.wait()

    def statistics(self) -> EventStatistics:
        """Return statistics carrying the wrapped event's ``tasks_waiting`` count."""
        waiting = self.__original.statistics().tasks_waiting
        return EventStatistics(tasks_waiting=waiting)

    def set(self) -> None:
        """Set the wrapped event."""
        self.__original.set()
652

653

654
class Lock(BaseLock):
    """Lock that delegates to a ``trio.Lock``, with an optional fast acquire path."""

    def __new__(cls, *, fast_acquire: bool = False) -> Lock:
        # Allocate through object.__new__ directly, so any __new__ defined on
        # the base class takes no part in creating the instance.
        return object.__new__(cls)

    def __init__(self, *, fast_acquire: bool = False) -> None:
        self._fast_acquire = fast_acquire
        self.__original = trio.Lock()

    async def acquire(self) -> None:
        """
        Acquire the lock.

        Without ``fast_acquire`` this is the trio lock's own ``acquire()``. With
        it, only a cancellation check is awaited before trying a non-blocking
        acquire; if that would block, the task parks in the trio lock's
        parking lot.
        """
        if self._fast_acquire:
            # This is the "fast path" where we don't let other tasks run
            await trio.lowlevel.checkpoint_if_cancelled()
            try:
                self.__original.acquire_nowait()
            except trio.WouldBlock:
                await self.__original._lot.park()
        else:
            await self.__original.acquire()

    def acquire_nowait(self) -> None:
        """
        Acquire the lock without waiting.

        :raises WouldBlock: if the trio lock reports that acquiring would block
        """
        try:
            self.__original.acquire_nowait()
        except trio.WouldBlock:
            raise WouldBlock from None

    def locked(self) -> bool:
        """Return the wrapped lock's ``locked()`` result."""
        return self.__original.locked()

    def release(self) -> None:
        """Release the wrapped lock."""
        self.__original.release()

    def statistics(self) -> LockStatistics:
        """Return the wrapped lock's statistics, with any owner task wrapped."""
        trio_stats = self.__original.statistics()
        if trio_stats.owner:
            owner = TrioTaskInfo(trio_stats.owner)
        else:
            owner = None

        return LockStatistics(trio_stats.locked, owner, trio_stats.tasks_waiting)
692

693

694
class Semaphore(BaseSemaphore):
    """Semaphore that delegates to a ``trio.Semaphore``, with optional fast acquire."""

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        # Allocate through object.__new__ directly, so any __new__ defined on
        # the base class takes no part in creating the instance.
        return object.__new__(cls)

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> None:
        super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
        self.__original = trio.Semaphore(initial_value, max_value=max_value)

    async def acquire(self) -> None:
        """
        Decrement the semaphore, waiting if necessary.

        Without ``fast_acquire`` this is the trio semaphore's own ``acquire()``.
        With it, only a cancellation check is awaited before trying a
        non-blocking acquire; if that would block, the task parks in the trio
        semaphore's parking lot.
        """
        if self._fast_acquire:
            # This is the "fast path" where we don't let other tasks run
            await trio.lowlevel.checkpoint_if_cancelled()
            try:
                self.__original.acquire_nowait()
            except trio.WouldBlock:
                await self.__original._lot.park()
        else:
            await self.__original.acquire()

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore without waiting.

        :raises WouldBlock: if the trio semaphore reports that acquiring would
            block
        """
        try:
            self.__original.acquire_nowait()
        except trio.WouldBlock:
            raise WouldBlock from None

    @property
    def max_value(self) -> int | None:
        """The ``max_value`` of the wrapped semaphore."""
        return self.__original.max_value

    @property
    def value(self) -> int:
        """The current ``value`` of the wrapped semaphore."""
        return self.__original.value

    def release(self) -> None:
        """Release the wrapped semaphore."""
        self.__original.release()

    def statistics(self) -> SemaphoreStatistics:
        """Return statistics carrying the wrapped semaphore's waiting task count."""
        waiting = self.__original.statistics().tasks_waiting
        return SemaphoreStatistics(waiting)
746

747

748
class CapacityLimiter(BaseCapacityLimiter):
    """Capacity limiter that delegates to a ``trio.CapacityLimiter``."""

    def __new__(
        cls,
        total_tokens: float | None = None,
        *,
        original: trio.CapacityLimiter | None = None,
    ) -> CapacityLimiter:
        # Allocate through object.__new__ directly, so any __new__ defined on
        # the base class takes no part in creating the instance.
        return object.__new__(cls)

    def __init__(
        self,
        total_tokens: float | None = None,
        *,
        original: trio.CapacityLimiter | None = None,
    ) -> None:
        """
        Wrap ``original`` when given; otherwise create a trio limiter with
        ``total_tokens`` tokens (which must then not be ``None``).
        """
        if original is not None:
            self.__original = original
        else:
            assert total_tokens is not None
            self.__original = trio.CapacityLimiter(total_tokens)

    async def __aenter__(self) -> None:
        return await self.__original.__aenter__()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.__original.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def total_tokens(self) -> float:
        """The ``total_tokens`` of the wrapped limiter."""
        return self.__original.total_tokens

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        self.__original.total_tokens = value

    @property
    def borrowed_tokens(self) -> int:
        """The ``borrowed_tokens`` of the wrapped limiter."""
        return self.__original.borrowed_tokens

    @property
    def available_tokens(self) -> float:
        """The ``available_tokens`` of the wrapped limiter."""
        return self.__original.available_tokens

    def acquire_nowait(self) -> None:
        """
        Acquire a token without waiting.

        :raises WouldBlock: if the trio limiter reports that acquiring would
            block
        """
        # Translate trio's exception the same way Lock and Semaphore do, so
        # callers only have to deal with this package's WouldBlock.
        try:
            self.__original.acquire_nowait()
        except trio.WouldBlock:
            raise WouldBlock from None

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Acquire a token for ``borrower`` without waiting.

        :raises WouldBlock: if the trio limiter reports that acquiring would
            block
        """
        try:
            self.__original.acquire_on_behalf_of_nowait(borrower)
        except trio.WouldBlock:
            raise WouldBlock from None

    async def acquire(self) -> None:
        """Acquire a token from the wrapped limiter, waiting if necessary."""
        await self.__original.acquire()

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """Acquire a token for ``borrower``, waiting if necessary."""
        await self.__original.acquire_on_behalf_of(borrower)

    def release(self) -> None:
        """Release a token on the wrapped limiter."""
        return self.__original.release()

    def release_on_behalf_of(self, borrower: object) -> None:
        """Release the token held by ``borrower`` on the wrapped limiter."""
        return self.__original.release_on_behalf_of(borrower)

    def statistics(self) -> CapacityLimiterStatistics:
        """Return the wrapped limiter's statistics, with borrowers as a tuple."""
        orig = self.__original.statistics()
        return CapacityLimiterStatistics(
            borrowed_tokens=orig.borrowed_tokens,
            total_tokens=orig.total_tokens,
            borrowers=tuple(orig.borrowers),
            tasks_waiting=orig.tasks_waiting,
        )
822

823

824
_capacity_limiter_wrapper: trio.lowlevel.RunVar = RunVar("_capacity_limiter_wrapper")
11✔
825

826

827
#
828
# Signal handling
829
#
830

831

832
class _SignalReceiver:
    """Context manager and async iterator over signals received through trio."""

    # Iterator obtained from trio's signal receiver; set in __enter__()
    _iterator: AsyncIterator[int]

    def __init__(self, signals: tuple[Signals, ...]):
        self._signals = signals

    def __enter__(self) -> _SignalReceiver:
        # Open trio's receiver for the configured signals and keep both the
        # context manager (for __exit__) and the iterator it yields.
        receiver_cm = trio.open_signal_receiver(*self._signals)
        self._cm = receiver_cm
        self._iterator = receiver_cm.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        return self._cm.__exit__(exc_type, exc_val, exc_tb)

    def __aiter__(self) -> _SignalReceiver:
        return self

    async def __anext__(self) -> Signals:
        """Wait for the next signal number and return it as a ``Signals`` member."""
        number = await self._iterator.__anext__()
        return Signals(number)
857

858

859
#
860
# Testing and debugging
861
#
862

863

864
class TestRunner(abc.TestRunner):
    """Runs async tests and fixtures inside one lazily started Trio guest run.

    Callbacks that Trio hands to ``run_sync_soon_threadsafe`` are put on
    ``_call_queue``; the synchronous methods of this class take them off the queue and
    call them while waiting for a result.
    """

    def __init__(self, **options: Any) -> None:
        from queue import Queue

        # Extra keyword arguments forwarded to trio.lowlevel.start_guest_run()
        self._options = options
        # Set by the runner task once it is up; reset to None when the run is done
        self._send_stream: MemoryObjectSendStream | None = None
        self._call_queue: Queue[Callable[[], object]] = Queue()

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: types.TracebackType | None,
    ) -> None:
        stream = self._send_stream
        if not stream:
            return

        # Presumably closing the send side lets the receive loop in
        # _run_tests_and_fixtures() finish - confirm against the stream semantics
        stream.close()
        # Keep running queued callbacks until _main_task_finished() clears the stream
        while self._send_stream is not None:
            self._call_queue.get()()

    async def _run_tests_and_fixtures(self) -> None:
        """Await each received awaitable and store its outcome in the paired list."""
        self._send_stream, receive_stream = create_memory_object_stream(1)
        with receive_stream:
            async for awaitable, holder in receive_stream:
                try:
                    result: Outcome = Value(await awaitable)
                except BaseException as exc:
                    # Captured here and re-raised in the caller through unwrap()
                    result = Error(exc)

                holder.append(result)

    def _main_task_finished(self, outcome: object) -> None:
        # done_callback of the guest run; signals __exit__() that it can stop waiting
        self._send_stream = None

    def _call_in_runner_task(
        self,
        func: Callable[P, Awaitable[T_Retval]],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> T_Retval:
        """Run ``func(*args, **kwargs)`` in the runner task and return its result.

        Starts the guest run first if it is not running yet. An exception raised by
        the awaitable is re-raised here via ``Outcome.unwrap()``.
        """
        if self._send_stream is None:
            trio.lowlevel.start_guest_run(
                self._run_tests_and_fixtures,
                run_sync_soon_threadsafe=self._call_queue.put,
                done_callback=self._main_task_finished,
                **self._options,
            )
            # Run queued callbacks until the runner task has created its stream
            while self._send_stream is None:
                self._call_queue.get()()

        results: list[Outcome] = []
        stream = self._send_stream
        stream.send_nowait((func(*args, **kwargs), results))
        # Run queued callbacks until the runner task has recorded an outcome
        while not results:
            self._call_queue.get()()

        return results[0].unwrap()

    def run_asyncgen_fixture(
        self,
        fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
        kwargs: dict[str, Any],
    ) -> Iterable[T_Retval]:
        """Yield the fixture's single value, then run its teardown part.

        :raises RuntimeError: if the async generator yields a second time

        """
        asyncgen = fixture_func(**kwargs)
        fixturevalue: T_Retval = self._call_in_runner_task(asyncgen.asend, None)

        yield fixturevalue

        try:
            self._call_in_runner_task(asyncgen.asend, None)
        except StopAsyncIteration:
            # The generator finished after its single yield, as it should
            return

        self._call_in_runner_task(asyncgen.aclose)
        raise RuntimeError("Async generator fixture did not stop")

    def run_fixture(
        self,
        fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
        kwargs: dict[str, Any],
    ) -> T_Retval:
        """Run a coroutine fixture in the runner task and return its value."""
        fixture_value = self._call_in_runner_task(fixture_func, **kwargs)
        return fixture_value

    def run_test(
        self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
    ) -> None:
        """Run a coroutine test function in the runner task, discarding its result."""
        self._call_in_runner_task(test_func, **kwargs)
11✔
949

950

951
class TrioTaskInfo(TaskInfo):
    """``TaskInfo`` built from a Trio task, keeping only a weak proxy to the task."""

    def __init__(self, task: trio.lowlevel.Task):
        # The parent is the task owning the nursery this task runs in, if there is one
        nursery = task.parent_nursery
        parent_task = nursery.parent_task if nursery else None
        parent_id = id(parent_task) if parent_task else None

        super().__init__(id(task), parent_id, task.name, task.coro)
        self._task = weakref.proxy(task)

    def has_pending_cancellation(self) -> bool:
        """Return the task's ``effectively_cancelled`` flag, or ``False`` if it is gone."""
        try:
            cancel_status = self._task._cancel_status
            return cancel_status.effectively_cancelled
        except ReferenceError:
            # The weak proxy is dead, so the task no longer exists and cannot have a
            # cancellation pending
            return False
×
967

968

969
class TrioBackend(AsyncBackend):
    """``AsyncBackend`` implementation whose methods delegate to Trio."""

    @classmethod
    def run(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        kwargs: dict[str, Any],
        options: dict[str, Any],
    ) -> T_Retval:
        """Run ``func(*args)`` with ``trio.run()`` and return its result."""
        # NOTE(review): ``kwargs`` and ``options`` are accepted but not forwarded to
        # trio.run() - confirm that this is intentional
        return trio.run(func, *args)

    @classmethod
    def current_token(cls) -> object:
        return trio.lowlevel.current_trio_token()

    @classmethod
    def current_time(cls) -> float:
        return trio.current_time()

    @classmethod
    def cancelled_exception_class(cls) -> type[BaseException]:
        return trio.Cancelled

    @classmethod
    async def checkpoint(cls) -> None:
        await trio.lowlevel.checkpoint()

    @classmethod
    async def checkpoint_if_cancelled(cls) -> None:
        await trio.lowlevel.checkpoint_if_cancelled()

    @classmethod
    async def cancel_shielded_checkpoint(cls) -> None:
        await trio.lowlevel.cancel_shielded_checkpoint()

    @classmethod
    async def sleep(cls, delay: float) -> None:
        await trio.sleep(delay)

    @classmethod
    def create_cancel_scope(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> abc.CancelScope:
        return CancelScope(deadline=deadline, shield=shield)

    @classmethod
    def current_effective_deadline(cls) -> float:
        return trio.current_effective_deadline()

    @classmethod
    def create_task_group(cls) -> abc.TaskGroup:
        return TaskGroup()

    @classmethod
    def create_event(cls) -> abc.Event:
        return Event()

    @classmethod
    def create_lock(cls, *, fast_acquire: bool) -> Lock:
        return Lock(fast_acquire=fast_acquire)

    @classmethod
    def create_semaphore(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> abc.Semaphore:
        return Semaphore(initial_value, max_value=max_value, fast_acquire=fast_acquire)

    @classmethod
    def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
        return CapacityLimiter(total_tokens)

    @classmethod
    async def run_sync_in_worker_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        abandon_on_cancel: bool = False,
        limiter: abc.CapacityLimiter | None = None,
    ) -> T_Retval:
        """Run ``func(*args)`` through ``trio.to_thread.run_sync()``.

        The call is made inside ``claim_worker_thread()`` with this backend class and
        the Trio token that was current when this method was called.
        """

        def wrapper() -> T_Retval:
            with claim_worker_thread(TrioBackend, token):
                return func(*args)

        # Captured here, in the event loop thread, for use by wrapper()
        token = TrioBackend.current_token()
        return await run_sync(
            wrapper,
            abandon_on_cancel=abandon_on_cancel,
            limiter=cast(trio.CapacityLimiter, limiter),
        )

    @classmethod
    def check_cancelled(cls) -> None:
        trio.from_thread.check_cancelled()

    @classmethod
    def run_async_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        # NOTE(review): ``token`` is not passed on to trio.from_thread.run()
        return trio.from_thread.run(func, *args)

    @classmethod
    def run_sync_from_thread(
        cls,
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        args: tuple[Unpack[PosArgsT]],
        token: object,
    ) -> T_Retval:
        # NOTE(review): ``token`` is not passed on to trio.from_thread.run_sync()
        return trio.from_thread.run_sync(func, *args)

    @classmethod
    def create_blocking_portal(cls) -> abc.BlockingPortal:
        return BlockingPortal()

    @classmethod
    async def open_process(
        cls,
        command: StrOrBytesPath | Sequence[StrOrBytesPath],
        *,
        stdin: int | IO[Any] | None,
        stdout: int | IO[Any] | None,
        stderr: int | IO[Any] | None,
        **kwargs: Any,
    ) -> Process:
        """Start a subprocess with ``trio.lowlevel.open_process()``.

        A single string, bytes or path-like *command* is run with ``shell=True``;
        any other sequence is run as an argument list with ``shell=False``. Bytes
        items are decoded with ``os.fsdecode()`` first.
        """

        def convert_item(item: StrOrBytesPath) -> str:
            str_or_bytes = os.fspath(item)
            if isinstance(str_or_bytes, str):
                return str_or_bytes
            else:
                return os.fsdecode(str_or_bytes)

        if isinstance(command, (str, bytes, PathLike)):
            process = await trio.lowlevel.open_process(
                convert_item(command),
                stdin=stdin,
                stdout=stdout,
                stderr=stderr,
                shell=True,
                **kwargs,
            )
        else:
            process = await trio.lowlevel.open_process(
                [convert_item(item) for item in command],
                stdin=stdin,
                stdout=stdout,
                stderr=stderr,
                shell=False,
                **kwargs,
            )

        # Only the standard streams that the process actually has get a wrapper
        stdin_stream = SendStreamWrapper(process.stdin) if process.stdin else None
        stdout_stream = ReceiveStreamWrapper(process.stdout) if process.stdout else None
        stderr_stream = ReceiveStreamWrapper(process.stderr) if process.stderr else None
        return Process(process, stdin_stream, stdout_stream, stderr_stream)

    @classmethod
    def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
        trio.lowlevel.spawn_system_task(_shutdown_process_pool, workers)

    @classmethod
    async def connect_tcp(
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
    ) -> SocketStream:
        """Open a TCP connection to ``(host, port)`` with ``TCP_NODELAY`` set.

        The address family is IPv6 if *host* contains a colon, IPv4 otherwise. If
        *local_address* is given, the socket is bound to it before connecting. The
        socket is closed if any step of the setup fails.
        """
        family = socket.AF_INET6 if ":" in host else socket.AF_INET
        trio_socket = trio.socket.socket(family)
        try:
            # setsockopt() and bind() are inside the try block too, so that a failure
            # in either of them does not leak the socket
            trio_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            if local_address:
                await trio_socket.bind(local_address)

            await trio_socket.connect((host, port))
        except BaseException:
            trio_socket.close()
            raise

        return SocketStream(trio_socket)

    @classmethod
    async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
        trio_socket = trio.socket.socket(socket.AF_UNIX)
        try:
            await trio_socket.connect(path)
        except BaseException:
            trio_socket.close()
            raise

        return UNIXSocketStream(trio_socket)

    @classmethod
    def create_tcp_listener(cls, sock: socket.socket) -> abc.SocketListener:
        return TCPSocketListener(sock)

    @classmethod
    def create_unix_listener(cls, sock: socket.socket) -> abc.SocketListener:
        return UNIXSocketListener(sock)

    @classmethod
    async def create_udp_socket(
        cls,
        family: socket.AddressFamily,
        local_address: IPSockAddrType | None,
        remote_address: IPSockAddrType | None,
        reuse_port: bool,
    ) -> UDPSocket | ConnectedUDPSocket:
        """Create a UDP socket, optionally bound and/or connected.

        Returns a ``ConnectedUDPSocket`` if *remote_address* is given and a
        ``UDPSocket`` otherwise. The socket is closed if any step of the setup fails.
        """
        trio_socket = trio.socket.socket(family=family, type=socket.SOCK_DGRAM)
        try:
            if reuse_port:
                trio_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

            if local_address:
                await trio_socket.bind(local_address)

            if remote_address:
                await trio_socket.connect(remote_address)
        except BaseException:
            # Do not leak the descriptor when configuring the socket fails
            trio_socket.close()
            raise

        if remote_address:
            return ConnectedUDPSocket(trio_socket)
        else:
            return UDPSocket(trio_socket)

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket.socket, remote_path: None
    ) -> abc.UNIXDatagramSocket: ...

    @classmethod
    @overload
    async def create_unix_datagram_socket(
        cls, raw_socket: socket.socket, remote_path: str | bytes
    ) -> abc.ConnectedUNIXDatagramSocket: ...

    @classmethod
    async def create_unix_datagram_socket(
        cls, raw_socket: socket.socket, remote_path: str | bytes | None
    ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
        """Wrap *raw_socket* for Trio, connecting it first if *remote_path* is given."""
        trio_socket = trio.socket.from_stdlib_socket(raw_socket)

        if remote_path:
            await trio_socket.connect(remote_path)
            return ConnectedUNIXDatagramSocket(trio_socket)
        else:
            return UNIXDatagramSocket(trio_socket)

    @classmethod
    async def getaddrinfo(
        cls,
        host: bytes | str | None,
        port: str | int | None,
        *,
        family: int | AddressFamily = 0,
        type: int | SocketKind = 0,
        proto: int = 0,
        flags: int = 0,
    ) -> list[
        tuple[
            AddressFamily,
            SocketKind,
            int,
            str,
            tuple[str, int] | tuple[str, int, int, int],
        ]
    ]:
        return await trio.socket.getaddrinfo(host, port, family, type, proto, flags)

    @classmethod
    async def getnameinfo(
        cls, sockaddr: IPSockAddrType, flags: int = 0
    ) -> tuple[str, str]:
        return await trio.socket.getnameinfo(sockaddr, flags)

    @classmethod
    async def wait_socket_readable(cls, sock: socket.socket) -> None:
        """Wait with ``trio.lowlevel.wait_readable()``, translating Trio's errors.

        :raises ClosedResourceError: on ``trio.ClosedResourceError``
        :raises BusyResourceError: on ``trio.BusyResourceError``

        """
        try:
            await wait_readable(sock)
        except trio.ClosedResourceError as exc:
            # Keep trio's traceback but hide the trio exception itself
            raise ClosedResourceError().with_traceback(exc.__traceback__) from None
        except trio.BusyResourceError:
            raise BusyResourceError("reading from") from None

    @classmethod
    async def wait_socket_writable(cls, sock: socket.socket) -> None:
        """Wait with ``trio.lowlevel.wait_writable()``, translating Trio's errors.

        :raises ClosedResourceError: on ``trio.ClosedResourceError``
        :raises BusyResourceError: on ``trio.BusyResourceError``

        """
        try:
            await wait_writable(sock)
        except trio.ClosedResourceError as exc:
            # Keep trio's traceback but hide the trio exception itself
            raise ClosedResourceError().with_traceback(exc.__traceback__) from None
        except trio.BusyResourceError:
            raise BusyResourceError("writing to") from None

    @classmethod
    def current_default_thread_limiter(cls) -> CapacityLimiter:
        """Return the cached wrapper around Trio's default thread limiter.

        The wrapper is created and stored in ``_capacity_limiter_wrapper`` on the
        first call that finds no stored value.
        """
        try:
            return _capacity_limiter_wrapper.get()
        except LookupError:
            limiter = CapacityLimiter(
                original=trio.to_thread.current_default_thread_limiter()
            )
            _capacity_limiter_wrapper.set(limiter)
            return limiter

    @classmethod
    def open_signal_receiver(
        cls, *signals: Signals
    ) -> ContextManager[AsyncIterator[Signals]]:
        return _SignalReceiver(signals)

    @classmethod
    def get_current_task(cls) -> TaskInfo:
        task = current_task()
        return TrioTaskInfo(task)

    @classmethod
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
        """Return task infos for the root task and every task reachable from it."""
        root_task = current_root_task()
        assert root_task
        task_infos = [TrioTaskInfo(root_task)]
        # Walk the task tree one level of nurseries at a time
        nurseries = root_task.child_nurseries
        while nurseries:
            new_nurseries: list[trio.Nursery] = []
            for nursery in nurseries:
                for task in nursery.child_tasks:
                    task_infos.append(TrioTaskInfo(task))
                    new_nurseries.extend(task.child_nurseries)

            nurseries = new_nurseries

        return task_infos

    @classmethod
    async def wait_all_tasks_blocked(cls) -> None:
        from trio.testing import wait_all_tasks_blocked

        await wait_all_tasks_blocked()

    @classmethod
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
        return TestRunner(**options)
11✔
1310

1311

1312
# Module-level name bound to this module's AsyncBackend implementation
# (presumably what the backend loader looks up - confirm against the caller)
backend_class = TrioBackend
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc