• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19558051963

20 Nov 2025 05:48PM UTC coverage: 86.859% (-0.05%) from 86.907%
19558051963

push

github

web-flow
Sns:v2 publish (#13399)

199 of 279 new or added lines in 5 files covered. (71.33%)

168 existing lines in 9 files now uncovered.

68851 of 79268 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.97
/localstack-core/localstack/utils/collections.py
1
"""
2
This package provides custom collection types, as well as tools to analyze
3
and manipulate python collection (dicts, list, sets).
4
"""
5

6
import logging
1✔
7
import re
1✔
8
from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sized
1✔
9
from typing import (
1✔
10
    Any,
11
    Optional,
12
    TypedDict,
13
    TypeVar,
14
    Union,
15
    cast,
16
    get_args,
17
    get_origin,
18
)
19

20
import cachetools
1✔
21

22
LOG = logging.getLogger(__name__)
1✔
23

24
# default regex to match an item in a comma-separated list string
25
DEFAULT_REGEX_LIST_ITEM = r"[\w-]+"
1✔
26

27
_E = TypeVar("_E")
1✔
28
"""TypeVar var used internally for container type parameters."""
1✔
29

30

31
class AccessTrackingDict(dict):
    """
    Dict subclass that reports item assignments to an optional callback.

    The entries of ``wrapped`` are copied into this dict by the constructor, so the
    identity of ``wrapped`` is not preserved. Only ``__setitem__`` is intercepted.
    """

    def __init__(self, wrapped, callback: Callable[[dict, str, list, dict], Any] = None):
        super().__init__(wrapped)
        self.callback = callback

    def __setitem__(self, key, value):
        # notify the observer (if any) before the write is applied
        if self.callback:
            self.callback(self, "__setitem__", [key, value], {})
        return super().__setitem__(key, value)
×
45

46

47
class DelSafeDict(dict):
    """
    Dict whose item deletion stores ``None`` under the key instead of removing it,
    so deleting a missing key never raises. Handy when applying jsonpatch:

    obj.__dict__ = DelSafeDict(obj.__dict__)
    apply_patch(obj.__dict__, patch)
    """

    def __delitem__(self, key, *args, **kwargs):
        # "delete" by blanking the value; extra arguments are accepted and ignored
        self[key] = None
1✔
56

57

58
class ImmutableList(tuple):
    """
    Immutable view of a list or other sequence.

    At present this is a plain ``tuple`` subclass without any additions; it can be
    swapped for a custom implementation later if that becomes necessary.
    """
64

65

66
class HashableList(ImmutableList):
    """Immutable list wrapper with an explicit hash, usable as dict key or set member."""

    def __hash__(self):
        # sum of the element hashes, hence independent of element order
        return sum(map(hash, self))
1✔
71

72

73
class ImmutableDict(Mapping):
    """
    Read-only mapping that keeps a private copy of the given entries.

    Only the read side of the mapping protocol is implemented, so entries cannot be
    changed through this object. Equality is delegated to the underlying dict, which
    allows comparison with plain dicts as well as other ``ImmutableDict`` instances.
    """

    def __init__(self, seq=None, **kwargs):
        """
        :param seq: optional mapping or iterable of key/value pairs to copy
        :param kwargs: additional entries, applied on top of ``seq``
        """
        # `dict(None, ...)` raises a TypeError, so a missing `seq` must be handled explicitly
        self._dict = dict(**kwargs) if seq is None else dict(seq, **kwargs)

    def __len__(self) -> int:
        return self._dict.__len__()

    def __iter__(self) -> Iterator:
        return self._dict.__iter__()

    def __getitem__(self, key):
        return self._dict.__getitem__(key)

    def __eq__(self, other):
        # unwrap other ImmutableDict instances, so that both sides are compared as plain dicts
        return self._dict.__eq__(other._dict if isinstance(other, ImmutableDict) else other)

    def __str__(self):
        return self._dict.__str__()
×
93

94

95
class HashableJsonDict(ImmutableDict):
    """
    Immutable dict wrapper that is hashable, for use as dict key or in hash sets.
    Note: the wrapped dict is assumed to be JSON-encodable (i.e., acyclic and made up
    of lists/dicts and simple types only).
    """

    def __hash__(self):
        # imported at call time rather than at module level (presumably to avoid an import cycle)
        from localstack.utils.json import canonical_json

        canonical = canonical_json(self._dict)
        return hash(canonical)
1✔
105

106

107
class PaginatedList(list[_E]):
    """List with pagination and filtering support, for AWS APIs that return paginated responses"""

    DEFAULT_PAGE_SIZE = 50

    def get_page(
        self,
        token_generator: Callable[[_E], str],
        next_token: str = None,
        page_size: int = None,
        filter_function: Callable[[_E], bool] = None,
    ) -> tuple[list[_E], str | None]:
        """
        Return one page of (optionally filtered) items plus the token of the following page.

        :param token_generator: computes the pagination token of an item
        :param next_token: token of the item the page starts at; if no item produces this
            token, the page starts at the first item
        :param page_size: maximum number of items per page, defaults to DEFAULT_PAGE_SIZE
        :param filter_function: if given, only items it accepts are considered
        :return: tuple of (page items, token of the next page's first item or None)
        """
        candidates = self if filter_function is None else list(filter(filter_function, self))
        limit = self.DEFAULT_PAGE_SIZE if page_size is None else page_size

        # everything fits into a single page and no start token was requested
        if len(candidates) <= limit and next_token is None:
            return candidates, None

        # locate the first item that produces the requested token (fall back to the list start)
        offset = 0
        for entry in candidates:
            if token_generator(entry) == next_token:
                offset = candidates.index(entry)
                break

        end = offset + limit
        # the token of the item right after this page marks the start of the next page
        following_token = token_generator(candidates[end]) if end < len(candidates) else None

        return candidates[offset:end], following_token
1✔
145

146

147
class CustomExpiryTTLCache(cachetools.TTLCache):
    """TTLCache variant that supports a custom expiry time per key."""

    def set_expiry(self, key: Any, ttl: float | int) -> float:
        """Expire the given key at (<current_time> + <ttl>) and return that expiry time."""
        with self.timer as now:
            expiry = now + ttl
            # note: relies on the name-mangled private `__getlink` method of TTLCache
            self._TTLCache__getlink(key).expires = expiry
            return expiry
1✔
156

157

158
def get_safe(dictionary, path, default_value=None):
    """
    Performs a safe navigation on a Dictionary object and
    returns the result or default value (if specified).
    The function follows a common AWS path resolution pattern "$.a.b.c".

    Path nodes consisting only of digits are converted to integers, so they can be
    used as list indices. Nodes that cannot be resolved (missing key, index out of
    range, non-numeric node applied to a list) resolve to the default value.

    :type dictionary: dict
    :param dictionary: Dict to perform safe navigation.

    :type path: list|str
    :param path: List or dot-separated string containing the path of an attribute,
                 starting from the root node "$".

    :type default_value: any
    :param default_value: Default value to return in case the resolved value is None
                          or any other falsy value (0, "", [], ...).

    :rtype: any
    :return: Resolved value or default_value.

    :raises AttributeError: if the path does not start with the root node "$".
    """
    if not isinstance(dictionary, dict) or not dictionary:
        return default_value

    attribute_path = path if isinstance(path, list) else path.split(".")
    if not attribute_path or attribute_path[0] != "$":
        raise AttributeError('Safe navigation must begin with a root node "$"')

    # compiled once, instead of once per path node
    numeric_node = re.compile("^\\d+$")

    current_value = dictionary
    for path_node in attribute_path:
        if path_node == "$":
            continue

        if numeric_node.search(str(path_node)):
            path_node = int(path_node)

        if isinstance(current_value, dict) and path_node in current_value:
            current_value = current_value[path_node]
        elif (
            isinstance(current_value, list)
            # only integers can index a list - comparing a str node with len() would raise
            and isinstance(path_node, int)
            and -len(current_value) <= path_node < len(current_value)
        ):
            current_value = current_value[path_node]
        else:
            current_value = None

    return current_value or default_value
1✔
200

201

202
def set_safe_mutable(dictionary, path, value):
    """
    Mutates original dict and sets the specified value under provided path.
    Missing intermediate nodes are created as empty dicts.

    :type dictionary: dict
    :param dictionary: Dict to mutate.

    :type path: list|str
    :param path: List or dot-separated string containing the path of an attribute,
                 starting from the root node "$".

    :type value: any
    :param value: Value to set under specified path.

    :rtype: dict
    :return: Returns mutated dictionary.

    :raises AttributeError: if "dictionary" is not a dict, or the path does not start with "$".
    :raises RuntimeError: if an intermediate node of the path exists but is not a dict.
    """
    if not isinstance(dictionary, dict):
        raise AttributeError('"dictionary" must be of type "dict"')

    attribute_path = path if isinstance(path, list) else path.split(".")
    if not attribute_path or attribute_path[0] != "$":
        raise AttributeError('Dict navigation must begin with a root node "$"')

    last_index = len(attribute_path) - 1
    current_pointer = dictionary
    for i, path_node in enumerate(attribute_path):
        if path_node == "$":
            continue

        if i < last_index:
            # validate the container *before* touching it, to avoid partially
            # mutating (or failing with an unrelated error on) a non-dict node
            if not isinstance(current_pointer, dict):
                raise RuntimeError(
                    'Error while deeply setting a dict value. Supplied path is not of type "dict"'
                )
            if path_node not in current_pointer:
                current_pointer[path_node] = {}
        else:
            current_pointer[path_node] = value

        current_pointer = current_pointer[path_node]

    return dictionary
1✔
248

249

250
def pick_attributes(dictionary, paths):
    """
    Builds a new dictionary that contains only the attributes found under the given paths.
    The paths act as an allow-list; paths that resolve to no value are left out.

    :type dictionary: dict
    :param dictionary: Dict to pick attributes from.

    :type paths: list of (list or str)
    :param paths: List of lists or strings with dot-separated paths, starting from the root node "$".

    :rtype: dict
    :return: New dictionary holding the picked attributes.
    """
    picked = {}
    for path in paths:
        resolved = get_safe(dictionary, path)
        if resolved is None:
            continue
        set_safe_mutable(picked, path, resolved)
    return picked
1✔
273

274

275
def select_attributes(obj: dict, attributes: list[str]) -> dict:
    """Return a copy of the given dict reduced to the given attributes (a single name is accepted, too)"""
    wanted = attributes if is_list_or_tuple(attributes) else [attributes]
    selected = {}
    for name, value in obj.items():
        if name in wanted:
            selected[name] = value
    return selected
1✔
279

280

281
def remove_attributes(obj: dict, attributes: list[str], recursive: bool = False) -> dict:
    """
    Remove a set of attributes from the given dict (in-place).

    :param obj: the dict to remove the attributes from
    :param attributes: attribute name(s) to remove; a single name is accepted as well,
        and `None` removes nothing
    :param recursive: if True, the attributes are also removed from all dicts nested in `obj`
    :return: the given (mutated) dict; in recursive mode, the result of `recurse_object`
    """
    if recursive:
        # only needed for the recursive case, hence imported here
        from localstack.utils.objects import recurse_object

        def _remove(o, **kwargs):
            if isinstance(o, dict):
                remove_attributes(o, attributes)
            return o

        return recurse_object(obj, _remove)

    # ensure_list passes `None` through - treat that as "nothing to remove"
    for attr in ensure_list(attributes) or []:
        obj.pop(attr, None)
    return obj
1✔
298

299

300
def rename_attributes(
    obj: dict, old_to_new_attributes: dict[str, str], in_place: bool = False
) -> dict:
    """Rename attributes of the given dict, following the old-name to new-name mapping passed as
    second parameter. Works on a shallow copy by default; pass in_place=True to modify `obj` itself."""
    target = obj if in_place else dict(obj)
    for source_key, target_key in old_to_new_attributes.items():
        if source_key not in target:
            continue
        target[target_key] = target.pop(source_key)
    return target
×
311

312

313
def is_list_or_tuple(obj) -> bool:
    """Return whether the given object is a list or a tuple (including subclasses)."""
    return isinstance(obj, list) or isinstance(obj, tuple)
1✔
315

316

317
def ensure_list(obj: Any, wrap_none=False) -> list | None:
1✔
318
    """Wrap the given object in a list, or return the object itself if it already is a list."""
319
    if obj is None and not wrap_none:
1✔
320
        return obj
1✔
321
    return obj if isinstance(obj, list) else [obj]
1✔
322

323

324
def to_unique_items_list(inputs, comparator=None):
    """Return the items of the given iterable without duplicates, keeping the first occurrences.
    Without a comparator, items are compared with `==`. A comparator(item1, item2) signals
    equality by returning either `True` or a value whose string form is "0" (e.g., int 0)."""

    def _same(candidate, existing):
        if comparator:
            outcome = comparator(candidate, existing)
            return outcome is True or str(outcome) == "0"
        return candidate == existing

    unique = []
    for candidate in inputs:
        if not any(_same(candidate, existing) for existing in unique):
            unique.append(candidate)
    return unique
1✔
342

343

344
def merge_recursive(source, destination, none_values=None, overwrite=False):
    """
    Recursively merge the entries of `source` into `destination` (in-place).

    :param source: dict to read the entries from
    :param destination: dict to merge the entries into
    :param none_values: destination values that count as "unset" and may be replaced,
        defaults to `[None]`
    :param overwrite: if True, non-dict values in `destination` are always replaced
    :return: the (mutated) destination
    """
    empty_markers = [None] if none_values is None else none_values
    for key, value in source.items():
        if isinstance(value, dict):
            # descend into the matching destination node, creating it if missing
            child = destination.setdefault(key, {})
            merge_recursive(value, child, none_values=empty_markers, overwrite=overwrite)
            continue

        from requests.models import CaseInsensitiveDict

        mergeable = isinstance(destination, (dict, CaseInsensitiveDict))
        if not mergeable:
            # only a warning is logged here - the assignment below is still attempted
            LOG.warning(
                "Destination for merging %s=%s is not dict: %s (%s)",
                key,
                value,
                destination,
                type(destination),
            )
        if overwrite or destination.get(key) in empty_markers:
            destination[key] = value
    return destination
1✔
366

367

368
def merge_dicts(*dicts, **kwargs):
    """Combine all dicts passed in `*dicts` into one new dict (later entries win) and return it.
    If a `default` keyword argument is given and one of the entries in `*dicts` is None, then
    `default` is returned instead."""
    merged = {}
    has_default = "default" in kwargs
    for entry in dicts:
        if entry is None and has_default:
            return kwargs["default"]
        if not entry:
            continue
        merged.update(entry)
    return merged
×
378

379

380
def remove_none_values_from_dict(dict: dict) -> dict:
    """Return a copy of the given dict without the entries whose value is `None`."""
    cleaned = {}
    for key, value in dict.items():
        if value is not None:
            cleaned[key] = value
    return cleaned
1✔
382

383

384
def last_index_of(array, value):
    """Return the highest index in the given list whose entry equals `value` - or, if `value` is
    callable, for which `value(entry)` is truthy. Returns -1 if there is no such entry."""
    position = len(array) - 1
    while position >= 0:
        candidate = array[position]
        if candidate == value or (callable(value) and value(candidate)):
            return position
        position -= 1
    return -1
×
392

393

394
def is_sub_dict(child_dict: dict, parent_dict: dict) -> bool:
    """
    Returns whether the first dict is a sub-dict (subset) of the second dict, i.e., whether
    every key of `child_dict` is present in `parent_dict` with an equal value.
    """
    # check key presence explicitly: `.get()` would let a missing key match a `None` value
    return all(
        key in parent_dict and parent_dict[key] == val for key, val in child_dict.items()
    )
×
397

398

399
def items_equivalent(list1, list2, comparator):
    """
    Returns whether two lists are equivalent (i.e., same items contained in both lists,
    irrespective of the items' order) with respect to a comparator function.

    :param list1: first list
    :param list2: second list
    :param comparator: called as comparator(item_of_list1, item_of_list2), returns a truthy
        value if the two items are considered equal
    :return: True if the lists have the same length and every item of either list has a
        matching item in the other list, False otherwise
    """
    if len(list1) != len(list2):
        return False

    # containment has to hold in both directions - checking only list1 against list2
    # would, e.g., report [1, 1, 1] and [1, 2, 3] as equivalent
    if not all(any(comparator(first, second) for second in list2) for first in list1):
        return False
    return all(any(comparator(first, second) for first in list1) for second in list2)
×
414

415

416
def is_none_or_empty(obj: str | None | list | None) -> bool:
1✔
417
    return (
1✔
418
        obj is None
419
        or (isinstance(obj, str) and obj.strip() == "")
420
        or (isinstance(obj, Sized) and len(obj) == 0)
421
    )
422

423

424
def select_from_typed_dict(typed_dict: type[TypedDict], obj: dict, filter: bool = False) -> dict:
    """
    Reduce a dictionary to the keys that are declared on a given `TypedDict`.
    :param typed_dict: the `TypedDict` whose required and optional keys are selected
    :param obj: the dictionary to select from
    :param filter: if True, additionally drop all keys with a falsy value (e.g., `None`, empty string
        or empty dictionary)
    :return: a new dictionary with the selected entries
    """
    declared_keys = [*typed_dict.__required_keys__, *typed_dict.__optional_keys__]
    selection = select_attributes(obj, declared_keys)
    if not filter:
        return selection
    return {key: value for key, value in selection.items() if value}
1✔
438

439

440
T = TypeVar("T", bound=dict)
1✔
441

442

443
def convert_to_typed_dict(typed_dict: type[T], obj: dict, strict: bool = False) -> T:
    """
    Converts the given object to the given typed dict (by calling the type constructors).
    Keys which are not declared on the typed dict, as well as keys with a falsy value, are dropped.
    Limitations:
    - This does not work for ForwardRefs (type refs in quotes).
    - If a type is a Union (`Union[X, Y]`, `Optional[X]` or `X | Y`), the first type is used for
      the conversion.
    - The conversion fails for types which cannot be instantiated with the constructor.

    :param typed_dict: to convert the given object to
    :param obj: object to convert matching keys to the types defined in the typed dict
    :param strict: True if a TypeError should be raised in case the conversion fails (this also
        applies to nested typed dicts); otherwise the failure is logged and the value is kept
    :return: obj converted to the typed dict T
    """
    # imported locally: only needed to recognize PEP 604 unions (`X | Y`)
    import types

    result = cast(T, select_from_typed_dict(typed_dict, obj, filter=True))
    for key, key_type in typed_dict.__annotations__.items():
        if key not in result:
            continue

        # If it's a Union (which includes Optional), we extract the first type argument.
        # `X | Y` annotations have the origin `types.UnionType` rather than `typing.Union`.
        if get_origin(key_type) in (Union, types.UnionType):
            key_type = get_args(key_type)[0]

        # Use duck-typing to check if the dict is a typed dict
        if hasattr(key_type, "__required_keys__") and hasattr(key_type, "__optional_keys__"):
            result[key] = convert_to_typed_dict(key_type, result[key], strict=strict)
            continue

        # Otherwise, we call the type's constructor (on a best-effort basis)
        try:
            result[key] = key_type(result[key])
        except TypeError:
            if strict:
                raise
            LOG.debug("Could not convert %s to %s.", key, key_type)
    return result
1✔
475

476

477
def dict_multi_values(elements: list | dict) -> dict[str, list[Any]]:
1✔
478
    """
479
    Return a dictionary with the original keys from the list of dictionary and the
480
    values are the list of values of the original dictionary.
481
    """
482
    result_dict = {}
1✔
483
    if isinstance(elements, dict):
1✔
484
        for key, value in elements.items():
1✔
485
            if isinstance(value, list):
1✔
486
                result_dict[key] = value
1✔
487
            else:
488
                result_dict[key] = [value]
1✔
489
    elif isinstance(elements, list):
1✔
490
        if isinstance(elements[0], list):
1✔
491
            for key, value in elements:
1✔
492
                if key in result_dict:
1✔
493
                    result_dict[key].append(value)
1✔
494
                else:
495
                    result_dict[key] = [value]
1✔
496
        else:
497
            result_dict[elements[0]] = elements[1:]
1✔
498
    return result_dict
1✔
499

500

501
ItemType = TypeVar("ItemType")
1✔
502

503

504
def split_list_by(
    lst: Iterable[ItemType], predicate: Callable[[ItemType], bool]
) -> tuple[list[ItemType], list[ItemType]]:
    """Partition the items into two lists: those accepted by the predicate and those rejected by
    it (in that order). The relative order of the items is kept within both lists."""
    accepted: list = []
    rejected: list = []

    for item in lst:
        bucket = accepted if predicate(item) else rejected
        bucket.append(item)

    return accepted, rejected
×
516

517

518
def is_comma_delimited_list(string: str, item_regex: str | None = None) -> bool:
1✔
519
    """
520
    Checks if the given string is a comma-delimited list of items.
521
    The optional `item_regex` parameter specifies the regex pattern for each item in the list.
522
    """
523
    item_regex = item_regex or DEFAULT_REGEX_LIST_ITEM
1✔
524

525
    pattern = re.compile(rf"^\s*({item_regex})(\s*,\s*{item_regex})*\s*$")
1✔
526
    if pattern.match(string) is None:
1✔
527
        return False
1✔
528
    return True
1✔
529

530

531
def optional_list(condition: bool, items: Iterable[_E]) -> list[_E]:
    """
    Return all elements of the given iterable as a list if `condition` is truthy, and an empty list
    otherwise. The iterable is iterated in both cases.
    >>> print(optional_list(True, [1, 2, 3]))
    [1, 2, 3]
    >>> print(optional_list(False, [1, 2, 3]))
    []
    """
    return [item for item in items if condition]
1✔
540

541

542
def iter_chunks(items: list[_E], chunk_size: int) -> Generator[list[_E], None, None]:
    """
    Iterate over a list in consecutive chunks of a given size.

    The chunks are produced lazily by a generator, i.e., each chunk is only sliced out of the list
    once it is requested.

    :param items: The list to split into chunks.
    :param chunk_size: The maximum number of elements per chunk.
    :return: A generator yielding sublists of the original list with up to `chunk_size` elements each
    (the last chunk may be shorter).
    """
    yield from (
        items[start : start + chunk_size] for start in range(0, len(items), chunk_size)
    )
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc