• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21575916503

30 Jan 2026 10:27AM UTC coverage: 86.969% (+0.007%) from 86.962%
21575916503

push

github

web-flow
Admin: Add typehints to utils/strings and utils/threads (#13658)

39 of 42 new or added lines in 3 files covered. (92.86%)

27 existing lines in 1 file now uncovered.

70391 of 80938 relevant lines covered (86.97%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.26
/localstack-core/localstack/utils/threads.py
1
import concurrent.futures
1✔
2
import logging
1✔
3
import subprocess
1✔
4
import threading
1✔
5
import traceback
1✔
6
from collections.abc import Callable
1✔
7
from concurrent.futures import Future
1✔
8
from multiprocessing.dummy import Pool
1✔
9
from typing import TYPE_CHECKING, Any, TypeVar
1✔
10

11
if TYPE_CHECKING:
    # Evaluated by static type checkers only. ``P`` therefore does not exist at
    # runtime, which is why annotations that mention it are written as strings.
    from typing_extensions import ParamSpec

    P = ParamSpec("P")

# Return type of the callable wrapped by a FuncThread.
T = TypeVar("T")

LOG = logging.getLogger(__name__)

# arrays for temporary threads and resources
# TMP_THREADS is appended to by start_thread(); both lists are walked and then
# cleared by cleanup_threads_and_processes().
TMP_THREADS: list["FuncThread"] = []
TMP_PROCESSES: list[subprocess.Popen[Any]] = []

# Running count of *named* FuncThread instances; used as a suffix for the thread
# name and only modified while holding ``counter_lock``.
counter_lock = threading.Lock()
counter = 0
26

27

28
class FuncThread(threading.Thread):
1✔
29
    """Helper class to run a Python function in a background thread."""
30

31
    def __init__(
1✔
32
        self,
33
        func: "Callable[P, T]",
34
        params: Any = None,
35
        quiet: bool = False,
36
        on_stop: Callable[["FuncThread"], None] | None = None,
37
        name: str | None = None,
38
        daemon: bool = True,
39
    ):
40
        global counter
41
        global counter_lock
42

43
        if name:
1✔
44
            with counter_lock:
1✔
45
                counter += 1
1✔
46
                thread_counter_current = counter
1✔
47

48
            threading.Thread.__init__(
1✔
49
                self, name=f"{name}-functhread{thread_counter_current}", daemon=daemon
50
            )
51
        else:
52
            threading.Thread.__init__(self, daemon=daemon)
1✔
53

54
        self.params = params
1✔
55
        self.func = func
1✔
56
        self.quiet = quiet
1✔
57
        self.result_future: Future[T | Exception | None] = Future()
1✔
58
        self._stop_event = threading.Event()
1✔
59
        self.on_stop = on_stop
1✔
60

61
    def run(self) -> None:
1✔
62
        result: Any = None
1✔
63
        try:
1✔
64
            kwargs = {}  # type: ignore[var-annotated]
1✔
65
            result = self.func(self.params, **kwargs)
1✔
66
        except Exception as e:
1✔
67
            self.result_future.set_exception(e)
1✔
68
            result = e
1✔
69
            if not self.quiet:
1✔
70
                LOG.info(
1✔
71
                    "Thread run method %s(%s) failed: %s %s",
72
                    self.func,
73
                    self.params,
74
                    e,
75
                    traceback.format_exc(),
76
                )
77
        finally:
78
            try:
1✔
79
                self.result_future.set_result(result)
1✔
80
                pass
1✔
81
            except concurrent.futures.InvalidStateError as e:
1✔
82
                # this can happen on shutdown if the task is already canceled
83
                LOG.debug(e)
1✔
84

85
    @property
1✔
86
    def running(self) -> bool:
1✔
87
        return not self._stop_event.is_set()
1✔
88

89
    def stop(self, quiet: bool = False) -> None:
1✔
90
        self._stop_event.set()
1✔
91

92
        if self.on_stop:
1✔
93
            try:
1✔
94
                self.on_stop(self)
1✔
95
            except Exception as e:
×
96
                LOG.warning("error while calling on_stop callback: %s", e)
×
97

98

99
def start_thread(
    method: "Callable[P, T]",
    params: Any = None,
    quiet: bool = False,
    on_stop: Callable[["FuncThread"], None] | None = None,
    _shutdown_hook: bool = True,
    name: str | None = None,
) -> FuncThread:
    """Start the given method in a background thread, and add the thread to the TMP_THREADS shutdown hook

    :param method: callable to run; it is invoked with ``params`` as its single argument
    :param params: value passed to ``method``
    :param quiet: if True, a failure of ``method`` is not logged by the thread
    :param on_stop: optional callback invoked with the thread when it is stopped
    :param _shutdown_hook: if True, register the thread in ``TMP_THREADS``
    :param name: name prefix for the thread; defaults to the name of ``method``
    :return: the started ``FuncThread``
    """
    if not name:
        # technically we should add a new level here for *internal* warnings
        LOG.debug("start_thread called without providing a custom name")
        # functools.partial objects and callable instances have no __name__;
        # fall back to their type name instead of raising AttributeError.
        name = getattr(method, "__name__", None) or type(method).__name__
    thread = FuncThread(method, params=params, quiet=quiet, name=name, on_stop=on_stop)
    thread.start()
    if _shutdown_hook:
        TMP_THREADS.append(thread)
    return thread
117

118

119
def start_worker_thread(
    method: "Callable[P, T]", params: Any = None, name: str | None = None
) -> FuncThread:
    """Start ``method`` in a background thread that is not registered in the shutdown hook.

    The thread name prefix falls back to ``"start_worker_thread"`` when ``name`` is empty.
    """
    thread_name = name if name else "start_worker_thread"
    return start_thread(method, params=params, name=thread_name, _shutdown_hook=False)
123

124

125
def cleanup_threads_and_processes(quiet: bool = True) -> None:
    """Stop registered threads, kill registered processes, cancel asyncio tasks, and reset the registries.

    Every step is best effort: individual failures are logged and never propagated.

    :param quiet: forwarded to ``stop()`` of each registered thread
    """
    from localstack.utils.run import kill_process_tree

    for worker in TMP_THREADS:
        if not worker:
            continue
        try:
            # prefer the most specific teardown hook the object offers
            if hasattr(worker, "shutdown"):
                worker.shutdown()
            elif hasattr(worker, "kill"):
                worker.kill()
            else:
                worker.stop(quiet=quiet)
        except Exception as err:
            LOG.debug("[shutdown] Error stopping thread %s: %s", worker, err)
            if worker.daemon:
                continue
            LOG.warning("[shutdown] Non-daemon thread %s may block localstack shutdown", worker)

    for process in TMP_PROCESSES:
        try:
            kill_process_tree(process.pid)
        except Exception as err:
            LOG.debug("[shutdown] Error cleaning up process tree %s: %s", process, err)

    # clean up async tasks
    try:
        import asyncio

        for pending in asyncio.all_tasks():
            try:
                pending.cancel()
            except Exception as err:
                LOG.debug("[shutdown] Error cancelling asyncio task %s: %s", pending, err)
    except Exception:
        # deliberately ignored, e.g. asyncio.all_tasks() raises RuntimeError when
        # no event loop is running in the calling thread
        pass

    LOG.debug("[shutdown] Done cleaning up threads / processes / tasks")

    # clear lists
    TMP_THREADS.clear()
    TMP_PROCESSES.clear()
165

166

167
def parallelize(func: Callable, arr: list, size: int = None):  # type: ignore
1✔
168
    if not size:
1✔
169
        size = len(arr)
×
170
    if size <= 0:
1✔
171
        return None
×
172

173
    with Pool(size) as pool:
1✔
174
        return pool.map(func, arr)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc