• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16766254352

05 Aug 2025 04:40PM UTC coverage: 86.892% (-0.02%) from 86.91%
16766254352

push

github

web-flow
CFNV2: defer deletions for correcting deploy order (#12936)

92 of 99 new or added lines in 6 files covered. (92.93%)

185 existing lines in 21 files now uncovered.

66597 of 76643 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.61
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from typing import Any, Callable, Final, Generic, Optional, TypeVar
1✔
7

8
from botocore.exceptions import ClientError
1✔
9

10
from localstack import config
1✔
11
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
12
from localstack.aws.connect import connect_to
1✔
13
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
14
    ChangeSetEntity,
15
    ChangeType,
16
    Maybe,
17
    NodeArray,
18
    NodeCondition,
19
    NodeDependsOn,
20
    NodeDivergence,
21
    NodeIntrinsicFunction,
22
    NodeMapping,
23
    NodeObject,
24
    NodeOutput,
25
    NodeOutputs,
26
    NodeParameter,
27
    NodeParameters,
28
    NodeProperties,
29
    NodeProperty,
30
    NodeResource,
31
    NodeTemplate,
32
    Nothing,
33
    NothingType,
34
    Scope,
35
    TerminalValue,
36
    TerminalValueCreated,
37
    TerminalValueModified,
38
    TerminalValueRemoved,
39
    TerminalValueUnchanged,
40
    is_nothing,
41
)
42
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
43
    ChangeSetModelVisitor,
44
)
45
from localstack.services.cloudformation.stores import (
1✔
46
    exports_map,
47
)
48
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
49
from localstack.utils.aws.arns import get_partition
1✔
50
from localstack.utils.objects import get_value_from_path
1✔
51
from localstack.utils.run import to_str
1✔
52
from localstack.utils.strings import to_bytes
1✔
53
from localstack.utils.urls import localstack_host
1✔
54

55
# Host name returned for the "AWS::URLSuffix" pseudo parameter.
# The value in AWS is "amazonaws.com".
_AWS_URL_SUFFIX = localstack_host().host

# Names that are treated as pseudo parameters when resolving references.
_PSEUDO_PARAMETERS: Final[set[str]] = {
    "AWS::Partition",
    "AWS::AccountId",
    "AWS::Region",
    "AWS::StackName",
    "AWS::StackId",
    "AWS::URLSuffix",
    "AWS::NoValue",
    "AWS::NotificationARNs",
}

# Type variables for the 'before' and 'after' sides of a PreprocEntityDelta.
TBefore = TypeVar("TBefore")
TAfter = TypeVar("TAfter")

# Placeholder returned for a missing deployed property value when
# config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES is enabled.
MOCKED_REFERENCE = "unknown"
72

73

74
class PreprocEntityDelta(Generic[TBefore, TAfter]):
    """Pair of 'before' and 'after' values computed for a change set entity.

    Each side defaults to ``Nothing`` when no value is supplied for it.
    """

    before: Maybe[TBefore]
    after: Maybe[TAfter]

    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
        self.after = after
        self.before = before

    def __eq__(self, other):
        # Two deltas are equal only when both of their sides are equal.
        if isinstance(other, PreprocEntityDelta):
            return self.before == other.before and self.after == other.after
        return False
86

87

88
class PreprocProperties:
    """Wrapper around the mapping of property names to their computed values."""

    properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        self.properties = properties

    def __eq__(self, other):
        # Only another PreprocProperties holding an equal mapping compares equal.
        if isinstance(other, PreprocProperties):
            return self.properties == other.properties
        return False
98

99

100
class PreprocResource:
    """Computed view of a template resource.

    Equality considers the logical id, the condition, the resource type and the
    properties; the physical resource id, the dependencies and the replacement flag
    are not part of the comparison.
    """

    logical_id: str
    physical_resource_id: Optional[str]
    condition: Optional[bool]
    resource_type: str
    properties: PreprocProperties
    depends_on: Optional[list[str]]
    requires_replacement: bool

    def __init__(
        self,
        logical_id: str,
        physical_resource_id: str,
        condition: Optional[bool],
        resource_type: str,
        properties: PreprocProperties,
        depends_on: Optional[list[str]],
        requires_replacement: bool,
    ):
        self.logical_id = logical_id
        self.resource_type = resource_type
        self.physical_resource_id = physical_resource_id
        self.condition = condition
        self.properties = properties
        self.depends_on = depends_on
        self.requires_replacement = requires_replacement

    @staticmethod
    def _compare_conditions(c1: bool, c2: bool):
        # The lack of condition equates to a true condition: anything that is not
        # the boolean False is therefore treated as True.
        return (c1 is not False) == (c2 is not False)

    def __eq__(self, other):
        if not isinstance(other, PreprocResource):
            return False
        return bool(
            self.logical_id == other.logical_id
            and self._compare_conditions(self.condition, other.condition)
            and self.resource_type == other.resource_type
            and self.properties == other.properties
        )
145

146

147
class PreprocOutput:
    """Computed view of a template output: its name, value, export and condition."""

    name: str
    value: Any
    export: Optional[Any]
    condition: Optional[bool]

    def __init__(self, name: str, value: Any, export: Optional[Any], condition: Optional[bool]):
        self.name = name
        self.value = value
        self.export = export
        self.condition = condition

    def __eq__(self, other):
        if not isinstance(other, PreprocOutput):
            return False
        # Every field takes part in the comparison.
        compared_fields = ("name", "value", "export", "condition")
        return all(
            getattr(self, field_name) == getattr(other, field_name)
            for field_name in compared_fields
        )
170

171

172
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
173
    _change_set: Final[ChangeSet]
1✔
174
    _before_resolved_resources: Final[dict]
1✔
175
    _before_cache: Final[dict[Scope, Any]]
1✔
176
    _after_cache: Final[dict[Scope, Any]]
1✔
177

178
    def __init__(self, change_set: ChangeSet):
        """Bind the change set and start with empty 'before' and 'after' caches."""
        self._change_set = change_set
        # Resources resolved for the stack the change set belongs to.
        self._before_resolved_resources = change_set.stack.resolved_resources
        self._before_cache = {}
        self._after_cache = {}
183

184
    def _setup_runtime_cache(self) -> None:
        """Reset the local caches and reload the entries stored on the update model.

        Entries are looked up under this class' name in the update model's
        'before' and 'after' runtime caches.
        """
        cache_key = self.__class__.__name__

        self._before_cache.clear()
        self._after_cache.clear()

        update_model = self._change_set.update_model

        stored_before = update_model.before_runtime_cache.get(cache_key)
        if stored_before:
            self._before_cache.update(stored_before)

        stored_after = update_model.after_runtime_cache.get(cache_key)
        if stored_after:
            self._after_cache.update(stored_after)
197

198
    def _save_runtime_cache(self) -> None:
        """Store deep copies of the local caches on the update model.

        The copies are saved under this class' name in the update model's
        'before' and 'after' runtime caches.
        """
        cache_key = self.__class__.__name__
        update_model = self._change_set.update_model

        update_model.before_runtime_cache[cache_key] = copy.deepcopy(self._before_cache)
        update_model.after_runtime_cache[cache_key] = copy.deepcopy(self._after_cache)
206

207
    def process(self) -> None:
        """Load the runtime caches, visit the template node, then save the caches."""
        self._setup_runtime_cache()
        self.visit(self._change_set.update_model.node_template)
        self._save_runtime_cache()
212

213
    def _get_node_resource_for(
        self, resource_name: str, node_template: NodeTemplate
    ) -> NodeResource:
        """Return the resource node named ``resource_name``, visiting it first.

        Raises:
            RuntimeError: if the template holds no resource with that name.
        """
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        matching_resource = next(
            (
                candidate
                for candidate in node_template.resources.resources
                if candidate.name == resource_name
            ),
            None,
        )
        if matching_resource is None:
            raise RuntimeError(f"No resource '{resource_name}' was found")
        self.visit(matching_resource)
        return matching_resource
222

223
    def _get_node_property_for(
        self, property_name: str, node_resource: NodeResource
    ) -> Optional[NodeProperty]:
        """Return the property node named ``property_name``, visiting it first.

        Returns ``None`` when the resource declares no property with that name.
        """
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        matching_property = next(
            (
                candidate
                for candidate in node_resource.properties.properties
                if candidate.name == property_name
            ),
            None,
        )
        if matching_property is None:
            return None
        self.visit(matching_property)
        return matching_property
232

233
    def _deployed_property_value_of(
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
    ) -> Any:
        """Look up a property value of a resource in the given resolved resources.

        A falsy value is replaced by ``MOCKED_REFERENCE`` when
        ``config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES`` is enabled, and is returned
        unchanged otherwise.

        Raises:
            RuntimeError: if the resource is not in the template, has no entry in
                ``resolved_resources``, or the value found is truthy but not a string.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.

        # The resource must be processed before its deployed values are read; the
        # lookup below visits it. Ideally, values should only be accessible through
        # delta objects, to ensure computation is always complete at every level.
        self._get_node_resource_for(
            resource_name=resource_logical_id,
            node_template=self._change_set.update_model.node_template,
        )

        deployed_resource = resolved_resources.get(resource_logical_id)
        if deployed_resource is None:
            raise RuntimeError(
                f"No deployed instances of resource '{resource_logical_id}' were found"
            )

        deployed_properties = deployed_resource.get("Properties", dict())
        # support structured properties, e.g. NestedStack.Outputs.OutputName
        value: Optional[Any] = get_value_from_path(deployed_properties, property_name)

        if not value:
            if config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES:
                return MOCKED_REFERENCE
            return value

        if not isinstance(value, str):
            # TODO: is this correct? If there is a bug in the logic here, it's probably
            #  better to know about it with a clear error message than to receive some form
            #  of message about trying to use a dictionary in place of a string
            raise RuntimeError(
                f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value"
            )
        return value
268

269
    def _before_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> Any:
        """Look up a deployed property value in the 'before' resolved resources."""
        before_resources = self._before_resolved_resources
        return self._deployed_property_value_of(
            resource_logical_id=resource_logical_id,
            property_name=property_name,
            resolved_resources=before_resources,
        )
277

278
    def _after_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> Optional[str]:
        """Look up the 'after' deployed property value.

        This implementation delegates to the 'before' lookup.
        NOTE(review): presumably subclasses with access to newly deployed resources
        override this method - confirm.
        """
        value = self._before_deployed_property_value_of(
            resource_logical_id=resource_logical_id, property_name=property_name
        )
        return value
284

285
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
        """Return the mapping node named ``map_name``, visiting it first.

        Raises:
            RuntimeError: if the template defines no mapping with that name.
        """
        mappings: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
        # TODO: another scenarios suggesting property lookups might be preferable.
        matching_mapping = next(
            (candidate for candidate in mappings if candidate.name == map_name), None
        )
        if matching_mapping is None:
            raise RuntimeError(f"Undefined '{map_name}' mapping")
        self.visit(matching_mapping)
        return matching_mapping
293

294
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
        """Return the parameter node named ``parameter_name`` after visiting it.

        Returns ``Nothing`` when the template declares no such parameter.
        """
        node_template = self._change_set.update_model.node_template
        parameters: list[NodeParameter] = node_template.parameters.parameters
        # TODO: another scenarios suggesting property lookups might be preferable.
        matching_parameter = next(
            (candidate for candidate in parameters if candidate.name == parameter_name), None
        )
        if matching_parameter is None:
            return Nothing
        self.visit(matching_parameter)
        return matching_parameter
304

305
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
        """Return the condition node named ``condition_name`` after visiting it.

        Returns ``Nothing`` when the template declares no such condition.
        """
        node_template = self._change_set.update_model.node_template
        conditions: list[NodeCondition] = node_template.conditions.conditions
        # TODO: another scenarios suggesting property lookups might be preferable.
        matching_condition = next(
            (candidate for candidate in conditions if candidate.name == condition_name), None
        )
        if matching_condition is None:
            return Nothing
        self.visit(matching_condition)
        return matching_condition
315

316
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
        """Return the delta computed for the condition named ``logical_id``.

        Raises:
            RuntimeError: if no condition with that name exists.
        """
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError(f"No condition '{logical_id}' was found.")
        return self.visit(node_condition)
322

323
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
        """Return the value of the given pseudo parameter.

        "AWS::NoValue" resolves to ``None``.

        Raises:
            RuntimeError: for any name that is not handled below.
        """
        change_set = self._change_set
        if pseudo_parameter_name == "AWS::Partition":
            return get_partition(change_set.region_name)
        if pseudo_parameter_name == "AWS::AccountId":
            return change_set.stack.account_id
        if pseudo_parameter_name == "AWS::Region":
            return change_set.stack.region_name
        if pseudo_parameter_name == "AWS::StackName":
            return change_set.stack.stack_name
        if pseudo_parameter_name == "AWS::StackId":
            return change_set.stack.stack_id
        if pseudo_parameter_name == "AWS::URLSuffix":
            return _AWS_URL_SUFFIX
        if pseudo_parameter_name == "AWS::NoValue":
            return None
        raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
341

342
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
        """Resolve ``logical_id`` as a pseudo parameter, a parameter or a resource.

        The three kinds are tried in that order. The resource lookup raises a
        RuntimeError when no resource with that name exists.
        """
        if logical_id in _PSEUDO_PARAMETERS:
            # Pseudo parameters are constants within the lifecycle of a template.
            constant_value = self._resolve_pseudo_parameter(pseudo_parameter_name=logical_id)
            return PreprocEntityDelta(before=constant_value, after=constant_value)

        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
        if isinstance(node_parameter, NodeParameter):
            return self.visit(node_parameter)

        node_resource = self._get_node_resource_for(
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
        )
        resource_delta = self.visit(node_resource)
        return PreprocEntityDelta(before=resource_delta.before, after=resource_delta.after)
362

363
    def _resolve_mapping(
        self, map_name: str, top_level_key: str, second_level_key
    ) -> PreprocEntityDelta:
        """Return the delta of the value stored at ``map_name[top_level_key][second_level_key]``.

        Args:
            map_name: name of the mapping declared in the template.
            top_level_key: key of the first mapping level.
            second_level_key: key of the second mapping level.

        Raises:
            RuntimeError: if the mapping is undefined, if the top-level key is not
                bound to an object, or if the second-level key is not defined.
        """
        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)

        top_level_value = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_value, NodeObject):
            raise RuntimeError(
                f"Mapping '{map_name}' has no top-level key '{top_level_key}' bound to an object"
            )

        second_level_value = top_level_value.bindings.get(second_level_key)
        if second_level_value is None:
            # Fail with a descriptive error instead of passing None on to visit(),
            # which reads the entity's scope.
            raise RuntimeError(
                f"Mapping '{map_name}' has no second-level key '{second_level_key}' "
                f"under top-level key '{top_level_key}'"
            )

        return self.visit(second_level_value)
374

375
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
        """Visit an entity, serving the result from the caches when possible.

        A cached delta is returned only when the entity's scope is present in both
        the 'before' and the 'after' cache. Otherwise the parent visitor computes
        the result, and PreprocEntityDelta results are stored in both caches.
        """
        scope = change_set_entity.scope
        is_fully_cached = scope in self._before_cache and scope in self._after_cache
        if not is_fully_cached:
            computed = super().visit(change_set_entity=change_set_entity)
            if isinstance(computed, PreprocEntityDelta):
                self._before_cache[scope] = computed.before
                self._after_cache[scope] = computed.after
            return computed

        cached_before = self._before_cache[scope]
        cached_after = self._after_cache[scope]
        return PreprocEntityDelta(before=cached_before, after=cached_after)
386

387
    def _cached_apply(
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
    ) -> PreprocEntityDelta:
        """
        Resolves the 'before' and 'after' sides of an arguments delta independently,
        preferring values already present in the runtime caches.

        For each side, the resolver is invoked only when the cache holds no value for
        the scope and the matching argument value is set. When the resolver returns a
        PreprocEntityDelta, only the component matching the side being resolved is
        kept: 'before' for the 'before' side, 'after' for the 'after' side.

        The caches are only read here, never written. Callers are responsible for
        caching the result, either manually or through the visit method of this class.

        Args:
            scope (Scope): The current scope used as a key for cache lookup.
            arguments_delta (PreprocEntityDelta): The delta containing 'before' and 'after' values to resolve.
            resolver (Callable[[Any], Any]): Function to apply on uncached 'before' or 'after' argument values.

        Returns:
            PreprocEntityDelta: A new delta with resolved 'before' and 'after' values.
        """

        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.

        def _resolve_side(cache: dict, argument: Any, take_before: bool) -> Any:
            # A cached value wins; without an argument there is nothing to resolve.
            cached = cache.get(scope, Nothing)
            if not is_nothing(cached) or is_nothing(argument):
                return cached
            resolved = resolver(argument)
            if isinstance(resolved, PreprocEntityDelta):
                return resolved.before if take_before else resolved.after
            return resolved

        argument_before = arguments_delta.before
        argument_after = arguments_delta.after

        resolved_before = _resolve_side(self._before_cache, argument_before, True)
        resolved_after = _resolve_side(self._after_cache, argument_after, False)
        return PreprocEntityDelta(before=resolved_before, after=resolved_after)
434

435
    def visit_terminal_value_modified(
        self, terminal_value_modified: TerminalValueModified
    ) -> PreprocEntityDelta:
        """Return a delta from the original value to the modified value."""
        original_value = terminal_value_modified.value
        new_value = terminal_value_modified.modified_value
        return PreprocEntityDelta(before=original_value, after=new_value)
442

443
    def visit_terminal_value_created(
        self, terminal_value_created: TerminalValueCreated
    ) -> PreprocEntityDelta:
        """Return a delta holding the created value on the 'after' side only."""
        created_value = terminal_value_created.value
        return PreprocEntityDelta(before=Nothing, after=created_value)
447

448
    def visit_terminal_value_removed(
        self, terminal_value_removed: TerminalValueRemoved
    ) -> PreprocEntityDelta:
        """Return a delta holding the removed value on the 'before' side only."""
        removed_value = terminal_value_removed.value
        return PreprocEntityDelta(before=removed_value, after=Nothing)
452

453
    def visit_terminal_value_unchanged(
        self, terminal_value_unchanged: TerminalValueUnchanged
    ) -> PreprocEntityDelta:
        """Return a delta holding the same value on both sides."""
        unchanged_value = terminal_value_unchanged.value
        return PreprocEntityDelta(before=unchanged_value, after=unchanged_value)
460

461
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
        """Combine the 'before' of the original value with the 'after' of the divergence."""
        value_delta = self.visit(node_divergence.value)
        divergence_delta = self.visit(node_divergence.divergence)
        return PreprocEntityDelta(before=value_delta.before, after=divergence_delta.after)
465

466
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
        """Build the 'before' and 'after' dictionaries of an object node.

        A created object has no 'before' side and a removed object has no 'after'
        side; that side is ``Nothing``. Bindings whose value on a side is
        ``Nothing`` or ``None`` are left out of that side's dictionary.
        """

        def _is_set(value: Any) -> bool:
            return not is_nothing(value) and value is not None

        change_type = node_object.change_type
        has_before = change_type != ChangeType.CREATED
        has_after = change_type != ChangeType.REMOVED
        before = {} if has_before else Nothing
        after = {} if has_after else Nothing

        for binding_name, binding_entity in node_object.bindings.items():
            binding_delta: PreprocEntityDelta = self.visit(change_set_entity=binding_entity)
            binding_before = binding_delta.before
            binding_after = binding_delta.after
            if has_before and _is_set(binding_before):
                before[binding_name] = binding_before
            if has_after and _is_set(binding_after):
                after[binding_name] = binding_after

        return PreprocEntityDelta(before=before, after=after)
479

480
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> Any:
        """Resolve the value of a resource attribute, as referenced by Fn::GetAtt.

        Args:
            arguments: either the "LogicalName.AttributeName" string form or the
                [logical name, attribute name, ...] list form. Everything after the
                logical name is the attribute name, so dotted attribute names such
                as "Outputs.OutputName" are kept whole.
            select_before: True to resolve the 'before' value, False for the 'after' value.

        Returns:
            The value of the property when it is statically defined in the template,
            otherwise the value found in the deployed properties of the resource.

        Raises:
            RuntimeError: if the arguments do not hold both a logical name and an
                attribute name, or if the resource cannot be found.
        """
        arguments_list: list[str]
        if isinstance(arguments, str):
            # Split on the first dot only: the attribute name may itself contain dots.
            arguments_list = arguments.split(".", 1)
        else:
            arguments_list = arguments
        if len(arguments_list) < 2:
            raise RuntimeError(
                "Invalid arguments for Fn::GetAtt, expected a logical resource name "
                f"and an attribute name but got '{arguments}'"
            )

        logical_name_of_resource = arguments_list[0]
        attribute_name = arguments_list[1]
        if len(arguments_list) > 2:
            # List form with the attribute path spread over several items.
            attribute_name = ".".join(arguments_list[1:])

        node_resource = self._get_node_resource_for(
            resource_name=logical_name_of_resource,
            node_template=self._change_set.update_model.node_template,
        )
        node_property: Optional[NodeProperty] = self._get_node_property_for(
            property_name=attribute_name, node_resource=node_resource
        )
        if node_property is not None:
            # The property is statically defined in the template and its value can be computed.
            property_delta = self.visit(node_property)
            return property_delta.before if select_before else property_delta.after

        # The property is not statically defined and must therefore be available in
        # the properties deployed set.
        if select_before:
            return self._before_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        return self._after_deployed_property_value_of(
            resource_logical_id=logical_name_of_resource,
            property_name=attribute_name,
        )
515

516
    def visit_node_intrinsic_function_fn_get_att(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Compute the delta of an Fn::GetAtt call.

        Each side is taken from the runtime cache when present; otherwise it is
        resolved from that side's arguments, if those are set.
        """
        # TODO: validate the return value according to the spec.
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before: Maybe[str | list[str]] = arguments_delta.before
        arguments_after: Maybe[str | list[str]] = arguments_delta.after
        scope = node_intrinsic_function.scope

        before = self._before_cache.get(scope, Nothing)
        before_is_pending = is_nothing(before) and not is_nothing(arguments_before)
        if before_is_pending:
            before = self._resolve_attribute(arguments=arguments_before, select_before=True)

        after = self._after_cache.get(scope, Nothing)
        after_is_pending = is_nothing(after) and not is_nothing(arguments_after)
        if after_is_pending:
            after = self._resolve_attribute(arguments=arguments_after, select_before=False)

        return PreprocEntityDelta(before=before, after=after)
533

534
    def visit_node_intrinsic_function_fn_equals(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Compute the delta of an Fn::Equals call by comparing its first two arguments."""
        # TODO: add argument shape validation.
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=lambda operands: operands[0] == operands[1],
        )
548

549
    def visit_node_intrinsic_function_fn_if(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Compute the delta of an Fn::If call.

        The first argument names the condition; the second argument is selected
        when the condition holds on a given side, the third one otherwise.
        """

        def _select_branches(args: list[Any]) -> PreprocEntityDelta:
            condition_delta = self._resolve_condition(logical_id=args[0])
            # Index 1 is the "true" branch and index 2 the "false" branch.
            before_branch = args[1 if condition_delta.before else 2]
            after_branch = args[1 if condition_delta.after else 2]
            return PreprocEntityDelta(before=before_branch, after=after_branch)

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_select_branches,
        )
567

568
    def visit_node_intrinsic_function_fn_and(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Compute the delta of an Fn::And call: true when every argument is truthy."""
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=lambda conditions: all(conditions),
        )
582

583
    def visit_node_intrinsic_function_fn_or(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Compute the delta of an Fn::Or call: true when any argument is truthy."""
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=lambda conditions: any(conditions),
        )
597

598
    def visit_node_intrinsic_function_fn_not(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Compute the delta of an Fn::Not call by negating its single condition.

        Raises:
            RuntimeError: if the arguments are a list that does not hold exactly
                one condition.
        """

        def _compute_fn_not(arg: list[bool] | bool) -> bool:
            # Fn::Not is declared with a one-element list. Negating the list itself
            # would always give False, because a non-empty list is truthy, so the
            # element is negated instead.
            # NOTE(review): a lone boolean is still accepted; confirm whether the
            # arguments can ever arrive in that form.
            if isinstance(arg, list):
                if len(arg) != 1:
                    raise RuntimeError(
                        "Invalid arguments shape for Fn::Not, expected a list "
                        f"with exactly one condition but got '{arg}'"
                    )
                return not arg[0]
            return not arg

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_not,
        )
        return delta
611

612
    def visit_node_intrinsic_function_fn_sub(
1✔
613
        self, node_intrinsic_function: NodeIntrinsicFunction
614
    ) -> PreprocEntityDelta:
615
        def _compute_sub(args: str | list[Any], select_before: bool) -> str:
1✔
616
            # TODO: add further schema validation.
617
            string_template: str
618
            sub_parameters: dict
619
            if isinstance(args, str):
1✔
620
                string_template = args
1✔
621
                sub_parameters = dict()
1✔
622
            elif (
1✔
623
                isinstance(args, list)
624
                and len(args) == 2
625
                and isinstance(args[0], str)
626
                and isinstance(args[1], dict)
627
            ):
628
                string_template = args[0]
1✔
629
                sub_parameters = args[1]
1✔
630
            else:
UNCOV
631
                raise RuntimeError(
×
632
                    "Invalid arguments shape for Fn::Sub, expected a String "
633
                    f"or a Tuple of String and Map but got '{args}'"
634
                )
635
            sub_string = string_template
1✔
636
            template_variable_names = re.findall("\\${([^}]+)}", string_template)
1✔
637
            for template_variable_name in template_variable_names:
1✔
638
                template_variable_value = Nothing
1✔
639

640
                # Try to resolve the variable name as pseudo parameter.
641
                if template_variable_name in _PSEUDO_PARAMETERS:
1✔
642
                    template_variable_value = self._resolve_pseudo_parameter(
1✔
643
                        pseudo_parameter_name=template_variable_name
644
                    )
645

646
                # Try to resolve the variable name as an entry to the defined parameters.
647
                elif template_variable_name in sub_parameters:
1✔
648
                    template_variable_value = sub_parameters[template_variable_name]
1✔
649

650
                # Try to resolve the variable name as GetAtt.
651
                elif "." in template_variable_name:
1✔
652
                    try:
1✔
653
                        template_variable_value = self._resolve_attribute(
1✔
654
                            arguments=template_variable_name, select_before=select_before
655
                        )
UNCOV
656
                    except RuntimeError:
×
UNCOV
657
                        pass
×
658

659
                # Try to resolve the variable name as Ref.
660
                else:
661
                    try:
1✔
662
                        resource_delta = self._resolve_reference(logical_id=template_variable_name)
1✔
663
                        template_variable_value = (
1✔
664
                            resource_delta.before if select_before else resource_delta.after
665
                        )
666
                        if isinstance(template_variable_value, PreprocResource):
1✔
667
                            template_variable_value = template_variable_value.physical_resource_id
1✔
UNCOV
668
                    except RuntimeError:
×
UNCOV
669
                        pass
×
670

671
                if is_nothing(template_variable_value):
1✔
UNCOV
672
                    raise RuntimeError(
×
673
                        f"Undefined variable name in Fn::Sub string template '{template_variable_name}'"
674
                    )
675

676
                if not isinstance(template_variable_value, str):
1✔
677
                    template_variable_value = str(template_variable_value)
1✔
678

679
                sub_string = sub_string.replace(
1✔
680
                    f"${{{template_variable_name}}}", template_variable_value
681
                )
682

683
            # FIXME: the following type reduction is ported from v1; however it appears as though such
684
            #        reduction is not performed by the engine, and certainly not at this depth given the
685
            #        lack of context. This section should be removed with Fn::Sub always returning a string
686
            #        and the resource providers reviewed.
687
            account_id = self._change_set.account_id
1✔
688
            is_another_account_id = sub_string.isdigit() and len(sub_string) == len(account_id)
1✔
689
            if sub_string == account_id or is_another_account_id:
1✔
UNCOV
690
                result = sub_string
×
691
            elif sub_string.isdigit():
1✔
692
                result = int(sub_string)
1✔
693
            else:
694
                try:
1✔
695
                    result = float(sub_string)
1✔
696
                except ValueError:
1✔
697
                    result = sub_string
1✔
698
            return result
1✔
699

700
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
701
        arguments_before = arguments_delta.before
1✔
702
        arguments_after = arguments_delta.after
1✔
703
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
1✔
704
        if is_nothing(before) and not is_nothing(arguments_before):
1✔
705
            before = _compute_sub(args=arguments_before, select_before=True)
1✔
706
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
1✔
707
        if is_nothing(after) and not is_nothing(arguments_after):
1✔
708
            after = _compute_sub(args=arguments_after, select_before=False)
1✔
709
        return PreprocEntityDelta(before=before, after=after)
1✔
710

711
    def visit_node_intrinsic_function_fn_join(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::Join node.

        The visited arguments delta and the node scope are handed to
        ``self._cached_apply`` together with a resolver that joins the values.
        """

        # TODO: add support for schema validation.
        # TODO: add tests for joining non string values.
        def _join(args: list[Any]) -> str | NothingType:
            # Anything that is not a [delimiter, values] pair yields Nothing.
            if not isinstance(args, list) or len(args) != 2:
                return Nothing
            separator: str = str(args[0])
            items: list[Any] = args[1]
            if isinstance(items, list):
                # None entries are skipped; everything else is stringified.
                return separator.join(str(item) for item in items if item is not None)
            # shortcut if values is the empty string, for example:
            # {"Fn::Join": ["", {"Ref": <parameter>}]}
            # CDK bootstrap does this
            if items == "":
                return ""
            raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_join,
        )
1✔
744

745
    def visit_node_intrinsic_function_fn_select(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate an Fn::Select node.

        The visited arguments delta and the node scope are handed to
        ``self._cached_apply`` together with a resolver that picks
        ``values[index]`` out of an ``[index, values]`` argument pair.

        The resolver raises ``RuntimeError`` when the values are not a non-empty
        list, or when the index is not convertible to an integer or lies outside
        ``0 <= index < len(values)``.
        """

        # TODO: add further support for schema validation
        def _compute_fn_select(args: list[Any]) -> Any:
            values: list[Any] = args[1]
            if not isinstance(values, list) or not values:
                raise RuntimeError(f"Invalid arguments list value for Fn::Select: '{values}'")

            raw_index = args[0]
            try:
                # Indices may arrive as strings (e.g. "1"), so coerce them first.
                index = int(raw_index)
            except (TypeError, ValueError) as error:
                raise RuntimeError(
                    f"Invalid or out of range index value for Fn::Select: '{raw_index}'"
                ) from error

            # The valid range is half-open: index == len(values) is already out of range.
            if index < 0 or index >= len(values):
                raise RuntimeError(f"Invalid or out of range index value for Fn::Select: '{index}'")
            return values[index]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_select,
        )
        return delta
1✔
767

768
    def visit_node_intrinsic_function_fn_split(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate an Fn::Split node.

        The visited arguments delta and the node scope are handed to
        ``self._cached_apply`` together with a resolver that splits the source
        string of a ``[delimiter, source_string]`` pair on the delimiter.
        """

        # TODO: add further support for schema validation
        def _split(args: list[Any]) -> Any:
            separator = args[0]
            # The delimiter must be a non-empty string.
            if not (isinstance(separator, str) and separator):
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{separator}'")
            text = args[1]
            if isinstance(text, str):
                return text.split(separator)
            raise RuntimeError(f"Invalid source string value for Fn::Split: '{text}'")

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_split,
        )
1✔
789

790
    def visit_node_intrinsic_function_fn_get_a_zs(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::GetAZs node.

        The visited arguments delta and the node scope are handed to
        ``self._cached_apply`` together with a resolver that lists the zone names
        returned by EC2 ``describe_availability_zones`` for the given region.

        The resolver raises ``RuntimeError`` when the region is not a string or
        when the EC2 call fails; in the latter case the original ``ClientError``
        is kept as the exception cause.
        """
        # TODO: add further support for schema validation

        def _compute_fn_get_a_zs(region) -> Any:
            if not isinstance(region, str):
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")

            # An empty region string falls back to the change set's own region.
            if not region:
                region = self._change_set.region_name

            account_id = self._change_set.account_id
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
            try:
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
                    ec2_client.describe_availability_zones()
                )
            except ClientError as error:
                # Chain the cause so the underlying AWS error is not lost.
                raise RuntimeError(
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
                ) from error
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
                "AvailabilityZones"
            ]
            return [az["ZoneName"] for az in availability_zones]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_get_a_zs,
        )
        return delta
1✔
825

826
    def visit_node_intrinsic_function_fn_base64(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::Base64 node.

        The visited arguments delta and the node scope are handed to
        ``self._cached_apply`` together with a resolver that base64-encodes a
        string argument; non-string arguments raise ``RuntimeError``.
        """

        # TODO: add further support for schema validation
        def _encode(string) -> Any:
            if isinstance(string, str):
                # Ported from v1:
                encoded_bytes = base64.b64encode(to_bytes(string))
                return to_str(encoded_bytes)
            raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{string}'")

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_encode,
        )
1✔
844

845
    def visit_node_intrinsic_function_fn_find_in_map(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::FindInMap node.

        Each truthy side of the visited arguments is unpacked into
        ``self._resolve_mapping``; the ``before`` result is taken from the
        before-arguments lookup and the ``after`` result from the after-arguments
        lookup. A falsy side stays ``Nothing``.
        """
        # TODO: add type checking/validation for result unit?
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        lookup_before = arguments_delta.before
        lookup_after = arguments_delta.after

        resolved_before = (
            self._resolve_mapping(*lookup_before).before if lookup_before else Nothing
        )
        resolved_after = self._resolve_mapping(*lookup_after).after if lookup_after else Nothing
        return PreprocEntityDelta(before=resolved_before, after=resolved_after)
1✔
861

862
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the mapping's bindings."""
        return self.visit(node_mapping.bindings)
1✔
865

866
    def visit_node_parameters(
        self, node_parameters: NodeParameters
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
        """Visit every parameter and collect the results by parameter name.

        A parameter only appears in the ``before`` (respectively ``after``)
        dictionary when that side of its delta is not ``Nothing``.
        """
        collected_before: dict[str, Any] = {}
        collected_after: dict[str, Any] = {}
        for node_parameter in node_parameters.parameters:
            delta = self.visit(node_parameter)
            for side_value, target in (
                (delta.before, collected_before),
                (delta.after, collected_after),
            ):
                if not is_nothing(side_value):
                    target[node_parameter.name] = side_value
        return PreprocEntityDelta(before=collected_before, after=collected_after)
1✔
880

881
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
        """Resolve the before/after value of a single template parameter.

        On each side the dynamic value wins when truthy, otherwise the default
        value is used. Only the ``after`` value is converted according to the
        parameter type: a ``List<String>`` value is split on commas and each item
        stripped of surrounding whitespace.
        """
        dynamic_delta = self.visit(node_parameter.dynamic_value)
        default_delta = self.visit(node_parameter.default_value)

        before = dynamic_delta.before or default_delta.before
        after = dynamic_delta.after or default_delta.after

        type_delta = self.visit(node_parameter.type_)

        def _coerce(value: str, type_: str) -> Any:
            if type_ == "List<String>":
                return [item.strip() for item in value.split(",")]
            return value

        if not is_nothing(after):
            after = _coerce(after, type_delta.after)

        return PreprocEntityDelta(before=before, after=after)
1✔
903

904
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the node's ``depends_on`` entity."""
        return self.visit(node_depends_on.depends_on)
1✔
907

908
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition's body."""
        return self.visit(node_condition.body)
1✔
911

912
    def _resource_physical_resource_id_from(
        self, logical_resource_id: str, resolved_resources: dict
    ) -> str:
        """Look up the ``PhysicalResourceId`` of a resource in ``resolved_resources``.

        Raises ``RuntimeError`` when the resource has no entry or its
        ``PhysicalResourceId`` is not a string.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        resource_entry = resolved_resources.get(logical_resource_id, dict())
        candidate: Optional[str] = resource_entry.get("PhysicalResourceId")
        if isinstance(candidate, str):
            return candidate
        raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
1✔
921

922
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
        """Return the physical id recorded for the resource in ``self._before_resolved_resources``."""
        # TODO: typing around resolved resources is needed and should be reflected here.
        previously_resolved = self._before_resolved_resources
        return self._resource_physical_resource_id_from(
            logical_resource_id=resource_logical_id,
            resolved_resources=previously_resolved,
        )
928

929
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
        """Return the physical id for the after state.

        This delegates to the before-state lookup, so the value comes from the
        previously resolved resources.
        """
        physical_resource_id = self._before_resource_physical_id(
            resource_logical_id=resource_logical_id
        )
        return physical_resource_id
1✔
931

932
    def visit_node_intrinsic_function_ref(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a Ref node.

        The visited arguments delta and the node scope are handed to
        ``self._cached_apply`` together with a resolver that resolves the logical
        id via ``self._resolve_reference``; any side that resolves to a
        ``PreprocResource`` is replaced in place by its physical resource id.
        """

        def _dereference(logical_id: str) -> PreprocEntityDelta:
            resolved: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
            before_value = resolved.before
            if isinstance(before_value, PreprocResource):
                resolved.before = before_value.physical_resource_id
            after_value = resolved.after
            if isinstance(after_value, PreprocResource):
                resolved.after = after_value.physical_resource_id
            return resolved

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_dereference,
        )
1✔
950

951
    def visit_node_intrinsic_function_condition(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a Condition intrinsic node.

        The visited arguments delta and the node scope are handed to
        ``self._cached_apply`` together with a resolver that looks the condition
        up by name and visits it; an unknown name raises ``RuntimeError``.
        """
        arguments_delta = self.visit(node_intrinsic_function.arguments)

        def _visit_named_condition(name: str) -> PreprocEntityDelta:
            condition_node = self._get_node_condition_if_exists(condition_name=name)
            if not is_nothing(condition_node):
                return self.visit(condition_node)
            raise RuntimeError(f"Undefined condition '{name}'")

        return self._cached_apply(
            resolver=_visit_named_condition,
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
        )
1✔
969

970
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
        """Visit every element of the array and gather the results per side.

        The ``before`` side is ``Nothing`` for a CREATED array and the ``after``
        side is ``Nothing`` for a REMOVED array; otherwise each side is a list of
        the element values that are not ``Nothing``.
        """
        change_type = node_array.change_type
        before = [] if change_type != ChangeType.CREATED else Nothing
        after = [] if change_type != ChangeType.REMOVED else Nothing
        for element in node_array.array:
            element_delta: PreprocEntityDelta = self.visit(change_set_entity=element)
            element_before = element_delta.before
            element_after = element_delta.after
            if not (is_nothing(before) or is_nothing(element_before)):
                before.append(element_before)
            if not (is_nothing(after) or is_nothing(element_after)):
                after.append(element_after)
        return PreprocEntityDelta(before=before, after=after)
1✔
983

984
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the property's value."""
        value_delta = self.visit(node_property.value)
        return value_delta
1✔
986

987
    def visit_node_properties(
        self, node_properties: NodeProperties
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
        """Visit every property and wrap the per-side bindings in ``PreprocProperties``.

        The ``before`` side is ``Nothing`` for CREATED properties and the ``after``
        side is ``Nothing`` for REMOVED ones. Property values that are ``Nothing``
        or ``None`` are left out of the bindings.
        """
        change_type = node_properties.change_type
        bindings_before = {} if change_type != ChangeType.CREATED else Nothing
        bindings_after = {} if change_type != ChangeType.REMOVED else Nothing

        for node_property in node_properties.properties:
            name = node_property.name
            property_delta = self.visit(node_property)
            value_before = property_delta.before
            value_after = property_delta.after
            if not (
                is_nothing(bindings_before) or is_nothing(value_before) or value_before is None
            ):
                bindings_before[name] = value_before
            if not (is_nothing(bindings_after) or is_nothing(value_after) or value_after is None):
                bindings_after[name] = value_after

        before = (
            Nothing
            if is_nothing(bindings_before)
            else PreprocProperties(properties=bindings_before)
        )
        after = (
            Nothing if is_nothing(bindings_after) else PreprocProperties(properties=bindings_after)
        )
        return PreprocEntityDelta(before=before, after=after)
1✔
1017

1018
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
        """Resolve a condition reference into the condition's before/after values.

        The reference is visited to obtain the condition names; each side whose
        name is a string is resolved via ``self._resolve_condition``. A side
        whose name is not a string stays ``Nothing``.
        """
        names_delta = self.visit(reference)

        resolved_before = Nothing
        name_before = names_delta.before
        if isinstance(name_before, str):
            resolved_before = self._resolve_condition(logical_id=name_before).before

        resolved_after = Nothing
        name_after = names_delta.after
        if isinstance(name_after, str):
            resolved_after = self._resolve_condition(logical_id=name_after).after

        return PreprocEntityDelta(before=resolved_before, after=resolved_after)
1✔
1031

1032
    def visit_node_resource(
        self, node_resource: NodeResource
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
        """Build the before/after ``PreprocResource`` pair for a resource node.

        A side is produced when either the resource exists on that side (it is
        not CREATED for ``before`` / not REMOVED for ``after``) and has no
        condition, or when its condition value on that side is truthy. The
        ``after`` physical id falls back to ``None`` when the lookup raises
        ``RuntimeError``.
        """
        change_type = node_resource.change_type

        # Optional Condition attached to the resource.
        condition_before, condition_after = Nothing, Nothing
        if not is_nothing(node_resource.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_resource.condition_reference
            )
            condition_before, condition_after = condition_delta.before, condition_delta.after

        # Optional DependsOn attached to the resource.
        depends_on_before, depends_on_after = Nothing, Nothing
        if not is_nothing(node_resource.depends_on):
            depends_on_delta = self.visit(node_resource.depends_on)
            depends_on_before, depends_on_after = depends_on_delta.before, depends_on_delta.after

        type_delta = self.visit(node_resource.type_)
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
            node_resource.properties
        )

        before = Nothing
        # Parentheses only spell out the operator precedence already in effect.
        if (
            change_type != ChangeType.CREATED and is_nothing(condition_before)
        ) or condition_before:
            resource_name = node_resource.name
            physical_id_before = self._before_resource_physical_id(
                resource_logical_id=resource_name
            )
            before = PreprocResource(
                logical_id=resource_name,
                physical_resource_id=physical_id_before,
                condition=condition_before,
                resource_type=type_delta.before,
                properties=properties_delta.before,
                depends_on=depends_on_before,
                requires_replacement=False,
            )

        after = Nothing
        if (change_type != ChangeType.REMOVED and is_nothing(condition_after)) or condition_after:
            resource_name = node_resource.name
            try:
                physical_id_after = self._after_resource_physical_id(
                    resource_logical_id=resource_name
                )
            except RuntimeError:
                # No physical id could be looked up for this resource.
                physical_id_after = None
            after = PreprocResource(
                logical_id=resource_name,
                physical_resource_id=physical_id_after,
                condition=condition_after,
                resource_type=type_delta.after,
                properties=properties_delta.after,
                depends_on=depends_on_after,
                requires_replacement=node_resource.requires_replacement,
            )

        return PreprocEntityDelta(before=before, after=after)
1✔
1091

1092
    def visit_node_output(
        self, node_output: NodeOutput
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
        """Build the before/after ``PreprocOutput`` pair for an output node.

        When the output has a condition, a falsy-to-truthy transition is treated
        as CREATED and a truthy-to-falsy transition as REMOVED, overriding the
        node's own change type. The ``before`` side is ``Nothing`` for CREATED
        outputs and the ``after`` side is ``Nothing`` for REMOVED ones.
        """
        effective_change_type = node_output.change_type
        value_delta = self.visit(node_output.value)

        condition_delta = Nothing
        if not is_nothing(node_output.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_output.condition_reference
            )
            enabled_before = condition_delta.before
            enabled_after = condition_delta.after
            if not enabled_before and enabled_after:
                effective_change_type = ChangeType.CREATED
            elif enabled_before and not enabled_after:
                effective_change_type = ChangeType.REMOVED

        export_delta = Nothing
        if not is_nothing(node_output.export):
            export_delta = self.visit(node_output.export)

        before: Maybe[PreprocOutput] = Nothing
        if effective_change_type != ChangeType.CREATED:
            export_before = export_delta.before if export_delta else None
            condition_before = condition_delta.before if condition_delta else None
            before = PreprocOutput(
                name=node_output.name,
                value=value_delta.before,
                export=export_before,
                condition=condition_before,
            )

        after: Maybe[PreprocOutput] = Nothing
        if effective_change_type != ChangeType.REMOVED:
            export_after = export_delta.after if export_delta else None
            condition_after = condition_delta.after if condition_delta else None
            after = PreprocOutput(
                name=node_output.name,
                value=value_delta.after,
                export=export_after,
                condition=condition_after,
            )

        return PreprocEntityDelta(before=before, after=after)
1✔
1131

1132
    def visit_node_outputs(
        self, node_outputs: NodeOutputs
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
        """Visit every output and collect the sides that are not ``Nothing``."""
        outputs_before: list[PreprocOutput] = []
        outputs_after: list[PreprocOutput] = []
        for node_output in node_outputs.outputs:
            delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
            for side_value, target in (
                (delta.before, outputs_before),
                (delta.after, outputs_after),
            ):
                if not is_nothing(side_value):
                    target.append(side_value)
        return PreprocEntityDelta(before=outputs_before, after=outputs_after)
1✔
1146

1147
    def visit_node_intrinsic_function_fn_import_value(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::ImportValue node.

        The visited arguments delta and the node scope are handed to
        ``self._cached_apply`` together with a resolver that reads the export's
        ``"Value"`` from ``exports_map`` for the change set's account and region.
        A missing or falsy value yields ``Nothing``; a non-string export name
        raises ``RuntimeError``.
        """

        def _lookup_export(string) -> str:
            if not isinstance(string, str):
                raise RuntimeError(f"Invalid parameter for import: '{string}'")

            change_set = self._change_set
            available_exports = exports_map(
                account_id=change_set.account_id, region_name=change_set.region_name
            )
            export_entry = available_exports.get(string, {})
            return export_entry.get("Value") or Nothing

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_lookup_export,
        )
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc