• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18181563280

30 Sep 2025 06:41PM UTC coverage: 86.878%. Remained the same
18181563280

push

github

web-flow
Cfn: Fix backslash processing for dynamic replacement values (#13212)

1 of 1 new or added line in 1 file covered. (100.0%)

16 existing lines in 1 file now uncovered.

67811 of 78053 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.32
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from typing import Any, Final, Generic, TypeVar
1✔
8

9
from botocore.exceptions import ClientError
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api.cloudformation import ResourceStatus
1✔
13
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
14
from localstack.aws.connect import connect_to
1✔
15
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
16
    ChangeSetEntity,
17
    ChangeType,
18
    Maybe,
19
    NodeArray,
20
    NodeCondition,
21
    NodeDependsOn,
22
    NodeDivergence,
23
    NodeIntrinsicFunction,
24
    NodeMapping,
25
    NodeObject,
26
    NodeOutput,
27
    NodeOutputs,
28
    NodeParameter,
29
    NodeParameters,
30
    NodeProperties,
31
    NodeProperty,
32
    NodeResource,
33
    NodeTemplate,
34
    Nothing,
35
    NothingType,
36
    Scope,
37
    TerminalValue,
38
    TerminalValueCreated,
39
    TerminalValueModified,
40
    TerminalValueRemoved,
41
    TerminalValueUnchanged,
42
    is_nothing,
43
)
44
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
45
    ChangeSetModelVisitor,
46
)
47
from localstack.services.cloudformation.engine.v2.resolving import (
1✔
48
    REGEX_DYNAMIC_REF,
49
    extract_dynamic_reference,
50
    perform_dynamic_reference_lookup,
51
)
52
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
53
from localstack.services.cloudformation.stores import (
1✔
54
    exports_map,
55
)
56
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
57
from localstack.services.cloudformation.v2.types import ResolvedResource
1✔
58
from localstack.utils.aws.arns import get_partition
1✔
59
from localstack.utils.objects import get_value_from_path
1✔
60
from localstack.utils.run import to_str
1✔
61
from localstack.utils.strings import to_bytes
1✔
62
from localstack.utils.urls import localstack_host
1✔
63

64
_AWS_URL_SUFFIX = localstack_host().host  # The value in AWS is "amazonaws.com"
1✔
65

66
_PSEUDO_PARAMETERS: Final[set[str]] = {
1✔
67
    "AWS::Partition",
68
    "AWS::AccountId",
69
    "AWS::Region",
70
    "AWS::StackName",
71
    "AWS::StackId",
72
    "AWS::URLSuffix",
73
    "AWS::NoValue",
74
    "AWS::NotificationARNs",
75
}
76

77
TBefore = TypeVar("TBefore")
1✔
78
TAfter = TypeVar("TAfter")
1✔
79
_T = TypeVar("_T")
1✔
80

81
REGEX_OUTPUT_APIGATEWAY = re.compile(
1✔
82
    rf"^(https?://.+\.execute-api\.)(?:[^-]+-){{2,3}}\d\.(amazonaws\.com|{_AWS_URL_SUFFIX})/?(.*)$"
83
)
84
MOCKED_REFERENCE = "unknown"
1✔
85

86
VALID_LOGICAL_RESOURCE_ID_RE = re.compile(r"^[A-Za-z0-9]+$")
1✔
87

88

89
class PreprocEntityDelta(Generic[TBefore, TAfter]):
1✔
90
    before: Maybe[TBefore]
1✔
91
    after: Maybe[TAfter]
1✔
92

93
    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
1✔
94
        self.before = before
1✔
95
        self.after = after
1✔
96

97
    def __eq__(self, other):
1✔
98
        if not isinstance(other, PreprocEntityDelta):
×
99
            return False
×
100
        return self.before == other.before and self.after == other.after
×
101

102

103
class PreprocProperties:
1✔
104
    properties: dict[str, Any]
1✔
105

106
    def __init__(self, properties: dict[str, Any]):
1✔
107
        self.properties = properties
1✔
108

109
    def __eq__(self, other):
1✔
110
        if not isinstance(other, PreprocProperties):
1✔
111
            return False
×
112
        return self.properties == other.properties
1✔
113

114

115
class PreprocResource:
1✔
116
    logical_id: str
1✔
117
    physical_resource_id: str | None
1✔
118
    condition: bool | None
1✔
119
    resource_type: str
1✔
120
    properties: PreprocProperties
1✔
121
    depends_on: list[str] | None
1✔
122
    requires_replacement: bool
1✔
123
    status: ResourceStatus | None
1✔
124

125
    def __init__(
1✔
126
        self,
127
        logical_id: str,
128
        physical_resource_id: str,
129
        condition: bool | None,
130
        resource_type: str,
131
        properties: PreprocProperties,
132
        depends_on: list[str] | None,
133
        requires_replacement: bool,
134
        status: ResourceStatus | None = None,
135
    ):
136
        self.logical_id = logical_id
1✔
137
        self.physical_resource_id = physical_resource_id
1✔
138
        self.condition = condition
1✔
139
        self.resource_type = resource_type
1✔
140
        self.properties = properties
1✔
141
        self.depends_on = depends_on
1✔
142
        self.requires_replacement = requires_replacement
1✔
143
        self.status = status
1✔
144

145
    @staticmethod
1✔
146
    def _compare_conditions(c1: bool, c2: bool):
1✔
147
        # The lack of condition equates to a true condition.
148
        c1 = c1 if isinstance(c1, bool) else True
1✔
149
        c2 = c2 if isinstance(c2, bool) else True
1✔
150
        return c1 == c2
1✔
151

152
    def __eq__(self, other):
1✔
153
        if not isinstance(other, PreprocResource):
1✔
154
            return False
1✔
155
        return all(
1✔
156
            [
157
                self.logical_id == other.logical_id,
158
                self._compare_conditions(self.condition, other.condition),
159
                self.resource_type == other.resource_type,
160
                self.properties == other.properties,
161
            ]
162
        )
163

164

165
class PreprocOutput:
1✔
166
    name: str
1✔
167
    value: Any
1✔
168
    export: Any | None
1✔
169
    condition: bool | None
1✔
170

171
    def __init__(self, name: str, value: Any, export: Any | None, condition: bool | None):
1✔
172
        self.name = name
1✔
173
        self.value = value
1✔
174
        self.export = export
1✔
175
        self.condition = condition
1✔
176

177
    def __eq__(self, other):
1✔
178
        if not isinstance(other, PreprocOutput):
×
179
            return False
×
180
        return all(
×
181
            [
182
                self.name == other.name,
183
                self.value == other.value,
184
                self.export == other.export,
185
                self.condition == other.condition,
186
            ]
187
        )
188

189

190
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
191
    _change_set: Final[ChangeSet]
1✔
192
    _before_resolved_resources: Final[dict]
1✔
193
    _before_cache: Final[dict[Scope, Any]]
1✔
194
    _after_cache: Final[dict[Scope, Any]]
1✔
195

196
    def __init__(self, change_set: ChangeSet):
1✔
197
        self._change_set = change_set
1✔
198
        self._before_resolved_resources = change_set.stack.resolved_resources
1✔
199
        self._before_cache = {}
1✔
200
        self._after_cache = {}
1✔
201

202
    def _setup_runtime_cache(self) -> None:
1✔
203
        runtime_cache_key = self.__class__.__name__
1✔
204

205
        self._before_cache.clear()
1✔
206
        self._after_cache.clear()
1✔
207

208
        before_runtime_cache = self._change_set.update_model.before_runtime_cache
1✔
209
        if cache := before_runtime_cache.get(runtime_cache_key):
1✔
210
            self._before_cache.update(cache)
1✔
211

212
        after_runtime_cache = self._change_set.update_model.after_runtime_cache
1✔
213
        if cache := after_runtime_cache.get(runtime_cache_key):
1✔
214
            self._after_cache.update(cache)
×
215

216
    def _save_runtime_cache(self) -> None:
1✔
217
        runtime_cache_key = self.__class__.__name__
1✔
218

219
        before_runtime_cache = self._change_set.update_model.before_runtime_cache
1✔
220
        before_runtime_cache[runtime_cache_key] = copy.deepcopy(self._before_cache)
1✔
221

222
        after_runtime_cache = self._change_set.update_model.after_runtime_cache
1✔
223
        after_runtime_cache[runtime_cache_key] = copy.deepcopy(self._after_cache)
1✔
224

225
    def process(self) -> None:
1✔
226
        self._setup_runtime_cache()
1✔
227
        node_template = self._change_set.update_model.node_template
1✔
228
        self.visit(node_template)
1✔
229
        self._save_runtime_cache()
1✔
230

231
    def _get_node_resource_for(
1✔
232
        self, resource_name: str, node_template: NodeTemplate
233
    ) -> NodeResource:
234
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
235
        for node_resource in node_template.resources.resources:
1✔
236
            if node_resource.name == resource_name:
1✔
237
                self.visit(node_resource)
1✔
238
                return node_resource
1✔
239
        raise ValidationError(
1✔
240
            f"Template format error: Unresolved resource dependencies [{resource_name}] in the Resources block of the template"
241
        )
242

243
    def _get_node_property_for(
1✔
244
        self, property_name: str, node_resource: NodeResource
245
    ) -> NodeProperty | None:
246
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
247
        for node_property in node_resource.properties.properties:
1✔
248
            if node_property.name == property_name:
1✔
249
                self.visit(node_property)
1✔
250
                return node_property
1✔
251
        return None
1✔
252

253
    def _deployed_property_value_of(
1✔
254
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
255
    ) -> Any:
256
        # We have to override this function to make sure it does not try to access the
257
        # resolved resource
258

259
        # Before we can obtain deployed value for a resource, we need to first ensure to
260
        # process the resource if this wasn't processed already. Ideally, values should only
261
        # be accessible through delta objects, to ensure computation is always complete at
262
        # every level.
263
        _ = self._get_node_resource_for(
1✔
264
            resource_name=resource_logical_id,
265
            node_template=self._change_set.update_model.node_template,
266
        )
267
        resolved_resource = resolved_resources.get(resource_logical_id)
1✔
268
        if resolved_resource is None:
1✔
269
            raise RuntimeError(
1✔
270
                f"No deployed instances of resource '{resource_logical_id}' were found"
271
            )
272
        properties = resolved_resource.get("Properties", {})
1✔
273
        # TODO support structured properties, e.g. NestedStack.Outputs.OutputName
274
        property_value: Any | None = get_value_from_path(properties, property_name)
1✔
275

276
        if property_value:
1✔
277
            if not isinstance(property_value, (str, list)):
1✔
278
                # TODO: is this correct? If there is a bug in the logic here, it's probably
279
                #  better to know about it with a clear error message than to receive some form
280
                #  of message about trying to use a dictionary in place of a string
281
                raise RuntimeError(
×
282
                    f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value nor list"
283
                )
284
            return property_value
1✔
285
        elif config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES:
1✔
286
            return MOCKED_REFERENCE
1✔
287

288
        return property_value
×
289

290
    def _before_deployed_property_value_of(
1✔
291
        self, resource_logical_id: str, property_name: str
292
    ) -> Any:
293
        return self._deployed_property_value_of(
1✔
294
            resource_logical_id=resource_logical_id,
295
            property_name=property_name,
296
            resolved_resources=self._before_resolved_resources,
297
        )
298

299
    def _after_deployed_property_value_of(
1✔
300
        self, resource_logical_id: str, property_name: str
301
    ) -> str | None:
302
        return self._before_deployed_property_value_of(
1✔
303
            resource_logical_id=resource_logical_id, property_name=property_name
304
        )
305

306
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
1✔
307
        mappings: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
1✔
308
        # TODO: another scenarios suggesting property lookups might be preferable.
309
        for mapping in mappings:
1✔
310
            if mapping.name == map_name:
1✔
311
                self.visit(mapping)
1✔
312
                return mapping
1✔
313
        raise RuntimeError(f"Undefined '{map_name}' mapping")
×
314

315
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
1✔
316
        parameters: list[NodeParameter] = (
1✔
317
            self._change_set.update_model.node_template.parameters.parameters
318
        )
319
        # TODO: another scenarios suggesting property lookups might be preferable.
320
        for parameter in parameters:
1✔
321
            if parameter.name == parameter_name:
1✔
322
                self.visit(parameter)
1✔
323
                return parameter
1✔
324
        return Nothing
1✔
325

326
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
1✔
327
        conditions: list[NodeCondition] = (
1✔
328
            self._change_set.update_model.node_template.conditions.conditions
329
        )
330
        # TODO: another scenarios suggesting property lookups might be preferable.
331
        for condition in conditions:
1✔
332
            if condition.name == condition_name:
1✔
333
                self.visit(condition)
1✔
334
                return condition
1✔
335
        return Nothing
×
336

337
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
1✔
338
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
1✔
339
        if isinstance(node_condition, NodeCondition):
1✔
340
            condition_delta = self.visit(node_condition)
1✔
341
            return condition_delta
1✔
342
        raise RuntimeError(f"No condition '{logical_id}' was found.")
×
343

344
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
1✔
345
        match pseudo_parameter_name:
1✔
346
            case "AWS::Partition":
1✔
347
                return get_partition(self._change_set.region_name)
1✔
348
            case "AWS::AccountId":
1✔
349
                return self._change_set.stack.account_id
1✔
350
            case "AWS::Region":
1✔
351
                return self._change_set.stack.region_name
1✔
352
            case "AWS::StackName":
1✔
353
                return self._change_set.stack.stack_name
1✔
354
            case "AWS::StackId":
1✔
355
                return self._change_set.stack.stack_id
1✔
356
            case "AWS::URLSuffix":
1✔
357
                return _AWS_URL_SUFFIX
1✔
358
            case "AWS::NoValue":
×
359
                return None
×
360
            case _:
×
361
                raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
×
362

363
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
1✔
364
        if logical_id in _PSEUDO_PARAMETERS:
1✔
365
            pseudo_parameter_value = self._resolve_pseudo_parameter(
1✔
366
                pseudo_parameter_name=logical_id
367
            )
368
            # Pseudo parameters are constants within the lifecycle of a template.
369
            return PreprocEntityDelta(before=pseudo_parameter_value, after=pseudo_parameter_value)
1✔
370

371
        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
1✔
372
        if isinstance(node_parameter, NodeParameter):
1✔
373
            parameter_delta = self.visit(node_parameter)
1✔
374
            return parameter_delta
1✔
375

376
        node_resource = self._get_node_resource_for(
1✔
377
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
378
        )
379
        resource_delta = self.visit(node_resource)
1✔
380
        before = resource_delta.before
1✔
381
        after = resource_delta.after
1✔
382
        return PreprocEntityDelta(before=before, after=after)
1✔
383

384
    def _resolve_mapping(
1✔
385
        self, map_name: str, top_level_key: str, second_level_key
386
    ) -> PreprocEntityDelta:
387
        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
388
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)
1✔
389
        top_level_value = node_mapping.bindings.bindings.get(top_level_key)
1✔
390
        if not isinstance(top_level_value, NodeObject):
1✔
391
            error_key = "::".join([map_name, top_level_key, second_level_key])
1✔
392
            raise ValidationError(f"Template error: Unable to get mapping for {error_key}")
1✔
393
        second_level_value = top_level_value.bindings.get(second_level_key)
1✔
394
        if not isinstance(second_level_value, (TerminalValue, NodeArray, NodeObject)):
1✔
395
            error_key = "::".join([map_name, top_level_key, second_level_key])
1✔
396
            raise ValidationError(f"Template error: Unable to get mapping for {error_key}")
1✔
397
        mapping_value_delta = self.visit(second_level_value)
1✔
398
        return mapping_value_delta
1✔
399

400
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
1✔
401
        entity_scope = change_set_entity.scope
1✔
402
        if entity_scope in self._before_cache and entity_scope in self._after_cache:
1✔
403
            before = self._before_cache[entity_scope]
1✔
404
            after = self._after_cache[entity_scope]
1✔
405
            return PreprocEntityDelta(before=before, after=after)
1✔
406
        delta = super().visit(change_set_entity=change_set_entity)
1✔
407
        if isinstance(delta, PreprocEntityDelta):
1✔
408
            delta = self._maybe_perform_replacements(delta)
1✔
409
            self._before_cache[entity_scope] = delta.before
1✔
410
            self._after_cache[entity_scope] = delta.after
1✔
411
        return delta
1✔
412

413
    def _maybe_perform_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
414
        delta = self._maybe_perform_static_replacements(delta)
1✔
415
        delta = self._maybe_perform_dynamic_replacements(delta)
1✔
416
        return delta
1✔
417

418
    def _maybe_perform_static_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
419
        return self._maybe_perform_on_delta(delta, self._perform_static_replacements)
1✔
420

421
    def _maybe_perform_dynamic_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
422
        return self._maybe_perform_on_delta(delta, self._perform_dynamic_replacements)
1✔
423

424
    def _maybe_perform_on_delta(
1✔
425
        self, delta: PreprocEntityDelta | None, f: Callable[[_T], _T]
426
    ) -> PreprocEntityDelta | None:
427
        if isinstance(delta.before, str):
1✔
428
            delta.before = f(delta.before)
1✔
429
        if isinstance(delta.after, str):
1✔
430
            delta.after = f(delta.after)
1✔
431
        return delta
1✔
432

433
    def _perform_dynamic_replacements(self, value: _T) -> _T:
1✔
434
        if not isinstance(value, str):
1✔
435
            return value
×
436

437
        if dynamic_ref := extract_dynamic_reference(value):
1✔
438
            new_value = perform_dynamic_reference_lookup(
1✔
439
                reference=dynamic_ref,
440
                account_id=self._change_set.account_id,
441
                region_name=self._change_set.region_name,
442
            )
443
            if new_value:
1✔
444
                # We need to use a function here, to avoid backslash processing by regex.
445
                # From the regex sub documentation:
446
                # repl can be a string or a function; if it is a string, any backslash escapes in it are processed.
447
                # Using a function, we can avoid this processing.
448
                return REGEX_DYNAMIC_REF.sub(lambda _: new_value, value)
1✔
449

450
        return value
1✔
451

452
    @staticmethod
1✔
453
    def _perform_static_replacements(value: str) -> str:
1✔
454
        api_match = REGEX_OUTPUT_APIGATEWAY.match(value)
1✔
455
        if api_match and value not in config.CFN_STRING_REPLACEMENT_DENY_LIST:
1✔
456
            prefix = api_match[1]
1✔
457
            host = api_match[2]
1✔
458
            path = api_match[3]
1✔
459
            port = localstack_host().port
1✔
460
            value = f"{prefix}{host}:{port}/{path}"
1✔
461
            return value
1✔
462

463
        return value
1✔
464

465
    def _cached_apply(
1✔
466
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
467
    ) -> PreprocEntityDelta:
468
        """
469
        Applies the resolver function to the given input delta if and only if the required
470
        values are not already present in the runtime caches. This function handles both
471
        the 'before' and 'after' components of the delta independently.
472

473
        The resolver function receives either the 'before' or 'after' value from the input
474
        delta and returns a resolved value. If the result returned by the resolver is
475
        itself a PreprocEntityDelta, the function automatically extracts the appropriate
476
        component from it:  the 'before' value if the input was 'before', and the 'after'
477
        value if the input was 'after'.
478

479
        This function only reads from the cache and does not update it. It is the caller's
480
        responsibility to handle caching, either manually or via the upstream visit method
481
        of this class.
482

483
        Args:
484
            scope (Scope): The current scope used as a key for cache lookup.
485
            arguments_delta (PreprocEntityDelta): The delta containing 'before' and 'after' values to resolve.
486
            resolver (Callable[[Any], Any]): Function to apply on uncached 'before' or 'after' argument values.
487

488
        Returns:
489
            PreprocEntityDelta: A new delta with resolved 'before' and 'after' values.
490
        """
491

492
        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
493
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
494
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.
495

496
        arguments_before = arguments_delta.before
1✔
497
        arguments_after = arguments_delta.after
1✔
498

499
        before = self._before_cache.get(scope, Nothing)
1✔
500
        if is_nothing(before) and not is_nothing(arguments_before):
1✔
501
            before = resolver(arguments_before)
1✔
502
            if isinstance(before, PreprocEntityDelta):
1✔
503
                before = before.before
1✔
504

505
        after = self._after_cache.get(scope, Nothing)
1✔
506
        if is_nothing(after) and not is_nothing(arguments_after):
1✔
507
            after = resolver(arguments_after)
1✔
508
            if isinstance(after, PreprocEntityDelta):
1✔
509
                after = after.after
1✔
510

511
        return PreprocEntityDelta(before=before, after=after)
1✔
512

513
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
1✔
514
        return self.visit(node_property.value)
1✔
515

516
    def visit_terminal_value_modified(
1✔
517
        self, terminal_value_modified: TerminalValueModified
518
    ) -> PreprocEntityDelta:
519
        return PreprocEntityDelta(
1✔
520
            before=terminal_value_modified.value,
521
            after=terminal_value_modified.modified_value,
522
        )
523

524
    def visit_terminal_value_created(
1✔
525
        self, terminal_value_created: TerminalValueCreated
526
    ) -> PreprocEntityDelta:
527
        return PreprocEntityDelta(after=terminal_value_created.value)
1✔
528

529
    def visit_terminal_value_removed(
1✔
530
        self, terminal_value_removed: TerminalValueRemoved
531
    ) -> PreprocEntityDelta:
532
        return PreprocEntityDelta(before=terminal_value_removed.value)
1✔
533

534
    def visit_terminal_value_unchanged(
1✔
535
        self, terminal_value_unchanged: TerminalValueUnchanged
536
    ) -> PreprocEntityDelta:
537
        return PreprocEntityDelta(
1✔
538
            before=terminal_value_unchanged.value,
539
            after=terminal_value_unchanged.value,
540
        )
541

542
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
1✔
543
        before_delta = self.visit(node_divergence.value)
1✔
544
        after_delta = self.visit(node_divergence.divergence)
1✔
545
        return PreprocEntityDelta(before=before_delta.before, after=after_delta.after)
1✔
546

547
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
1✔
548
        node_change_type = node_object.change_type
1✔
549
        before = {} if node_change_type != ChangeType.CREATED else Nothing
1✔
550
        after = {} if node_change_type != ChangeType.REMOVED else Nothing
1✔
551
        for name, change_set_entity in node_object.bindings.items():
1✔
552
            delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
1✔
553
            delta_before = delta.before
1✔
554
            delta_after = delta.after
1✔
555
            if not is_nothing(before) and not is_nothing(delta_before):
1✔
556
                before[name] = delta_before
1✔
557
            if not is_nothing(after) and not is_nothing(delta_after):
1✔
558
                after[name] = delta_after
1✔
559
        return PreprocEntityDelta(before=before, after=after)
1✔
560

561
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> str:
1✔
562
        # TODO: add arguments validation.
563
        arguments_list: list[str]
564
        if isinstance(arguments, str):
1✔
565
            arguments_list = arguments.split(".")
1✔
566
        else:
567
            arguments_list = arguments
1✔
568
        logical_name_of_resource = arguments_list[0]
1✔
569
        attribute_name = arguments_list[1]
1✔
570

571
        node_resource = self._get_node_resource_for(
1✔
572
            resource_name=logical_name_of_resource,
573
            node_template=self._change_set.update_model.node_template,
574
        )
575
        node_property: NodeProperty | None = self._get_node_property_for(
1✔
576
            property_name=attribute_name, node_resource=node_resource
577
        )
578
        if node_property is not None:
1✔
579
            # The property is statically defined in the template and its value can be computed.
580
            property_delta = self.visit(node_property)
1✔
581
            value = property_delta.before if select_before else property_delta.after
1✔
582
        else:
583
            # The property is not statically defined and must therefore be available in
584
            # the properties deployed set.
585
            if select_before:
1✔
586
                value = self._before_deployed_property_value_of(
1✔
587
                    resource_logical_id=logical_name_of_resource,
588
                    property_name=attribute_name,
589
                )
590
            else:
591
                value = self._after_deployed_property_value_of(
1✔
592
                    resource_logical_id=logical_name_of_resource,
593
                    property_name=attribute_name,
594
                )
595
        return value
1✔
596

597
    def visit_node_intrinsic_function_fn_get_att(
1✔
598
        self, node_intrinsic_function: NodeIntrinsicFunction
599
    ) -> PreprocEntityDelta:
600
        # TODO: validate the return value according to the spec.
601
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
602
        before_arguments: Maybe[str | list[str]] = arguments_delta.before
1✔
603
        after_arguments: Maybe[str | list[str]] = arguments_delta.after
1✔
604

605
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
1✔
606
        if is_nothing(before) and not is_nothing(before_arguments):
1✔
607
            before = self._resolve_attribute(arguments=before_arguments, select_before=True)
1✔
608

609
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
1✔
610
        if is_nothing(after) and not is_nothing(after_arguments):
1✔
611
            after = self._resolve_attribute(arguments=after_arguments, select_before=False)
1✔
612

613
        return PreprocEntityDelta(before=before, after=after)
1✔
614

615
    def visit_node_intrinsic_function_fn_equals(
1✔
616
        self, node_intrinsic_function: NodeIntrinsicFunction
617
    ) -> PreprocEntityDelta:
618
        # TODO: add argument shape validation.
619
        def _compute_fn_equals(args: list[Any]) -> bool:
1✔
620
            return args[0] == args[1]
1✔
621

622
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
623
        delta = self._cached_apply(
1✔
624
            scope=node_intrinsic_function.scope,
625
            arguments_delta=arguments_delta,
626
            resolver=_compute_fn_equals,
627
        )
628
        return delta
1✔
629

630
    def visit_node_intrinsic_function_fn_if(
1✔
631
        self, node_intrinsic_function: NodeIntrinsicFunction
632
    ) -> PreprocEntityDelta:
633
        # `if` needs to be short-circuiting i.e. if the condition is True we don't evaluate the
634
        # False branch. If the condition is False, we don't evaluate the True branch.
635
        if len(node_intrinsic_function.arguments.array) != 3:
1✔
UNCOV
636
            raise ValueError(
×
637
                f"Incorrectly constructed Fn::If usage, expected 3 arguments, found {len(node_intrinsic_function.arguments.array)}"
638
            )
639

640
        condition_delta = self.visit(node_intrinsic_function.arguments.array[0])
1✔
641
        if_delta = PreprocEntityDelta()
1✔
642
        if not is_nothing(condition_delta.before):
1✔
643
            node_condition = self._get_node_condition_if_exists(
1✔
644
                condition_name=condition_delta.before
645
            )
646
            condition_value = self.visit(node_condition).before
1✔
647
            if condition_value:
1✔
648
                arg_delta = self.visit(node_intrinsic_function.arguments.array[1])
1✔
649
            else:
650
                arg_delta = self.visit(node_intrinsic_function.arguments.array[2])
1✔
651
            if_delta.before = arg_delta.before
1✔
652

653
        if not is_nothing(condition_delta.after):
1✔
654
            node_condition = self._get_node_condition_if_exists(
1✔
655
                condition_name=condition_delta.after
656
            )
657
            condition_value = self.visit(node_condition).after
1✔
658
            if condition_value:
1✔
659
                arg_delta = self.visit(node_intrinsic_function.arguments.array[1])
1✔
660
            else:
661
                arg_delta = self.visit(node_intrinsic_function.arguments.array[2])
1✔
662
            if_delta.after = arg_delta.after
1✔
663

664
        return if_delta
1✔
665

666
    def visit_node_intrinsic_function_fn_and(
1✔
667
        self, node_intrinsic_function: NodeIntrinsicFunction
668
    ) -> PreprocEntityDelta:
669
        def _compute_fn_and(args: list[bool]) -> bool:
1✔
670
            result = all(args)
1✔
671
            return result
1✔
672

673
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
674
        delta = self._cached_apply(
1✔
675
            scope=node_intrinsic_function.scope,
676
            arguments_delta=arguments_delta,
677
            resolver=_compute_fn_and,
678
        )
679
        return delta
1✔
680

681
    def visit_node_intrinsic_function_fn_or(
1✔
682
        self, node_intrinsic_function: NodeIntrinsicFunction
683
    ) -> PreprocEntityDelta:
684
        def _compute_fn_or(args: list[bool]):
1✔
685
            result = any(args)
1✔
686
            return result
1✔
687

688
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
689
        delta = self._cached_apply(
1✔
690
            scope=node_intrinsic_function.scope,
691
            arguments_delta=arguments_delta,
692
            resolver=_compute_fn_or,
693
        )
694
        return delta
1✔
695

696
    def visit_node_intrinsic_function_fn_not(
1✔
697
        self, node_intrinsic_function: NodeIntrinsicFunction
698
    ) -> PreprocEntityDelta:
699
        def _compute_fn_not(arg: list[bool] | bool) -> bool:
1✔
700
            # Is the argument ever a lone boolean?
701
            if isinstance(arg, list):
1✔
702
                return not arg[0]
1✔
703
            else:
UNCOV
704
                return not arg
×
705

706
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
707
        delta = self._cached_apply(
1✔
708
            scope=node_intrinsic_function.scope,
709
            arguments_delta=arguments_delta,
710
            resolver=_compute_fn_not,
711
        )
712
        return delta
1✔
713

714
    def visit_node_intrinsic_function_fn_sub(
1✔
715
        self, node_intrinsic_function: NodeIntrinsicFunction
716
    ) -> PreprocEntityDelta:
717
        def _compute_sub(args: str | list[Any], select_before: bool) -> str:
1✔
718
            # TODO: add further schema validation.
719
            string_template: str
720
            sub_parameters: dict
721
            if isinstance(args, str):
1✔
722
                string_template = args
1✔
723
                sub_parameters = {}
1✔
724
            elif (
1✔
725
                isinstance(args, list)
726
                and len(args) == 2
727
                and isinstance(args[0], str)
728
                and isinstance(args[1], dict)
729
            ):
730
                string_template = args[0]
1✔
731
                sub_parameters = args[1]
1✔
732
            else:
UNCOV
733
                raise RuntimeError(
×
734
                    "Invalid arguments shape for Fn::Sub, expected a String "
735
                    f"or a Tuple of String and Map but got '{args}'"
736
                )
737
            sub_string = string_template
1✔
738
            template_variable_names = re.findall("\\${([^}]+)}", string_template)
1✔
739
            for template_variable_name in template_variable_names:
1✔
740
                template_variable_value = Nothing
1✔
741

742
                # Try to resolve the variable name as pseudo parameter.
743
                if template_variable_name in _PSEUDO_PARAMETERS:
1✔
744
                    template_variable_value = self._resolve_pseudo_parameter(
1✔
745
                        pseudo_parameter_name=template_variable_name
746
                    )
747

748
                # Try to resolve the variable name as an entry to the defined parameters.
749
                elif template_variable_name in sub_parameters:
1✔
750
                    template_variable_value = sub_parameters[template_variable_name]
1✔
751

752
                # Try to resolve the variable name as GetAtt.
753
                elif "." in template_variable_name:
1✔
754
                    try:
1✔
755
                        template_variable_value = self._resolve_attribute(
1✔
756
                            arguments=template_variable_name, select_before=select_before
757
                        )
758
                    except RuntimeError:
1✔
759
                        pass
1✔
760

761
                # Try to resolve the variable name as Ref.
762
                else:
763
                    try:
1✔
764
                        resource_delta = self._resolve_reference(logical_id=template_variable_name)
1✔
765
                        template_variable_value = (
1✔
766
                            resource_delta.before if select_before else resource_delta.after
767
                        )
768
                        if isinstance(template_variable_value, PreprocResource):
1✔
769
                            template_variable_value = template_variable_value.physical_resource_id
1✔
UNCOV
770
                    except RuntimeError:
×
UNCOV
771
                        pass
×
772

773
                if is_nothing(template_variable_value):
1✔
774
                    raise RuntimeError(
1✔
775
                        f"Undefined variable name in Fn::Sub string template '{template_variable_name}'"
776
                    )
777

778
                if not isinstance(template_variable_value, str):
1✔
779
                    template_variable_value = str(template_variable_value)
1✔
780

781
                sub_string = sub_string.replace(
1✔
782
                    f"${{{template_variable_name}}}", template_variable_value
783
                )
784

785
            # FIXME: the following type reduction is ported from v1; however it appears as though such
786
            #        reduction is not performed by the engine, and certainly not at this depth given the
787
            #        lack of context. This section should be removed with Fn::Sub always retuning a string
788
            #        and the resource providers reviewed.
789
            account_id = self._change_set.account_id
1✔
790
            is_another_account_id = sub_string.isdigit() and len(sub_string) == len(account_id)
1✔
791
            if sub_string == account_id or is_another_account_id:
1✔
792
                result = sub_string
1✔
793
            elif sub_string.isdigit():
1✔
794
                result = int(sub_string)
1✔
795
            else:
796
                try:
1✔
797
                    result = float(sub_string)
1✔
798
                except ValueError:
1✔
799
                    result = sub_string
1✔
800
            return result
1✔
801

802
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
803
        arguments_before = arguments_delta.before
1✔
804
        arguments_after = arguments_delta.after
1✔
805
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
1✔
806
        if is_nothing(before) and not is_nothing(arguments_before):
1✔
807
            before = _compute_sub(args=arguments_before, select_before=True)
1✔
808
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
1✔
809
        if is_nothing(after) and not is_nothing(arguments_after):
1✔
810
            after = _compute_sub(args=arguments_after, select_before=False)
1✔
811
        return PreprocEntityDelta(before=before, after=after)
1✔
812

813
    def visit_node_intrinsic_function_fn_join(
1✔
814
        self, node_intrinsic_function: NodeIntrinsicFunction
815
    ) -> PreprocEntityDelta:
816
        # TODO: add support for schema validation.
817
        # TODO: add tests for joining non string values.
818
        def _compute_fn_join(args: list[Any]) -> str | NothingType:
1✔
819
            if not (isinstance(args, list) and len(args) == 2):
1✔
820
                return Nothing
1✔
821
            delimiter: str = str(args[0])
1✔
822
            values: list[Any] = args[1]
1✔
823
            if not isinstance(values, list):
1✔
824
                # shortcut if values is the empty string, for example:
825
                # {"Fn::Join": ["", {"Ref": <parameter>}]}
826
                # CDK bootstrap does this
827
                if values == "":
1✔
828
                    return ""
1✔
829
                raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")
1✔
830
            str_values: list[str] = []
1✔
831
            for value in values:
1✔
832
                if value is None:
1✔
833
                    continue
1✔
834
                str_value = str(value)
1✔
835
                str_values.append(str_value)
1✔
836
            join_result = delimiter.join(str_values)
1✔
837
            return join_result
1✔
838

839
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
840
        delta = self._cached_apply(
1✔
841
            scope=node_intrinsic_function.scope,
842
            arguments_delta=arguments_delta,
843
            resolver=_compute_fn_join,
844
        )
845
        return delta
1✔
846

847
    def visit_node_intrinsic_function_fn_select(
1✔
848
        self, node_intrinsic_function: NodeIntrinsicFunction
849
    ):
850
        # TODO: add further support for schema validation
851
        def _compute_fn_select(args: list[Any]) -> Any:
1✔
852
            values = args[1]
1✔
853
            # defer evaluation if the selection list contains unresolved elements (e.g., unresolved intrinsics)
854
            if isinstance(values, list) and not all(isinstance(value, str) for value in values):
1✔
855
                raise RuntimeError("Fn::Select list contains unresolved elements")
1✔
856

857
            if not isinstance(values, list) or not values:
1✔
858
                raise ValidationError(
1✔
859
                    "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
860
                )
861
            try:
1✔
862
                index: int = int(args[0])
1✔
863
            except ValueError as e:
1✔
864
                raise ValidationError(
1✔
865
                    "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
866
                ) from e
867

868
            values_len = len(values)
1✔
869
            if index < 0 or index >= values_len:
1✔
870
                raise ValidationError(
1✔
871
                    "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
872
                )
873
            selection = values[index]
1✔
874
            return selection
1✔
875

876
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
877
        delta = self._cached_apply(
1✔
878
            scope=node_intrinsic_function.scope,
879
            arguments_delta=arguments_delta,
880
            resolver=_compute_fn_select,
881
        )
882
        return delta
1✔
883

884
    def visit_node_intrinsic_function_fn_split(
1✔
885
        self, node_intrinsic_function: NodeIntrinsicFunction
886
    ):
887
        # TODO: add further support for schema validation
888
        def _compute_fn_split(args: list[Any]) -> Any:
1✔
889
            delimiter = args[0]
1✔
890
            if not isinstance(delimiter, str) or not delimiter:
1✔
UNCOV
891
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{delimiter}'")
×
892
            source_string = args[1]
1✔
893
            if not isinstance(source_string, str):
1✔
894
                raise RuntimeError(f"Invalid source string value for Fn::Split: '{source_string}'")
1✔
895
            split_string = source_string.split(delimiter)
1✔
896
            return split_string
1✔
897

898
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
899
        delta = self._cached_apply(
1✔
900
            scope=node_intrinsic_function.scope,
901
            arguments_delta=arguments_delta,
902
            resolver=_compute_fn_split,
903
        )
904
        return delta
1✔
905

906
    def visit_node_intrinsic_function_fn_get_a_zs(
1✔
907
        self, node_intrinsic_function: NodeIntrinsicFunction
908
    ) -> PreprocEntityDelta:
909
        # TODO: add further support for schema validation
910

911
        def _compute_fn_get_a_zs(region) -> Any:
1✔
912
            if not isinstance(region, str):
1✔
UNCOV
913
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")
×
914

915
            if not region:
1✔
916
                region = self._change_set.region_name
1✔
917

918
            account_id = self._change_set.account_id
1✔
919
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
1✔
920
            try:
1✔
921
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
1✔
922
                    ec2_client.describe_availability_zones()
923
                )
UNCOV
924
            except ClientError:
×
UNCOV
925
                raise RuntimeError(
×
926
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
927
                )
928
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
1✔
929
                "AvailabilityZones"
930
            ]
931
            azs = [az["ZoneName"] for az in availability_zones]
1✔
932
            return azs
1✔
933

934
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
935
        delta = self._cached_apply(
1✔
936
            scope=node_intrinsic_function.scope,
937
            arguments_delta=arguments_delta,
938
            resolver=_compute_fn_get_a_zs,
939
        )
940
        return delta
1✔
941

942
    def visit_node_intrinsic_function_fn_base64(
1✔
943
        self, node_intrinsic_function: NodeIntrinsicFunction
944
    ) -> PreprocEntityDelta:
945
        # TODO: add further support for schema validation
946
        def _compute_fn_base_64(string) -> Any:
1✔
947
            if not isinstance(string, str):
1✔
UNCOV
948
                raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{string}'")
×
949
            # Ported from v1:
950
            base64_string = to_str(base64.b64encode(to_bytes(string)))
1✔
951
            return base64_string
1✔
952

953
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
954
        delta = self._cached_apply(
1✔
955
            scope=node_intrinsic_function.scope,
956
            arguments_delta=arguments_delta,
957
            resolver=_compute_fn_base_64,
958
        )
959
        return delta
1✔
960

961
    def visit_node_intrinsic_function_fn_find_in_map(
1✔
962
        self, node_intrinsic_function: NodeIntrinsicFunction
963
    ) -> PreprocEntityDelta:
964
        # TODO: add type checking/validation for result unit?
965
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
966
        before_arguments = arguments_delta.before
1✔
967
        after_arguments = arguments_delta.after
1✔
968
        before = Nothing
1✔
969
        if before_arguments:
1✔
970
            before_value_delta = self._resolve_mapping(*before_arguments)
1✔
971
            before = before_value_delta.before
1✔
972
        after = Nothing
1✔
973
        if after_arguments:
1✔
974
            after_value_delta = self._resolve_mapping(*after_arguments)
1✔
975
            after = after_value_delta.after
1✔
976
        return PreprocEntityDelta(before=before, after=after)
1✔
977

978
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
1✔
979
        bindings_delta = self.visit(node_mapping.bindings)
1✔
980
        return bindings_delta
1✔
981

982
    def visit_node_parameters(
1✔
983
        self, node_parameters: NodeParameters
984
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
985
        before_parameters = {}
1✔
986
        after_parameters = {}
1✔
987
        for parameter in node_parameters.parameters:
1✔
988
            parameter_delta = self.visit(parameter)
1✔
989
            parameter_before = parameter_delta.before
1✔
990
            if not is_nothing(parameter_before):
1✔
991
                before_parameters[parameter.name] = parameter_before
1✔
992
            parameter_after = parameter_delta.after
1✔
993
            if not is_nothing(parameter_after):
1✔
994
                after_parameters[parameter.name] = parameter_after
1✔
995
        return PreprocEntityDelta(before=before_parameters, after=after_parameters)
1✔
996

997
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
1✔
998
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_parameter.name):
1✔
999
            raise ValidationError(
1✔
1000
                f"Template format error: Parameter name {node_parameter.name} is non alphanumeric."
1001
            )
1002
        dynamic_value = node_parameter.dynamic_value
1✔
1003
        dynamic_delta = self.visit(dynamic_value)
1✔
1004

1005
        default_value = node_parameter.default_value
1✔
1006
        default_delta = self.visit(default_value)
1✔
1007

1008
        before = dynamic_delta.before or default_delta.before
1✔
1009
        after = dynamic_delta.after or default_delta.after
1✔
1010

1011
        parameter_type = self.visit(node_parameter.type_)
1✔
1012

1013
        def _resolve_parameter_type(value: str, type_: str) -> Any:
1✔
1014
            match type_:
1✔
1015
                case "List<String>" | "CommaDelimitedList":
1✔
1016
                    return [item.strip() for item in value.split(",")]
1✔
1017
            return value
1✔
1018

1019
        if not is_nothing(after):
1✔
1020
            after = _resolve_parameter_type(after, parameter_type.after)
1✔
1021

1022
        return PreprocEntityDelta(before=before, after=after)
1✔
1023

1024
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
1✔
1025
        array_identifiers_delta = self.visit(node_depends_on.depends_on)
1✔
1026
        return array_identifiers_delta
1✔
1027

1028
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
1✔
1029
        delta = self.visit(node_condition.body)
1✔
1030
        return delta
1✔
1031

1032
    def _resource_physical_resource_id_from(
1✔
1033
        self, logical_resource_id: str, resolved_resources: dict[str, ResolvedResource]
1034
    ) -> str | None:
1035
        # TODO: typing around resolved resources is needed and should be reflected here.
1036
        resolved_resource = resolved_resources.get(logical_resource_id, {})
1✔
1037
        if resolved_resource.get("ResourceStatus") not in {
1✔
1038
            ResourceStatus.CREATE_COMPLETE,
1039
            ResourceStatus.UPDATE_COMPLETE,
1040
        }:
1041
            return None
1✔
1042

1043
        physical_resource_id = resolved_resource.get("PhysicalResourceId")
1✔
1044
        if not isinstance(physical_resource_id, str):
1✔
UNCOV
1045
            raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
×
1046
        return physical_resource_id
1✔
1047

1048
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
1049
        # TODO: typing around resolved resources is needed and should be reflected here.
1050
        return self._resource_physical_resource_id_from(
1✔
1051
            logical_resource_id=resource_logical_id,
1052
            resolved_resources=self._before_resolved_resources,
1053
        )
1054

1055
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
1056
        return self._before_resource_physical_id(resource_logical_id=resource_logical_id)
1✔
1057

1058
    def visit_node_intrinsic_function_ref(
1✔
1059
        self, node_intrinsic_function: NodeIntrinsicFunction
1060
    ) -> PreprocEntityDelta:
1061
        def _compute_fn_ref(logical_id: str) -> PreprocEntityDelta:
1✔
1062
            if logical_id == "AWS::NoValue":
1✔
1063
                return Nothing
1✔
1064

1065
            reference_delta: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
1✔
1066
            if isinstance(before := reference_delta.before, PreprocResource):
1✔
1067
                reference_delta.before = before.physical_resource_id
1✔
1068
            if isinstance(after := reference_delta.after, PreprocResource):
1✔
1069
                reference_delta.after = after.physical_resource_id
1✔
1070
            return reference_delta
1✔
1071

1072
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1073
        delta = self._cached_apply(
1✔
1074
            scope=node_intrinsic_function.scope,
1075
            arguments_delta=arguments_delta,
1076
            resolver=_compute_fn_ref,
1077
        )
1078
        return delta
1✔
1079

1080
    def visit_node_intrinsic_function_condition(
1✔
1081
        self, node_intrinsic_function: NodeIntrinsicFunction
1082
    ) -> PreprocEntityDelta:
1083
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1084

1085
        def _delta_of_condition(name: str) -> PreprocEntityDelta:
1✔
1086
            node_condition = self._get_node_condition_if_exists(condition_name=name)
1✔
1087
            if is_nothing(node_condition):
1✔
UNCOV
1088
                raise RuntimeError(f"Undefined condition '{name}'")
×
1089
            condition_delta = self.visit(node_condition)
1✔
1090
            return condition_delta
1✔
1091

1092
        delta = self._cached_apply(
1✔
1093
            resolver=_delta_of_condition,
1094
            scope=node_intrinsic_function.scope,
1095
            arguments_delta=arguments_delta,
1096
        )
1097
        return delta
1✔
1098

1099
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
1✔
1100
        node_change_type = node_array.change_type
1✔
1101
        before = [] if node_change_type != ChangeType.CREATED else Nothing
1✔
1102
        after = [] if node_change_type != ChangeType.REMOVED else Nothing
1✔
1103
        for change_set_entity in node_array.array:
1✔
1104
            delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
1✔
1105
            delta_before = delta.before
1✔
1106
            delta_after = delta.after
1✔
1107
            if not is_nothing(before) and not is_nothing(delta_before):
1✔
1108
                before.append(delta_before)
1✔
1109
            if not is_nothing(after) and not is_nothing(delta_after):
1✔
1110
                after.append(delta_after)
1✔
1111
        return PreprocEntityDelta(before=before, after=after)
1✔
1112

1113
    def visit_node_properties(
1✔
1114
        self, node_properties: NodeProperties
1115
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
1116
        node_change_type = node_properties.change_type
1✔
1117
        before_bindings = {} if node_change_type != ChangeType.CREATED else Nothing
1✔
1118
        after_bindings = {} if node_change_type != ChangeType.REMOVED else Nothing
1✔
1119
        for node_property in node_properties.properties:
1✔
1120
            property_name = node_property.name
1✔
1121
            delta = self.visit(node_property)
1✔
1122
            delta_before = delta.before
1✔
1123
            delta_after = delta.after
1✔
1124
            if (
1✔
1125
                not is_nothing(before_bindings)
1126
                and not is_nothing(delta_before)
1127
                and delta_before is not None
1128
            ):
1129
                before_bindings[property_name] = delta_before
1✔
1130
            if (
1✔
1131
                not is_nothing(after_bindings)
1132
                and not is_nothing(delta_after)
1133
                and delta_after is not None
1134
            ):
1135
                after_bindings[property_name] = delta_after
1✔
1136
        before = Nothing
1✔
1137
        if not is_nothing(before_bindings):
1✔
1138
            before = PreprocProperties(properties=before_bindings)
1✔
1139
        after = Nothing
1✔
1140
        if not is_nothing(after_bindings):
1✔
1141
            after = PreprocProperties(properties=after_bindings)
1✔
1142
        return PreprocEntityDelta(before=before, after=after)
1✔
1143

1144
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
1✔
1145
        reference_delta = self.visit(reference)
1✔
1146
        before_reference = reference_delta.before
1✔
1147
        before = Nothing
1✔
1148
        if isinstance(before_reference, str):
1✔
1149
            before_delta = self._resolve_condition(logical_id=before_reference)
1✔
1150
            before = before_delta.before
1✔
1151
        after = Nothing
1✔
1152
        after_reference = reference_delta.after
1✔
1153
        if isinstance(after_reference, str):
1✔
1154
            after_delta = self._resolve_condition(logical_id=after_reference)
1✔
1155
            after = after_delta.after
1✔
1156
        return PreprocEntityDelta(before=before, after=after)
1✔
1157

1158
    def visit_node_resource(
1✔
1159
        self, node_resource: NodeResource
1160
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
1161
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_resource.name):
1✔
1162
            raise ValidationError(
1✔
1163
                f"Template format error: Resource name {node_resource.name} is non alphanumeric."
1164
            )
1165
        change_type = node_resource.change_type
1✔
1166
        condition_before = Nothing
1✔
1167
        condition_after = Nothing
1✔
1168
        if not is_nothing(node_resource.condition_reference):
1✔
1169
            condition_delta = self._resolve_resource_condition_reference(
1✔
1170
                node_resource.condition_reference
1171
            )
1172
            condition_before = condition_delta.before
1✔
1173
            condition_after = condition_delta.after
1✔
1174

1175
        depends_on_before = Nothing
1✔
1176
        depends_on_after = Nothing
1✔
1177
        if not is_nothing(node_resource.depends_on):
1✔
1178
            depends_on_delta = self.visit(node_resource.depends_on)
1✔
1179
            depends_on_before = depends_on_delta.before
1✔
1180
            depends_on_after = depends_on_delta.after
1✔
1181

1182
        type_delta = self.visit(node_resource.type_)
1✔
1183
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
1✔
1184
            node_resource.properties
1185
        )
1186

1187
        before = Nothing
1✔
1188
        after = Nothing
1✔
1189
        if change_type != ChangeType.CREATED and is_nothing(condition_before) or condition_before:
1✔
1190
            logical_resource_id = node_resource.name
1✔
1191
            before_physical_resource_id = self._before_resource_physical_id(
1✔
1192
                resource_logical_id=logical_resource_id
1193
            )
1194
            before = PreprocResource(
1✔
1195
                logical_id=logical_resource_id,
1196
                physical_resource_id=before_physical_resource_id,
1197
                condition=condition_before,
1198
                resource_type=type_delta.before,
1199
                properties=properties_delta.before,
1200
                depends_on=depends_on_before,
1201
                requires_replacement=False,
1202
            )
1203
        if change_type != ChangeType.REMOVED and is_nothing(condition_after) or condition_after:
1✔
1204
            logical_resource_id = node_resource.name
1✔
1205
            try:
1✔
1206
                after_physical_resource_id = self._after_resource_physical_id(
1✔
1207
                    resource_logical_id=logical_resource_id
1208
                )
UNCOV
1209
            except RuntimeError:
×
UNCOV
1210
                after_physical_resource_id = None
×
1211
            after = PreprocResource(
1✔
1212
                logical_id=logical_resource_id,
1213
                physical_resource_id=after_physical_resource_id,
1214
                condition=condition_after,
1215
                resource_type=type_delta.after,
1216
                properties=properties_delta.after,
1217
                depends_on=depends_on_after,
1218
                requires_replacement=node_resource.requires_replacement,
1219
            )
1220
        return PreprocEntityDelta(before=before, after=after)
1✔
1221

1222
    def visit_node_output(
1✔
1223
        self, node_output: NodeOutput
1224
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
1225
        change_type = node_output.change_type
1✔
1226
        value_delta = self.visit(node_output.value)
1✔
1227

1228
        condition_delta = Nothing
1✔
1229
        if not is_nothing(node_output.condition_reference):
1✔
1230
            condition_delta = self._resolve_resource_condition_reference(
1✔
1231
                node_output.condition_reference
1232
            )
1233
            condition_before = condition_delta.before
1✔
1234
            condition_after = condition_delta.after
1✔
1235
            if not condition_before and condition_after:
1✔
1236
                change_type = ChangeType.CREATED
1✔
1237
            elif condition_before and not condition_after:
1✔
1238
                change_type = ChangeType.REMOVED
1✔
1239

1240
        export_delta = Nothing
1✔
1241
        if not is_nothing(node_output.export):
1✔
1242
            export_delta = self.visit(node_output.export)
1✔
1243

1244
        before: Maybe[PreprocOutput] = Nothing
1✔
1245
        if change_type != ChangeType.CREATED:
1✔
1246
            before = PreprocOutput(
1✔
1247
                name=node_output.name,
1248
                value=value_delta.before,
1249
                export=export_delta.before if export_delta else None,
1250
                condition=condition_delta.before if condition_delta else None,
1251
            )
1252
        after: Maybe[PreprocOutput] = Nothing
1✔
1253
        if change_type != ChangeType.REMOVED:
1✔
1254
            after = PreprocOutput(
1✔
1255
                name=node_output.name,
1256
                value=value_delta.after,
1257
                export=export_delta.after if export_delta else None,
1258
                condition=condition_delta.after if condition_delta else None,
1259
            )
1260
        return PreprocEntityDelta(before=before, after=after)
1✔
1261

1262
    def visit_node_outputs(
1✔
1263
        self, node_outputs: NodeOutputs
1264
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
1265
        before: list[PreprocOutput] = []
1✔
1266
        after: list[PreprocOutput] = []
1✔
1267
        for node_output in node_outputs.outputs:
1✔
1268
            output_delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
1✔
1269
            output_before = output_delta.before
1✔
1270
            output_after = output_delta.after
1✔
1271
            if not is_nothing(output_before):
1✔
1272
                before.append(output_before)
1✔
1273
            if not is_nothing(output_after):
1✔
1274
                after.append(output_after)
1✔
1275
        return PreprocEntityDelta(before=before, after=after)
1✔
1276

1277
    def visit_node_intrinsic_function_fn_import_value(
1✔
1278
        self, node_intrinsic_function: NodeIntrinsicFunction
1279
    ) -> PreprocEntityDelta:
1280
        def _compute_fn_import_value(string) -> str:
1✔
1281
            if not isinstance(string, str):
1✔
UNCOV
1282
                raise RuntimeError(f"Invalid parameter for import: '{string}'")
×
1283

1284
            exports = exports_map(
1✔
1285
                account_id=self._change_set.account_id, region_name=self._change_set.region_name
1286
            )
1287

1288
            return exports.get(string, {}).get("Value") or Nothing
1✔
1289

1290
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1291
        delta = self._cached_apply(
1✔
1292
            scope=node_intrinsic_function.scope,
1293
            arguments_delta=arguments_delta,
1294
            resolver=_compute_fn_import_value,
1295
        )
1296
        return delta
1✔
1297

1298
    def visit_node_intrinsic_function_fn_transform(
1✔
1299
        self, node_intrinsic_function: NodeIntrinsicFunction
1300
    ):
UNCOV
1301
        raise RuntimeError("Fn::Transform should have been handled by the Transformer")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc