• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18211137013

02 Oct 2025 04:55PM UTC coverage: 86.877% (-0.001%) from 86.878%
18211137013

push

github

web-flow
CFN: add validation in GetAtt for conditionally canceled resources (#13213)

5 of 7 new or added lines in 1 file covered. (71.43%)

1 existing line in 1 file now uncovered.

67816 of 78060 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.09
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from typing import Any, Final, Generic, TypeVar
1✔
8

9
from botocore.exceptions import ClientError
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api.cloudformation import ResourceStatus
1✔
13
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
14
from localstack.aws.connect import connect_to
1✔
15
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
16
    ChangeSetEntity,
17
    ChangeType,
18
    Maybe,
19
    NodeArray,
20
    NodeCondition,
21
    NodeDependsOn,
22
    NodeDivergence,
23
    NodeIntrinsicFunction,
24
    NodeMapping,
25
    NodeObject,
26
    NodeOutput,
27
    NodeOutputs,
28
    NodeParameter,
29
    NodeParameters,
30
    NodeProperties,
31
    NodeProperty,
32
    NodeResource,
33
    NodeTemplate,
34
    Nothing,
35
    NothingType,
36
    Scope,
37
    TerminalValue,
38
    TerminalValueCreated,
39
    TerminalValueModified,
40
    TerminalValueRemoved,
41
    TerminalValueUnchanged,
42
    is_nothing,
43
)
44
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
45
    ChangeSetModelVisitor,
46
)
47
from localstack.services.cloudformation.engine.v2.resolving import (
1✔
48
    REGEX_DYNAMIC_REF,
49
    extract_dynamic_reference,
50
    perform_dynamic_reference_lookup,
51
)
52
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
53
from localstack.services.cloudformation.stores import (
1✔
54
    exports_map,
55
)
56
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
57
from localstack.services.cloudformation.v2.types import ResolvedResource
1✔
58
from localstack.utils.aws.arns import get_partition
1✔
59
from localstack.utils.objects import get_value_from_path
1✔
60
from localstack.utils.run import to_str
1✔
61
from localstack.utils.strings import to_bytes
1✔
62
from localstack.utils.urls import localstack_host
1✔
63

64
# Host used wherever a template expects the AWS URL suffix.
_AWS_URL_SUFFIX = localstack_host().host  # The value in AWS is "amazonaws.com"

# Reference names that `_resolve_reference` resolves as pseudo parameters before
# looking for template parameters or resources of the same name.
_PSEUDO_PARAMETERS: Final[set[str]] = {
    "AWS::Partition",
    "AWS::AccountId",
    "AWS::Region",
    "AWS::StackName",
    "AWS::StackId",
    "AWS::URLSuffix",
    "AWS::NoValue",
    "AWS::NotificationARNs",
}

# Type variables for the 'before' and 'after' sides of a PreprocEntityDelta.
TBefore = TypeVar("TBefore")
TAfter = TypeVar("TAfter")
_T = TypeVar("_T")

# Matches http(s) URLs containing ".execute-api.", a dash-separated region-like
# segment and either "amazonaws.com" or the local URL suffix.
# Groups: (1) everything up to and including "execute-api.", (2) the host suffix,
# (3) the remaining path.
REGEX_OUTPUT_APIGATEWAY = re.compile(
    rf"^(https?://.+\.execute-api\.)(?:[^-]+-){{2,3}}\d\.(amazonaws\.com|{_AWS_URL_SUFFIX})/?(.*)$"
)
# Placeholder returned for deployed property values that are unavailable when
# config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES is set.
MOCKED_REFERENCE = "unknown"

# Accepts non-empty strings made only of ASCII letters and digits.
VALID_LOGICAL_RESOURCE_ID_RE = re.compile(r"^[A-Za-z0-9]+$")
1✔
87

88

89
class PreprocEntityDelta(Generic[TBefore, TAfter]):
    """Pair of 'before' and 'after' values computed for a change set entity.

    Both sides default to ``Nothing`` when not provided.
    """

    before: Maybe[TBefore]
    after: Maybe[TAfter]

    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
        self.before, self.after = before, after

    def __eq__(self, other):
        # A delta only equals another delta whose two sides both match.
        if isinstance(other, PreprocEntityDelta):
            return self.before == other.before and self.after == other.after
        return False
×
101

102

103
class PreprocProperties:
    """Holder for a mapping of preprocessed property names to their values."""

    properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        self.properties = properties

    def __eq__(self, other):
        # Only another PreprocProperties with an equal mapping compares equal.
        same_kind = isinstance(other, PreprocProperties)
        return same_kind and self.properties == other.properties
1✔
113

114

115
class PreprocResource:
    """Preprocessed representation of a template resource.

    Equality compares only the logical id, the condition (a missing condition
    counts as ``True``), the resource type and the properties. The physical id,
    dependencies, replacement flag and status do not take part in comparisons.
    """

    logical_id: str
    physical_resource_id: str | None
    condition: bool | None
    resource_type: str
    properties: PreprocProperties
    depends_on: list[str] | None
    requires_replacement: bool
    status: ResourceStatus | None

    def __init__(
        self,
        logical_id: str,
        physical_resource_id: str | None,
        condition: bool | None,
        resource_type: str,
        properties: PreprocProperties,
        depends_on: list[str] | None,
        requires_replacement: bool,
        status: ResourceStatus | None = None,
    ):
        self.logical_id = logical_id
        self.physical_resource_id = physical_resource_id
        self.condition = condition
        self.resource_type = resource_type
        self.properties = properties
        self.depends_on = depends_on
        self.requires_replacement = requires_replacement
        self.status = status

    @staticmethod
    def _compare_conditions(c1: bool | None, c2: bool | None) -> bool:
        """Compare two condition values, treating any non-bool value as ``True``."""
        # The lack of condition equates to a true condition.
        c1 = c1 if isinstance(c1, bool) else True
        c2 = c2 if isinstance(c2, bool) else True
        return c1 == c2

    def __eq__(self, other):
        if not isinstance(other, PreprocResource):
            return False
        return all(
            [
                self.logical_id == other.logical_id,
                self._compare_conditions(self.condition, other.condition),
                self.resource_type == other.resource_type,
                self.properties == other.properties,
            ]
        )
163

164

165
class PreprocOutput:
    """Preprocessed representation of a template output.

    Two outputs are equal when name, value, export and condition all match.
    """

    name: str
    value: Any
    export: Any | None
    condition: bool | None

    def __init__(self, name: str, value: Any, export: Any | None, condition: bool | None):
        self.name, self.value = name, value
        self.export, self.condition = export, condition

    def __eq__(self, other):
        if isinstance(other, PreprocOutput):
            # Every field comparison is evaluated before the results are combined.
            field_matches = [
                self.name == other.name,
                self.value == other.value,
                self.export == other.export,
                self.condition == other.condition,
            ]
            return all(field_matches)
        return False
188

189

190
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
191
    _change_set: Final[ChangeSet]
1✔
192
    _before_resolved_resources: Final[dict]
1✔
193
    _before_cache: Final[dict[Scope, Any]]
1✔
194
    _after_cache: Final[dict[Scope, Any]]
1✔
195

196
    def __init__(self, change_set: ChangeSet):
1✔
197
        self._change_set = change_set
1✔
198
        self._before_resolved_resources = change_set.stack.resolved_resources
1✔
199
        self._before_cache = {}
1✔
200
        self._after_cache = {}
1✔
201

202
    def _setup_runtime_cache(self) -> None:
1✔
203
        runtime_cache_key = self.__class__.__name__
1✔
204

205
        self._before_cache.clear()
1✔
206
        self._after_cache.clear()
1✔
207

208
        before_runtime_cache = self._change_set.update_model.before_runtime_cache
1✔
209
        if cache := before_runtime_cache.get(runtime_cache_key):
1✔
210
            self._before_cache.update(cache)
1✔
211

212
        after_runtime_cache = self._change_set.update_model.after_runtime_cache
1✔
213
        if cache := after_runtime_cache.get(runtime_cache_key):
1✔
214
            self._after_cache.update(cache)
×
215

216
    def _save_runtime_cache(self) -> None:
1✔
217
        runtime_cache_key = self.__class__.__name__
1✔
218

219
        before_runtime_cache = self._change_set.update_model.before_runtime_cache
1✔
220
        before_runtime_cache[runtime_cache_key] = copy.deepcopy(self._before_cache)
1✔
221

222
        after_runtime_cache = self._change_set.update_model.after_runtime_cache
1✔
223
        after_runtime_cache[runtime_cache_key] = copy.deepcopy(self._after_cache)
1✔
224

225
    def process(self) -> None:
1✔
226
        self._setup_runtime_cache()
1✔
227
        node_template = self._change_set.update_model.node_template
1✔
228
        self.visit(node_template)
1✔
229
        self._save_runtime_cache()
1✔
230

231
    def _get_node_resource_for(
1✔
232
        self, resource_name: str, node_template: NodeTemplate
233
    ) -> NodeResource:
234
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
235
        for node_resource in node_template.resources.resources:
1✔
236
            if node_resource.name == resource_name:
1✔
237
                self.visit(node_resource)
1✔
238
                return node_resource
1✔
239
        raise ValidationError(
1✔
240
            f"Template format error: Unresolved resource dependencies [{resource_name}] in the Resources block of the template"
241
        )
242

243
    def _get_node_property_for(
1✔
244
        self, property_name: str, node_resource: NodeResource
245
    ) -> NodeProperty | None:
246
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
247
        for node_property in node_resource.properties.properties:
1✔
248
            if node_property.name == property_name:
1✔
249
                self.visit(node_property)
1✔
250
                return node_property
1✔
251
        return None
1✔
252

253
    def _deployed_property_value_of(
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
    ) -> Any:
        """Look up ``property_name`` in the deployed properties of a resource.

        Args:
            resource_logical_id: logical id of the resource to inspect.
            property_name: path of the property inside the resource's "Properties".
            resolved_resources: mapping of logical ids to resolved resources.

        Returns:
            The truthy string or list found at the path; ``MOCKED_REFERENCE`` when the
            value is falsy and ``config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES`` is set;
            otherwise the falsy value itself.

        Raises:
            RuntimeError: if the resource has no entry in ``resolved_resources`` or the
                value found is truthy but neither a string nor a list.
        """
        # We have to override this function to make sure it does not try to access the
        # resolved resource

        # Before we can obtain deployed value for a resource, we need to first ensure to
        # process the resource if this wasn't processed already. Ideally, values should only
        # be accessible through delta objects, to ensure computation is always complete at
        # every level.
        self._get_node_resource_for(
            resource_name=resource_logical_id,
            node_template=self._change_set.update_model.node_template,
        )
        deployed = resolved_resources.get(resource_logical_id)
        if deployed is None:
            raise RuntimeError(
                f"No deployed instances of resource '{resource_logical_id}' were found"
            )
        # TODO support structured properties, e.g. NestedStack.Outputs.OutputName
        property_value: Any | None = get_value_from_path(
            deployed.get("Properties", {}), property_name
        )

        if not property_value:
            if config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES:
                return MOCKED_REFERENCE
            return property_value

        if isinstance(property_value, (str, list)):
            return property_value
        # TODO: is this correct? If there is a bug in the logic here, it's probably
        #  better to know about it with a clear error message than to receive some form
        #  of message about trying to use a dictionary in place of a string
        raise RuntimeError(
            f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value nor list"
        )
×
289

290
    def _before_deployed_property_value_of(
1✔
291
        self, resource_logical_id: str, property_name: str
292
    ) -> Any:
293
        return self._deployed_property_value_of(
1✔
294
            resource_logical_id=resource_logical_id,
295
            property_name=property_name,
296
            resolved_resources=self._before_resolved_resources,
297
        )
298

299
    def _after_deployed_property_value_of(
1✔
300
        self, resource_logical_id: str, property_name: str
301
    ) -> str | None:
302
        return self._before_deployed_property_value_of(
1✔
303
            resource_logical_id=resource_logical_id, property_name=property_name
304
        )
305

306
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
1✔
307
        mappings: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
1✔
308
        # TODO: another scenarios suggesting property lookups might be preferable.
309
        for mapping in mappings:
1✔
310
            if mapping.name == map_name:
1✔
311
                self.visit(mapping)
1✔
312
                return mapping
1✔
313
        raise RuntimeError(f"Undefined '{map_name}' mapping")
×
314

315
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
1✔
316
        parameters: list[NodeParameter] = (
1✔
317
            self._change_set.update_model.node_template.parameters.parameters
318
        )
319
        # TODO: another scenarios suggesting property lookups might be preferable.
320
        for parameter in parameters:
1✔
321
            if parameter.name == parameter_name:
1✔
322
                self.visit(parameter)
1✔
323
                return parameter
1✔
324
        return Nothing
1✔
325

326
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
1✔
327
        conditions: list[NodeCondition] = (
1✔
328
            self._change_set.update_model.node_template.conditions.conditions
329
        )
330
        # TODO: another scenarios suggesting property lookups might be preferable.
331
        for condition in conditions:
1✔
332
            if condition.name == condition_name:
1✔
333
                self.visit(condition)
1✔
334
                return condition
1✔
335
        return Nothing
×
336

337
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition named ``logical_id``.

        Raises:
            RuntimeError: if the lookup does not yield a NodeCondition.
        """
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError(f"No condition '{logical_id}' was found.")
        return self.visit(node_condition)
×
343

344
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
1✔
345
        match pseudo_parameter_name:
1✔
346
            case "AWS::Partition":
1✔
347
                return get_partition(self._change_set.region_name)
1✔
348
            case "AWS::AccountId":
1✔
349
                return self._change_set.stack.account_id
1✔
350
            case "AWS::Region":
1✔
351
                return self._change_set.stack.region_name
1✔
352
            case "AWS::StackName":
1✔
353
                return self._change_set.stack.stack_name
1✔
354
            case "AWS::StackId":
1✔
355
                return self._change_set.stack.stack_id
1✔
356
            case "AWS::URLSuffix":
1✔
357
                return _AWS_URL_SUFFIX
1✔
358
            case "AWS::NoValue":
×
359
                return None
×
360
            case _:
×
361
                raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
×
362

363
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
        """Resolve a reference to a pseudo parameter, a parameter or a resource.

        The three kinds are tried in that order. A name matching neither of the
        first two is looked up as a resource, which raises if it does not exist.
        """
        if logical_id in _PSEUDO_PARAMETERS:
            # Pseudo parameters are constants within the lifecycle of a template.
            constant = self._resolve_pseudo_parameter(pseudo_parameter_name=logical_id)
            return PreprocEntityDelta(before=constant, after=constant)

        parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
        if isinstance(parameter, NodeParameter):
            return self.visit(parameter)

        resource = self._get_node_resource_for(
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
        )
        resource_delta = self.visit(resource)
        return PreprocEntityDelta(before=resource_delta.before, after=resource_delta.after)
1✔
383

384
    def _resolve_mapping(
        self, map_name: str, top_level_key: str, second_level_key
    ) -> PreprocEntityDelta:
        """Return the delta of the value stored at ``map_name[top_level_key][second_level_key]``.

        Raises:
            ValidationError: if the top-level entry is not an object node, or the
                second-level entry is not a terminal value, array or object node.
        """

        def _unresolvable() -> ValidationError:
            error_key = "::".join([map_name, top_level_key, second_level_key])
            return ValidationError(f"Template error: Unable to get mapping for {error_key}")

        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)

        top_level_value = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_value, NodeObject):
            raise _unresolvable()

        second_level_value = top_level_value.bindings.get(second_level_key)
        if not isinstance(second_level_value, (TerminalValue, NodeArray, NodeObject)):
            raise _unresolvable()

        return self.visit(second_level_value)
1✔
399

400
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
        """Visit an entity, reusing and populating the per-scope caches.

        When the entity's scope is present in both caches, a delta is built from
        the cached values. Otherwise the parent visitor runs; if it yields a
        PreprocEntityDelta, replacements are applied and both sides are cached.
        """
        scope = change_set_entity.scope
        cached_on_both_sides = scope in self._before_cache and scope in self._after_cache
        if cached_on_both_sides:
            return PreprocEntityDelta(
                before=self._before_cache[scope], after=self._after_cache[scope]
            )

        result = super().visit(change_set_entity=change_set_entity)
        if not isinstance(result, PreprocEntityDelta):
            return result

        result = self._maybe_perform_replacements(result)
        self._before_cache[scope] = result.before
        self._after_cache[scope] = result.after
        return result
1✔
412

413
    def _maybe_perform_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
414
        delta = self._maybe_perform_static_replacements(delta)
1✔
415
        delta = self._maybe_perform_dynamic_replacements(delta)
1✔
416
        return delta
1✔
417

418
    def _maybe_perform_static_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
419
        return self._maybe_perform_on_delta(delta, self._perform_static_replacements)
1✔
420

421
    def _maybe_perform_dynamic_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
422
        return self._maybe_perform_on_delta(delta, self._perform_dynamic_replacements)
1✔
423

424
    def _maybe_perform_on_delta(
1✔
425
        self, delta: PreprocEntityDelta | None, f: Callable[[_T], _T]
426
    ) -> PreprocEntityDelta | None:
427
        if isinstance(delta.before, str):
1✔
428
            delta.before = f(delta.before)
1✔
429
        if isinstance(delta.after, str):
1✔
430
            delta.after = f(delta.after)
1✔
431
        return delta
1✔
432

433
    def _perform_dynamic_replacements(self, value: _T) -> _T:
1✔
434
        if not isinstance(value, str):
1✔
435
            return value
×
436

437
        if dynamic_ref := extract_dynamic_reference(value):
1✔
438
            new_value = perform_dynamic_reference_lookup(
1✔
439
                reference=dynamic_ref,
440
                account_id=self._change_set.account_id,
441
                region_name=self._change_set.region_name,
442
            )
443
            if new_value:
1✔
444
                # We need to use a function here, to avoid backslash processing by regex.
445
                # From the regex sub documentation:
446
                # repl can be a string or a function; if it is a string, any backslash escapes in it are processed.
447
                # Using a function, we can avoid this processing.
448
                return REGEX_DYNAMIC_REF.sub(lambda _: new_value, value)
1✔
449

450
        return value
1✔
451

452
    @staticmethod
    def _perform_static_replacements(value: str) -> str:
        """Rewrite an API Gateway execute-api URL to include the local port.

        Values that do not match ``REGEX_OUTPUT_APIGATEWAY``, or that are listed in
        ``config.CFN_STRING_REPLACEMENT_DENY_LIST``, are returned unchanged.
        """
        api_match = REGEX_OUTPUT_APIGATEWAY.match(value)
        if not api_match or value in config.CFN_STRING_REPLACEMENT_DENY_LIST:
            return value

        prefix, host, path = api_match.group(1, 2, 3)
        return f"{prefix}{host}:{localstack_host().port}/{path}"
1✔
464

465
    def _cached_apply(
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
    ) -> PreprocEntityDelta:
        """
        Resolve the 'before' and 'after' sides of a delta, preferring cached values.

        For each side, the value cached under ``scope`` is used when present. When
        it is absent and the matching argument is not ``Nothing``, ``resolver`` is
        called with that argument. If the resolver returns a PreprocEntityDelta,
        only the component for the side being resolved is kept.

        The caches are only read here, never written; storing the result is left to
        the caller or to the ``visit`` method of this class.

        Args:
            scope (Scope): Cache key for the entity being resolved.
            arguments_delta (PreprocEntityDelta): 'before' and 'after' arguments for the resolver.
            resolver (Callable[[Any], Any]): Function applied to each uncached, non-Nothing argument.

        Returns:
            PreprocEntityDelta: A new delta holding the resolved 'before' and 'after' values.
        """

        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.

        def _resolve_side(cache: dict, argument: Any, side: str) -> Any:
            value = cache.get(scope, Nothing)
            if is_nothing(value) and not is_nothing(argument):
                value = resolver(argument)
                if isinstance(value, PreprocEntityDelta):
                    value = getattr(value, side)
            return value

        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after

        before = _resolve_side(self._before_cache, arguments_before, "before")
        after = _resolve_side(self._after_cache, arguments_after, "after")
        return PreprocEntityDelta(before=before, after=after)
1✔
512

513
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
1✔
514
        return self.visit(node_property.value)
1✔
515

516
    def visit_terminal_value_modified(
        self, terminal_value_modified: TerminalValueModified
    ) -> PreprocEntityDelta:
        """Return a delta going from the original value to the modified value."""
        old_value = terminal_value_modified.value
        new_value = terminal_value_modified.modified_value
        return PreprocEntityDelta(before=old_value, after=new_value)
523

524
    def visit_terminal_value_created(
        self, terminal_value_created: TerminalValueCreated
    ) -> PreprocEntityDelta:
        """Return a delta with only the 'after' side set to the created value."""
        created_value = terminal_value_created.value
        return PreprocEntityDelta(after=created_value)
1✔
528

529
    def visit_terminal_value_removed(
        self, terminal_value_removed: TerminalValueRemoved
    ) -> PreprocEntityDelta:
        """Return a delta with only the 'before' side set to the removed value."""
        removed_value = terminal_value_removed.value
        return PreprocEntityDelta(before=removed_value)
1✔
533

534
    def visit_terminal_value_unchanged(
        self, terminal_value_unchanged: TerminalValueUnchanged
    ) -> PreprocEntityDelta:
        """Return a delta whose two sides both hold the unchanged value."""
        constant = terminal_value_unchanged.value
        return PreprocEntityDelta(before=constant, after=constant)
541

542
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
        """Combine the 'before' of the original value with the 'after' of its divergence."""
        original_delta = self.visit(node_divergence.value)
        diverged_delta = self.visit(node_divergence.divergence)
        return PreprocEntityDelta(before=original_delta.before, after=diverged_delta.after)
1✔
546

547
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
        """Build the 'before' and 'after' dictionaries of an object node.

        A created object has no 'before' side and a removed object has no 'after'
        side; those sides stay ``Nothing``. Bindings whose delta is ``Nothing`` on
        a side are left out of that side's dictionary.
        """
        change_type = node_object.change_type
        before = Nothing if change_type == ChangeType.CREATED else {}
        after = Nothing if change_type == ChangeType.REMOVED else {}
        # Whether each side is collected is fixed by the change type, so decide once.
        collect_before = not is_nothing(before)
        collect_after = not is_nothing(after)

        for name, entity in node_object.bindings.items():
            entity_delta: PreprocEntityDelta = self.visit(change_set_entity=entity)
            if collect_before and not is_nothing(entity_delta.before):
                before[name] = entity_delta.before
            if collect_after and not is_nothing(entity_delta.after):
                after[name] = entity_delta.after

        return PreprocEntityDelta(before=before, after=after)
1✔
560

561
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> str:
        """Resolve the value targeted by a ``Fn::GetAtt`` for one side of the delta.

        Args:
            arguments: either ``"LogicalId.Attribute"`` or ``[LogicalId, Attribute]``.
                In the string form everything after the first dot is the attribute
                name, so dotted attributes such as ``Endpoint.Address`` are kept whole.
            select_before: resolve the 'before' side when true, the 'after' side otherwise.

        Returns:
            The value of the property as defined in the template when the resource
            declares it, otherwise the value taken from the deployed properties.

        Raises:
            ValidationError: if no attribute name is given, if the resource does not
                exist, or if the resource's condition is falsy on the selected side.
            RuntimeError: if the resource refers to a condition that is not defined.
        """
        arguments_list: list[str] = (
            arguments.split(".") if isinstance(arguments, str) else arguments
        )
        if len(arguments_list) < 2:
            raise ValidationError(
                "Template error: every Fn::GetAtt object requires two non-empty parameters, the resource name and the resource attribute"
            )
        logical_name_of_resource = arguments_list[0]
        # Attribute names may contain dots; re-join what the split above took apart.
        attribute_name = ".".join(arguments_list[1:])

        node_resource = self._get_node_resource_for(
            resource_name=logical_name_of_resource,
            node_template=self._change_set.update_model.node_template,
        )

        if not is_nothing(node_resource.condition_reference):
            # _resolve_condition performs the lookup itself and raises an explicit
            # error when the referenced condition is not defined.
            evaluation_result = self._resolve_condition(node_resource.condition_reference.value)
            condition_holds = (
                evaluation_result.before if select_before else evaluation_result.after
            )
            if not condition_holds:
                # A resource cancelled by its condition cannot be referenced.
                raise ValidationError(
                    f"Template format error: Unresolved resource dependencies [{logical_name_of_resource}] in the Resources block of the template"
                )

        node_property: NodeProperty | None = self._get_node_property_for(
            property_name=attribute_name, node_resource=node_resource
        )
        if node_property is not None:
            # The property is statically defined in the template and its value can be computed.
            property_delta = self.visit(node_property)
            return property_delta.before if select_before else property_delta.after

        # The property is not statically defined and must therefore be available in
        # the properties deployed set.
        if select_before:
            return self._before_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        return self._after_deployed_property_value_of(
            resource_logical_id=logical_name_of_resource,
            property_name=attribute_name,
        )
1✔
611

612
    def visit_node_intrinsic_function_fn_get_att(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve a ``Fn::GetAtt`` call for both sides of the delta.

        A side is resolved only when no value is cached for the function's scope
        and the corresponding arguments are not ``Nothing``.
        """
        # TODO: validate the return value according to the spec.
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        before_arguments: Maybe[str | list[str]] = arguments_delta.before
        after_arguments: Maybe[str | list[str]] = arguments_delta.after
        scope = node_intrinsic_function.scope

        before = self._before_cache.get(scope, Nothing)
        needs_before = is_nothing(before) and not is_nothing(before_arguments)
        if needs_before:
            before = self._resolve_attribute(arguments=before_arguments, select_before=True)

        after = self._after_cache.get(scope, Nothing)
        needs_after = is_nothing(after) and not is_nothing(after_arguments)
        if needs_after:
            after = self._resolve_attribute(arguments=after_arguments, select_before=False)

        return PreprocEntityDelta(before=before, after=after)
1✔
629

630
    def visit_node_intrinsic_function_fn_equals(
1✔
631
        self, node_intrinsic_function: NodeIntrinsicFunction
632
    ) -> PreprocEntityDelta:
633
        # TODO: add argument shape validation.
634
        def _compute_fn_equals(args: list[Any]) -> bool:
1✔
635
            return args[0] == args[1]
1✔
636

637
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
638
        delta = self._cached_apply(
1✔
639
            scope=node_intrinsic_function.scope,
640
            arguments_delta=arguments_delta,
641
            resolver=_compute_fn_equals,
642
        )
643
        return delta
1✔
644

645
    def visit_node_intrinsic_function_fn_if(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::If`` independently for the 'before' and 'after' sides.

        The first argument names a condition; the second and third are the values
        for a truthy and a falsy condition respectively. Only the branch selected
        by the condition is visited for a given side.

        Raises:
            ValueError: if the function does not have exactly three arguments.
            RuntimeError: if the named condition is not defined in the template.
        """
        # `if` needs to be short-circuiting i.e. if the condition is True we don't evaluate the
        # False branch. If the condition is False, we don't evaluate the True branch.
        arguments = node_intrinsic_function.arguments.array
        if len(arguments) != 3:
            raise ValueError(
                f"Incorrectly constructed Fn::If usage, expected 3 arguments, found {len(arguments)}"
            )
        condition_argument, true_branch, false_branch = arguments

        def _select_branch(condition_name: str, side: str) -> Any:
            # _resolve_condition raises an explicit error for an unknown condition
            # instead of letting a missing lookup result reach the visitor.
            condition_delta = self._resolve_condition(logical_id=condition_name)
            chosen = true_branch if getattr(condition_delta, side) else false_branch
            return getattr(self.visit(chosen), side)

        condition_name_delta = self.visit(condition_argument)
        if_delta = PreprocEntityDelta()
        if not is_nothing(condition_name_delta.before):
            if_delta.before = _select_branch(condition_name_delta.before, "before")
        if not is_nothing(condition_name_delta.after):
            if_delta.after = _select_branch(condition_name_delta.after, "after")
        return if_delta
1✔
680

681
    def visit_node_intrinsic_function_fn_and(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::And``: true only when every resolved argument is truthy."""

        def _all_hold(operands: list[bool]) -> bool:
            return all(operands)

        resolved_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=resolved_arguments,
            resolver=_all_hold,
        )
1✔
695

696
    def visit_node_intrinsic_function_fn_or(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Or``: true when at least one resolved argument is truthy."""

        def _any_holds(operands: list[bool]):
            return any(operands)

        resolved_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=resolved_arguments,
            resolver=_any_holds,
        )
1✔
710

711
    def visit_node_intrinsic_function_fn_not(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Not`` by negating the resolved argument.

        When the resolved argument is a list, its first element is negated;
        any other value is negated directly.
        """

        def _negate(operand: list[bool] | bool) -> bool:
            # Is the argument ever a lone boolean?
            subject = operand[0] if isinstance(operand, list) else operand
            return not subject

        resolved_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=resolved_arguments,
            resolver=_negate,
        )
1✔
728

729
    def visit_node_intrinsic_function_fn_sub(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Sub`` for the before and after states.

        The resolved arguments are either a template string, or a two-element list
        of a template string and a map of substitution values. Each ``${Name}``
        placeholder is resolved, in this order of precedence, as a pseudo
        parameter, an entry of the substitution map, an attribute lookup (when the
        name contains a dot), or a reference.

        A state's value is only computed when it is not already present in the
        corresponding before/after cache and that state's arguments are defined.

        Raises RuntimeError if the arguments have an unsupported shape or a
        placeholder cannot be resolved.
        """

        def _compute_sub(args: str | list[Any], select_before: bool) -> str | int | float:
            # TODO: add further schema validation.
            string_template: str
            sub_parameters: dict
            if isinstance(args, str):
                string_template = args
                sub_parameters = {}
            elif (
                isinstance(args, list)
                and len(args) == 2
                and isinstance(args[0], str)
                and isinstance(args[1], dict)
            ):
                string_template = args[0]
                sub_parameters = args[1]
            else:
                raise RuntimeError(
                    "Invalid arguments shape for Fn::Sub, expected a String "
                    f"or a Tuple of String and Map but got '{args}'"
                )

            def _substitute(match: re.Match[str]) -> str:
                # Resolve a single `${...}` placeholder to its string value.
                template_variable_name = match.group(1)
                template_variable_value = Nothing

                # Try to resolve the variable name as pseudo parameter.
                if template_variable_name in _PSEUDO_PARAMETERS:
                    template_variable_value = self._resolve_pseudo_parameter(
                        pseudo_parameter_name=template_variable_name
                    )

                # Try to resolve the variable name as an entry to the defined parameters.
                elif template_variable_name in sub_parameters:
                    template_variable_value = sub_parameters[template_variable_name]

                # Try to resolve the variable name as GetAtt.
                elif "." in template_variable_name:
                    try:
                        template_variable_value = self._resolve_attribute(
                            arguments=template_variable_name, select_before=select_before
                        )
                    except RuntimeError:
                        # Falls through to the undefined-variable error below.
                        pass

                # Try to resolve the variable name as Ref.
                else:
                    try:
                        resource_delta = self._resolve_reference(logical_id=template_variable_name)
                        template_variable_value = (
                            resource_delta.before if select_before else resource_delta.after
                        )
                        if isinstance(template_variable_value, PreprocResource):
                            template_variable_value = template_variable_value.physical_resource_id
                    except RuntimeError:
                        # Falls through to the undefined-variable error below.
                        pass

                if is_nothing(template_variable_value):
                    raise RuntimeError(
                        f"Undefined variable name in Fn::Sub string template '{template_variable_name}'"
                    )

                if not isinstance(template_variable_value, str):
                    template_variable_value = str(template_variable_value)
                return template_variable_value

            # Substitute in a single left-to-right pass over the original template.
            # Replacing placeholders one after the other on the evolving string would
            # re-substitute `${...}` text that only exists because an earlier value
            # happened to contain it.
            sub_string = re.sub(r"\$\{([^}]+)\}", _substitute, string_template)

            # FIXME: the following type reduction is ported from v1; however it appears as though such
            #        reduction is not performed by the engine, and certainly not at this depth given the
            #        lack of context. This section should be removed with Fn::Sub always returning a string
            #        and the resource providers reviewed.
            account_id = self._change_set.account_id
            is_another_account_id = sub_string.isdigit() and len(sub_string) == len(account_id)
            if sub_string == account_id or is_another_account_id:
                # Keep account-id-like strings as strings.
                result = sub_string
            elif sub_string.isdigit():
                result = int(sub_string)
            else:
                try:
                    result = float(sub_string)
                except ValueError:
                    result = sub_string
            return result

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(before) and not is_nothing(arguments_before):
            before = _compute_sub(args=arguments_before, select_before=True)
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(after) and not is_nothing(arguments_after):
            after = _compute_sub(args=arguments_after, select_before=False)
        return PreprocEntityDelta(before=before, after=after)
1✔
827

828
    def visit_node_intrinsic_function_fn_join(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Join``.

        The resolved arguments are expected to be a two-element list of a
        delimiter and a list of values. ``None`` values are dropped and all other
        values are converted with ``str`` before joining. Arguments that are not a
        two-element list resolve to ``Nothing``.
        """

        # TODO: add support for schema validation.
        # TODO: add tests for joining non string values.
        def _join(args: list[Any]) -> str | NothingType:
            if not isinstance(args, list) or len(args) != 2:
                return Nothing
            delimiter: str = str(args[0])
            values: list[Any] = args[1]
            if isinstance(values, list):
                return delimiter.join(str(value) for value in values if value is not None)
            # shortcut if values is the empty string, for example:
            # {"Fn::Join": ["", {"Ref": <parameter>}]}
            # CDK bootstrap does this
            if values == "":
                return ""
            raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")

        resolved_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=resolved_arguments,
            resolver=_join,
        )
1✔
861

862
    def visit_node_intrinsic_function_fn_select(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate ``Fn::Select``: pick the element at an index from a list.

        The resolved arguments are ``[index, values]``. The index is converted
        with ``int`` and must address an existing element of ``values``.

        Raises RuntimeError if ``values`` is a list containing non-string
        elements (treated as not yet resolved), and ValidationError if ``values``
        is not a non-empty list or the index is not an in-range integer.
        """

        # TODO: add further support for schema validation
        def _compute_fn_select(args: list[Any]) -> Any:
            invalid_arguments_message = "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
            values = args[1]
            # defer evaluation if the selection list contains unresolved elements (e.g., unresolved intrinsics)
            if isinstance(values, list) and not all(isinstance(value, str) for value in values):
                raise RuntimeError("Fn::Select list contains unresolved elements")

            if not isinstance(values, list) or not values:
                raise ValidationError(invalid_arguments_message)
            try:
                index: int = int(args[0])
            except (TypeError, ValueError) as e:
                # int() raises TypeError (not ValueError) for inputs such as None or
                # a list; both are reported as the same template validation failure.
                raise ValidationError(invalid_arguments_message) from e

            if not 0 <= index < len(values):
                raise ValidationError(invalid_arguments_message)
            return values[index]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_select,
        )
        return delta
1✔
898

899
    def visit_node_intrinsic_function_fn_split(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate ``Fn::Split``: split a source string on a delimiter.

        The resolved arguments are ``[delimiter, source_string]``; the delimiter
        must be a non-empty string and the source must be a string, otherwise a
        RuntimeError is raised.
        """

        # TODO: add further support for schema validation
        def _split(args: list[Any]) -> Any:
            delimiter = args[0]
            if not (isinstance(delimiter, str) and delimiter):
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{delimiter}'")
            source_string = args[1]
            if isinstance(source_string, str):
                return source_string.split(delimiter)
            raise RuntimeError(f"Invalid source string value for Fn::Split: '{source_string}'")

        resolved_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=resolved_arguments,
            resolver=_split,
        )
1✔
920

921
    def visit_node_intrinsic_function_fn_get_a_zs(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::GetAZs``: list the availability zone names of a region.

        An empty region string selects the change set's own region. The zones are
        obtained through an EC2 ``describe_availability_zones`` call made with the
        change set's account id.

        Raises RuntimeError if the region is not a string or the EC2 call fails
        with a ClientError.
        """
        # TODO: add further support for schema validation

        def _compute_fn_get_a_zs(region) -> Any:
            if not isinstance(region, str):
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")

            if not region:
                region = self._change_set.region_name

            account_id = self._change_set.account_id
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
            try:
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
                    ec2_client.describe_availability_zones()
                )
            except ClientError as error:
                # Chain explicitly so the originating client failure stays attached
                # as the cause of the error surfaced to the caller.
                raise RuntimeError(
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
                ) from error
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
                "AvailabilityZones"
            ]
            azs = [az["ZoneName"] for az in availability_zones]
            return azs

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_get_a_zs,
        )
        return delta
1✔
956

957
    def visit_node_intrinsic_function_fn_base64(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Base64``: base64-encode the resolved string argument.

        Raises RuntimeError if the resolved argument is not a string.
        """

        # TODO: add further support for schema validation
        def _encode(string) -> Any:
            if isinstance(string, str):
                # Ported from v1:
                encoded_bytes = base64.b64encode(to_bytes(string))
                return to_str(encoded_bytes)
            raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{string}'")

        resolved_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=resolved_arguments,
            resolver=_encode,
        )
1✔
975

976
    def visit_node_intrinsic_function_fn_find_in_map(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::FindInMap``.

        For each state whose resolved arguments are truthy, the arguments are
        unpacked into ``_resolve_mapping`` and the matching side of the returned
        delta is kept; otherwise that state stays ``Nothing``.
        """
        # TODO: add type checking/validation for result unit?
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        lookup_before = arguments_delta.before
        lookup_after = arguments_delta.after

        before = Nothing
        if lookup_before:
            before = self._resolve_mapping(*lookup_before).before

        after = Nothing
        if lookup_after:
            after = self._resolve_mapping(*lookup_after).after

        return PreprocEntityDelta(before=before, after=after)
1✔
992

993
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the mapping's bindings."""
        return self.visit(node_mapping.bindings)
1✔
996

997
    def visit_node_parameters(
        self, node_parameters: NodeParameters
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
        """Collect the before and after values of all parameters, keyed by name.

        A parameter is left out of a state's dictionary when its value for that
        state is ``Nothing``.
        """
        values_before: dict[str, Any] = {}
        values_after: dict[str, Any] = {}
        for parameter in node_parameters.parameters:
            delta = self.visit(parameter)
            value_before, value_after = delta.before, delta.after
            if not is_nothing(value_before):
                values_before[parameter.name] = value_before
            if not is_nothing(value_after):
                values_after[parameter.name] = value_after
        return PreprocEntityDelta(before=values_before, after=values_after)
1✔
1011

1012
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
        """Resolve the before and after value of a single template parameter.

        For each state the dynamic value is used when truthy, otherwise the
        default value. The after value of ``List<String>`` / ``CommaDelimitedList``
        parameters is split on commas into a list of stripped items; the before
        value is returned as resolved.

        Raises ValidationError if the parameter name does not match
        ``VALID_LOGICAL_RESOURCE_ID_RE``.
        """
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_parameter.name):
            raise ValidationError(
                f"Template format error: Parameter name {node_parameter.name} is non alphanumeric."
            )

        def _coerce_to_type(value: str, type_: str) -> Any:
            # List-like parameter types are provided as a comma separated string.
            if type_ in ("List<String>", "CommaDelimitedList"):
                return [item.strip() for item in value.split(",")]
            return value

        dynamic_delta = self.visit(node_parameter.dynamic_value)
        default_delta = self.visit(node_parameter.default_value)

        before = dynamic_delta.before or default_delta.before
        after = dynamic_delta.after or default_delta.after

        parameter_type = self.visit(node_parameter.type_)
        if not is_nothing(after):
            after = _coerce_to_type(after, parameter_type.after)

        return PreprocEntityDelta(before=before, after=after)
1✔
1038

1039
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the ``depends_on`` identifiers array."""
        return self.visit(node_depends_on.depends_on)
1✔
1042

1043
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition's body."""
        return self.visit(node_condition.body)
1✔
1046

1047
    def _resource_physical_resource_id_from(
        self, logical_resource_id: str, resolved_resources: dict[str, ResolvedResource]
    ) -> str | None:
        """Look up the physical id of a resource in a set of resolved resources.

        Returns ``None`` when the resource is unknown or its ``ResourceStatus`` is
        neither ``CREATE_COMPLETE`` nor ``UPDATE_COMPLETE``.

        Raises RuntimeError if the resource has such a status but its
        ``PhysicalResourceId`` is not a string.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        resource_entry = resolved_resources.get(logical_resource_id, {})
        completed_statuses = {
            ResourceStatus.CREATE_COMPLETE,
            ResourceStatus.UPDATE_COMPLETE,
        }
        if resource_entry.get("ResourceStatus") not in completed_statuses:
            return None

        physical_resource_id = resource_entry.get("PhysicalResourceId")
        if isinstance(physical_resource_id, str):
            return physical_resource_id
        raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
1✔
1062

1063
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
1064
        # TODO: typing around resolved resources is needed and should be reflected here.
1065
        return self._resource_physical_resource_id_from(
1✔
1066
            logical_resource_id=resource_logical_id,
1067
            resolved_resources=self._before_resolved_resources,
1068
        )
1069

1070
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
1071
        return self._before_resource_physical_id(resource_logical_id=resource_logical_id)
1✔
1072

1073
    def visit_node_intrinsic_function_ref(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Ref``.

        ``AWS::NoValue`` resolves to ``Nothing``. Any other logical id is resolved
        through ``_resolve_reference``; when a side of the resulting delta is a
        ``PreprocResource`` it is replaced by that resource's physical id.
        """

        def _dereference(logical_id: str) -> PreprocEntityDelta:
            if logical_id == "AWS::NoValue":
                return Nothing

            reference_delta: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
            referenced_before = reference_delta.before
            if isinstance(referenced_before, PreprocResource):
                reference_delta.before = referenced_before.physical_resource_id
            referenced_after = reference_delta.after
            if isinstance(referenced_after, PreprocResource):
                reference_delta.after = referenced_after.physical_resource_id
            return reference_delta

        resolved_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=resolved_arguments,
            resolver=_dereference,
        )
1✔
1094

1095
    def visit_node_intrinsic_function_condition(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a ``Condition`` function by visiting the condition it names.

        Raises RuntimeError (from the resolver) if no condition with the given
        name exists.
        """
        resolved_arguments = self.visit(node_intrinsic_function.arguments)

        def _visit_named_condition(name: str) -> PreprocEntityDelta:
            node_condition = self._get_node_condition_if_exists(condition_name=name)
            if not is_nothing(node_condition):
                return self.visit(node_condition)
            raise RuntimeError(f"Undefined condition '{name}'")

        return self._cached_apply(
            resolver=_visit_named_condition,
            scope=node_intrinsic_function.scope,
            arguments_delta=resolved_arguments,
        )
1✔
1113

1114
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
        """Visit every element of an array node and gather the before/after lists.

        The before list is ``Nothing`` for a created array and the after list is
        ``Nothing`` for a removed one. Element values that are ``Nothing`` are not
        appended.
        """
        change_type = node_array.change_type
        before = Nothing if change_type == ChangeType.CREATED else []
        after = Nothing if change_type == ChangeType.REMOVED else []
        for element in node_array.array:
            element_delta: PreprocEntityDelta = self.visit(change_set_entity=element)
            element_before = element_delta.before
            element_after = element_delta.after
            if not (is_nothing(before) or is_nothing(element_before)):
                before.append(element_before)
            if not (is_nothing(after) or is_nothing(element_after)):
                after.append(element_after)
        return PreprocEntityDelta(before=before, after=after)
1✔
1127

1128
    def visit_node_properties(
        self, node_properties: NodeProperties
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
        """Visit every property and wrap the resolved values in ``PreprocProperties``.

        The before side is ``Nothing`` for created properties and the after side
        is ``Nothing`` for removed ones. Property values that resolve to
        ``Nothing`` or ``None`` are left out of the bindings.
        """
        change_type = node_properties.change_type
        collect_before = change_type != ChangeType.CREATED
        collect_after = change_type != ChangeType.REMOVED
        before_bindings = {} if collect_before else Nothing
        after_bindings = {} if collect_after else Nothing

        for node_property in node_properties.properties:
            name = node_property.name
            property_delta = self.visit(node_property)
            value_before = property_delta.before
            value_after = property_delta.after
            if (
                not is_nothing(before_bindings)
                and not is_nothing(value_before)
                and value_before is not None
            ):
                before_bindings[name] = value_before
            if (
                not is_nothing(after_bindings)
                and not is_nothing(value_after)
                and value_after is not None
            ):
                after_bindings[name] = value_after

        before = (
            Nothing
            if is_nothing(before_bindings)
            else PreprocProperties(properties=before_bindings)
        )
        after = (
            Nothing if is_nothing(after_bindings) else PreprocProperties(properties=after_bindings)
        )
        return PreprocEntityDelta(before=before, after=after)
1✔
1158

1159
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
        """Resolve a condition reference into the before/after condition values.

        The reference is visited to obtain the condition name for each state; a
        state is only resolved through ``_resolve_condition`` when its name is a
        string, and stays ``Nothing`` otherwise.
        """
        names_delta = self.visit(reference)

        before = Nothing
        name_before = names_delta.before
        if isinstance(name_before, str):
            before = self._resolve_condition(logical_id=name_before).before

        after = Nothing
        name_after = names_delta.after
        if isinstance(name_after, str):
            after = self._resolve_condition(logical_id=name_after).after

        return PreprocEntityDelta(before=before, after=after)
1✔
1172

1173
    def visit_node_resource(
        self, node_resource: NodeResource
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
        """Build the before and after ``PreprocResource`` of a resource node.

        A side is built when the resource's condition for that state is truthy,
        or when no condition applies and the resource was not created (before
        side) / not removed (after side). A RuntimeError raised while looking up
        the after-state physical id results in a ``None`` physical id.

        Raises ValidationError if the resource name does not match
        ``VALID_LOGICAL_RESOURCE_ID_RE``.
        """
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_resource.name):
            raise ValidationError(
                f"Template format error: Resource name {node_resource.name} is non alphanumeric."
            )
        change_type = node_resource.change_type
        logical_resource_id = node_resource.name

        condition_before = condition_after = Nothing
        if not is_nothing(node_resource.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_resource.condition_reference
            )
            condition_before, condition_after = condition_delta.before, condition_delta.after

        depends_on_before = depends_on_after = Nothing
        if not is_nothing(node_resource.depends_on):
            depends_on_delta = self.visit(node_resource.depends_on)
            depends_on_before, depends_on_after = depends_on_delta.before, depends_on_delta.after

        type_delta = self.visit(node_resource.type_)
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
            node_resource.properties
        )

        before = Nothing
        # `and` binds tighter than `or`: an unconditioned, non-created resource
        # qualifies, and so does any resource whose before-condition is truthy.
        if (
            change_type != ChangeType.CREATED and is_nothing(condition_before)
        ) or condition_before:
            before = PreprocResource(
                logical_id=logical_resource_id,
                physical_resource_id=self._before_resource_physical_id(
                    resource_logical_id=logical_resource_id
                ),
                condition=condition_before,
                resource_type=type_delta.before,
                properties=properties_delta.before,
                depends_on=depends_on_before,
                requires_replacement=False,
            )

        after = Nothing
        if (
            change_type != ChangeType.REMOVED and is_nothing(condition_after)
        ) or condition_after:
            try:
                after_physical_resource_id = self._after_resource_physical_id(
                    resource_logical_id=logical_resource_id
                )
            except RuntimeError:
                after_physical_resource_id = None
            after = PreprocResource(
                logical_id=logical_resource_id,
                physical_resource_id=after_physical_resource_id,
                condition=condition_after,
                resource_type=type_delta.after,
                properties=properties_delta.after,
                depends_on=depends_on_after,
                requires_replacement=node_resource.requires_replacement,
            )
        return PreprocEntityDelta(before=before, after=after)
1✔
1236

1237
    def visit_node_output(
        self, node_output: NodeOutput
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
        """Build the before and after ``PreprocOutput`` of an output node.

        When the output has a condition, a condition turning from falsy to truthy
        makes the output count as created, and one turning from truthy to falsy as
        removed, overriding the node's own change type. The before side is
        ``Nothing`` for created outputs and the after side for removed ones.
        """
        effective_change_type = node_output.change_type
        value_delta = self.visit(node_output.value)

        condition_delta = Nothing
        if not is_nothing(node_output.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_output.condition_reference
            )
            held_before = condition_delta.before
            holds_after = condition_delta.after
            if not held_before and holds_after:
                effective_change_type = ChangeType.CREATED
            elif held_before and not holds_after:
                effective_change_type = ChangeType.REMOVED

        export_delta = Nothing
        if not is_nothing(node_output.export):
            export_delta = self.visit(node_output.export)

        before: Maybe[PreprocOutput] = Nothing
        if effective_change_type != ChangeType.CREATED:
            before = PreprocOutput(
                name=node_output.name,
                value=value_delta.before,
                export=export_delta.before if export_delta else None,
                condition=condition_delta.before if condition_delta else None,
            )

        after: Maybe[PreprocOutput] = Nothing
        if effective_change_type != ChangeType.REMOVED:
            after = PreprocOutput(
                name=node_output.name,
                value=value_delta.after,
                export=export_delta.after if export_delta else None,
                condition=condition_delta.after if condition_delta else None,
            )
        return PreprocEntityDelta(before=before, after=after)
1✔
1276

1277
    def visit_node_outputs(
        self, node_outputs: NodeOutputs
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
        """Visit every output and gather the defined before/after outputs as lists.

        An output is left out of a state's list when its value for that state is
        ``Nothing``.
        """
        outputs_before: list[PreprocOutput] = []
        outputs_after: list[PreprocOutput] = []
        for node_output in node_outputs.outputs:
            delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
            candidate_before, candidate_after = delta.before, delta.after
            if not is_nothing(candidate_before):
                outputs_before.append(candidate_before)
            if not is_nothing(candidate_after):
                outputs_after.append(candidate_after)
        return PreprocEntityDelta(before=outputs_before, after=outputs_after)
1✔
1291

1292
    def visit_node_intrinsic_function_fn_import_value(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::ImportValue``: look up an export's value by name.

        The exports are obtained from ``exports_map`` for the change set's account
        and region. A missing export, or one with a falsy ``Value``, resolves to
        ``Nothing``.

        Raises RuntimeError (from the resolver) if the export name is not a string.
        """

        def _lookup_export(string) -> str:
            if not isinstance(string, str):
                raise RuntimeError(f"Invalid parameter for import: '{string}'")

            change_set = self._change_set
            exports = exports_map(
                account_id=change_set.account_id, region_name=change_set.region_name
            )
            export_entry = exports.get(string, {})
            return export_entry.get("Value") or Nothing

        resolved_arguments = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=resolved_arguments,
            resolver=_lookup_export,
        )
1✔
1312

1313
    def visit_node_intrinsic_function_fn_transform(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Reject ``Fn::Transform``: it is never evaluated here.

        Always raises RuntimeError; per the error message, such nodes are expected
        to have been handled by the Transformer beforehand.
        """
        message = "Fn::Transform should have been handled by the Transformer"
        raise RuntimeError(message)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc