• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / a0150a35-50a8-4c2f-b8f7-cda83b0b938f

03 Jun 2025 05:34PM UTC coverage: 86.768%. Remained the same
a0150a35-50a8-4c2f-b8f7-cda83b0b938f

push

circleci

web-flow
CloudFormation v2 Engine: Base Support for Fn::Base64 (#12700)

20 of 22 new or added lines in 3 files covered. (90.91%)

93 existing lines in 11 files now uncovered.

65077 of 75001 relevant lines covered (86.77%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.72
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import re
1✔
5
from typing import Any, Final, Generic, Optional, TypeVar
1✔
6

7
from botocore.exceptions import ClientError
1✔
8

9
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
10
from localstack.aws.connect import connect_to
1✔
11
from localstack.services.cloudformation.engine.transformers import (
1✔
12
    Transformer,
13
    execute_macro,
14
    transformers,
15
)
16
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
17
    ChangeSetEntity,
18
    ChangeType,
19
    Maybe,
20
    NodeArray,
21
    NodeCondition,
22
    NodeDependsOn,
23
    NodeDivergence,
24
    NodeIntrinsicFunction,
25
    NodeMapping,
26
    NodeObject,
27
    NodeOutput,
28
    NodeOutputs,
29
    NodeParameter,
30
    NodeProperties,
31
    NodeProperty,
32
    NodeResource,
33
    NodeTemplate,
34
    Nothing,
35
    Scope,
36
    TerminalValue,
37
    TerminalValueCreated,
38
    TerminalValueModified,
39
    TerminalValueRemoved,
40
    TerminalValueUnchanged,
41
    is_nothing,
42
)
43
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
44
    ChangeSetModelVisitor,
45
)
46
from localstack.services.cloudformation.stores import get_cloudformation_store
1✔
47
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
48
from localstack.utils.aws.arns import get_partition
1✔
49
from localstack.utils.run import to_str
1✔
50
from localstack.utils.strings import to_bytes
1✔
51
from localstack.utils.urls import localstack_host
1✔
52

53
# Host name returned when the "AWS::URLSuffix" pseudo parameter is resolved.
_AWS_URL_SUFFIX = localstack_host().host  # The value in AWS is "amazonaws.com"

# Names that are treated as pseudo parameters rather than as logical ids
# declared in the template.
_PSEUDO_PARAMETERS: Final[set[str]] = {
    "AWS::Partition",
    "AWS::AccountId",
    "AWS::Region",
    "AWS::StackName",
    "AWS::StackId",
    "AWS::URLSuffix",
    "AWS::NoValue",
    "AWS::NotificationARNs",
}

# Type variables for the 'before' and 'after' payloads of PreprocEntityDelta.
TBefore = TypeVar("TBefore")
TAfter = TypeVar("TAfter")
1✔
68

69

70
class PreprocEntityDelta(Generic[TBefore, TAfter]):
    """Pair of values for an entity: one for the 'before' state, one for the 'after' state.

    Each side defaults to ``Nothing`` when it is not supplied.
    """

    before: Maybe[TBefore]
    after: Maybe[TAfter]

    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
        self.before = before
        self.after = after

    def __eq__(self, other):
        # Deltas only compare equal to other deltas whose two sides both match.
        if isinstance(other, PreprocEntityDelta):
            return self.before == other.before and self.after == other.after
        return False
×
82

83

84
class PreprocProperties:
    """Holder for the property name to value mapping of a resource."""

    properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        self.properties = properties

    def __eq__(self, other):
        # Equality is decided by the wrapped mapping; other types never match.
        same_kind = isinstance(other, PreprocProperties)
        return same_kind and self.properties == other.properties
1✔
94

95

96
class PreprocResource:
    """Preprocessed representation of a single template resource."""

    logical_id: str
    physical_resource_id: Optional[str]
    condition: Optional[bool]
    resource_type: str
    properties: PreprocProperties
    depends_on: Optional[list[str]]

    def __init__(
        self,
        logical_id: str,
        physical_resource_id: str,
        condition: Optional[bool],
        resource_type: str,
        properties: PreprocProperties,
        depends_on: Optional[list[str]],
    ):
        self.logical_id = logical_id
        self.physical_resource_id = physical_resource_id
        self.condition = condition
        self.resource_type = resource_type
        self.properties = properties
        self.depends_on = depends_on

    @staticmethod
    def _compare_conditions(c1: bool, c2: bool):
        # The lack of condition equates to a true condition.
        left = True if not isinstance(c1, bool) else c1
        right = True if not isinstance(c2, bool) else c2
        return left == right

    def __eq__(self, other):
        # physical_resource_id and depends_on take no part in the comparison.
        if isinstance(other, PreprocResource):
            checks = (
                self.logical_id == other.logical_id,
                self._compare_conditions(self.condition, other.condition),
                self.resource_type == other.resource_type,
                self.properties == other.properties,
            )
            return all(checks)
        return False
138

139

140
class PreprocOutput:
    """Preprocessed representation of a single template output."""

    name: str
    value: Any
    export: Optional[Any]
    condition: Optional[bool]

    def __init__(self, name: str, value: Any, export: Optional[Any], condition: Optional[bool]):
        self.name = name
        self.value = value
        self.export = export
        self.condition = condition

    def __eq__(self, other):
        # All four fields must match; conditions are compared as-is.
        if isinstance(other, PreprocOutput):
            checks = (
                self.name == other.name,
                self.value == other.value,
                self.export == other.export,
                self.condition == other.condition,
            )
            return all(checks)
        return False
163

164

165
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
166
    _change_set: Final[ChangeSet]
1✔
167
    _node_template: Final[NodeTemplate]
1✔
168
    _before_resolved_resources: Final[dict]
1✔
169
    _processed: dict[Scope, Any]
1✔
170

171
    def __init__(self, change_set: ChangeSet):
1✔
172
        self._change_set = change_set
1✔
173
        self._node_template = change_set.update_graph
1✔
174
        self._before_resolved_resources = change_set.stack.resolved_resources
1✔
175
        self._processed = dict()
1✔
176

177
    def process(self) -> None:
1✔
178
        self._processed.clear()
1✔
179
        self.visit(self._node_template)
1✔
180

181
    def _get_node_resource_for(
1✔
182
        self, resource_name: str, node_template: NodeTemplate
183
    ) -> NodeResource:
184
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
185
        for node_resource in node_template.resources.resources:
1✔
186
            if node_resource.name == resource_name:
1✔
187
                self.visit(node_resource)
1✔
188
                return node_resource
1✔
UNCOV
189
        raise RuntimeError(f"No resource '{resource_name}' was found")
×
190

191
    def _get_node_property_for(
1✔
192
        self, property_name: str, node_resource: NodeResource
193
    ) -> Optional[NodeProperty]:
194
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
195
        for node_property in node_resource.properties.properties:
1✔
196
            if node_property.name == property_name:
1✔
197
                self.visit(node_property)
1✔
198
                return node_property
1✔
199
        return None
1✔
200

201
    def _deployed_property_value_of(
1✔
202
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
203
    ) -> Any:
204
        # TODO: typing around resolved resources is needed and should be reflected here.
205

206
        # Before we can obtain deployed value for a resource, we need to first ensure to
207
        # process the resource if this wasn't processed already. Ideally, values should only
208
        # be accessible through delta objects, to ensure computation is always complete at
209
        # every level.
210
        _ = self._get_node_resource_for(
1✔
211
            resource_name=resource_logical_id, node_template=self._node_template
212
        )
213
        resolved_resource = resolved_resources.get(resource_logical_id)
1✔
214
        if resolved_resource is None:
1✔
215
            raise RuntimeError(
1✔
216
                f"No deployed instances of resource '{resource_logical_id}' were found"
217
            )
218
        properties = resolved_resource.get("Properties", dict())
1✔
219
        property_value: Optional[Any] = properties.get(property_name)
1✔
220
        if property_value is None:
1✔
UNCOV
221
            raise RuntimeError(
×
222
                f"No '{property_name}' found for deployed resource '{resource_logical_id}' was found"
223
            )
224
        return property_value
1✔
225

226
    def _before_deployed_property_value_of(
1✔
227
        self, resource_logical_id: str, property_name: str
228
    ) -> Any:
229
        return self._deployed_property_value_of(
1✔
230
            resource_logical_id=resource_logical_id,
231
            property_name=property_name,
232
            resolved_resources=self._before_resolved_resources,
233
        )
234

235
    def _after_deployed_property_value_of(
1✔
236
        self, resource_logical_id: str, property_name: str
237
    ) -> Optional[str]:
238
        return self._before_deployed_property_value_of(
1✔
239
            resource_logical_id=resource_logical_id, property_name=property_name
240
        )
241

242
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
1✔
243
        mappings: list[NodeMapping] = self._node_template.mappings.mappings
1✔
244
        # TODO: another scenarios suggesting property lookups might be preferable.
245
        for mapping in mappings:
1✔
246
            if mapping.name == map_name:
1✔
247
                self.visit(mapping)
1✔
248
                return mapping
1✔
UNCOV
249
        raise RuntimeError(f"Undefined '{map_name}' mapping")
×
250

251
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
1✔
252
        parameters: list[NodeParameter] = self._node_template.parameters.parameters
1✔
253
        # TODO: another scenarios suggesting property lookups might be preferable.
254
        for parameter in parameters:
1✔
255
            if parameter.name == parameter_name:
1✔
256
                self.visit(parameter)
1✔
257
                return parameter
1✔
258
        return Nothing
1✔
259

260
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
1✔
261
        conditions: list[NodeCondition] = self._node_template.conditions.conditions
1✔
262
        # TODO: another scenarios suggesting property lookups might be preferable.
263
        for condition in conditions:
1✔
264
            if condition.name == condition_name:
1✔
265
                self.visit(condition)
1✔
266
                return condition
1✔
UNCOV
267
        return Nothing
×
268

269
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition named ``logical_id``.

        Raises RuntimeError when no such condition node is found.
        """
        node = self._get_node_condition_if_exists(condition_name=logical_id)
        if not isinstance(node, NodeCondition):
            raise RuntimeError(f"No condition '{logical_id}' was found.")
        return self.visit(node)
×
275

276
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
1✔
277
        match pseudo_parameter_name:
1✔
278
            case "AWS::Partition":
1✔
279
                return get_partition(self._change_set.region_name)
1✔
280
            case "AWS::AccountId":
1✔
281
                return self._change_set.stack.account_id
1✔
282
            case "AWS::Region":
1✔
283
                return self._change_set.stack.region_name
1✔
284
            case "AWS::StackName":
1✔
285
                return self._change_set.stack.stack_name
1✔
286
            case "AWS::StackId":
1✔
287
                return self._change_set.stack.stack_id
1✔
288
            case "AWS::URLSuffix":
1✔
289
                return _AWS_URL_SUFFIX
1✔
290
            case "AWS::NoValue":
1✔
291
                return None
1✔
UNCOV
292
            case _:
×
UNCOV
293
                raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
×
294

295
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
        """Resolve ``logical_id`` as a pseudo parameter, a template parameter or a resource.

        The three kinds are tried in that order; the resource lookup raises RuntimeError
        when nothing matches.
        """
        if logical_id in _PSEUDO_PARAMETERS:
            constant = self._resolve_pseudo_parameter(pseudo_parameter_name=logical_id)
            # Pseudo parameters are constants within the lifecycle of a template.
            return PreprocEntityDelta(before=constant, after=constant)

        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
        if isinstance(node_parameter, NodeParameter):
            return self.visit(node_parameter)

        node_resource = self._get_node_resource_for(
            resource_name=logical_id, node_template=self._node_template
        )
        resource_delta = self.visit(node_resource)
        return PreprocEntityDelta(before=resource_delta.before, after=resource_delta.after)
1✔
315

316
    def _resolve_mapping(
        self, map_name: str, top_level_key: str, second_level_key
    ) -> PreprocEntityDelta:
        """Resolve ``map_name[top_level_key][second_level_key]`` to its delta.

        Raises RuntimeError when the mapping is undefined, when the top-level key does not
        lead to an object node, or when the second-level key is not bound under it.
        """
        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)
        top_level_value = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_value, NodeObject):
            raise RuntimeError(
                f"Invalid or missing top-level key '{top_level_key}' for mapping '{map_name}'"
            )
        second_level_value = top_level_value.bindings.get(second_level_key)
        if second_level_value is None:
            # Fail with a descriptive error instead of handing None to the visitor.
            raise RuntimeError(
                f"Missing second-level key '{second_level_key}' under '{top_level_key}' "
                f"for mapping '{map_name}'"
            )
        return self.visit(second_level_value)
1✔
327

328
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
1✔
329
        scope = change_set_entity.scope
1✔
330
        if scope in self._processed:
1✔
331
            delta = self._processed[scope]
1✔
332
            return delta
1✔
333
        delta = super().visit(change_set_entity=change_set_entity)
1✔
334
        self._processed[scope] = delta
1✔
335
        return delta
1✔
336

337
    def visit_terminal_value_modified(
        self, terminal_value_modified: TerminalValueModified
    ) -> PreprocEntityDelta:
        """Build a delta from the node's value (before) and its modified value (after)."""
        old_value = terminal_value_modified.value
        new_value = terminal_value_modified.modified_value
        return PreprocEntityDelta(before=old_value, after=new_value)
344

345
    def visit_terminal_value_created(
        self, terminal_value_created: TerminalValueCreated
    ) -> PreprocEntityDelta:
        """Build a delta carrying the created value on the 'after' side only."""
        new_value = terminal_value_created.value
        return PreprocEntityDelta(after=new_value)
1✔
349

350
    def visit_terminal_value_removed(
        self, terminal_value_removed: TerminalValueRemoved
    ) -> PreprocEntityDelta:
        """Build a delta carrying the removed value on the 'before' side only."""
        old_value = terminal_value_removed.value
        return PreprocEntityDelta(before=old_value)
1✔
354

355
    def visit_terminal_value_unchanged(
        self, terminal_value_unchanged: TerminalValueUnchanged
    ) -> PreprocEntityDelta:
        """Build a delta whose 'before' and 'after' sides are both the node's value."""
        before_value = terminal_value_unchanged.value
        after_value = terminal_value_unchanged.value
        return PreprocEntityDelta(before=before_value, after=after_value)
362

363
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
        """Combine the 'before' of the original value with the 'after' of its divergence."""
        value_delta = self.visit(node_divergence.value)
        divergence_delta = self.visit(node_divergence.divergence)
        prior = value_delta.before
        posterior = divergence_delta.after
        return PreprocEntityDelta(before=prior, after=posterior)
1✔
367

368
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
        """Resolve every binding of ``node_object`` into 'before' and 'after' dictionaries.

        A created object has no 'before' dictionary and a removed object has no 'after'
        dictionary (``Nothing`` is used instead). Bindings that resolve to ``Nothing`` or to
        None are left out of the respective dictionary.
        """
        change_type = node_object.change_type
        before = Nothing if change_type == ChangeType.CREATED else dict()
        after = Nothing if change_type == ChangeType.REMOVED else dict()
        for binding_name, entity in node_object.bindings.items():
            binding_delta: PreprocEntityDelta = self.visit(change_set_entity=entity)
            value_before = binding_delta.before
            value_after = binding_delta.after
            if not (is_nothing(before) or is_nothing(value_before) or value_before is None):
                before[binding_name] = value_before
            if not (is_nothing(after) or is_nothing(value_after) or value_after is None):
                after[binding_name] = value_after
        return PreprocEntityDelta(before=before, after=after)
1✔
381

382
    def visit_node_intrinsic_function_fn_get_att(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve Fn::GetAtt for the 'before' and 'after' states.

        The argument is either a sequence whose first two items are the resource logical id
        and the attribute name, or a single string that is split on '.'. A side whose
        argument is falsy resolves to ``Nothing``.
        """
        # TODO: validate the return value according to the spec.
        arguments_delta = self.visit(node_intrinsic_function.arguments)

        def _as_parts(argument):
            # The string form is broken up on dots; other forms are used as given.
            # NOTE(review): only the first two parts are read below, so a dotted attribute
            # name in the string form is truncated — confirm against the Fn::GetAtt spec.
            return argument.split(".") if isinstance(argument, str) else argument

        def _attribute_value(parts, use_before: bool):
            logical_id = parts[0]
            attribute_name = parts[1]
            node_resource = self._get_node_resource_for(
                resource_name=logical_id, node_template=self._node_template
            )
            node_property: Optional[NodeProperty] = self._get_node_property_for(
                property_name=attribute_name, node_resource=node_resource
            )
            if node_property is not None:
                # The property is statically defined in the template and its value can be computed.
                property_delta = self.visit(node_property)
                return property_delta.before if use_before else property_delta.after
            # The property is not statically defined and must therefore be available in
            # the properties deployed set.
            if use_before:
                return self._before_deployed_property_value_of(
                    resource_logical_id=logical_id, property_name=attribute_name
                )
            return self._after_deployed_property_value_of(
                resource_logical_id=logical_id, property_name=attribute_name
            )

        before_parts: Maybe[list[str]] = _as_parts(arguments_delta.before)
        after_parts: Maybe[list[str]] = _as_parts(arguments_delta.after)

        before = Nothing
        if before_parts:
            before = _attribute_value(before_parts, use_before=True)

        after = Nothing
        if after_parts:
            after = _attribute_value(after_parts, use_before=False)

        return PreprocEntityDelta(before=before, after=after)
1✔
440

441
    def visit_node_intrinsic_function_fn_equals(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve Fn::Equals by comparing the first two argument values of each side.

        A side whose arguments are falsy resolves to ``Nothing``.
        """
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        operands_before = arguments_delta.before
        operands_after = arguments_delta.after

        def _operands_equal(operands):
            return operands[0] == operands[1]

        before = _operands_equal(operands_before) if operands_before else Nothing
        after = _operands_equal(operands_after) if operands_after else Nothing
        return PreprocEntityDelta(before=before, after=after)
1✔
454

455
    def visit_node_intrinsic_function_fn_if(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve Fn::If for the 'before' and 'after' states.

        Arguments are read as [condition name, value if true, value if false]; the named
        condition is resolved through ``_resolve_condition``.
        """
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after

        def _branches_for(args: list[Any]) -> PreprocEntityDelta:
            # Pick a branch for each side from the condition's own before/after outcome.
            condition_delta = self._resolve_condition(logical_id=args[0])
            chosen_before = args[1] if condition_delta.before else args[2]
            chosen_after = args[1] if condition_delta.after else args[2]
            return PreprocEntityDelta(before=chosen_before, after=chosen_after)

        # TODO: add support for this being created or removed.
        before = Nothing
        if not is_nothing(arguments_before):
            before = _branches_for(arguments_before).before
        after = Nothing
        if not is_nothing(arguments_after):
            after = _branches_for(arguments_after).after
        return PreprocEntityDelta(before=before, after=after)
1✔
480

481
    def visit_node_intrinsic_function_fn_not(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve Fn::Not by negating the first argument value of each side."""
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        operands_before = arguments_delta.before
        operands_after = arguments_delta.after

        def _negate(operands):
            return not operands[0]

        before = Nothing if is_nothing(operands_before) else _negate(operands_before)
        after = Nothing if is_nothing(operands_after) else _negate(operands_after)
        # Implicit change type computation.
        return PreprocEntityDelta(before=before, after=after)
1✔
497

498
    def _compute_fn_transform(self, args: dict[str, Any]) -> Any:
        """Evaluate an Fn::Transform call described by ``args``.

        A transformer registered under the given name in ``transformers`` is tried first;
        otherwise a macro with that name is looked up in the CloudFormation store of the
        change set's account and region. Raises RuntimeError when 'Name' is not a string,
        when 'Parameters' is not a dict, or when the name matches neither.
        """
        # TODO: add typing to arguments before this level.
        # TODO: add schema validation
        # TODO: add support for other transform types

        account_id = self._change_set.account_id
        region_name = self._change_set.region_name

        transform_name: str = args.get("Name")
        if not isinstance(transform_name, str):
            raise RuntimeError("Invalid or missing Fn::Transform 'Name' argument")
        transform_parameters: dict = args.get("Parameters")
        if not isinstance(transform_parameters, dict):
            raise RuntimeError("Invalid or missing Fn::Transform 'Parameters' argument")

        if transform_name in transformers:
            # TODO: port and refactor this 'transformers' logic to this package.
            transformer_class = transformers[transform_name]
            transformer: Transformer = transformer_class()
            return transformer.transform(
                account_id=account_id, region_name=region_name, parameters=transform_parameters
            )

        cloudformation_store = get_cloudformation_store(
            account_id=account_id, region_name=region_name
        )
        if transform_name not in cloudformation_store.macros:
            raise RuntimeError(
                f"Unsupported transform function '{transform_name}' in '{self._change_set.stack.stack_name}'"
            )

        # TODO: this formatting of stack parameters is odd but required to integrate with v1 execute_macro util.
        #  consider porting this utils and passing the plain list of parameters instead.
        stack_parameters = {}
        for parameter in self._change_set.stack.parameters:
            stack_parameters[parameter["ParameterKey"]] = parameter
        return execute_macro(
            account_id=account_id,
            region_name=region_name,
            parsed_template=dict(),  # TODO: review the requirements for this argument.
            macro=args,  # TODO: review support for non dict bindings (v1).
            stack_parameters=stack_parameters,
            transformation_parameters=transform_parameters,
            is_intrinsic=True,
        )
545

546
    def visit_node_intrinsic_function_fn_transform(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve Fn::Transform by evaluating the transform for each side that has arguments."""
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        transform_args_before = arguments_delta.before
        transform_args_after = arguments_delta.after

        # TODO: review the use of the cache in self._processed from the 'before' run to
        #  ensure changes to the lambda (such as after UpdateFunctionCode) do not
        #  generalise to the before value at this depth (thus making it seem as
        #  though before == after for this transformation). Another option may be to
        #  have specialised caching for transformations.

        # TODO: add tests to review the behaviour of CFN with changes to transformation
        #  function code and no changes to the template.

        if is_nothing(transform_args_before):
            before = Nothing
        else:
            before = self._compute_fn_transform(args=transform_args_before)

        if is_nothing(transform_args_after):
            after = Nothing
        else:
            after = self._compute_fn_transform(args=transform_args_after)

        return PreprocEntityDelta(before=before, after=after)
1✔
569

570
    def visit_node_intrinsic_function_fn_sub(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve Fn::Sub for the 'before' and 'after' states.

        The arguments are either a template string, or a two-item list of a template string
        and a map of substitution values. Each ``${Name}`` placeholder is replaced by, in
        order of precedence: a pseudo parameter value, an entry of the substitution map, or
        the value obtained by resolving ``Name`` as a reference.

        Raises RuntimeError for an invalid arguments shape or an unresolvable placeholder.
        """
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after

        def _compute_sub(args: str | list[Any], select_before: bool = False) -> str:
            # TODO: add further schema validation.
            string_template: str
            sub_parameters: dict
            if isinstance(args, str):
                string_template = args
                sub_parameters = dict()
            elif (
                isinstance(args, list)
                and len(args) == 2
                and isinstance(args[0], str)
                and isinstance(args[1], dict)
            ):
                string_template = args[0]
                sub_parameters = args[1]
            else:
                raise RuntimeError(
                    "Invalid arguments shape for Fn::Sub, expected a String "
                    f"or a Tuple of String and Map but got '{args}'"
                )
            sub_string = string_template
            template_variable_names = re.findall(r"\${([^}]+)}", string_template)
            for template_variable_name in template_variable_names:
                if template_variable_name in _PSEUDO_PARAMETERS:
                    template_variable_value = self._resolve_pseudo_parameter(
                        pseudo_parameter_name=template_variable_name
                    )
                elif template_variable_name in sub_parameters:
                    template_variable_value = sub_parameters[template_variable_name]
                else:
                    try:
                        resource_delta = self._resolve_reference(logical_id=template_variable_name)
                        template_variable_value = (
                            resource_delta.before if select_before else resource_delta.after
                        )
                        if isinstance(template_variable_value, PreprocResource):
                            # Resources are substituted by their logical id.
                            template_variable_value = template_variable_value.logical_id
                    except RuntimeError as error:
                        # Keep the underlying failure attached as the cause for debugging.
                        raise RuntimeError(
                            f"Undefined variable name in Fn::Sub string template '{template_variable_name}'"
                        ) from error
                sub_string = sub_string.replace(
                    f"${{{template_variable_name}}}", template_variable_value
                )
            return sub_string

        before = Nothing
        if not is_nothing(arguments_before):
            before = _compute_sub(args=arguments_before, select_before=True)
        after = Nothing
        if not is_nothing(arguments_after):
            after = _compute_sub(args=arguments_after)
        return PreprocEntityDelta(before=before, after=after)
1✔
630

631
    def visit_node_intrinsic_function_fn_join(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve Fn::Join for the 'before' and 'after' states.

        A side is only computed when its arguments are a two-item list of
        [delimiter, values]; otherwise it resolves to ``Nothing``.
        """
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        join_args_before = arguments_delta.before
        join_args_after = arguments_delta.after

        def _is_join_call(candidate) -> bool:
            return isinstance(candidate, list) and len(candidate) == 2

        def _compute_join(args: list[Any]) -> str:
            # TODO: add support for schema validation.
            # TODO: add tests for joining non string values.
            delimiter: str = str(args[0])
            values: list[Any] = args[1]
            if not isinstance(values, list):
                raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")
            return delimiter.join(str(value) for value in values)

        before = _compute_join(join_args_before) if _is_join_call(join_args_before) else Nothing
        after = _compute_join(join_args_after) if _is_join_call(join_args_after) else Nothing
        return PreprocEntityDelta(before=before, after=after)
1✔
655

656
    def visit_node_intrinsic_function_fn_select(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Select`` for the before and after sides of its arguments.

        Each evaluated side is expected to be ``[index, values]``: ``values`` must be a
        non-empty list and ``index`` must convert to an integer in
        ``0 .. len(values) - 1``.

        :param node_intrinsic_function: the ``Fn::Select`` node whose arguments are evaluated.
        :return: a delta holding the selected element, or ``Nothing``, for each side.
        :raises RuntimeError: if the values are not a non-empty list, or if the index is
            not convertible to an integer or is out of range.
        """
        # TODO: add further support for schema validation
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after

        def _compute_fn_select(args: list[Any]) -> Any:
            values: list[Any] = args[1]
            if not isinstance(values, list) or not values:
                raise RuntimeError(f"Invalid arguments list value for Fn::Select: '{values}'")
            raw_index = args[0]
            try:
                index: int = int(raw_index)
            except (TypeError, ValueError) as error:
                # A non-numeric index is reported like any other invalid index instead of
                # escaping as a bare conversion error.
                raise RuntimeError(
                    f"Invalid or out of range index value for Fn::Select: '{raw_index}'"
                ) from error
            # The last valid position is len(values) - 1, so the upper bound is exclusive.
            if index < 0 or index >= len(values):
                raise RuntimeError(f"Invalid or out of range index value for Fn::Select: '{index}'")
            return values[index]

        before = Nothing
        if not is_nothing(arguments_before):
            before = _compute_fn_select(arguments_before)

        after = Nothing
        if not is_nothing(arguments_after):
            after = _compute_fn_select(arguments_after)

        return PreprocEntityDelta(before=before, after=after)
1✔
684

685
    def visit_node_intrinsic_function_fn_split(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate ``Fn::Split`` for the before and after sides of its arguments.

        Each evaluated side is read as ``[delimiter, source_string]`` and produces the
        list returned by ``source_string.split(delimiter)``.

        :param node_intrinsic_function: the ``Fn::Split`` node whose arguments are evaluated.
        :return: a delta holding the list of substrings, or ``Nothing``, for each side.
        :raises RuntimeError: if the delimiter is not a non-empty string or the source
            value is not a string.
        """
        # TODO: add further support for schema validation

        def _split(pair: list[Any]) -> Any:
            separator = pair[0]
            if not (isinstance(separator, str) and separator):
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{separator}'")
            text = pair[1]
            if not isinstance(text, str):
                raise RuntimeError(f"Invalid source string value for Fn::Split: '{text}'")
            return text.split(separator)

        def _split_side(side: Any) -> Any:
            # A side that is Nothing has no arguments to evaluate and stays Nothing.
            return Nothing if is_nothing(side) else _split(side)

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return PreprocEntityDelta(
            before=_split_side(arguments_delta.before),
            after=_split_side(arguments_delta.after),
        )
1✔
712

713
    def visit_node_intrinsic_function_fn_get_a_zs(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::GetAZs`` for the before and after sides of its argument.

        The argument is a region name; an empty string falls back to the region of the
        change set being processed. The zone names are read from the EC2
        ``describe_availability_zones`` response.

        :param node_intrinsic_function: the ``Fn::GetAZs`` node whose argument is evaluated.
        :return: a delta holding the list of zone names, or ``Nothing``, for each side.
        :raises RuntimeError: if the region value is not a string, or if the EC2 call
            fails with a ``ClientError`` (kept as the cause of the raised error).
        """
        # TODO: add further support for schema validation
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after

        def _compute_fn_get_a_zs(region) -> Any:
            if not isinstance(region, str):
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")

            # An empty region string means "use the change set's own region".
            if not region:
                region = self._change_set.region_name

            account_id = self._change_set.account_id
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
            try:
                describe_availability_zones_result: DescribeAvailabilityZonesResult = (
                    ec2_client.describe_availability_zones()
                )
            except ClientError as error:
                # Chain the client error so the underlying failure stays visible.
                raise RuntimeError(
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
                ) from error
            availability_zones: AvailabilityZoneList = describe_availability_zones_result[
                "AvailabilityZones"
            ]
            azs = [az["ZoneName"] for az in availability_zones]
            return azs

        before = Nothing
        if not is_nothing(arguments_before):
            before = _compute_fn_get_a_zs(arguments_before)

        after = Nothing
        if not is_nothing(arguments_after):
            after = _compute_fn_get_a_zs(arguments_after)

        return PreprocEntityDelta(before=before, after=after)
1✔
753

754
    def visit_node_intrinsic_function_fn_base64(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Base64`` for the before and after sides of its argument.

        :param node_intrinsic_function: the ``Fn::Base64`` node whose argument is evaluated.
        :return: a delta holding the Base64 text, or ``Nothing``, for each side.
        :raises RuntimeError: if an evaluated argument is not a string.
        """
        # TODO: add further support for schema validation

        def _encode(value_to_encode) -> Any:
            if not isinstance(value_to_encode, str):
                raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{value_to_encode}'")
            # Ported from v1:
            raw_bytes = to_bytes(value_to_encode)
            encoded_bytes = base64.b64encode(raw_bytes)
            return to_str(encoded_bytes)

        def _encode_side(side: Any) -> Any:
            # A side that is Nothing has nothing to encode and stays Nothing.
            return Nothing if is_nothing(side) else _encode(side)

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return PreprocEntityDelta(
            before=_encode_side(arguments_delta.before),
            after=_encode_side(arguments_delta.after),
        )
1✔
778

779
    def visit_node_intrinsic_function_fn_find_in_map(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::FindInMap`` for the before and after sides of its arguments.

        Each truthy side of the visited arguments is unpacked positionally into
        ``_resolve_mapping``. The before result is taken from the ``before`` side of
        that lookup and the after result from its ``after`` side.

        :param node_intrinsic_function: the ``Fn::FindInMap`` node whose arguments are evaluated.
        :return: a delta holding the looked-up value, or ``Nothing``, for each side.
        """
        # TODO: add type checking/validation for result unit?
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        lookup_before = arguments_delta.before
        lookup_after = arguments_delta.after

        # Sides are gated on truthiness, so an empty argument list is skipped as well.
        before = self._resolve_mapping(*lookup_before).before if lookup_before else Nothing
        after = self._resolve_mapping(*lookup_after).after if lookup_after else Nothing
        return PreprocEntityDelta(before=before, after=after)
1✔
795

796
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the mapping's bindings.

        :param node_mapping: the mapping node to evaluate.
        :return: whatever ``self.visit`` yields for ``node_mapping.bindings``.
        """
        return self.visit(node_mapping.bindings)
1✔
799

800
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
        """Evaluate a parameter, preferring its dynamic value over its default.

        Both the dynamic and the default value nodes are visited. On each side the
        dynamic result is used when it is truthy, otherwise the default result is used.

        :param node_parameter: the parameter node to evaluate.
        :return: a delta with the chosen value for the before and after sides.
        """
        dynamic_delta = self.visit(node_parameter.dynamic_value)
        default_delta = self.visit(node_parameter.default_value)

        # NOTE(review): `or` also falls back to the default when the dynamic value is a
        # falsy value such as "" or 0 - confirm this is intended.
        return PreprocEntityDelta(
            before=dynamic_delta.before or default_delta.before,
            after=dynamic_delta.after or default_delta.after,
        )
1✔
811

812
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the ``depends_on`` child node.

        :param node_depends_on: the DependsOn node to evaluate.
        :return: whatever ``self.visit`` yields for ``node_depends_on.depends_on``.
        """
        return self.visit(node_depends_on.depends_on)
1✔
815

816
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition's body.

        :param node_condition: the condition node to evaluate.
        :return: whatever ``self.visit`` yields for ``node_condition.body``.
        """
        return self.visit(node_condition.body)
1✔
819

820
    def _resource_physical_resource_id_from(
        self, logical_resource_id: str, resolved_resources: dict
    ) -> str:
        """Look up the ``PhysicalResourceId`` recorded for a logical resource id.

        :param logical_resource_id: key of the resource inside ``resolved_resources``.
        :param resolved_resources: mapping from logical resource ids to resource entries.
        :return: the string stored under ``PhysicalResourceId`` of the matching entry.
        :raises RuntimeError: if the resource is unknown or its ``PhysicalResourceId``
            is missing or not a string.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        resource_entry = resolved_resources.get(logical_resource_id, dict())
        candidate: Optional[str] = resource_entry.get("PhysicalResourceId")
        if isinstance(candidate, str):
            return candidate
        raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
1✔
829

830
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
        """Return the physical id of a resource from the before-state resolved resources.

        :param resource_logical_id: logical id of the resource to look up.
        :return: the physical resource id found in ``self._before_resolved_resources``.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        before_resources = self._before_resolved_resources
        return self._resource_physical_resource_id_from(
            logical_resource_id=resource_logical_id, resolved_resources=before_resources
        )
836

837
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
        """Return the physical id to use for a resource in the after state.

        This implementation delegates to ``_before_resource_physical_id``.

        :param resource_logical_id: logical id of the resource to look up.
        :return: the physical resource id reported by the before-state lookup.
        """
        # NOTE(review): presumably subclasses override this with a real after-state
        # lookup - confirm.
        physical_resource_id = self._before_resource_physical_id(
            resource_logical_id=resource_logical_id
        )
        return physical_resource_id
1✔
839

840
    def visit_node_intrinsic_function_ref(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Ref`` for the before and after sides of its argument.

        Each side of the visited argument is treated as a logical id and passed to
        ``_resolve_reference``. When the resolved value is a ``PreprocResource``, its
        physical resource id is used as the result.

        :param node_intrinsic_function: the ``Ref`` node whose argument is evaluated.
        :return: a delta holding the referenced value, or ``Nothing``, for each side.
        """

        def _unwrap(resolved: Any) -> Any:
            # A reference to a resource evaluates to that resource's physical id.
            if isinstance(resolved, PreprocResource):
                return resolved.physical_resource_id
            return resolved

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        logical_id_before = arguments_delta.before
        logical_id_after = arguments_delta.after

        # TODO: extend this to support references to other types.
        before = Nothing
        if not is_nothing(logical_id_before):
            before = _unwrap(self._resolve_reference(logical_id=logical_id_before).before)

        after = Nothing
        if not is_nothing(logical_id_after):
            after = _unwrap(self._resolve_reference(logical_id=logical_id_after).after)

        return PreprocEntityDelta(before=before, after=after)
1✔
863

864
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
        """Evaluate every element of an array node into before and after lists.

        The before side is ``Nothing`` for a created array and the after side is
        ``Nothing`` for a removed one; otherwise each side starts as an empty list.
        Element results that are ``Nothing`` are not appended.

        :param node_array: the array node whose elements are visited in order.
        :return: a delta holding the evaluated list, or ``Nothing``, for each side.
        """
        array_change_type = node_array.change_type
        before = Nothing if array_change_type == ChangeType.CREATED else list()
        after = Nothing if array_change_type == ChangeType.REMOVED else list()
        # Whether a side collects values is fixed for the whole loop.
        collects_before = not is_nothing(before)
        collects_after = not is_nothing(after)

        for element in node_array.array:
            element_delta: PreprocEntityDelta = self.visit(change_set_entity=element)
            element_before = element_delta.before
            element_after = element_delta.after
            if collects_before and not is_nothing(element_before):
                before.append(element_before)
            if collects_after and not is_nothing(element_after):
                after.append(element_after)
        return PreprocEntityDelta(before=before, after=after)
1✔
877

878
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the property's value node.

        :param node_property: the property node to evaluate.
        :return: whatever ``self.visit`` yields for ``node_property.value``.
        """
        value_delta = self.visit(node_property.value)
        return value_delta
1✔
880

881
    def visit_node_properties(
        self, node_properties: NodeProperties
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
        """Evaluate all properties into before and after ``PreprocProperties``.

        The before side is ``Nothing`` for created properties and the after side is
        ``Nothing`` for removed ones. A property value is recorded on a side only when
        it is neither ``Nothing`` nor ``None``.

        :param node_properties: the properties node whose entries are visited in order.
        :return: a delta holding ``PreprocProperties``, or ``Nothing``, for each side.
        """
        properties_change_type = node_properties.change_type
        before_bindings = Nothing if properties_change_type == ChangeType.CREATED else dict()
        after_bindings = Nothing if properties_change_type == ChangeType.REMOVED else dict()
        # Whether a side collects bindings is fixed for the whole loop.
        collects_before = not is_nothing(before_bindings)
        collects_after = not is_nothing(after_bindings)

        for node_property in node_properties.properties:
            name = node_property.name
            property_delta = self.visit(node_property)
            value_before = property_delta.before
            value_after = property_delta.after
            if collects_before and not is_nothing(value_before) and value_before is not None:
                before_bindings[name] = value_before
            if collects_after and not is_nothing(value_after) and value_after is not None:
                after_bindings[name] = value_after

        before = PreprocProperties(properties=before_bindings) if collects_before else Nothing
        after = PreprocProperties(properties=after_bindings) if collects_after else Nothing
        return PreprocEntityDelta(before=before, after=after)
1✔
911

912
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
        """Resolve a condition reference into its before and after results.

        The reference is visited first. Each side that evaluates to a string is passed
        to ``_resolve_condition`` as a logical id; any other side stays ``Nothing``.

        :param reference: the node holding the name of the referenced condition.
        :return: a delta with the condition result, or ``Nothing``, for each side.
        """
        reference_delta = self.visit(reference)

        before = Nothing
        name_before = reference_delta.before
        if isinstance(name_before, str):
            before = self._resolve_condition(logical_id=name_before).before

        after = Nothing
        name_after = reference_delta.after
        if isinstance(name_after, str):
            after = self._resolve_condition(logical_id=name_after).after

        return PreprocEntityDelta(before=before, after=after)
1✔
925

926
    def visit_node_resource(
        self, node_resource: NodeResource
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
        """Evaluate a resource node into before and after ``PreprocResource`` values.

        The condition reference and ``depends_on`` are evaluated only when present,
        followed by the resource type and properties. A side is built when either the
        resource has no condition on that side and its change type allows that side
        (not CREATED for before, not REMOVED for after), or the condition on that side
        is truthy.

        :param node_resource: the resource node to evaluate.
        :return: a delta holding a ``PreprocResource``, or ``Nothing``, for each side.
        :raises RuntimeError: if the before side is built and no physical resource id
            can be found for the resource.
        """
        change_type = node_resource.change_type
        resource_name = node_resource.name

        condition_before, condition_after = Nothing, Nothing
        if not is_nothing(node_resource.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_resource.condition_reference
            )
            condition_before, condition_after = condition_delta.before, condition_delta.after

        depends_on_before, depends_on_after = Nothing, Nothing
        if not is_nothing(node_resource.depends_on):
            depends_on_delta = self.visit(node_resource.depends_on)
            depends_on_before, depends_on_after = depends_on_delta.before, depends_on_delta.after

        type_delta = self.visit(node_resource.type_)
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
            node_resource.properties
        )

        before = Nothing
        # Parentheses spell out the precedence: `and` binds tighter than `or`.
        if (change_type != ChangeType.CREATED and is_nothing(condition_before)) or condition_before:
            before = PreprocResource(
                logical_id=resource_name,
                physical_resource_id=self._before_resource_physical_id(
                    resource_logical_id=resource_name
                ),
                condition=condition_before,
                resource_type=type_delta.before,
                properties=properties_delta.before,
                depends_on=depends_on_before,
            )

        after = Nothing
        if (change_type != ChangeType.REMOVED and is_nothing(condition_after)) or condition_after:
            # Unlike the before side, a missing physical id is tolerated here and
            # recorded as None.
            try:
                after_physical_resource_id = self._after_resource_physical_id(
                    resource_logical_id=resource_name
                )
            except RuntimeError:
                after_physical_resource_id = None
            after = PreprocResource(
                logical_id=resource_name,
                physical_resource_id=after_physical_resource_id,
                condition=condition_after,
                resource_type=type_delta.after,
                properties=properties_delta.after,
                depends_on=depends_on_after,
            )
        return PreprocEntityDelta(before=before, after=after)
1✔
983

984
    def visit_node_output(
        self, node_output: NodeOutput
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
        """Evaluate an output node into before and after ``PreprocOutput`` values.

        The output's value is always evaluated; its condition reference and export are
        evaluated only when present. A condition that goes from falsy to truthy makes
        the output count as created, and the reverse makes it count as removed. The
        before side is skipped for created outputs and the after side for removed ones.

        :param node_output: the output node to evaluate.
        :return: a delta holding a ``PreprocOutput``, or ``Nothing``, for each side.
        """
        change_type = node_output.change_type
        value_delta = self.visit(node_output.value)

        condition_delta = Nothing
        if not is_nothing(node_output.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_output.condition_reference
            )
            held_before = condition_delta.before
            holds_after = condition_delta.after
            # A flip of the condition overrides the change type reported by the node.
            if holds_after and not held_before:
                change_type = ChangeType.CREATED
            elif held_before and not holds_after:
                change_type = ChangeType.REMOVED

        export_delta = Nothing
        if not is_nothing(node_output.export):
            export_delta = self.visit(node_output.export)

        # NOTE(review): the `if export_delta` / `if condition_delta` checks below assume
        # that `Nothing` evaluates as falsy - confirm.
        before: Maybe[PreprocOutput] = (
            Nothing
            if change_type == ChangeType.CREATED
            else PreprocOutput(
                name=node_output.name,
                value=value_delta.before,
                export=export_delta.before if export_delta else None,
                condition=condition_delta.before if condition_delta else None,
            )
        )
        after: Maybe[PreprocOutput] = (
            Nothing
            if change_type == ChangeType.REMOVED
            else PreprocOutput(
                name=node_output.name,
                value=value_delta.after,
                export=export_delta.after if export_delta else None,
                condition=condition_delta.after if condition_delta else None,
            )
        )
        return PreprocEntityDelta(before=before, after=after)
1✔
1023

1024
    def visit_node_outputs(
        self, node_outputs: NodeOutputs
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
        """Evaluate every output and collect the before and after results.

        Outputs are visited in order. A result that is ``Nothing`` on one side is left
        out of that side's list.

        :param node_outputs: the outputs node whose entries are evaluated.
        :return: a delta holding the list of before outputs and the list of after outputs.
        """
        output_deltas: list[PreprocEntityDelta[PreprocOutput, PreprocOutput]] = [
            self.visit(node_output) for node_output in node_outputs.outputs
        ]
        before: list[PreprocOutput] = [
            delta.before for delta in output_deltas if not is_nothing(delta.before)
        ]
        after: list[PreprocOutput] = [
            delta.after for delta in output_deltas if not is_nothing(delta.after)
        ]
        return PreprocEntityDelta(before=before, after=after)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc