• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17784817164

16 Sep 2025 03:11PM UTC coverage: 86.879%. Remained the same
17784817164

push

github

localstack-bot
prepare next development iteration

67198 of 77347 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.32
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from typing import Any, Final, Generic, TypeVar
1✔
8

9
from botocore.exceptions import ClientError
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api.cloudformation import ResourceStatus
1✔
13
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
14
from localstack.aws.connect import connect_to
1✔
15
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
16
    ChangeSetEntity,
17
    ChangeType,
18
    Maybe,
19
    NodeArray,
20
    NodeCondition,
21
    NodeDependsOn,
22
    NodeDivergence,
23
    NodeIntrinsicFunction,
24
    NodeMapping,
25
    NodeObject,
26
    NodeOutput,
27
    NodeOutputs,
28
    NodeParameter,
29
    NodeParameters,
30
    NodeProperties,
31
    NodeProperty,
32
    NodeResource,
33
    NodeTemplate,
34
    Nothing,
35
    NothingType,
36
    Scope,
37
    TerminalValue,
38
    TerminalValueCreated,
39
    TerminalValueModified,
40
    TerminalValueRemoved,
41
    TerminalValueUnchanged,
42
    is_nothing,
43
)
44
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
45
    ChangeSetModelVisitor,
46
)
47
from localstack.services.cloudformation.engine.v2.resolving import (
1✔
48
    extract_dynamic_reference,
49
    perform_dynamic_reference_lookup,
50
)
51
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
52
from localstack.services.cloudformation.stores import (
1✔
53
    exports_map,
54
)
55
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
56
from localstack.services.cloudformation.v2.types import ResolvedResource
1✔
57
from localstack.utils.aws.arns import get_partition
1✔
58
from localstack.utils.objects import get_value_from_path
1✔
59
from localstack.utils.run import to_str
1✔
60
from localstack.utils.strings import to_bytes
1✔
61
from localstack.utils.urls import localstack_host
1✔
62

63
_AWS_URL_SUFFIX = localstack_host().host  # The value in AWS is "amazonaws.com"
1✔
64

65
_PSEUDO_PARAMETERS: Final[set[str]] = {
1✔
66
    "AWS::Partition",
67
    "AWS::AccountId",
68
    "AWS::Region",
69
    "AWS::StackName",
70
    "AWS::StackId",
71
    "AWS::URLSuffix",
72
    "AWS::NoValue",
73
    "AWS::NotificationARNs",
74
}
75

76
TBefore = TypeVar("TBefore")
1✔
77
TAfter = TypeVar("TAfter")
1✔
78
_T = TypeVar("_T")
1✔
79

80
REGEX_OUTPUT_APIGATEWAY = re.compile(
1✔
81
    rf"^(https?://.+\.execute-api\.)(?:[^-]+-){{2,3}}\d\.(amazonaws\.com|{_AWS_URL_SUFFIX})/?(.*)$"
82
)
83
MOCKED_REFERENCE = "unknown"
1✔
84

85
VALID_LOGICAL_RESOURCE_ID_RE = re.compile(r"^[A-Za-z0-9]+$")
1✔
86

87

88
class PreprocEntityDelta(Generic[TBefore, TAfter]):
1✔
89
    before: Maybe[TBefore]
1✔
90
    after: Maybe[TAfter]
1✔
91

92
    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
1✔
93
        self.before = before
1✔
94
        self.after = after
1✔
95

96
    def __eq__(self, other):
1✔
97
        if not isinstance(other, PreprocEntityDelta):
×
98
            return False
×
99
        return self.before == other.before and self.after == other.after
×
100

101

102
class PreprocProperties:
1✔
103
    properties: dict[str, Any]
1✔
104

105
    def __init__(self, properties: dict[str, Any]):
1✔
106
        self.properties = properties
1✔
107

108
    def __eq__(self, other):
1✔
109
        if not isinstance(other, PreprocProperties):
1✔
110
            return False
×
111
        return self.properties == other.properties
1✔
112

113

114
class PreprocResource:
1✔
115
    logical_id: str
1✔
116
    physical_resource_id: str | None
1✔
117
    condition: bool | None
1✔
118
    resource_type: str
1✔
119
    properties: PreprocProperties
1✔
120
    depends_on: list[str] | None
1✔
121
    requires_replacement: bool
1✔
122
    status: ResourceStatus | None
1✔
123

124
    def __init__(
1✔
125
        self,
126
        logical_id: str,
127
        physical_resource_id: str,
128
        condition: bool | None,
129
        resource_type: str,
130
        properties: PreprocProperties,
131
        depends_on: list[str] | None,
132
        requires_replacement: bool,
133
        status: ResourceStatus | None = None,
134
    ):
135
        self.logical_id = logical_id
1✔
136
        self.physical_resource_id = physical_resource_id
1✔
137
        self.condition = condition
1✔
138
        self.resource_type = resource_type
1✔
139
        self.properties = properties
1✔
140
        self.depends_on = depends_on
1✔
141
        self.requires_replacement = requires_replacement
1✔
142
        self.status = status
1✔
143

144
    @staticmethod
1✔
145
    def _compare_conditions(c1: bool, c2: bool):
1✔
146
        # The lack of condition equates to a true condition.
147
        c1 = c1 if isinstance(c1, bool) else True
1✔
148
        c2 = c2 if isinstance(c2, bool) else True
1✔
149
        return c1 == c2
1✔
150

151
    def __eq__(self, other):
1✔
152
        if not isinstance(other, PreprocResource):
1✔
153
            return False
1✔
154
        return all(
1✔
155
            [
156
                self.logical_id == other.logical_id,
157
                self._compare_conditions(self.condition, other.condition),
158
                self.resource_type == other.resource_type,
159
                self.properties == other.properties,
160
            ]
161
        )
162

163

164
class PreprocOutput:
1✔
165
    name: str
1✔
166
    value: Any
1✔
167
    export: Any | None
1✔
168
    condition: bool | None
1✔
169

170
    def __init__(self, name: str, value: Any, export: Any | None, condition: bool | None):
1✔
171
        self.name = name
1✔
172
        self.value = value
1✔
173
        self.export = export
1✔
174
        self.condition = condition
1✔
175

176
    def __eq__(self, other):
1✔
177
        if not isinstance(other, PreprocOutput):
×
178
            return False
×
179
        return all(
×
180
            [
181
                self.name == other.name,
182
                self.value == other.value,
183
                self.export == other.export,
184
                self.condition == other.condition,
185
            ]
186
        )
187

188

189
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
190
    _change_set: Final[ChangeSet]
1✔
191
    _before_resolved_resources: Final[dict]
1✔
192
    _before_cache: Final[dict[Scope, Any]]
1✔
193
    _after_cache: Final[dict[Scope, Any]]
1✔
194

195
    def __init__(self, change_set: ChangeSet):
1✔
196
        self._change_set = change_set
1✔
197
        self._before_resolved_resources = change_set.stack.resolved_resources
1✔
198
        self._before_cache = {}
1✔
199
        self._after_cache = {}
1✔
200

201
    def _setup_runtime_cache(self) -> None:
1✔
202
        runtime_cache_key = self.__class__.__name__
1✔
203

204
        self._before_cache.clear()
1✔
205
        self._after_cache.clear()
1✔
206

207
        before_runtime_cache = self._change_set.update_model.before_runtime_cache
1✔
208
        if cache := before_runtime_cache.get(runtime_cache_key):
1✔
209
            self._before_cache.update(cache)
1✔
210

211
        after_runtime_cache = self._change_set.update_model.after_runtime_cache
1✔
212
        if cache := after_runtime_cache.get(runtime_cache_key):
1✔
213
            self._after_cache.update(cache)
×
214

215
    def _save_runtime_cache(self) -> None:
1✔
216
        runtime_cache_key = self.__class__.__name__
1✔
217

218
        before_runtime_cache = self._change_set.update_model.before_runtime_cache
1✔
219
        before_runtime_cache[runtime_cache_key] = copy.deepcopy(self._before_cache)
1✔
220

221
        after_runtime_cache = self._change_set.update_model.after_runtime_cache
1✔
222
        after_runtime_cache[runtime_cache_key] = copy.deepcopy(self._after_cache)
1✔
223

224
    def process(self) -> None:
1✔
225
        self._setup_runtime_cache()
1✔
226
        node_template = self._change_set.update_model.node_template
1✔
227
        self.visit(node_template)
1✔
228
        self._save_runtime_cache()
1✔
229

230
    def _get_node_resource_for(
1✔
231
        self, resource_name: str, node_template: NodeTemplate
232
    ) -> NodeResource:
233
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
234
        for node_resource in node_template.resources.resources:
1✔
235
            if node_resource.name == resource_name:
1✔
236
                self.visit(node_resource)
1✔
237
                return node_resource
1✔
238
        raise ValidationError(
1✔
239
            f"Template format error: Unresolved resource dependencies [{resource_name}] in the Resources block of the template"
240
        )
241

242
    def _get_node_property_for(
1✔
243
        self, property_name: str, node_resource: NodeResource
244
    ) -> NodeProperty | None:
245
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
246
        for node_property in node_resource.properties.properties:
1✔
247
            if node_property.name == property_name:
1✔
248
                self.visit(node_property)
1✔
249
                return node_property
1✔
250
        return None
1✔
251

252
    def _deployed_property_value_of(
1✔
253
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
254
    ) -> Any:
255
        # We have to override this function to make sure it does not try to access the
256
        # resolved resource
257

258
        # Before we can obtain deployed value for a resource, we need to first ensure to
259
        # process the resource if this wasn't processed already. Ideally, values should only
260
        # be accessible through delta objects, to ensure computation is always complete at
261
        # every level.
262
        _ = self._get_node_resource_for(
1✔
263
            resource_name=resource_logical_id,
264
            node_template=self._change_set.update_model.node_template,
265
        )
266
        resolved_resource = resolved_resources.get(resource_logical_id)
1✔
267
        if resolved_resource is None:
1✔
268
            raise RuntimeError(
1✔
269
                f"No deployed instances of resource '{resource_logical_id}' were found"
270
            )
271
        properties = resolved_resource.get("Properties", {})
1✔
272
        # TODO support structured properties, e.g. NestedStack.Outputs.OutputName
273
        property_value: Any | None = get_value_from_path(properties, property_name)
1✔
274

275
        if property_value:
1✔
276
            if not isinstance(property_value, (str, list)):
1✔
277
                # TODO: is this correct? If there is a bug in the logic here, it's probably
278
                #  better to know about it with a clear error message than to receive some form
279
                #  of message about trying to use a dictionary in place of a string
280
                raise RuntimeError(
×
281
                    f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value nor list"
282
                )
283
            return property_value
1✔
284
        elif config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES:
1✔
285
            return MOCKED_REFERENCE
1✔
286

287
        return property_value
×
288

289
    def _before_deployed_property_value_of(
1✔
290
        self, resource_logical_id: str, property_name: str
291
    ) -> Any:
292
        return self._deployed_property_value_of(
1✔
293
            resource_logical_id=resource_logical_id,
294
            property_name=property_name,
295
            resolved_resources=self._before_resolved_resources,
296
        )
297

298
    def _after_deployed_property_value_of(
1✔
299
        self, resource_logical_id: str, property_name: str
300
    ) -> str | None:
301
        return self._before_deployed_property_value_of(
1✔
302
            resource_logical_id=resource_logical_id, property_name=property_name
303
        )
304

305
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
1✔
306
        mappings: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
1✔
307
        # TODO: another scenarios suggesting property lookups might be preferable.
308
        for mapping in mappings:
1✔
309
            if mapping.name == map_name:
1✔
310
                self.visit(mapping)
1✔
311
                return mapping
1✔
312
        raise RuntimeError(f"Undefined '{map_name}' mapping")
×
313

314
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
1✔
315
        parameters: list[NodeParameter] = (
1✔
316
            self._change_set.update_model.node_template.parameters.parameters
317
        )
318
        # TODO: another scenarios suggesting property lookups might be preferable.
319
        for parameter in parameters:
1✔
320
            if parameter.name == parameter_name:
1✔
321
                self.visit(parameter)
1✔
322
                return parameter
1✔
323
        return Nothing
1✔
324

325
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
1✔
326
        conditions: list[NodeCondition] = (
1✔
327
            self._change_set.update_model.node_template.conditions.conditions
328
        )
329
        # TODO: another scenarios suggesting property lookups might be preferable.
330
        for condition in conditions:
1✔
331
            if condition.name == condition_name:
1✔
332
                self.visit(condition)
1✔
333
                return condition
1✔
334
        return Nothing
×
335

336
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
1✔
337
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
1✔
338
        if isinstance(node_condition, NodeCondition):
1✔
339
            condition_delta = self.visit(node_condition)
1✔
340
            return condition_delta
1✔
341
        raise RuntimeError(f"No condition '{logical_id}' was found.")
×
342

343
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
1✔
344
        match pseudo_parameter_name:
1✔
345
            case "AWS::Partition":
1✔
346
                return get_partition(self._change_set.region_name)
1✔
347
            case "AWS::AccountId":
1✔
348
                return self._change_set.stack.account_id
1✔
349
            case "AWS::Region":
1✔
350
                return self._change_set.stack.region_name
1✔
351
            case "AWS::StackName":
1✔
352
                return self._change_set.stack.stack_name
1✔
353
            case "AWS::StackId":
1✔
354
                return self._change_set.stack.stack_id
1✔
355
            case "AWS::URLSuffix":
1✔
356
                return _AWS_URL_SUFFIX
1✔
357
            case "AWS::NoValue":
×
358
                return None
×
359
            case _:
×
360
                raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
×
361

362
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
1✔
363
        if logical_id in _PSEUDO_PARAMETERS:
1✔
364
            pseudo_parameter_value = self._resolve_pseudo_parameter(
1✔
365
                pseudo_parameter_name=logical_id
366
            )
367
            # Pseudo parameters are constants within the lifecycle of a template.
368
            return PreprocEntityDelta(before=pseudo_parameter_value, after=pseudo_parameter_value)
1✔
369

370
        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
1✔
371
        if isinstance(node_parameter, NodeParameter):
1✔
372
            parameter_delta = self.visit(node_parameter)
1✔
373
            return parameter_delta
1✔
374

375
        node_resource = self._get_node_resource_for(
1✔
376
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
377
        )
378
        resource_delta = self.visit(node_resource)
1✔
379
        before = resource_delta.before
1✔
380
        after = resource_delta.after
1✔
381
        return PreprocEntityDelta(before=before, after=after)
1✔
382

383
    def _resolve_mapping(
1✔
384
        self, map_name: str, top_level_key: str, second_level_key
385
    ) -> PreprocEntityDelta:
386
        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
387
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)
1✔
388
        top_level_value = node_mapping.bindings.bindings.get(top_level_key)
1✔
389
        if not isinstance(top_level_value, NodeObject):
1✔
390
            error_key = "::".join([map_name, top_level_key, second_level_key])
1✔
391
            raise ValidationError(f"Template error: Unable to get mapping for {error_key}")
1✔
392
        second_level_value = top_level_value.bindings.get(second_level_key)
1✔
393
        if not isinstance(second_level_value, (TerminalValue, NodeArray, NodeObject)):
1✔
394
            error_key = "::".join([map_name, top_level_key, second_level_key])
1✔
395
            raise ValidationError(f"Template error: Unable to get mapping for {error_key}")
1✔
396
        mapping_value_delta = self.visit(second_level_value)
1✔
397
        return mapping_value_delta
1✔
398

399
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
1✔
400
        entity_scope = change_set_entity.scope
1✔
401
        if entity_scope in self._before_cache and entity_scope in self._after_cache:
1✔
402
            before = self._before_cache[entity_scope]
1✔
403
            after = self._after_cache[entity_scope]
1✔
404
            return PreprocEntityDelta(before=before, after=after)
1✔
405
        delta = super().visit(change_set_entity=change_set_entity)
1✔
406
        if isinstance(delta, PreprocEntityDelta):
1✔
407
            delta = self._maybe_perform_replacements(delta)
1✔
408
            self._before_cache[entity_scope] = delta.before
1✔
409
            self._after_cache[entity_scope] = delta.after
1✔
410
        return delta
1✔
411

412
    def _maybe_perform_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
413
        delta = self._maybe_perform_static_replacements(delta)
1✔
414
        delta = self._maybe_perform_dynamic_replacements(delta)
1✔
415
        return delta
1✔
416

417
    def _maybe_perform_static_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
418
        return self._maybe_perform_on_delta(delta, self._perform_static_replacements)
1✔
419

420
    def _maybe_perform_dynamic_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
1✔
421
        return self._maybe_perform_on_delta(delta, self._perform_dynamic_replacements)
1✔
422

423
    def _maybe_perform_on_delta(
1✔
424
        self, delta: PreprocEntityDelta | None, f: Callable[[_T], _T]
425
    ) -> PreprocEntityDelta | None:
426
        if isinstance(delta.before, str):
1✔
427
            delta.before = f(delta.before)
1✔
428
        if isinstance(delta.after, str):
1✔
429
            delta.after = f(delta.after)
1✔
430
        return delta
1✔
431

432
    def _perform_dynamic_replacements(self, value: _T) -> _T:
1✔
433
        if not isinstance(value, str):
1✔
434
            return value
×
435
        if dynamic_ref := extract_dynamic_reference(value):
1✔
436
            new_value = perform_dynamic_reference_lookup(
1✔
437
                reference=dynamic_ref,
438
                account_id=self._change_set.account_id,
439
                region_name=self._change_set.region_name,
440
            )
441
            if new_value:
1✔
442
                return new_value
1✔
443

444
        return value
1✔
445

446
    @staticmethod
1✔
447
    def _perform_static_replacements(value: str) -> str:
1✔
448
        api_match = REGEX_OUTPUT_APIGATEWAY.match(value)
1✔
449
        if api_match and value not in config.CFN_STRING_REPLACEMENT_DENY_LIST:
1✔
450
            prefix = api_match[1]
1✔
451
            host = api_match[2]
1✔
452
            path = api_match[3]
1✔
453
            port = localstack_host().port
1✔
454
            value = f"{prefix}{host}:{port}/{path}"
1✔
455
            return value
1✔
456

457
        return value
1✔
458

459
    def _cached_apply(
1✔
460
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
461
    ) -> PreprocEntityDelta:
462
        """
463
        Applies the resolver function to the given input delta if and only if the required
464
        values are not already present in the runtime caches. This function handles both
465
        the 'before' and 'after' components of the delta independently.
466

467
        The resolver function receives either the 'before' or 'after' value from the input
468
        delta and returns a resolved value. If the result returned by the resolver is
469
        itself a PreprocEntityDelta, the function automatically extracts the appropriate
470
        component from it:  the 'before' value if the input was 'before', and the 'after'
471
        value if the input was 'after'.
472

473
        This function only reads from the cache and does not update it. It is the caller's
474
        responsibility to handle caching, either manually or via the upstream visit method
475
        of this class.
476

477
        Args:
478
            scope (Scope): The current scope used as a key for cache lookup.
479
            arguments_delta (PreprocEntityDelta): The delta containing 'before' and 'after' values to resolve.
480
            resolver (Callable[[Any], Any]): Function to apply on uncached 'before' or 'after' argument values.
481

482
        Returns:
483
            PreprocEntityDelta: A new delta with resolved 'before' and 'after' values.
484
        """
485

486
        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
487
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
488
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.
489

490
        arguments_before = arguments_delta.before
1✔
491
        arguments_after = arguments_delta.after
1✔
492

493
        before = self._before_cache.get(scope, Nothing)
1✔
494
        if is_nothing(before) and not is_nothing(arguments_before):
1✔
495
            before = resolver(arguments_before)
1✔
496
            if isinstance(before, PreprocEntityDelta):
1✔
497
                before = before.before
1✔
498

499
        after = self._after_cache.get(scope, Nothing)
1✔
500
        if is_nothing(after) and not is_nothing(arguments_after):
1✔
501
            after = resolver(arguments_after)
1✔
502
            if isinstance(after, PreprocEntityDelta):
1✔
503
                after = after.after
1✔
504

505
        return PreprocEntityDelta(before=before, after=after)
1✔
506

507
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
1✔
508
        return self.visit(node_property.value)
1✔
509

510
    def visit_terminal_value_modified(
1✔
511
        self, terminal_value_modified: TerminalValueModified
512
    ) -> PreprocEntityDelta:
513
        return PreprocEntityDelta(
1✔
514
            before=terminal_value_modified.value,
515
            after=terminal_value_modified.modified_value,
516
        )
517

518
    def visit_terminal_value_created(
1✔
519
        self, terminal_value_created: TerminalValueCreated
520
    ) -> PreprocEntityDelta:
521
        return PreprocEntityDelta(after=terminal_value_created.value)
1✔
522

523
    def visit_terminal_value_removed(
1✔
524
        self, terminal_value_removed: TerminalValueRemoved
525
    ) -> PreprocEntityDelta:
526
        return PreprocEntityDelta(before=terminal_value_removed.value)
1✔
527

528
    def visit_terminal_value_unchanged(
1✔
529
        self, terminal_value_unchanged: TerminalValueUnchanged
530
    ) -> PreprocEntityDelta:
531
        return PreprocEntityDelta(
1✔
532
            before=terminal_value_unchanged.value,
533
            after=terminal_value_unchanged.value,
534
        )
535

536
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
1✔
537
        before_delta = self.visit(node_divergence.value)
1✔
538
        after_delta = self.visit(node_divergence.divergence)
1✔
539
        return PreprocEntityDelta(before=before_delta.before, after=after_delta.after)
1✔
540

541
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
1✔
542
        node_change_type = node_object.change_type
1✔
543
        before = {} if node_change_type != ChangeType.CREATED else Nothing
1✔
544
        after = {} if node_change_type != ChangeType.REMOVED else Nothing
1✔
545
        for name, change_set_entity in node_object.bindings.items():
1✔
546
            delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
1✔
547
            delta_before = delta.before
1✔
548
            delta_after = delta.after
1✔
549
            if not is_nothing(before) and not is_nothing(delta_before):
1✔
550
                before[name] = delta_before
1✔
551
            if not is_nothing(after) and not is_nothing(delta_after):
1✔
552
                after[name] = delta_after
1✔
553
        return PreprocEntityDelta(before=before, after=after)
1✔
554

555
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> str:
1✔
556
        # TODO: add arguments validation.
557
        arguments_list: list[str]
558
        if isinstance(arguments, str):
1✔
559
            arguments_list = arguments.split(".")
1✔
560
        else:
561
            arguments_list = arguments
1✔
562
        logical_name_of_resource = arguments_list[0]
1✔
563
        attribute_name = arguments_list[1]
1✔
564

565
        node_resource = self._get_node_resource_for(
1✔
566
            resource_name=logical_name_of_resource,
567
            node_template=self._change_set.update_model.node_template,
568
        )
569
        node_property: NodeProperty | None = self._get_node_property_for(
1✔
570
            property_name=attribute_name, node_resource=node_resource
571
        )
572
        if node_property is not None:
1✔
573
            # The property is statically defined in the template and its value can be computed.
574
            property_delta = self.visit(node_property)
1✔
575
            value = property_delta.before if select_before else property_delta.after
1✔
576
        else:
577
            # The property is not statically defined and must therefore be available in
578
            # the properties deployed set.
579
            if select_before:
1✔
580
                value = self._before_deployed_property_value_of(
1✔
581
                    resource_logical_id=logical_name_of_resource,
582
                    property_name=attribute_name,
583
                )
584
            else:
585
                value = self._after_deployed_property_value_of(
1✔
586
                    resource_logical_id=logical_name_of_resource,
587
                    property_name=attribute_name,
588
                )
589
        return value
1✔
590

591
    def visit_node_intrinsic_function_fn_get_att(
1✔
592
        self, node_intrinsic_function: NodeIntrinsicFunction
593
    ) -> PreprocEntityDelta:
594
        # TODO: validate the return value according to the spec.
595
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
596
        before_arguments: Maybe[str | list[str]] = arguments_delta.before
1✔
597
        after_arguments: Maybe[str | list[str]] = arguments_delta.after
1✔
598

599
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
1✔
600
        if is_nothing(before) and not is_nothing(before_arguments):
1✔
601
            before = self._resolve_attribute(arguments=before_arguments, select_before=True)
1✔
602

603
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
1✔
604
        if is_nothing(after) and not is_nothing(after_arguments):
1✔
605
            after = self._resolve_attribute(arguments=after_arguments, select_before=False)
1✔
606

607
        return PreprocEntityDelta(before=before, after=after)
1✔
608

609
    def visit_node_intrinsic_function_fn_equals(
1✔
610
        self, node_intrinsic_function: NodeIntrinsicFunction
611
    ) -> PreprocEntityDelta:
612
        # TODO: add argument shape validation.
613
        def _compute_fn_equals(args: list[Any]) -> bool:
1✔
614
            return args[0] == args[1]
1✔
615

616
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
617
        delta = self._cached_apply(
1✔
618
            scope=node_intrinsic_function.scope,
619
            arguments_delta=arguments_delta,
620
            resolver=_compute_fn_equals,
621
        )
622
        return delta
1✔
623

624
    def visit_node_intrinsic_function_fn_if(
1✔
625
        self, node_intrinsic_function: NodeIntrinsicFunction
626
    ) -> PreprocEntityDelta:
627
        # `if` needs to be short-circuiting i.e. if the condition is True we don't evaluate the
628
        # False branch. If the condition is False, we don't evaluate the True branch.
629
        if len(node_intrinsic_function.arguments.array) != 3:
1✔
630
            raise ValueError(
×
631
                f"Incorrectly constructed Fn::If usage, expected 3 arguments, found {len(node_intrinsic_function.arguments.array)}"
632
            )
633

634
        condition_delta = self.visit(node_intrinsic_function.arguments.array[0])
1✔
635
        if_delta = PreprocEntityDelta()
1✔
636
        if not is_nothing(condition_delta.before):
1✔
637
            node_condition = self._get_node_condition_if_exists(
1✔
638
                condition_name=condition_delta.before
639
            )
640
            condition_value = self.visit(node_condition).before
1✔
641
            if condition_value:
1✔
642
                arg_delta = self.visit(node_intrinsic_function.arguments.array[1])
1✔
643
            else:
644
                arg_delta = self.visit(node_intrinsic_function.arguments.array[2])
1✔
645
            if_delta.before = arg_delta.before
1✔
646

647
        if not is_nothing(condition_delta.after):
1✔
648
            node_condition = self._get_node_condition_if_exists(
1✔
649
                condition_name=condition_delta.after
650
            )
651
            condition_value = self.visit(node_condition).after
1✔
652
            if condition_value:
1✔
653
                arg_delta = self.visit(node_intrinsic_function.arguments.array[1])
1✔
654
            else:
655
                arg_delta = self.visit(node_intrinsic_function.arguments.array[2])
1✔
656
            if_delta.after = arg_delta.after
1✔
657

658
        return if_delta
1✔
659

660
    def visit_node_intrinsic_function_fn_and(
1✔
661
        self, node_intrinsic_function: NodeIntrinsicFunction
662
    ) -> PreprocEntityDelta:
663
        def _compute_fn_and(args: list[bool]) -> bool:
1✔
664
            result = all(args)
1✔
665
            return result
1✔
666

667
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
668
        delta = self._cached_apply(
1✔
669
            scope=node_intrinsic_function.scope,
670
            arguments_delta=arguments_delta,
671
            resolver=_compute_fn_and,
672
        )
673
        return delta
1✔
674

675
    def visit_node_intrinsic_function_fn_or(
1✔
676
        self, node_intrinsic_function: NodeIntrinsicFunction
677
    ) -> PreprocEntityDelta:
678
        def _compute_fn_or(args: list[bool]):
1✔
679
            result = any(args)
1✔
680
            return result
1✔
681

682
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
683
        delta = self._cached_apply(
1✔
684
            scope=node_intrinsic_function.scope,
685
            arguments_delta=arguments_delta,
686
            resolver=_compute_fn_or,
687
        )
688
        return delta
1✔
689

690
    def visit_node_intrinsic_function_fn_not(
1✔
691
        self, node_intrinsic_function: NodeIntrinsicFunction
692
    ) -> PreprocEntityDelta:
693
        def _compute_fn_not(arg: list[bool] | bool) -> bool:
1✔
694
            # Is the argument ever a lone boolean?
695
            if isinstance(arg, list):
1✔
696
                return not arg[0]
1✔
697
            else:
698
                return not arg
×
699

700
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
701
        delta = self._cached_apply(
1✔
702
            scope=node_intrinsic_function.scope,
703
            arguments_delta=arguments_delta,
704
            resolver=_compute_fn_not,
705
        )
706
        return delta
1✔
707

708
    def visit_node_intrinsic_function_fn_sub(
1✔
709
        self, node_intrinsic_function: NodeIntrinsicFunction
710
    ) -> PreprocEntityDelta:
711
        def _compute_sub(args: str | list[Any], select_before: bool) -> str:
1✔
712
            # TODO: add further schema validation.
713
            string_template: str
714
            sub_parameters: dict
715
            if isinstance(args, str):
1✔
716
                string_template = args
1✔
717
                sub_parameters = {}
1✔
718
            elif (
1✔
719
                isinstance(args, list)
720
                and len(args) == 2
721
                and isinstance(args[0], str)
722
                and isinstance(args[1], dict)
723
            ):
724
                string_template = args[0]
1✔
725
                sub_parameters = args[1]
1✔
726
            else:
727
                raise RuntimeError(
×
728
                    "Invalid arguments shape for Fn::Sub, expected a String "
729
                    f"or a Tuple of String and Map but got '{args}'"
730
                )
731
            sub_string = string_template
1✔
732
            template_variable_names = re.findall("\\${([^}]+)}", string_template)
1✔
733
            for template_variable_name in template_variable_names:
1✔
734
                template_variable_value = Nothing
1✔
735

736
                # Try to resolve the variable name as pseudo parameter.
737
                if template_variable_name in _PSEUDO_PARAMETERS:
1✔
738
                    template_variable_value = self._resolve_pseudo_parameter(
1✔
739
                        pseudo_parameter_name=template_variable_name
740
                    )
741

742
                # Try to resolve the variable name as an entry to the defined parameters.
743
                elif template_variable_name in sub_parameters:
1✔
744
                    template_variable_value = sub_parameters[template_variable_name]
1✔
745

746
                # Try to resolve the variable name as GetAtt.
747
                elif "." in template_variable_name:
1✔
748
                    try:
1✔
749
                        template_variable_value = self._resolve_attribute(
1✔
750
                            arguments=template_variable_name, select_before=select_before
751
                        )
752
                    except RuntimeError:
1✔
753
                        pass
1✔
754

755
                # Try to resolve the variable name as Ref.
756
                else:
757
                    try:
1✔
758
                        resource_delta = self._resolve_reference(logical_id=template_variable_name)
1✔
759
                        template_variable_value = (
1✔
760
                            resource_delta.before if select_before else resource_delta.after
761
                        )
762
                        if isinstance(template_variable_value, PreprocResource):
1✔
763
                            template_variable_value = template_variable_value.physical_resource_id
1✔
764
                    except RuntimeError:
×
765
                        pass
×
766

767
                if is_nothing(template_variable_value):
1✔
768
                    raise RuntimeError(
1✔
769
                        f"Undefined variable name in Fn::Sub string template '{template_variable_name}'"
770
                    )
771

772
                if not isinstance(template_variable_value, str):
1✔
773
                    template_variable_value = str(template_variable_value)
1✔
774

775
                sub_string = sub_string.replace(
1✔
776
                    f"${{{template_variable_name}}}", template_variable_value
777
                )
778

779
            # FIXME: the following type reduction is ported from v1; however it appears as though such
780
            #        reduction is not performed by the engine, and certainly not at this depth given the
781
            #        lack of context. This section should be removed with Fn::Sub always retuning a string
782
            #        and the resource providers reviewed.
783
            account_id = self._change_set.account_id
1✔
784
            is_another_account_id = sub_string.isdigit() and len(sub_string) == len(account_id)
1✔
785
            if sub_string == account_id or is_another_account_id:
1✔
786
                result = sub_string
1✔
787
            elif sub_string.isdigit():
1✔
788
                result = int(sub_string)
1✔
789
            else:
790
                try:
1✔
791
                    result = float(sub_string)
1✔
792
                except ValueError:
1✔
793
                    result = sub_string
1✔
794
            return result
1✔
795

796
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
797
        arguments_before = arguments_delta.before
1✔
798
        arguments_after = arguments_delta.after
1✔
799
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
1✔
800
        if is_nothing(before) and not is_nothing(arguments_before):
1✔
801
            before = _compute_sub(args=arguments_before, select_before=True)
1✔
802
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
1✔
803
        if is_nothing(after) and not is_nothing(arguments_after):
1✔
804
            after = _compute_sub(args=arguments_after, select_before=False)
1✔
805
        return PreprocEntityDelta(before=before, after=after)
1✔
806

807
    def visit_node_intrinsic_function_fn_join(
1✔
808
        self, node_intrinsic_function: NodeIntrinsicFunction
809
    ) -> PreprocEntityDelta:
810
        # TODO: add support for schema validation.
811
        # TODO: add tests for joining non string values.
812
        def _compute_fn_join(args: list[Any]) -> str | NothingType:
1✔
813
            if not (isinstance(args, list) and len(args) == 2):
1✔
814
                return Nothing
1✔
815
            delimiter: str = str(args[0])
1✔
816
            values: list[Any] = args[1]
1✔
817
            if not isinstance(values, list):
1✔
818
                # shortcut if values is the empty string, for example:
819
                # {"Fn::Join": ["", {"Ref": <parameter>}]}
820
                # CDK bootstrap does this
821
                if values == "":
1✔
822
                    return ""
1✔
823
                raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")
1✔
824
            str_values: list[str] = []
1✔
825
            for value in values:
1✔
826
                if value is None:
1✔
827
                    continue
1✔
828
                str_value = str(value)
1✔
829
                str_values.append(str_value)
1✔
830
            join_result = delimiter.join(str_values)
1✔
831
            return join_result
1✔
832

833
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
834
        delta = self._cached_apply(
1✔
835
            scope=node_intrinsic_function.scope,
836
            arguments_delta=arguments_delta,
837
            resolver=_compute_fn_join,
838
        )
839
        return delta
1✔
840

841
    def visit_node_intrinsic_function_fn_select(
1✔
842
        self, node_intrinsic_function: NodeIntrinsicFunction
843
    ):
844
        # TODO: add further support for schema validation
845
        def _compute_fn_select(args: list[Any]) -> Any:
1✔
846
            values = args[1]
1✔
847
            # defer evaluation if the selection list contains unresolved elements (e.g., unresolved intrinsics)
848
            if isinstance(values, list) and not all(isinstance(value, str) for value in values):
1✔
849
                raise RuntimeError("Fn::Select list contains unresolved elements")
1✔
850

851
            if not isinstance(values, list) or not values:
1✔
852
                raise ValidationError(
1✔
853
                    "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
854
                )
855
            try:
1✔
856
                index: int = int(args[0])
1✔
857
            except ValueError as e:
1✔
858
                raise ValidationError(
1✔
859
                    "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
860
                ) from e
861

862
            values_len = len(values)
1✔
863
            if index < 0 or index >= values_len:
1✔
864
                raise ValidationError(
1✔
865
                    "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"
866
                )
867
            selection = values[index]
1✔
868
            return selection
1✔
869

870
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
871
        delta = self._cached_apply(
1✔
872
            scope=node_intrinsic_function.scope,
873
            arguments_delta=arguments_delta,
874
            resolver=_compute_fn_select,
875
        )
876
        return delta
1✔
877

878
    def visit_node_intrinsic_function_fn_split(
1✔
879
        self, node_intrinsic_function: NodeIntrinsicFunction
880
    ):
881
        # TODO: add further support for schema validation
882
        def _compute_fn_split(args: list[Any]) -> Any:
1✔
883
            delimiter = args[0]
1✔
884
            if not isinstance(delimiter, str) or not delimiter:
1✔
885
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{delimiter}'")
×
886
            source_string = args[1]
1✔
887
            if not isinstance(source_string, str):
1✔
888
                raise RuntimeError(f"Invalid source string value for Fn::Split: '{source_string}'")
1✔
889
            split_string = source_string.split(delimiter)
1✔
890
            return split_string
1✔
891

892
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
893
        delta = self._cached_apply(
1✔
894
            scope=node_intrinsic_function.scope,
895
            arguments_delta=arguments_delta,
896
            resolver=_compute_fn_split,
897
        )
898
        return delta
1✔
899

900
    def visit_node_intrinsic_function_fn_get_a_zs(
1✔
901
        self, node_intrinsic_function: NodeIntrinsicFunction
902
    ) -> PreprocEntityDelta:
903
        # TODO: add further support for schema validation
904

905
        def _compute_fn_get_a_zs(region) -> Any:
1✔
906
            if not isinstance(region, str):
1✔
907
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")
×
908

909
            if not region:
1✔
910
                region = self._change_set.region_name
1✔
911

912
            account_id = self._change_set.account_id
1✔
913
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
1✔
914
            try:
1✔
915
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
1✔
916
                    ec2_client.describe_availability_zones()
917
                )
918
            except ClientError:
×
919
                raise RuntimeError(
×
920
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
921
                )
922
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
1✔
923
                "AvailabilityZones"
924
            ]
925
            azs = [az["ZoneName"] for az in availability_zones]
1✔
926
            return azs
1✔
927

928
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
929
        delta = self._cached_apply(
1✔
930
            scope=node_intrinsic_function.scope,
931
            arguments_delta=arguments_delta,
932
            resolver=_compute_fn_get_a_zs,
933
        )
934
        return delta
1✔
935

936
    def visit_node_intrinsic_function_fn_base64(
1✔
937
        self, node_intrinsic_function: NodeIntrinsicFunction
938
    ) -> PreprocEntityDelta:
939
        # TODO: add further support for schema validation
940
        def _compute_fn_base_64(string) -> Any:
1✔
941
            if not isinstance(string, str):
1✔
942
                raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{string}'")
×
943
            # Ported from v1:
944
            base64_string = to_str(base64.b64encode(to_bytes(string)))
1✔
945
            return base64_string
1✔
946

947
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
948
        delta = self._cached_apply(
1✔
949
            scope=node_intrinsic_function.scope,
950
            arguments_delta=arguments_delta,
951
            resolver=_compute_fn_base_64,
952
        )
953
        return delta
1✔
954

955
    def visit_node_intrinsic_function_fn_find_in_map(
1✔
956
        self, node_intrinsic_function: NodeIntrinsicFunction
957
    ) -> PreprocEntityDelta:
958
        # TODO: add type checking/validation for result unit?
959
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
960
        before_arguments = arguments_delta.before
1✔
961
        after_arguments = arguments_delta.after
1✔
962
        before = Nothing
1✔
963
        if before_arguments:
1✔
964
            before_value_delta = self._resolve_mapping(*before_arguments)
1✔
965
            before = before_value_delta.before
1✔
966
        after = Nothing
1✔
967
        if after_arguments:
1✔
968
            after_value_delta = self._resolve_mapping(*after_arguments)
1✔
969
            after = after_value_delta.after
1✔
970
        return PreprocEntityDelta(before=before, after=after)
1✔
971

972
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
1✔
973
        bindings_delta = self.visit(node_mapping.bindings)
1✔
974
        return bindings_delta
1✔
975

976
    def visit_node_parameters(
1✔
977
        self, node_parameters: NodeParameters
978
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
979
        before_parameters = {}
1✔
980
        after_parameters = {}
1✔
981
        for parameter in node_parameters.parameters:
1✔
982
            parameter_delta = self.visit(parameter)
1✔
983
            parameter_before = parameter_delta.before
1✔
984
            if not is_nothing(parameter_before):
1✔
985
                before_parameters[parameter.name] = parameter_before
1✔
986
            parameter_after = parameter_delta.after
1✔
987
            if not is_nothing(parameter_after):
1✔
988
                after_parameters[parameter.name] = parameter_after
1✔
989
        return PreprocEntityDelta(before=before_parameters, after=after_parameters)
1✔
990

991
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
1✔
992
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_parameter.name):
1✔
993
            raise ValidationError(
1✔
994
                f"Template format error: Parameter name {node_parameter.name} is non alphanumeric."
995
            )
996
        dynamic_value = node_parameter.dynamic_value
1✔
997
        dynamic_delta = self.visit(dynamic_value)
1✔
998

999
        default_value = node_parameter.default_value
1✔
1000
        default_delta = self.visit(default_value)
1✔
1001

1002
        before = dynamic_delta.before or default_delta.before
1✔
1003
        after = dynamic_delta.after or default_delta.after
1✔
1004

1005
        parameter_type = self.visit(node_parameter.type_)
1✔
1006

1007
        def _resolve_parameter_type(value: str, type_: str) -> Any:
1✔
1008
            match type_:
1✔
1009
                case "List<String>" | "CommaDelimitedList":
1✔
1010
                    return [item.strip() for item in value.split(",")]
1✔
1011
            return value
1✔
1012

1013
        if not is_nothing(after):
1✔
1014
            after = _resolve_parameter_type(after, parameter_type.after)
1✔
1015

1016
        return PreprocEntityDelta(before=before, after=after)
1✔
1017

1018
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
1✔
1019
        array_identifiers_delta = self.visit(node_depends_on.depends_on)
1✔
1020
        return array_identifiers_delta
1✔
1021

1022
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
1✔
1023
        delta = self.visit(node_condition.body)
1✔
1024
        return delta
1✔
1025

1026
    def _resource_physical_resource_id_from(
1✔
1027
        self, logical_resource_id: str, resolved_resources: dict[str, ResolvedResource]
1028
    ) -> str | None:
1029
        # TODO: typing around resolved resources is needed and should be reflected here.
1030
        resolved_resource = resolved_resources.get(logical_resource_id, {})
1✔
1031
        if resolved_resource.get("ResourceStatus") not in {
1✔
1032
            ResourceStatus.CREATE_COMPLETE,
1033
            ResourceStatus.UPDATE_COMPLETE,
1034
        }:
1035
            return None
1✔
1036

1037
        physical_resource_id = resolved_resource.get("PhysicalResourceId")
1✔
1038
        if not isinstance(physical_resource_id, str):
1✔
1039
            raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
×
1040
        return physical_resource_id
1✔
1041

1042
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
1043
        # TODO: typing around resolved resources is needed and should be reflected here.
1044
        return self._resource_physical_resource_id_from(
1✔
1045
            logical_resource_id=resource_logical_id,
1046
            resolved_resources=self._before_resolved_resources,
1047
        )
1048

1049
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
1050
        return self._before_resource_physical_id(resource_logical_id=resource_logical_id)
1✔
1051

1052
    def visit_node_intrinsic_function_ref(
1✔
1053
        self, node_intrinsic_function: NodeIntrinsicFunction
1054
    ) -> PreprocEntityDelta:
1055
        def _compute_fn_ref(logical_id: str) -> PreprocEntityDelta:
1✔
1056
            if logical_id == "AWS::NoValue":
1✔
1057
                return Nothing
1✔
1058

1059
            reference_delta: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
1✔
1060
            if isinstance(before := reference_delta.before, PreprocResource):
1✔
1061
                reference_delta.before = before.physical_resource_id
1✔
1062
            if isinstance(after := reference_delta.after, PreprocResource):
1✔
1063
                reference_delta.after = after.physical_resource_id
1✔
1064
            return reference_delta
1✔
1065

1066
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1067
        delta = self._cached_apply(
1✔
1068
            scope=node_intrinsic_function.scope,
1069
            arguments_delta=arguments_delta,
1070
            resolver=_compute_fn_ref,
1071
        )
1072
        return delta
1✔
1073

1074
    def visit_node_intrinsic_function_condition(
1✔
1075
        self, node_intrinsic_function: NodeIntrinsicFunction
1076
    ) -> PreprocEntityDelta:
1077
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1078

1079
        def _delta_of_condition(name: str) -> PreprocEntityDelta:
1✔
1080
            node_condition = self._get_node_condition_if_exists(condition_name=name)
1✔
1081
            if is_nothing(node_condition):
1✔
1082
                raise RuntimeError(f"Undefined condition '{name}'")
×
1083
            condition_delta = self.visit(node_condition)
1✔
1084
            return condition_delta
1✔
1085

1086
        delta = self._cached_apply(
1✔
1087
            resolver=_delta_of_condition,
1088
            scope=node_intrinsic_function.scope,
1089
            arguments_delta=arguments_delta,
1090
        )
1091
        return delta
1✔
1092

1093
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
1✔
1094
        node_change_type = node_array.change_type
1✔
1095
        before = [] if node_change_type != ChangeType.CREATED else Nothing
1✔
1096
        after = [] if node_change_type != ChangeType.REMOVED else Nothing
1✔
1097
        for change_set_entity in node_array.array:
1✔
1098
            delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
1✔
1099
            delta_before = delta.before
1✔
1100
            delta_after = delta.after
1✔
1101
            if not is_nothing(before) and not is_nothing(delta_before):
1✔
1102
                before.append(delta_before)
1✔
1103
            if not is_nothing(after) and not is_nothing(delta_after):
1✔
1104
                after.append(delta_after)
1✔
1105
        return PreprocEntityDelta(before=before, after=after)
1✔
1106

1107
    def visit_node_properties(
1✔
1108
        self, node_properties: NodeProperties
1109
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
1110
        node_change_type = node_properties.change_type
1✔
1111
        before_bindings = {} if node_change_type != ChangeType.CREATED else Nothing
1✔
1112
        after_bindings = {} if node_change_type != ChangeType.REMOVED else Nothing
1✔
1113
        for node_property in node_properties.properties:
1✔
1114
            property_name = node_property.name
1✔
1115
            delta = self.visit(node_property)
1✔
1116
            delta_before = delta.before
1✔
1117
            delta_after = delta.after
1✔
1118
            if (
1✔
1119
                not is_nothing(before_bindings)
1120
                and not is_nothing(delta_before)
1121
                and delta_before is not None
1122
            ):
1123
                before_bindings[property_name] = delta_before
1✔
1124
            if (
1✔
1125
                not is_nothing(after_bindings)
1126
                and not is_nothing(delta_after)
1127
                and delta_after is not None
1128
            ):
1129
                after_bindings[property_name] = delta_after
1✔
1130
        before = Nothing
1✔
1131
        if not is_nothing(before_bindings):
1✔
1132
            before = PreprocProperties(properties=before_bindings)
1✔
1133
        after = Nothing
1✔
1134
        if not is_nothing(after_bindings):
1✔
1135
            after = PreprocProperties(properties=after_bindings)
1✔
1136
        return PreprocEntityDelta(before=before, after=after)
1✔
1137

1138
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
1✔
1139
        reference_delta = self.visit(reference)
1✔
1140
        before_reference = reference_delta.before
1✔
1141
        before = Nothing
1✔
1142
        if isinstance(before_reference, str):
1✔
1143
            before_delta = self._resolve_condition(logical_id=before_reference)
1✔
1144
            before = before_delta.before
1✔
1145
        after = Nothing
1✔
1146
        after_reference = reference_delta.after
1✔
1147
        if isinstance(after_reference, str):
1✔
1148
            after_delta = self._resolve_condition(logical_id=after_reference)
1✔
1149
            after = after_delta.after
1✔
1150
        return PreprocEntityDelta(before=before, after=after)
1✔
1151

1152
    def visit_node_resource(
1✔
1153
        self, node_resource: NodeResource
1154
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
1155
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_resource.name):
1✔
1156
            raise ValidationError(
1✔
1157
                f"Template format error: Resource name {node_resource.name} is non alphanumeric."
1158
            )
1159
        change_type = node_resource.change_type
1✔
1160
        condition_before = Nothing
1✔
1161
        condition_after = Nothing
1✔
1162
        if not is_nothing(node_resource.condition_reference):
1✔
1163
            condition_delta = self._resolve_resource_condition_reference(
1✔
1164
                node_resource.condition_reference
1165
            )
1166
            condition_before = condition_delta.before
1✔
1167
            condition_after = condition_delta.after
1✔
1168

1169
        depends_on_before = Nothing
1✔
1170
        depends_on_after = Nothing
1✔
1171
        if not is_nothing(node_resource.depends_on):
1✔
1172
            depends_on_delta = self.visit(node_resource.depends_on)
1✔
1173
            depends_on_before = depends_on_delta.before
1✔
1174
            depends_on_after = depends_on_delta.after
1✔
1175

1176
        type_delta = self.visit(node_resource.type_)
1✔
1177
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
1✔
1178
            node_resource.properties
1179
        )
1180

1181
        before = Nothing
1✔
1182
        after = Nothing
1✔
1183
        if change_type != ChangeType.CREATED and is_nothing(condition_before) or condition_before:
1✔
1184
            logical_resource_id = node_resource.name
1✔
1185
            before_physical_resource_id = self._before_resource_physical_id(
1✔
1186
                resource_logical_id=logical_resource_id
1187
            )
1188
            before = PreprocResource(
1✔
1189
                logical_id=logical_resource_id,
1190
                physical_resource_id=before_physical_resource_id,
1191
                condition=condition_before,
1192
                resource_type=type_delta.before,
1193
                properties=properties_delta.before,
1194
                depends_on=depends_on_before,
1195
                requires_replacement=False,
1196
            )
1197
        if change_type != ChangeType.REMOVED and is_nothing(condition_after) or condition_after:
1✔
1198
            logical_resource_id = node_resource.name
1✔
1199
            try:
1✔
1200
                after_physical_resource_id = self._after_resource_physical_id(
1✔
1201
                    resource_logical_id=logical_resource_id
1202
                )
1203
            except RuntimeError:
×
1204
                after_physical_resource_id = None
×
1205
            after = PreprocResource(
1✔
1206
                logical_id=logical_resource_id,
1207
                physical_resource_id=after_physical_resource_id,
1208
                condition=condition_after,
1209
                resource_type=type_delta.after,
1210
                properties=properties_delta.after,
1211
                depends_on=depends_on_after,
1212
                requires_replacement=node_resource.requires_replacement,
1213
            )
1214
        return PreprocEntityDelta(before=before, after=after)
1✔
1215

1216
    def visit_node_output(
1✔
1217
        self, node_output: NodeOutput
1218
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
1219
        change_type = node_output.change_type
1✔
1220
        value_delta = self.visit(node_output.value)
1✔
1221

1222
        condition_delta = Nothing
1✔
1223
        if not is_nothing(node_output.condition_reference):
1✔
1224
            condition_delta = self._resolve_resource_condition_reference(
1✔
1225
                node_output.condition_reference
1226
            )
1227
            condition_before = condition_delta.before
1✔
1228
            condition_after = condition_delta.after
1✔
1229
            if not condition_before and condition_after:
1✔
1230
                change_type = ChangeType.CREATED
1✔
1231
            elif condition_before and not condition_after:
1✔
1232
                change_type = ChangeType.REMOVED
1✔
1233

1234
        export_delta = Nothing
1✔
1235
        if not is_nothing(node_output.export):
1✔
1236
            export_delta = self.visit(node_output.export)
1✔
1237

1238
        before: Maybe[PreprocOutput] = Nothing
1✔
1239
        if change_type != ChangeType.CREATED:
1✔
1240
            before = PreprocOutput(
1✔
1241
                name=node_output.name,
1242
                value=value_delta.before,
1243
                export=export_delta.before if export_delta else None,
1244
                condition=condition_delta.before if condition_delta else None,
1245
            )
1246
        after: Maybe[PreprocOutput] = Nothing
1✔
1247
        if change_type != ChangeType.REMOVED:
1✔
1248
            after = PreprocOutput(
1✔
1249
                name=node_output.name,
1250
                value=value_delta.after,
1251
                export=export_delta.after if export_delta else None,
1252
                condition=condition_delta.after if condition_delta else None,
1253
            )
1254
        return PreprocEntityDelta(before=before, after=after)
1✔
1255

1256
    def visit_node_outputs(
1✔
1257
        self, node_outputs: NodeOutputs
1258
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
1259
        before: list[PreprocOutput] = []
1✔
1260
        after: list[PreprocOutput] = []
1✔
1261
        for node_output in node_outputs.outputs:
1✔
1262
            output_delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
1✔
1263
            output_before = output_delta.before
1✔
1264
            output_after = output_delta.after
1✔
1265
            if not is_nothing(output_before):
1✔
1266
                before.append(output_before)
1✔
1267
            if not is_nothing(output_after):
1✔
1268
                after.append(output_after)
1✔
1269
        return PreprocEntityDelta(before=before, after=after)
1✔
1270

1271
    def visit_node_intrinsic_function_fn_import_value(
1✔
1272
        self, node_intrinsic_function: NodeIntrinsicFunction
1273
    ) -> PreprocEntityDelta:
1274
        def _compute_fn_import_value(string) -> str:
1✔
1275
            if not isinstance(string, str):
1✔
1276
                raise RuntimeError(f"Invalid parameter for import: '{string}'")
×
1277

1278
            exports = exports_map(
1✔
1279
                account_id=self._change_set.account_id, region_name=self._change_set.region_name
1280
            )
1281

1282
            return exports.get(string, {}).get("Value") or Nothing
1✔
1283

1284
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
1285
        delta = self._cached_apply(
1✔
1286
            scope=node_intrinsic_function.scope,
1287
            arguments_delta=arguments_delta,
1288
            resolver=_compute_fn_import_value,
1289
        )
1290
        return delta
1✔
1291

1292
    def visit_node_intrinsic_function_fn_transform(
1✔
1293
        self, node_intrinsic_function: NodeIntrinsicFunction
1294
    ):
1295
        raise RuntimeError("Fn::Transform should have been handled by the Transformer")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc