• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.5
/localstack-core/localstack/services/cloudformation/engine/template_utils.py
1
import re
1✔
2
from typing import Any
1✔
3

4
from localstack.services.cloudformation.deployment_utils import PLACEHOLDER_AWS_NO_VALUE
1✔
5
from localstack.services.cloudformation.engine.errors import TemplateError
1✔
6
from localstack.utils.urls import localstack_host
1✔
7

8
# Value handed out for the "AWS::URLSuffix" pseudo parameter (see resolve_pseudo_parameter).
# It is taken from the configured LocalStack host at import time.
AWS_URL_SUFFIX = localstack_host().host  # value is "amazonaws.com" in real AWS
1✔
9

10

11
def get_deps_for_resource(resource: dict, evaluated_conditions: dict[str, bool]) -> set[str]:
    """
    Collect every logical resource ID that a resource definition depends on.

    Dependencies come from two places: references found inside the resource's "Properties"
    and the names listed under "DependsOn".

    :param resource: the resource definition to be checked for dependencies
    :param evaluated_conditions: condition name -> evaluated result, forwarded to resolve_dependencies
    :return: a set of logical resource IDs which this resource depends on
    """
    depends_on = resource.get("DependsOn", [])
    # "DependsOn" may hold a single value instead of a list of values
    declared = depends_on if isinstance(depends_on, list) else [depends_on]
    referenced = resolve_dependencies(resource.get("Properties", {}), evaluated_conditions)
    return {*referenced, *declared}
1✔
24

25

26
def _sub_variable_names(template: str) -> list[str]:
    """Return the names referenced as ${Name} in an Fn::Sub template, skipping ${!Literal} escapes."""
    # "${!Literal}" is the escape for a literal "${Literal}" and therefore references nothing
    return [
        name for name in re.findall(r"\$\{([^}]+)\}", template) if not name.startswith("!")
    ]


def resolve_dependencies(d: dict, evaluated_conditions: dict[str, bool]) -> set[str]:
    """
    Recursively collect the names referenced by a (partial) template structure.

    References are gathered from "Ref", "Fn::GetAtt" and "Fn::Sub". For "Fn::If" only the branch
    selected by the already evaluated condition is traversed. All other dict and list values are
    searched recursively; scalars contribute nothing.

    :param d: the structure to scan (a dict, a list, or any other value)
    :param evaluated_conditions: condition name -> evaluated result, used to pick the "Fn::If" branch
    :return: the referenced names, excluding everything that starts with "AWS::"
    :raises Exception: if the value of an "Fn::Sub" is neither a string nor a list
    """
    items = set()

    if isinstance(d, dict):
        for k, v in d.items():
            if k == "Fn::If":
                # check the condition and only traverse down the correct path
                condition_name, true_value, false_value = v
                chosen = true_value if evaluated_conditions[condition_name] else false_value
                items |= resolve_dependencies(chosen, evaluated_conditions)
            elif k == "Ref":
                items.add(v)
            elif k == "Fn::GetAtt":
                # both ["Resource", "Attr"] and "Resource.Attr" are accepted
                items.add(v[0] if isinstance(v, list) else v.split(".")[0])
            elif k == "Fn::Sub":
                if isinstance(v, str):
                    # { "Fn::Sub" : "Hello ${Name}" }
                    template, local_values = v, {}
                elif isinstance(v, list):
                    # { "Fn::Sub" : [ "Hello ${Name}", { "Name": "SomeName" } ] }
                    template = v[0]
                    local_values = v[1] if len(v) > 1 else {}
                else:
                    raise Exception(f"Invalid template structure in Fn::Sub: {v}")

                for var in _sub_variable_names(template):
                    if var in local_values:
                        # provided by the local mapping: either a static value or another
                        # intrinsic function whose references we need to follow
                        if isinstance(local_values[var], dict):
                            items |= resolve_dependencies(local_values[var], evaluated_conditions)
                    else:
                        # a direct reference or "Resource.Attribute" - only the resource part counts
                        items.add(var.split(".")[0])
            elif isinstance(v, (dict, list)):
                # list elements that are neither dicts nor lists simply yield no dependencies
                items |= resolve_dependencies(v, evaluated_conditions)
    elif isinstance(d, list):
        for item in d:
            items |= resolve_dependencies(item, evaluated_conditions)

    # pseudo parameters (AWS::Region, ...) are not dependencies
    return {i for i in items if not i.startswith("AWS::")}
1✔
83

84

85
def resolve_stack_conditions(
    account_id: str,
    region_name: str,
    conditions: dict,
    parameters: dict,
    mappings: dict,
    stack_name: str,
) -> dict[str, bool]:
    """
    Evaluate every entry of the template's conditions and return the results keyed by name.

    A condition may refer to:
        another condition
        a parameter value
        a mapping

    Intrinsic functions that can be used to define conditions:
        Fn::And
        Fn::Equals
        Fn::If
        Fn::Not
        Fn::Or

    TODO: more checks on types from references (e.g. in a mapping value)
    TODO: does a ref ever return a non-string value?
    TODO: when unifying/reworking intrinsic functions rework this to a class structure
    """
    return {
        name: resolve_condition(
            account_id, region_name, definition, conditions, parameters, mappings, stack_name
        )
        for name, definition in conditions.items()
    }
×
116

117

118
def resolve_pseudo_parameter(
    account_id: str, region_name: str, pseudo_parameter: str, stack_name: str
) -> Any:
    """
    Return the value of a pseudo parameter such as "AWS::Region".

    :param account_id: value returned for "AWS::AccountId"
    :param region_name: value returned for "AWS::Region"
    :param pseudo_parameter: the pseudo parameter name, e.g. "AWS::StackName"
    :param stack_name: value returned for "AWS::StackName" (and, for now, "AWS::StackId")
    :return: the resolved value, or None if the name is not a known pseudo parameter

    TODO: this function needs access to more stack context
    """
    if pseudo_parameter == "AWS::Region":
        return region_name
    if pseudo_parameter == "AWS::Partition":
        return "aws"
    if pseudo_parameter in ("AWS::StackName", "AWS::StackId"):
        # TODO return proper stack id!
        return stack_name
    if pseudo_parameter == "AWS::AccountId":
        return account_id
    if pseudo_parameter == "AWS::NoValue":
        return PLACEHOLDER_AWS_NO_VALUE
    if pseudo_parameter == "AWS::NotificationARNs":
        # TODO!
        return {}
    if pseudo_parameter == "AWS::URLSuffix":
        return AWS_URL_SUFFIX
    return None
×
144

145

146
def resolve_conditional_mapping_ref(
    ref_name, account_id: str, region_name: str, stack_name: str, parameters
):
    """
    Resolve a "Ref" used inside Fn::FindInMap to either a pseudo parameter or a stack parameter.

    :param ref_name: the referenced name; names starting with "AWS::" are treated as pseudo parameters
    :param account_id: forwarded to the pseudo parameter resolution
    :param region_name: forwarded to the pseudo parameter resolution
    :param stack_name: forwarded to the pseudo parameter resolution
    :param parameters: parameter name -> parameter dict with "ResolvedValue" and/or "ParameterValue"
    :return: the pseudo parameter value, or the parameter's "ResolvedValue" falling back to "ParameterValue"
    :raises TemplateError: if the pseudo parameter resolves to None or the parameter is missing/empty
    """
    if not ref_name.startswith("AWS::"):
        param = parameters.get(ref_name)
        if param:
            return param.get("ResolvedValue") or param.get("ParameterValue")
        raise TemplateError(
            f"Invalid reference: '{ref_name}' does not exist in parameters: '{parameters}'"
        )

    pseudo_value = resolve_pseudo_parameter(account_id, region_name, ref_name, stack_name)
    if pseudo_value is None:
        raise TemplateError(f"Invalid pseudo parameter '{ref_name}'")
    return pseudo_value
×
162

163

164
def resolve_condition(
1✔
165
    account_id: str, region_name: str, condition, conditions, parameters, mappings, stack_name
166
):
167
    if isinstance(condition, dict):
×
168
        for k, v in condition.items():
×
169
            match k:
×
170
                case "Ref":
×
171
                    if isinstance(v, str) and v.startswith("AWS::"):
×
172
                        return resolve_pseudo_parameter(
×
173
                            account_id, region_name, v, stack_name
174
                        )  # TODO: this pseudo parameter resolving needs context(!)
175
                    # TODO: add util function for resolving individual refs (e.g. one util for resolving pseudo parameters)
176
                    # TODO: pseudo-parameters like AWS::Region
177
                    # can only really be a parameter here
178
                    # TODO: how are conditions references written here? as {"Condition": "ConditionA"} or via Ref?
179
                    # TODO: test for a boolean parameter?
180
                    param = parameters[v]
×
181
                    parameter_type: str = param["ParameterType"]
×
182
                    parameter_value = param.get("ResolvedValue") or param.get("ParameterValue")
×
183

184
                    if parameter_type in ["CommaDelimitedList"] or parameter_type.startswith(
×
185
                        "List<"
186
                    ):
187
                        return [p.strip() for p in parameter_value.split(",")]
×
188
                    else:
189
                        return parameter_value
×
190

191
                case "Condition":
×
192
                    return resolve_condition(
×
193
                        account_id,
194
                        region_name,
195
                        conditions[v],
196
                        conditions,
197
                        parameters,
198
                        mappings,
199
                        stack_name,
200
                    )
201
                case "Fn::FindInMap":
×
202
                    map_name, top_level_key, second_level_key = v
×
203
                    if isinstance(map_name, dict) and "Ref" in map_name:
×
204
                        ref_name = map_name["Ref"]
×
205
                        map_name = resolve_conditional_mapping_ref(
×
206
                            ref_name, account_id, region_name, stack_name, parameters
207
                        )
208

209
                    if isinstance(top_level_key, dict) and "Ref" in top_level_key:
×
210
                        ref_name = top_level_key["Ref"]
×
211
                        top_level_key = resolve_conditional_mapping_ref(
×
212
                            ref_name, account_id, region_name, stack_name, parameters
213
                        )
214

215
                    if isinstance(second_level_key, dict) and "Ref" in second_level_key:
×
216
                        ref_name = second_level_key["Ref"]
×
217
                        second_level_key = resolve_conditional_mapping_ref(
×
218
                            ref_name, account_id, region_name, stack_name, parameters
219
                        )
220

221
                    mapping = mappings.get(map_name)
×
222
                    if not mapping:
×
223
                        raise TemplateError(
×
224
                            f"Invalid reference: '{map_name}' could not be found in the template mappings: '{list(mappings.keys())}'"
225
                        )
226

227
                    top_level_map = mapping.get(top_level_key)
×
228
                    if not top_level_map:
×
229
                        raise TemplateError(
×
230
                            f"Invalid reference: '{top_level_key}' could not be found in the '{map_name}' mapping: '{list(mapping.keys())}'"
231
                        )
232

233
                    value = top_level_map.get(second_level_key)
×
234
                    if not value:
×
235
                        raise TemplateError(
×
236
                            f"Invalid reference: '{second_level_key}' could not be found in the '{top_level_key}' mapping: '{top_level_map}'"
237
                        )
238

239
                    return value
×
240
                case "Fn::If":
×
241
                    if_condition_name, true_branch, false_branch = v
×
242
                    if resolve_condition(
×
243
                        account_id,
244
                        region_name,
245
                        if_condition_name,
246
                        conditions,
247
                        parameters,
248
                        mappings,
249
                        stack_name,
250
                    ):
251
                        return resolve_condition(
×
252
                            account_id,
253
                            region_name,
254
                            true_branch,
255
                            conditions,
256
                            parameters,
257
                            mappings,
258
                            stack_name,
259
                        )
260
                    else:
261
                        return resolve_condition(
×
262
                            account_id,
263
                            region_name,
264
                            false_branch,
265
                            conditions,
266
                            parameters,
267
                            mappings,
268
                            stack_name,
269
                        )
270
                case "Fn::Not":
×
271
                    return not resolve_condition(
×
272
                        account_id, region_name, v[0], conditions, parameters, mappings, stack_name
273
                    )
274
                case "Fn::And":
×
275
                    # TODO: should actually restrict this a bit
276
                    return resolve_condition(
×
277
                        account_id, region_name, v[0], conditions, parameters, mappings, stack_name
278
                    ) and resolve_condition(
279
                        account_id, region_name, v[1], conditions, parameters, mappings, stack_name
280
                    )
281
                case "Fn::Or":
×
282
                    return resolve_condition(
×
283
                        account_id, region_name, v[0], conditions, parameters, mappings, stack_name
284
                    ) or resolve_condition(
285
                        account_id, region_name, v[1], conditions, parameters, mappings, stack_name
286
                    )
287
                case "Fn::Equals":
×
288
                    left = resolve_condition(
×
289
                        account_id, region_name, v[0], conditions, parameters, mappings, stack_name
290
                    )
291
                    right = resolve_condition(
×
292
                        account_id, region_name, v[1], conditions, parameters, mappings, stack_name
293
                    )
294
                    return fn_equals_type_conversion(left) == fn_equals_type_conversion(right)
×
295
                case "Fn::Join":
×
296
                    join_list = v[1]
×
297
                    if isinstance(v[1], dict):
×
298
                        join_list = resolve_condition(
×
299
                            account_id,
300
                            region_name,
301
                            v[1],
302
                            conditions,
303
                            parameters,
304
                            mappings,
305
                            stack_name,
306
                        )
307
                    result = v[0].join(
×
308
                        [
309
                            resolve_condition(
310
                                account_id,
311
                                region_name,
312
                                x,
313
                                conditions,
314
                                parameters,
315
                                mappings,
316
                                stack_name,
317
                            )
318
                            for x in join_list
319
                        ]
320
                    )
321
                    return result
×
322
                case "Fn::Select":
×
323
                    index = v[0]
×
324
                    options = v[1]
×
325

326
                    if isinstance(options, dict):
×
327
                        options = resolve_condition(
×
328
                            account_id,
329
                            region_name,
330
                            options,
331
                            conditions,
332
                            parameters,
333
                            mappings,
334
                            stack_name,
335
                        )
336

337
                    if isinstance(options, list):
×
338
                        for i, option in enumerate(options):
×
339
                            if isinstance(option, dict):
×
340
                                options[i] = resolve_condition(
×
341
                                    account_id,
342
                                    region_name,
343
                                    option,
344
                                    conditions,
345
                                    parameters,
346
                                    mappings,
347
                                    stack_name,
348
                                )
349

350
                        return options[index]
×
351

352
                    if index != 0:
×
353
                        raise Exception(
×
354
                            f"Template error: Fn::Select  cannot select nonexistent value at index {index}"
355
                        )
356

357
                    return options
×
358

359
                case "Fn::Sub":
×
360
                    # we can assume anything in there is a ref
361
                    if isinstance(v, str):
×
362
                        # { "Fn::Sub" : "Hello ${Name}" }
363
                        result = v
×
364
                        variables_found = re.findall("\\${([^}]+)}", v)
×
365
                        for var in variables_found:
×
366
                            # can't be a resource here (!), so also not attribute access
367
                            if var.startswith("AWS::"):
×
368
                                # pseudo-parameter
369
                                resolved_pseudo_param = resolve_pseudo_parameter(
×
370
                                    account_id, region_name, var, stack_name
371
                                )
372
                                result = result.replace(f"${{{var}}}", resolved_pseudo_param)
×
373
                            else:
374
                                # parameter
375
                                param = parameters[var]
×
376
                                parameter_type: str = param["ParameterType"]
×
377
                                resolved_parameter = param.get("ResolvedValue") or param.get(
×
378
                                    "ParameterValue"
379
                                )
380

381
                                if parameter_type in [
×
382
                                    "CommaDelimitedList"
383
                                ] or parameter_type.startswith("List<"):
384
                                    resolved_parameter = [
×
385
                                        p.strip() for p in resolved_parameter.split(",")
386
                                    ]
387

388
                                result = result.replace(f"${{{var}}}", resolved_parameter)
×
389

390
                        return result
×
391
                    elif isinstance(v, list):
×
392
                        # { "Fn::Sub" : [ "Hello ${Name}", { "Name": "SomeName" } ] }
393
                        result = v[0]
×
394
                        variables_found = re.findall("\\${([^}]+)}", v[0])
×
395
                        for var in variables_found:
×
396
                            if var in v[1]:
×
397
                                # variable is included in provided mapping and can either be a static value or another reference
398
                                if isinstance(v[1][var], dict):
×
399
                                    # e.g. { "Fn::Sub" : [ "Hello ${Name}", { "Name": {"Ref": "NameParam"} } ] }
400
                                    #   the values can have references, so we need to go deeper
401
                                    resolved_var = resolve_condition(
×
402
                                        account_id,
403
                                        region_name,
404
                                        v[1][var],
405
                                        conditions,
406
                                        parameters,
407
                                        mappings,
408
                                        stack_name,
409
                                    )
410
                                    result = result.replace(f"${{{var}}}", resolved_var)
×
411
                                else:
412
                                    result = result.replace(f"${{{var}}}", v[1][var])
×
413
                            else:
414
                                # it's now either a GetAtt call or a direct reference
415
                                if var.startswith("AWS::"):
×
416
                                    # pseudo-parameter
417
                                    resolved_pseudo_param = resolve_pseudo_parameter(
×
418
                                        account_id, region_name, var, stack_name
419
                                    )
420
                                    result = result.replace(f"${{{var}}}", resolved_pseudo_param)
×
421
                                else:
422
                                    # parameter
423
                                    param = parameters[var]
×
424
                                    parameter_type: str = param["ParameterType"]
×
425
                                    resolved_parameter = param.get("ResolvedValue") or param.get(
×
426
                                        "ParameterValue"
427
                                    )
428

429
                                    if parameter_type in [
×
430
                                        "CommaDelimitedList"
431
                                    ] or parameter_type.startswith("List<"):
432
                                        resolved_parameter = [
×
433
                                            p.strip() for p in resolved_parameter.split(",")
434
                                        ]
435

436
                                    result = result.replace(f"${{{var}}}", resolved_parameter)
×
437
                        return result
×
438
                    else:
439
                        raise Exception(f"Invalid template structure in Fn::Sub: {v}")
×
440
                case _:
×
441
                    raise Exception(f"Invalid condition structure encountered: {condition=}")
×
442
    else:
443
        return condition
×
444

445

446
def fn_equals_type_conversion(value) -> str:
    """
    Normalise an Fn::Equals operand to a string so both sides can be compared.

    Strings are returned as-is, booleans become "true"/"false", and everything else goes
    through str().
    """
    if isinstance(value, bool):
        return {True: "true", False: "false"}[value]
    # TODO: investigate correct behavior for non-string, non-boolean values
    return value if isinstance(value, str) else str(value)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc