• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19656300538

24 Nov 2025 06:52PM UTC coverage: 86.867% (-0.01%) from 86.879%
19656300538

push

github

web-flow
CFn: validate conditions exist in Fn::If (#13243)

3 of 4 new or added lines in 1 file covered. (75.0%)

236 existing lines in 7 files now uncovered.

68861 of 79272 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.5
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from typing import Any, Final, Generic, TypeVar
1✔
8

9
from botocore.exceptions import ClientError
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api.cloudformation import ResourceStatus
1✔
13
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
14
from localstack.aws.connect import connect_to
1✔
15
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
16
    ChangeSetEntity,
17
    ChangeType,
18
    Maybe,
19
    NodeArray,
20
    NodeCondition,
21
    NodeDependsOn,
22
    NodeDivergence,
23
    NodeIntrinsicFunction,
24
    NodeMapping,
25
    NodeObject,
26
    NodeOutput,
27
    NodeOutputs,
28
    NodeParameter,
29
    NodeParameters,
30
    NodeProperties,
31
    NodeProperty,
32
    NodeResource,
33
    NodeResources,
34
    NodeTemplate,
35
    Nothing,
36
    NothingType,
37
    Scope,
38
    TerminalValue,
39
    TerminalValueCreated,
40
    TerminalValueModified,
41
    TerminalValueRemoved,
42
    TerminalValueUnchanged,
43
    is_nothing,
44
)
45
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
46
    ChangeSetModelVisitor,
47
)
48
from localstack.services.cloudformation.engine.v2.resolving import (
1✔
49
    REGEX_DYNAMIC_REF,
50
    extract_dynamic_reference,
51
    perform_dynamic_reference_lookup,
52
)
53
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
54
from localstack.services.cloudformation.stores import (
1✔
55
    exports_map,
56
)
57
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
58
from localstack.services.cloudformation.v2.types import ResolvedResource
1✔
59
from localstack.utils.aws.arns import get_partition
1✔
60
from localstack.utils.numbers import to_number
1✔
61
from localstack.utils.objects import get_value_from_path
1✔
62
from localstack.utils.run import to_str
1✔
63
from localstack.utils.strings import to_bytes
1✔
64
from localstack.utils.urls import localstack_host
1✔
65

66
_AWS_URL_SUFFIX = localstack_host().host  # The value in AWS is "amazonaws.com"
1✔
67

68
_PSEUDO_PARAMETERS: Final[set[str]] = {
1✔
69
    "AWS::Partition",
70
    "AWS::AccountId",
71
    "AWS::Region",
72
    "AWS::StackName",
73
    "AWS::StackId",
74
    "AWS::URLSuffix",
75
    "AWS::NoValue",
76
    "AWS::NotificationARNs",
77
}
78

79
TBefore = TypeVar("TBefore")
1✔
80
TAfter = TypeVar("TAfter")
1✔
81
_T = TypeVar("_T")
1✔
82

83
REGEX_OUTPUT_APIGATEWAY = re.compile(
1✔
84
    rf"^(https?://.+\.execute-api\.)(?:[^-]+-){{2,3}}\d\.(amazonaws\.com|{_AWS_URL_SUFFIX})/?(.*)$"
85
)
86
MOCKED_REFERENCE = "unknown"
1✔
87

88
VALID_LOGICAL_RESOURCE_ID_RE = re.compile(r"^[A-Za-z0-9]+$")
1✔
89

90

91
class PreprocEntityDelta(Generic[TBefore, TAfter]):
1✔
92
    before: Maybe[TBefore]
1✔
93
    after: Maybe[TAfter]
1✔
94

95
    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
1✔
96
        self.before = before
1✔
97
        self.after = after
1✔
98

99
    def __eq__(self, other):
1✔
100
        if not isinstance(other, PreprocEntityDelta):
×
101
            return False
×
102
        return self.before == other.before and self.after == other.after
×
103

104

105
class PreprocProperties:
1✔
106
    properties: dict[str, Any]
1✔
107

108
    def __init__(self, properties: dict[str, Any]):
1✔
109
        self.properties = properties
1✔
110

111
    def __eq__(self, other):
1✔
112
        if not isinstance(other, PreprocProperties):
1✔
113
            return False
×
114
        return self.properties == other.properties
1✔
115

116

117
class PreprocResource:
1✔
118
    logical_id: str
1✔
119
    physical_resource_id: str | None
1✔
120
    condition: bool | None
1✔
121
    resource_type: str
1✔
122
    properties: PreprocProperties
1✔
123
    depends_on: list[str] | None
1✔
124
    requires_replacement: bool
1✔
125
    status: ResourceStatus | None
1✔
126

127
    def __init__(
1✔
128
        self,
129
        logical_id: str,
130
        physical_resource_id: str,
131
        condition: bool | None,
132
        resource_type: str,
133
        properties: PreprocProperties,
134
        depends_on: list[str] | None,
135
        requires_replacement: bool,
136
        status: ResourceStatus | None = None,
137
    ):
138
        self.logical_id = logical_id
1✔
139
        self.physical_resource_id = physical_resource_id
1✔
140
        self.condition = condition
1✔
141
        self.resource_type = resource_type
1✔
142
        self.properties = properties
1✔
143
        self.depends_on = depends_on
1✔
144
        self.requires_replacement = requires_replacement
1✔
145
        self.status = status
1✔
146

147
    @staticmethod
1✔
148
    def _compare_conditions(c1: bool, c2: bool):
1✔
149
        # The lack of condition equates to a true condition.
150
        c1 = c1 if isinstance(c1, bool) else True
1✔
151
        c2 = c2 if isinstance(c2, bool) else True
1✔
152
        return c1 == c2
1✔
153

154
    def __eq__(self, other):
1✔
155
        if not isinstance(other, PreprocResource):
1✔
156
            return False
1✔
157
        return all(
1✔
158
            [
159
                self.logical_id == other.logical_id,
160
                self._compare_conditions(self.condition, other.condition),
161
                self.resource_type == other.resource_type,
162
                self.properties == other.properties,
163
            ]
164
        )
165

166

167
class PreprocOutput:
1✔
168
    name: str
1✔
169
    value: Any
1✔
170
    export: Any | None
1✔
171
    condition: bool | None
1✔
172

173
    def __init__(self, name: str, value: Any, export: Any | None, condition: bool | None):
1✔
174
        self.name = name
1✔
175
        self.value = value
1✔
176
        self.export = export
1✔
177
        self.condition = condition
1✔
178

179
    def __eq__(self, other):
1✔
180
        if not isinstance(other, PreprocOutput):
×
181
            return False
×
182
        return all(
×
183
            [
184
                self.name == other.name,
185
                self.value == other.value,
186
                self.export == other.export,
187
                self.condition == other.condition,
188
            ]
189
        )
190

191

192
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
193
    _change_set: Final[ChangeSet]
1✔
194
    _before_resolved_resources: Final[dict]
1✔
195
    _before_cache: Final[dict[Scope, Any]]
1✔
196
    _after_cache: Final[dict[Scope, Any]]
1✔
197

198
    def __init__(self, change_set: ChangeSet):
1✔
199
        self._change_set = change_set
1✔
200
        self._before_resolved_resources = change_set.stack.resolved_resources
1✔
201
        self._before_cache = {}
1✔
202
        self._after_cache = {}
1✔
203

204
    def _setup_runtime_cache(self) -> None:
1✔
205
        runtime_cache_key = self.__class__.__name__
1✔
206

207
        self._before_cache.clear()
1✔
208
        self._after_cache.clear()
1✔
209

210
        before_runtime_cache = self._change_set.update_model.before_runtime_cache
1✔
211
        if cache := before_runtime_cache.get(runtime_cache_key):
1✔
212
            self._before_cache.update(cache)
1✔
213

214
        after_runtime_cache = self._change_set.update_model.after_runtime_cache
1✔
215
        if cache := after_runtime_cache.get(runtime_cache_key):
1✔
216
            self._after_cache.update(cache)
×
217

218
    def _save_runtime_cache(self) -> None:
1✔
219
        runtime_cache_key = self.__class__.__name__
1✔
220

221
        before_runtime_cache = self._change_set.update_model.before_runtime_cache
1✔
222
        before_runtime_cache[runtime_cache_key] = copy.deepcopy(self._before_cache)
1✔
223

224
        after_runtime_cache = self._change_set.update_model.after_runtime_cache
1✔
225
        after_runtime_cache[runtime_cache_key] = copy.deepcopy(self._after_cache)
1✔
226

227
    def process(self) -> None:
1✔
228
        self._setup_runtime_cache()
1✔
229
        node_template = self._change_set.update_model.node_template
1✔
230
        node_conditions = self._change_set.update_model.node_template.conditions
1✔
231
        self.visit(node_conditions)
1✔
232
        self.visit(node_template)
1✔
233
        self._save_runtime_cache()
1✔
234

235
    def _get_node_resource_for(
1✔
236
        self, resource_name: str, node_template: NodeTemplate
237
    ) -> NodeResource:
238
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
239
        for node_resource in node_template.resources.resources:
1✔
240
            if node_resource.name == resource_name:
1✔
241
                self.visit(node_resource)
1✔
242
                return node_resource
1✔
243
        raise ValidationError(
1✔
244
            f"Template format error: Unresolved resource dependencies [{resource_name}] in the Resources block of the template"
245
        )
246

247
    def _get_node_property_for(
1✔
248
        self, property_name: str, node_resource: NodeResource
249
    ) -> NodeProperty | None:
250
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
251
        for node_property in node_resource.properties.properties:
1✔
252
            if node_property.name == property_name:
1✔
253
                self.visit(node_property)
1✔
254
                return node_property
1✔
255
        return None
1✔
256

257
    def _deployed_property_value_of(
1✔
258
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
259
    ) -> Any:
260
        # We have to override this function to make sure it does not try to access the
261
        # resolved resource
262

263
        # Before we can obtain deployed value for a resource, we need to first ensure to
264
        # process the resource if this wasn't processed already. Ideally, values should only
265
        # be accessible through delta objects, to ensure computation is always complete at
266
        # every level.
267
        _ = self._get_node_resource_for(
1✔
268
            resource_name=resource_logical_id,
269
            node_template=self._change_set.update_model.node_template,
270
        )
271
        resolved_resource = resolved_resources.get(resource_logical_id)
1✔
272
        if resolved_resource is None:
1✔
273
            raise RuntimeError(
1✔
274
                f"No deployed instances of resource '{resource_logical_id}' were found"
275
            )
276
        properties = resolved_resource.get("Properties", {})
1✔
277
        # TODO support structured properties, e.g. NestedStack.Outputs.OutputName
278
        property_value: Any | None = get_value_from_path(properties, property_name)
1✔
279

280
        if property_value:
1✔
281
            if not isinstance(property_value, (str, list, dict)):
1✔
282
                # Str: Standard expected type. TODO validate bools and numbers
283
                # List: Multiple resource types can return a list of values e.g. AWS::EC2::VPC.
284
                # Dict: Custom resources in CloudFormation can return arbitrary data structures.
285
                raise RuntimeError(
×
286
                    f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value nor list"
287
                )
288
            return property_value
1✔
289
        elif config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES:
1✔
290
            return MOCKED_REFERENCE
1✔
291

292
        return property_value
×
293

294
    def _before_deployed_property_value_of(
1✔
295
        self, resource_logical_id: str, property_name: str
296
    ) -> Any:
297
        return self._deployed_property_value_of(
1✔
298
            resource_logical_id=resource_logical_id,
299
            property_name=property_name,
300
            resolved_resources=self._before_resolved_resources,
301
        )
302

303
    def _after_deployed_property_value_of(
1✔
304
        self, resource_logical_id: str, property_name: str
305
    ) -> str | None:
306
        return self._before_deployed_property_value_of(
1✔
307
            resource_logical_id=resource_logical_id, property_name=property_name
308
        )
309

310
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
1✔
311
        mappings: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
1✔
312
        # TODO: another scenarios suggesting property lookups might be preferable.
313
        for mapping in mappings:
1✔
314
            if mapping.name == map_name:
1✔
315
                self.visit(mapping)
1✔
316
                return mapping
1✔
317
        raise RuntimeError(f"Undefined '{map_name}' mapping")
×
318

319
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
1✔
320
        parameters: list[NodeParameter] = (
1✔
321
            self._change_set.update_model.node_template.parameters.parameters
322
        )
323
        # TODO: another scenarios suggesting property lookups might be preferable.
324
        for parameter in parameters:
1✔
325
            if parameter.name == parameter_name:
1✔
326
                self.visit(parameter)
1✔
327
                return parameter
1✔
328
        return Nothing
1✔
329

330
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
1✔
331
        conditions: list[NodeCondition] = (
1✔
332
            self._change_set.update_model.node_template.conditions.conditions
333
        )
334
        # TODO: another scenarios suggesting property lookups might be preferable.
335
        for condition in conditions:
1✔
336
            if condition.name == condition_name:
1✔
337
                self.visit(condition)
1✔
338
                return condition
1✔
339
        return Nothing
1✔
340

341
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
1✔
342
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
1✔
343
        if isinstance(node_condition, NodeCondition):
1✔
344
            condition_delta = self.visit(node_condition)
1✔
345
            return condition_delta
1✔
346
        raise RuntimeError(f"No condition '{logical_id}' was found.")
×
347

348
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
1✔
349
        match pseudo_parameter_name:
1✔
350
            case "AWS::Partition":
1✔
351
                return get_partition(self._change_set.region_name)
1✔
352
            case "AWS::AccountId":
1✔
353
                return self._change_set.stack.account_id
1✔
354
            case "AWS::Region":
1✔
355
                return self._change_set.stack.region_name
1✔
356
            case "AWS::StackName":
1✔
357
                return self._change_set.stack.stack_name
1✔
358
            case "AWS::StackId":
1✔
359
                return self._change_set.stack.stack_id
1✔
360
            case "AWS::URLSuffix":
1✔
361
                return _AWS_URL_SUFFIX
1✔
362
            case "AWS::NoValue":
×
363
                return None
×
364
            case _:
×
365
                raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
×
366

367
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
1✔
368
        if logical_id in _PSEUDO_PARAMETERS:
1✔
369
            pseudo_parameter_value = self._resolve_pseudo_parameter(
1✔
370
                pseudo_parameter_name=logical_id
371
            )
372
            # Pseudo parameters are constants within the lifecycle of a template.
373
            return PreprocEntityDelta(before=pseudo_parameter_value, after=pseudo_parameter_value)
1✔
374

375
        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
1✔
376
        if isinstance(node_parameter, NodeParameter):
1✔
377
            parameter_delta = self.visit(node_parameter)
1✔
378
            return parameter_delta
1✔
379

380
        node_resource = self._get_node_resource_for(
1✔
381
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
382
        )
383
        resource_delta = self.visit(node_resource)
1✔
384
        before = resource_delta.before
1✔
385
        after = resource_delta.after
1✔
386
        return PreprocEntityDelta(before=before, after=after)
1✔
387

388
    def _resolve_mapping(
1✔
389
        self, map_name: str, top_level_key: str, second_level_key
390
    ) -> PreprocEntityDelta:
391
        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
392
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)
1✔
393
        top_level_value = node_mapping.bindings.bindings.get(top_level_key)
1✔
394
        if not isinstance(top_level_value, NodeObject):
1✔
395
            error_key = "::".join([map_name, top_level_key, second_level_key])
1✔
396
            raise ValidationError(f"Template error: Unable to get mapping for {error_key}")
1✔
397
        second_level_value = top_level_value.bindings.get(second_level_key)
1✔
398
        if not isinstance(second_level_value, (TerminalValue, NodeArray, NodeObject)):
1✔
399
            error_key = "::".join([map_name, top_level_key, second_level_key])
1✔
400
            raise ValidationError(f"Template error: Unable to get mapping for {error_key}")
1✔
401
        mapping_value_delta = self.visit(second_level_value)
1✔
402
        return mapping_value_delta
1✔
403

404
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
1✔
405
        entity_scope = change_set_entity.scope
1✔
406
        if entity_scope in self._before_cache and entity_scope in self._after_cache:
1✔
407
            before = self._before_cache[entity_scope]
1✔
408
            after = self._after_cache[entity_scope]
1✔
409
            return PreprocEntityDelta(before=before, after=after)
1✔
410
        delta = super().visit(change_set_entity=change_set_entity)
1✔
411
        if isinstance(delta, PreprocEntityDelta):
1✔
412
            delta = self._maybe_perform_replacements(delta)
1✔
413
            self._before_cache[entity_scope] = delta.before
1✔
414
            self._after_cache[entity_scope] = delta.after
1✔
415
        return delta
1✔
416

417
    def _maybe_perform_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
418
        delta = self._maybe_perform_static_replacements(delta)
1✔
419
        delta = self._maybe_perform_dynamic_replacements(delta)
1✔
420
        return delta
1✔
421

422
    def _maybe_perform_static_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
423
        return self._maybe_perform_on_delta(delta, self._perform_static_replacements)
1✔
424

425
    def _maybe_perform_dynamic_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
426
        return self._maybe_perform_on_delta(delta, self._perform_dynamic_replacements)
1✔
427

428
    def _maybe_perform_on_delta(
1✔
429
        self, delta: PreprocEntityDelta | None, f: Callable[[_T], _T]
430
    ) -> PreprocEntityDelta | None:
431
        if isinstance(delta.before, str):
1✔
432
            delta.before = f(delta.before)
1✔
433
        if isinstance(delta.after, str):
1✔
434
            delta.after = f(delta.after)
1✔
435
        return delta
1✔
436

437
    def _perform_dynamic_replacements(self, value: _T) -> _T:
1✔
438
        if not isinstance(value, str):
1✔
439
            return value
×
440

441
        if dynamic_ref := extract_dynamic_reference(value):
1✔
442
            new_value = perform_dynamic_reference_lookup(
1✔
443
                reference=dynamic_ref,
444
                account_id=self._change_set.account_id,
445
                region_name=self._change_set.region_name,
446
            )
447
            if new_value:
1✔
448
                # We need to use a function here, to avoid backslash processing by regex.
449
                # From the regex sub documentation:
450
                # repl can be a string or a function; if it is a string, any backslash escapes in it are processed.
451
                # Using a function, we can avoid this processing.
452
                return REGEX_DYNAMIC_REF.sub(lambda _: new_value, value)
1✔
453

454
        return value
1✔
455

456
    @staticmethod
1✔
457
    def _perform_static_replacements(value: str) -> str:
1✔
458
        api_match = REGEX_OUTPUT_APIGATEWAY.match(value)
1✔
459
        if api_match and value not in config.CFN_STRING_REPLACEMENT_DENY_LIST:
1✔
460
            prefix = api_match[1]
1✔
461
            host = api_match[2]
1✔
462
            path = api_match[3]
1✔
463
            port = localstack_host().port
1✔
464
            value = f"{prefix}{host}:{port}/{path}"
1✔
465
            return value
1✔
466

467
        return value
1✔
468

469
    def _cached_apply(
1✔
470
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
471
    ) -> PreprocEntityDelta:
472
        """
473
        Applies the resolver function to the given input delta if and only if the required
474
        values are not already present in the runtime caches. This function handles both
475
        the 'before' and 'after' components of the delta independently.
476

477
        The resolver function receives either the 'before' or 'after' value from the input
478
        delta and returns a resolved value. If the result returned by the resolver is
479
        itself a PreprocEntityDelta, the function automatically extracts the appropriate
480
        component from it:  the 'before' value if the input was 'before', and the 'after'
481
        value if the input was 'after'.
482

483
        This function only reads from the cache and does not update it. It is the caller's
484
        responsibility to handle caching, either manually or via the upstream visit method
485
        of this class.
486

487
        Args:
488
            scope (Scope): The current scope used as a key for cache lookup.
489
            arguments_delta (PreprocEntityDelta): The delta containing 'before' and 'after' values to resolve.
490
            resolver (Callable[[Any], Any]): Function to apply on uncached 'before' or 'after' argument values.
491

492
        Returns:
493
            PreprocEntityDelta: A new delta with resolved 'before' and 'after' values.
494
        """
495

496
        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
497
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
498
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.
499

500
        arguments_before = arguments_delta.before
1✔
501
        arguments_after = arguments_delta.after
1✔
502

503
        before = self._before_cache.get(scope, Nothing)
1✔
504
        if is_nothing(before) and not is_nothing(arguments_before):
1✔
505
            before = resolver(arguments_before)
1✔
506
            if isinstance(before, PreprocEntityDelta):
1✔
507
                before = before.before
1✔
508

509
        after = self._after_cache.get(scope, Nothing)
1✔
510
        if is_nothing(after) and not is_nothing(arguments_after):
1✔
511
            after = resolver(arguments_after)
1✔
512
            if isinstance(after, PreprocEntityDelta):
1✔
513
                after = after.after
1✔
514

515
        return PreprocEntityDelta(before=before, after=after)
1✔
516

517
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
1✔
518
        return self.visit(node_property.value)
1✔
519

520
    def visit_terminal_value_modified(
1✔
521
        self, terminal_value_modified: TerminalValueModified
522
    ) -> PreprocEntityDelta:
523
        return PreprocEntityDelta(
1✔
524
            before=terminal_value_modified.value,
525
            after=terminal_value_modified.modified_value,
526
        )
527

528
    def visit_terminal_value_created(
1✔
529
        self, terminal_value_created: TerminalValueCreated
530
    ) -> PreprocEntityDelta:
531
        return PreprocEntityDelta(after=terminal_value_created.value)
1✔
532

533
    def visit_terminal_value_removed(
1✔
534
        self, terminal_value_removed: TerminalValueRemoved
535
    ) -> PreprocEntityDelta:
536
        return PreprocEntityDelta(before=terminal_value_removed.value)
1✔
537

538
    def visit_terminal_value_unchanged(
1✔
539
        self, terminal_value_unchanged: TerminalValueUnchanged
540
    ) -> PreprocEntityDelta:
541
        return PreprocEntityDelta(
1✔
542
            before=terminal_value_unchanged.value,
543
            after=terminal_value_unchanged.value,
544
        )
545

546
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
1✔
547
        before_delta = self.visit(node_divergence.value)
1✔
548
        after_delta = self.visit(node_divergence.divergence)
1✔
549
        return PreprocEntityDelta(before=before_delta.before, after=after_delta.after)
1✔
550

551
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
1✔
552
        node_change_type = node_object.change_type
1✔
553
        before = {} if node_change_type != ChangeType.CREATED else Nothing
1✔
554
        after = {} if node_change_type != ChangeType.REMOVED else Nothing
1✔
555
        for name, change_set_entity in node_object.bindings.items():
1✔
556
            delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
1✔
557
            delta_before = delta.before
1✔
558
            delta_after = delta.after
1✔
559
            if not is_nothing(before) and not is_nothing(delta_before):
1✔
560
                before[name] = delta_before
1✔
561
            if not is_nothing(after) and not is_nothing(delta_after):
1✔
562
                after[name] = delta_after
1✔
563
        return PreprocEntityDelta(before=before, after=after)
1✔
564

565
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> str:
1✔
566
        # TODO: add arguments validation.
567
        arguments_list: list[str]
568
        if isinstance(arguments, str):
1✔
569
            arguments_list = arguments.split(".")
1✔
570
        else:
571
            arguments_list = arguments
1✔
572

573
        if len(arguments_list) < 2:
1✔
574
            raise ValidationError(
1✔
575
                "Template error: every Fn::GetAtt object requires two non-empty parameters, the resource name and the resource attribute"
576
            )
577

578
        logical_name_of_resource = arguments_list[0]
1✔
579
        attribute_name = ".".join(arguments_list[1:])
1✔
580

581
        node_resource = self._get_node_resource_for(
1✔
582
            resource_name=logical_name_of_resource,
583
            node_template=self._change_set.update_model.node_template,
584
        )
585

586
        if not is_nothing(node_resource.condition_reference):
1✔
587
            condition = self._get_node_condition_if_exists(node_resource.condition_reference.value)
1✔
588
            evaluation_result = self._resolve_condition(condition.name)
1✔
589

590
            if select_before and not evaluation_result.before:
1✔
591
                raise ValidationError(
1✔
592
                    f"Template format error: Unresolved resource dependencies [{logical_name_of_resource}] in the Resources block of the template"
593
                )
594

595
            if not select_before and not evaluation_result.after:
1✔
596
                raise ValidationError(
×
597
                    f"Template format error: Unresolved resource dependencies [{logical_name_of_resource}] in the Resources block of the template"
598
                )
599

600
        # Custom Resources can mutate their definition
601
        # So the preproc should search first in the resource values and then check the template
602
        if select_before:
1✔
603
            value = self._before_deployed_property_value_of(
1✔
604
                resource_logical_id=logical_name_of_resource,
605
                property_name=attribute_name,
606
            )
607
        else:
608
            value = self._after_deployed_property_value_of(
1✔
609
                resource_logical_id=logical_name_of_resource,
610
                property_name=attribute_name,
611
            )
612
        if value is not None:
1✔
613
            return value
1✔
614

615
        node_property: NodeProperty | None = self._get_node_property_for(
×
616
            property_name=attribute_name, node_resource=node_resource
617
        )
618
        if node_property is not None:
×
619
            # The property is statically defined in the template and its value can be computed.
620
            property_delta = self.visit(node_property)
×
621
            value = property_delta.before if select_before else property_delta.after
×
622

623
        return value
×
624

625
    def visit_node_intrinsic_function_fn_get_att(
1✔
626
        self, node_intrinsic_function: NodeIntrinsicFunction
627
    ) -> PreprocEntityDelta:
628
        # TODO: validate the return value according to the spec.
629
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
630
        before_arguments: Maybe[str | list[str]] = arguments_delta.before
1✔
631
        after_arguments: Maybe[str | list[str]] = arguments_delta.after
1✔
632

633
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
1✔
634
        if is_nothing(before) and not is_nothing(before_arguments):
1✔
635
            before = self._resolve_attribute(arguments=before_arguments, select_before=True)
1✔
636

637
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
1✔
638
        if is_nothing(after) and not is_nothing(after_arguments):
1✔
639
            after = self._resolve_attribute(arguments=after_arguments, select_before=False)
1✔
640

641
        return PreprocEntityDelta(before=before, after=after)
1✔
642

643
    def visit_node_intrinsic_function_fn_equals(
1✔
644
        self, node_intrinsic_function: NodeIntrinsicFunction
645
    ) -> PreprocEntityDelta:
646
        # TODO: add argument shape validation.
647
        def _compute_fn_equals(args: list[Any]) -> bool:
1✔
648
            return args[0] == args[1]
1✔
649

650
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
651

652
        if isinstance(arguments_delta.after, list) and len(arguments_delta.after) != 2:
1✔
653
            raise ValidationError(
×
654
                "Template error: every Fn::Equals object requires a list of 2 string parameters."
655
            )
656

657
        delta = self._cached_apply(
1✔
658
            scope=node_intrinsic_function.scope,
659
            arguments_delta=arguments_delta,
660
            resolver=_compute_fn_equals,
661
        )
662
        return delta
1✔
663

664
    def visit_node_intrinsic_function_fn_if(
1✔
665
        self, node_intrinsic_function: NodeIntrinsicFunction
666
    ) -> PreprocEntityDelta:
667
        # `if` needs to be short-circuiting i.e. if the condition is True we don't evaluate the
668
        # False branch. If the condition is False, we don't evaluate the True branch.
669
        if len(node_intrinsic_function.arguments.array) != 3:
1✔
670
            raise ValueError(
×
671
                f"Incorrectly constructed Fn::If usage, expected 3 arguments, found {len(node_intrinsic_function.arguments.array)}"
672
            )
673

674
        condition_delta = self.visit(node_intrinsic_function.arguments.array[0])
1✔
675
        if_delta = PreprocEntityDelta()
1✔
676
        if not is_nothing(condition_delta.before):
1✔
677
            node_condition = self._get_node_condition_if_exists(
1✔
678
                condition_name=condition_delta.before
679
            )
680
            if is_nothing(node_condition):
1✔
681
                # TODO: I don't think this is a possible state since for us to be evaluating the before state,
682
                #  we must have successfully deployed the stack and as such this case was not reached before
NEW
683
                raise ValidationError(
×
684
                    f"Template error: unresolved condition dependency {condition_delta.before} in Fn::If"
685
                )
686

687
            condition_value = self.visit(node_condition).before
1✔
688
            if condition_value:
1✔
689
                arg_delta = self.visit(node_intrinsic_function.arguments.array[1])
1✔
690
            else:
691
                arg_delta = self.visit(node_intrinsic_function.arguments.array[2])
1✔
692
            if_delta.before = arg_delta.before
1✔
693

694
        if not is_nothing(condition_delta.after):
1✔
695
            node_condition = self._get_node_condition_if_exists(
1✔
696
                condition_name=condition_delta.after
697
            )
698
            if is_nothing(node_condition):
1✔
699
                raise ValidationError(
1✔
700
                    f"Template error: unresolved condition dependency {condition_delta.after} in Fn::If"
701
                )
702

703
            condition_value = self.visit(node_condition).after
1✔
704
            if condition_value:
1✔
705
                arg_delta = self.visit(node_intrinsic_function.arguments.array[1])
1✔
706
            else:
707
                arg_delta = self.visit(node_intrinsic_function.arguments.array[2])
1✔
708
            if_delta.after = arg_delta.after
1✔
709

710
        return if_delta
1✔
711

712
    def visit_node_intrinsic_function_fn_and(
1✔
713
        self, node_intrinsic_function: NodeIntrinsicFunction
714
    ) -> PreprocEntityDelta:
715
        def _compute_fn_and(args: list[bool]) -> bool:
1✔
716
            result = all(args)
1✔
717
            return result
1✔
718

719
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
720
        delta = self._cached_apply(
1✔
721
            scope=node_intrinsic_function.scope,
722
            arguments_delta=arguments_delta,
723
            resolver=_compute_fn_and,
724
        )
725
        return delta
1✔
726

727
    def visit_node_intrinsic_function_fn_or(
1✔
728
        self, node_intrinsic_function: NodeIntrinsicFunction
729
    ) -> PreprocEntityDelta:
730
        def _compute_fn_or(args: list[bool]):
1✔
731
            result = any(args)
1✔
732
            return result
1✔
733

734
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
735
        delta = self._cached_apply(
1✔
736
            scope=node_intrinsic_function.scope,
737
            arguments_delta=arguments_delta,
738
            resolver=_compute_fn_or,
739
        )
740
        return delta
1✔
741

742
    def visit_node_intrinsic_function_fn_not(
1✔
743
        self, node_intrinsic_function: NodeIntrinsicFunction
744
    ) -> PreprocEntityDelta:
745
        def _compute_fn_not(arg: list[bool] | bool) -> bool:
1✔
746
            # Is the argument ever a lone boolean?
747
            if isinstance(arg, list):
1✔
748
                return not arg[0]
1✔
749
            else:
750
                return not arg
×
751

752
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
753
        delta = self._cached_apply(
1✔
754
            scope=node_intrinsic_function.scope,
755
            arguments_delta=arguments_delta,
756
            resolver=_compute_fn_not,
757
        )
758
        return delta
1✔
759

760
    def visit_node_intrinsic_function_fn_sub(
1✔
761
        self, node_intrinsic_function: NodeIntrinsicFunction
762
    ) -> PreprocEntityDelta:
763
        def _compute_sub(args: str | list[Any], select_before: bool) -> str:
1✔
764
            # TODO: add further schema validation.
765
            string_template: str
766
            sub_parameters: dict
767
            if isinstance(args, str):
1✔
768
                string_template = args
1✔
769
                sub_parameters = {}
1✔
770
            elif (
1✔
771
                isinstance(args, list)
772
                and len(args) == 2
773
                and isinstance(args[0], str)
774
                and isinstance(args[1], dict)
775
            ):
776
                string_template = args[0]
1✔
777
                sub_parameters = args[1]
1✔
778
            else:
779
                raise RuntimeError(
×
780
                    "Invalid arguments shape for Fn::Sub, expected a String "
781
                    f"or a Tuple of String and Map but got '{args}'"
782
                )
783
            sub_string = string_template
1✔
784
            template_variable_names = re.findall("\\${([^}]+)}", string_template)
1✔
785
            for template_variable_name in template_variable_names:
1✔
786
                template_variable_value = Nothing
1✔
787

788
                # Try to resolve the variable name as pseudo parameter.
789
                if template_variable_name in _PSEUDO_PARAMETERS:
1✔
790
                    template_variable_value = self._resolve_pseudo_parameter(
1✔
791
                        pseudo_parameter_name=template_variable_name
792
                    )
793

794
                # Try to resolve the variable name as an entry to the defined parameters.
795
                elif template_variable_name in sub_parameters:
1✔
796
                    template_variable_value = sub_parameters[template_variable_name]
1✔
797

798
                # Try to resolve the variable name as GetAtt.
799
                elif "." in template_variable_name:
1✔
800
                    try:
1✔
801
                        template_variable_value = self._resolve_attribute(
1✔
802
                            arguments=template_variable_name, select_before=select_before
803
                        )
804
                    except RuntimeError:
1✔
805
                        pass
1✔
806

807
                # Try to resolve the variable name as Ref.
808
                else:
809
                    try:
1✔
810
                        resource_delta = self._resolve_reference(logical_id=template_variable_name)
1✔
811
                        template_variable_value = (
1✔
812
                            resource_delta.before if select_before else resource_delta.after
813
                        )
814
                        if isinstance(template_variable_value, PreprocResource):
1✔
815
                            template_variable_value = template_variable_value.physical_resource_id
1✔
816
                    except RuntimeError:
×
817
                        pass
×
818

819
                if is_nothing(template_variable_value):
1✔
820
                    raise RuntimeError(
1✔
821
                        f"Undefined variable name in Fn::Sub string template '{template_variable_name}'"
822
                    )
823

824
                if not isinstance(template_variable_value, str):
1✔
825
                    template_variable_value = str(template_variable_value)
1✔
826

827
                sub_string = sub_string.replace(
1✔
828
                    f"${{{template_variable_name}}}", template_variable_value
829
                )
830

831
            # FIXME: the following type reduction is ported from v1; however it appears as though such
832
            #        reduction is not performed by the engine, and certainly not at this depth given the
833
            #        lack of context. This section should be removed with Fn::Sub always retuning a string
834
            #        and the resource providers reviewed.
835
            account_id = self._change_set.account_id
1✔
836
            is_another_account_id = sub_string.isdigit() and len(sub_string) == len(account_id)
1✔
837
            if sub_string == account_id or is_another_account_id:
1✔
838
                result = sub_string
1✔
839
            elif sub_string.isdigit():
1✔
840
                result = int(sub_string)
1✔
841
            else:
842
                try:
1✔
843
                    result = float(sub_string)
1✔
844
                except ValueError:
1✔
845
                    result = sub_string
1✔
846
            return result
1✔
847

848
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
849
        arguments_before = arguments_delta.before
1✔
850
        arguments_after = arguments_delta.after
1✔
851
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
1✔
852
        if is_nothing(before) and not is_nothing(arguments_before):
1✔
853
            before = _compute_sub(args=arguments_before, select_before=True)
1✔
854
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
1✔
855
        if is_nothing(after) and not is_nothing(arguments_after):
1✔
856
            after = _compute_sub(args=arguments_after, select_before=False)
1✔
857
        return PreprocEntityDelta(before=before, after=after)
1✔
858

859
    def visit_node_intrinsic_function_fn_join(
1✔
860
        self, node_intrinsic_function: NodeIntrinsicFunction
861
    ) -> PreprocEntityDelta:
862
        # TODO: add support for schema validation.
863
        # TODO: add tests for joining non string values.
864
        def _compute_fn_join(args: list[Any]) -> str | NothingType:
1✔
865
            if not (isinstance(args, list) and len(args) == 2):
1✔
866
                return Nothing
1✔
867
            delimiter: str = str(args[0])
1✔
868
            values: list[Any] = args[1]
1✔
869
            if not isinstance(values, list):
1✔
870
                # shortcut if values is the empty string, for example:
871
                # {"Fn::Join": ["", {"Ref": <parameter>}]}
872
                # CDK bootstrap does this
873
                if values == "":
1✔
874
                    return ""
1✔
875
                raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")
1✔
876
            str_values: list[str] = []
1✔
877
            for value in values:
1✔
878
                if value is None:
1✔
879
                    continue
1✔
880
                str_value = str(value)
1✔
881
                str_values.append(str_value)
1✔
882
            join_result = delimiter.join(str_values)
1✔
883
            return join_result
1✔
884

885
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
886
        delta = self._cached_apply(
1✔
887
            scope=node_intrinsic_function.scope,
888
            arguments_delta=arguments_delta,
889
            resolver=_compute_fn_join,
890
        )
891
        return delta
1✔
892

893
    def visit_node_intrinsic_function_fn_select(
1✔
894
        self, node_intrinsic_function: NodeIntrinsicFunction
895
    ):
896
        # TODO: add further support for schema validation
897
        def _compute_fn_select(args: list[Any]) -> Any:
1✔
898
            values = args[1]
1✔
899
            # defer evaluation if the selection list contains unresolved elements (e.g., unresolved intrinsics)
900
            if isinstance(values, list) and not all(isinstance(value, str) for value in values):
1✔
901
                raise RuntimeError("Fn::Select list contains unresolved elements")
1✔
902

903
            if not isinstance(values, list) or not values:
1✔
904
                raise ValidationError(
1✔
905
                    "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
906
                )
907
            try:
1✔
908
                index: int = int(args[0])
1✔
909
            except ValueError as e:
1✔
910
                raise ValidationError(
1✔
911
                    "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
912
                ) from e
913

914
            values_len = len(values)
1✔
915
            if index < 0 or index >= values_len:
1✔
916
                raise ValidationError(
1✔
917
                    "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
918
                )
919
            selection = values[index]
1✔
920
            return selection
1✔
921

922
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
923
        delta = self._cached_apply(
1✔
924
            scope=node_intrinsic_function.scope,
925
            arguments_delta=arguments_delta,
926
            resolver=_compute_fn_select,
927
        )
928
        return delta
1✔
929

930
    def visit_node_intrinsic_function_fn_split(
1✔
931
        self, node_intrinsic_function: NodeIntrinsicFunction
932
    ):
933
        # TODO: add further support for schema validation
934
        def _compute_fn_split(args: list[Any]) -> Any:
1✔
935
            delimiter = args[0]
1✔
936
            if not isinstance(delimiter, str) or not delimiter:
1✔
937
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{delimiter}'")
×
938
            source_string = args[1]
1✔
939
            if not isinstance(source_string, str):
1✔
940
                raise RuntimeError(f"Invalid source string value for Fn::Split: '{source_string}'")
1✔
941
            split_string = source_string.split(delimiter)
1✔
942
            return split_string
1✔
943

944
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
945

946
        if not (
1✔
947
            is_nothing(arguments_delta.after)
948
            or isinstance(arguments_delta.after, list)
949
            and len(arguments_delta.after) == 2
950
        ):
951
            raise ValidationError(
×
952
                "Template error: every Fn::Split object requires two parameters, "
953
                "(1) a string delimiter and (2) a string to be split or a function that returns a string to be split."
954
            )
955

956
        delta = self._cached_apply(
1✔
957
            scope=node_intrinsic_function.scope,
958
            arguments_delta=arguments_delta,
959
            resolver=_compute_fn_split,
960
        )
961
        return delta
1✔
962

963
    def visit_node_intrinsic_function_fn_get_a_zs(
1✔
964
        self, node_intrinsic_function: NodeIntrinsicFunction
965
    ) -> PreprocEntityDelta:
966
        # TODO: add further support for schema validation
967

968
        def _compute_fn_get_a_zs(region) -> Any:
1✔
969
            if not isinstance(region, str):
1✔
970
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")
×
971

972
            if not region:
1✔
973
                region = self._change_set.region_name
1✔
974

975
            account_id = self._change_set.account_id
1✔
976
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
1✔
977
            try:
1✔
978
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
1✔
979
                    ec2_client.describe_availability_zones()
980
                )
981
            except ClientError:
×
982
                raise RuntimeError(
×
983
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
984
                )
985
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
1✔
986
                "AvailabilityZones"
987
            ]
988
            azs = [az["ZoneName"] for az in availability_zones]
1✔
989
            return azs
1✔
990

991
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
992
        delta = self._cached_apply(
1✔
993
            scope=node_intrinsic_function.scope,
994
            arguments_delta=arguments_delta,
995
            resolver=_compute_fn_get_a_zs,
996
        )
997
        return delta
1✔
998

999
    def visit_node_intrinsic_function_fn_base64(
1✔
1000
        self, node_intrinsic_function: NodeIntrinsicFunction
1001
    ) -> PreprocEntityDelta:
1002
        # TODO: add further support for schema validation
1003
        def _compute_fn_base_64(string) -> Any:
1✔
1004
            if not isinstance(string, str):
1✔
1005
                raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{string}'")
×
1006
            # Ported from v1:
1007
            base64_string = to_str(base64.b64encode(to_bytes(string)))
1✔
1008
            return base64_string
1✔
1009

1010
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1011
        delta = self._cached_apply(
1✔
1012
            scope=node_intrinsic_function.scope,
1013
            arguments_delta=arguments_delta,
1014
            resolver=_compute_fn_base_64,
1015
        )
1016
        return delta
1✔
1017

1018
    def visit_node_intrinsic_function_fn_find_in_map(
1✔
1019
        self, node_intrinsic_function: NodeIntrinsicFunction
1020
    ) -> PreprocEntityDelta:
1021
        # TODO: add type checking/validation for result unit?
1022
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1023
        before_arguments = arguments_delta.before
1✔
1024
        after_arguments = arguments_delta.after
1✔
1025
        before = Nothing
1✔
1026
        if before_arguments:
1✔
1027
            before_value_delta = self._resolve_mapping(*before_arguments)
1✔
1028
            before = before_value_delta.before
1✔
1029
        after = Nothing
1✔
1030
        if after_arguments:
1✔
1031
            after_value_delta = self._resolve_mapping(*after_arguments)
1✔
1032
            after = after_value_delta.after
1✔
1033
        return PreprocEntityDelta(before=before, after=after)
1✔
1034

1035
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
1✔
1036
        bindings_delta = self.visit(node_mapping.bindings)
1✔
1037
        return bindings_delta
1✔
1038

1039
    def visit_node_parameters(
1✔
1040
        self, node_parameters: NodeParameters
1041
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
1042
        before_parameters = {}
1✔
1043
        after_parameters = {}
1✔
1044
        for parameter in node_parameters.parameters:
1✔
1045
            parameter_delta = self.visit(parameter)
1✔
1046
            parameter_before = parameter_delta.before
1✔
1047
            if not is_nothing(parameter_before):
1✔
1048
                before_parameters[parameter.name] = parameter_before
1✔
1049
            parameter_after = parameter_delta.after
1✔
1050
            if not is_nothing(parameter_after):
1✔
1051
                after_parameters[parameter.name] = parameter_after
1✔
1052
        return PreprocEntityDelta(before=before_parameters, after=after_parameters)
1✔
1053

1054
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
1✔
1055
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_parameter.name):
1✔
1056
            raise ValidationError(
1✔
1057
                f"Template format error: Parameter name {node_parameter.name} is non alphanumeric."
1058
            )
1059
        dynamic_value = node_parameter.dynamic_value
1✔
1060
        dynamic_delta = self.visit(dynamic_value)
1✔
1061

1062
        default_value = node_parameter.default_value
1✔
1063
        default_delta = self.visit(default_value)
1✔
1064

1065
        before = dynamic_delta.before or default_delta.before
1✔
1066
        after = dynamic_delta.after or default_delta.after
1✔
1067

1068
        parameter_type = self.visit(node_parameter.type_)
1✔
1069

1070
        def _resolve_parameter_type(value: str, type_: str) -> Any:
1✔
1071
            match type_:
1✔
1072
                case "List<String>" | "CommaDelimitedList":
1✔
1073
                    return [item.strip() for item in value.split(",")]
1✔
1074
                case "Number":
1✔
1075
                    # TODO: validate the parameter type at template parse time (or whatever is in parity with AWS) so we know this cannot fail
1076
                    return to_number(value)
1✔
1077
            return value
1✔
1078

1079
        if not is_nothing(after):
1✔
1080
            after = _resolve_parameter_type(after, parameter_type.after)
1✔
1081

1082
        return PreprocEntityDelta(before=before, after=after)
1✔
1083

1084
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
1✔
1085
        array_identifiers_delta = self.visit(node_depends_on.depends_on)
1✔
1086
        return array_identifiers_delta
1✔
1087

1088
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
1✔
1089
        delta = self.visit(node_condition.body)
1✔
1090
        return delta
1✔
1091

1092
    def _resource_physical_resource_id_from(
1✔
1093
        self, logical_resource_id: str, resolved_resources: dict[str, ResolvedResource]
1094
    ) -> str | None:
1095
        # TODO: typing around resolved resources is needed and should be reflected here.
1096
        resolved_resource = resolved_resources.get(logical_resource_id, {})
1✔
1097
        if resolved_resource.get("ResourceStatus") not in {
1✔
1098
            ResourceStatus.CREATE_COMPLETE,
1099
            ResourceStatus.UPDATE_COMPLETE,
1100
        }:
1101
            return None
1✔
1102

1103
        physical_resource_id = resolved_resource.get("PhysicalResourceId")
1✔
1104
        if not isinstance(physical_resource_id, str):
1✔
1105
            raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
×
1106
        return physical_resource_id
1✔
1107

1108
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
1109
        # TODO: typing around resolved resources is needed and should be reflected here.
1110
        return self._resource_physical_resource_id_from(
1✔
1111
            logical_resource_id=resource_logical_id,
1112
            resolved_resources=self._before_resolved_resources,
1113
        )
1114

1115
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
1116
        return self._before_resource_physical_id(resource_logical_id=resource_logical_id)
1✔
1117

1118
    def visit_node_intrinsic_function_ref(
1✔
1119
        self, node_intrinsic_function: NodeIntrinsicFunction
1120
    ) -> PreprocEntityDelta:
1121
        def _compute_fn_ref(logical_id: str) -> PreprocEntityDelta:
1✔
1122
            if logical_id == "AWS::NoValue":
1✔
1123
                return Nothing
1✔
1124

1125
            reference_delta: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
1✔
1126
            if isinstance(before := reference_delta.before, PreprocResource):
1✔
1127
                reference_delta.before = before.physical_resource_id
1✔
1128
            if isinstance(after := reference_delta.after, PreprocResource):
1✔
1129
                reference_delta.after = after.physical_resource_id
1✔
1130
            return reference_delta
1✔
1131

1132
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1133
        delta = self._cached_apply(
1✔
1134
            scope=node_intrinsic_function.scope,
1135
            arguments_delta=arguments_delta,
1136
            resolver=_compute_fn_ref,
1137
        )
1138
        return delta
1✔
1139

1140
    def visit_node_intrinsic_function_condition(
1✔
1141
        self, node_intrinsic_function: NodeIntrinsicFunction
1142
    ) -> PreprocEntityDelta:
1143
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1144

1145
        def _delta_of_condition(name: str) -> PreprocEntityDelta:
1✔
1146
            node_condition = self._get_node_condition_if_exists(condition_name=name)
1✔
1147
            if is_nothing(node_condition):
1✔
1148
                raise RuntimeError(f"Undefined condition '{name}'")
×
1149
            condition_delta = self.visit(node_condition)
1✔
1150
            return condition_delta
1✔
1151

1152
        delta = self._cached_apply(
1✔
1153
            resolver=_delta_of_condition,
1154
            scope=node_intrinsic_function.scope,
1155
            arguments_delta=arguments_delta,
1156
        )
1157
        return delta
1✔
1158

1159
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
1✔
1160
        node_change_type = node_array.change_type
1✔
1161
        before = [] if node_change_type != ChangeType.CREATED else Nothing
1✔
1162
        after = [] if node_change_type != ChangeType.REMOVED else Nothing
1✔
1163
        for change_set_entity in node_array.array:
1✔
1164
            delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
1✔
1165
            delta_before = delta.before
1✔
1166
            delta_after = delta.after
1✔
1167
            if not is_nothing(before) and not is_nothing(delta_before):
1✔
1168
                before.append(delta_before)
1✔
1169
            if not is_nothing(after) and not is_nothing(delta_after):
1✔
1170
                after.append(delta_after)
1✔
1171
        return PreprocEntityDelta(before=before, after=after)
1✔
1172

1173
    def visit_node_properties(
1✔
1174
        self, node_properties: NodeProperties
1175
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
1176
        node_change_type = node_properties.change_type
1✔
1177
        before_bindings = {} if node_change_type != ChangeType.CREATED else Nothing
1✔
1178
        after_bindings = {} if node_change_type != ChangeType.REMOVED else Nothing
1✔
1179
        for node_property in node_properties.properties:
1✔
1180
            property_name = node_property.name
1✔
1181
            delta = self.visit(node_property)
1✔
1182
            delta_before = delta.before
1✔
1183
            delta_after = delta.after
1✔
1184
            if (
1✔
1185
                not is_nothing(before_bindings)
1186
                and not is_nothing(delta_before)
1187
                and delta_before is not None
1188
            ):
1189
                before_bindings[property_name] = delta_before
1✔
1190
            if (
1✔
1191
                not is_nothing(after_bindings)
1192
                and not is_nothing(delta_after)
1193
                and delta_after is not None
1194
            ):
1195
                after_bindings[property_name] = delta_after
1✔
1196
        before = Nothing
1✔
1197
        if not is_nothing(before_bindings):
1✔
1198
            before = PreprocProperties(properties=before_bindings)
1✔
1199
        after = Nothing
1✔
1200
        if not is_nothing(after_bindings):
1✔
1201
            after = PreprocProperties(properties=after_bindings)
1✔
1202
        return PreprocEntityDelta(before=before, after=after)
1✔
1203

1204
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
1✔
1205
        reference_delta = self.visit(reference)
1✔
1206
        before_reference = reference_delta.before
1✔
1207
        before = Nothing
1✔
1208
        if isinstance(before_reference, str):
1✔
1209
            before_delta = self._resolve_condition(logical_id=before_reference)
1✔
1210
            before = before_delta.before
1✔
1211
        after = Nothing
1✔
1212
        after_reference = reference_delta.after
1✔
1213
        if isinstance(after_reference, str):
1✔
1214
            after_delta = self._resolve_condition(logical_id=after_reference)
1✔
1215
            after = after_delta.after
1✔
1216
        return PreprocEntityDelta(before=before, after=after)
1✔
1217

1218
    def visit_node_resources(self, node_resources: NodeResources):
1✔
1219
        """
1220
        Skip resources where they conditionally evaluate to False
1221
        """
1222
        for node_resource in node_resources.resources:
1✔
1223
            if not is_nothing(node_resource.condition_reference):
1✔
1224
                condition_delta = self._resolve_resource_condition_reference(
1✔
1225
                    node_resource.condition_reference
1226
                )
1227
                condition_after = condition_delta.after
1✔
1228
                if condition_after is False:
1✔
1229
                    continue
1✔
1230
            self.visit(node_resource)
1✔
1231

1232
    def visit_node_resource(
1✔
1233
        self, node_resource: NodeResource
1234
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
1235
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_resource.name):
1✔
1236
            raise ValidationError(
1✔
1237
                f"Template format error: Resource name {node_resource.name} is non alphanumeric."
1238
            )
1239
        change_type = node_resource.change_type
1✔
1240
        condition_before = Nothing
1✔
1241
        condition_after = Nothing
1✔
1242
        if not is_nothing(node_resource.condition_reference):
1✔
1243
            condition_delta = self._resolve_resource_condition_reference(
1✔
1244
                node_resource.condition_reference
1245
            )
1246
            condition_before = condition_delta.before
1✔
1247
            condition_after = condition_delta.after
1✔
1248

1249
        depends_on_before = Nothing
1✔
1250
        depends_on_after = Nothing
1✔
1251
        if not is_nothing(node_resource.depends_on):
1✔
1252
            depends_on_delta = self.visit(node_resource.depends_on)
1✔
1253
            depends_on_before = depends_on_delta.before
1✔
1254
            depends_on_after = depends_on_delta.after
1✔
1255

1256
        type_delta = self.visit(node_resource.type_)
1✔
1257
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
1✔
1258
            node_resource.properties
1259
        )
1260

1261
        before = Nothing
1✔
1262
        after = Nothing
1✔
1263
        if change_type != ChangeType.CREATED and is_nothing(condition_before) or condition_before:
1✔
1264
            logical_resource_id = node_resource.name
1✔
1265
            before_physical_resource_id = self._before_resource_physical_id(
1✔
1266
                resource_logical_id=logical_resource_id
1267
            )
1268
            before = PreprocResource(
1✔
1269
                logical_id=logical_resource_id,
1270
                physical_resource_id=before_physical_resource_id,
1271
                condition=condition_before,
1272
                resource_type=type_delta.before,
1273
                properties=properties_delta.before,
1274
                depends_on=depends_on_before,
1275
                requires_replacement=False,
1276
            )
1277
        if change_type != ChangeType.REMOVED and is_nothing(condition_after) or condition_after:
1✔
1278
            logical_resource_id = node_resource.name
1✔
1279
            try:
1✔
1280
                after_physical_resource_id = self._after_resource_physical_id(
1✔
1281
                    resource_logical_id=logical_resource_id
1282
                )
1283
            except RuntimeError:
×
1284
                after_physical_resource_id = None
×
1285
            after = PreprocResource(
1✔
1286
                logical_id=logical_resource_id,
1287
                physical_resource_id=after_physical_resource_id,
1288
                condition=condition_after,
1289
                resource_type=type_delta.after,
1290
                properties=properties_delta.after,
1291
                depends_on=depends_on_after,
1292
                requires_replacement=node_resource.requires_replacement,
1293
            )
1294
        return PreprocEntityDelta(before=before, after=after)
1✔
1295

1296
    def visit_node_output(
1✔
1297
        self, node_output: NodeOutput
1298
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
1299
        change_type = node_output.change_type
1✔
1300
        value_delta = self.visit(node_output.value)
1✔
1301

1302
        condition_delta = Nothing
1✔
1303
        if not is_nothing(node_output.condition_reference):
1✔
1304
            condition_delta = self._resolve_resource_condition_reference(
1✔
1305
                node_output.condition_reference
1306
            )
1307
            condition_before = condition_delta.before
1✔
1308
            condition_after = condition_delta.after
1✔
1309
            if not condition_before and condition_after:
1✔
1310
                change_type = ChangeType.CREATED
1✔
1311
            elif condition_before and not condition_after:
1✔
1312
                change_type = ChangeType.REMOVED
1✔
1313

1314
        export_delta = Nothing
1✔
1315
        if not is_nothing(node_output.export):
1✔
1316
            export_delta = self.visit(node_output.export)
1✔
1317

1318
        before: Maybe[PreprocOutput] = Nothing
1✔
1319
        if change_type != ChangeType.CREATED:
1✔
1320
            before = PreprocOutput(
1✔
1321
                name=node_output.name,
1322
                value=value_delta.before,
1323
                export=export_delta.before if export_delta else None,
1324
                condition=condition_delta.before if condition_delta else None,
1325
            )
1326
        after: Maybe[PreprocOutput] = Nothing
1✔
1327
        if change_type != ChangeType.REMOVED:
1✔
1328
            after = PreprocOutput(
1✔
1329
                name=node_output.name,
1330
                value=value_delta.after,
1331
                export=export_delta.after if export_delta else None,
1332
                condition=condition_delta.after if condition_delta else None,
1333
            )
1334
        return PreprocEntityDelta(before=before, after=after)
1✔
1335

1336
    def visit_node_outputs(
1✔
1337
        self, node_outputs: NodeOutputs
1338
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
1339
        before: list[PreprocOutput] = []
1✔
1340
        after: list[PreprocOutput] = []
1✔
1341
        for node_output in node_outputs.outputs:
1✔
1342
            if not is_nothing(node_output.condition_reference):
1✔
1343
                condition_delta = self._resolve_resource_condition_reference(
1✔
1344
                    node_output.condition_reference
1345
                )
1346
                condition_after = condition_delta.after
1✔
1347
                if condition_after is False:
1✔
1348
                    continue
1✔
1349

1350
            output_delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
1✔
1351
            output_before = output_delta.before
1✔
1352
            output_after = output_delta.after
1✔
1353
            if not is_nothing(output_before):
1✔
1354
                before.append(output_before)
1✔
1355
            if not is_nothing(output_after):
1✔
1356
                after.append(output_after)
1✔
1357
        return PreprocEntityDelta(before=before, after=after)
1✔
1358

1359
    def visit_node_intrinsic_function_fn_import_value(
1✔
1360
        self, node_intrinsic_function: NodeIntrinsicFunction
1361
    ) -> PreprocEntityDelta:
1362
        def _compute_fn_import_value(string) -> str:
1✔
1363
            if not isinstance(string, str):
1✔
1364
                raise RuntimeError(f"Invalid parameter for import: '{string}'")
×
1365

1366
            exports = exports_map(
1✔
1367
                account_id=self._change_set.account_id, region_name=self._change_set.region_name
1368
            )
1369

1370
            return exports.get(string, {}).get("Value") or Nothing
1✔
1371

1372
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1373
        delta = self._cached_apply(
1✔
1374
            scope=node_intrinsic_function.scope,
1375
            arguments_delta=arguments_delta,
1376
            resolver=_compute_fn_import_value,
1377
        )
1378
        return delta
1✔
1379

1380
    def visit_node_intrinsic_function_fn_transform(
1✔
1381
        self, node_intrinsic_function: NodeIntrinsicFunction
1382
    ):
1383
        raise RuntimeError("Fn::Transform should have been handled by the Transformer")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc