• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20085422227

09 Dec 2025 10:17PM UTC coverage: 86.887% (+0.01%) from 86.875%
20085422227

push

github

web-flow
ecs/add service principal (#13474)

2 of 2 new or added lines in 1 file covered. (100.0%)

224 existing lines in 8 files now uncovered.

69922 of 80475 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.64
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from typing import Any, Final, Generic, TypeVar
1✔
8

9
from botocore.exceptions import ClientError
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api.cloudformation import ResourceStatus
1✔
13
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
14
from localstack.aws.connect import connect_to
1✔
15
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
16
    ChangeSetEntity,
17
    ChangeType,
18
    Maybe,
19
    NodeArray,
20
    NodeCondition,
21
    NodeDependsOn,
22
    NodeDivergence,
23
    NodeIntrinsicFunction,
24
    NodeMapping,
25
    NodeObject,
26
    NodeOutput,
27
    NodeOutputs,
28
    NodeParameter,
29
    NodeParameters,
30
    NodeProperties,
31
    NodeProperty,
32
    NodeResource,
33
    NodeResources,
34
    NodeTemplate,
35
    Nothing,
36
    NothingType,
37
    Scope,
38
    TerminalValue,
39
    TerminalValueCreated,
40
    TerminalValueModified,
41
    TerminalValueRemoved,
42
    TerminalValueUnchanged,
43
    is_nothing,
44
)
45
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
46
    ChangeSetModelVisitor,
47
)
48
from localstack.services.cloudformation.engine.v2.resolving import (
1✔
49
    REGEX_DYNAMIC_REF,
50
    extract_dynamic_reference,
51
    perform_dynamic_reference_lookup,
52
)
53
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
54
from localstack.services.cloudformation.stores import (
1✔
55
    exports_map,
56
)
57
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
58
from localstack.services.cloudformation.v2.types import ResolvedResource
1✔
59
from localstack.utils.aws.arns import get_partition
1✔
60
from localstack.utils.numbers import to_number
1✔
61
from localstack.utils.objects import get_value_from_path
1✔
62
from localstack.utils.run import to_str
1✔
63
from localstack.utils.strings import to_bytes
1✔
64
from localstack.utils.urls import localstack_host
1✔
65

66
# Host name LocalStack is reachable under. The value in AWS is "amazonaws.com".
_AWS_URL_SUFFIX = localstack_host().host

# Reference names that are resolved as pseudo parameters instead of template entities.
_PSEUDO_PARAMETERS: Final[set[str]] = {
    "AWS::Partition",
    "AWS::AccountId",
    "AWS::Region",
    "AWS::StackName",
    "AWS::StackId",
    "AWS::URLSuffix",
    "AWS::NoValue",
    "AWS::NotificationARNs",
}

TBefore = TypeVar("TBefore")
TAfter = TypeVar("TAfter")
_T = TypeVar("_T")

# Matches execute-api URLs. Group 1 is everything up to and including "execute-api.",
# group 2 is the host suffix and group 3 is the remaining path.
# The LocalStack host is passed through re.escape so that its dots (and any other
# regex metacharacters) only match literally.
REGEX_OUTPUT_APIGATEWAY = re.compile(
    rf"^(https?://.+\.execute-api\.)(?:[^-]+-){{2,3}}\d\.(amazonaws\.com|{re.escape(_AWS_URL_SUFFIX)})/?(.*)$"
)
# Placeholder returned for property lookups that yield no value while unsupported
# resource types are being ignored.
MOCKED_REFERENCE = "unknown"

# Logical resource ids consist of ASCII letters and digits only.
VALID_LOGICAL_RESOURCE_ID_RE = re.compile(r"^[A-Za-z0-9]+$")
89

90

91
class PreprocEntityDelta(Generic[TBefore, TAfter]):
    """Pair of the 'before' and 'after' values computed for a change set entity.

    Either side defaults to ``Nothing`` when it is not supplied.
    """

    before: Maybe[TBefore]
    after: Maybe[TAfter]

    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
        self.before, self.after = before, after

    def __eq__(self, other):
        # A delta only equals another delta, and only when both sides agree.
        if isinstance(other, PreprocEntityDelta):
            return self.before == other.before and self.after == other.after
        return False
103

104

105
class PreprocProperties:
    """Holder for the property mapping computed for a resource."""

    properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        self.properties = properties

    def __eq__(self, other):
        # Anything that is not a PreprocProperties is unequal; otherwise the wrapped
        # mappings decide.
        return isinstance(other, PreprocProperties) and self.properties == other.properties
115

116

117
class PreprocResource:
    """Preprocessed view of a single template resource.

    Equality only considers the logical id, the (defaulted) condition, the resource
    type and the properties; the remaining attributes do not take part in it.
    """

    logical_id: str
    physical_resource_id: str | None
    condition: bool | None
    resource_type: str
    properties: PreprocProperties
    depends_on: list[str] | None
    requires_replacement: bool
    status: ResourceStatus | None

    def __init__(
        self,
        logical_id: str,
        physical_resource_id: str,
        condition: bool | None,
        resource_type: str,
        properties: PreprocProperties,
        depends_on: list[str] | None,
        requires_replacement: bool,
        status: ResourceStatus | None = None,
    ):
        # Identity of the resource.
        self.logical_id = logical_id
        self.physical_resource_id = physical_resource_id
        self.resource_type = resource_type
        # Evaluated template inputs.
        self.condition = condition
        self.properties = properties
        self.depends_on = depends_on
        # Deployment bookkeeping.
        self.requires_replacement = requires_replacement
        self.status = status

    @staticmethod
    def _compare_conditions(c1: bool, c2: bool):
        """Compare two condition values, treating any non-bool value as ``True``."""

        def _effective(flag):
            # The lack of condition equates to a true condition.
            return flag if isinstance(flag, bool) else True

        return _effective(c1) == _effective(c2)

    def __eq__(self, other):
        if not isinstance(other, PreprocResource):
            return False
        # All four comparisons are evaluated before being combined.
        same_logical_id = self.logical_id == other.logical_id
        same_condition = self._compare_conditions(self.condition, other.condition)
        same_type = self.resource_type == other.resource_type
        same_properties = self.properties == other.properties
        return bool(same_logical_id and same_condition and same_type and same_properties)
165

166

167
class PreprocOutput:
    """Preprocessed view of a template output: its name, value, export and condition."""

    name: str
    value: Any
    export: Any | None
    condition: bool | None

    def __init__(self, name: str, value: Any, export: Any | None, condition: bool | None):
        self.name, self.value = name, value
        self.export, self.condition = export, condition

    def __eq__(self, other):
        if not isinstance(other, PreprocOutput):
            return False
        # Every field takes part in equality; all comparisons are evaluated first.
        same_name = self.name == other.name
        same_value = self.value == other.value
        same_export = self.export == other.export
        same_condition = self.condition == other.condition
        return bool(same_name and same_value and same_export and same_condition)
190

191

192
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
193
    _change_set: Final[ChangeSet]
1✔
194
    _before_resolved_resources: Final[dict]
1✔
195
    _before_cache: Final[dict[Scope, Any]]
1✔
196
    _after_cache: Final[dict[Scope, Any]]
1✔
197

198
    def __init__(self, change_set: ChangeSet):
        """Bind the preprocessor to ``change_set`` and start with empty scope caches."""
        self._change_set = change_set
        # Resolved resources are taken from the stack the change set refers to.
        stack = change_set.stack
        self._before_resolved_resources = stack.resolved_resources
        # Per-scope results for the 'before' and 'after' side respectively.
        self._before_cache, self._after_cache = {}, {}
203

204
    def _setup_runtime_cache(self) -> None:
        """Reset both local caches and seed them from the update model's runtime caches.

        Entries are looked up under this class's name; a missing or empty entry
        leaves the corresponding local cache empty.
        """
        key = self.__class__.__name__

        self._before_cache.clear()
        self._after_cache.clear()

        update_model = self._change_set.update_model

        seeded_before = update_model.before_runtime_cache.get(key)
        if seeded_before:
            self._before_cache.update(seeded_before)

        seeded_after = update_model.after_runtime_cache.get(key)
        if seeded_after:
            self._after_cache.update(seeded_after)
217

218
    def _save_runtime_cache(self) -> None:
        """Store deep copies of both local caches in the update model's runtime caches.

        The copies are stored under this class's name.
        """
        key = self.__class__.__name__
        update_model = self._change_set.update_model

        before_snapshot = copy.deepcopy(self._before_cache)
        update_model.before_runtime_cache[key] = before_snapshot

        after_snapshot = copy.deepcopy(self._after_cache)
        update_model.after_runtime_cache[key] = after_snapshot
226

227
    def process(self) -> None:
        """Preprocess the change set's template.

        The runtime caches are loaded first, then the template's conditions are
        visited, then the template itself, and finally the caches are written back.
        """
        self._setup_runtime_cache()
        template = self._change_set.update_model.node_template
        # Conditions go first, followed by the template as a whole.
        for entity in (template.conditions, template):
            self.visit(entity)
        self._save_runtime_cache()
234

235
    def _get_node_resource_for(
        self, resource_name: str, node_template: NodeTemplate
    ) -> NodeResource:
        """Return the resource node called ``resource_name``, visiting it beforehand.

        Raises:
            ValidationError: if the template has no resource with that name.
        """
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        found = next(
            (
                candidate
                for candidate in node_template.resources.resources
                if candidate.name == resource_name
            ),
            None,
        )
        if found is None:
            raise ValidationError(
                f"Template format error: Unresolved resource dependencies [{resource_name}] in the Resources block of the template"
            )
        self.visit(found)
        return found
246

247
    def _get_node_property_for(
        self, property_name: str, node_resource: NodeResource
    ) -> NodeProperty | None:
        """Return the property node called ``property_name`` after visiting it.

        Returns ``None`` when the resource declares no such property.
        """
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        found = next(
            (
                candidate
                for candidate in node_resource.properties.properties
                if candidate.name == property_name
            ),
            None,
        )
        if found is not None:
            self.visit(found)
        return found
256

257
    def _deployed_property_value_of(
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
    ) -> Any:
        """Read a property value from the resolved state of a resource.

        Args:
            resource_logical_id: logical id of the resource to read from.
            property_name: path of the property inside the resource's "Properties".
            resolved_resources: mapping of logical ids to resolved resource records.

        Returns:
            The value found when it is truthy. For a falsy lookup, ``MOCKED_REFERENCE``
            when ``config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES`` is set, otherwise
            the falsy value itself.

        Raises:
            RuntimeError: if the resource has no resolved record, or if a truthy value
                is neither a string, a list nor a dict.
        """
        # We have to override this function to make sure it does not try to access the
        # resolved resource

        # Before we can obtain deployed value for a resource, we need to first ensure to
        # process the resource if this wasn't processed already. Ideally, values should only
        # be accessible through delta objects, to ensure computation is always complete at
        # every level.
        self._get_node_resource_for(
            resource_name=resource_logical_id,
            node_template=self._change_set.update_model.node_template,
        )

        record = resolved_resources.get(resource_logical_id)
        if record is None:
            raise RuntimeError(
                f"No deployed instances of resource '{resource_logical_id}' were found"
            )

        # TODO support structured properties, e.g. NestedStack.Outputs.OutputName
        property_value: Any | None = get_value_from_path(
            record.get("Properties", {}), property_name
        )

        if not property_value:
            # Nothing usable was found: optionally substitute the mocked reference.
            if config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES:
                return MOCKED_REFERENCE
            return property_value

        # Str: Standard expected type. TODO validate bools and numbers
        # List: Multiple resource types can return a list of values e.g. AWS::EC2::VPC.
        # Dict: Custom resources in CloudFormation can return arbitrary data structures.
        if isinstance(property_value, (str, list, dict)):
            return property_value
        raise RuntimeError(
            f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value nor list"
        )
293

294
    def _before_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> Any:
        """Look up a property value in the 'before' resolved resources."""
        before_resources = self._before_resolved_resources
        return self._deployed_property_value_of(
            resource_logical_id=resource_logical_id,
            property_name=property_name,
            resolved_resources=before_resources,
        )
302

303
    def _after_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> str | None:
        """Look up a property value for the 'after' side.

        This implementation forwards to the 'before' lookup with the same arguments.
        """
        lookup = self._before_deployed_property_value_of
        return lookup(resource_logical_id=resource_logical_id, property_name=property_name)
309

310
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
        """Return the mapping node called ``map_name`` after visiting it.

        Raises:
            RuntimeError: if the template defines no mapping with that name.
        """
        mappings: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
        # TODO: another scenarios suggesting property lookups might be preferable.
        for candidate in mappings:
            if candidate.name != map_name:
                continue
            self.visit(candidate)
            return candidate
        raise RuntimeError(f"Undefined '{map_name}' mapping")
318

319
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
        """Return the parameter node called ``parameter_name`` after visiting it.

        Returns ``Nothing`` when the template declares no such parameter.
        """
        template = self._change_set.update_model.node_template
        parameters: list[NodeParameter] = template.parameters.parameters
        # TODO: another scenarios suggesting property lookups might be preferable.
        for candidate in parameters:
            if candidate.name != parameter_name:
                continue
            self.visit(candidate)
            return candidate
        return Nothing
329

330
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
        """Return the condition node called ``condition_name`` after visiting it.

        Returns ``Nothing`` when the template declares no such condition.
        """
        template = self._change_set.update_model.node_template
        conditions: list[NodeCondition] = template.conditions.conditions
        # TODO: another scenarios suggesting property lookups might be preferable.
        for candidate in conditions:
            if candidate.name != condition_name:
                continue
            self.visit(candidate)
            return candidate
        return Nothing
340

341
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition called ``logical_id``.

        Raises:
            RuntimeError: if no such condition exists in the template.
        """
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError(f"No condition '{logical_id}' was found.")
        return self.visit(node_condition)
347

348
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
        """Return the value of a pseudo parameter for the current change set.

        ``AWS::NoValue`` resolves to ``None``.

        Raises:
            RuntimeError: for any name that is not handled below.
        """
        if pseudo_parameter_name == "AWS::Partition":
            return get_partition(self._change_set.region_name)
        if pseudo_parameter_name == "AWS::AccountId":
            return self._change_set.stack.account_id
        if pseudo_parameter_name == "AWS::Region":
            return self._change_set.stack.region_name
        if pseudo_parameter_name == "AWS::StackName":
            return self._change_set.stack.stack_name
        if pseudo_parameter_name == "AWS::StackId":
            return self._change_set.stack.stack_id
        if pseudo_parameter_name == "AWS::URLSuffix":
            return _AWS_URL_SUFFIX
        if pseudo_parameter_name == "AWS::NoValue":
            return None
        raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
366

367
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
        """Resolve a reference to a pseudo parameter, a template parameter or a resource.

        The three kinds are tried in that order; the resource lookup raises when the
        name matches nothing.
        """
        if logical_id in _PSEUDO_PARAMETERS:
            # Pseudo parameters are constants within the lifecycle of a template.
            constant = self._resolve_pseudo_parameter(pseudo_parameter_name=logical_id)
            return PreprocEntityDelta(before=constant, after=constant)

        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
        if isinstance(node_parameter, NodeParameter):
            return self.visit(node_parameter)

        node_resource = self._get_node_resource_for(
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
        )
        resource_delta = self.visit(node_resource)
        return PreprocEntityDelta(before=resource_delta.before, after=resource_delta.after)
387

388
    def _resolve_mapping(
        self, map_name: str, top_level_key: str, second_level_key
    ) -> PreprocEntityDelta:
        """Return the delta of the value stored under both keys of the named mapping.

        Raises:
            ValidationError: if the top-level entry is not an object node, or the
                second-level entry is not a terminal value, array or object node.
        """

        def _unresolvable() -> ValidationError:
            # Both failure modes report the same "Map::Top::Second" key.
            error_key = "::".join([map_name, top_level_key, second_level_key])
            return ValidationError(f"Template error: Unable to get mapping for {error_key}")

        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)

        top_level_value = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_value, NodeObject):
            raise _unresolvable()

        second_level_value = top_level_value.bindings.get(second_level_key)
        if not isinstance(second_level_value, (TerminalValue, NodeArray, NodeObject)):
            raise _unresolvable()

        return self.visit(second_level_value)
403

404
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
        """Visit an entity, reusing results cached under its scope when possible.

        A cached result is used only when both caches hold the scope. Otherwise the
        parent visitor computes the result; deltas get the string replacements applied
        and are stored in both caches before being returned.
        """
        scope = change_set_entity.scope
        is_cached = scope in self._before_cache and scope in self._after_cache
        if not is_cached:
            delta = super().visit(change_set_entity=change_set_entity)
            if isinstance(delta, PreprocEntityDelta):
                delta = self._maybe_perform_replacements(delta)
                self._before_cache[scope] = delta.before
                self._after_cache[scope] = delta.after
            return delta

        cached_before = self._before_cache[scope]
        cached_after = self._after_cache[scope]
        return PreprocEntityDelta(before=cached_before, after=cached_after)
416

417
    def _maybe_perform_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
        """Apply the static replacements and then the dynamic ones to ``delta``."""
        after_static = self._maybe_perform_static_replacements(delta)
        return self._maybe_perform_dynamic_replacements(after_static)
421

422
    def _maybe_perform_static_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
        """Run ``_perform_static_replacements`` over the string sides of ``delta``."""
        replacer = self._perform_static_replacements
        return self._maybe_perform_on_delta(delta, replacer)
424

425
    def _maybe_perform_dynamic_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
        """Run ``_perform_dynamic_replacements`` over the string sides of ``delta``."""
        replacer = self._perform_dynamic_replacements
        return self._maybe_perform_on_delta(delta, replacer)
427

428
    def _maybe_perform_on_delta(
        self, delta: PreprocEntityDelta | None, f: Callable[[_T], _T]
    ) -> PreprocEntityDelta | None:
        """Apply ``f`` in place to every side of ``delta`` that holds a string.

        Args:
            delta: the delta to update; ``None`` is passed through untouched.
            f: transformation applied to the string sides.

        Returns:
            The same ``delta`` object after the update, or ``None`` if ``None`` was given.
        """
        # The signature admits None, so hand it back instead of failing on attribute access.
        if delta is None:
            return None
        # Non-string sides are left exactly as they are.
        if isinstance(delta.before, str):
            delta.before = f(delta.before)
        if isinstance(delta.after, str):
            delta.after = f(delta.after)
        return delta
436

437
    def _perform_dynamic_replacements(self, value: _T) -> _T:
        """Substitute a dynamic reference found in a string with its looked-up value.

        Non-string values, strings without a dynamic reference, and references whose
        lookup yields a falsy result are returned unchanged.
        """
        if isinstance(value, str):
            dynamic_ref = extract_dynamic_reference(value)
            if dynamic_ref:
                new_value = perform_dynamic_reference_lookup(
                    reference=dynamic_ref,
                    account_id=self._change_set.account_id,
                    region_name=self._change_set.region_name,
                )
                if new_value:
                    # We need to use a function here, to avoid backslash processing by regex.
                    # From the regex sub documentation:
                    # repl can be a string or a function; if it is a string, any backslash escapes in it are processed.
                    # Using a function, we can avoid this processing.
                    return REGEX_DYNAMIC_REF.sub(lambda _match: new_value, value)
        return value
455

456
    @staticmethod
    def _perform_static_replacements(value: str) -> str:
        """Insert the LocalStack port into a string that is an execute-api URL.

        Strings that do not match ``REGEX_OUTPUT_APIGATEWAY``, or that are listed in
        ``config.CFN_STRING_REPLACEMENT_DENY_LIST``, are returned unchanged.
        """
        api_match = REGEX_OUTPUT_APIGATEWAY.match(value)
        if not api_match or value in config.CFN_STRING_REPLACEMENT_DENY_LIST:
            return value

        prefix, host, path = api_match.group(1, 2, 3)
        port = localstack_host().port
        return f"{prefix}{host}:{port}/{path}"
468

469
    def _cached_apply(
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
    ) -> PreprocEntityDelta:
        """
        Resolve each side of the given arguments delta, preferring values already
        held in the runtime caches for the scope.

        For each side ('before' and 'after') independently: when the cache holds a
        value for the scope, that value is used. Otherwise, when the matching argument
        is present, the resolver is called with it. If the resolver returns a
        PreprocEntityDelta, only the component for the side being resolved is kept
        ('before' for the before side, 'after' for the after side).

        The caches are only read here, never written. Storing the result is left to
        the caller, either manually or through the visit method of this class.

        Args:
            scope (Scope): The current scope used as a key for cache lookup.
            arguments_delta (PreprocEntityDelta): The delta containing 'before' and 'after' values to resolve.
            resolver (Callable[[Any], Any]): Function to apply on uncached 'before' or 'after' argument values.

        Returns:
            PreprocEntityDelta: A new delta with resolved 'before' and 'after' values.
        """

        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.

        def _resolve_side(cache: dict, argument: Any, side: str) -> Any:
            cached = cache.get(scope, Nothing)
            if not is_nothing(cached) or is_nothing(argument):
                return cached
            resolved = resolver(argument)
            if isinstance(resolved, PreprocEntityDelta):
                # Keep only the component that belongs to the side being resolved.
                return getattr(resolved, side)
            return resolved

        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after

        before = _resolve_side(self._before_cache, arguments_before, "before")
        after = _resolve_side(self._after_cache, arguments_after, "after")
        return PreprocEntityDelta(before=before, after=after)
516

517
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the property's value node."""
        value_node = node_property.value
        return self.visit(value_node)
519

520
    def visit_terminal_value_modified(
        self, terminal_value_modified: TerminalValueModified
    ) -> PreprocEntityDelta:
        """Build a delta going from the node's value to its modified value."""
        previous = terminal_value_modified.value
        current = terminal_value_modified.modified_value
        return PreprocEntityDelta(before=previous, after=current)
527

528
    def visit_terminal_value_created(
        self, terminal_value_created: TerminalValueCreated
    ) -> PreprocEntityDelta:
        """Build a delta whose 'after' side is the node's value; 'before' keeps its default."""
        created_value = terminal_value_created.value
        return PreprocEntityDelta(after=created_value)
532

533
    def visit_terminal_value_removed(
        self, terminal_value_removed: TerminalValueRemoved
    ) -> PreprocEntityDelta:
        """Build a delta whose 'before' side is the node's value; 'after' keeps its default."""
        removed_value = terminal_value_removed.value
        return PreprocEntityDelta(before=removed_value)
537

538
    def visit_terminal_value_unchanged(
        self, terminal_value_unchanged: TerminalValueUnchanged
    ) -> PreprocEntityDelta:
        """Build a delta carrying the node's value on both sides."""
        return PreprocEntityDelta(
            before=terminal_value_unchanged.value, after=terminal_value_unchanged.value
        )
545

546
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
        """Combine the 'before' of the node's value with the 'after' of its divergence."""
        # The value node is visited first, then the divergence node.
        value_delta = self.visit(node_divergence.value)
        divergence_delta = self.visit(node_divergence.divergence)
        return PreprocEntityDelta(before=value_delta.before, after=divergence_delta.after)
550

551
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
        """Build the 'before' and 'after' dictionaries of an object node.

        A created object has no 'before' side and a removed object has no 'after'
        side. Each binding is visited, and its values are copied under the binding's
        name into whichever sides exist, skipping values that are ``Nothing``.
        """
        change_type = node_object.change_type
        before = {} if change_type != ChangeType.CREATED else Nothing
        after = {} if change_type != ChangeType.REMOVED else Nothing

        for name, child in node_object.bindings.items():
            child_delta: PreprocEntityDelta = self.visit(change_set_entity=child)
            sides = ((before, child_delta.before), (after, child_delta.after))
            for target, side_value in sides:
                if is_nothing(target) or is_nothing(side_value):
                    continue
                target[name] = side_value

        return PreprocEntityDelta(before=before, after=after)
564

565
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> str:
        """Resolve the arguments of an Fn::GetAtt for one side of the change set.

        Args:
            arguments: either a "Resource.Attribute" string or a list whose first item
                is the resource's logical name and whose remaining items are joined
                with "." to form the attribute name.
            select_before: resolve against the 'before' side when true, otherwise
                against the 'after' side.

        Returns:
            The deployed property value when one is found. Otherwise the selected side
            of the delta of the template property with that name, or ``None`` when no
            such property exists. Despite the annotation, the deployed value may also
            be a list or a dict.

        Raises:
            ValidationError: if fewer than two arguments are given, if the resource
                does not exist, or if the resource's condition is falsy on the
                selected side.
            RuntimeError: if the resource references a condition that does not exist.
        """
        # TODO: add arguments validation.
        arguments_list: list[str]
        if isinstance(arguments, str):
            arguments_list = arguments.split(".")
        else:
            arguments_list = arguments

        if len(arguments_list) < 2:
            raise ValidationError(
                "Template error: every Fn::GetAtt object requires two non-empty parameters, the resource name and the resource attribute"
            )

        logical_name_of_resource = arguments_list[0]
        attribute_name = ".".join(arguments_list[1:])

        node_resource = self._get_node_resource_for(
            resource_name=logical_name_of_resource,
            node_template=self._change_set.update_model.node_template,
        )

        if not is_nothing(node_resource.condition_reference):
            # Resolve by the referenced name directly: a missing condition then raises
            # the explicit "No condition ... was found." error instead of failing on
            # an attribute access on the lookup result.
            evaluation_result = self._resolve_condition(node_resource.condition_reference.value)
            condition_holds = (
                evaluation_result.before if select_before else evaluation_result.after
            )
            if not condition_holds:
                raise ValidationError(
                    f"Template format error: Unresolved resource dependencies [{logical_name_of_resource}] in the Resources block of the template"
                )

        # Custom Resources can mutate their definition
        # So the preproc should search first in the resource values and then check the template
        if select_before:
            value = self._before_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        else:
            value = self._after_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        if value is not None:
            return value

        node_property: NodeProperty | None = self._get_node_property_for(
            property_name=attribute_name, node_resource=node_resource
        )
        if node_property is not None:
            # The property is statically defined in the template and its value can be computed.
            property_delta = self.visit(node_property)
            value = property_delta.before if select_before else property_delta.after

        return value
624

625
    def visit_node_intrinsic_function_fn_get_att(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::GetAtt node for both sides of the change set.

        For each side, a value cached under the function's scope is used when present;
        otherwise the attribute is resolved from that side's arguments, if it has any.
        """
        # TODO: validate the return value according to the spec.
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        before_arguments: Maybe[str | list[str]] = arguments_delta.before
        after_arguments: Maybe[str | list[str]] = arguments_delta.after
        scope = node_intrinsic_function.scope

        def _side(cache: dict, side_arguments: Any, select_before: bool) -> Any:
            cached = cache.get(scope, Nothing)
            if is_nothing(cached) and not is_nothing(side_arguments):
                return self._resolve_attribute(
                    arguments=side_arguments, select_before=select_before
                )
            return cached

        before = _side(self._before_cache, before_arguments, True)
        after = _side(self._after_cache, after_arguments, False)
        return PreprocEntityDelta(before=before, after=after)
642

643
    def visit_node_intrinsic_function_fn_equals(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::Equals node by comparing its first two arguments.

        Raises:
            ValidationError: if the 'after' arguments are a list whose length is not 2.
        """
        # TODO: add argument shape validation.
        arguments_delta = self.visit(node_intrinsic_function.arguments)

        after_arguments = arguments_delta.after
        if isinstance(after_arguments, list) and len(after_arguments) != 2:
            raise ValidationError(
                "Template error: every Fn::Equals object requires a list of 2 string parameters."
            )

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=lambda args: args[0] == args[1],
        )
663

664
    def visit_node_intrinsic_function_fn_if(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::If`` separately for the before and after states.

        The evaluation is short-circuiting: for each state only the branch
        selected by the condition value is visited.

        Raises a ValueError when the function does not have exactly three
        arguments, and a ValidationError when the named condition is not found.
        """
        branches = node_intrinsic_function.arguments.array
        branch_count = len(branches)
        if branch_count != 3:
            raise ValueError(
                f"Incorrectly constructed Fn::If usage, expected 3 arguments, found {branch_count}"
            )

        def _evaluate_side(condition_name: Any, use_before: bool) -> Any:
            # Look up the condition, evaluate it for the requested side and
            # visit only the branch it selects.
            condition_node = self._get_node_condition_if_exists(condition_name=condition_name)
            if is_nothing(condition_node):
                # TODO: for the before side this is presumably unreachable, since evaluating the
                #  before state implies the stack was deployed successfully at some point.
                raise ValidationError(
                    f"Template error: unresolved condition dependency {condition_name} in Fn::If"
                )
            condition_delta = self.visit(condition_node)
            condition_holds = condition_delta.before if use_before else condition_delta.after
            selected_branch = branches[1] if condition_holds else branches[2]
            branch_delta = self.visit(selected_branch)
            return branch_delta.before if use_before else branch_delta.after

        condition_name_delta = self.visit(branches[0])
        outcome = PreprocEntityDelta()

        before_name = condition_name_delta.before
        if not is_nothing(before_name):
            outcome.before = _evaluate_side(before_name, use_before=True)

        after_name = condition_name_delta.after
        if not is_nothing(after_name):
            outcome.after = _evaluate_side(after_name, use_before=False)

        return outcome
1✔
711

712
    def visit_node_intrinsic_function_fn_and(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::And``: true when every resolved operand is truthy."""

        def _every_operand_holds(operands: list[bool]) -> bool:
            return all(operands)

        operands_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=operands_delta,
            resolver=_every_operand_holds,
        )
1✔
726

727
    def visit_node_intrinsic_function_fn_or(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Or``: true when at least one resolved operand is truthy."""

        def _some_operand_holds(operands: list[bool]):
            return any(operands)

        operands_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=operands_delta,
            resolver=_some_operand_holds,
        )
1✔
741

742
    def visit_node_intrinsic_function_fn_not(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Not`` by negating the resolved operand."""

        def _negate(operand: list[bool] | bool) -> bool:
            # Is the argument ever a lone boolean?
            # A list is negated through its first element; anything else is negated directly.
            subject = operand[0] if isinstance(operand, list) else operand
            return not subject

        operand_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=operand_delta,
            resolver=_negate,
        )
1✔
759

760
    def visit_node_intrinsic_function_fn_sub(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Sub`` for the before and after states of the node.

        A side is computed only when no value is cached for the node's scope in the
        corresponding cache and the resolved arguments for that side are not Nothing.
        """

        def _compute_sub(args: str | list[Any], select_before: bool) -> str:
            """Substitute every ``${name}`` occurrence in the template string.

            ``args`` is either the template string alone, or a two-element list of
            the template string and a dict of substitution values. ``select_before``
            selects which side of resolved attributes/references is used.

            Raises a RuntimeError on an unsupported argument shape or when a variable
            name cannot be resolved.

            NOTE(review): despite the ``str`` return annotation, the type reduction at
            the end of this function may return an ``int`` or a ``float``.
            """
            # TODO: add further schema validation.
            string_template: str
            sub_parameters: dict
            if isinstance(args, str):
                string_template = args
                sub_parameters = {}
            elif (
                isinstance(args, list)
                and len(args) == 2
                and isinstance(args[0], str)
                and isinstance(args[1], dict)
            ):
                string_template = args[0]
                sub_parameters = args[1]
            else:
                raise RuntimeError(
                    "Invalid arguments shape for Fn::Sub, expected a String "
                    f"or a Tuple of String and Map but got '{args}'"
                )
            sub_string = string_template
            # Collect the names of all ``${...}`` placeholders in the template.
            template_variable_names = re.findall("\\${([^}]+)}", string_template)
            for template_variable_name in template_variable_names:
                template_variable_value = Nothing

                # Try to resolve the variable name as pseudo parameter.
                if template_variable_name in _PSEUDO_PARAMETERS:
                    template_variable_value = self._resolve_pseudo_parameter(
                        pseudo_parameter_name=template_variable_name
                    )

                # Try to resolve the variable name as an entry to the defined parameters.
                elif template_variable_name in sub_parameters:
                    template_variable_value = sub_parameters[template_variable_name]

                # Try to resolve the variable name as GetAtt.
                elif "." in template_variable_name:
                    try:
                        template_variable_value = self._resolve_attribute(
                            arguments=template_variable_name, select_before=select_before
                        )
                    except RuntimeError:
                        # Leave the value as Nothing so the undefined-variable error below is raised.
                        pass

                # Try to resolve the variable name as Ref.
                else:
                    try:
                        resource_delta = self._resolve_reference(logical_id=template_variable_name)
                        template_variable_value = (
                            resource_delta.before if select_before else resource_delta.after
                        )
                        # A resource reference is substituted by its physical resource id.
                        if isinstance(template_variable_value, PreprocResource):
                            template_variable_value = template_variable_value.physical_resource_id
                    except RuntimeError:
                        # Leave the value as Nothing so the undefined-variable error below is raised.
                        pass

                if is_nothing(template_variable_value):
                    raise RuntimeError(
                        f"Undefined variable name in Fn::Sub string template '{template_variable_name}'"
                    )

                if not isinstance(template_variable_value, str):
                    template_variable_value = str(template_variable_value)

                # str.replace substitutes every occurrence of this placeholder at once.
                sub_string = sub_string.replace(
                    f"${{{template_variable_name}}}", template_variable_value
                )

            # FIXME: the following type reduction is ported from v1; however it appears as though such
            #        reduction is not performed by the engine, and certainly not at this depth given the
            #        lack of context. This section should be removed with Fn::Sub always returning a string
            #        and the resource providers reviewed.
            account_id = self._change_set.account_id
            # All-digit strings as long as the account id are kept as strings rather than
            # being reduced to integers.
            is_another_account_id = sub_string.isdigit() and len(sub_string) == len(account_id)
            if sub_string == account_id or is_another_account_id:
                result = sub_string
            elif sub_string.isdigit():
                result = int(sub_string)
            else:
                try:
                    result = float(sub_string)
                except ValueError:
                    result = sub_string
            return result

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after
        # Prefer a value cached for this scope; otherwise compute it from the resolved arguments.
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(before) and not is_nothing(arguments_before):
            before = _compute_sub(args=arguments_before, select_before=True)
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(after) and not is_nothing(arguments_after):
            after = _compute_sub(args=arguments_after, select_before=False)
        return PreprocEntityDelta(before=before, after=after)
1✔
858

859
    def visit_node_intrinsic_function_fn_join(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Join``: join the stringified values with the delimiter.

        ``None`` entries in the value list are skipped. Arguments that are not a
        two-element list resolve to Nothing.
        """
        # TODO: add support for schema validation.
        # TODO: add tests for joining non string values.

        def _join_values(args: list[Any]) -> str | NothingType:
            if not isinstance(args, list) or len(args) != 2:
                return Nothing
            separator: str = str(args[0])
            items: list[Any] = args[1]
            if not isinstance(items, list):
                # shortcut if values is the empty string, for example:
                # {"Fn::Join": ["", {"Ref": <parameter>}]}
                # CDK bootstrap does this
                if items == "":
                    return ""
                raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")
            return separator.join([str(item) for item in items if item is not None])

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_join_values,
        )
1✔
892

893
    def visit_node_intrinsic_function_fn_select(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate ``Fn::Select``: pick the element at the given index of a list.

        The resolver raises a RuntimeError while the list still contains non-string
        elements, and a ValidationError for a missing or empty list, a non-integer
        index or an index that is out of range.
        """
        # TODO: add further support for schema validation
        invalid_arguments_message = (
            "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
        )

        def _pick_element(args: list[Any]) -> Any:
            candidates = args[1]
            candidates_is_list = isinstance(candidates, list)
            # defer evaluation if the selection list contains unresolved elements (e.g., unresolved intrinsics)
            if candidates_is_list and any(not isinstance(item, str) for item in candidates):
                raise RuntimeError("Fn::Select list contains unresolved elements")

            if not candidates_is_list or len(candidates) == 0:
                raise ValidationError(invalid_arguments_message)

            try:
                position: int = int(args[0])
            except ValueError as e:
                raise ValidationError(invalid_arguments_message) from e

            if not 0 <= position < len(candidates):
                raise ValidationError(invalid_arguments_message)
            return candidates[position]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_pick_element,
        )
1✔
929

930
    def visit_node_intrinsic_function_fn_split(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate ``Fn::Split``: split the source string on the delimiter.

        Raises a ValidationError when the resolved *after* arguments are present but
        are not a two-element list. The resolver raises a RuntimeError for a
        delimiter that is not a non-empty string or a source that is not a string.
        """
        # TODO: add further support for schema validation

        def _split_source(args: list[Any]) -> Any:
            separator = args[0]
            if not (isinstance(separator, str) and separator):
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{separator}'")
            text = args[1]
            if not isinstance(text, str):
                raise RuntimeError(f"Invalid source string value for Fn::Split: '{text}'")
            return text.split(separator)

        arguments_delta = self.visit(node_intrinsic_function.arguments)

        after_arguments = arguments_delta.after
        after_is_well_formed = is_nothing(after_arguments) or (
            isinstance(after_arguments, list) and len(after_arguments) == 2
        )
        if not after_is_well_formed:
            raise ValidationError(
                "Template error: every Fn::Split object requires two parameters, "
                "(1) a string delimiter and (2) a string to be split or a function that returns a string to be split."
            )

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_split_source,
        )
1✔
962

963
    def visit_node_intrinsic_function_fn_get_a_zs(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::GetAZs``: list the availability zone names of a region.

        An empty region string falls back to the region of the change set. The zones
        are fetched through an EC2 ``describe_availability_zones`` call.
        """
        # TODO: add further support for schema validation

        def _compute_fn_get_a_zs(region) -> Any:
            """Return the zone names for ``region``.

            Raises a RuntimeError when ``region`` is not a string or when the EC2
            call fails with a ClientError.
            """
            if not isinstance(region, str):
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")

            if not region:
                region = self._change_set.region_name

            account_id = self._change_set.account_id
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
            try:
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
                    ec2_client.describe_availability_zones()
                )
            except ClientError as e:
                # Chain the client error explicitly so the root cause stays visible.
                raise RuntimeError(
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
                ) from e
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
                "AvailabilityZones"
            ]
            return [az["ZoneName"] for az in availability_zones]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_get_a_zs,
        )
        return delta
1✔
998

999
    def visit_node_intrinsic_function_fn_base64(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Base64``: Base64-encode the resolved string argument."""
        # TODO: add further support for schema validation

        def _encode(value) -> Any:
            if not isinstance(value, str):
                raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{value}'")
            # Ported from v1:
            encoded_bytes = base64.b64encode(to_bytes(value))
            return to_str(encoded_bytes)

        argument_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=argument_delta,
            resolver=_encode,
        )
1✔
1017

1018
    def visit_node_intrinsic_function_fn_find_in_map(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::FindInMap`` by resolving the mapping lookup on each side.

        A side stays Nothing when its resolved arguments are falsy.
        """
        # TODO: add type checking/validation for result unit?
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        lookup_before, lookup_after = arguments_delta.before, arguments_delta.after
        before = self._resolve_mapping(*lookup_before).before if lookup_before else Nothing
        after = self._resolve_mapping(*lookup_after).after if lookup_after else Nothing
        return PreprocEntityDelta(before=before, after=after)
1✔
1034

1035
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the mapping's bindings."""
        return self.visit(node_mapping.bindings)
1✔
1038

1039
    def visit_node_parameters(
        self, node_parameters: NodeParameters
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
        """Collect the before and after values of all parameters, keyed by name.

        Parameters whose value on a side is Nothing are left out of that side.
        """
        values_before = {}
        values_after = {}
        for node_parameter in node_parameters.parameters:
            delta = self.visit(node_parameter)
            for side_value, side_values in (
                (delta.before, values_before),
                (delta.after, values_after),
            ):
                if not is_nothing(side_value):
                    side_values[node_parameter.name] = side_value
        return PreprocEntityDelta(before=values_before, after=values_after)
1✔
1053

1054
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
        """Resolve the before and after value of a single template parameter.

        The dynamic value takes precedence over the default value whenever it is
        truthy. Only the after value is converted according to the parameter type.

        Raises a ValidationError when the parameter name does not match the pattern
        for valid logical ids.
        """
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_parameter.name):
            raise ValidationError(
                f"Template format error: Parameter name {node_parameter.name} is non alphanumeric."
            )

        dynamic_delta = self.visit(node_parameter.dynamic_value)
        default_delta = self.visit(node_parameter.default_value)

        before = dynamic_delta.before or default_delta.before
        after = dynamic_delta.after or default_delta.after

        type_delta = self.visit(node_parameter.type_)

        def _coerce(raw: str, declared_type: str) -> Any:
            # List-like types are split on commas, with each item stripped.
            if re.match(r"List<[^>]+>", declared_type) or declared_type == "CommaDelimitedList":
                return [piece.strip() for piece in raw.split(",")]
            if declared_type == "Number":
                # TODO: validate the parameter type at template parse time (or whatever is in parity with AWS) so we know this cannot fail
                return to_number(raw)
            return raw

        if not is_nothing(after):
            after = _coerce(after, type_delta.after)

        return PreprocEntityDelta(before=before, after=after)
1✔
1085

1086
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the ``depends_on`` member of the node."""
        return self.visit(node_depends_on.depends_on)
1✔
1089

1090
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the body of the condition."""
        return self.visit(node_condition.body)
1✔
1093

1094
    def _resource_physical_resource_id_from(
        self, logical_resource_id: str, resolved_resources: dict[str, ResolvedResource]
    ) -> str | None:
        """Look up the physical resource id of a resolved resource.

        Returns None unless the resource status is CREATE_COMPLETE or
        UPDATE_COMPLETE. Raises a RuntimeError when such a resource has no string
        ``PhysicalResourceId``.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        entry = resolved_resources.get(logical_resource_id, {})
        completed_statuses = {
            ResourceStatus.CREATE_COMPLETE,
            ResourceStatus.UPDATE_COMPLETE,
        }
        if entry.get("ResourceStatus") not in completed_statuses:
            return None

        physical_id = entry.get("PhysicalResourceId")
        if isinstance(physical_id, str):
            return physical_id
        raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
1✔
1109

1110
    def _before_resource_physical_id(self, resource_logical_id: str) -> str | None:
        """Return the physical id of the resource as recorded in the before state.

        Delegates to ``_resource_physical_resource_id_from`` using
        ``self._before_resolved_resources``, so the result may be None.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        return self._resource_physical_resource_id_from(
            logical_resource_id=resource_logical_id,
            resolved_resources=self._before_resolved_resources,
        )
1116

1117
    def _after_resource_physical_id(self, resource_logical_id: str) -> str | None:
        """Return the physical id to use for the after state of the resource.

        This currently forwards to ``_before_resource_physical_id``.
        """
        return self._before_resource_physical_id(resource_logical_id=resource_logical_id)
1✔
1119

1120
    def visit_node_intrinsic_function_ref(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Ref``: resolve the logical id to its referenced value.

        ``AWS::NoValue`` resolves to Nothing. A side that resolves to a resource is
        replaced by that resource's physical resource id.
        """

        def _dereference(logical_id: str) -> PreprocEntityDelta:
            if logical_id == "AWS::NoValue":
                return Nothing

            resolved: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
            before_value = resolved.before
            if isinstance(before_value, PreprocResource):
                resolved.before = before_value.physical_resource_id
            after_value = resolved.after
            if isinstance(after_value, PreprocResource):
                resolved.after = after_value.physical_resource_id
            return resolved

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_dereference,
        )
1✔
1141

1142
    def visit_node_intrinsic_function_condition(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a ``Condition`` reference by visiting the named condition.

        The resolver raises a RuntimeError when no condition with that name exists.
        """
        arguments_delta = self.visit(node_intrinsic_function.arguments)

        def _visit_named_condition(name: str) -> PreprocEntityDelta:
            condition_node = self._get_node_condition_if_exists(condition_name=name)
            if is_nothing(condition_node):
                raise RuntimeError(f"Undefined condition '{name}'")
            return self.visit(condition_node)

        return self._cached_apply(
            resolver=_visit_named_condition,
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
        )
1✔
1160

1161
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
        """Visit every element of the array and gather the before/after lists.

        The before side is Nothing for a CREATED array and the after side is Nothing
        for a REMOVED array. Element values that are Nothing are not appended.
        """
        change_type = node_array.change_type
        before = Nothing if change_type == ChangeType.CREATED else []
        after = Nothing if change_type == ChangeType.REMOVED else []
        for element in node_array.array:
            element_delta: PreprocEntityDelta = self.visit(change_set_entity=element)
            element_before = element_delta.before
            element_after = element_delta.after
            if not (is_nothing(before) or is_nothing(element_before)):
                before.append(element_before)
            if not (is_nothing(after) or is_nothing(element_after)):
                after.append(element_after)
        return PreprocEntityDelta(before=before, after=after)
1✔
1174

1175
    def visit_node_properties(
        self, node_properties: NodeProperties
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
        """Visit every property and build the before/after ``PreprocProperties``.

        The before side is Nothing for CREATED properties and the after side is
        Nothing for REMOVED properties. Property values that are Nothing or None are
        not included in the bindings.
        """
        change_type = node_properties.change_type
        bindings_before = Nothing if change_type == ChangeType.CREATED else {}
        bindings_after = Nothing if change_type == ChangeType.REMOVED else {}

        for node_property in node_properties.properties:
            name = node_property.name
            property_delta = self.visit(node_property)
            value_before = property_delta.before
            value_after = property_delta.after
            if not (
                is_nothing(bindings_before) or is_nothing(value_before) or value_before is None
            ):
                bindings_before[name] = value_before
            if not (is_nothing(bindings_after) or is_nothing(value_after) or value_after is None):
                bindings_after[name] = value_after

        before = (
            Nothing
            if is_nothing(bindings_before)
            else PreprocProperties(properties=bindings_before)
        )
        after = (
            Nothing if is_nothing(bindings_after) else PreprocProperties(properties=bindings_after)
        )
        return PreprocEntityDelta(before=before, after=after)
1✔
1205

1206
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
        """Resolve a condition reference into the condition's before/after values.

        A side is resolved only when the visited reference yields a string for that
        side; otherwise it stays Nothing.
        """
        reference_delta = self.visit(reference)

        before_name = reference_delta.before
        before = (
            self._resolve_condition(logical_id=before_name).before
            if isinstance(before_name, str)
            else Nothing
        )

        after_name = reference_delta.after
        after = (
            self._resolve_condition(logical_id=after_name).after
            if isinstance(after_name, str)
            else Nothing
        )
        return PreprocEntityDelta(before=before, after=after)
1✔
1219

1220
    def visit_node_resources(self, node_resources: NodeResources):
        """
        Visit every resource, skipping those whose condition evaluates to False
        in the after state.
        """
        for node_resource in node_resources.resources:
            condition_reference = node_resource.condition_reference
            is_conditional = not is_nothing(condition_reference)
            # Only an explicit False skips the resource; Nothing or other values do not.
            if (
                is_conditional
                and self._resolve_resource_condition_reference(condition_reference).after is False
            ):
                continue
            self.visit(node_resource)
1✔
1233

1234
    def visit_node_resource(
        self, node_resource: NodeResource
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
        """Build the before and after ``PreprocResource`` views of a resource node.

        A side is built when the resource has no condition on that side and was not
        created (before) / removed (after), or when its condition on that side is
        truthy.

        Raises a ValidationError when the resource name does not match the pattern
        for valid logical ids.
        """
        logical_resource_id = node_resource.name
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(logical_resource_id):
            raise ValidationError(
                f"Template format error: Resource name {logical_resource_id} is non alphanumeric."
            )
        change_type = node_resource.change_type

        condition_before = condition_after = Nothing
        condition_reference = node_resource.condition_reference
        if not is_nothing(condition_reference):
            condition_delta = self._resolve_resource_condition_reference(condition_reference)
            condition_before, condition_after = condition_delta.before, condition_delta.after

        depends_on_before = depends_on_after = Nothing
        depends_on = node_resource.depends_on
        if not is_nothing(depends_on):
            depends_on_delta = self.visit(depends_on)
            depends_on_before, depends_on_after = depends_on_delta.before, depends_on_delta.after

        type_delta = self.visit(node_resource.type_)
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
            node_resource.properties
        )

        before = Nothing
        present_before = (
            change_type != ChangeType.CREATED and is_nothing(condition_before)
        ) or condition_before
        if present_before:
            before = PreprocResource(
                logical_id=logical_resource_id,
                physical_resource_id=self._before_resource_physical_id(
                    resource_logical_id=logical_resource_id
                ),
                condition=condition_before,
                resource_type=type_delta.before,
                properties=properties_delta.before,
                depends_on=depends_on_before,
                requires_replacement=False,
            )

        after = Nothing
        present_after = (
            change_type != ChangeType.REMOVED and is_nothing(condition_after)
        ) or condition_after
        if present_after:
            # A failed physical id lookup is tolerated on the after side.
            try:
                after_physical_resource_id = self._after_resource_physical_id(
                    resource_logical_id=logical_resource_id
                )
            except RuntimeError:
                after_physical_resource_id = None
            after = PreprocResource(
                logical_id=logical_resource_id,
                physical_resource_id=after_physical_resource_id,
                condition=condition_after,
                resource_type=type_delta.after,
                properties=properties_delta.after,
                depends_on=depends_on_after,
                requires_replacement=node_resource.requires_replacement,
            )
        return PreprocEntityDelta(before=before, after=after)
1✔
1297

1298
    def visit_node_output(
        self, node_output: NodeOutput
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
        """Preprocess a single template output into its before/after form.

        The output's value, optional condition reference and optional export
        are each resolved into a delta. When a condition reference is present,
        a falsy -> truthy transition forces the change type to ``CREATED`` and a
        truthy -> falsy transition forces it to ``REMOVED``; otherwise the
        node's own change type is used.

        :param node_output: the output node to preprocess.
        :return: a delta whose ``before`` is ``Nothing`` for created outputs and
            whose ``after`` is ``Nothing`` for removed outputs.
        """
        effective_change_type = node_output.change_type
        value_delta = self.visit(node_output.value)

        # Resolve the optional condition and let it override the change type.
        condition_delta = Nothing
        condition_reference = node_output.condition_reference
        if not is_nothing(condition_reference):
            condition_delta = self._resolve_resource_condition_reference(condition_reference)
            held_before = condition_delta.before
            holds_after = condition_delta.after
            if not held_before and holds_after:
                effective_change_type = ChangeType.CREATED
            elif held_before and not holds_after:
                effective_change_type = ChangeType.REMOVED

        # The export is only visited when the output declares one.
        export_delta = Nothing
        if not is_nothing(node_output.export):
            export_delta = self.visit(node_output.export)

        def _build_side(side: str) -> PreprocOutput:
            # 'side' is either "before" or "after"; export and condition fall
            # back to None when their delta is falsy (e.g. still Nothing).
            return PreprocOutput(
                name=node_output.name,
                value=getattr(value_delta, side),
                export=getattr(export_delta, side) if export_delta else None,
                condition=getattr(condition_delta, side) if condition_delta else None,
            )

        before: Maybe[PreprocOutput] = (
            _build_side("before") if effective_change_type != ChangeType.CREATED else Nothing
        )
        after: Maybe[PreprocOutput] = (
            _build_side("after") if effective_change_type != ChangeType.REMOVED else Nothing
        )
        return PreprocEntityDelta(before=before, after=after)
1✔
1337

1338
    def visit_node_outputs(
        self, node_outputs: NodeOutputs
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
        """Preprocess every output of the template into before/after lists.

        Outputs whose condition reference resolves to exactly ``False`` in the
        "after" state are skipped entirely (they contribute to neither list).
        For all other outputs, the non-``Nothing`` sides of the visited delta
        are appended to the corresponding list.

        :param node_outputs: the container node holding all output nodes.
        :return: a delta carrying the collected "before" and "after" outputs.
        """
        collected_before: list[PreprocOutput] = []
        collected_after: list[PreprocOutput] = []

        for node_output in node_outputs.outputs:
            condition_reference = node_output.condition_reference
            if not is_nothing(condition_reference):
                resolved_condition = self._resolve_resource_condition_reference(
                    condition_reference
                )
                # Identity check on purpose: only an explicit False skips the output.
                if resolved_condition.after is False:
                    continue

            output_delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
            for bucket, preproc_output in (
                (collected_before, output_delta.before),
                (collected_after, output_delta.after),
            ):
                if not is_nothing(preproc_output):
                    bucket.append(preproc_output)

        return PreprocEntityDelta(before=collected_before, after=collected_after)
1✔
1360

1361
    def visit_node_intrinsic_function_fn_import_value(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve an ``Fn::ImportValue`` intrinsic function node.

        The function's arguments are visited first, then the resulting delta is
        handed to ``self._cached_apply`` together with a resolver that looks the
        export name up in the exports map of the change set's account and region.

        :param node_intrinsic_function: the ``Fn::ImportValue`` node.
        :return: the delta produced by ``self._cached_apply``.
        :raises RuntimeError: from the resolver, when the argument is not a string.
        """

        def _compute_fn_import_value(export_name) -> str:
            if isinstance(export_name, str):
                known_exports = exports_map(
                    account_id=self._change_set.account_id,
                    region_name=self._change_set.region_name,
                )
                # A missing export, or one with a falsy "Value", resolves to Nothing.
                return known_exports.get(export_name, {}).get("Value") or Nothing
            raise RuntimeError(f"Invalid parameter for import: '{export_name}'")

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_import_value,
        )
1✔
1381

1382
    def visit_node_intrinsic_function_fn_transform(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Reject ``Fn::Transform`` nodes reaching the preprocessor.

        :param node_intrinsic_function: the offending ``Fn::Transform`` node (unused).
        :raises RuntimeError: always; per the message, such nodes are expected
            to have been handled by the Transformer beforehand.
        """
        message = "Fn::Transform should have been handled by the Transformer"
        raise RuntimeError(message)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc