• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21697093787

04 Feb 2026 09:56PM UTC coverage: 86.962% (-0.004%) from 86.966%
21697093787

push

github

web-flow
improve system information sent in session and container_info (#13680)

10 of 17 new or added lines in 2 files covered. (58.82%)

222 existing lines in 17 files now uncovered.

70560 of 81139 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.85
/localstack-core/localstack/utils/threads.py
1
import concurrent.futures
1✔
2
import logging
1✔
3
import subprocess
1✔
4
import threading
1✔
5
import traceback
1✔
6
from collections.abc import Callable
1✔
7
from concurrent.futures import Future
1✔
8
from multiprocessing.dummy import Pool
1✔
9
from typing import Any, ParamSpec, TypeVar
1✔
10

11
P = ParamSpec("P")
1✔
12
T = TypeVar("T")
1✔
13

14
LOG = logging.getLogger(__name__)
1✔
15

16
# arrays for temporary threads and resources
17
TMP_THREADS: list["FuncThread"] = []
1✔
18
TMP_PROCESSES: list[subprocess.Popen[Any]] = []
1✔
19

20
counter_lock = threading.Lock()
1✔
21
counter = 0
1✔
22

23

24
class FuncThread(threading.Thread):
1✔
25
    """Helper class to run a Python function in a background thread."""
26

27
    def __init__(
1✔
28
        self,
29
        func: "Callable[P, T]",
30
        params: Any = None,
31
        quiet: bool = False,
32
        on_stop: Callable[["FuncThread"], None] | None = None,
33
        name: str | None = None,
34
        daemon: bool = True,
35
    ):
36
        global counter
37
        global counter_lock
38

39
        if name:
1✔
40
            with counter_lock:
1✔
41
                counter += 1
1✔
42
                thread_counter_current = counter
1✔
43

44
            threading.Thread.__init__(
1✔
45
                self, name=f"{name}-functhread{thread_counter_current}", daemon=daemon
46
            )
47
        else:
48
            threading.Thread.__init__(self, daemon=daemon)
1✔
49

50
        self.params = params
1✔
51
        self.func = func
1✔
52
        self.quiet = quiet
1✔
53
        self.result_future: Future[T | Exception | None] = Future()
1✔
54
        self._stop_event = threading.Event()
1✔
55
        self.on_stop = on_stop
1✔
56

57
    def run(self) -> None:
1✔
58
        result: Any = None
1✔
59
        try:
1✔
60
            kwargs = {}  # type: ignore[var-annotated]
1✔
61
            result = self.func(self.params, **kwargs)
1✔
62
        except Exception as e:
1✔
63
            self.result_future.set_exception(e)
1✔
64
            result = e
1✔
65
            if not self.quiet:
1✔
66
                LOG.info(
1✔
67
                    "Thread run method %s(%s) failed: %s %s",
68
                    self.func,
69
                    self.params,
70
                    e,
71
                    traceback.format_exc(),
72
                )
73
        finally:
74
            try:
1✔
75
                self.result_future.set_result(result)
1✔
76
                pass
1✔
77
            except concurrent.futures.InvalidStateError as e:
1✔
78
                # this can happen on shutdown if the task is already canceled
79
                LOG.debug(e)
1✔
80

81
    @property
1✔
82
    def running(self) -> bool:
1✔
83
        return not self._stop_event.is_set()
1✔
84

85
    def stop(self, quiet: bool = False) -> None:
1✔
86
        self._stop_event.set()
1✔
87

88
        if self.on_stop:
1✔
89
            try:
1✔
90
                self.on_stop(self)
1✔
UNCOV
91
            except Exception as e:
×
UNCOV
92
                LOG.warning("error while calling on_stop callback: %s", e)
×
93

94

95
def start_thread(
1✔
96
    method: "Callable[P, T]",
97
    params: Any = None,
98
    quiet: bool = False,
99
    on_stop: Callable[["FuncThread"], None] | None = None,
100
    _shutdown_hook: bool = True,
101
    name: str | None = None,
102
) -> FuncThread:
103
    """Start the given method in a background thread, and add the thread to the TMP_THREADS shutdown hook"""
104
    if not name:
1✔
105
        # technically we should add a new level here for *internal* warnings
106
        LOG.debug("start_thread called without providing a custom name")
1✔
107
    name = name or method.__name__
1✔
108
    thread = FuncThread(method, params=params, quiet=quiet, name=name, on_stop=on_stop)
1✔
109
    thread.start()
1✔
110
    if _shutdown_hook:
1✔
111
        TMP_THREADS.append(thread)
1✔
112
    return thread
1✔
113

114

115
def start_worker_thread(
1✔
116
    method: "Callable[P, T]", params: Any = None, name: str | None = None
117
) -> FuncThread:
118
    return start_thread(method, params, _shutdown_hook=False, name=name or "start_worker_thread")
1✔
119

120

121
def cleanup_threads_and_processes(quiet: bool = True) -> None:
1✔
122
    from localstack.utils.run import kill_process_tree
1✔
123

124
    for thread in TMP_THREADS:
1✔
125
        if thread:
1✔
126
            try:
1✔
127
                if hasattr(thread, "shutdown"):
1✔
128
                    thread.shutdown()
1✔
129
                    continue
1✔
130
                if hasattr(thread, "kill"):
1✔
UNCOV
131
                    thread.kill()
×
UNCOV
132
                    continue
×
133
                thread.stop(quiet=quiet)
1✔
134
            except Exception as e:
1✔
135
                LOG.debug("[shutdown] Error stopping thread %s: %s", thread, e)
1✔
136
                if not thread.daemon:
1✔
UNCOV
137
                    LOG.warning(
×
138
                        "[shutdown] Non-daemon thread %s may block localstack shutdown", thread
139
                    )
140
    for proc in TMP_PROCESSES:
1✔
141
        try:
×
UNCOV
142
            kill_process_tree(proc.pid)
×
143
            # proc.terminate()
UNCOV
144
        except Exception as e:
×
145
            LOG.debug("[shutdown] Error cleaning up process tree %s: %s", proc, e)
×
146
    # clean up async tasks
147
    try:
1✔
148
        import asyncio
1✔
149

150
        for task in asyncio.all_tasks():
1✔
UNCOV
151
            try:
×
UNCOV
152
                task.cancel()
×
UNCOV
153
            except Exception as e:
×
UNCOV
154
                LOG.debug("[shutdown] Error cancelling asyncio task %s: %s", task, e)
×
155
    except Exception:
1✔
156
        pass
1✔
157
    LOG.debug("[shutdown] Done cleaning up threads / processes / tasks")
1✔
158
    # clear lists
159
    TMP_THREADS.clear()
1✔
160
    TMP_PROCESSES.clear()
1✔
161

162

163
def parallelize(func: Callable, arr: list, size: int = None):  # type: ignore
1✔
164
    if not size:
1✔
UNCOV
165
        size = len(arr)
×
166
    if size <= 0:
1✔
UNCOV
167
        return None
×
168

169
    with Pool(size) as pool:
1✔
170
        return pool.map(func, arr)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc