• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21697093787

04 Feb 2026 09:56PM UTC coverage: 86.962% (-0.004%) from 86.966%
21697093787

push

github

web-flow
improve system information sent in session and container_info (#13680)

10 of 17 new or added lines in 2 files covered. (58.82%)

222 existing lines in 17 files now uncovered.

70560 of 81139 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/localstack-core/localstack/services/sqs/queue.py
1
import time
1✔
2
from queue import Empty, PriorityQueue, Queue
1✔
3

4

5
class InterruptibleQueue[T](Queue):
1✔
6
    # is_shutdown is used to check whether we have triggered a shutdown of the Queue
7
    is_shutdown: bool
1✔
8

9
    def __init__(self, maxsize: int = 0):
1✔
10
        super().__init__(maxsize)
1✔
11
        self.is_shutdown = False
1✔
12

13
    def get(self, block: bool = True, timeout: float | None = None) -> T:
1✔
14
        with self.not_empty:
1✔
15
            if self.is_shutdown:
1✔
UNCOV
16
                raise Empty
×
17
            if not block:
1✔
18
                if not self._qsize():
1✔
19
                    raise Empty
1✔
20
            elif timeout is None:
1✔
UNCOV
21
                while not self._qsize() and not self.is_shutdown:  # additional shutdown check
×
UNCOV
22
                    self.not_empty.wait()
×
23
            elif timeout < 0:
1✔
24
                raise ValueError("'timeout' must be a non-negative number")
×
25
            else:
26
                endtime = time.time() + timeout
1✔
27
                while not self._qsize() and not self.is_shutdown:  # additional shutdown check
1✔
28
                    remaining = endtime - time.time()
1✔
29
                    if remaining <= 0.0:
1✔
30
                        raise Empty
1✔
31
                    self.not_empty.wait(remaining)
1✔
32
            if self.is_shutdown:  # additional shutdown check
1✔
33
                raise Empty
1✔
34
            item = self._get()
1✔
35
            self.not_full.notify()
1✔
36
            return item
1✔
37

38
    def shutdown(self) -> None:
1✔
39
        """
40
        `shutdown` signals to stop all current and future `Queue.get` calls from executing.
41

42
        This is helpful for exiting otherwise blocking calls early.
43
        """
44
        with self.not_empty:
1✔
45
            self.is_shutdown = True
1✔
46
            self.not_empty.notify_all()
1✔
47

48

49
class InterruptiblePriorityQueue[T](PriorityQueue[T], InterruptibleQueue[T]):
1✔
50
    pass
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc