• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10653597381

01 Sep 2024 11:25AM UTC coverage: 91.671% (-0.1%) from 91.788%
10653597381

Pull #761

github

web-flow
Merge 6d0f355bd into 8a5b34626
Pull Request #761: Delegated the implementations of Lock and Semaphore to the async backend class

229 of 250 new or added lines in 4 files covered. (91.6%)

2 existing lines in 2 files now uncovered.

4744 of 5175 relevant lines covered (91.67%)

9.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.09
/src/anyio/abc/_eventloop.py
1
from __future__ import annotations
11✔
2

3
import math
11✔
4
import sys
11✔
5
from abc import ABCMeta, abstractmethod
11✔
6
from collections.abc import AsyncIterator, Awaitable, Mapping
11✔
7
from os import PathLike
11✔
8
from signal import Signals
11✔
9
from socket import AddressFamily, SocketKind, socket
11✔
10
from typing import (
11✔
11
    IO,
12
    TYPE_CHECKING,
13
    Any,
14
    Callable,
15
    ContextManager,
16
    Sequence,
17
    TypeVar,
18
    overload,
19
)
20

21
if sys.version_info >= (3, 11):
11✔
22
    from typing import TypeVarTuple, Unpack
5✔
23
else:
24
    from typing_extensions import TypeVarTuple, Unpack
6✔
25

26
if TYPE_CHECKING:
11✔
27
    from typing import Literal
×
28

NEW
29
    from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
×
30
    from .._core._tasks import CancelScope
×
31
    from .._core._testing import TaskInfo
×
32
    from ..from_thread import BlockingPortal
×
33
    from ._sockets import (
×
34
        ConnectedUDPSocket,
35
        ConnectedUNIXDatagramSocket,
36
        IPSockAddrType,
37
        SocketListener,
38
        SocketStream,
39
        UDPSocket,
40
        UNIXDatagramSocket,
41
        UNIXSocketStream,
42
    )
43
    from ._subprocesses import Process
×
44
    from ._tasks import TaskGroup
×
45
    from ._testing import TestRunner
×
46

47
T_Retval = TypeVar("T_Retval")
11✔
48
PosArgsT = TypeVarTuple("PosArgsT")
11✔
49

50

51
class AsyncBackend(metaclass=ABCMeta):
11✔
52
    @classmethod
11✔
53
    @abstractmethod
11✔
54
    def run(
11✔
55
        cls,
56
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
57
        args: tuple[Unpack[PosArgsT]],
58
        kwargs: dict[str, Any],
59
        options: dict[str, Any],
60
    ) -> T_Retval:
61
        """
62
        Run the given coroutine function in an asynchronous event loop.
63

64
        The current thread must not be already running an event loop.
65

66
        :param func: a coroutine function
67
        :param args: positional arguments to ``func``
68
        :param kwargs: keyword arguments to ``func``
69
        :param options: keyword arguments to call the backend ``run()`` implementation
70
            with
71
        :return: the return value of the coroutine function
72
        """
73

74
    @classmethod
11✔
75
    @abstractmethod
11✔
76
    def current_token(cls) -> object:
11✔
77
        """
78

79
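        :return: a token object (NOTE(review): presumably the ``token`` accepted by the ``run_*_from_thread`` methods -- confirm)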
80
        """
81

82
    @classmethod
11✔
83
    @abstractmethod
11✔
84
    def current_time(cls) -> float:
11✔
85
        """
86
        Return the current value of the event loop's internal clock.
87

88
        :return: the clock value (seconds)
89
        """
90

91
    @classmethod
11✔
92
    @abstractmethod
11✔
93
    def cancelled_exception_class(cls) -> type[BaseException]:
11✔
94
        """Return the exception class that is raised in a task if it's cancelled."""
95

96
    @classmethod
11✔
97
    @abstractmethod
11✔
98
    async def checkpoint(cls) -> None:
11✔
99
        """
100
        Check if the task has been cancelled, and allow rescheduling of other tasks.
101

102
        This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
103
        :meth:`cancel_shielded_checkpoint`.
104
        """
105

106
    @classmethod
11✔
107
    async def checkpoint_if_cancelled(cls) -> None:
11✔
108
        """
109
        Check if the current cancel scope has been cancelled.
110

111
        This will check if the task has been cancelled, but will not allow other tasks
112
        to be scheduled if not.
113

114
        """
115
        if cls.current_effective_deadline() == -math.inf:
×
116
            await cls.checkpoint()
×
117

118
    @classmethod
11✔
119
    async def cancel_shielded_checkpoint(cls) -> None:
11✔
120
        """
121
        Allow the rescheduling of other tasks.
122

123
        This will give other tasks the opportunity to run, but without checking if the
124
        current cancel scope has been cancelled, unlike with :meth:`checkpoint`.
125

126
        """
127
        with cls.create_cancel_scope(shield=True):
×
128
            await cls.sleep(0)
×
129

130
    @classmethod
11✔
131
    @abstractmethod
11✔
132
    async def sleep(cls, delay: float) -> None:
11✔
133
        """
134
        Pause the current task for the specified duration.
135

136
        :param delay: the duration, in seconds
137
        """
138

139
    @classmethod
11✔
140
    @abstractmethod
11✔
141
    def create_cancel_scope(
11✔
142
        cls, *, deadline: float = math.inf, shield: bool = False
143
    ) -> CancelScope:
144
        pass
×
145

146
    @classmethod
11✔
147
    @abstractmethod
11✔
148
    def current_effective_deadline(cls) -> float:
11✔
149
        """
150
        Return the nearest deadline among all the cancel scopes effective for the
151
        current task.
152

153
        :return:
154
            - a clock value from the event loop's internal clock
155
            - ``inf`` if there is no deadline in effect
156
            - ``-inf`` if the current scope has been cancelled
157
        :rtype: float
158
        """
159

160
    @classmethod
11✔
161
    @abstractmethod
11✔
162
    def create_task_group(cls) -> TaskGroup:
11✔
163
        pass
×
164

165
    @classmethod
11✔
166
    @abstractmethod
11✔
167
    def create_event(cls) -> Event:
11✔
168
        pass
×
169

170
    @classmethod
11✔
171
    @abstractmethod
11✔
172
    def create_lock(cls) -> Lock:
11✔
NEW
173
        pass
×
174

175
    @classmethod
11✔
176
    @abstractmethod
11✔
177
    def create_semaphore(
11✔
178
        cls, initial_value: int, *, max_value: int | None = None
179
    ) -> Semaphore:
NEW
180
        pass
×
181

182
    @classmethod
11✔
183
    @abstractmethod
11✔
184
    def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
11✔
185
        pass
×
186

187
    @classmethod
11✔
188
    @abstractmethod
11✔
189
    async def run_sync_in_worker_thread(
11✔
190
        cls,
191
        func: Callable[[Unpack[PosArgsT]], T_Retval],
192
        args: tuple[Unpack[PosArgsT]],
193
        abandon_on_cancel: bool = False,
194
        limiter: CapacityLimiter | None = None,
195
    ) -> T_Retval:
196
        pass
×
197

198
    @classmethod
11✔
199
    @abstractmethod
11✔
200
    def check_cancelled(cls) -> None:
11✔
201
        pass
×
202

203
    @classmethod
11✔
204
    @abstractmethod
11✔
205
    def run_async_from_thread(
11✔
206
        cls,
207
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
208
        args: tuple[Unpack[PosArgsT]],
209
        token: object,
210
    ) -> T_Retval:
211
        pass
×
212

213
    @classmethod
11✔
214
    @abstractmethod
11✔
215
    def run_sync_from_thread(
11✔
216
        cls,
217
        func: Callable[[Unpack[PosArgsT]], T_Retval],
218
        args: tuple[Unpack[PosArgsT]],
219
        token: object,
220
    ) -> T_Retval:
221
        pass
×
222

223
    @classmethod
11✔
224
    @abstractmethod
11✔
225
    def create_blocking_portal(cls) -> BlockingPortal:
11✔
226
        pass
×
227

228
    @classmethod
11✔
229
    @overload
11✔
230
    async def open_process(
11✔
231
        cls,
232
        command: str | bytes,
233
        *,
234
        shell: Literal[True],
235
        stdin: int | IO[Any] | None,
236
        stdout: int | IO[Any] | None,
237
        stderr: int | IO[Any] | None,
238
        cwd: str | bytes | PathLike[str] | None = None,
239
        env: Mapping[str, str] | None = None,
240
        start_new_session: bool = False,
241
    ) -> Process:
242
        pass
×
243

244
    @classmethod
11✔
245
    @overload
11✔
246
    async def open_process(
11✔
247
        cls,
248
        command: Sequence[str | bytes],
249
        *,
250
        shell: Literal[False],
251
        stdin: int | IO[Any] | None,
252
        stdout: int | IO[Any] | None,
253
        stderr: int | IO[Any] | None,
254
        cwd: str | bytes | PathLike[str] | None = None,
255
        env: Mapping[str, str] | None = None,
256
        start_new_session: bool = False,
257
    ) -> Process:
258
        pass
×
259

260
    @classmethod
11✔
261
    @abstractmethod
11✔
262
    async def open_process(
11✔
263
        cls,
264
        command: str | bytes | Sequence[str | bytes],
265
        *,
266
        shell: bool,
267
        stdin: int | IO[Any] | None,
268
        stdout: int | IO[Any] | None,
269
        stderr: int | IO[Any] | None,
270
        cwd: str | bytes | PathLike[str] | None = None,
271
        env: Mapping[str, str] | None = None,
272
        start_new_session: bool = False,
273
    ) -> Process:
274
        pass
×
275

276
    @classmethod
11✔
277
    @abstractmethod
11✔
278
    def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
11✔
279
        pass
×
280

281
    @classmethod
11✔
282
    @abstractmethod
11✔
283
    async def connect_tcp(
11✔
284
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
285
    ) -> SocketStream:
286
        pass
×
287

288
    @classmethod
11✔
289
    @abstractmethod
11✔
290
    async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
11✔
291
        pass
×
292

293
    @classmethod
11✔
294
    @abstractmethod
11✔
295
    def create_tcp_listener(cls, sock: socket) -> SocketListener:
11✔
296
        pass
×
297

298
    @classmethod
11✔
299
    @abstractmethod
11✔
300
    def create_unix_listener(cls, sock: socket) -> SocketListener:
11✔
301
        pass
×
302

303
    @classmethod
11✔
304
    @abstractmethod
11✔
305
    async def create_udp_socket(
11✔
306
        cls,
307
        family: AddressFamily,
308
        local_address: IPSockAddrType | None,
309
        remote_address: IPSockAddrType | None,
310
        reuse_port: bool,
311
    ) -> UDPSocket | ConnectedUDPSocket:
312
        pass
×
313

314
    @classmethod
11✔
315
    @overload
11✔
316
    async def create_unix_datagram_socket(
11✔
317
        cls, raw_socket: socket, remote_path: None
318
    ) -> UNIXDatagramSocket: ...
319

320
    @classmethod
11✔
321
    @overload
11✔
322
    async def create_unix_datagram_socket(
11✔
323
        cls, raw_socket: socket, remote_path: str | bytes
324
    ) -> ConnectedUNIXDatagramSocket: ...
325

326
    @classmethod
11✔
327
    @abstractmethod
11✔
328
    async def create_unix_datagram_socket(
11✔
329
        cls, raw_socket: socket, remote_path: str | bytes | None
330
    ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
331
        pass
×
332

333
    @classmethod
11✔
334
    @abstractmethod
11✔
335
    async def getaddrinfo(
11✔
336
        cls,
337
        host: bytes | str | None,
338
        port: str | int | None,
339
        *,
340
        family: int | AddressFamily = 0,
341
        type: int | SocketKind = 0,
342
        proto: int = 0,
343
        flags: int = 0,
344
    ) -> list[
345
        tuple[
346
            AddressFamily,
347
            SocketKind,
348
            int,
349
            str,
350
            tuple[str, int] | tuple[str, int, int, int],
351
        ]
352
    ]:
353
        pass
×
354

355
    @classmethod
11✔
356
    @abstractmethod
11✔
357
    async def getnameinfo(
11✔
358
        cls, sockaddr: IPSockAddrType, flags: int = 0
359
    ) -> tuple[str, str]:
360
        pass
×
361

362
    @classmethod
11✔
363
    @abstractmethod
11✔
364
    async def wait_socket_readable(cls, sock: socket) -> None:
11✔
365
        pass
×
366

367
    @classmethod
11✔
368
    @abstractmethod
11✔
369
    async def wait_socket_writable(cls, sock: socket) -> None:
11✔
370
        pass
×
371

372
    @classmethod
11✔
373
    @abstractmethod
11✔
374
    def current_default_thread_limiter(cls) -> CapacityLimiter:
11✔
375
        pass
×
376

377
    @classmethod
11✔
378
    @abstractmethod
11✔
379
    def open_signal_receiver(
11✔
380
        cls, *signals: Signals
381
    ) -> ContextManager[AsyncIterator[Signals]]:
382
        pass
×
383

384
    @classmethod
11✔
385
    @abstractmethod
11✔
386
    def get_current_task(cls) -> TaskInfo:
11✔
387
        pass
×
388

389
    @classmethod
11✔
390
    @abstractmethod
11✔
391
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
11✔
392
        pass
×
393

394
    @classmethod
11✔
395
    @abstractmethod
11✔
396
    async def wait_all_tasks_blocked(cls) -> None:
11✔
397
        pass
×
398

399
    @classmethod
11✔
400
    @abstractmethod
11✔
401
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
11✔
402
        pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc