• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-py / 5701255977

pending completion
5701255977

push

github-actions

jell-o-fishi
test cleanup

749 of 889 branches covered (84.25%)

Branch coverage included in aggregate %.

3793 of 4074 relevant lines covered (93.1%)

1.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.76
/rsocket/queue_peekable.py
1
import sys
2✔
2
from asyncio import Queue, QueueEmpty
2✔
3

4

5
class QueuePeekable(Queue):
2✔
6

7
    async def peek(self):
2✔
8
        """Peek the next item in the queue.
9

10
        If queue is empty, wait until an item is available.
11
        """
12
        while self.empty():
2✔
13
            getter = self._get_loop().create_future()
2✔
14
            self._getters.append(getter)
2✔
15
            try:
2✔
16
                await getter
2✔
17
            except Exception:
2✔
18
                getter.cancel()  # Just in case getter is not done yet.
×
19
                try:
×
20
                    # Clean self._getters from canceled getters.
21
                    self._getters.remove(getter)
×
22
                except ValueError:
×
23
                    # The getter could be removed from self._getters by a
24
                    # previous put_nowait call.
25
                    pass
26
                if not self.empty() and not getter.cancelled():
×
27
                    # We were woken up by put_nowait(), but can't take
28
                    # the call.  Wake up the next in line.
29
                    self._wakeup_next(self._getters)
×
30
                raise
×
31
        return self.peek_nowait()
2✔
32

33
    def peek_nowait(self):
2✔
34
        """Peek the next item in the queue.
35

36
        Return an item if one is immediately available, else raise QueueEmpty.
37
        """
38
        if self.empty():
2!
39
            raise QueueEmpty
×
40

41
        item = self._queue[0]
2✔
42
        self._wakeup_next(self._putters)
2✔
43
        return item
2✔
44

45

46
if sys.version_info < (3, 10):
2!
47
    class QueuePeekableBackwardCompatible(QueuePeekable):
×
48
        def _get_loop(self):
×
49
            return self._loop
×
50

51

52
    QueuePeekable = QueuePeekableBackwardCompatible
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc