rsocket / rsocket-py / 13353792253

16 Feb 2025 09:36AM UTC coverage: 91.992% (+0.2%) from 91.818%
Pull #292

github

web-flow
Merge 59cc42ce7 into 1200045e3
Pull Request #292: added python 3.13. to test flow

822 of 965 branches covered (85.18%)

Branch coverage included in aggregate %.

3980 of 4255 relevant lines covered (93.54%)

2.76 hits per line
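
The aggregate percentage at the top of the report can be reproduced from the line and branch counts above. A minimal check follows, assuming the aggregate pools covered lines and covered branches over their combined total (the variable names are illustrative, not taken from Coveralls):

```python
# Counts copied from the report header above.
lines_covered, lines_total = 3980, 4255
branches_covered, branches_total = 822, 965

line_pct = 100 * lines_covered / lines_total
branch_pct = 100 * branches_covered / branches_total

# Assumption: the aggregate pools lines and branches into one ratio.
aggregate_pct = 100 * (lines_covered + branches_covered) / (lines_total + branches_total)

print(f"lines {line_pct:.2f}%  branches {branch_pct:.2f}%  aggregate {aggregate_pct:.3f}%")
```

Under that assumption the three values round to the figures the report states (93.54%, 85.18%, and 91.992%).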

Source File
/rsocket/queue_peekable.py: 70.27% covered
# Coverage markers: N✔ = line hit N times, × = line not hit,
# N! = line hit N times with a branch not taken.
import sys  # 3✔
from asyncio import Queue, QueueEmpty  # 3✔


class QueuePeekable(Queue):  # 3✔

    async def peek(self):  # 3✔
        """Peek the next item in the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():  # 3✔
            getter = self._get_loop().create_future()  # 3✔
            self._getters.append(getter)  # 3✔
            try:  # 3✔
                await getter  # 3✔
            except Exception:  # 3✔
                getter.cancel()  # × Just in case getter is not done yet.
                try:  # ×
                    # Clean self._getters from canceled getters.
                    self._getters.remove(getter)  # ×
                except ValueError:  # ×
                    # The getter could be removed from self._getters by a
                    # previous put_nowait call.
                    pass
                if not self.empty() and not getter.cancelled():  # ×
                    # We were woken up by put_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._getters)  # ×
                raise  # ×
        return self.peek_nowait()  # 3✔

    def peek_nowait(self):  # 3✔
        """Peek the next item in the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        if self.empty():  # 3!
            raise QueueEmpty  # ×

        item = self._queue[0]  # 3✔
        self._wakeup_next(self._putters)  # 3✔
        return item  # 3✔


if sys.version_info < (3, 10):  # 3✔
    class QueuePeekableBackwardCompatible(QueuePeekable):  # 1✔
        def _get_loop(self):  # 1✔
            return self._loop  # 1✔


    QueuePeekable = QueuePeekableBackwardCompatible  # 1✔
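
The listing above shows `raise QueueEmpty` in `peek_nowait` as never executed. The sketch below illustrates how a peekable queue of this kind behaves, including that empty-queue branch. `PeekableSketch` is a hypothetical, condensed stand-in rather than the rsocket-py class: it omits the error-handling path and the putter wake-up, and it uses `asyncio.get_running_loop()` in place of `_get_loop()` so it does not depend on the Python version. Like the original, it relies on the private `_getters` and `_queue` attributes of CPython's `asyncio.Queue`.

```python
import asyncio
from asyncio import Queue, QueueEmpty


class PeekableSketch(Queue):
    """Hypothetical condensed stand-in for QueuePeekable (not the library class)."""

    async def peek(self):
        # Wait until an item is available, then return it without removing it.
        while self.empty():
            getter = asyncio.get_running_loop().create_future()
            self._getters.append(getter)  # put_nowait() resolves the next waiting getter
            await getter
        return self.peek_nowait()

    def peek_nowait(self):
        if self.empty():
            raise QueueEmpty
        return self._queue[0]  # head of the underlying deque, left in place


async def demo():
    q = PeekableSketch()

    # Peeking at an empty queue without waiting raises QueueEmpty.
    try:
        q.peek_nowait()
        empty_raised = False
    except QueueEmpty:
        empty_raised = True

    # peek() suspends until a producer puts an item.
    waiter = asyncio.create_task(q.peek())
    await asyncio.sleep(0)  # let the waiter register its future
    q.put_nowait("frame-1")
    peeked = await waiter

    size_after_peek = q.qsize()  # the peeked item is still queued
    taken = q.get_nowait()       # a later get returns the same item
    return empty_raised, peeked, size_after_peek, taken


result = asyncio.run(demo())
print(result)
```

A test along these lines, written against the real `QueuePeekable`, would likely exercise the uncovered `raise QueueEmpty` branch. The uncovered `except` block in `peek` would need a separate test in which the awaited getter fails.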