• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / anyio / 10653597381

01 Sep 2024 11:25AM UTC coverage: 91.671% (-0.1%) from 91.788%
10653597381

Pull #761

github

web-flow
Merge 6d0f355bd into 8a5b34626
Pull Request #761: Delegated the implementations of Lock and Semaphore to the async backend class

229 of 250 new or added lines in 4 files covered. (91.6%)

2 existing lines in 2 files now uncovered.

4744 of 5175 relevant lines covered (91.67%)

9.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.03
/src/anyio/_core/_synchronization.py
1
from __future__ import annotations
11✔
2

3
import math
11✔
4
from collections import deque
11✔
5
from dataclasses import dataclass
11✔
6
from types import TracebackType
11✔
7

8
from sniffio import AsyncLibraryNotFoundError
11✔
9

10
from ..lowlevel import checkpoint
11✔
11
from ._eventloop import get_async_backend
11✔
12
from ._exceptions import BusyResourceError
11✔
13
from ._tasks import CancelScope
11✔
14
from ._testing import TaskInfo, get_current_task
11✔
15

16

17
@dataclass(frozen=True)
class EventStatistics:
    """
    Immutable snapshot of the state of an :class:`~.Event`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
    """

    tasks_waiting: int
24

25

26
@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Immutable snapshot of the state of a :class:`~.CapacityLimiter`.

    :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
    :ivar float total_tokens: total number of available tokens
    :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from
        this limiter
    :ivar int tasks_waiting: number of tasks waiting on
        :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int
42

43

44
@dataclass(frozen=True)
class LockStatistics:
    """
    Immutable snapshot of the state of a :class:`~.Lock`.

    :ivar bool locked: flag indicating if this lock is locked or not
    :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the
        lock is not held by any task)
    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int
56

57

58
@dataclass(frozen=True)
class ConditionStatistics:
    """
    Immutable snapshot of the state of a :class:`~.Condition`.

    :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying
        :class:`~.Lock`
    """

    tasks_waiting: int
    lock_statistics: LockStatistics
68

69

70
@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Immutable snapshot of the state of a :class:`~.Semaphore`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`
    """

    tasks_waiting: int
78

79

80
class Event:
    """
    A flag that tasks can wait on until it has been set.

    Instantiating this class does not produce a plain ``Event``: ``__new__`` returns
    whatever ``get_async_backend().create_event()`` produces, and falls back to an
    :class:`EventAdapter` if that lookup raises ``AsyncLibraryNotFoundError``.

    The methods defined here only describe the interface and raise
    :exc:`NotImplementedError`.
    """

    def __new__(cls) -> Event:
        try:
            return get_async_backend().create_event()
        except AsyncLibraryNotFoundError:
            # No backend could be determined at construction time; defer the choice
            # to the adapter, which looks the backend up on first use.
            return EventAdapter()

    def set(self) -> None:
        """Set the flag, notifying all listeners."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Return ``True`` if the flag is set, ``False`` if not."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Wait until the flag has been set.

        If the flag has already been set when this method is called, it returns
        immediately.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Return statistics about the current state of this event."""
        raise NotImplementedError
108

109

110
class EventAdapter(Event):
    """
    Stand-in :class:`Event` that creates the real backend event on first use.

    ``Event.__new__`` returns this class when the backend lookup raises
    ``AsyncLibraryNotFoundError``. The backend event is created lazily through
    ``get_async_backend().create_event()`` the first time :meth:`set` or
    :meth:`wait` needs it; the read-only queries answer from defaults until then.
    """

    # Backend event, created on demand by the ``_event`` property.
    _internal_event: Event | None = None

    def __new__(cls) -> EventAdapter:
        # Bypass Event.__new__, which would otherwise try the backend again.
        return object.__new__(cls)

    @property
    def _event(self) -> Event:
        """Return the backend event, creating it on first access."""
        if self._internal_event is None:
            self._internal_event = get_async_backend().create_event()

        return self._internal_event

    def set(self) -> None:
        """Set the flag on the backend event (creating it if necessary)."""
        self._event.set()

    def is_set(self) -> bool:
        """Return ``True`` if a backend event exists and its flag is set."""
        return self._internal_event is not None and self._internal_event.is_set()

    async def wait(self) -> None:
        """Wait on the backend event (creating it if necessary)."""
        await self._event.wait()

    def statistics(self) -> EventStatistics:
        """
        Return statistics about the current state of this event.

        Reports zero waiting tasks while no backend event has been created.
        """
        if self._internal_event is None:
            return EventStatistics(tasks_waiting=0)

        return self._internal_event.statistics()
137

138

139
class Lock:
    """
    A mutual-exclusion lock usable as an async context manager.

    Instantiating this class returns whatever ``get_async_backend().create_lock()``
    produces, and falls back to a :class:`LockAdapter` if that lookup raises
    ``AsyncLibraryNotFoundError``.

    Apart from the context-manager methods, the methods defined here only describe
    the interface and raise :exc:`NotImplementedError`.
    """

    def __new__(cls) -> Lock:
        try:
            return get_async_backend().create_lock()
        except AsyncLibraryNotFoundError:
            # No backend could be determined at construction time; defer the choice
            # to the adapter, which looks the backend up on first use.
            return LockAdapter()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Release the lock."""
        raise NotImplementedError

    def locked(self) -> bool:
        """Return True if the lock is currently held."""
        raise NotImplementedError

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        .. versionadded:: 3.0
        """
        raise NotImplementedError


class LockAdapter(Lock):
    """
    Stand-in :class:`Lock` that creates the real backend lock on first use.

    ``Lock.__new__`` returns this class when the backend lookup raises
    ``AsyncLibraryNotFoundError``. The backend lock is created lazily through
    ``get_async_backend().create_lock()``; read-only queries answer from defaults
    while no backend lock exists.
    """

    # Backend lock, created on demand by the ``_lock`` property.
    _internal_lock: Lock | None = None

    def __new__(cls) -> LockAdapter:
        # Bypass Lock.__new__, which would otherwise try the backend again.
        return object.__new__(cls)

    @property
    def _lock(self) -> Lock:
        """Return the backend lock, creating it on first access."""
        if self._internal_lock is None:
            self._internal_lock = get_async_backend().create_lock()

        return self._internal_lock

    async def __aenter__(self) -> None:
        await self._lock.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Only release if a backend lock was ever created; never create one here.
        if self._internal_lock is not None:
            self._internal_lock.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        await self._lock.acquire()

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()

    def release(self) -> None:
        """Release the lock."""
        self._lock.release()

    def locked(self) -> bool:
        """
        Return True if the lock is currently held.

        A lock whose backend object has not been created yet cannot be held, so
        this answers ``False`` without creating one (and therefore without needing
        an async backend to be available).
        """
        return self._internal_lock is not None and self._internal_lock.locked()

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        Reports an unlocked, ownerless lock with no waiters while no backend lock
        has been created.

        .. versionadded:: 3.0

        """
        if self._internal_lock is None:
            return LockStatistics(False, None, 0)

        return self._internal_lock.statistics()
244

245

246
class Condition:
    """
    A condition variable built on a :class:`Lock` and per-waiter :class:`Event`
    objects.

    :param lock: the lock to use; a new :class:`Lock` is created when omitted
    """

    # Task that most recently acquired the lock through this condition; cleared
    # again on release so that ownership checks reflect the current holder.
    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        self._lock = lock or Lock()
        # One event per task blocked in wait(), in arrival order.
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        """
        Ensure that the current task acquired the lock through this condition.

        :raises RuntimeError: if the current task is not the recorded owner
        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> None:
        """Release the underlying lock."""
        self._lock.release()
        # Forget the owner only after a successful release; otherwise a task that
        # has already given up the lock would still pass _check_acquired().
        self._owner_task = None

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify up to n listeners.

        Fewer than ``n`` listeners are notified if fewer are waiting.

        :raises RuntimeError: if the current task is not holding the lock
        """
        self._check_acquired()
        for _ in range(n):
            if not self._waiters:
                break

            self._waiters.popleft().set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task is not holding the lock
        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """
        Wait for a notification.

        The underlying lock is released while waiting and is always re-acquired
        before this method returns or raises.
        """
        await checkpoint()
        event = Event()
        self._waiters.append(event)
        self.release()
        try:
            await event.wait()
        except BaseException:
            # An event that is already set was removed from the queue by
            # notify()/notify_all(); only an unset one is still queued.
            if not event.is_set():
                self._waiters.remove(event)

            raise
        finally:
            # Re-acquire inside a shielded scope so the lock is taken back even
            # when the wait was interrupted.
            with CancelScope(shield=True):
                await self.acquire()

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())
334

335

336
class Semaphore:
    """
    A counting semaphore usable as an async context manager.

    Instantiating this class returns whatever
    ``get_async_backend().create_semaphore()`` produces, and falls back to a
    :class:`SemaphoreAdapter` if that lookup raises ``AsyncLibraryNotFoundError``.

    :param initial_value: the starting value of the semaphore (must be >= 0)
    :param max_value: optional upper bound for the value (must be >= initial_value)
    :raises TypeError: if ``initial_value`` is not an integer, or ``max_value`` is
        neither an integer nor ``None``
    :raises ValueError: if ``initial_value`` is negative or ``max_value`` is lower
        than ``initial_value``
    """

    def __new__(cls, initial_value: int, *, max_value: int | None = None) -> Semaphore:
        try:
            return get_async_backend().create_semaphore(
                initial_value, max_value=max_value
            )
        except AsyncLibraryNotFoundError:
            # No backend could be determined at construction time; defer the choice
            # to the adapter, which looks the backend up on first use.
            return SemaphoreAdapter(initial_value, max_value=max_value)

    def __init__(self, initial_value: int, *, max_value: int | None = None):
        # This initializer only validates; it stores nothing on the instance.
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be an integer or None")
            if max_value < initial_value:
                raise ValueError(
                    "max_value must be equal to or higher than initial_value"
                )

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Increment the semaphore value."""
        raise NotImplementedError

    @property
    def value(self) -> int:
        """The current value of the semaphore."""
        raise NotImplementedError

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        raise NotImplementedError

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        .. versionadded:: 3.0
        """
        raise NotImplementedError


class SemaphoreAdapter(Semaphore):
    """
    Stand-in :class:`Semaphore` that creates the real backend semaphore on first
    use.

    ``Semaphore.__new__`` returns this class when the backend lookup raises
    ``AsyncLibraryNotFoundError``. The constructor arguments are remembered and
    handed to ``get_async_backend().create_semaphore()`` when the backend
    semaphore is first needed.
    """

    # Backend semaphore, created on demand by the ``_semaphore`` property.
    _internal_semaphore: Semaphore | None = None

    def __new__(
        cls, initial_value: int, *, max_value: int | None = None
    ) -> SemaphoreAdapter:
        # Bypass Semaphore.__new__, which would otherwise try the backend again.
        return object.__new__(cls)

    def __init__(self, initial_value: int, *, max_value: int | None = None) -> None:
        # Validate eagerly so bad arguments fail at construction time rather than
        # when the backend semaphore is lazily created.
        super().__init__(initial_value, max_value=max_value)
        self._initial_value = initial_value
        self._max_value = max_value

    @property
    def _semaphore(self) -> Semaphore:
        """Return the backend semaphore, creating it on first access."""
        if self._internal_semaphore is None:
            self._internal_semaphore = get_async_backend().create_semaphore(
                self._initial_value, max_value=self._max_value
            )

        return self._internal_semaphore

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        await self._semaphore.acquire()

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._semaphore.acquire_nowait()

    def release(self) -> None:
        """Increment the semaphore value."""
        self._semaphore.release()

    @property
    def value(self) -> int:
        """
        The current value of the semaphore.

        This is the initial value while no backend semaphore has been created.
        """
        if self._internal_semaphore is None:
            return self._initial_value

        return self._semaphore.value

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore, as given to the constructor."""
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        Reports zero waiting tasks while no backend semaphore has been created.
        """
        if self._internal_semaphore is None:
            return SemaphoreStatistics(tasks_waiting=0)

        return self._semaphore.statistics()
452

453

454
class CapacityLimiter:
    """
    A limiter that hands out a bounded number of tokens to borrowers.

    Instantiating this class returns whatever
    ``get_async_backend().create_capacity_limiter()`` produces, and falls back to a
    :class:`CapacityLimiterAdapter` if that lookup raises
    ``AsyncLibraryNotFoundError``.

    The methods defined here only describe the interface and raise
    :exc:`NotImplementedError`.

    :param total_tokens: the total number of tokens available for borrowing
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        try:
            return get_async_backend().create_capacity_limiter(total_tokens)
        except AsyncLibraryNotFoundError:
            # No backend could be determined at construction time; defer the choice
            # to the adapter, which looks the backend up on first use.
            return CapacityLimiterAdapter(total_tokens)

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        This is a read-write property. If the total number of tokens is increased, the
        proportionate number of tasks waiting on this limiter will be granted their
        tokens.

        .. versionchanged:: 3.0
            The property is now writable.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    @property
    def borrowed_tokens(self) -> int:
        """The number of tokens that have currently been borrowed."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """The number of tokens currently available to be borrowed"""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Acquire a token for the current task without waiting for one to become
        available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Acquire a token without waiting for one to become available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Acquire a token for the current task, waiting if necessary for one to become
        available.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Acquire a token, waiting if necessary for one to become available.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Release the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Release the token held by the given borrower.

        :raises RuntimeError: if the borrower has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return statistics about the current state of this limiter.

        .. versionadded:: 3.0

        """
        raise NotImplementedError


class CapacityLimiterAdapter(CapacityLimiter):
    """
    Stand-in :class:`CapacityLimiter` that creates the real backend limiter on
    first use.

    ``CapacityLimiter.__new__`` returns this class when the backend lookup raises
    ``AsyncLibraryNotFoundError``. The token total is validated and remembered
    locally, then handed to ``get_async_backend().create_capacity_limiter()`` when
    the backend limiter is first needed. Read-only queries answer from the locally
    stored state while no backend limiter exists.
    """

    # Backend limiter, created on demand by the ``_limiter`` property.
    _internal_limiter: CapacityLimiter | None = None

    def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter:
        # Bypass CapacityLimiter.__new__, which would otherwise try the backend
        # again.
        return object.__new__(cls)

    def __init__(self, total_tokens: float) -> None:
        # Goes through the property setter, so the value is validated here.
        self.total_tokens = total_tokens

    @property
    def _limiter(self) -> CapacityLimiter:
        """Return the backend limiter, creating it on first access."""
        if self._internal_limiter is None:
            self._internal_limiter = get_async_backend().create_capacity_limiter(
                self._total_tokens
            )

        return self._internal_limiter

    async def __aenter__(self) -> None:
        await self._limiter.__aenter__()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        return await self._limiter.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        Read from the backend limiter once it exists, otherwise from the locally
        stored value.
        """
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.total_tokens

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        # Compare by value, not identity: an infinity obtained from another source
        # (e.g. float("inf")) is equal to math.inf without being the same object.
        if not isinstance(value, int) and value != math.inf:
            raise TypeError("total_tokens must be an int or math.inf")
        elif value < 1:
            raise ValueError("total_tokens must be >= 1")

        if self._internal_limiter is None:
            self._total_tokens = value
            return

        self._limiter.total_tokens = value

    @property
    def borrowed_tokens(self) -> int:
        """
        The number of tokens that have currently been borrowed.

        This is zero while no backend limiter has been created.
        """
        if self._internal_limiter is None:
            return 0

        return self._internal_limiter.borrowed_tokens

    @property
    def available_tokens(self) -> float:
        """
        The number of tokens currently available to be borrowed.

        This equals the total while no backend limiter has been created.
        """
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.available_tokens

    def acquire_nowait(self) -> None:
        """
        Acquire a token for the current task without waiting for one to become
        available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        self._limiter.acquire_nowait()

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Acquire a token without waiting for one to become available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        self._limiter.acquire_on_behalf_of_nowait(borrower)

    async def acquire(self) -> None:
        """
        Acquire a token for the current task, waiting if necessary for one to become
        available.

        """
        await self._limiter.acquire()

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Acquire a token, waiting if necessary for one to become available.

        :param borrower: the entity borrowing a token

        """
        await self._limiter.acquire_on_behalf_of(borrower)

    def release(self) -> None:
        """Release the token held by the current task."""
        self._limiter.release()

    def release_on_behalf_of(self, borrower: object) -> None:
        """Release the token held by the given borrower."""
        self._limiter.release_on_behalf_of(borrower)

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return statistics about the current state of this limiter.

        Reports no borrowers and no waiting tasks while no backend limiter has
        been created.
        """
        if self._internal_limiter is None:
            return CapacityLimiterStatistics(
                borrowed_tokens=0,
                total_tokens=self.total_tokens,
                borrowers=(),
                tasks_waiting=0,
            )

        return self._internal_limiter.statistics()
659

660

661
class ResourceGuard:
    """
    A context manager for ensuring that a resource is only used by a single task at a
    time.

    Entering this context manager while the previous has not exited it yet will trigger
    :exc:`BusyResourceError`.

    :param action: the action to guard against (visible in the :exc:`BusyResourceError`
        when triggered, e.g. "Another task is already {action} this resource")

    .. versionadded:: 4.1
    """

    __slots__ = "action", "_guarded"

    def __init__(self, action: str = "using"):
        self.action: str = action
        # True while some caller is inside the ``with`` block.
        self._guarded = False

    def __enter__(self) -> None:
        if self._guarded:
            raise BusyResourceError(self.action)

        self._guarded = True

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        # Returning None lets any exception from the guarded block propagate.
        self._guarded = False
        return None
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc