• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 08a45e28-4998-4845-a88b-f2c425830a31

21 Feb 2025 08:33PM UTC coverage: 86.896% (+0.01%) from 86.883%
08a45e28-4998-4845-a88b-f2c425830a31

push

circleci

web-flow
fix SNS FIFO ordering (#12285)

Co-authored-by: Daniel Fangl <daniel.fangl@localstack.cloud>

70 of 79 new or added lines in 2 files covered. (88.61%)

117 existing lines in 8 files now uncovered.

61670 of 70970 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/localstack-core/localstack/services/sns/executor.py
1
import itertools
1✔
2
import logging
1✔
3
import os
1✔
4
import queue
1✔
5
import threading
1✔
6

7
# Module-level logger, namespaced to this module's import path.
LOG = logging.getLogger(__name__)
1✔
8

9

10
def _worker(work_queue: queue.Queue):
1✔
11
    try:
1✔
12
        while True:
1✔
13
            work_item = work_queue.get(block=True)
1✔
14
            if work_item is None:
1✔
15
                return
1✔
16
            work_item.run()
1✔
17
            # delete reference to the work item to avoid it being in memory until the next blocking `queue.get` call returns
18
            del work_item
1✔
19

NEW
20
    except Exception:
×
NEW
21
        LOG.exception("Exception in worker")
×
22

23

24
class _WorkItem:
1✔
25
    def __init__(self, fn, args, kwargs):
1✔
26
        self.fn = fn
1✔
27
        self.args = args
1✔
28
        self.kwargs = kwargs
1✔
29

30
    def run(self):
1✔
31
        try:
1✔
32
            self.fn(*self.args, **self.kwargs)
1✔
NEW
33
        except Exception:
×
NEW
34
            LOG.exception("Unhandled Exception in while running %s", self.fn.__name__)
×
35

36

37
class TopicPartitionedThreadPoolExecutor:
    """
    This executor partitions the work between workers based on topics.

    It guarantees that each topic only has one worker assigned, and thus that its tasks will be executed sequentially.

    Loosely based on ThreadPoolExecutor from the stdlib, but does not return a Future as SNS does not need it
    (fire & forget). Could be extended if needed to fit other needs.

    Currently, we do not re-balance between workers if some of them have more load. This could be investigated.
    """

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers: int = None, thread_name_prefix: str = ""):
        """
        :param max_workers: upper bound on worker threads; defaults to
            ``min(32, cpu_count + 4)``, mirroring stdlib ``ThreadPoolExecutor``.
        :param thread_name_prefix: prefix for worker thread names; a unique
            default is generated when empty.
        :raises ValueError: if ``max_workers`` is not strictly positive.
        """
        if max_workers is None:
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._thread_name_prefix = (
            thread_name_prefix or f"TopicThreadPoolExecutor-{self._counter()}"
        )

        # for now, the pool isn't fair and is not redistributed depending on load
        self._pool = {}  # topic -> assigned work queue
        self._shutdown = False
        self._lock = threading.Lock()  # guards _pool, _threads, _work_queues, _shutdown
        self._threads = set()
        self._work_queues = []
        self._cycle = itertools.cycle(range(max_workers))

    def _add_worker(self):
        """Spawn a new daemon worker thread with its own work queue.

        Must be called with ``self._lock`` held.
        """
        work_queue = queue.SimpleQueue()
        self._work_queues.append(work_queue)
        thread_name = f"{self._thread_name_prefix}_{len(self._threads)}"
        t = threading.Thread(name=thread_name, target=_worker, args=(work_queue,))
        t.daemon = True
        t.start()
        self._threads.add(t)

    def _get_work_queue(self, topic: str) -> queue.SimpleQueue:
        """Return the work queue assigned to ``topic``, lazily assigning one
        (and possibly spawning a new worker) on first use.

        Must be called with ``self._lock`` held.
        """
        if not (work_queue := self._pool.get(topic)):
            if len(self._threads) < self._max_workers:
                self._add_worker()

            # we cycle through the possible indexes for a work queue, in order to distribute the load across
            # once we get to the max amount of worker, the cycle will start back at 0
            index = next(self._cycle)
            work_queue = self._work_queues[index]

            # TODO: the pool is not cleaned up at the moment, think about the clean-up interface
            self._pool[topic] = work_queue
        return work_queue

    def submit(self, fn, topic, /, *args, **kwargs) -> None:
        """Schedule ``fn(*args, **kwargs)`` on the worker owning ``topic``.

        :raises RuntimeError: if called after ``shutdown``.
        """
        with self._lock:
            # Check shutdown *before* touching the pool: the original checked it
            # after _get_work_queue, so a post-shutdown submit for a new topic
            # could spawn a fresh worker thread that never receives the None
            # sentinel and would block on queue.get forever.
            if self._shutdown:
                raise RuntimeError("cannot schedule new futures after shutdown")

            work_queue = self._get_work_queue(topic)
            w = _WorkItem(fn, args, kwargs)
            work_queue.put(w)

    def shutdown(self, wait=True):
        """Signal all workers to exit once their queues are drained.

        :param wait: if True, block until every worker thread has terminated.
        """
        with self._lock:
            self._shutdown = True

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            for work_queue in self._work_queues:
                work_queue.put(None)

        if wait:
            for t in self._threads:
                t.join()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc