• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21737074253

05 Feb 2026 11:58PM UTC coverage: 86.883% (+0.01%) from 86.873%
21737074253

push

github

web-flow
CFn: implement DeletionPolicy and UpdateReplacePolicy (#13535)

40 of 40 new or added lines in 2 files covered. (100.0%)

8 existing lines in 2 files now uncovered.

69970 of 80534 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.84
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from enum import StrEnum
1✔
8
from typing import Any, Final
1✔
9

10
from botocore.exceptions import ClientError
1✔
11

12
from localstack import config
1✔
13
from localstack.aws.api.cloudformation import ResourceStatus
1✔
14
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
15
from localstack.aws.connect import connect_to
1✔
16
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
17
    ChangeSetEntity,
18
    ChangeType,
19
    Maybe,
20
    NodeArray,
21
    NodeCondition,
22
    NodeDependsOn,
23
    NodeDivergence,
24
    NodeIntrinsicFunction,
25
    NodeMapping,
26
    NodeObject,
27
    NodeOutput,
28
    NodeOutputs,
29
    NodeParameter,
30
    NodeParameters,
31
    NodeProperties,
32
    NodeProperty,
33
    NodeResource,
34
    NodeResources,
35
    NodeTemplate,
36
    Nothing,
37
    NothingType,
38
    Scope,
39
    TerminalValue,
40
    TerminalValueCreated,
41
    TerminalValueModified,
42
    TerminalValueRemoved,
43
    TerminalValueUnchanged,
44
    is_nothing,
45
)
46
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
47
    ChangeSetModelVisitor,
48
)
49
from localstack.services.cloudformation.engine.v2.resolving import (
1✔
50
    REGEX_DYNAMIC_REF,
51
    extract_dynamic_reference,
52
    perform_dynamic_reference_lookup,
53
)
54
from localstack.services.cloudformation.engine.v2.unsupported_resource import (
1✔
55
    should_ignore_unsupported_resource_type,
56
)
57
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
58
from localstack.services.cloudformation.stores import (
1✔
59
    exports_map,
60
)
61
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
62
from localstack.services.cloudformation.v2.types import ResolvedResource
1✔
63
from localstack.utils.aws.arns import get_partition
1✔
64
from localstack.utils.numbers import to_number
1✔
65
from localstack.utils.objects import get_value_from_path
1✔
66
from localstack.utils.run import to_str
1✔
67
from localstack.utils.strings import to_bytes
1✔
68
from localstack.utils.urls import localstack_host
1✔
69

70
# Host name returned for the "AWS::URLSuffix" pseudo parameter and accepted as an
# alternative host suffix by REGEX_OUTPUT_APIGATEWAY.
_AWS_URL_SUFFIX = localstack_host().host  # The value in AWS is "amazonaws.com"

# Reference names that `_resolve_reference` treats as pseudo parameters rather than as
# template parameters or resources. Note that "AWS::NotificationARNs" is listed here but
# `_resolve_pseudo_parameter` currently raises for it.
_PSEUDO_PARAMETERS: Final[set[str]] = {
    "AWS::Partition",
    "AWS::AccountId",
    "AWS::Region",
    "AWS::StackName",
    "AWS::StackId",
    "AWS::URLSuffix",
    "AWS::NoValue",
    "AWS::NotificationARNs",
}


# Matches execute-api URLs. Capture groups: (1) everything up to and including
# "execute-api.", (2) the host suffix, (3) the remaining path (possibly empty).
# NOTE(review): _AWS_URL_SUFFIX is interpolated without re.escape, so any "." in it
# matches an arbitrary character -- confirm whether that is intended.
REGEX_OUTPUT_APIGATEWAY = re.compile(
    rf"^(https?://.+\.execute-api\.)(?:[^-]+-){{2,3}}\d\.(amazonaws\.com|{_AWS_URL_SUFFIX})/?(.*)$"
)
# Placeholder returned by `_deployed_property_value_of` when a property has no usable
# value and the resource type is one that may be ignored as unsupported.
MOCKED_REFERENCE = "unknown"

# Matches non-empty strings consisting solely of ASCII letters and digits.
VALID_LOGICAL_RESOURCE_ID_RE = re.compile(r"^[A-Za-z0-9]+$")
1✔
90

91

92
class PreprocEntityDelta[TBefore, TAfter]:
1✔
93
    before: Maybe[TBefore]
1✔
94
    after: Maybe[TAfter]
1✔
95

96
    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
1✔
97
        self.before = before
1✔
98
        self.after = after
1✔
99

100
    def __eq__(self, other):
1✔
101
        if not isinstance(other, PreprocEntityDelta):
×
102
            return False
×
103
        return self.before == other.before and self.after == other.after
×
104

105

106
class PreprocProperties:
    """Wrapper around the mapping of a resource's preprocessed property values."""

    properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        self.properties = properties

    def __eq__(self, other):
        # Equality is defined purely by the wrapped mapping; foreign types never match.
        return isinstance(other, PreprocProperties) and self.properties == other.properties
1✔
116

117

118
class DeletionPolicy(StrEnum):
    """String-valued choices for the ``deletion_policy`` attribute of ``PreprocResource``."""

    Retain = "Retain"
    Delete = "Delete"
    RetainExceptOnCreate = "RetainExceptOnCreate"
    Snapshot = "Snapshot"
1✔
123

124

125
class UpdateReplacePolicy(StrEnum):
    """String-valued choices for the ``update_replace_policy`` attribute of ``PreprocResource``."""

    Delete = "Delete"
    Retain = "Retain"
    Snapshot = "Snapshot"
1✔
129

130

131
class PreprocResource:
    """Preprocessed representation of a single resource.

    Equality only considers the logical id, the (normalised) condition, the resource
    type and the properties; the remaining attributes are deliberately not compared here.
    """

    logical_id: str
    physical_resource_id: str | None
    condition: bool | None
    resource_type: str
    properties: PreprocProperties
    depends_on: list[str] | None
    requires_replacement: bool
    status: ResourceStatus | None
    deletion_policy: DeletionPolicy | None
    update_replace_policy: UpdateReplacePolicy | None

    def __init__(
        self,
        logical_id: str,
        physical_resource_id: str | None,
        condition: bool | None,
        resource_type: str,
        properties: PreprocProperties,
        depends_on: list[str] | None,
        requires_replacement: bool,
        status: ResourceStatus | None = None,
        deletion_policy: DeletionPolicy | None = None,
        update_replace_policy: UpdateReplacePolicy | None = None,
    ):
        """Store the given values verbatim; no validation or coercion is performed.

        The parameter annotations mirror the attribute declarations above so that the
        constructor and the stored attributes agree on their types.
        """
        self.logical_id = logical_id
        self.physical_resource_id = physical_resource_id
        self.condition = condition
        self.resource_type = resource_type
        self.properties = properties
        self.depends_on = depends_on
        self.requires_replacement = requires_replacement
        self.status = status
        self.deletion_policy = deletion_policy
        self.update_replace_policy = update_replace_policy

    @staticmethod
    def _compare_conditions(c1: bool | None, c2: bool | None) -> bool:
        """Return whether two condition values are equivalent.

        Any non-bool value (notably ``None``) is treated as ``True``.
        """
        # The lack of condition equates to a true condition.
        c1 = c1 if isinstance(c1, bool) else True
        c2 = c2 if isinstance(c2, bool) else True
        return c1 == c2

    def __eq__(self, other):
        if not isinstance(other, PreprocResource):
            return False
        return all(
            [
                self.logical_id == other.logical_id,
                self._compare_conditions(self.condition, other.condition),
                self.resource_type == other.resource_type,
                self.properties == other.properties,
            ]
        )
186

187

188
class PreprocOutput:
    """Preprocessed representation of a single template output."""

    name: str
    value: Any
    export: Any | None
    condition: bool | None

    def __init__(self, name: str, value: Any, export: Any | None, condition: bool | None):
        self.name, self.value = name, value
        self.export, self.condition = export, condition

    def __eq__(self, other):
        if isinstance(other, PreprocOutput):
            # All four fields are compared, and every comparison is evaluated.
            field_matches = [
                self.name == other.name,
                self.value == other.value,
                self.export == other.export,
                self.condition == other.condition,
            ]
            return all(field_matches)
        return False
211

212

213
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
214
    _change_set: Final[ChangeSet]
1✔
215
    _before_resolved_resources: Final[dict]
1✔
216
    _before_cache: Final[dict[Scope, Any]]
1✔
217
    _after_cache: Final[dict[Scope, Any]]
1✔
218

219
    def __init__(self, change_set: ChangeSet):
1✔
220
        self._change_set = change_set
1✔
221
        self._before_resolved_resources = change_set.stack.resolved_resources
1✔
222
        self._before_cache = {}
1✔
223
        self._after_cache = {}
1✔
224

225
    def _setup_runtime_cache(self) -> None:
1✔
226
        runtime_cache_key = self.__class__.__name__
1✔
227

228
        self._before_cache.clear()
1✔
229
        self._after_cache.clear()
1✔
230

231
        before_runtime_cache = self._change_set.update_model.before_runtime_cache
1✔
232
        if cache := before_runtime_cache.get(runtime_cache_key):
1✔
233
            self._before_cache.update(cache)
1✔
234

235
        after_runtime_cache = self._change_set.update_model.after_runtime_cache
1✔
236
        if cache := after_runtime_cache.get(runtime_cache_key):
1✔
237
            self._after_cache.update(cache)
×
238

239
    def _save_runtime_cache(self) -> None:
1✔
240
        runtime_cache_key = self.__class__.__name__
1✔
241

242
        before_runtime_cache = self._change_set.update_model.before_runtime_cache
1✔
243
        before_runtime_cache[runtime_cache_key] = copy.deepcopy(self._before_cache)
1✔
244

245
        after_runtime_cache = self._change_set.update_model.after_runtime_cache
1✔
246
        after_runtime_cache[runtime_cache_key] = copy.deepcopy(self._after_cache)
1✔
247

248
    def process(self) -> None:
1✔
249
        self._setup_runtime_cache()
1✔
250
        node_template = self._change_set.update_model.node_template
1✔
251
        node_conditions = self._change_set.update_model.node_template.conditions
1✔
252
        self.visit(node_conditions)
1✔
253
        self.visit(node_template)
1✔
254
        self._save_runtime_cache()
1✔
255

256
    def _get_node_resource_for(
1✔
257
        self, resource_name: str, node_template: NodeTemplate
258
    ) -> NodeResource:
259
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
260
        for node_resource in node_template.resources.resources:
1✔
261
            if node_resource.name == resource_name:
1✔
262
                self.visit(node_resource)
1✔
263
                return node_resource
1✔
264
        raise ValidationError(
1✔
265
            f"Template format error: Unresolved resource dependencies [{resource_name}] in the Resources block of the template"
266
        )
267

268
    def _get_node_property_for(
1✔
269
        self, property_name: str, node_resource: NodeResource
270
    ) -> NodeProperty | None:
271
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
272
        for node_property in node_resource.properties.properties:
1✔
273
            if node_property.name == property_name:
1✔
274
                self.visit(node_property)
1✔
275
                return node_property
1✔
276
        return None
1✔
277

278
    def _deployed_property_value_of(
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
    ) -> Any:
        """Look up ``property_name`` in the resolved entry of a resource.

        A truthy value must be a str, list or dict and is returned as is. For a falsy
        value, ``MOCKED_REFERENCE`` is returned when the resource has a type that may be
        ignored as unsupported; otherwise the falsy value itself is returned.

        Raises:
            RuntimeError: If the resource has no entry in ``resolved_resources``, or if a
                truthy value is not a str, list or dict.
        """
        # We have to override this function to make sure it does not try to access the
        # resolved resource

        # Before we can obtain deployed value for a resource, we need to first ensure to
        # process the resource if this wasn't processed already. Ideally, values should only
        # be accessible through delta objects, to ensure computation is always complete at
        # every level.
        self._get_node_resource_for(
            resource_name=resource_logical_id,
            node_template=self._change_set.update_model.node_template,
        )
        deployed = resolved_resources.get(resource_logical_id)
        if deployed is None:
            raise RuntimeError(
                f"No deployed instances of resource '{resource_logical_id}' were found"
            )
        deployed_properties = deployed.get("Properties", {})
        resource_type = deployed.get("Type")
        # TODO support structured properties, e.g. NestedStack.Outputs.OutputName
        property_value: Any | None = get_value_from_path(deployed_properties, property_name)

        if not property_value:
            if resource_type and should_ignore_unsupported_resource_type(
                resource_type=resource_type,
                change_set_type=self._change_set.change_set_type,
            ):
                return MOCKED_REFERENCE
            return property_value

        # Str: Standard expected type. TODO validate bools and numbers
        # List: Multiple resource types can return a list of values e.g. AWS::EC2::VPC.
        # Dict: Custom resources in CloudFormation can return arbitrary data structures.
        if isinstance(property_value, (str, list, dict)):
            return property_value
        raise RuntimeError(
            f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value nor list"
        )
×
318

319
    def _before_deployed_property_value_of(
1✔
320
        self, resource_logical_id: str, property_name: str
321
    ) -> Any:
322
        return self._deployed_property_value_of(
1✔
323
            resource_logical_id=resource_logical_id,
324
            property_name=property_name,
325
            resolved_resources=self._before_resolved_resources,
326
        )
327

328
    def _after_deployed_property_value_of(
1✔
329
        self, resource_logical_id: str, property_name: str
330
    ) -> str | None:
331
        return self._before_deployed_property_value_of(
1✔
332
            resource_logical_id=resource_logical_id, property_name=property_name
333
        )
334

335
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
1✔
336
        mappings: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
1✔
337
        # TODO: another scenarios suggesting property lookups might be preferable.
338
        for mapping in mappings:
1✔
339
            if mapping.name == map_name:
1✔
340
                self.visit(mapping)
1✔
341
                return mapping
1✔
342
        raise RuntimeError(f"Undefined '{map_name}' mapping")
×
343

344
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
1✔
345
        parameters: list[NodeParameter] = (
1✔
346
            self._change_set.update_model.node_template.parameters.parameters
347
        )
348
        # TODO: another scenarios suggesting property lookups might be preferable.
349
        for parameter in parameters:
1✔
350
            if parameter.name == parameter_name:
1✔
351
                self.visit(parameter)
1✔
352
                return parameter
1✔
353
        return Nothing
1✔
354

355
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
1✔
356
        conditions: list[NodeCondition] = (
1✔
357
            self._change_set.update_model.node_template.conditions.conditions
358
        )
359
        # TODO: another scenarios suggesting property lookups might be preferable.
360
        for condition in conditions:
1✔
361
            if condition.name == condition_name:
1✔
362
                self.visit(condition)
1✔
363
                return condition
1✔
364
        return Nothing
1✔
365

366
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
        """Return the delta produced by visiting the condition named ``logical_id``.

        Raises:
            RuntimeError: If the lookup does not yield a ``NodeCondition``.
        """
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError(f"No condition '{logical_id}' was found.")
        return self.visit(node_condition)
×
372

373
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
1✔
374
        match pseudo_parameter_name:
1✔
375
            case "AWS::Partition":
1✔
376
                return get_partition(self._change_set.region_name)
1✔
377
            case "AWS::AccountId":
1✔
378
                return self._change_set.stack.account_id
1✔
379
            case "AWS::Region":
1✔
380
                return self._change_set.stack.region_name
1✔
381
            case "AWS::StackName":
1✔
382
                return self._change_set.stack.stack_name
1✔
383
            case "AWS::StackId":
1✔
384
                return self._change_set.stack.stack_id
1✔
385
            case "AWS::URLSuffix":
1✔
386
                return _AWS_URL_SUFFIX
1✔
387
            case "AWS::NoValue":
×
388
                return None
×
389
            case _:
×
390
                raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
×
391

392
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
        """Resolve a reference to a pseudo parameter, template parameter or resource.

        The three kinds are tried in that order; the resource lookup raises if the
        name matches none of them.
        """
        if logical_id in _PSEUDO_PARAMETERS:
            constant_value = self._resolve_pseudo_parameter(pseudo_parameter_name=logical_id)
            # Pseudo parameters are constants within the lifecycle of a template.
            return PreprocEntityDelta(before=constant_value, after=constant_value)

        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
        if isinstance(node_parameter, NodeParameter):
            return self.visit(node_parameter)

        node_resource = self._get_node_resource_for(
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
        )
        resource_delta = self.visit(node_resource)
        # Re-wrap so the caller receives a fresh delta object.
        return PreprocEntityDelta(before=resource_delta.before, after=resource_delta.after)
1✔
412

413
    def _resolve_mapping(
        self, map_name: str, top_level_key: str, second_level_key
    ) -> PreprocEntityDelta:
        """Return the delta of the mapping value at ``map_name[top_level_key][second_level_key]``.

        Raises:
            ValidationError: If the top-level entry is not a ``NodeObject`` or the
                second-level entry is not a terminal value, array or object node.
        """

        def _unresolvable() -> ValidationError:
            error_key = "::".join([map_name, top_level_key, second_level_key])
            return ValidationError(f"Template error: Unable to get mapping for {error_key}")

        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)

        first_level = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(first_level, NodeObject):
            raise _unresolvable()

        second_level = first_level.bindings.get(second_level_key)
        if not isinstance(second_level, (TerminalValue, NodeArray, NodeObject)):
            raise _unresolvable()

        return self.visit(second_level)
1✔
428

429
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
        """Visit an entity, reusing cached before/after values when both are present.

        A freshly computed ``PreprocEntityDelta`` has replacements applied and is stored
        in both caches under the entity's scope; any other result is returned untouched.
        """
        scope = change_set_entity.scope
        if scope in self._before_cache and scope in self._after_cache:
            cached_before = self._before_cache[scope]
            cached_after = self._after_cache[scope]
            return PreprocEntityDelta(before=cached_before, after=cached_after)

        result = super().visit(change_set_entity=change_set_entity)
        if not isinstance(result, PreprocEntityDelta):
            return result

        result = self._maybe_perform_replacements(result)
        self._before_cache[scope] = result.before
        self._after_cache[scope] = result.after
        return result
1✔
441

442
    def _maybe_perform_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
443
        delta = self._maybe_perform_static_replacements(delta)
1✔
444
        delta = self._maybe_perform_dynamic_replacements(delta)
1✔
445
        return delta
1✔
446

447
    def _maybe_perform_static_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
448
        return self._maybe_perform_on_delta(delta, self._perform_static_replacements)
1✔
449

450
    def _maybe_perform_dynamic_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
451
        return self._maybe_perform_on_delta(delta, self._perform_dynamic_replacements)
1✔
452

453
    def _maybe_perform_on_delta[T](
1✔
454
        self, delta: PreprocEntityDelta | None, f: Callable[[T], T]
455
    ) -> PreprocEntityDelta | None:
456
        if isinstance(delta.before, str):
1✔
457
            delta.before = f(delta.before)
1✔
458
        if isinstance(delta.after, str):
1✔
459
            delta.after = f(delta.after)
1✔
460
        return delta
1✔
461

462
    def _perform_dynamic_replacements[T](self, value: T) -> T:
1✔
463
        if not isinstance(value, str):
1✔
464
            return value
×
465

466
        if dynamic_ref := extract_dynamic_reference(value):
1✔
467
            new_value = perform_dynamic_reference_lookup(
1✔
468
                reference=dynamic_ref,
469
                account_id=self._change_set.account_id,
470
                region_name=self._change_set.region_name,
471
            )
472
            if new_value:
1✔
473
                # We need to use a function here, to avoid backslash processing by regex.
474
                # From the regex sub documentation:
475
                # repl can be a string or a function; if it is a string, any backslash escapes in it are processed.
476
                # Using a function, we can avoid this processing.
477
                return REGEX_DYNAMIC_REF.sub(lambda _: new_value, value)
1✔
478

479
        return value
1✔
480

481
    @staticmethod
    def _perform_static_replacements(value: str) -> str:
        """Rewrite an execute-api URL so that it carries the LocalStack port.

        Values that do not match ``REGEX_OUTPUT_APIGATEWAY``, or that are listed in
        ``config.CFN_STRING_REPLACEMENT_DENY_LIST``, are returned unchanged.
        """
        api_match = REGEX_OUTPUT_APIGATEWAY.match(value)
        if not api_match or value in config.CFN_STRING_REPLACEMENT_DENY_LIST:
            return value

        prefix, host, path = api_match[1], api_match[2], api_match[3]
        port = localstack_host().port
        return f"{prefix}{host}:{port}/{path}"
1✔
493

494
    def _cached_apply(
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
    ) -> PreprocEntityDelta:
        """
        Resolve the 'before' and 'after' sides of a delta, preferring cached results.

        For each side independently, the value cached under ``scope`` is used when one
        exists. Otherwise, if the corresponding argument value is present, ``resolver``
        is called with it. Should the resolver return a PreprocEntityDelta, only the
        matching side of that delta is kept ('before' for the before side, 'after' for
        the after side).

        The caches are read but never written here; storing results is left to the
        caller, either explicitly or through the visit method of this class.

        Args:
            scope (Scope): Cache key identifying the entity being resolved.
            arguments_delta (PreprocEntityDelta): Argument values for the before and after sides.
            resolver (Callable[[Any], Any]): Applied to each uncached, present argument value.

        Returns:
            PreprocEntityDelta: A new delta holding the resolved before and after values.
        """

        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.

        def _resolve_side(cache: dict, argument: Any, side: str) -> Any:
            cached = cache.get(scope, Nothing)
            if not is_nothing(cached) or is_nothing(argument):
                return cached
            resolved = resolver(argument)
            if isinstance(resolved, PreprocEntityDelta):
                return getattr(resolved, side)
            return resolved

        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after

        before = _resolve_side(self._before_cache, arguments_before, "before")
        after = _resolve_side(self._after_cache, arguments_after, "after")

        return PreprocEntityDelta(before=before, after=after)
1✔
541

542
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
1✔
543
        return self.visit(node_property.value)
1✔
544

545
    def visit_terminal_value_modified(
        self, terminal_value_modified: TerminalValueModified
    ) -> PreprocEntityDelta:
        """Map a modified terminal to a delta from its value to its modified value."""
        old_value = terminal_value_modified.value
        new_value = terminal_value_modified.modified_value
        return PreprocEntityDelta(before=old_value, after=new_value)
552

553
    def visit_terminal_value_created(
        self, terminal_value_created: TerminalValueCreated
    ) -> PreprocEntityDelta:
        """Map a created terminal to a delta that only has an after value."""
        created_value = terminal_value_created.value
        return PreprocEntityDelta(after=created_value)
1✔
557

558
    def visit_terminal_value_removed(
        self, terminal_value_removed: TerminalValueRemoved
    ) -> PreprocEntityDelta:
        """Map a removed terminal to a delta that only has a before value."""
        removed_value = terminal_value_removed.value
        return PreprocEntityDelta(before=removed_value)
1✔
562

563
    def visit_terminal_value_unchanged(
        self, terminal_value_unchanged: TerminalValueUnchanged
    ) -> PreprocEntityDelta:
        """Map an unchanged terminal to a delta whose two sides read the same value."""
        return PreprocEntityDelta(
            before=terminal_value_unchanged.value, after=terminal_value_unchanged.value
        )
570

571
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
        """Combine the before side of ``value`` with the after side of ``divergence``."""
        value_delta = self.visit(node_divergence.value)
        divergence_delta = self.visit(node_divergence.divergence)
        before, after = value_delta.before, divergence_delta.after
        return PreprocEntityDelta(before=before, after=after)
1✔
575

576
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
        """Build before/after dictionaries from the object's bindings.

        A created object has no before dictionary and a removed object has no after
        dictionary (that side is ``Nothing``). Binding values that are ``Nothing`` on a
        side are left out of that side's dictionary.
        """
        change_type = node_object.change_type
        before = Nothing
        if change_type != ChangeType.CREATED:
            before = {}
        after = Nothing
        if change_type != ChangeType.REMOVED:
            after = {}

        for binding_name, binding_entity in node_object.bindings.items():
            binding_delta: PreprocEntityDelta = self.visit(change_set_entity=binding_entity)
            binding_before, binding_after = binding_delta.before, binding_delta.after
            if not (is_nothing(before) or is_nothing(binding_before)):
                before[binding_name] = binding_before
            if not (is_nothing(after) or is_nothing(binding_after)):
                after[binding_name] = binding_after

        return PreprocEntityDelta(before=before, after=after)
1✔
589

590
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> Any:
        """Resolve the value addressed by a pair of ``Fn::GetAtt`` arguments.

        Args:
            arguments: Either a dotted string (``"LogicalId.Attribute"``) or a list whose
                first item is the resource's logical id; the remaining items are joined
                with ``.`` to form the attribute name.
            select_before: ``True`` to resolve against the before state, ``False`` for
                the after state.

        Returns:
            The deployed property value when it is not ``None``. Otherwise the selected
            side of the same-named template property, or ``None`` when the resource
            declares no such property. The value is not necessarily a string.

        Raises:
            ValidationError: If fewer than two argument components are supplied, or if
                the resource's condition evaluates falsy for the selected state.
            RuntimeError: If the resource references a condition that cannot be found.
        """
        # TODO: add arguments validation.
        arguments_list: list[str] = (
            arguments.split(".") if isinstance(arguments, str) else arguments
        )

        if len(arguments_list) < 2:
            raise ValidationError(
                "Template error: every Fn::GetAtt object requires two non-empty parameters, the resource name and the resource attribute"
            )

        logical_name_of_resource = arguments_list[0]
        attribute_name = ".".join(arguments_list[1:])

        node_resource = self._get_node_resource_for(
            resource_name=logical_name_of_resource,
            node_template=self._change_set.update_model.node_template,
        )

        if not is_nothing(node_resource.condition_reference):
            # _resolve_condition performs the lookup itself and raises a descriptive
            # error when the condition is missing, so no separate lookup is needed.
            evaluation_result = self._resolve_condition(node_resource.condition_reference.value)
            condition_holds = (
                evaluation_result.before if select_before else evaluation_result.after
            )
            if not condition_holds:
                raise ValidationError(
                    f"Template format error: Unresolved resource dependencies [{logical_name_of_resource}] in the Resources block of the template"
                )

        # Custom Resources can mutate their definition
        # So the preproc should search first in the resource values and then check the template
        if select_before:
            value = self._before_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        else:
            value = self._after_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        if value is not None:
            return value

        node_property: NodeProperty | None = self._get_node_property_for(
            property_name=attribute_name, node_resource=node_resource
        )
        if node_property is not None:
            # The property is statically defined in the template and its value can be computed.
            property_delta = self.visit(node_property)
            value = property_delta.before if select_before else property_delta.after

        return value
×
649

650
    def visit_node_intrinsic_function_fn_get_att(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve ``Fn::GetAtt`` for both states, reusing cached results per side.

        A side is only resolved when nothing is cached for the function's scope and the
        argument value for that side is present.
        """
        # TODO: validate the return value according to the spec.
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        args_before: Maybe[str | list[str]] = arguments_delta.before
        args_after: Maybe[str | list[str]] = arguments_delta.after

        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
        needs_before = is_nothing(before) and not is_nothing(args_before)
        if needs_before:
            before = self._resolve_attribute(arguments=args_before, select_before=True)

        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
        needs_after = is_nothing(after) and not is_nothing(args_after)
        if needs_after:
            after = self._resolve_attribute(arguments=args_after, select_before=False)

        return PreprocEntityDelta(before=before, after=after)
1✔
667

668
    def visit_node_intrinsic_function_fn_equals(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Equals`` by comparing the first two arguments on each side.

        Raises:
            ValidationError: If the after-state arguments are a list whose length is not 2.
        """
        # TODO: add argument shape validation.
        arguments_delta = self.visit(node_intrinsic_function.arguments)

        after_arguments = arguments_delta.after
        if isinstance(after_arguments, list) and len(after_arguments) != 2:
            raise ValidationError(
                "Template error: every Fn::Equals object requires a list of 2 string parameters."
            )

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=lambda args: args[0] == args[1],
        )
1✔
688

689
    def visit_node_intrinsic_function_fn_if(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::If``, visiting only the branch selected by the condition.

        `if` needs to be short-circuiting: when the condition is True the False branch is
        not evaluated, and when it is False the True branch is not evaluated. The before and
        after states are evaluated independently, each only when its condition name is defined.

        :raises ValueError: if the function does not carry exactly 3 arguments.
        :raises ValidationError: if the named condition cannot be found.
        """
        fn_arguments = node_intrinsic_function.arguments.array
        if len(fn_arguments) != 3:
            raise ValueError(
                f"Incorrectly constructed Fn::If usage, expected 3 arguments, found {len(fn_arguments)}"
            )

        def _evaluate_side(condition_name: Any, select_before: bool) -> Any:
            node_condition = self._get_node_condition_if_exists(condition_name=condition_name)
            if is_nothing(node_condition):
                # TODO: for the before state this is likely not a possible state: to be evaluating
                #  the before state, we must have successfully deployed the stack and as such this
                #  case was not reached before
                raise ValidationError(
                    f"Template error: unresolved condition dependency {condition_name} in Fn::If"
                )

            condition_outcome = self.visit(node_condition)
            condition_holds = condition_outcome.before if select_before else condition_outcome.after
            # Only the selected branch is visited.
            selected_branch = fn_arguments[1] if condition_holds else fn_arguments[2]
            branch_delta = self.visit(selected_branch)
            return branch_delta.before if select_before else branch_delta.after

        condition_delta = self.visit(fn_arguments[0])
        if_delta = PreprocEntityDelta()
        if not is_nothing(condition_delta.before):
            if_delta.before = _evaluate_side(condition_delta.before, select_before=True)
        if not is_nothing(condition_delta.after):
            if_delta.after = _evaluate_side(condition_delta.after, select_before=False)
        return if_delta
736

737
    def visit_node_intrinsic_function_fn_and(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::And``: true only when every resolved operand is truthy."""

        def _every_operand_holds(operands: list[bool]) -> bool:
            return all(operands)

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_every_operand_holds,
        )
751

752
    def visit_node_intrinsic_function_fn_or(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Or``: true when at least one resolved operand is truthy."""

        def _some_operand_holds(operands: list[bool]):
            return any(operands)

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_some_operand_holds,
        )
766

767
    def visit_node_intrinsic_function_fn_not(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Not`` by negating its operand.

        When the resolved arguments are a list, the first element is negated; otherwise the
        value itself is negated.
        """

        def _negate(operand: list[bool] | bool) -> bool:
            # Is the argument ever a lone boolean?
            condition = operand[0] if isinstance(operand, list) else operand
            return not condition

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_negate,
        )
784

785
    def visit_node_intrinsic_function_fn_sub(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Sub`` for the before and after states.

        The arguments are either a template string, or a two-element list of a template
        string and a map of substitution values. Each ``${Name}`` placeholder is resolved,
        in order of preference, as: a pseudo parameter, an entry of the substitution map,
        an attribute reference (when the name contains a dot), or a reference.
        ``${!Name}`` is an escape and is emitted literally as ``${Name}``.

        All placeholders are substituted in a single pass over the template, so text
        inserted for one placeholder is never re-scanned for further placeholders.

        :raises RuntimeError: if the arguments have an unsupported shape, or a placeholder
            cannot be resolved.
        """

        def _resolve_variable(name: str, sub_parameters: dict, select_before: bool) -> str:
            # Resolve one placeholder name to its string value for the selected state.
            value = Nothing

            # Try to resolve the variable name as pseudo parameter.
            if name in _PSEUDO_PARAMETERS:
                value = self._resolve_pseudo_parameter(pseudo_parameter_name=name)

            # Try to resolve the variable name as an entry to the defined parameters.
            elif name in sub_parameters:
                value = sub_parameters[name]

            # Try to resolve the variable name as GetAtt.
            elif "." in name:
                try:
                    value = self._resolve_attribute(arguments=name, select_before=select_before)
                except RuntimeError:
                    # Best effort: reported below as an undefined variable.
                    pass

            # Try to resolve the variable name as Ref.
            else:
                try:
                    resource_delta = self._resolve_reference(logical_id=name)
                    value = resource_delta.before if select_before else resource_delta.after
                    if isinstance(value, PreprocResource):
                        value = value.physical_resource_id
                except RuntimeError:
                    # Best effort: reported below as an undefined variable.
                    pass

            if is_nothing(value):
                raise RuntimeError(f"Undefined variable name in Fn::Sub string template '{name}'")

            return value if isinstance(value, str) else str(value)

        def _compute_sub(args: str | list[Any], select_before: bool) -> str | int | float:
            # TODO: add further schema validation.
            string_template: str
            sub_parameters: dict
            if isinstance(args, str):
                string_template = args
                sub_parameters = {}
            elif (
                isinstance(args, list)
                and len(args) == 2
                and isinstance(args[0], str)
                and isinstance(args[1], dict)
            ):
                string_template = args[0]
                sub_parameters = args[1]
            else:
                raise RuntimeError(
                    "Invalid arguments shape for Fn::Sub, expected a String "
                    f"or a Tuple of String and Map but got '{args}'"
                )

            def _substitute(match: re.Match) -> str:
                variable_name = match.group(1)
                if variable_name.startswith("!"):
                    # `${!Literal}` is the escape for a literal `${Literal}`.
                    return f"${{{variable_name[1:]}}}"
                return _resolve_variable(
                    name=variable_name,
                    sub_parameters=sub_parameters,
                    select_before=select_before,
                )

            sub_string = re.sub(r"\$\{([^}]+)\}", _substitute, string_template)

            # FIXME: the following type reduction is ported from v1; however it appears as though such
            #        reduction is not performed by the engine, and certainly not at this depth given the
            #        lack of context. This section should be removed with Fn::Sub always returning a string
            #        and the resource providers reviewed.
            account_id = self._change_set.account_id
            is_another_account_id = sub_string.isdigit() and len(sub_string) == len(account_id)
            if sub_string == account_id or is_another_account_id:
                result = sub_string
            elif sub_string.isdigit():
                result = int(sub_string)
            else:
                try:
                    result = float(sub_string)
                except ValueError:
                    result = sub_string
            return result

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after
        # A value already cached for this scope takes precedence over re-computation.
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(before) and not is_nothing(arguments_before):
            before = _compute_sub(args=arguments_before, select_before=True)
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(after) and not is_nothing(arguments_after):
            after = _compute_sub(args=arguments_after, select_before=False)
        return PreprocEntityDelta(before=before, after=after)
883

884
    def visit_node_intrinsic_function_fn_join(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Join``: concatenate the listed values with the given delimiter.

        ``None`` entries are dropped and every remaining entry is converted with ``str``.
        Arguments that are not a two-element list yield ``Nothing``.
        """
        # TODO: add support for schema validation.
        # TODO: add tests for joining non string values.

        def _join_values(args: list[Any]) -> str | NothingType:
            if not isinstance(args, list) or len(args) != 2:
                return Nothing
            separator: str = str(args[0])
            items: list[Any] = args[1]
            if isinstance(items, list):
                rendered: list[str] = [str(item) for item in items if item is not None]
                return separator.join(rendered)
            # shortcut if values is the empty string, for example:
            # {"Fn::Join": ["", {"Ref": <parameter>}]}
            # CDK bootstrap does this
            if items == "":
                return ""
            raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_join_values,
        )
917

918
    def visit_node_intrinsic_function_fn_select(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Select``: pick the element at the given index from a list.

        :raises RuntimeError: if the list contains elements that are not strings.
        :raises ValidationError: if the second argument is not a non-empty list, or the
            index is not convertible to an integer within the bounds of the list.
        """
        # TODO: add further support for schema validation
        invalid_arguments_message = (
            "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
        )

        def _compute_fn_select(args: list[Any]) -> Any:
            values = args[1]
            # defer evaluation if the selection list contains unresolved elements (e.g., unresolved intrinsics)
            if isinstance(values, list) and not all(isinstance(value, str) for value in values):
                raise RuntimeError("Fn::Select list contains unresolved elements")

            if not isinstance(values, list) or not values:
                raise ValidationError(invalid_arguments_message)
            try:
                index: int = int(args[0])
            except (ValueError, TypeError) as e:
                # TypeError covers indexes such as None or a collection, which `int` rejects
                # without raising ValueError.
                raise ValidationError(invalid_arguments_message) from e

            if index < 0 or index >= len(values):
                raise ValidationError(invalid_arguments_message)
            return values[index]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_select,
        )
        return delta
954

955
    def visit_node_intrinsic_function_fn_split(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate ``Fn::Split``: split a source string on a non-empty string delimiter.

        :raises ValidationError: if the after-state arguments are defined but are not a
            two-element list.
        :raises RuntimeError: (from the resolver) if the delimiter is not a non-empty string
            or the source value is not a string.
        """
        # TODO: add further support for schema validation

        def _split_source(args: list[Any]) -> Any:
            separator = args[0]
            if not (isinstance(separator, str) and separator):
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{separator}'")
            text = args[1]
            if not isinstance(text, str):
                raise RuntimeError(f"Invalid source string value for Fn::Split: '{text}'")
            return text.split(separator)

        arguments_delta = self.visit(node_intrinsic_function.arguments)

        after_arguments = arguments_delta.after
        if not is_nothing(after_arguments):
            has_two_parameters = isinstance(after_arguments, list) and len(after_arguments) == 2
            if not has_two_parameters:
                raise ValidationError(
                    "Template error: every Fn::Split object requires two parameters, "
                    "(1) a string delimiter and (2) a string to be split or a function that returns a string to be split."
                )

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_split_source,
        )
987

988
    def visit_node_intrinsic_function_fn_get_a_zs(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::GetAZs``: list the availability zone names of a region.

        An empty region argument falls back to the change set's region. The zones are
        obtained through an EC2 ``describe_availability_zones`` call made for the change
        set's account.

        :raises RuntimeError: (from the resolver) if the region is not a string, or the
            EC2 call fails with a ``ClientError``.
        """
        # TODO: add further support for schema validation

        def _compute_fn_get_a_zs(region) -> Any:
            if not isinstance(region, str):
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")

            if not region:
                region = self._change_set.region_name

            account_id = self._change_set.account_id
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
            try:
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
                    ec2_client.describe_availability_zones()
                )
            except ClientError as e:
                # Chain the client error so the underlying failure is not lost.
                raise RuntimeError(
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
                ) from e
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
                "AvailabilityZones"
            ]
            azs = [az["ZoneName"] for az in availability_zones]
            return azs

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_get_a_zs,
        )
        return delta
1023

1024
    def visit_node_intrinsic_function_fn_base64(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Base64``: Base64-encode the resolved string argument.

        :raises RuntimeError: (from the resolver) if the value to encode is not a string.
        """
        # TODO: add further support for schema validation

        def _encode(value_to_encode) -> Any:
            if isinstance(value_to_encode, str):
                # Ported from v1:
                encoded_bytes = base64.b64encode(to_bytes(value_to_encode))
                return to_str(encoded_bytes)
            raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{value_to_encode}'")

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_encode,
        )
1042

1043
    def visit_node_intrinsic_function_fn_find_in_map(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::FindInMap`` for the before and after states.

        Each side's resolved arguments are unpacked into ``_resolve_mapping``; a side whose
        arguments are falsy is left as ``Nothing``.
        """
        # TODO: add type checking/validation for result unit?
        arguments_delta = self.visit(node_intrinsic_function.arguments)

        def _lookup(side_arguments: Any, select_before: bool) -> Any:
            if not side_arguments:
                return Nothing
            mapping_delta = self._resolve_mapping(*side_arguments)
            return mapping_delta.before if select_before else mapping_delta.after

        before = _lookup(arguments_delta.before, select_before=True)
        after = _lookup(arguments_delta.after, select_before=False)
        return PreprocEntityDelta(before=before, after=after)
1059

1060
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the mapping's bindings."""
        return self.visit(node_mapping.bindings)
1063

1064
    def visit_node_parameters(
        self, node_parameters: NodeParameters
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
        """Collect the before and after values of every parameter, keyed by parameter name.

        A parameter is omitted from a side's dictionary when its value for that side is
        ``Nothing``.
        """
        values_before: dict[str, Any] = {}
        values_after: dict[str, Any] = {}
        for node_parameter in node_parameters.parameters:
            delta = self.visit(node_parameter)
            for side_value, side_values in (
                (delta.before, values_before),
                (delta.after, values_after),
            ):
                if not is_nothing(side_value):
                    side_values[node_parameter.name] = side_value
        return PreprocEntityDelta(before=values_before, after=values_after)
1078

1079
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
        """Resolve a parameter's before and after values.

        For each side the dynamic value is preferred, with the default value used when the
        dynamic one is falsy. The after value, when defined, is then converted according to
        the parameter's declared type.

        :raises ValidationError: if the parameter name does not match the logical id pattern.
        """
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_parameter.name):
            raise ValidationError(
                f"Template format error: Parameter name {node_parameter.name} is non alphanumeric."
            )

        dynamic_delta = self.visit(node_parameter.dynamic_value)
        default_delta = self.visit(node_parameter.default_value)

        before = dynamic_delta.before or default_delta.before
        after = dynamic_delta.after or default_delta.after

        type_delta = self.visit(node_parameter.type_)

        def _convert_to_declared_type(raw_value: str, declared_type: str) -> Any:
            is_list_type = (
                re.match(r"List<[^>]+>", declared_type) or declared_type == "CommaDelimitedList"
            )
            if is_list_type:
                return [entry.strip() for entry in raw_value.split(",")]
            if declared_type == "Number":
                # TODO: validate the parameter type at template parse time (or whatever is in parity with AWS) so we know this cannot fail
                return to_number(raw_value)
            return raw_value

        if is_nothing(after):
            return PreprocEntityDelta(before=before, after=after)
        return PreprocEntityDelta(
            before=before, after=_convert_to_declared_type(after, type_delta.after)
        )
1110

1111
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the ``depends_on`` identifiers array."""
        return self.visit(node_depends_on.depends_on)
1114

1115
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition's body."""
        return self.visit(node_condition.body)
1118

1119
    def _resource_physical_resource_id_from(
        self, logical_resource_id: str, resolved_resources: dict[str, ResolvedResource]
    ) -> str | None:
        """Look up the physical id of a resolved resource.

        :param logical_resource_id: key of the resource within ``resolved_resources``.
        :param resolved_resources: resolved resources keyed by logical id.
        :return: the ``PhysicalResourceId`` when the resource's ``ResourceStatus`` is
            ``CREATE_COMPLETE`` or ``UPDATE_COMPLETE``; ``None`` otherwise (including when
            the resource is not present).
        :raises RuntimeError: if the resource has one of those statuses but its physical id
            is not a string.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        resource_entry = resolved_resources.get(logical_resource_id, {})
        complete_statuses = {
            ResourceStatus.CREATE_COMPLETE,
            ResourceStatus.UPDATE_COMPLETE,
        }
        if resource_entry.get("ResourceStatus") in complete_statuses:
            physical_id = resource_entry.get("PhysicalResourceId")
            if isinstance(physical_id, str):
                return physical_id
            raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
        return None
1134

1135
    def _before_resource_physical_id(self, resource_logical_id: str) -> str | None:
        """Return the physical id recorded for a resource in the before resolved resources.

        :param resource_logical_id: logical id of the resource to look up.
        :return: the physical id, or ``None`` when the lookup helper yields none (the
            helper returns ``None`` for resources that are absent or not in a completed
            create/update status).
        :raises RuntimeError: propagated from the lookup helper when a completed resource
            has no string physical id.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        return self._resource_physical_resource_id_from(
            logical_resource_id=resource_logical_id,
            resolved_resources=self._before_resolved_resources,
        )
1141

1142
    def _after_resource_physical_id(self, resource_logical_id: str) -> str | None:
        """Return the physical id to use for a resource in the after state.

        :param resource_logical_id: logical id of the resource to look up.
        :return: whatever the before-state lookup returns for the same logical id, which
            may be ``None``.
        """
        # NOTE(review): this delegates to the *before* lookup rather than consulting any
        #  after-state data; presumably because no after-state physical ids exist at
        #  preprocessing time -- confirm this is intentional.
        return self._before_resource_physical_id(resource_logical_id=resource_logical_id)
1144

1145
    def visit_node_intrinsic_function_ref(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Ref``.

        ``AWS::NoValue`` resolves to ``Nothing``. For any other logical id the reference is
        resolved, and a side that resolves to a ``PreprocResource`` is replaced by that
        resource's physical resource id.
        """

        def _resolve_ref(logical_id: str) -> PreprocEntityDelta | NothingType:
            if logical_id == "AWS::NoValue":
                return Nothing

            resolved: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
            referenced_before = resolved.before
            if isinstance(referenced_before, PreprocResource):
                resolved.before = referenced_before.physical_resource_id
            referenced_after = resolved.after
            if isinstance(referenced_after, PreprocResource):
                resolved.after = referenced_after.physical_resource_id
            return resolved

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_resolve_ref,
        )
1166

1167
    def visit_node_intrinsic_function_condition(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a ``Condition`` reference by visiting the condition it names.

        :raises RuntimeError: (from the resolver) if no condition with that name exists.
        """
        arguments_delta = self.visit(node_intrinsic_function.arguments)

        def _visit_named_condition(name: str) -> PreprocEntityDelta:
            named_condition = self._get_node_condition_if_exists(condition_name=name)
            if not is_nothing(named_condition):
                return self.visit(named_condition)
            raise RuntimeError(f"Undefined condition '{name}'")

        return self._cached_apply(
            resolver=_visit_named_condition,
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
        )
1185

1186
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
        """Visit every element of the array and gather the before and after values.

        The before side is ``Nothing`` for a created array and the after side is ``Nothing``
        for a removed one; otherwise each side is a list of the elements' values for that
        side, skipping elements whose value is ``Nothing``.
        """
        change_type = node_array.change_type
        before = Nothing if change_type == ChangeType.CREATED else []
        after = Nothing if change_type == ChangeType.REMOVED else []
        # The containers are never rebound below, so these checks are loop invariant.
        collects_before = not is_nothing(before)
        collects_after = not is_nothing(after)
        for element in node_array.array:
            element_delta: PreprocEntityDelta = self.visit(change_set_entity=element)
            element_before = element_delta.before
            element_after = element_delta.after
            if collects_before and not is_nothing(element_before):
                before.append(element_before)
            if collects_after and not is_nothing(element_after):
                after.append(element_after)
        return PreprocEntityDelta(before=before, after=after)
1199

1200
    def visit_node_properties(
        self, node_properties: NodeProperties
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
        """Visit every property and build the before and after ``PreprocProperties``.

        The before side is ``Nothing`` for created properties and the after side is
        ``Nothing`` for removed ones. Property values that are ``Nothing`` or ``None`` are
        left out of the corresponding side.
        """
        change_type = node_properties.change_type
        bindings_before = Nothing if change_type == ChangeType.CREATED else {}
        bindings_after = Nothing if change_type == ChangeType.REMOVED else {}
        # The containers are never rebound below, so these checks are loop invariant.
        has_before_side = not is_nothing(bindings_before)
        has_after_side = not is_nothing(bindings_after)

        def _is_bindable(value: Any) -> bool:
            return not is_nothing(value) and value is not None

        for node_property in node_properties.properties:
            name = node_property.name
            property_delta = self.visit(node_property)
            value_before = property_delta.before
            value_after = property_delta.after
            if has_before_side and _is_bindable(value_before):
                bindings_before[name] = value_before
            if has_after_side and _is_bindable(value_after):
                bindings_after[name] = value_after

        before = PreprocProperties(properties=bindings_before) if has_before_side else Nothing
        after = PreprocProperties(properties=bindings_after) if has_after_side else Nothing
        return PreprocEntityDelta(before=before, after=after)
1230

1231
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
        """Resolve the condition named by a resource's condition reference.

        The reference is visited to obtain the condition name for each side. A side whose
        name is a string is resolved through ``_resolve_condition``, and the matching side of
        that result is kept; any other side is ``Nothing``.
        """
        name_delta = self.visit(reference)

        before = Nothing
        name_before = name_delta.before
        if isinstance(name_before, str):
            before = self._resolve_condition(logical_id=name_before).before

        after = Nothing
        name_after = name_delta.after
        if isinstance(name_after, str):
            after = self._resolve_condition(logical_id=name_after).after

        return PreprocEntityDelta(before=before, after=after)
1244

1245
    def visit_node_resources(self, node_resources: NodeResources):
        """
        Visit each resource, skipping those whose condition evaluates to False in the
        after state. Resources without a condition reference are always visited.
        """
        for node_resource in node_resources.resources:
            condition_reference = node_resource.condition_reference
            disabled_after = False
            if not is_nothing(condition_reference):
                condition_delta = self._resolve_resource_condition_reference(condition_reference)
                # Only an explicit False disables the resource; Nothing or truthy values do not.
                disabled_after = condition_delta.after is False
            if not disabled_after:
                self.visit(node_resource)
1258

1259
    def visit_node_resource(
1✔
1260
        self, node_resource: NodeResource
1261
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
1262
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_resource.name):
1✔
1263
            raise ValidationError(
1✔
1264
                f"Template format error: Resource name {node_resource.name} is non alphanumeric."
1265
            )
1266
        change_type = node_resource.change_type
1✔
1267
        condition_before = Nothing
1✔
1268
        condition_after = Nothing
1✔
1269
        if not is_nothing(node_resource.condition_reference):
1✔
1270
            condition_delta = self._resolve_resource_condition_reference(
1✔
1271
                node_resource.condition_reference
1272
            )
1273
            condition_before = condition_delta.before
1✔
1274
            condition_after = condition_delta.after
1✔
1275

1276
        depends_on_before = Nothing
1✔
1277
        depends_on_after = Nothing
1✔
1278
        if not is_nothing(node_resource.depends_on):
1✔
1279
            depends_on_delta = self.visit(node_resource.depends_on)
1✔
1280
            depends_on_before = depends_on_delta.before
1✔
1281
            depends_on_after = depends_on_delta.after
1✔
1282

1283
        type_delta = self.visit(node_resource.type_)
1✔
1284

1285
        # Check conditions before visiting properties to avoid resolving references
1286
        # (e.g. GetAtt) to conditional resources that were never created.
1287
        should_process_before = change_type != ChangeType.CREATED and (
1✔
1288
            is_nothing(condition_before) or condition_before
1289
        )
1290
        should_process_after = change_type != ChangeType.REMOVED and (
1✔
1291
            is_nothing(condition_after) or condition_after
1292
        )
1293

1294
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties]
1295
        if should_process_before or should_process_after:
1✔
1296
            properties_delta = self.visit(node_resource.properties)
1✔
1297
        else:
1298
            properties_delta = PreprocEntityDelta(before=Nothing, after=Nothing)
1✔
1299

1300
        deletion_policy_before = Nothing
1✔
1301
        deletion_policy_after = Nothing
1✔
1302
        if not is_nothing(node_resource.deletion_policy):
1✔
1303
            deletion_policy_delta = self.visit(node_resource.deletion_policy)
1✔
1304
            deletion_policy_before = deletion_policy_delta.before
1✔
1305
            deletion_policy_after = deletion_policy_delta.after
1✔
1306

1307
        update_replace_policy_before = Nothing
1✔
1308
        update_replace_policy_after = Nothing
1✔
1309
        if not is_nothing(node_resource.update_replace_policy):
1✔
1310
            update_replace_policy_delta = self.visit(node_resource.update_replace_policy)
1✔
1311
            update_replace_policy_before = update_replace_policy_delta.before
1✔
1312
            update_replace_policy_after = update_replace_policy_delta.after
1✔
1313

1314
        before = Nothing
1✔
1315
        after = Nothing
1✔
1316
        if should_process_before:
1✔
1317
            logical_resource_id = node_resource.name
1✔
1318
            before_physical_resource_id = self._before_resource_physical_id(
1✔
1319
                resource_logical_id=logical_resource_id
1320
            )
1321
            before = PreprocResource(
1✔
1322
                logical_id=logical_resource_id,
1323
                physical_resource_id=before_physical_resource_id,
1324
                condition=condition_before,
1325
                resource_type=type_delta.before,
1326
                properties=properties_delta.before,
1327
                depends_on=depends_on_before,
1328
                requires_replacement=False,
1329
                deletion_policy=deletion_policy_before,
1330
                update_replace_policy=update_replace_policy_before,
1331
            )
1332
        if should_process_after:
1✔
1333
            logical_resource_id = node_resource.name
1✔
1334
            try:
1✔
1335
                after_physical_resource_id = self._after_resource_physical_id(
1✔
1336
                    resource_logical_id=logical_resource_id
1337
                )
UNCOV
1338
            except RuntimeError:
×
UNCOV
1339
                after_physical_resource_id = None
×
1340
            after = PreprocResource(
1✔
1341
                logical_id=logical_resource_id,
1342
                physical_resource_id=after_physical_resource_id,
1343
                condition=condition_after,
1344
                resource_type=type_delta.after,
1345
                properties=properties_delta.after,
1346
                depends_on=depends_on_after,
1347
                requires_replacement=node_resource.requires_replacement,
1348
                deletion_policy=deletion_policy_after,
1349
                update_replace_policy=update_replace_policy_after,
1350
            )
1351
        return PreprocEntityDelta(before=before, after=after)
1✔
1352

1353
    def visit_node_output(
        self, node_output: NodeOutput
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
        """Preprocess one template output into its before/after representations.

        The output's value is visited first, then its condition reference (when
        present) is resolved, and finally its export (when present) is visited.
        A condition going from falsy to truthy overrides the node's change type
        with ``ChangeType.CREATED``; one going from truthy to falsy overrides it
        with ``ChangeType.REMOVED``.

        :param node_output: the output node to preprocess.
        :return: a delta whose ``before`` stays ``Nothing`` when the effective
            change type is ``CREATED`` and whose ``after`` stays ``Nothing`` when
            it is ``REMOVED``.
        """
        effective_change_type = node_output.change_type
        resolved_value = self.visit(node_output.value)

        resolved_condition = Nothing
        condition_reference = node_output.condition_reference
        if not is_nothing(condition_reference):
            resolved_condition = self._resolve_resource_condition_reference(condition_reference)
            held_before = resolved_condition.before
            holds_after = resolved_condition.after
            # A flip of the condition wins over the change type recorded on the node.
            if held_before:
                if not holds_after:
                    effective_change_type = ChangeType.REMOVED
            elif holds_after:
                effective_change_type = ChangeType.CREATED

        resolved_export = Nothing
        if not is_nothing(node_output.export):
            resolved_export = self.visit(node_output.export)

        output_before: Maybe[PreprocOutput] = Nothing
        output_after: Maybe[PreprocOutput] = Nothing

        # Export and condition fall back to None when their delta was never resolved.
        if effective_change_type != ChangeType.CREATED:
            output_before = PreprocOutput(
                name=node_output.name,
                value=resolved_value.before,
                export=resolved_export.before if resolved_export else None,
                condition=resolved_condition.before if resolved_condition else None,
            )
        if effective_change_type != ChangeType.REMOVED:
            output_after = PreprocOutput(
                name=node_output.name,
                value=resolved_value.after,
                export=resolved_export.after if resolved_export else None,
                condition=resolved_condition.after if resolved_condition else None,
            )

        return PreprocEntityDelta(before=output_before, after=output_after)
1✔
1392

1393
    def visit_node_outputs(
        self, node_outputs: NodeOutputs
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
        """Preprocess every output of the template and collect the results.

        :param node_outputs: the container node holding the individual outputs.
        :return: a delta holding the list of preprocessed outputs that exist
            before the change and the list of those that exist after it.
        """
        collected_before: list[PreprocOutput] = []
        collected_after: list[PreprocOutput] = []

        for output_node in node_outputs.outputs:
            condition_reference = output_node.condition_reference
            if (
                not is_nothing(condition_reference)
                and self._resolve_resource_condition_reference(condition_reference).after is False
            ):
                # An output whose condition is exactly False after the change is
                # not visited at all, so it contributes to neither list.
                continue

            delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(output_node)
            for bucket, candidate in (
                (collected_before, delta.before),
                (collected_after, delta.after),
            ):
                if not is_nothing(candidate):
                    bucket.append(candidate)

        return PreprocEntityDelta(before=collected_before, after=collected_after)
1✔
1415

1416
    def visit_node_intrinsic_function_fn_import_value(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve an ``Fn::ImportValue`` node against the exports of the change set's account/region.

        :param node_intrinsic_function: the intrinsic function node; its arguments
            are visited and then handed to the resolver through ``_cached_apply``.
        :return: the delta produced by ``_cached_apply``.
        :raises RuntimeError: from the resolver, when it is given a non-string argument.
        """

        def _compute_fn_import_value(string) -> Maybe[str]:
            # Only a plain string can name an export.
            if isinstance(string, str):
                known_exports = exports_map(
                    account_id=self._change_set.account_id,
                    region_name=self._change_set.region_name,
                )
                export_entry = known_exports.get(string, {})
                # A missing export, or one with a falsy "Value", resolves to Nothing.
                return export_entry.get("Value") or Nothing
            raise RuntimeError(f"Invalid parameter for import: '{string}'")

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_compute_fn_import_value,
        )
1✔
1436

1437
    def visit_node_intrinsic_function_fn_transform(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Reject ``Fn::Transform`` nodes; this visitor never resolves them.

        :param node_intrinsic_function: the offending intrinsic function node (unused).
        :raises RuntimeError: always.
        """
        message = "Fn::Transform should have been handled by the Transformer"
        raise RuntimeError(message)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc