• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18505123992

14 Oct 2025 05:30PM UTC coverage: 86.888% (-0.01%) from 86.899%
18505123992

push

github

web-flow
S3: fix `aws-global` validation in CreateBucket (#13250)

10 of 10 new or added lines in 4 files covered. (100.0%)

831 existing lines in 40 files now uncovered.

68028 of 78294 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.52
/localstack-core/localstack/utils/sync.py
1
"""Concurrency synchronization utilities"""
2

3
import functools
1✔
4
import threading
1✔
5
import time
1✔
6
from collections import defaultdict
1✔
7
from collections.abc import Callable
1✔
8
from typing import Literal, TypeVar
1✔
9

10

11
class ShortCircuitWaitException(Exception):
    """
    Signal that waiting should stop right away.

    Raise this from a polled callable when the awaited operation has failed
    permanently and further retries would be pointless.
    """
1✔
15

16

17
def wait_until(
    fn: Callable[[], bool],
    wait: float = 1.0,
    max_retries: int = 10,
    strategy: Literal["exponential", "static", "linear"] = "exponential",
    _retries: int = 1,
    _max_wait: float = 240,
) -> bool:
    """
    Wait until a given condition is true, rechecking it periodically.

    The check is performed iteratively, so the call stack does not grow with ``max_retries``.

    :param fn: condition to evaluate; a truthy return value ends the wait successfully. Raising
        ``ShortCircuitWaitException`` aborts the wait immediately; any other ``Exception`` is treated
        as "condition not yet met".
    :param wait: number of seconds to sleep after the first unsuccessful check
    :param max_retries: highest attempt number for which ``fn`` is still called
    :param strategy: how the sleep time evolves between attempts: ``"static"`` keeps it constant,
        ``"linear"`` scales it by ``(attempt + 1) / attempt``, ``"exponential"`` doubles it
    :param _retries: number of the first attempt (internal, must be greater than 0)
    :param _max_wait: once the pending sleep time exceeds this value, give up instead of sleeping
    :return: True if ``fn`` returned a truthy value, False if the wait was aborted or exhausted
    """
    assert _retries > 0

    attempt = _retries
    delay = wait
    while attempt <= max_retries:
        try:
            if fn():
                return True
        except ShortCircuitWaitException:
            return False
        except Exception:
            # deliberate best effort: a failing check counts as "not completed yet"
            pass

        if delay > _max_wait:
            return False
        time.sleep(delay)

        # any unknown strategy behaves like "static"
        if strategy == "linear":
            delay = (delay / attempt) * (attempt + 1)
        elif strategy == "exponential":
            delay = delay * 2
        attempt += 1

    return False
1✔
48

49

50
# Return type of the callable handed to ``retry``.
T = TypeVar("T")


def retry(function: Callable[..., T], retries=3, sleep=1.0, sleep_before=0, **kwargs) -> T:
    """
    Call ``function(**kwargs)`` until it succeeds or the retries are used up.

    :param function: callable to invoke; it receives ``kwargs`` as keyword arguments
    :param retries: number of additional attempts after the first one; converted with ``int()``,
        and negative values are treated as 0 so that at least one attempt is always made
    :param sleep: seconds to sleep between two attempts (no sleep follows the final attempt)
    :param sleep_before: seconds to sleep once before the first attempt, if greater than 0
    :param kwargs: keyword arguments forwarded to ``function``
    :return: the return value of the first successful call
    :raises Exception: the exception raised by the last attempt if every attempt failed
    """
    if sleep_before > 0:
        time.sleep(sleep_before)

    # always attempt at least once, so there is a real error to raise on failure
    attempts = max(int(retries), 0) + 1
    last_error = None
    for attempt in range(attempts):
        try:
            return function(**kwargs)
        except Exception as error:
            last_error = error
            # sleeping after the final attempt would only delay the error
            if attempt < attempts - 1:
                time.sleep(sleep)
    raise last_error
1✔
65

66

67
def poll_condition(condition, timeout: float = None, interval: float = 0.5) -> bool:
    """
    Repeatedly evaluate ``condition`` until it yields a truthy value.

    Between two evaluations the function sleeps for ``interval`` seconds (0.5 by default). When a
    ``timeout`` (in seconds) is given, a time budget starting at ``timeout`` is reduced by ``interval``
    after every unsuccessful evaluation, and polling stops once that budget is used up.

    :return: True as soon as ``condition()`` is truthy, False if the timeout budget ran out first
    """
    has_timeout = timeout is not None
    budget = timeout if has_timeout else 0

    while True:
        if condition():
            return True

        if has_timeout:
            # the budget is charged before sleeping, so no sleep happens once it is exhausted
            budget -= interval
            if budget <= 0:
                return False

        time.sleep(interval)
1✔
88

89

90
def synchronized(lock=None):
    """
    Synchronization decorator as described in
    http://blog.dscpl.com.au/2014/01/the-missing-synchronized-decorator.html.

    Every call of the decorated function runs while holding ``lock``.

    :param lock: context manager used as the lock (e.g. ``threading.Lock`` or ``threading.RLock``).
        If omitted, a new ``threading.RLock`` is created for this ``synchronized(...)`` call and shared
        by every function decorated with the returned decorator.
    :return: a decorator that wraps a function so that it executes under the lock
    """
    if lock is None:
        # ``with None`` would fail on every call; fall back to a dedicated reentrant lock instead
        lock = threading.RLock()

    def _decorator(wrapped):
        @functools.wraps(wrapped)
        def _wrapper(*args, **kwargs):
            with lock:
                return wrapped(*args, **kwargs)

        return _wrapper

    return _decorator
1✔
105

106

107
def sleep_forever():
    """Block the calling thread indefinitely by sleeping in one-second steps; never returns."""
    step_seconds = 1
    while True:
        time.sleep(step_seconds)
×
110

111

112
class SynchronizedDefaultDict(defaultdict):
    """
    A ``defaultdict`` whose overridden operations run while holding a per-instance reentrant lock.

    Only the methods defined in this class acquire the lock: item get/set/delete, iteration,
    ``len()``, ``str()`` and ``fromkeys``. Methods inherited unchanged from ``defaultdict`` are not
    overridden here and therefore do not take the lock.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # reentrant, so a thread that already holds the lock can call another guarded method
        self._lock = threading.RLock()

    def fromkeys(self, keys, value=None):
        """
        Build a new mapping from ``keys``, each mapped to ``value``, while holding the lock.

        Note: this is defined as an instance method, so it has to be called on an instance.
        """
        with self._lock:
            return super().fromkeys(keys, value)

    def __getitem__(self, key):
        with self._lock:
            return super().__getitem__(key)

    def __setitem__(self, key, value):
        with self._lock:
            super().__setitem__(key, value)

    def __delitem__(self, key):
        with self._lock:
            super().__delitem__(key)

    def __iter__(self):
        """Iterate over a snapshot of the keys taken while holding the lock."""
        # A live dict iterator would be consumed after the lock is released, leaving the iteration
        # itself unprotected against concurrent inserts/deletes. Copy the keys under the lock instead.
        with self._lock:
            return iter(list(super().keys()))

    def __len__(self):
        with self._lock:
            return super().__len__()

    def __str__(self):
        with self._lock:
            return super().__str__()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc