• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16910279859

12 Aug 2025 01:24PM UTC coverage: 86.848% (+0.01%) from 86.837%
16910279859

push

github

web-flow
fix unit test reporting (#12996)

66740 of 76847 relevant lines covered (86.85%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.13
/localstack-core/localstack/services/cloudformation/provider_utils.py
1
"""
2
A set of utils for use in resource providers.
3

4
Avoid any imports to localstack here and keep external imports to a minimum!
5
This is because we want to be able to package a resource provider without including localstack code.
6
"""
7

8
import builtins
1✔
9
import json
1✔
10
import re
1✔
11
import uuid
1✔
12
from collections.abc import Callable
1✔
13
from copy import deepcopy
1✔
14
from pathlib import Path
1✔
15

16
from botocore.model import Shape, StructureShape
1✔
17

18

19
def generate_default_name(stack_name: str, logical_resource_id: str):
1✔
20
    random_id_part = str(uuid.uuid4())[0:8]
1✔
21
    resource_id_part = logical_resource_id[:24]
1✔
22
    stack_name_part = stack_name[: 63 - 2 - (len(random_id_part) + len(resource_id_part))]
1✔
23
    return f"{stack_name_part}-{resource_id_part}-{random_id_part}"
1✔
24

25

26
def generate_default_name_without_stack(logical_resource_id: str):
1✔
27
    random_id_part = str(uuid.uuid4())[0:8]
1✔
28
    resource_id_part = logical_resource_id[: 63 - 1 - len(random_id_part)]
1✔
29
    return f"{resource_id_part}-{random_id_part}"
1✔
30

31

32
# ========= Helpers for boto calls ==========
33
# (equivalent to the old ones in deployment_utils.py)
34

35

36
def deselect_attributes(model: dict, params: list[str]) -> dict:
1✔
37
    return {k: v for k, v in model.items() if k not in params}
1✔
38

39

40
def select_attributes(model: dict, params: list[str]) -> dict:
1✔
41
    return {k: v for k, v in model.items() if k in params}
1✔
42

43

44
def keys_lower(model: dict) -> dict:
1✔
45
    return {k.lower(): v for k, v in model.items()}
×
46

47

48
def convert_pascalcase_to_lower_camelcase(item: str) -> str:
1✔
49
    if len(item) <= 1:
1✔
50
        return item.lower()
×
51
    else:
52
        return f"{item[0].lower()}{item[1:]}"
1✔
53

54

55
def convert_lower_camelcase_to_pascalcase(item: str) -> str:
1✔
56
    if len(item) <= 1:
1✔
57
        return item.upper()
×
58
    else:
59
        return f"{item[0].upper()}{item[1:]}"
1✔
60

61

62
def _recurse_properties(obj: dict | list, fn: Callable) -> dict | list:
1✔
63
    obj = fn(obj)
1✔
64
    if isinstance(obj, dict):
1✔
65
        return {k: _recurse_properties(v, fn) for k, v in obj.items()}
1✔
66
    elif isinstance(obj, list):
1✔
67
        return [_recurse_properties(v, fn) for v in obj]
1✔
68
    else:
69
        return obj
1✔
70

71

72
def recurse_properties(properties: dict, fn: Callable) -> dict:
1✔
73
    return _recurse_properties(deepcopy(properties), fn)
×
74

75

76
def keys_pascalcase_to_lower_camelcase(model: dict, skip_keys: set = None) -> dict:
1✔
77
    """Recursively change any dicts keys to lower camelcase"""
78

79
    if skip_keys:
1✔
80
        return _pascal_to_camel_keys_preserve_values(model, skip_keys)
1✔
81

82
    def _keys_pascalcase_to_lower_camelcase(obj):
×
83
        if isinstance(obj, dict):
×
84
            return {convert_pascalcase_to_lower_camelcase(k): v for k, v in obj.items()}
×
85
        else:
86
            return obj
×
87

88
    return _recurse_properties(model, _keys_pascalcase_to_lower_camelcase)
×
89

90

91
def _pascal_to_camel_keys_preserve_values(model: dict, skip_keys: set = None) -> dict:
1✔
92
    """
93
    Variant of keys_pascalcase_to_lower_camelcase
94
    All VALUES of provided keys are skipped and not transformed to lower camelcase.
95
    The keys themselves will be transformed.
96
    The function simply stops recursion if a key matches, so make sure no lower level values are ignored.
97
    """
98
    skip_keys = skip_keys or set()
1✔
99

100
    def _transform(obj):
1✔
101
        if isinstance(obj, dict):
1✔
102
            new_dict = {}
1✔
103
            for k, v in obj.items():
1✔
104
                new_key = convert_pascalcase_to_lower_camelcase(k)
1✔
105
                if k in skip_keys:
1✔
106
                    new_dict[new_key] = v
1✔
107
                else:
108
                    new_dict[new_key] = _transform(v)
1✔
109
            return new_dict
1✔
110
        elif isinstance(obj, list):
1✔
111
            return [_transform(i) for i in obj]
1✔
112
        else:
113
            return obj
1✔
114

115
    return _transform(model)
1✔
116

117

118
def keys_lower_camelcase_to_pascalcase(model: dict) -> dict:
1✔
119
    """Recursively change any dicts keys to PascalCase"""
120

121
    def _keys_lower_camelcase_to_pascalcase(obj):
1✔
122
        if isinstance(obj, dict):
1✔
123
            return {convert_lower_camelcase_to_pascalcase(k): v for k, v in obj.items()}
1✔
124
        else:
125
            return obj
1✔
126

127
    return _recurse_properties(model, _keys_lower_camelcase_to_pascalcase)
1✔
128

129

130
def transform_list_to_dict(param, key_attr_name="Key", value_attr_name="Value"):
1✔
131
    result = {}
×
132
    for entry in param:
×
133
        key = entry[key_attr_name]
×
134
        value = entry[value_attr_name]
×
135
        result[key] = value
×
136
    return result
×
137

138

139
def remove_none_values(obj):
1✔
140
    """Remove None values (recursively) in the given object."""
141
    if isinstance(obj, dict):
1✔
142
        return {k: remove_none_values(v) for k, v in obj.items() if v is not None}
1✔
143
    elif isinstance(obj, list):
1✔
144
        return [o for o in obj if o is not None]
1✔
145
    else:
146
        return obj
1✔
147

148

149
# FIXME: this shouldn't be necessary in the future
150
param_validation = re.compile(
1✔
151
    r"Invalid type for parameter (?P<param>[\w.]+), value: (?P<value>\w+), type: <class '(?P<wrong_class>\w+)'>, valid types: <class '(?P<valid_class>\w+)'>"
152
)
153

154

155
def get_nested(obj: dict, path: str):
1✔
156
    parts = path.split(".")
×
157
    result = obj
×
158
    for p in parts[:-1]:
×
159
        result = result.get(p, {})
×
160
    return result.get(parts[-1])
×
161

162

163
def set_nested(obj: dict, path: str, value):
1✔
164
    parts = path.split(".")
×
165
    result = obj
×
166
    for p in parts[:-1]:
×
167
        result = result.get(p, {})
×
168
    result[parts[-1]] = value
×
169

170

171
def fix_boto_parameters_based_on_report(original_params: dict, report: str) -> dict:
1✔
172
    """
173
    Fix invalid type parameter validation errors in boto request parameters
174

175
    :param original_params: original boto request parameters that lead to the parameter validation error
176
    :param report: error report from botocore ParamValidator
177
    :return: a copy of original_params with all values replaced by their correctly cast ones
178
    """
179
    params = deepcopy(original_params)
×
180
    for found in param_validation.findall(report):
×
181
        param_name, value, wrong_class, valid_class = found
×
182
        cast_class = getattr(builtins, valid_class)
×
183
        old_value = get_nested(params, param_name)
×
184

185
        if isinstance(cast_class, bool) and str(old_value).lower() in ["true", "false"]:
×
186
            new_value = str(old_value).lower() == "true"
×
187
        else:
188
            new_value = cast_class(old_value)
×
189
        set_nested(params, param_name, new_value)
×
190
    return params
×
191

192

193
def convert_request_kwargs(parameters: dict, input_shape: StructureShape) -> dict:
1✔
194
    """
195
    Transform a dict of request kwargs for a boto3 request by making sure the keys in the structure recursively conform to the specified input shape.
196
    :param parameters: the kwargs that would be passed to the boto3 client call, e.g. boto3.client("s3").create_bucket(**parameters)
197
    :param input_shape: The botocore input shape of the operation that you want to call later with the fixed inputs
198
    :return: a transformed dictionary with the correct casing recursively applied
199
    """
200

201
    def get_fixed_key(key: str, members: dict[str, Shape]) -> str:
1✔
202
        """return the case-insensitively matched key from the shape or default to the current key"""
203
        for k in members:
1✔
204
            if k.lower() == key.lower():
1✔
205
                return k
1✔
206
        return key
1✔
207

208
    def transform_value(value, member_shape):
1✔
209
        if isinstance(value, dict) and hasattr(member_shape, "members"):
1✔
210
            return convert_request_kwargs(value, member_shape)
1✔
211
        elif isinstance(value, list) and hasattr(member_shape, "member"):
1✔
212
            return [transform_value(item, member_shape.member) for item in value]
×
213

214
        # fix the typing of the value
215
        match member_shape.type_name:
1✔
216
            case "string":
1✔
217
                return str(value)
1✔
218
            case "integer" | "long":
1✔
219
                return int(value)
1✔
220
            case "boolean":
1✔
221
                if isinstance(value, bool):
1✔
222
                    return value
1✔
223
                return True if value.lower() == "true" else False
1✔
224
            case _:
1✔
225
                return value
1✔
226

227
    transformed_dict = {}
1✔
228
    for key, value in parameters.items():
1✔
229
        correct_key = get_fixed_key(key, input_shape.members)
1✔
230
        member_shape = input_shape.members.get(correct_key)
1✔
231

232
        if member_shape is None:
1✔
233
            continue  # skipping this entry, so it's not included in the transformed dict
1✔
234
        elif isinstance(value, dict) and hasattr(member_shape, "members"):
1✔
235
            transformed_dict[correct_key] = convert_request_kwargs(value, member_shape)
1✔
236
        elif isinstance(value, list) and hasattr(member_shape, "member"):
1✔
237
            transformed_dict[correct_key] = [
1✔
238
                transform_value(item, member_shape.member) for item in value
239
            ]
240
        else:
241
            transformed_dict[correct_key] = transform_value(value, member_shape)
1✔
242

243
    return transformed_dict
1✔
244

245

246
def convert_values_to_numbers(input_dict: dict, keys_to_skip: list[str] | None = None):
1✔
247
    """
248
    Recursively converts all string values that represent valid integers
249
    in a dictionary (including nested dictionaries and lists) to integers.
250

251
    Example:
252
    original_dict = {'Gid': '1322', 'SecondaryGids': ['1344', '1452'], 'Uid': '13234'}
253
    output_dict = {'Gid': 1322, 'SecondaryGids': [1344, 1452], 'Uid': 13234}
254

255
    :param input_dict input dict with values to convert
256
    :param keys_to_skip keys to which values are not meant to be converted
257
    :return output_dict
258
    """
259

260
    keys_to_skip = keys_to_skip or []
1✔
261

262
    def recursive_convert(obj):
1✔
263
        if isinstance(obj, dict):
1✔
264
            return {
1✔
265
                key: recursive_convert(value) if key not in keys_to_skip else value
266
                for key, value in obj.items()
267
            }
268
        elif isinstance(obj, list):
1✔
269
            return [recursive_convert(item) for item in obj]
1✔
270
        elif isinstance(obj, str) and obj.isdigit():
1✔
271
            return int(obj)
1✔
272
        else:
273
            return obj
×
274

275
    return recursive_convert(input_dict)
1✔
276

277

278
#  LocalStack specific utilities
279
def get_schema_path(file_path: Path) -> dict:
1✔
280
    file_name_base = file_path.name.removesuffix(".py").removesuffix(".py.enc")
1✔
281
    with Path(file_path).parent.joinpath(f"{file_name_base}.schema.json").open() as fd:
1✔
282
        return json.load(fd)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc