• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 4946493218

pending completion
4946493218

Pull #567

github

GitHub
Merge 6b66f35af into e1ba31f1c
Pull Request #567: Fix broken support for `Callable[..., Awaitable]`

48 of 48 new or added lines in 3 files covered. (100.0%)

4033 of 4459 relevant lines covered (90.45%)

8.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.7
/src/anyio/_core/_synchronization.py
1
from __future__ import annotations
10✔
2

3
from collections import deque
10✔
4
from dataclasses import dataclass
10✔
5
from types import TracebackType
10✔
6

7
from ..lowlevel import cancel_shielded_checkpoint, checkpoint, checkpoint_if_cancelled
10✔
8
from ._eventloop import get_async_backend
10✔
9
from ._exceptions import BusyResourceError, WouldBlock
10✔
10
from ._tasks import CancelScope
10✔
11
from ._testing import TaskInfo, get_current_task
10✔
12

13

14
@dataclass(frozen=True)
class EventStatistics:
    """
    Immutable record of statistics about an :class:`Event`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
    """

    tasks_waiting: int
21

22

23
@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Immutable record of statistics about a :class:`CapacityLimiter`.

    :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
    :ivar float total_tokens: total number of available tokens
    :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from
        this limiter
    :ivar int tasks_waiting: number of tasks waiting on
        :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int
39

40

41
@dataclass(frozen=True)
class LockStatistics:
    """
    Immutable record of statistics about a :class:`Lock`.

    :ivar bool locked: flag indicating if this lock is locked or not
    :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the
        lock is not held by any task)
    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int
53

54

55
@dataclass(frozen=True)
class ConditionStatistics:
    """
    Immutable record of statistics about a :class:`Condition`.

    :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying
        :class:`~.Lock`
    """

    tasks_waiting: int
    lock_statistics: LockStatistics
65

66

67
@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Immutable record of statistics about a :class:`Semaphore`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`
    """

    tasks_waiting: int
75

76

77
class Event:
    """
    Interface of an event flag that tasks can wait on.

    Instantiating this class does not run the methods defined here: ``__new__``
    returns whatever ``create_event()`` of the backend obtained from
    ``get_async_backend()`` produces. The methods below only document the expected
    interface and raise :exc:`NotImplementedError` when called on this base class.
    """

    def __new__(cls) -> Event:
        # Construction is delegated entirely to the async backend.
        return get_async_backend().create_event()

    def set(self) -> None:
        """Set the flag, notifying all listeners."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Return ``True`` if the flag is set, ``False`` if not."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Wait until the flag has been set.

        If the flag has already been set when this method is called, it returns
        immediately.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Return statistics about the current state of this event."""
        raise NotImplementedError
102

103

104
class Lock:
    """
    A lock that is held by at most one task at a time.

    Tasks that cannot acquire the lock immediately are queued in FIFO order in
    ``_waiters`` and are handed ownership directly by :meth:`release`.
    """

    # Class-level default meaning "not held". An instance attribute of the same name
    # shadows it while a task owns the lock; release() deletes that instance
    # attribute again so lookups fall back to this ``None``.
    _owner_task: TaskInfo | None = None

    def __init__(self) -> None:
        # Each entry pairs a waiting task with the event used to wake it up.
        self._waiters: deque[tuple[TaskInfo, Event]] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """
        Acquire the lock, waiting for it to be released if necessary.

        :raises RuntimeError: if the current task already holds this lock

        """
        await checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except WouldBlock:
            # The lock is held by another task: queue up and wait to be woken.
            task = get_current_task()
            event = Event()
            token = task, event
            self._waiters.append(token)
            try:
                await event.wait()
            except BaseException:
                if not event.is_set():
                    # Never woken by release(), so the token is still queued.
                    self._waiters.remove(token)
                elif self._owner_task == task:
                    # release() already transferred ownership to this task; pass
                    # the lock on since this task is not going to use it.
                    self.release()

                raise

            assert self._owner_task == task
        else:
            # Acquired without blocking. If the checkpoint raises, give the lock
            # back before propagating the exception.
            try:
                await cancel_shielded_checkpoint()
            except BaseException:
                self.release()
                raise

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~WouldBlock: if the operation would block
        :raises RuntimeError: if the current task already holds this lock

        """
        task = get_current_task()
        if self._owner_task == task:
            raise RuntimeError("Attempted to acquire an already held Lock")

        if self._owner_task is not None:
            raise WouldBlock

        self._owner_task = task

    def release(self) -> None:
        """
        Release the lock.

        If other tasks are waiting, ownership is transferred to the first one in the
        queue and its event is set.

        :raises RuntimeError: if the current task is not holding this lock

        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding this lock")

        if self._waiters:
            # Hand the lock directly to the longest-waiting task, then wake it.
            self._owner_task, event = self._waiters.popleft()
            event.set()
        else:
            # Drop the instance attribute; the class-level ``None`` takes over.
            del self._owner_task

    def locked(self) -> bool:
        """Return True if the lock is currently held."""
        return self._owner_task is not None

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        .. versionadded:: 3.0
        """
        return LockStatistics(self.locked(), self._owner_task, len(self._waiters))
187

188

189
class Condition:
    """
    A condition variable built on top of a :class:`Lock`.

    Tasks call :meth:`wait` while holding the underlying lock and are woken up by
    :meth:`notify` or :meth:`notify_all`.
    """

    # Task that most recently acquired the lock through this condition (class-level
    # default ``None`` until the first acquisition).
    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        """
        :param lock: the lock to use; a new :class:`Lock` is created if omitted

        """
        # Compare against None explicitly so that a supplied lock object is never
        # replaced just because it happens to evaluate as falsy.
        self._lock = lock if lock is not None else Lock()
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        # Raise unless the current task is the one recorded by acquire() /
        # acquire_nowait() on this condition.
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> None:
        """Release the underlying lock."""
        # NOTE(review): ``_owner_task`` is not reset here, so _check_acquired()
        # keeps accepting the last owner until another task acquires the lock
        # through this condition -- confirm whether that is intended.
        self._lock.release()

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify up to ``n`` listeners.

        Fewer than ``n`` listeners are notified if fewer are currently waiting.

        :param n: maximum number of listeners to notify
        :raises RuntimeError: if the current task is not holding the underlying lock

        """
        self._check_acquired()
        for _ in range(n):
            try:
                event = self._waiters.popleft()
            except IndexError:
                # No listeners left to notify.
                break

            event.set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task is not holding the underlying lock

        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """
        Wait for a notification.

        The underlying lock is released while waiting and is acquired again, inside
        a shielded cancel scope, before this method returns or propagates an
        exception.

        """
        await checkpoint()
        event = Event()
        self._waiters.append(event)
        self.release()
        try:
            await event.wait()
        except BaseException:
            if not event.is_set():
                # Not notified, so the event is still in the queue.
                self._waiters.remove(event)

            raise
        finally:
            with CancelScope(shield=True):
                await self.acquire()

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())
277

278

279
class Semaphore:
    """
    A counting semaphore.

    :meth:`acquire` decrements the value, waiting while it is zero;
    :meth:`release` increments it or wakes up the first waiting task.
    """

    def __init__(self, initial_value: int, *, max_value: int | None = None):
        """
        :param initial_value: the starting value of the semaphore (an integer >= 0)
        :param max_value: if not ``None``, the value above which :meth:`release`
            refuses to raise the semaphore
        :raises TypeError: if ``initial_value`` is not an integer, or ``max_value``
            is neither an integer nor ``None``
        :raises ValueError: if ``initial_value`` is negative or ``max_value`` is lower
            than ``initial_value``

        """
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be an integer or None")
            if max_value < initial_value:
                raise ValueError(
                    "max_value must be equal to or higher than initial_value"
                )

        self._value = initial_value
        self._max_value = max_value
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        await checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except WouldBlock:
            # Value is zero: queue an event and wait for release() to set it.
            event = Event()
            self._waiters.append(event)
            try:
                await event.wait()
            except BaseException:
                if not event.is_set():
                    # Never woken, so the event is still queued.
                    self._waiters.remove(event)
                else:
                    # Already woken by release(); pass that release on since this
                    # task is not going to use it.
                    self.release()

                raise
        else:
            # Acquired without blocking. If the checkpoint raises, undo the
            # decrement before propagating the exception.
            try:
                await cancel_shielded_checkpoint()
            except BaseException:
                self.release()
                raise

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~WouldBlock: if the operation would block

        """
        if self._value == 0:
            raise WouldBlock

        self._value -= 1

    def release(self) -> None:
        """
        Increment the semaphore value.

        If tasks are waiting, the first one is woken up instead and the value is left
        unchanged.

        :raises ValueError: if ``max_value`` is set and the value already equals it

        """
        if self._max_value is not None and self._value == self._max_value:
            raise ValueError("semaphore released too many times")

        if self._waiters:
            self._waiters.popleft().set()
        else:
            self._value += 1

    @property
    def value(self) -> int:
        """The current value of the semaphore."""
        return self._value

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        .. versionadded:: 3.0
        """
        return SemaphoreStatistics(len(self._waiters))
372

373

374
class CapacityLimiter:
    """
    Interface of a limiter from which a bounded number of tokens can be borrowed.

    Instantiating this class does not run the methods defined here: ``__new__``
    returns whatever ``create_capacity_limiter()`` of the backend obtained from
    ``get_async_backend()`` produces. The members below only document the expected
    interface and raise :exc:`NotImplementedError` on this base class.
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        # Construction is delegated entirely to the async backend.
        return get_async_backend().create_capacity_limiter(total_tokens)

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        This is a read-write property. If the total number of tokens is increased, the
        proportionate number of tasks waiting on this limiter will be granted their
        tokens.

        .. versionchanged:: 3.0
            The property is now writable.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    @property
    def borrowed_tokens(self) -> int:
        """The number of tokens that have currently been borrowed."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """The number of tokens currently available to be borrowed."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Acquire a token for the current task without waiting for one to become
        available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Acquire a token without waiting for one to become available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Acquire a token for the current task, waiting if necessary for one to become
        available.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Acquire a token, waiting if necessary for one to become available.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Release the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Release the token held by the given borrower.

        :param borrower: the entity whose token is released
        :raises RuntimeError: if the borrower has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return statistics about the current state of this limiter.

        .. versionadded:: 3.0

        """
        raise NotImplementedError
483

484

485
class ResourceGuard:
    """
    A synchronous context manager that rejects overlapping use.

    Entering the guard while it is already entered raises
    :exc:`BusyResourceError` constructed with ``action``; leaving the guard makes it
    available again.
    """

    __slots__ = "action", "_guarded"

    def __init__(self, action: str):
        """
        :param action: value passed to :exc:`BusyResourceError` when the guard is
            entered while already in use

        """
        self.action = action
        self._guarded = False

    def __enter__(self) -> None:
        """
        Mark the guard as in use.

        :raises BusyResourceError: if the guard has already been entered and not yet
            exited

        """
        if self._guarded:
            raise BusyResourceError(self.action)

        self._guarded = True

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        """Mark the guard as free again; exceptions are never suppressed."""
        self._guarded = False
        return None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc