• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 5886867302

17 Aug 2023 04:45AM UTC coverage: 91.383% (-0.2%) from 91.56%
5886867302

push

github-actions

jell-o-fishi
simplified __str__ on Frame

688 of 829 branches covered (82.99%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 2 files covered. (100.0%)

3798 of 4080 relevant lines covered (93.09%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.76
/rsocket/queue_peekable.py
1
import sys
1✔
2
from asyncio import Queue, QueueEmpty
1✔
3

4

5
class QueuePeekable(Queue):
1✔
6

7
    async def peek(self):
1✔
8
        """Peek the next item in the queue.
9

10
        If queue is empty, wait until an item is available.
11
        """
12
        while self.empty():
1✔
13
            getter = self._get_loop().create_future()
1✔
14
            self._getters.append(getter)
1✔
15
            try:
1✔
16
                await getter
1✔
17
            except Exception:
1✔
18
                getter.cancel()  # Just in case getter is not done yet.
×
19
                try:
×
20
                    # Clean self._getters from canceled getters.
21
                    self._getters.remove(getter)
×
22
                except ValueError:
×
23
                    # The getter could be removed from self._getters by a
24
                    # previous put_nowait call.
25
                    pass
26
                if not self.empty() and not getter.cancelled():
×
27
                    # We were woken up by put_nowait(), but can't take
28
                    # the call.  Wake up the next in line.
29
                    self._wakeup_next(self._getters)
×
30
                raise
×
31
        return self.peek_nowait()
1✔
32

33
    def peek_nowait(self):
1✔
34
        """Peek the next item in the queue.
35

36
        Return an item if one is immediately available, else raise QueueEmpty.
37
        """
38
        if self.empty():
1!
39
            raise QueueEmpty
×
40

41
        item = self._queue[0]
1✔
42
        self._wakeup_next(self._putters)
1✔
43
        return item
1✔
44

45

46
if sys.version_info < (3, 10):
1!
47
    class QueuePeekableBackwardCompatible(QueuePeekable):
×
48
        def _get_loop(self):
×
49
            return self._loop
×
50

51

52
    QueuePeekable = QueuePeekableBackwardCompatible
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc