• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16869709979

08 Aug 2025 06:44PM UTC coverage: 86.837% (-0.004%) from 86.841%
16869709979

push

github

web-flow
CFNV2: fix delete change set (#12977)

3 of 3 new or added lines in 1 file covered. (100.0%)

130 existing lines in 5 files now uncovered.

66655 of 76759 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.28
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from typing import Any, Final, Generic, TypeVar
1✔
8

9
from botocore.exceptions import ClientError
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
13
from localstack.aws.connect import connect_to
1✔
14
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
15
    ChangeSetEntity,
16
    ChangeType,
17
    Maybe,
18
    NodeArray,
19
    NodeCondition,
20
    NodeDependsOn,
21
    NodeDivergence,
22
    NodeIntrinsicFunction,
23
    NodeMapping,
24
    NodeObject,
25
    NodeOutput,
26
    NodeOutputs,
27
    NodeParameter,
28
    NodeParameters,
29
    NodeProperties,
30
    NodeProperty,
31
    NodeResource,
32
    NodeTemplate,
33
    Nothing,
34
    NothingType,
35
    Scope,
36
    TerminalValue,
37
    TerminalValueCreated,
38
    TerminalValueModified,
39
    TerminalValueRemoved,
40
    TerminalValueUnchanged,
41
    is_nothing,
42
)
43
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
44
    ChangeSetModelVisitor,
45
)
46
from localstack.services.cloudformation.engine.v2.resolving import (
1✔
47
    extract_dynamic_reference,
48
    perform_dynamic_reference_lookup,
49
)
50
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
51
from localstack.services.cloudformation.stores import (
1✔
52
    exports_map,
53
)
54
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
55
from localstack.utils.aws.arns import get_partition
1✔
56
from localstack.utils.objects import get_value_from_path
1✔
57
from localstack.utils.run import to_str
1✔
58
from localstack.utils.strings import to_bytes
1✔
59
from localstack.utils.urls import localstack_host
1✔
60

61
# Host suffix returned when the "AWS::URLSuffix" pseudo parameter is resolved.
_AWS_URL_SUFFIX = localstack_host().host  # The value in AWS is "amazonaws.com"

# Names that are treated as pseudo parameters when resolving references.
_PSEUDO_PARAMETERS: Final[set[str]] = {
    "AWS::Partition",
    "AWS::AccountId",
    "AWS::Region",
    "AWS::StackName",
    "AWS::StackId",
    "AWS::URLSuffix",
    "AWS::NoValue",
    "AWS::NotificationARNs",
}

# Type variables for the 'before' and 'after' payloads of PreprocEntityDelta.
TBefore = TypeVar("TBefore")
TAfter = TypeVar("TAfter")

# Placeholder returned instead of a missing deployed property value when
# config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES is enabled.
MOCKED_REFERENCE = "unknown"

# Pattern for logical resource ids: one or more ASCII letters or digits.
VALID_LOGICAL_RESOURCE_ID_RE = re.compile(r"^[A-Za-z0-9]+$")
1✔
80

81

82
class PreprocEntityDelta(Generic[TBefore, TAfter]):
    """Pair of preprocessed values for an entity: its 'before' and its 'after' value.

    Both sides default to ``Nothing`` when they are not supplied.
    """

    before: Maybe[TBefore]
    after: Maybe[TAfter]

    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
        self.before, self.after = before, after

    def __eq__(self, other):
        # Only another delta with equal 'before' and 'after' values compares equal.
        if isinstance(other, PreprocEntityDelta):
            return self.before == other.before and self.after == other.after
        return False
×
94

95

96
class PreprocProperties:
    """Wrapper around a mapping of preprocessed property names to their values."""

    properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        self.properties = properties

    def __eq__(self, other):
        # Equality is delegated to the wrapped mappings; any other type is unequal.
        if isinstance(other, PreprocProperties):
            return self.properties == other.properties
        return False
1✔
106

107

108
class PreprocResource:
    """Preprocessed view of a template resource.

    Equality only takes the logical id, the condition, the resource type and the
    properties into account; the physical resource id, the dependencies and the
    replacement flag do not take part in the comparison.
    """

    logical_id: str
    physical_resource_id: str | None
    condition: bool | None
    resource_type: str
    properties: PreprocProperties
    depends_on: list[str] | None
    requires_replacement: bool

    def __init__(
        self,
        logical_id: str,
        physical_resource_id: str | None,
        condition: bool | None,
        resource_type: str,
        properties: PreprocProperties,
        depends_on: list[str] | None,
        requires_replacement: bool,
    ):
        self.logical_id = logical_id
        self.physical_resource_id = physical_resource_id
        self.condition = condition
        self.resource_type = resource_type
        self.properties = properties
        self.depends_on = depends_on
        self.requires_replacement = requires_replacement

    @staticmethod
    def _compare_conditions(c1: bool | None, c2: bool | None) -> bool:
        """Return whether two condition values are equivalent.

        Args:
            c1 (bool | None): The first condition value.
            c2 (bool | None): The second condition value.

        Returns:
            bool: True when both values agree once non-boolean values are read as True.
        """
        # The lack of condition equates to a true condition.
        c1 = c1 if isinstance(c1, bool) else True
        c2 = c2 if isinstance(c2, bool) else True
        return c1 == c2

    def __eq__(self, other):
        if not isinstance(other, PreprocResource):
            return False
        # Short-circuit: the properties are only compared once the cheaper fields match.
        return bool(
            self.logical_id == other.logical_id
            and self._compare_conditions(self.condition, other.condition)
            and self.resource_type == other.resource_type
            and self.properties == other.properties
        )
153

154

155
class PreprocOutput:
    """Preprocessed view of a template output: its name, value, export and condition."""

    name: str
    value: Any
    export: Any | None
    condition: bool | None

    def __init__(self, name: str, value: Any, export: Any | None, condition: bool | None):
        self.name, self.value = name, value
        self.export, self.condition = export, condition

    def __eq__(self, other):
        # Outputs are equal only to other outputs whose four fields all compare equal.
        if isinstance(other, PreprocOutput):
            field_matches = (
                self.name == other.name,
                self.value == other.value,
                self.export == other.export,
                self.condition == other.condition,
            )
            return all(field_matches)
        return False
178

179

180
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
181
    _change_set: Final[ChangeSet]
1✔
182
    _before_resolved_resources: Final[dict]
1✔
183
    _before_cache: Final[dict[Scope, Any]]
1✔
184
    _after_cache: Final[dict[Scope, Any]]
1✔
185

186
    def __init__(self, change_set: ChangeSet):
        """Bind the preprocessor to a change set and start with empty local caches.

        Args:
            change_set (ChangeSet): The change set whose update model is preprocessed.
        """
        self._change_set = change_set
        # Resolved resources of the change set's stack, used for 'before' lookups.
        self._before_resolved_resources = change_set.stack.resolved_resources
        self._before_cache = {}
        self._after_cache = {}
1✔
191

192
    def _setup_runtime_cache(self) -> None:
        """Reset the local caches and seed them from the update model's runtime caches.

        The runtime caches are read under a key equal to this instance's class name.
        """
        cache_key = self.__class__.__name__

        self._before_cache.clear()
        self._after_cache.clear()

        update_model = self._change_set.update_model
        seeds = (
            (self._before_cache, update_model.before_runtime_cache),
            (self._after_cache, update_model.after_runtime_cache),
        )
        for local_cache, runtime_cache in seeds:
            stored = runtime_cache.get(cache_key)
            # Missing or empty stored entries leave the local cache empty.
            if stored:
                local_cache.update(stored)
×
205

206
    def _save_runtime_cache(self) -> None:
        """Store deep copies of the local caches in the update model's runtime caches.

        The copies are written under a key equal to this instance's class name.
        """
        cache_key = self.__class__.__name__
        update_model = self._change_set.update_model
        update_model.before_runtime_cache[cache_key] = copy.deepcopy(self._before_cache)
        update_model.after_runtime_cache[cache_key] = copy.deepcopy(self._after_cache)
1✔
214

215
    def process(self) -> None:
        """Load the runtime caches, visit the change set's template, then save the caches."""
        self._setup_runtime_cache()
        self.visit(self._change_set.update_model.node_template)
        self._save_runtime_cache()
1✔
220

221
    def _get_node_resource_for(
        self, resource_name: str, node_template: NodeTemplate
    ) -> NodeResource:
        """Find the resource node with the given name, visit it and return it.

        Args:
            resource_name (str): Name of the resource to look up.
            node_template (NodeTemplate): Template whose resources are searched.

        Returns:
            NodeResource: The first resource node whose name matches.

        Raises:
            ValidationError: If the template holds no resource with that name.
        """
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        matching_resource = next(
            (
                candidate
                for candidate in node_template.resources.resources
                if candidate.name == resource_name
            ),
            None,
        )
        if matching_resource is None:
            raise ValidationError(
                f"Template format error: Unresolved resource dependencies [{resource_name}] in the Resources block of the template"
            )
        self.visit(matching_resource)
        return matching_resource
232

233
    def _get_node_property_for(
        self, property_name: str, node_resource: NodeResource
    ) -> NodeProperty | None:
        """Find the property node with the given name on a resource, visiting it if found.

        Args:
            property_name (str): Name of the property to look up.
            node_resource (NodeResource): Resource whose properties are searched.

        Returns:
            NodeProperty | None: The first matching property node, or None if there is none.
        """
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        matching_property = next(
            (
                candidate
                for candidate in node_resource.properties.properties
                if candidate.name == property_name
            ),
            None,
        )
        if matching_property is not None:
            self.visit(matching_property)
        return matching_property
1✔
242

243
    def _deployed_property_value_of(
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
    ) -> Any:
        """Look up a property value of a resource in a set of resolved resources.

        Args:
            resource_logical_id (str): Logical id of the resource.
            property_name (str): Property path to read; it may be structured,
                e.g. NestedStack.Outputs.OutputName.
            resolved_resources (dict): Resolved resources keyed by logical id.

        Returns:
            Any: The string value when a truthy one is found. Otherwise MOCKED_REFERENCE
            if config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES is set, else the falsy value
            that was looked up.

        Raises:
            RuntimeError: If the resource has no resolved entry, or if the value found
                is truthy but not a string.
        """
        # We have to override this function to make sure it does not try to access the
        # resolved resource

        # The resource must have been processed before a deployed value can be read, so
        # it is looked up (and thereby visited) first. Ideally, values should only be
        # accessible through delta objects, to ensure computation is always complete at
        # every level.
        self._get_node_resource_for(
            resource_name=resource_logical_id,
            node_template=self._change_set.update_model.node_template,
        )
        resolved_resource = resolved_resources.get(resource_logical_id)
        if resolved_resource is None:
            raise RuntimeError(
                f"No deployed instances of resource '{resource_logical_id}' were found"
            )
        properties = resolved_resource.get("Properties", {})
        # support structured properties, e.g. NestedStack.Outputs.OutputName
        property_value: Any | None = get_value_from_path(properties, property_name)

        if not property_value:
            if config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES:
                return MOCKED_REFERENCE
            return property_value

        if isinstance(property_value, str):
            return property_value

        # TODO: is this correct? If there is a bug in the logic here, it's probably
        #  better to know about it with a clear error message than to receive some form
        #  of message about trying to use a dictionary in place of a string
        raise RuntimeError(
            f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value"
        )
×
279

280
    def _before_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> Any:
        """Look up a deployed property value in the 'before' resolved resources.

        Args:
            resource_logical_id (str): Logical id of the resource.
            property_name (str): Property path to read.

        Returns:
            Any: Whatever _deployed_property_value_of returns for the 'before' resources.
        """
        before_resources = self._before_resolved_resources
        return self._deployed_property_value_of(
            resource_logical_id=resource_logical_id,
            property_name=property_name,
            resolved_resources=before_resources,
        )
288

289
    def _after_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> str | None:
        """Look up a deployed property value for the 'after' state.

        This implementation delegates to the 'before' lookup with the same arguments.

        Args:
            resource_logical_id (str): Logical id of the resource.
            property_name (str): Property path to read.

        Returns:
            str | None: The result of _before_deployed_property_value_of.
        """
        value = self._before_deployed_property_value_of(
            resource_logical_id=resource_logical_id, property_name=property_name
        )
        return value
295

296
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
        """Find the mapping node with the given name, visit it and return it.

        Args:
            map_name (str): Name of the mapping to look up.

        Returns:
            NodeMapping: The first mapping node whose name matches.

        Raises:
            RuntimeError: If the template defines no mapping with that name.
        """
        mappings: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
        # TODO: another scenarios suggesting property lookups might be preferable.
        matching_mapping = next(
            (candidate for candidate in mappings if candidate.name == map_name), None
        )
        if matching_mapping is None:
            raise RuntimeError(f"Undefined '{map_name}' mapping")
        self.visit(matching_mapping)
        return matching_mapping
×
304

305
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
        """Find the parameter node with the given name, visiting it if found.

        Args:
            parameter_name (str): Name of the parameter to look up.

        Returns:
            Maybe[NodeParameter]: The first matching parameter node, or Nothing if none matches.
        """
        template = self._change_set.update_model.node_template
        candidates: list[NodeParameter] = template.parameters.parameters
        # TODO: another scenarios suggesting property lookups might be preferable.
        for candidate in candidates:
            if candidate.name != parameter_name:
                continue
            self.visit(candidate)
            return candidate
        return Nothing
1✔
315

316
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
        """Find the condition node with the given name, visiting it if found.

        Args:
            condition_name (str): Name of the condition to look up.

        Returns:
            Maybe[NodeCondition]: The first matching condition node, or Nothing if none matches.
        """
        template = self._change_set.update_model.node_template
        candidates: list[NodeCondition] = template.conditions.conditions
        # TODO: another scenarios suggesting property lookups might be preferable.
        for candidate in candidates:
            if candidate.name != condition_name:
                continue
            self.visit(candidate)
            return candidate
        return Nothing
×
326

327
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
        """Resolve a condition by name into its preprocessed delta.

        Args:
            logical_id (str): Name of the condition.

        Returns:
            PreprocEntityDelta: The result of visiting the condition node.

        Raises:
            RuntimeError: If no condition node with that name exists.
        """
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError(f"No condition '{logical_id}' was found.")
        return self.visit(node_condition)
×
333

334
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
        """Return the value of a pseudo parameter for the current change set.

        Args:
            pseudo_parameter_name (str): Name of the pseudo parameter, e.g. "AWS::Region".

        Returns:
            Any: The resolved value; None for "AWS::NoValue".

        Raises:
            RuntimeError: If the name is not one of the pseudo parameters handled here.
        """
        if pseudo_parameter_name == "AWS::Partition":
            return get_partition(self._change_set.region_name)
        if pseudo_parameter_name == "AWS::AccountId":
            return self._change_set.stack.account_id
        if pseudo_parameter_name == "AWS::Region":
            return self._change_set.stack.region_name
        if pseudo_parameter_name == "AWS::StackName":
            return self._change_set.stack.stack_name
        if pseudo_parameter_name == "AWS::StackId":
            return self._change_set.stack.stack_id
        if pseudo_parameter_name == "AWS::URLSuffix":
            return _AWS_URL_SUFFIX
        if pseudo_parameter_name == "AWS::NoValue":
            return None
        raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
×
352

353
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
        """Resolve a reference to a pseudo parameter, a template parameter or a resource.

        The name is tried as a pseudo parameter first, then as a template parameter,
        and finally as a resource.

        Args:
            logical_id (str): The referenced name.

        Returns:
            PreprocEntityDelta: The delta holding the 'before' and 'after' values.
        """
        if logical_id in _PSEUDO_PARAMETERS:
            # Pseudo parameters are constants within the lifecycle of a template.
            constant_value = self._resolve_pseudo_parameter(pseudo_parameter_name=logical_id)
            return PreprocEntityDelta(before=constant_value, after=constant_value)

        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
        if isinstance(node_parameter, NodeParameter):
            return self.visit(node_parameter)

        node_resource = self._get_node_resource_for(
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
        )
        resource_delta = self.visit(node_resource)
        return PreprocEntityDelta(before=resource_delta.before, after=resource_delta.after)
1✔
373

374
    def _resolve_mapping(
        self, map_name: str, top_level_key: str, second_level_key
    ) -> PreprocEntityDelta:
        """Resolve a two-level mapping lookup into the delta of the value found.

        Args:
            map_name (str): Name of the mapping.
            top_level_key (str): Key selecting the first-level entry.
            second_level_key: Key selecting the value inside the first-level entry.

        Returns:
            PreprocEntityDelta: The result of visiting the terminal value found.

        Raises:
            ValidationError: If the first-level entry is not a NodeObject or the
                second-level entry is not a TerminalValue.
        """

        def _lookup_failure() -> ValidationError:
            # Built lazily: the key is only joined when a lookup step actually fails.
            error_key = "::".join([map_name, top_level_key, second_level_key])
            return ValidationError(f"Template error: Unable to get mapping for {error_key}")

        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)

        first_level = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(first_level, NodeObject):
            raise _lookup_failure()

        second_level = first_level.bindings.get(second_level_key)
        if not isinstance(second_level, TerminalValue):
            raise _lookup_failure()

        return self.visit(second_level)
1✔
389

390
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
        """Visit an entity, reusing cached values when both sides are cached for its scope.

        Args:
            change_set_entity (ChangeSetEntity): The entity to visit.

        Returns:
            PreprocEntityDelta: A delta built from the caches on a full cache hit;
            otherwise whatever the parent visitor returns for the entity.
        """
        scope = change_set_entity.scope
        before_cache, after_cache = self._before_cache, self._after_cache

        # Only a hit on both sides short-circuits the visit.
        if scope in before_cache and scope in after_cache:
            return PreprocEntityDelta(before=before_cache[scope], after=after_cache[scope])

        delta = super().visit(change_set_entity=change_set_entity)
        if isinstance(delta, PreprocEntityDelta):
            before_cache[scope] = delta.before
            after_cache[scope] = delta.after
        return delta
1✔
401

402
    def _cached_apply(
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
    ) -> PreprocEntityDelta:
        """
        Resolves the 'before' and 'after' sides of a delta independently, calling the
        resolver only for a side that has no cached value under the given scope and
        whose argument value is present.

        When the resolver returns a PreprocEntityDelta, only the matching component is
        kept: its 'before' value for the 'before' side and its 'after' value for the
        'after' side.

        The caches are read but never written here. Caching is left to the caller,
        either done manually or through the visit method of this class.

        Args:
            scope (Scope): The scope used as the cache lookup key.
            arguments_delta (PreprocEntityDelta): The delta holding the 'before' and 'after' argument values.
            resolver (Callable[[Any], Any]): Function applied to each uncached argument value.

        Returns:
            PreprocEntityDelta: A new delta with the resolved 'before' and 'after' values.
        """

        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.

        before = self._before_cache.get(scope, Nothing)
        if is_nothing(before):
            before_input = arguments_delta.before
            if not is_nothing(before_input):
                resolved = resolver(before_input)
                before = resolved.before if isinstance(resolved, PreprocEntityDelta) else resolved

        after = self._after_cache.get(scope, Nothing)
        if is_nothing(after):
            after_input = arguments_delta.after
            if not is_nothing(after_input):
                resolved = resolver(after_input)
                after = resolved.after if isinstance(resolved, PreprocEntityDelta) else resolved

        return PreprocEntityDelta(before=before, after=after)
1✔
449

450
    def visit_terminal_value_modified(
        self, terminal_value_modified: TerminalValueModified
    ) -> PreprocEntityDelta:
        """Return a delta from the node's value ('before') to its modified value ('after')."""
        previous_value = terminal_value_modified.value
        updated_value = terminal_value_modified.modified_value
        return PreprocEntityDelta(before=previous_value, after=updated_value)
457

458
    def visit_terminal_value_created(
        self, terminal_value_created: TerminalValueCreated
    ) -> PreprocEntityDelta:
        """Return a delta carrying the node's value on the 'after' side only."""
        created_value = terminal_value_created.value
        return PreprocEntityDelta(after=created_value)
1✔
462

463
    def visit_terminal_value_removed(
        self, terminal_value_removed: TerminalValueRemoved
    ) -> PreprocEntityDelta:
        """Return a delta carrying the node's value on the 'before' side only."""
        removed_value = terminal_value_removed.value
        return PreprocEntityDelta(before=removed_value)
1✔
467

468
    def visit_terminal_value_unchanged(
        self, terminal_value_unchanged: TerminalValueUnchanged
    ) -> PreprocEntityDelta:
        """Return a delta carrying the node's value on both the 'before' and 'after' sides."""
        return PreprocEntityDelta(
            before=terminal_value_unchanged.value, after=terminal_value_unchanged.value
        )
475

476
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
        """Combine the 'before' of the node's value with the 'after' of its divergence."""
        value_delta = self.visit(node_divergence.value)
        divergence_delta = self.visit(node_divergence.divergence)
        before, after = value_delta.before, divergence_delta.after
        return PreprocEntityDelta(before=before, after=after)
1✔
480

481
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
        """Build the 'before' and 'after' dictionaries of an object node from its bindings.

        A created object has no 'before' dictionary and a removed object has no 'after'
        dictionary; the missing side is Nothing. Binding values that are Nothing or
        None are left out of the dictionaries.

        Args:
            node_object (NodeObject): The object node to preprocess.

        Returns:
            PreprocEntityDelta: The delta holding both dictionaries (or Nothing).
        """
        change_type = node_object.change_type
        before = Nothing if change_type == ChangeType.CREATED else {}
        after = Nothing if change_type == ChangeType.REMOVED else {}

        for binding_name, bound_entity in node_object.bindings.items():
            binding_delta: PreprocEntityDelta = self.visit(change_set_entity=bound_entity)
            bound_before = binding_delta.before
            bound_after = binding_delta.after
            if not (is_nothing(before) or is_nothing(bound_before) or bound_before is None):
                before[binding_name] = bound_before
            if not (is_nothing(after) or is_nothing(bound_after) or bound_after is None):
                after[binding_name] = bound_after

        return PreprocEntityDelta(before=before, after=after)
1✔
494

495
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> str:
        """Resolve the value of a resource attribute for one side of the delta.

        Args:
            arguments (str | list[str]): Either a dotted string "<resource>.<attribute>"
                or a list whose first item is the resource name and whose remaining
                items form the attribute name.
            select_before (bool): True to resolve the 'before' value, False for 'after'.

        Returns:
            str: The value of the statically defined property when the resource declares
            one with that name, otherwise the value looked up among the deployed
            properties.

        Raises:
            ValidationError: If fewer than two components are given.
        """
        arguments_list: list[str]
        if isinstance(arguments, str):
            arguments_list = arguments.split(".")
        else:
            arguments_list = arguments

        if len(arguments_list) < 2:
            raise ValidationError(
                "Template error: every Fn::GetAtt object requires two non-empty parameters, "
                "the resource name and the resource attribute"
            )

        logical_name_of_resource = arguments_list[0]
        # Keep everything after the resource name so that structured attributes such as
        # "Outputs.OutputName" are not truncated to their first segment.
        attribute_name = ".".join(arguments_list[1:])

        node_resource = self._get_node_resource_for(
            resource_name=logical_name_of_resource,
            node_template=self._change_set.update_model.node_template,
        )
        node_property: NodeProperty | None = self._get_node_property_for(
            property_name=attribute_name, node_resource=node_resource
        )
        if node_property is not None:
            # The property is statically defined in the template and its value can be computed.
            property_delta = self.visit(node_property)
            return property_delta.before if select_before else property_delta.after

        # The property is not statically defined and must therefore be available in
        # the properties deployed set.
        if select_before:
            return self._before_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        return self._after_deployed_property_value_of(
            resource_logical_id=logical_name_of_resource,
            property_name=attribute_name,
        )
1✔
530

531
    def visit_node_intrinsic_function_fn_get_att(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve an Fn::GetAtt node into the 'before' and 'after' attribute values.

        A side is only resolved when it has no cached value for the node's scope and
        its arguments are present.

        Args:
            node_intrinsic_function (NodeIntrinsicFunction): The Fn::GetAtt node.

        Returns:
            PreprocEntityDelta: The delta holding the resolved attribute values.
        """
        # TODO: validate the return value according to the spec.
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before: Maybe[str | list[str]] = arguments_delta.before
        arguments_after: Maybe[str | list[str]] = arguments_delta.after
        scope = node_intrinsic_function.scope

        before = self._before_cache.get(scope, Nothing)
        if is_nothing(before) and not is_nothing(arguments_before):
            before = self._resolve_attribute(arguments=arguments_before, select_before=True)

        after = self._after_cache.get(scope, Nothing)
        if is_nothing(after) and not is_nothing(arguments_after):
            after = self._resolve_attribute(arguments=arguments_after, select_before=False)

        return PreprocEntityDelta(before=before, after=after)
1✔
548

549
    def visit_node_intrinsic_function_fn_equals(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve an Fn::Equals node by comparing its first two arguments on each side.

        Args:
            node_intrinsic_function (NodeIntrinsicFunction): The Fn::Equals node.

        Returns:
            PreprocEntityDelta: The delta holding the comparison results.
        """

        # TODO: add argument shape validation.
        def _operands_are_equal(operands: list[Any]) -> bool:
            left, right = operands[0], operands[1]
            return left == right

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_operands_are_equal,
        )
1✔
563

564
    def visit_node_intrinsic_function_fn_if(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve an Fn::If node by selecting one of its two value branches per side.

        The first argument names a condition. For each side ('before', 'after') on
        which that name is present, the condition is evaluated and only the selected
        branch is visited.

        Args:
            node_intrinsic_function (NodeIntrinsicFunction): The Fn::If node.

        Returns:
            PreprocEntityDelta: The delta holding the selected values; a side stays
            Nothing when the condition name is Nothing on that side.

        Raises:
            ValueError: If the node does not have exactly three arguments.
            ValidationError: If the named condition is not defined in the template.
        """
        # `if` needs to be short-circuiting i.e. if the condition is True we don't evaluate the
        # False branch. If the condition is False, we don't evaluate the True branch.
        arguments = node_intrinsic_function.arguments.array
        if len(arguments) != 3:
            raise ValueError(
                f"Incorrectly constructed Fn::If usage, expected 3 arguments, found {len(arguments)}"
            )

        def _visit_selected_branch(condition_name: Any, select_before: bool) -> PreprocEntityDelta:
            # Evaluate the named condition for one side and visit only the chosen branch.
            node_condition = self._get_node_condition_if_exists(condition_name=condition_name)
            if not isinstance(node_condition, NodeCondition):
                # Fail with a clear message instead of visiting the Nothing placeholder.
                raise ValidationError(
                    f"Template error: unresolved condition dependency {condition_name} in Fn::If"
                )
            condition_delta = self.visit(node_condition)
            condition_value = condition_delta.before if select_before else condition_delta.after
            return self.visit(arguments[1] if condition_value else arguments[2])

        condition_name_delta = self.visit(arguments[0])
        if_delta = PreprocEntityDelta()

        if not is_nothing(condition_name_delta.before):
            if_delta.before = _visit_selected_branch(
                condition_name_delta.before, select_before=True
            ).before

        if not is_nothing(condition_name_delta.after):
            if_delta.after = _visit_selected_branch(
                condition_name_delta.after, select_before=False
            ).after

        return if_delta
1✔
599

600
    def visit_node_intrinsic_function_fn_and(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve an Fn::And node: each side is true when all of its arguments are truthy.

        Args:
            node_intrinsic_function (NodeIntrinsicFunction): The Fn::And node.

        Returns:
            PreprocEntityDelta: The delta holding the conjunction results.
        """

        def _all_hold(operands: list[bool]) -> bool:
            return all(operands)

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_all_hold,
        )
1✔
614

615
    def visit_node_intrinsic_function_fn_or(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve an Fn::Or node: each side is true when any of its arguments is truthy.

        Args:
            node_intrinsic_function (NodeIntrinsicFunction): The Fn::Or node.

        Returns:
            PreprocEntityDelta: The delta holding the disjunction results.
        """

        def _any_holds(operands: list[bool]):
            return any(operands)

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_any_holds,
        )
1✔
629

630
    def visit_node_intrinsic_function_fn_not(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve an Fn::Not node by applying ``not`` to the arguments value on each side.

        Args:
            node_intrinsic_function (NodeIntrinsicFunction): The Fn::Not node.

        Returns:
            PreprocEntityDelta: The delta holding the negation results.
        """

        def _negate(operand: bool) -> bool:
            return not operand

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_negate,
        )
1✔
643

644
    def visit_node_intrinsic_function_fn_sub(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::Sub`` node for its before and after states.

        The arguments are either a template string, or a two-element list of a
        template string and a map of substitution values. Every ``${Name}``
        placeholder is looked up, in order, as a pseudo parameter, as a key of
        the substitution map, as an attribute (when the name contains a dot),
        and otherwise as a reference.

        A side is only computed when no value for the node scope is found in
        the matching cache and the visited arguments for that side are set.

        :raises RuntimeError: if the arguments have an unsupported shape or a
            placeholder cannot be resolved.
        """

        def _lookup_variable(name: str, sub_parameters: dict, select_before: bool) -> Any:
            # Pseudo parameters take precedence over everything else.
            if name in _PSEUDO_PARAMETERS:
                return self._resolve_pseudo_parameter(pseudo_parameter_name=name)

            # Next come the values passed explicitly alongside the template.
            if name in sub_parameters:
                return sub_parameters[name]

            value = Nothing
            if "." in name:
                # Dotted names are attempted as GetAtt lookups.
                try:
                    value = self._resolve_attribute(arguments=name, select_before=select_before)
                except RuntimeError:
                    # Left as Nothing; the caller reports the name as undefined.
                    pass
                return value

            # Everything else is attempted as a Ref lookup.
            try:
                reference_delta = self._resolve_reference(logical_id=name)
                value = reference_delta.before if select_before else reference_delta.after
                if isinstance(value, PreprocResource):
                    value = value.physical_resource_id
            except RuntimeError:
                # Left as whatever was resolved so far; Nothing is reported by the caller.
                pass
            return value

        def _compute_sub(args: str | list[Any], select_before: bool) -> str | int | float:
            # TODO: add further schema validation.
            if isinstance(args, str):
                string_template: str = args
                sub_parameters: dict = {}
            elif (
                isinstance(args, list)
                and len(args) == 2
                and isinstance(args[0], str)
                and isinstance(args[1], dict)
            ):
                string_template, sub_parameters = args
            else:
                raise RuntimeError(
                    "Invalid arguments shape for Fn::Sub, expected a String "
                    f"or a Tuple of String and Map but got '{args}'"
                )

            sub_string = string_template
            for variable_name in re.findall("\\${([^}]+)}", string_template):
                variable_value = _lookup_variable(
                    name=variable_name,
                    sub_parameters=sub_parameters,
                    select_before=select_before,
                )
                if is_nothing(variable_value):
                    raise RuntimeError(
                        f"Undefined variable name in Fn::Sub string template '{variable_name}'"
                    )
                if not isinstance(variable_value, str):
                    variable_value = str(variable_value)
                sub_string = sub_string.replace(f"${{{variable_name}}}", variable_value)

            # FIXME: the following type reduction is ported from v1; however it appears as though such
            #        reduction is not performed by the engine, and certainly not at this depth given the
            #        lack of context. This section should be removed with Fn::Sub always returning a string
            #        and the resource providers reviewed.
            account_id = self._change_set.account_id
            if sub_string.isdigit():
                # Digit strings as long as the account id are kept as strings.
                if len(sub_string) == len(account_id) or sub_string == account_id:
                    return sub_string
                return int(sub_string)
            if sub_string == account_id:
                return sub_string
            try:
                return float(sub_string)
            except ValueError:
                return sub_string

        scope = node_intrinsic_function.scope
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after

        before = self._before_cache.get(scope, Nothing)
        if is_nothing(before) and not is_nothing(arguments_before):
            before = _compute_sub(args=arguments_before, select_before=True)

        after = self._after_cache.get(scope, Nothing)
        if is_nothing(after) and not is_nothing(arguments_after):
            after = _compute_sub(args=arguments_after, select_before=False)

        return PreprocEntityDelta(before=before, after=after)
1✔
742

743
    def visit_node_intrinsic_function_fn_join(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::Join`` node.

        The resolver expects a two-element list of delimiter and values; any
        other shape yields ``Nothing``. ``None`` entries in the values are
        skipped and all remaining entries are stringified before joining.

        :raises RuntimeError: if the values are neither a list nor the empty string.
        """

        # TODO: add support for schema validation.
        # TODO: add tests for joining non string values.
        def _join(args: list[Any]) -> str | NothingType:
            if not isinstance(args, list) or len(args) != 2:
                return Nothing
            delimiter: str = str(args[0])
            values: list[Any] = args[1]
            if isinstance(values, list):
                return delimiter.join(str(value) for value in values if value is not None)
            # shortcut if values is the empty string, for example:
            # {"Fn::Join": ["", {"Ref": <parameter>}]}
            # CDK bootstrap does this
            if values == "":
                return ""
            raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")

        visited_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=visited_arguments,
            resolver=_join,
        )
1✔
776

777
    def visit_node_intrinsic_function_fn_select(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::Select`` node.

        The resolver receives ``[index, values]`` and returns ``values[index]``.
        Valid indices are ``0`` to ``len(values) - 1`` inclusive.

        :raises RuntimeError: if the values are not a non-empty list, or the
            index falls outside the valid range. Errors raised by the ``int()``
            conversion of the index propagate unchanged.
        """

        # TODO: add further support for schema validation
        def _compute_fn_select(args: list[Any]) -> Any:
            values: list[Any] = args[1]
            if not isinstance(values, list) or not values:
                raise RuntimeError(f"Invalid arguments list value for Fn::Select: '{values}'")
            index: int = int(args[0])
            # The upper bound is exclusive: index == len(values) is already out of range and
            # must be reported as a RuntimeError rather than leaking an IndexError.
            if not 0 <= index < len(values):
                raise RuntimeError(f"Invalid or out of range index value for Fn::Select: '{index}'")
            return values[index]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_select,
        )
        return delta
1✔
799

800
    def visit_node_intrinsic_function_fn_split(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate an ``Fn::Split`` node.

        The resolver receives ``[delimiter, source_string]`` and returns the
        result of ``source_string.split(delimiter)``.

        :raises RuntimeError: if the delimiter is not a non-empty string, or
            the source is not a string.
        """

        # TODO: add further support for schema validation
        def _split(args: list[Any]) -> Any:
            delimiter = args[0]
            if not (isinstance(delimiter, str) and delimiter):
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{delimiter}'")
            source_string = args[1]
            if isinstance(source_string, str):
                return source_string.split(delimiter)
            raise RuntimeError(f"Invalid source string value for Fn::Split: '{source_string}'")

        visited_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=visited_arguments,
            resolver=_split,
        )
1✔
821

822
    def visit_node_intrinsic_function_fn_get_a_zs(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::GetAZs`` node.

        The resolver receives a region name; an empty string falls back to the
        region of the change set. The zone names are read from the EC2
        ``describe_availability_zones`` response of that region.

        :raises RuntimeError: if the region is not a string, or the EC2 call
            fails with a ``ClientError`` (chained as the cause).
        """
        # TODO: add further support for schema validation

        def _compute_fn_get_a_zs(region) -> Any:
            if not isinstance(region, str):
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")

            if not region:
                region = self._change_set.region_name

            account_id = self._change_set.account_id
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
            try:
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
                    ec2_client.describe_availability_zones()
                )
            except ClientError as error:
                # Chain explicitly so the underlying client failure is reported as the cause.
                raise RuntimeError(
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
                ) from error
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
                "AvailabilityZones"
            ]
            return [az["ZoneName"] for az in availability_zones]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_get_a_zs,
        )
        return delta
1✔
857

858
    def visit_node_intrinsic_function_fn_base64(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::Base64`` node.

        The resolver base64-encodes its string argument and returns the result
        as a string.

        :raises RuntimeError: if the value to encode is not a string.
        """

        # TODO: add further support for schema validation
        def _encode(string) -> Any:
            if isinstance(string, str):
                # Ported from v1:
                encoded_bytes = base64.b64encode(to_bytes(string))
                return to_str(encoded_bytes)
            raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{string}'")

        visited_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=visited_arguments,
            resolver=_encode,
        )
1✔
876

877
    def visit_node_intrinsic_function_fn_find_in_map(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::FindInMap`` node.

        Each side of the visited arguments is unpacked into ``_resolve_mapping``
        when it is truthy; the before value is taken from the ``before`` of the
        resulting delta and the after value from its ``after``. A falsy side
        stays ``Nothing``.
        """
        # TODO: add type checking/validation for result unit?
        visited_arguments = self.visit(node_intrinsic_function.arguments)
        lookup_before = visited_arguments.before
        lookup_after = visited_arguments.after

        before = self._resolve_mapping(*lookup_before).before if lookup_before else Nothing
        after = self._resolve_mapping(*lookup_after).after if lookup_after else Nothing
        return PreprocEntityDelta(before=before, after=after)
1✔
893

894
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the bindings of the mapping node."""
        return self.visit(node_mapping.bindings)
1✔
897

898
    def visit_node_parameters(
        self, node_parameters: NodeParameters
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
        """Collect the before and after values of every parameter, keyed by name.

        A parameter is left out of a side whose value is ``Nothing``.
        """
        resolved_before: dict[str, Any] = {}
        resolved_after: dict[str, Any] = {}
        for node_parameter in node_parameters.parameters:
            delta = self.visit(node_parameter)
            sides = ((delta.before, resolved_before), (delta.after, resolved_after))
            for value, bucket in sides:
                if not is_nothing(value):
                    bucket[node_parameter.name] = value
        return PreprocEntityDelta(before=resolved_before, after=resolved_after)
1✔
912

913
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
        """Resolve the before and after values of a single parameter.

        For each side the dynamic value is used when it is truthy, otherwise
        the default value. An after value that is not ``Nothing`` is then
        converted according to the after state of the parameter type:
        ``List<String>`` splits on commas and strips each item; other types
        are returned unchanged.
        """
        dynamic_delta = self.visit(node_parameter.dynamic_value)
        default_delta = self.visit(node_parameter.default_value)

        # `or` means any falsy dynamic value falls through to the default.
        before = dynamic_delta.before or default_delta.before
        after = dynamic_delta.after or default_delta.after

        type_delta = self.visit(node_parameter.type_)

        def _coerce(value: str, type_: str) -> Any:
            if type_ == "List<String>":
                return [item.strip() for item in value.split(",")]
            return value

        # NOTE(review): only the after value is converted; confirm the before value is
        #               intentionally left in its raw form.
        if not is_nothing(after):
            after = _coerce(after, type_delta.after)

        return PreprocEntityDelta(before=before, after=after)
1✔
935

936
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the ``depends_on`` entity of the node."""
        return self.visit(node_depends_on.depends_on)
1✔
939

940
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the body of the condition node."""
        return self.visit(node_condition.body)
1✔
943

944
    def _resource_physical_resource_id_from(
1✔
945
        self, logical_resource_id: str, resolved_resources: dict
946
    ) -> str:
947
        # TODO: typing around resolved resources is needed and should be reflected here.
948
        resolved_resource = resolved_resources.get(logical_resource_id, dict())
1✔
949
        physical_resource_id: str | None = resolved_resource.get("PhysicalResourceId")
1✔
950
        if not isinstance(physical_resource_id, str):
1✔
951
            raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
1✔
952
        return physical_resource_id
1✔
953

954
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
        """Look up the physical id of a resource in ``self._before_resolved_resources``.

        :raises RuntimeError: propagated from ``_resource_physical_resource_id_from``
            when no string ``PhysicalResourceId`` is recorded for the resource.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        before_resources = self._before_resolved_resources
        return self._resource_physical_resource_id_from(
            logical_resource_id=resource_logical_id,
            resolved_resources=before_resources,
        )
960

961
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
        """Return the physical id to use for the after state of a resource.

        Delegates to ``_before_resource_physical_id``.
        """
        # NOTE(review): this reads the *before* resolved resources; presumably no separate
        #               after-state physical ids exist at this stage — confirm.
        physical_resource_id = self._before_resource_physical_id(
            resource_logical_id=resource_logical_id
        )
        return physical_resource_id
1✔
963

964
    def visit_node_intrinsic_function_ref(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a ``Ref`` node.

        The resolver resolves the logical id through ``_resolve_reference`` and,
        on either side holding a ``PreprocResource``, replaces that side in
        place with the resource's ``physical_resource_id``.
        """

        def _dereference(logical_id: str) -> PreprocEntityDelta:
            resolved: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
            before_value = resolved.before
            if isinstance(before_value, PreprocResource):
                resolved.before = before_value.physical_resource_id
            after_value = resolved.after
            if isinstance(after_value, PreprocResource):
                resolved.after = after_value.physical_resource_id
            return resolved

        visited_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=visited_arguments,
            resolver=_dereference,
        )
1✔
982

983
    def visit_node_intrinsic_function_condition(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a ``Condition`` node by visiting the condition it names.

        :raises RuntimeError: from the resolver, if no condition with the
            given name exists.
        """

        def _visit_named_condition(name: str) -> PreprocEntityDelta:
            node_condition = self._get_node_condition_if_exists(condition_name=name)
            if not is_nothing(node_condition):
                return self.visit(node_condition)
            raise RuntimeError(f"Undefined condition '{name}'")

        visited_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            resolver=_visit_named_condition,
            scope=node_intrinsic_function.scope,
            arguments_delta=visited_arguments,
        )
1✔
1001

1002
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
        """Build the before and after lists of an array node.

        The before side is ``Nothing`` for a CREATED array and the after side
        is ``Nothing`` for a REMOVED one; otherwise each side collects the
        matching values of the visited elements, skipping ``Nothing`` values.
        """
        change_type = node_array.change_type
        before = Nothing if change_type == ChangeType.CREATED else []
        after = Nothing if change_type == ChangeType.REMOVED else []
        # Neither side is rebound inside the loop, so the checks can be made once.
        collect_before = not is_nothing(before)
        collect_after = not is_nothing(after)
        for element in node_array.array:
            element_delta: PreprocEntityDelta = self.visit(change_set_entity=element)
            element_before = element_delta.before
            element_after = element_delta.after
            if collect_before and not is_nothing(element_before):
                before.append(element_before)
            if collect_after and not is_nothing(element_after):
                after.append(element_after)
        return PreprocEntityDelta(before=before, after=after)
1✔
1015

1016
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
        """Visit a property value and resolve dynamic references on both sides.

        For each side that is not ``Nothing``, a dynamic reference extracted
        from the value is looked up with the account id and region of the
        change set, and the result replaces that side of the delta.
        """
        # TODO: what about other positions?
        value_delta = self.visit(node_property.value)
        for side in ("before", "after"):
            side_value = getattr(value_delta, side)
            if is_nothing(side_value):
                continue
            dynamic_ref = extract_dynamic_reference(side_value)
            if dynamic_ref:
                looked_up = perform_dynamic_reference_lookup(
                    reference=dynamic_ref,
                    account_id=self._change_set.account_id,
                    region_name=self._change_set.region_name,
                )
                setattr(value_delta, side, looked_up)
        return value_delta
1✔
1034

1035
    def visit_node_properties(
        self, node_properties: NodeProperties
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
        """Build the before and after ``PreprocProperties`` of a properties node.

        The before side is ``Nothing`` for CREATED properties and the after
        side is ``Nothing`` for REMOVED ones. Property values that are
        ``Nothing`` or ``None`` are left out of the bindings.
        """
        change_type = node_properties.change_type
        before_bindings = Nothing if change_type == ChangeType.CREATED else {}
        after_bindings = Nothing if change_type == ChangeType.REMOVED else {}
        # Neither bindings object is rebound below, so the checks can be made once.
        has_before = not is_nothing(before_bindings)
        has_after = not is_nothing(after_bindings)

        for node_property in node_properties.properties:
            property_name = node_property.name
            property_delta = self.visit(node_property)
            value_before = property_delta.before
            value_after = property_delta.after
            if has_before and not is_nothing(value_before) and value_before is not None:
                before_bindings[property_name] = value_before
            if has_after and not is_nothing(value_after) and value_after is not None:
                after_bindings[property_name] = value_after

        before = PreprocProperties(properties=before_bindings) if has_before else Nothing
        after = PreprocProperties(properties=after_bindings) if has_after else Nothing
        return PreprocEntityDelta(before=before, after=after)
1✔
1065

1066
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
        """Resolve a condition reference into its before and after condition values.

        The reference is visited first. A side whose visited value is a string
        is resolved through ``_resolve_condition`` and contributes the matching
        side of that result; any other side stays ``Nothing``.
        """
        visited_reference = self.visit(reference)

        condition_name_before = visited_reference.before
        if isinstance(condition_name_before, str):
            before = self._resolve_condition(logical_id=condition_name_before).before
        else:
            before = Nothing

        condition_name_after = visited_reference.after
        if isinstance(condition_name_after, str):
            after = self._resolve_condition(logical_id=condition_name_after).after
        else:
            after = Nothing

        return PreprocEntityDelta(before=before, after=after)
1✔
1079

1080
    def visit_node_resource(
        self, node_resource: NodeResource
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
        """Build the before and after ``PreprocResource`` of a resource node.

        The condition reference and ``depends_on`` are resolved when present,
        then the type and properties are visited. A side is built when the
        change type allows it and no condition is set for that side, or when
        the condition of that side is truthy; otherwise it stays ``Nothing``.

        :raises ValidationError: if the resource name does not match
            ``VALID_LOGICAL_RESOURCE_ID_RE``.
        :raises RuntimeError: propagated from the before physical id lookup;
            the after lookup falls back to ``None`` instead.
        """
        logical_resource_id = node_resource.name
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(logical_resource_id):
            raise ValidationError(
                f"Template format error: Resource name {node_resource.name} is non alphanumeric."
            )
        change_type = node_resource.change_type

        condition_before = condition_after = Nothing
        if not is_nothing(node_resource.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_resource.condition_reference
            )
            condition_before, condition_after = condition_delta.before, condition_delta.after

        depends_on_before = depends_on_after = Nothing
        if not is_nothing(node_resource.depends_on):
            depends_on_delta = self.visit(node_resource.depends_on)
            depends_on_before, depends_on_after = depends_on_delta.before, depends_on_delta.after

        type_delta = self.visit(node_resource.type_)
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
            node_resource.properties
        )

        # NOTE(review): `and` binds tighter than `or`, so a truthy condition selects a side
        #               regardless of the change type — confirm this is intended.
        before = Nothing
        if (change_type != ChangeType.CREATED and is_nothing(condition_before)) or condition_before:
            before_physical_resource_id = self._before_resource_physical_id(
                resource_logical_id=logical_resource_id
            )
            before = PreprocResource(
                logical_id=logical_resource_id,
                physical_resource_id=before_physical_resource_id,
                condition=condition_before,
                resource_type=type_delta.before,
                properties=properties_delta.before,
                depends_on=depends_on_before,
                requires_replacement=False,
            )

        after = Nothing
        if (change_type != ChangeType.REMOVED and is_nothing(condition_after)) or condition_after:
            try:
                after_physical_resource_id = self._after_resource_physical_id(
                    resource_logical_id=logical_resource_id
                )
            except RuntimeError:
                # No physical id recorded for this resource.
                after_physical_resource_id = None
            after = PreprocResource(
                logical_id=logical_resource_id,
                physical_resource_id=after_physical_resource_id,
                condition=condition_after,
                resource_type=type_delta.after,
                properties=properties_delta.after,
                depends_on=depends_on_after,
                requires_replacement=node_resource.requires_replacement,
            )

        return PreprocEntityDelta(before=before, after=after)
1✔
1143

1144
    def visit_node_output(
        self, node_output: NodeOutput
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
        """Build the before and after ``PreprocOutput`` of an output node.

        When the output has a condition reference, the change type is
        overridden to CREATED if the condition is falsy before and truthy
        after, and to REMOVED in the opposite case. The before side is
        ``Nothing`` for a CREATED output and the after side is ``Nothing``
        for a REMOVED one.
        """
        change_type = node_output.change_type
        value_delta = self.visit(node_output.value)

        condition_delta = Nothing
        if not is_nothing(node_output.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_output.condition_reference
            )
            held_before = condition_delta.before
            holds_after = condition_delta.after
            if not held_before and holds_after:
                change_type = ChangeType.CREATED
            elif held_before and not holds_after:
                change_type = ChangeType.REMOVED

        export_delta = Nothing
        if not is_nothing(node_output.export):
            export_delta = self.visit(node_output.export)

        def _output_for(side: str) -> PreprocOutput:
            # `side` names the delta attribute to read: "before" or "after".
            return PreprocOutput(
                name=node_output.name,
                value=getattr(value_delta, side),
                export=getattr(export_delta, side) if export_delta else None,
                condition=getattr(condition_delta, side) if condition_delta else None,
            )

        before: Maybe[PreprocOutput] = (
            _output_for("before") if change_type != ChangeType.CREATED else Nothing
        )
        after: Maybe[PreprocOutput] = (
            _output_for("after") if change_type != ChangeType.REMOVED else Nothing
        )
        return PreprocEntityDelta(before=before, after=after)
1✔
1183

1184
    def visit_node_outputs(
        self, node_outputs: NodeOutputs
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
        """Collect the before and after outputs of every output node.

        Sides that are ``Nothing`` are left out of the corresponding list.
        """
        outputs_before: list[PreprocOutput] = []
        outputs_after: list[PreprocOutput] = []
        for node_output in node_outputs.outputs:
            delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
            sides = ((delta.before, outputs_before), (delta.after, outputs_after))
            for output, bucket in sides:
                if not is_nothing(output):
                    bucket.append(output)
        return PreprocEntityDelta(before=outputs_before, after=outputs_after)
1✔
1198

1199
    def visit_node_intrinsic_function_fn_import_value(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::ImportValue`` node.

        The resolver looks the export name up in the exports map of the change
        set's account and region and returns its ``Value``; a missing or falsy
        value yields ``Nothing``.

        :raises RuntimeError: from the resolver, if the export name is not a string.
        """

        def _lookup_export(string) -> str:
            if not isinstance(string, str):
                raise RuntimeError(f"Invalid parameter for import: '{string}'")

            change_set = self._change_set
            exports = exports_map(
                account_id=change_set.account_id, region_name=change_set.region_name
            )
            export_entry = exports.get(string, {})
            return export_entry.get("Value") or Nothing

        visited_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=visited_arguments,
            resolver=_lookup_export,
        )
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc