• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10708046317

04 Sep 2024 07:12PM UTC coverage: 91.676% (-0.2%) from 91.834%
10708046317

Pull #761

github

web-flow
Merge fa137ec93 into ee8165b55
Pull Request #761: Delegated the implementations of Lock and Semaphore to the async backend class

249 of 274 new or added lines in 4 files covered. (90.88%)

2 existing lines in 2 files now uncovered.

4791 of 5226 relevant lines covered (91.68%)

9.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.09
/src/anyio/abc/_eventloop.py
1
from __future__ import annotations
11✔
2

3
import math
11✔
4
import sys
11✔
5
from abc import ABCMeta, abstractmethod
11✔
6
from collections.abc import AsyncIterator, Awaitable
11✔
7
from os import PathLike
11✔
8
from signal import Signals
11✔
9
from socket import AddressFamily, SocketKind, socket
11✔
10
from typing import (
11✔
11
    IO,
12
    TYPE_CHECKING,
13
    Any,
14
    Callable,
15
    ContextManager,
16
    Sequence,
17
    TypeVar,
18
    Union,
19
    overload,
20
)
21

22
if sys.version_info >= (3, 11):
11✔
23
    from typing import TypeVarTuple, Unpack
6✔
24
else:
25
    from typing_extensions import TypeVarTuple, Unpack
6✔
26

27
if sys.version_info >= (3, 10):
11✔
28
    from typing import TypeAlias
7✔
29
else:
30
    from typing_extensions import TypeAlias
4✔
31

32
if TYPE_CHECKING:
11✔
NEW
33
    from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
×
34
    from .._core._tasks import CancelScope
×
35
    from .._core._testing import TaskInfo
×
36
    from ..from_thread import BlockingPortal
×
37
    from ._sockets import (
×
38
        ConnectedUDPSocket,
39
        ConnectedUNIXDatagramSocket,
40
        IPSockAddrType,
41
        SocketListener,
42
        SocketStream,
43
        UDPSocket,
44
        UNIXDatagramSocket,
45
        UNIXSocketStream,
46
    )
47
    from ._subprocesses import Process
×
48
    from ._tasks import TaskGroup
×
49
    from ._testing import TestRunner
×
50

51
T_Retval = TypeVar("T_Retval")
11✔
52
PosArgsT = TypeVarTuple("PosArgsT")
11✔
53
StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
11✔
54

55

56
class AsyncBackend(metaclass=ABCMeta):
11✔
57
    @classmethod
11✔
58
    @abstractmethod
11✔
59
    def run(
11✔
60
        cls,
61
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
62
        args: tuple[Unpack[PosArgsT]],
63
        kwargs: dict[str, Any],
64
        options: dict[str, Any],
65
    ) -> T_Retval:
66
        """
67
        Run the given coroutine function in an asynchronous event loop.
68

69
        The current thread must not be already running an event loop.
70

71
        :param func: a coroutine function
72
        :param args: positional arguments to ``func``
73
        :param kwargs: positional arguments to ``func``
74
        :param options: keyword arguments to call the backend ``run()`` implementation
75
            with
76
        :return: the return value of the coroutine function
77
        """
78

79
    @classmethod
11✔
80
    @abstractmethod
11✔
81
    def current_token(cls) -> object:
11✔
82
        """
83

84
        :return:
85
        """
86

87
    @classmethod
11✔
88
    @abstractmethod
11✔
89
    def current_time(cls) -> float:
11✔
90
        """
91
        Return the current value of the event loop's internal clock.
92

93
        :return: the clock value (seconds)
94
        """
95

96
    @classmethod
11✔
97
    @abstractmethod
11✔
98
    def cancelled_exception_class(cls) -> type[BaseException]:
11✔
99
        """Return the exception class that is raised in a task if it's cancelled."""
100

101
    @classmethod
11✔
102
    @abstractmethod
11✔
103
    async def checkpoint(cls) -> None:
11✔
104
        """
105
        Check if the task has been cancelled, and allow rescheduling of other tasks.
106

107
        This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
108
        :meth:`cancel_shielded_checkpoint`.
109
        """
110

111
    @classmethod
11✔
112
    async def checkpoint_if_cancelled(cls) -> None:
11✔
113
        """
114
        Check if the current task group has been cancelled.
115

116
        This will check if the task has been cancelled, but will not allow other tasks
117
        to be scheduled if not.
118

119
        """
120
        if cls.current_effective_deadline() == -math.inf:
×
121
            await cls.checkpoint()
×
122

123
    @classmethod
11✔
124
    async def cancel_shielded_checkpoint(cls) -> None:
11✔
125
        """
126
        Allow the rescheduling of other tasks.
127

128
        This will give other tasks the opportunity to run, but without checking if the
129
        current task group has been cancelled, unlike with :meth:`checkpoint`.
130

131
        """
132
        with cls.create_cancel_scope(shield=True):
×
133
            await cls.sleep(0)
×
134

135
    @classmethod
11✔
136
    @abstractmethod
11✔
137
    async def sleep(cls, delay: float) -> None:
11✔
138
        """
139
        Pause the current task for the specified duration.
140

141
        :param delay: the duration, in seconds
142
        """
143

144
    @classmethod
11✔
145
    @abstractmethod
11✔
146
    def create_cancel_scope(
11✔
147
        cls, *, deadline: float = math.inf, shield: bool = False
148
    ) -> CancelScope:
149
        pass
×
150

151
    @classmethod
11✔
152
    @abstractmethod
11✔
153
    def current_effective_deadline(cls) -> float:
11✔
154
        """
155
        Return the nearest deadline among all the cancel scopes effective for the
156
        current task.
157

158
        :return:
159
            - a clock value from the event loop's internal clock
160
            - ``inf`` if there is no deadline in effect
161
            - ``-inf`` if the current scope has been cancelled
162
        :rtype: float
163
        """
164

165
    @classmethod
11✔
166
    @abstractmethod
11✔
167
    def create_task_group(cls) -> TaskGroup:
11✔
168
        pass
×
169

170
    @classmethod
11✔
171
    @abstractmethod
11✔
172
    def create_event(cls) -> Event:
11✔
173
        pass
×
174

175
    @classmethod
11✔
176
    @abstractmethod
11✔
177
    def create_lock(cls, *, fast_acquire: bool) -> Lock:
11✔
NEW
178
        pass
×
179

180
    @classmethod
11✔
181
    @abstractmethod
11✔
182
    def create_semaphore(
11✔
183
        cls,
184
        initial_value: int,
185
        *,
186
        max_value: int | None = None,
187
        fast_acquire: bool = False,
188
    ) -> Semaphore:
NEW
189
        pass
×
190

191
    @classmethod
11✔
192
    @abstractmethod
11✔
193
    def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
11✔
194
        pass
×
195

196
    @classmethod
11✔
197
    @abstractmethod
11✔
198
    async def run_sync_in_worker_thread(
11✔
199
        cls,
200
        func: Callable[[Unpack[PosArgsT]], T_Retval],
201
        args: tuple[Unpack[PosArgsT]],
202
        abandon_on_cancel: bool = False,
203
        limiter: CapacityLimiter | None = None,
204
    ) -> T_Retval:
205
        pass
×
206

207
    @classmethod
11✔
208
    @abstractmethod
11✔
209
    def check_cancelled(cls) -> None:
11✔
210
        pass
×
211

212
    @classmethod
11✔
213
    @abstractmethod
11✔
214
    def run_async_from_thread(
11✔
215
        cls,
216
        func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
217
        args: tuple[Unpack[PosArgsT]],
218
        token: object,
219
    ) -> T_Retval:
220
        pass
×
221

222
    @classmethod
11✔
223
    @abstractmethod
11✔
224
    def run_sync_from_thread(
11✔
225
        cls,
226
        func: Callable[[Unpack[PosArgsT]], T_Retval],
227
        args: tuple[Unpack[PosArgsT]],
228
        token: object,
229
    ) -> T_Retval:
230
        pass
×
231

232
    @classmethod
11✔
233
    @abstractmethod
11✔
234
    def create_blocking_portal(cls) -> BlockingPortal:
11✔
235
        pass
×
236

237
    @classmethod
11✔
238
    @abstractmethod
11✔
239
    async def open_process(
11✔
240
        cls,
241
        command: StrOrBytesPath | Sequence[StrOrBytesPath],
242
        *,
243
        stdin: int | IO[Any] | None,
244
        stdout: int | IO[Any] | None,
245
        stderr: int | IO[Any] | None,
246
        **kwargs: Any,
247
    ) -> Process:
248
        pass
×
249

250
    @classmethod
11✔
251
    @abstractmethod
11✔
252
    def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
11✔
253
        pass
×
254

255
    @classmethod
11✔
256
    @abstractmethod
11✔
257
    async def connect_tcp(
11✔
258
        cls, host: str, port: int, local_address: IPSockAddrType | None = None
259
    ) -> SocketStream:
260
        pass
×
261

262
    @classmethod
11✔
263
    @abstractmethod
11✔
264
    async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
11✔
265
        pass
×
266

267
    @classmethod
11✔
268
    @abstractmethod
11✔
269
    def create_tcp_listener(cls, sock: socket) -> SocketListener:
11✔
270
        pass
×
271

272
    @classmethod
11✔
273
    @abstractmethod
11✔
274
    def create_unix_listener(cls, sock: socket) -> SocketListener:
11✔
275
        pass
×
276

277
    @classmethod
11✔
278
    @abstractmethod
11✔
279
    async def create_udp_socket(
11✔
280
        cls,
281
        family: AddressFamily,
282
        local_address: IPSockAddrType | None,
283
        remote_address: IPSockAddrType | None,
284
        reuse_port: bool,
285
    ) -> UDPSocket | ConnectedUDPSocket:
286
        pass
×
287

288
    @classmethod
11✔
289
    @overload
11✔
290
    async def create_unix_datagram_socket(
11✔
291
        cls, raw_socket: socket, remote_path: None
292
    ) -> UNIXDatagramSocket: ...
293

294
    @classmethod
11✔
295
    @overload
11✔
296
    async def create_unix_datagram_socket(
11✔
297
        cls, raw_socket: socket, remote_path: str | bytes
298
    ) -> ConnectedUNIXDatagramSocket: ...
299

300
    @classmethod
11✔
301
    @abstractmethod
11✔
302
    async def create_unix_datagram_socket(
11✔
303
        cls, raw_socket: socket, remote_path: str | bytes | None
304
    ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
305
        pass
×
306

307
    @classmethod
11✔
308
    @abstractmethod
11✔
309
    async def getaddrinfo(
11✔
310
        cls,
311
        host: bytes | str | None,
312
        port: str | int | None,
313
        *,
314
        family: int | AddressFamily = 0,
315
        type: int | SocketKind = 0,
316
        proto: int = 0,
317
        flags: int = 0,
318
    ) -> list[
319
        tuple[
320
            AddressFamily,
321
            SocketKind,
322
            int,
323
            str,
324
            tuple[str, int] | tuple[str, int, int, int],
325
        ]
326
    ]:
327
        pass
×
328

329
    @classmethod
11✔
330
    @abstractmethod
11✔
331
    async def getnameinfo(
11✔
332
        cls, sockaddr: IPSockAddrType, flags: int = 0
333
    ) -> tuple[str, str]:
334
        pass
×
335

336
    @classmethod
11✔
337
    @abstractmethod
11✔
338
    async def wait_socket_readable(cls, sock: socket) -> None:
11✔
339
        pass
×
340

341
    @classmethod
11✔
342
    @abstractmethod
11✔
343
    async def wait_socket_writable(cls, sock: socket) -> None:
11✔
344
        pass
×
345

346
    @classmethod
11✔
347
    @abstractmethod
11✔
348
    def current_default_thread_limiter(cls) -> CapacityLimiter:
11✔
349
        pass
×
350

351
    @classmethod
11✔
352
    @abstractmethod
11✔
353
    def open_signal_receiver(
11✔
354
        cls, *signals: Signals
355
    ) -> ContextManager[AsyncIterator[Signals]]:
356
        pass
×
357

358
    @classmethod
11✔
359
    @abstractmethod
11✔
360
    def get_current_task(cls) -> TaskInfo:
11✔
361
        pass
×
362

363
    @classmethod
11✔
364
    @abstractmethod
11✔
365
    def get_running_tasks(cls) -> Sequence[TaskInfo]:
11✔
366
        pass
×
367

368
    @classmethod
11✔
369
    @abstractmethod
11✔
370
    async def wait_all_tasks_blocked(cls) -> None:
11✔
371
        pass
×
372

373
    @classmethod
11✔
374
    @abstractmethod
11✔
375
    def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
11✔
376
        pass
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc