• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.79
/localstack-core/localstack/utils/sync.py
1
"""Concurrency synchronization utilities"""
2

3
import functools
1✔
4
import threading
1✔
5
import time
1✔
6
from collections import defaultdict
1✔
7
from collections.abc import Callable
1✔
8
from typing import Literal, TypeVar
1✔
9

10

11
class ShortCircuitWaitException(Exception):
    """Raise this to abort waiting right away, e.g. when the awaited operation has failed for good."""
1✔
15

16

17
def wait_until(
    fn: Callable[[], bool],
    wait: float = 1.0,
    max_retries: int = 10,
    strategy: Literal["exponential", "static", "linear"] = "exponential",
    _retries: int = 1,
    _max_wait: float = 240,
) -> bool:
    """
    Wait until a given condition is true, rechecking it periodically.

    :param fn: condition to evaluate; a truthy result ends the wait successfully. Raising
        ``ShortCircuitWaitException`` aborts the wait immediately, any other ``Exception`` is treated
        like a falsy result.
    :param wait: seconds to sleep after the first unsuccessful check
    :param max_retries: maximum number of times ``fn`` is evaluated
    :param strategy: how the sleep time evolves between checks: ``"static"`` keeps it constant,
        ``"linear"`` grows it by the initial step each time, ``"exponential"`` doubles it
    :param _retries: internal 1-based attempt counter to start from
    :param _max_wait: give up as soon as the next sleep would exceed this many seconds
    :return: True if the condition was met, False if the wait was aborted or ran out of attempts
    """
    assert _retries > 0

    # Iterative on purpose: one stack frame per retry would hit the recursion limit for large
    # ``max_retries`` values.
    attempt = _retries
    while attempt <= max_retries:
        try:
            if fn():
                return True
        except ShortCircuitWaitException:
            return False
        except Exception:
            # best effort: a failing check simply counts as "condition not met yet"
            pass

        if wait > _max_wait:
            return False
        if attempt == max_retries:
            # no attempts left, so sleeping once more would only delay the negative result
            return False

        time.sleep(wait)
        if strategy == "linear":
            wait = (wait / attempt) * (attempt + 1)
        elif strategy == "exponential":
            wait = wait * 2
        attempt += 1

    return False
1✔
48

49

50
# Generic type variable used to annotate wrapped callables and their return values in this module.
T = TypeVar("T")
1✔
51

52

53
def retry(function: Callable[..., T], retries=3, sleep=1.0, sleep_before=0, **kwargs) -> T:
1✔
54
    raise_error = None
1✔
55
    if sleep_before > 0:
1✔
56
        time.sleep(sleep_before)
1✔
57
    retries = int(retries)
1✔
58
    for i in range(0, retries + 1):
1✔
59
        try:
1✔
60
            return function(**kwargs)
1✔
61
        except Exception as error:
1✔
62
            raise_error = error
1✔
63
            time.sleep(sleep)
1✔
64
    raise raise_error
1✔
65

66

67
def poll_condition(condition, timeout: float = None, interval: float = 0.5) -> bool:
    """
    Repeatedly evaluate ``condition`` until it yields a truthy value.

    Between two evaluations the function sleeps for ``interval`` seconds (0.5 by default). When a
    ``timeout`` (in seconds) is given, each unsuccessful evaluation deducts ``interval`` from the
    remaining budget, and polling stops once that budget is used up.

    Returns True as soon as ``condition()`` is truthy, or False if the timeout budget ran out first.
    """
    bounded = timeout is not None
    budget = timeout if bounded else 0

    while True:
        if condition():
            return True

        if bounded:
            budget -= interval
            if budget <= 0:
                return False

        time.sleep(interval)
1✔
88

89

90
def synchronized(lock=None):
    """
    Synchronization decorator as described in
    http://blog.dscpl.com.au/2014/01/the-missing-synchronized-decorator.html.

    Every call of a decorated function runs while holding ``lock``.

    :param lock: context-manager lock to hold during each call. If omitted, a new re-entrant lock is
        created per ``synchronized()`` invocation and shared by all functions decorated with the
        returned decorator.
    :return: a decorator that wraps a function so that it runs under the lock
    """
    # ``with None:`` would fail on the first call, so fall back to a private lock. It is re-entrant
    # so that a decorated function may call itself (or a sibling using the same decorator).
    guard = threading.RLock() if lock is None else lock

    def _decorator(wrapped):
        @functools.wraps(wrapped)
        def _wrapper(*args, **kwargs):
            with guard:
                return wrapped(*args, **kwargs)

        return _wrapper

    return _decorator
1✔
105

106

107
def sleep_forever():
    """Block the calling thread indefinitely by sleeping in one-second steps; never returns normally."""
    while True:
        time.sleep(1)
×
110

111

112
class SynchronizedDefaultDict(defaultdict):
    """
    A ``defaultdict`` whose item access, deletion, iteration, ``len()`` and ``str()`` run while
    holding a per-instance re-entrant lock.

    Only the methods overridden here take the lock; other inherited ``dict`` methods are unchanged.
    """

    def __init__(self, *args, **kwargs):
        # create the lock first so it exists before any overridden method can run
        self._lock = threading.RLock()
        super().__init__(*args, **kwargs)

    @classmethod
    def fromkeys(cls, keys, value=None):
        """
        Create a new instance with the given keys, each mapped to ``value``.

        Kept a classmethod like ``dict.fromkeys`` so that it works on the class as well as on an
        instance. The result has no default factory.
        """
        return super().fromkeys(keys, value)

    def __getitem__(self, key):
        # the lock also covers the default factory that runs for a missing key
        with self._lock:
            return super().__getitem__(key)

    def __setitem__(self, key, value):
        with self._lock:
            super().__setitem__(key, value)

    def __delitem__(self, key):
        with self._lock:
            super().__delitem__(key)

    def __iter__(self):
        # Iterate over a snapshot taken under the lock: a live iterator would be consumed after the
        # lock is released and fail if the dict is modified in the meantime.
        with self._lock:
            return iter(list(super().keys()))

    def __len__(self):
        with self._lock:
            return super().__len__()

    def __str__(self):
        with self._lock:
            return super().__str__()
×
144

145

146
class Once:
    """
    An object that will perform an action exactly once.
    Inspired by Golang's [sync.Once](https://pkg.go.dev/sync#Once) operation.


    ### Example 1

    Multiple threads using `Once::do` to ensure only 1 line is printed.

    ```python
    import threading
    import time
    import random

    greet_once = Once()
    def greet():
        print("This should happen only once.")

    greet_threads = []
    for _ in range(10):
        t = threading.Thread(target=lambda: greet_once.do(greet))
        greet_threads.append(t)
        t.start()

    for t in greet_threads:
        t.join()
    ```


    ### Example 2

    Ensuring idempotent calling to prevent exceptions on multiple calls.

    ```python
    import os

    class Service:
        close_once: sync.Once

        def __init__(self):
            self.close_once = sync.Once()

        def start(self):
            with open("my-service.txt", "w") as f:
                f.write("Started service")

        def close(self):
            # Ensure we only ever delete the file once on close
            self.close_once.do(lambda: os.remove("my-service.txt"))

    ```


    """

    _is_done: bool = False
    _mu: threading.Lock

    def __init__(self) -> None:
        self._is_done = False
        # One lock per instance: a lock shared on the class would serialize unrelated Once objects
        # and deadlock when one `do` callback triggers another Once.
        self._mu = threading.Lock()

    def do(self, fn: Callable[[], None]):
        """
        `do` calls the function `fn()` if-and-only-if `do` has never been called before.

        This ensures idempotent and thread-safe execution.

        If the function raises an exception, `do` considers `fn` as done, where subsequent calls are still no-ops.
        """
        # fast path that avoids taking the lock once the action has completed
        if self._is_done:
            return

        with self._mu:
            # re-check under the lock: another thread may have finished while we were waiting
            if not self._is_done:
                try:
                    fn()
                finally:
                    self._is_done = True
1✔
219

220

221
def once_func(fn: Callable[..., T]) -> Callable[..., T | None]:
    """
    Turn ``fn`` into a function that runs at most one time.

    Only the first call of the returned function actually invokes ``fn``; its result is stored and
    handed back by every later call, whose arguments are ignored. If that first invocation raises an
    ``Exception``, the same exception is raised again on each subsequent call.

    It works both as a plain call and as a decorator.

    Direct usage:
    ```python
    delete_file = once_func(os.remove)

    delete_file("myfile.txt")  # deletes the file
    delete_file("myfile.txt")  # does nothing
    ```

    As a decorator:
    ```python
    @once_func
    def delete_file():
        os.remove("myfile.txt")

    delete_file()  # deletes the file
    delete_file()  # does nothing
    ```
    """
    gate = Once()
    # outcome of the single real invocation, shared by all calls of the wrapper
    outcome = {"value": None, "error": None}

    def _invoke(*args, **kwargs):
        try:
            outcome["value"] = fn(*args, **kwargs)
        except Exception as err:
            outcome["error"] = err
            raise

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        gate.do(functools.partial(_invoke, *args, **kwargs))
        error = outcome["error"]
        if error is None:
            return outcome["value"]
        raise error

    return wrapper
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc