• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 533d4262-4f08-49b7-b7ba-18c46e51ac1a

02 Jun 2025 06:43PM UTC coverage: 86.752% (+0.1%) from 86.654%
533d4262-4f08-49b7-b7ba-18c46e51ac1a

push

circleci

web-flow
APIGW: implement Canary Deployments CRUD logic (#12694)

142 of 147 new or added lines in 3 files covered. (96.6%)

187 existing lines in 16 files now uncovered.

64937 of 74854 relevant lines covered (86.75%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.1
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import re
1✔
4
from typing import Any, Final, Generic, Optional, TypeVar
1✔
5

6
from localstack.services.cloudformation.engine.transformers import (
1✔
7
    Transformer,
8
    execute_macro,
9
    transformers,
10
)
11
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
12
    ChangeSetEntity,
13
    ChangeType,
14
    Maybe,
15
    NodeArray,
16
    NodeCondition,
17
    NodeDependsOn,
18
    NodeDivergence,
19
    NodeIntrinsicFunction,
20
    NodeMapping,
21
    NodeObject,
22
    NodeOutput,
23
    NodeOutputs,
24
    NodeParameter,
25
    NodeProperties,
26
    NodeProperty,
27
    NodeResource,
28
    NodeTemplate,
29
    Nothing,
30
    Scope,
31
    TerminalValue,
32
    TerminalValueCreated,
33
    TerminalValueModified,
34
    TerminalValueRemoved,
35
    TerminalValueUnchanged,
36
    is_nothing,
37
)
38
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
39
    ChangeSetModelVisitor,
40
)
41
from localstack.services.cloudformation.stores import get_cloudformation_store
1✔
42
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
43
from localstack.utils.aws.arns import get_partition
1✔
44
from localstack.utils.urls import localstack_host
1✔
45

46
_AWS_URL_SUFFIX = localstack_host().host  # The value in AWS is "amazonaws.com"
1✔
47

48
_PSEUDO_PARAMETERS: Final[set[str]] = {
1✔
49
    "AWS::Partition",
50
    "AWS::AccountId",
51
    "AWS::Region",
52
    "AWS::StackName",
53
    "AWS::StackId",
54
    "AWS::URLSuffix",
55
    "AWS::NoValue",
56
    "AWS::NotificationARNs",
57
}
58

59
TBefore = TypeVar("TBefore")
1✔
60
TAfter = TypeVar("TAfter")
1✔
61

62

63
class PreprocEntityDelta(Generic[TBefore, TAfter]):
1✔
64
    before: Maybe[TBefore]
1✔
65
    after: Maybe[TAfter]
1✔
66

67
    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
1✔
68
        self.before = before
1✔
69
        self.after = after
1✔
70

71
    def __eq__(self, other):
1✔
UNCOV
72
        if not isinstance(other, PreprocEntityDelta):
×
UNCOV
73
            return False
×
UNCOV
74
        return self.before == other.before and self.after == other.after
×
75

76

77
class PreprocProperties:
1✔
78
    properties: dict[str, Any]
1✔
79

80
    def __init__(self, properties: dict[str, Any]):
1✔
81
        self.properties = properties
1✔
82

83
    def __eq__(self, other):
1✔
84
        if not isinstance(other, PreprocProperties):
1✔
UNCOV
85
            return False
×
86
        return self.properties == other.properties
1✔
87

88

89
class PreprocResource:
1✔
90
    logical_id: str
1✔
91
    physical_resource_id: Optional[str]
1✔
92
    condition: Optional[bool]
1✔
93
    resource_type: str
1✔
94
    properties: PreprocProperties
1✔
95
    depends_on: Optional[list[str]]
1✔
96

97
    def __init__(
1✔
98
        self,
99
        logical_id: str,
100
        physical_resource_id: str,
101
        condition: Optional[bool],
102
        resource_type: str,
103
        properties: PreprocProperties,
104
        depends_on: Optional[list[str]],
105
    ):
106
        self.logical_id = logical_id
1✔
107
        self.physical_resource_id = physical_resource_id
1✔
108
        self.condition = condition
1✔
109
        self.resource_type = resource_type
1✔
110
        self.properties = properties
1✔
111
        self.depends_on = depends_on
1✔
112

113
    @staticmethod
1✔
114
    def _compare_conditions(c1: bool, c2: bool):
1✔
115
        # The lack of condition equates to a true condition.
116
        c1 = c1 if isinstance(c1, bool) else True
1✔
117
        c2 = c2 if isinstance(c2, bool) else True
1✔
118
        return c1 == c2
1✔
119

120
    def __eq__(self, other):
1✔
121
        if not isinstance(other, PreprocResource):
1✔
122
            return False
1✔
123
        return all(
1✔
124
            [
125
                self.logical_id == other.logical_id,
126
                self._compare_conditions(self.condition, other.condition),
127
                self.resource_type == other.resource_type,
128
                self.properties == other.properties,
129
            ]
130
        )
131

132

133
class PreprocOutput:
1✔
134
    name: str
1✔
135
    value: Any
1✔
136
    export: Optional[Any]
1✔
137
    condition: Optional[bool]
1✔
138

139
    def __init__(self, name: str, value: Any, export: Optional[Any], condition: Optional[bool]):
1✔
140
        self.name = name
1✔
141
        self.value = value
1✔
142
        self.export = export
1✔
143
        self.condition = condition
1✔
144

145
    def __eq__(self, other):
1✔
UNCOV
146
        if not isinstance(other, PreprocOutput):
×
UNCOV
147
            return False
×
UNCOV
148
        return all(
×
149
            [
150
                self.name == other.name,
151
                self.value == other.value,
152
                self.export == other.export,
153
                self.condition == other.condition,
154
            ]
155
        )
156

157

158
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
159
    _change_set: Final[ChangeSet]
1✔
160
    _node_template: Final[NodeTemplate]
1✔
161
    _before_resolved_resources: Final[dict]
1✔
162
    _processed: dict[Scope, Any]
1✔
163

164
    def __init__(self, change_set: ChangeSet):
1✔
165
        self._change_set = change_set
1✔
166
        self._node_template = change_set.update_graph
1✔
167
        self._before_resolved_resources = change_set.stack.resolved_resources
1✔
168
        self._processed = dict()
1✔
169

170
    def process(self) -> None:
1✔
171
        self._processed.clear()
1✔
172
        self.visit(self._node_template)
1✔
173

174
    def _get_node_resource_for(
1✔
175
        self, resource_name: str, node_template: NodeTemplate
176
    ) -> NodeResource:
177
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
178
        for node_resource in node_template.resources.resources:
1✔
179
            if node_resource.name == resource_name:
1✔
180
                self.visit(node_resource)
1✔
181
                return node_resource
1✔
UNCOV
182
        raise RuntimeError(f"No resource '{resource_name}' was found")
×
183

184
    def _get_node_property_for(
1✔
185
        self, property_name: str, node_resource: NodeResource
186
    ) -> Optional[NodeProperty]:
187
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
188
        for node_property in node_resource.properties.properties:
1✔
189
            if node_property.name == property_name:
1✔
190
                self.visit(node_property)
1✔
191
                return node_property
1✔
192
        return None
1✔
193

194
    def _deployed_property_value_of(
1✔
195
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
196
    ) -> Any:
197
        # TODO: typing around resolved resources is needed and should be reflected here.
198

199
        # Before we can obtain deployed value for a resource, we need to first ensure to
200
        # process the resource if this wasn't processed already. Ideally, values should only
201
        # be accessible through delta objects, to ensure computation is always complete at
202
        # every level.
203
        _ = self._get_node_resource_for(
1✔
204
            resource_name=resource_logical_id, node_template=self._node_template
205
        )
206
        resolved_resource = resolved_resources.get(resource_logical_id)
1✔
207
        if resolved_resource is None:
1✔
208
            raise RuntimeError(
1✔
209
                f"No deployed instances of resource '{resource_logical_id}' were found"
210
            )
211
        properties = resolved_resource.get("Properties", dict())
1✔
212
        property_value: Optional[Any] = properties.get(property_name)
1✔
213
        if property_value is None:
1✔
UNCOV
214
            raise RuntimeError(
×
215
                f"No '{property_name}' found for deployed resource '{resource_logical_id}' was found"
216
            )
217
        return property_value
1✔
218

219
    def _before_deployed_property_value_of(
1✔
220
        self, resource_logical_id: str, property_name: str
221
    ) -> Any:
222
        return self._deployed_property_value_of(
1✔
223
            resource_logical_id=resource_logical_id,
224
            property_name=property_name,
225
            resolved_resources=self._before_resolved_resources,
226
        )
227

228
    def _after_deployed_property_value_of(
1✔
229
        self, resource_logical_id: str, property_name: str
230
    ) -> Optional[str]:
231
        return self._before_deployed_property_value_of(
1✔
232
            resource_logical_id=resource_logical_id, property_name=property_name
233
        )
234

235
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
1✔
236
        mappings: list[NodeMapping] = self._node_template.mappings.mappings
1✔
237
        # TODO: another scenarios suggesting property lookups might be preferable.
238
        for mapping in mappings:
1✔
239
            if mapping.name == map_name:
1✔
240
                self.visit(mapping)
1✔
241
                return mapping
1✔
UNCOV
242
        raise RuntimeError(f"Undefined '{map_name}' mapping")
×
243

244
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
1✔
245
        parameters: list[NodeParameter] = self._node_template.parameters.parameters
1✔
246
        # TODO: another scenarios suggesting property lookups might be preferable.
247
        for parameter in parameters:
1✔
248
            if parameter.name == parameter_name:
1✔
249
                self.visit(parameter)
1✔
250
                return parameter
1✔
251
        return Nothing
1✔
252

253
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
1✔
254
        conditions: list[NodeCondition] = self._node_template.conditions.conditions
1✔
255
        # TODO: another scenarios suggesting property lookups might be preferable.
256
        for condition in conditions:
1✔
257
            if condition.name == condition_name:
1✔
258
                self.visit(condition)
1✔
259
                return condition
1✔
UNCOV
260
        return Nothing
×
261

262
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
1✔
263
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
1✔
264
        if isinstance(node_condition, NodeCondition):
1✔
265
            condition_delta = self.visit(node_condition)
1✔
266
            return condition_delta
1✔
UNCOV
267
        raise RuntimeError(f"No condition '{logical_id}' was found.")
×
268

269
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
1✔
270
        match pseudo_parameter_name:
1✔
271
            case "AWS::Partition":
1✔
272
                return get_partition(self._change_set.region_name)
1✔
273
            case "AWS::AccountId":
1✔
274
                return self._change_set.stack.account_id
1✔
275
            case "AWS::Region":
1✔
276
                return self._change_set.stack.region_name
1✔
277
            case "AWS::StackName":
1✔
278
                return self._change_set.stack.stack_name
1✔
279
            case "AWS::StackId":
1✔
280
                return self._change_set.stack.stack_id
1✔
281
            case "AWS::URLSuffix":
1✔
282
                return _AWS_URL_SUFFIX
1✔
283
            case "AWS::NoValue":
1✔
284
                return None
1✔
UNCOV
285
            case _:
×
UNCOV
286
                raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
×
287

288
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
1✔
289
        if logical_id in _PSEUDO_PARAMETERS:
1✔
290
            pseudo_parameter_value = self._resolve_pseudo_parameter(
1✔
291
                pseudo_parameter_name=logical_id
292
            )
293
            # Pseudo parameters are constants within the lifecycle of a template.
294
            return PreprocEntityDelta(before=pseudo_parameter_value, after=pseudo_parameter_value)
1✔
295

296
        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
1✔
297
        if isinstance(node_parameter, NodeParameter):
1✔
298
            parameter_delta = self.visit(node_parameter)
1✔
299
            return parameter_delta
1✔
300

301
        node_resource = self._get_node_resource_for(
1✔
302
            resource_name=logical_id, node_template=self._node_template
303
        )
304
        resource_delta = self.visit(node_resource)
1✔
305
        before = resource_delta.before
1✔
306
        after = resource_delta.after
1✔
307
        return PreprocEntityDelta(before=before, after=after)
1✔
308

309
    def _resolve_mapping(
1✔
310
        self, map_name: str, top_level_key: str, second_level_key
311
    ) -> PreprocEntityDelta:
312
        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
313
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)
1✔
314
        top_level_value = node_mapping.bindings.bindings.get(top_level_key)
1✔
315
        if not isinstance(top_level_value, NodeObject):
1✔
UNCOV
316
            raise RuntimeError()
×
317
        second_level_value = top_level_value.bindings.get(second_level_key)
1✔
318
        mapping_value_delta = self.visit(second_level_value)
1✔
319
        return mapping_value_delta
1✔
320

321
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
1✔
322
        scope = change_set_entity.scope
1✔
323
        if scope in self._processed:
1✔
324
            delta = self._processed[scope]
1✔
325
            return delta
1✔
326
        delta = super().visit(change_set_entity=change_set_entity)
1✔
327
        self._processed[scope] = delta
1✔
328
        return delta
1✔
329

330
    def visit_terminal_value_modified(
1✔
331
        self, terminal_value_modified: TerminalValueModified
332
    ) -> PreprocEntityDelta:
333
        return PreprocEntityDelta(
1✔
334
            before=terminal_value_modified.value,
335
            after=terminal_value_modified.modified_value,
336
        )
337

338
    def visit_terminal_value_created(
1✔
339
        self, terminal_value_created: TerminalValueCreated
340
    ) -> PreprocEntityDelta:
341
        return PreprocEntityDelta(after=terminal_value_created.value)
1✔
342

343
    def visit_terminal_value_removed(
1✔
344
        self, terminal_value_removed: TerminalValueRemoved
345
    ) -> PreprocEntityDelta:
346
        return PreprocEntityDelta(before=terminal_value_removed.value)
1✔
347

348
    def visit_terminal_value_unchanged(
1✔
349
        self, terminal_value_unchanged: TerminalValueUnchanged
350
    ) -> PreprocEntityDelta:
351
        return PreprocEntityDelta(
1✔
352
            before=terminal_value_unchanged.value,
353
            after=terminal_value_unchanged.value,
354
        )
355

356
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
1✔
357
        before_delta = self.visit(node_divergence.value)
1✔
358
        after_delta = self.visit(node_divergence.divergence)
1✔
359
        return PreprocEntityDelta(before=before_delta.before, after=after_delta.after)
1✔
360

361
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
1✔
362
        node_change_type = node_object.change_type
1✔
363
        before = dict() if node_change_type != ChangeType.CREATED else Nothing
1✔
364
        after = dict() if node_change_type != ChangeType.REMOVED else Nothing
1✔
365
        for name, change_set_entity in node_object.bindings.items():
1✔
366
            delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
1✔
367
            delta_before = delta.before
1✔
368
            delta_after = delta.after
1✔
369
            if not is_nothing(before) and not is_nothing(delta_before) and delta_before is not None:
1✔
370
                before[name] = delta_before
1✔
371
            if not is_nothing(after) and not is_nothing(delta_after) and delta_after is not None:
1✔
372
                after[name] = delta_after
1✔
373
        return PreprocEntityDelta(before=before, after=after)
1✔
374

375
    def visit_node_intrinsic_function_fn_get_att(
1✔
376
        self, node_intrinsic_function: NodeIntrinsicFunction
377
    ) -> PreprocEntityDelta:
378
        # TODO: validate the return value according to the spec.
379
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
380
        before_argument: Maybe[list[str]] = arguments_delta.before
1✔
381
        if isinstance(before_argument, str):
1✔
UNCOV
382
            before_argument = before_argument.split(".")
×
383
        after_argument: Maybe[list[str]] = arguments_delta.after
1✔
384
        if isinstance(after_argument, str):
1✔
385
            after_argument = after_argument.split(".")
1✔
386

387
        before = Nothing
1✔
388
        if before_argument:
1✔
389
            before_logical_name_of_resource = before_argument[0]
1✔
390
            before_attribute_name = before_argument[1]
1✔
391

392
            before_node_resource = self._get_node_resource_for(
1✔
393
                resource_name=before_logical_name_of_resource, node_template=self._node_template
394
            )
395
            before_node_property: Optional[NodeProperty] = self._get_node_property_for(
1✔
396
                property_name=before_attribute_name, node_resource=before_node_resource
397
            )
398
            if before_node_property is not None:
1✔
399
                # The property is statically defined in the template and its value can be computed.
400
                before_property_delta = self.visit(before_node_property)
1✔
401
                before = before_property_delta.before
1✔
402
            else:
403
                # The property is not statically defined and must therefore be available in
404
                # the properties deployed set.
405
                before = self._before_deployed_property_value_of(
1✔
406
                    resource_logical_id=before_logical_name_of_resource,
407
                    property_name=before_attribute_name,
408
                )
409

410
        after = Nothing
1✔
411
        if after_argument:
1✔
412
            after_logical_name_of_resource = after_argument[0]
1✔
413
            after_attribute_name = after_argument[1]
1✔
414
            after_node_resource = self._get_node_resource_for(
1✔
415
                resource_name=after_logical_name_of_resource, node_template=self._node_template
416
            )
417
            after_node_property = self._get_node_property_for(
1✔
418
                property_name=after_attribute_name, node_resource=after_node_resource
419
            )
420
            if after_node_property is not None:
1✔
421
                # The property is statically defined in the template and its value can be computed.
422
                after_property_delta = self.visit(after_node_property)
1✔
423
                after = after_property_delta.after
1✔
424
            else:
425
                # The property is not statically defined and must therefore be available in
426
                # the properties deployed set.
427
                after = self._after_deployed_property_value_of(
1✔
428
                    resource_logical_id=after_logical_name_of_resource,
429
                    property_name=after_attribute_name,
430
                )
431

432
        return PreprocEntityDelta(before=before, after=after)
1✔
433

434
    def visit_node_intrinsic_function_fn_equals(
1✔
435
        self, node_intrinsic_function: NodeIntrinsicFunction
436
    ) -> PreprocEntityDelta:
437
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
438
        before_values = arguments_delta.before
1✔
439
        after_values = arguments_delta.after
1✔
440
        before = Nothing
1✔
441
        if before_values:
1✔
442
            before = before_values[0] == before_values[1]
1✔
443
        after = Nothing
1✔
444
        if after_values:
1✔
445
            after = after_values[0] == after_values[1]
1✔
446
        return PreprocEntityDelta(before=before, after=after)
1✔
447

448
    def visit_node_intrinsic_function_fn_if(
1✔
449
        self, node_intrinsic_function: NodeIntrinsicFunction
450
    ) -> PreprocEntityDelta:
451
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
452
        arguments_before = arguments_delta.before
1✔
453
        arguments_after = arguments_delta.after
1✔
454

455
        def _compute_delta_for_if_statement(args: list[Any]) -> PreprocEntityDelta:
1✔
456
            condition_name = args[0]
1✔
457
            boolean_expression_delta = self._resolve_condition(logical_id=condition_name)
1✔
458
            return PreprocEntityDelta(
1✔
459
                before=args[1] if boolean_expression_delta.before else args[2],
460
                after=args[1] if boolean_expression_delta.after else args[2],
461
            )
462

463
        # TODO: add support for this being created or removed.
464
        before = Nothing
1✔
465
        if not is_nothing(arguments_before):
1✔
466
            before_outcome_delta = _compute_delta_for_if_statement(arguments_before)
×
467
            before = before_outcome_delta.before
×
468
        after = Nothing
1✔
469
        if not is_nothing(arguments_after):
1✔
470
            after_outcome_delta = _compute_delta_for_if_statement(arguments_after)
1✔
471
            after = after_outcome_delta.after
1✔
472
        return PreprocEntityDelta(before=before, after=after)
1✔
473

474
    def visit_node_intrinsic_function_fn_not(
1✔
475
        self, node_intrinsic_function: NodeIntrinsicFunction
476
    ) -> PreprocEntityDelta:
477
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
478
        before_condition = arguments_delta.before
1✔
479
        after_condition = arguments_delta.after
1✔
480
        before = Nothing
1✔
481
        if not is_nothing(before_condition):
1✔
UNCOV
482
            before_condition_outcome = before_condition[0]
×
UNCOV
483
            before = not before_condition_outcome
×
484
        after = Nothing
1✔
485
        if not is_nothing(after_condition):
1✔
486
            after_condition_outcome = after_condition[0]
1✔
487
            after = not after_condition_outcome
1✔
488
        # Implicit change type computation.
489
        return PreprocEntityDelta(before=before, after=after)
1✔
490

491
    def _compute_fn_transform(self, args: dict[str, Any]) -> Any:
1✔
492
        # TODO: add typing to arguments before this level.
493
        # TODO: add schema validation
494
        # TODO: add support for other transform types
495

496
        account_id = self._change_set.account_id
1✔
497
        region_name = self._change_set.region_name
1✔
498
        transform_name: str = args.get("Name")
1✔
499
        if not isinstance(transform_name, str):
1✔
UNCOV
500
            raise RuntimeError("Invalid or missing Fn::Transform 'Name' argument")
×
501
        transform_parameters: dict = args.get("Parameters")
1✔
502
        if not isinstance(transform_parameters, dict):
1✔
UNCOV
503
            raise RuntimeError("Invalid or missing Fn::Transform 'Parameters' argument")
×
504

505
        if transform_name in transformers:
1✔
506
            # TODO: port and refactor this 'transformers' logic to this package.
507
            builtin_transformer_class = transformers[transform_name]
1✔
508
            builtin_transformer: Transformer = builtin_transformer_class()
1✔
509
            transform_output: Any = builtin_transformer.transform(
1✔
510
                account_id=account_id, region_name=region_name, parameters=transform_parameters
511
            )
512
            return transform_output
1✔
513

UNCOV
514
        macros_store = get_cloudformation_store(
×
515
            account_id=account_id, region_name=region_name
516
        ).macros
UNCOV
517
        if transform_name in macros_store:
×
518
            # TODO: this formatting of stack parameters is odd but required to integrate with v1 execute_macro util.
519
            #  consider porting this utils and passing the plain list of parameters instead.
UNCOV
520
            stack_parameters = {
×
521
                parameter["ParameterKey"]: parameter
522
                for parameter in self._change_set.stack.parameters
523
            }
UNCOV
524
            transform_output: Any = execute_macro(
×
525
                account_id=account_id,
526
                region_name=region_name,
527
                parsed_template=dict(),  # TODO: review the requirements for this argument.
528
                macro=args,  # TODO: review support for non dict bindings (v1).
529
                stack_parameters=stack_parameters,
530
                transformation_parameters=transform_parameters,
531
                is_intrinsic=True,
532
            )
UNCOV
533
            return transform_output
×
534

UNCOV
535
        raise RuntimeError(
×
536
            f"Unsupported transform function '{transform_name}' in '{self._change_set.stack.stack_name}'"
537
        )
538

539
    def visit_node_intrinsic_function_fn_transform(
1✔
540
        self, node_intrinsic_function: NodeIntrinsicFunction
541
    ) -> PreprocEntityDelta:
542
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
543
        arguments_before = arguments_delta.before
1✔
544
        arguments_after = arguments_delta.after
1✔
545

546
        # TODO: review the use of cache in self.precessed from the 'before' run to
547
        #  ensure changes to the lambda (such as after UpdateFunctionCode) do not
548
        #  generalise tot he before value at this depth (thus making it seems as
549
        #  though for this transformation before==after). Another options may be to
550
        #  have specialised caching for transformations.
551

552
        # TODO: add tests to review the behaviour of CFN with changes to transformation
553
        #  function code and no changes to the template.
554

555
        before = Nothing
1✔
556
        if not is_nothing(arguments_before):
1✔
UNCOV
557
            before = self._compute_fn_transform(args=arguments_before)
×
558
        after = Nothing
1✔
559
        if not is_nothing(arguments_after):
1✔
560
            after = self._compute_fn_transform(args=arguments_after)
1✔
561
        return PreprocEntityDelta(before=before, after=after)
1✔
562

563
    def visit_node_intrinsic_function_fn_sub(
1✔
564
        self, node_intrinsic_function: NodeIntrinsicFunction
565
    ) -> PreprocEntityDelta:
566
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
567
        arguments_before = arguments_delta.before
1✔
568
        arguments_after = arguments_delta.after
1✔
569

570
        def _compute_sub(args: str | list[Any], select_before: bool = False) -> str:
1✔
571
            # TODO: add further schema validation.
572
            string_template: str
573
            sub_parameters: dict
574
            if isinstance(args, str):
1✔
575
                string_template = args
1✔
576
                sub_parameters = dict()
1✔
577
            elif (
1✔
578
                isinstance(args, list)
579
                and len(args) == 2
580
                and isinstance(args[0], str)
581
                and isinstance(args[1], dict)
582
            ):
583
                string_template = args[0]
1✔
584
                sub_parameters = args[1]
1✔
585
            else:
UNCOV
586
                raise RuntimeError(
×
587
                    "Invalid arguments shape for Fn::Sub, expected a String "
588
                    f"or a Tuple of String and Map but got '{args}'"
589
                )
590
            sub_string = string_template
1✔
591
            template_variable_names = re.findall("\\${([^}]+)}", string_template)
1✔
592
            for template_variable_name in template_variable_names:
1✔
593
                if template_variable_name in _PSEUDO_PARAMETERS:
1✔
594
                    template_variable_value = self._resolve_pseudo_parameter(
1✔
595
                        pseudo_parameter_name=template_variable_name
596
                    )
597
                elif template_variable_name in sub_parameters:
1✔
598
                    template_variable_value = sub_parameters[template_variable_name]
1✔
599
                else:
600
                    try:
1✔
601
                        resource_delta = self._resolve_reference(logical_id=template_variable_name)
1✔
602
                        template_variable_value = (
1✔
603
                            resource_delta.before if select_before else resource_delta.after
604
                        )
605
                        if isinstance(template_variable_value, PreprocResource):
1✔
UNCOV
606
                            template_variable_value = template_variable_value.logical_id
×
UNCOV
607
                    except RuntimeError:
×
UNCOV
608
                        raise RuntimeError(
×
609
                            f"Undefined variable name in Fn::Sub string template '{template_variable_name}'"
610
                        )
611
                sub_string = sub_string.replace(
1✔
612
                    f"${{{template_variable_name}}}", template_variable_value
613
                )
614
            return sub_string
1✔
615

616
        before = Nothing
1✔
617
        if not is_nothing(arguments_before):
1✔
618
            before = _compute_sub(args=arguments_before, select_before=True)
1✔
619
        after = Nothing
1✔
620
        if not is_nothing(arguments_after):
1✔
621
            after = _compute_sub(args=arguments_after)
1✔
622
        return PreprocEntityDelta(before=before, after=after)
1✔
623

624
    def visit_node_intrinsic_function_fn_join(
1✔
625
        self, node_intrinsic_function: NodeIntrinsicFunction
626
    ) -> PreprocEntityDelta:
627
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
628
        arguments_before = arguments_delta.before
1✔
629
        arguments_after = arguments_delta.after
1✔
630

631
        def _compute_join(args: list[Any]) -> str:
1✔
632
            # TODO: add support for schema validation.
633
            # TODO: add tests for joining non string values.
634
            delimiter: str = str(args[0])
1✔
635
            values: list[Any] = args[1]
1✔
636
            if not isinstance(values, list):
1✔
UNCOV
637
                raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")
×
638
            join_result = delimiter.join(map(str, values))
1✔
639
            return join_result
1✔
640

641
        before = Nothing
1✔
642
        if isinstance(arguments_before, list) and len(arguments_before) == 2:
1✔
643
            before = _compute_join(arguments_before)
1✔
644
        after = Nothing
1✔
645
        if isinstance(arguments_after, list) and len(arguments_after) == 2:
1✔
646
            after = _compute_join(arguments_after)
1✔
647
        return PreprocEntityDelta(before=before, after=after)
1✔
648

649
    def visit_node_intrinsic_function_fn_select(
1✔
650
        self, node_intrinsic_function: NodeIntrinsicFunction
651
    ):
652
        # TODO: add further support for schema validation
653
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
654
        arguments_before = arguments_delta.before
1✔
655
        arguments_after = arguments_delta.after
1✔
656

657
        def _compute_fn_select(args: list[Any]) -> Any:
1✔
658
            values: list[Any] = args[1]
1✔
659
            if not isinstance(values, list) or not values:
1✔
UNCOV
660
                raise RuntimeError(f"Invalid arguments list value for Fn::Select: '{values}'")
×
661
            values_len = len(values)
1✔
662
            index: int = int(args[0])
1✔
663
            if not isinstance(index, int) or index < 0 or index > values_len:
1✔
UNCOV
664
                raise RuntimeError(f"Invalid or out of range index value for Fn::Select: '{index}'")
×
665
            selection = values[index]
1✔
666
            return selection
1✔
667

668
        before = Nothing
1✔
669
        if not is_nothing(arguments_before):
1✔
670
            before = _compute_fn_select(arguments_before)
1✔
671

672
        after = Nothing
1✔
673
        if not is_nothing(arguments_after):
1✔
674
            after = _compute_fn_select(arguments_after)
1✔
675

676
        return PreprocEntityDelta(before=before, after=after)
1✔
677

678
    def visit_node_intrinsic_function_fn_find_in_map(
1✔
679
        self, node_intrinsic_function: NodeIntrinsicFunction
680
    ) -> PreprocEntityDelta:
681
        # TODO: add type checking/validation for result unit?
682
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
683
        before_arguments = arguments_delta.before
1✔
684
        after_arguments = arguments_delta.after
1✔
685
        before = Nothing
1✔
686
        if before_arguments:
1✔
687
            before_value_delta = self._resolve_mapping(*before_arguments)
1✔
688
            before = before_value_delta.before
1✔
689
        after = Nothing
1✔
690
        if after_arguments:
1✔
691
            after_value_delta = self._resolve_mapping(*after_arguments)
1✔
692
            after = after_value_delta.after
1✔
693
        return PreprocEntityDelta(before=before, after=after)
1✔
694

695
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
1✔
696
        bindings_delta = self.visit(node_mapping.bindings)
1✔
697
        return bindings_delta
1✔
698

699
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
1✔
700
        dynamic_value = node_parameter.dynamic_value
1✔
701
        dynamic_delta = self.visit(dynamic_value)
1✔
702

703
        default_value = node_parameter.default_value
1✔
704
        default_delta = self.visit(default_value)
1✔
705

706
        before = dynamic_delta.before or default_delta.before
1✔
707
        after = dynamic_delta.after or default_delta.after
1✔
708

709
        return PreprocEntityDelta(before=before, after=after)
1✔
710

711
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
1✔
712
        array_identifiers_delta = self.visit(node_depends_on.depends_on)
1✔
713
        return array_identifiers_delta
1✔
714

715
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
1✔
716
        delta = self.visit(node_condition.body)
1✔
717
        return delta
1✔
718

719
    def _resource_physical_resource_id_from(
1✔
720
        self, logical_resource_id: str, resolved_resources: dict
721
    ) -> str:
722
        # TODO: typing around resolved resources is needed and should be reflected here.
723
        resolved_resource = resolved_resources.get(logical_resource_id, dict())
1✔
724
        physical_resource_id: Optional[str] = resolved_resource.get("PhysicalResourceId")
1✔
725
        if not isinstance(physical_resource_id, str):
1✔
726
            raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
1✔
727
        return physical_resource_id
1✔
728

729
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
730
        # TODO: typing around resolved resources is needed and should be reflected here.
731
        return self._resource_physical_resource_id_from(
1✔
732
            logical_resource_id=resource_logical_id,
733
            resolved_resources=self._before_resolved_resources,
734
        )
735

736
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
737
        return self._before_resource_physical_id(resource_logical_id=resource_logical_id)
1✔
738

739
    def visit_node_intrinsic_function_ref(
1✔
740
        self, node_intrinsic_function: NodeIntrinsicFunction
741
    ) -> PreprocEntityDelta:
742
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
743
        before_logical_id = arguments_delta.before
1✔
744
        after_logical_id = arguments_delta.after
1✔
745

746
        # TODO: extend this to support references to other types.
747
        before = Nothing
1✔
748
        if not is_nothing(before_logical_id):
1✔
749
            before_delta = self._resolve_reference(logical_id=before_logical_id)
1✔
750
            before = before_delta.before
1✔
751
            if isinstance(before, PreprocResource):
1✔
752
                before = before.physical_resource_id
1✔
753

754
        after = Nothing
1✔
755
        if not is_nothing(after_logical_id):
1✔
756
            after_delta = self._resolve_reference(logical_id=after_logical_id)
1✔
757
            after = after_delta.after
1✔
758
            if isinstance(after, PreprocResource):
1✔
759
                after = after.physical_resource_id
1✔
760

761
        return PreprocEntityDelta(before=before, after=after)
1✔
762

763
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
1✔
764
        node_change_type = node_array.change_type
1✔
765
        before = list() if node_change_type != ChangeType.CREATED else Nothing
1✔
766
        after = list() if node_change_type != ChangeType.REMOVED else Nothing
1✔
767
        for change_set_entity in node_array.array:
1✔
768
            delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
1✔
769
            delta_before = delta.before
1✔
770
            delta_after = delta.after
1✔
771
            if not is_nothing(before) and not is_nothing(delta_before):
1✔
772
                before.append(delta_before)
1✔
773
            if not is_nothing(after) and not is_nothing(delta_after):
1✔
774
                after.append(delta_after)
1✔
775
        return PreprocEntityDelta(before=before, after=after)
1✔
776

777
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
1✔
778
        return self.visit(node_property.value)
1✔
779

780
    def visit_node_properties(
1✔
781
        self, node_properties: NodeProperties
782
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
783
        node_change_type = node_properties.change_type
1✔
784
        before_bindings = dict() if node_change_type != ChangeType.CREATED else Nothing
1✔
785
        after_bindings = dict() if node_change_type != ChangeType.REMOVED else Nothing
1✔
786
        for node_property in node_properties.properties:
1✔
787
            property_name = node_property.name
1✔
788
            delta = self.visit(node_property)
1✔
789
            delta_before = delta.before
1✔
790
            delta_after = delta.after
1✔
791
            if (
1✔
792
                not is_nothing(before_bindings)
793
                and not is_nothing(delta_before)
794
                and delta_before is not None
795
            ):
796
                before_bindings[property_name] = delta_before
1✔
797
            if (
1✔
798
                not is_nothing(after_bindings)
799
                and not is_nothing(delta_after)
800
                and delta_after is not None
801
            ):
802
                after_bindings[property_name] = delta_after
1✔
803
        before = Nothing
1✔
804
        if not is_nothing(before_bindings):
1✔
805
            before = PreprocProperties(properties=before_bindings)
1✔
806
        after = Nothing
1✔
807
        if not is_nothing(after_bindings):
1✔
808
            after = PreprocProperties(properties=after_bindings)
1✔
809
        return PreprocEntityDelta(before=before, after=after)
1✔
810

811
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
1✔
812
        reference_delta = self.visit(reference)
1✔
813
        before_reference = reference_delta.before
1✔
814
        before = Nothing
1✔
815
        if isinstance(before_reference, str):
1✔
816
            before_delta = self._resolve_condition(logical_id=before_reference)
1✔
817
            before = before_delta.before
1✔
818
        after = Nothing
1✔
819
        after_reference = reference_delta.after
1✔
820
        if isinstance(after_reference, str):
1✔
821
            after_delta = self._resolve_condition(logical_id=after_reference)
1✔
822
            after = after_delta.after
1✔
823
        return PreprocEntityDelta(before=before, after=after)
1✔
824

825
    def visit_node_resource(
1✔
826
        self, node_resource: NodeResource
827
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
828
        change_type = node_resource.change_type
1✔
829
        condition_before = Nothing
1✔
830
        condition_after = Nothing
1✔
831
        if not is_nothing(node_resource.condition_reference):
1✔
832
            condition_delta = self._resolve_resource_condition_reference(
1✔
833
                node_resource.condition_reference
834
            )
835
            condition_before = condition_delta.before
1✔
836
            condition_after = condition_delta.after
1✔
837

838
        depends_on_before = Nothing
1✔
839
        depends_on_after = Nothing
1✔
840
        if not is_nothing(node_resource.depends_on):
1✔
841
            depends_on_delta = self.visit(node_resource.depends_on)
1✔
842
            depends_on_before = depends_on_delta.before
1✔
843
            depends_on_after = depends_on_delta.after
1✔
844

845
        type_delta = self.visit(node_resource.type_)
1✔
846
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
1✔
847
            node_resource.properties
848
        )
849

850
        before = Nothing
1✔
851
        after = Nothing
1✔
852
        if change_type != ChangeType.CREATED and is_nothing(condition_before) or condition_before:
1✔
853
            logical_resource_id = node_resource.name
1✔
854
            before_physical_resource_id = self._before_resource_physical_id(
1✔
855
                resource_logical_id=logical_resource_id
856
            )
857
            before = PreprocResource(
1✔
858
                logical_id=logical_resource_id,
859
                physical_resource_id=before_physical_resource_id,
860
                condition=condition_before,
861
                resource_type=type_delta.before,
862
                properties=properties_delta.before,
863
                depends_on=depends_on_before,
864
            )
865
        if change_type != ChangeType.REMOVED and is_nothing(condition_after) or condition_after:
1✔
866
            logical_resource_id = node_resource.name
1✔
867
            try:
1✔
868
                after_physical_resource_id = self._after_resource_physical_id(
1✔
869
                    resource_logical_id=logical_resource_id
870
                )
871
            except RuntimeError:
1✔
872
                after_physical_resource_id = None
1✔
873
            after = PreprocResource(
1✔
874
                logical_id=logical_resource_id,
875
                physical_resource_id=after_physical_resource_id,
876
                condition=condition_after,
877
                resource_type=type_delta.after,
878
                properties=properties_delta.after,
879
                depends_on=depends_on_after,
880
            )
881
        return PreprocEntityDelta(before=before, after=after)
1✔
882

883
    def visit_node_output(
1✔
884
        self, node_output: NodeOutput
885
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
886
        change_type = node_output.change_type
1✔
887
        value_delta = self.visit(node_output.value)
1✔
888

889
        condition_delta = Nothing
1✔
890
        if not is_nothing(node_output.condition_reference):
1✔
891
            condition_delta = self._resolve_resource_condition_reference(
1✔
892
                node_output.condition_reference
893
            )
894
            condition_before = condition_delta.before
1✔
895
            condition_after = condition_delta.after
1✔
896
            if not condition_before and condition_after:
1✔
897
                change_type = ChangeType.CREATED
1✔
898
            elif condition_before and not condition_after:
1✔
UNCOV
899
                change_type = ChangeType.REMOVED
×
900

901
        export_delta = Nothing
1✔
902
        if not is_nothing(node_output.export):
1✔
UNCOV
903
            export_delta = self.visit(node_output.export)
×
904

905
        before: Maybe[PreprocOutput] = Nothing
1✔
906
        if change_type != ChangeType.CREATED:
1✔
907
            before = PreprocOutput(
1✔
908
                name=node_output.name,
909
                value=value_delta.before,
910
                export=export_delta.before if export_delta else None,
911
                condition=condition_delta.before if condition_delta else None,
912
            )
913
        after: Maybe[PreprocOutput] = Nothing
1✔
914
        if change_type != ChangeType.REMOVED:
1✔
915
            after = PreprocOutput(
1✔
916
                name=node_output.name,
917
                value=value_delta.after,
918
                export=export_delta.after if export_delta else None,
919
                condition=condition_delta.after if condition_delta else None,
920
            )
921
        return PreprocEntityDelta(before=before, after=after)
1✔
922

923
    def visit_node_outputs(
1✔
924
        self, node_outputs: NodeOutputs
925
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
926
        before: list[PreprocOutput] = list()
1✔
927
        after: list[PreprocOutput] = list()
1✔
928
        for node_output in node_outputs.outputs:
1✔
929
            output_delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
1✔
930
            output_before = output_delta.before
1✔
931
            output_after = output_delta.after
1✔
932
            if not is_nothing(output_before):
1✔
933
                before.append(output_before)
1✔
934
            if not is_nothing(output_after):
1✔
935
                after.append(output_after)
1✔
936
        return PreprocEntityDelta(before=before, after=after)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc