• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17115498067

20 Aug 2025 09:05PM UTC coverage: 86.876% (-0.01%) from 86.889%
17115498067

push

github

simonrw
Handle parameter conversions for different transforms

13 of 13 new or added lines in 2 files covered. (100.0%)

42 existing lines in 7 files now uncovered.

67023 of 77148 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.71
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from typing import Any, Final, Generic, TypeVar
1✔
8

9
from botocore.exceptions import ClientError
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
13
from localstack.aws.connect import connect_to
1✔
14
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
15
    ChangeSetEntity,
16
    ChangeType,
17
    Maybe,
18
    NodeArray,
19
    NodeCondition,
20
    NodeDependsOn,
21
    NodeDivergence,
22
    NodeIntrinsicFunction,
23
    NodeMapping,
24
    NodeObject,
25
    NodeOutput,
26
    NodeOutputs,
27
    NodeParameter,
28
    NodeParameters,
29
    NodeProperties,
30
    NodeProperty,
31
    NodeResource,
32
    NodeTemplate,
33
    Nothing,
34
    NothingType,
35
    Scope,
36
    TerminalValue,
37
    TerminalValueCreated,
38
    TerminalValueModified,
39
    TerminalValueRemoved,
40
    TerminalValueUnchanged,
41
    is_nothing,
42
)
43
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
44
    ChangeSetModelVisitor,
45
)
46
from localstack.services.cloudformation.engine.v2.resolving import (
1✔
47
    extract_dynamic_reference,
48
    perform_dynamic_reference_lookup,
49
)
50
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
51
from localstack.services.cloudformation.stores import (
1✔
52
    exports_map,
53
)
54
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
55
from localstack.utils.aws.arns import get_partition
1✔
56
from localstack.utils.objects import get_value_from_path
1✔
57
from localstack.utils.run import to_str
1✔
58
from localstack.utils.strings import to_bytes
1✔
59
from localstack.utils.urls import localstack_host
1✔
60

61
# Host name returned when the "AWS::URLSuffix" pseudo parameter is resolved.
_AWS_URL_SUFFIX = localstack_host().host  # The value in AWS is "amazonaws.com"

# Names that reference resolution treats as pseudo parameters rather than as
# template parameters or resources.
_PSEUDO_PARAMETERS: Final[set[str]] = {
    "AWS::AccountId",
    "AWS::NoValue",
    "AWS::NotificationARNs",
    "AWS::Partition",
    "AWS::Region",
    "AWS::StackId",
    "AWS::StackName",
    "AWS::URLSuffix",
}

# Type variables for the 'before' and 'after' sides of a PreprocEntityDelta.
TBefore = TypeVar("TBefore")
TAfter = TypeVar("TAfter")

# Placeholder returned for a deployed property that has no value while
# config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES is enabled.
MOCKED_REFERENCE = "unknown"

# Matches strings made only of ASCII letters and digits (at least one character).
VALID_LOGICAL_RESOURCE_ID_RE = re.compile(r"^[A-Za-z0-9]+$")
80

81

82
class PreprocEntityDelta(Generic[TBefore, TAfter]):
    """Pair of preprocessed values for one entity: one 'before' and one 'after'.

    Each side defaults to ``Nothing`` when it is not supplied.
    """

    before: Maybe[TBefore]
    after: Maybe[TAfter]

    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
        self.before, self.after = before, after

    def __eq__(self, other):
        # Only another delta can be equal; both sides must match.
        if isinstance(other, PreprocEntityDelta):
            return self.before == other.before and self.after == other.after
        return False
94

95

96
class PreprocProperties:
    """Thin wrapper around a dictionary of preprocessed properties."""

    properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        self.properties = properties

    def __eq__(self, other):
        # Equality is defined purely by the wrapped dictionaries.
        if isinstance(other, PreprocProperties):
            return self.properties == other.properties
        return False
106

107

108
class PreprocResource:
    """Preprocessed view of a single template resource.

    Equality only considers the logical id, the (normalised) condition, the
    resource type and the properties; the physical id, dependencies and the
    replacement flag are not compared.
    """

    logical_id: str
    physical_resource_id: str | None
    condition: bool | None
    resource_type: str
    properties: PreprocProperties
    depends_on: list[str] | None
    requires_replacement: bool

    def __init__(
        self,
        logical_id: str,
        physical_resource_id: str,
        condition: bool | None,
        resource_type: str,
        properties: PreprocProperties,
        depends_on: list[str] | None,
        requires_replacement: bool,
    ):
        # Identity of the resource.
        self.logical_id = logical_id
        self.physical_resource_id = physical_resource_id
        self.resource_type = resource_type
        # Evaluated template data.
        self.condition = condition
        self.properties = properties
        self.depends_on = depends_on
        self.requires_replacement = requires_replacement

    @staticmethod
    def _compare_conditions(c1: bool, c2: bool):
        """Compare two conditions, treating any non-boolean value as ``True``."""
        # The lack of condition equates to a true condition.
        left = c1 if isinstance(c1, bool) else True
        right = c2 if isinstance(c2, bool) else True
        return left == right

    def __eq__(self, other):
        if not isinstance(other, PreprocResource):
            return False
        checks = (
            self.logical_id == other.logical_id,
            self._compare_conditions(self.condition, other.condition),
            self.resource_type == other.resource_type,
            self.properties == other.properties,
        )
        return all(checks)
153

154

155
class PreprocOutput:
    """Preprocessed view of a template output: name, value, export and condition."""

    name: str
    value: Any
    export: Any | None
    condition: bool | None

    def __init__(self, name: str, value: Any, export: Any | None, condition: bool | None):
        self.name, self.value = name, value
        self.export, self.condition = export, condition

    def __eq__(self, other):
        if not isinstance(other, PreprocOutput):
            return False
        # All four fields take part in the comparison.
        checks = (
            self.name == other.name,
            self.value == other.value,
            self.export == other.export,
            self.condition == other.condition,
        )
        return all(checks)
178

179

180
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
181
    _change_set: Final[ChangeSet]
1✔
182
    _before_resolved_resources: Final[dict]
1✔
183
    _before_cache: Final[dict[Scope, Any]]
1✔
184
    _after_cache: Final[dict[Scope, Any]]
1✔
185

186
    def __init__(self, change_set: ChangeSet):
        """Bind this preprocessor to ``change_set`` and start with empty caches."""
        self._change_set = change_set
        # Resolved resources of the change set's stack, used for 'before' lookups.
        self._before_resolved_resources = change_set.stack.resolved_resources
        # Per-scope results for the 'before' and 'after' sides.
        self._before_cache = dict()
        self._after_cache = dict()
191

192
    def _setup_runtime_cache(self) -> None:
        """Reset both local caches and preload them from the update model.

        Entries are looked up in the update model's runtime caches under this
        class's name; a missing or empty entry leaves the local cache empty.
        """
        cache_key = self.__class__.__name__

        self._before_cache.clear()
        self._after_cache.clear()

        stored_before = self._change_set.update_model.before_runtime_cache.get(cache_key)
        if stored_before:
            self._before_cache.update(stored_before)

        stored_after = self._change_set.update_model.after_runtime_cache.get(cache_key)
        if stored_after:
            self._after_cache.update(stored_after)
205

206
    def _save_runtime_cache(self) -> None:
        """Store deep copies of both local caches in the update model.

        The copies are saved under this class's name, the same key that
        ``_setup_runtime_cache`` reads from.
        """
        cache_key = self.__class__.__name__
        update_model = self._change_set.update_model

        before_snapshot = copy.deepcopy(self._before_cache)
        update_model.before_runtime_cache[cache_key] = before_snapshot

        after_snapshot = copy.deepcopy(self._after_cache)
        update_model.after_runtime_cache[cache_key] = after_snapshot
214

215
    def process(self) -> None:
        """Visit the update model's template between loading and saving the caches."""
        self._setup_runtime_cache()
        self.visit(self._change_set.update_model.node_template)
        self._save_runtime_cache()
220

221
    def _get_node_resource_for(
        self, resource_name: str, node_template: NodeTemplate
    ) -> NodeResource:
        """Return the resource node named ``resource_name``, visiting it first.

        Raises:
            ValidationError: if the template declares no resource with that name.
        """
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        found = next(
            (
                candidate
                for candidate in node_template.resources.resources
                if candidate.name == resource_name
            ),
            None,
        )
        if found is None:
            raise ValidationError(
                f"Template format error: Unresolved resource dependencies [{resource_name}] in the Resources block of the template"
            )
        self.visit(found)
        return found
232

233
    def _get_node_property_for(
        self, property_name: str, node_resource: NodeResource
    ) -> NodeProperty | None:
        """Return the property node named ``property_name`` after visiting it, or ``None``."""
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        found = next(
            (
                candidate
                for candidate in node_resource.properties.properties
                if candidate.name == property_name
            ),
            None,
        )
        if found is not None:
            self.visit(found)
        return found
242

243
    def _deployed_property_value_of(
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
    ) -> Any:
        """Look up a property value of a resource in ``resolved_resources``.

        ``property_name`` may be a path into nested properties. A truthy value
        must be a string. A falsy value is replaced by ``MOCKED_REFERENCE`` when
        ``config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES`` is enabled, and
        returned unchanged otherwise.

        Raises:
            RuntimeError: if the resource is absent from ``resolved_resources``
                or the value found is truthy but not a string.
        """
        # NOTE(review): the original comment says this function is meant to be
        # overridden so that it does not access the resolved resource - confirm
        # against the subclasses.

        # The resource must be processed before its deployed value is read.
        # Ideally, values should only be accessible through delta objects, to
        # ensure computation is always complete at every level.
        self._get_node_resource_for(
            resource_name=resource_logical_id,
            node_template=self._change_set.update_model.node_template,
        )

        deployed = resolved_resources.get(resource_logical_id)
        if deployed is None:
            raise RuntimeError(
                f"No deployed instances of resource '{resource_logical_id}' were found"
            )

        # support structured properties, e.g. NestedStack.Outputs.OutputName
        property_value: Any | None = get_value_from_path(
            deployed.get("Properties", {}), property_name
        )

        if not property_value:
            if config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES:
                return MOCKED_REFERENCE
            return property_value

        if not isinstance(property_value, str):
            # TODO: is this correct? If there is a bug in the logic here, it's probably
            #  better to know about it with a clear error message than to receive some form
            #  of message about trying to use a dictionary in place of a string
            raise RuntimeError(
                f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value"
            )
        return property_value
279

280
    def _before_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> Any:
        """Look up a deployed property value in the stack's resolved resources."""
        resolved = self._before_resolved_resources
        return self._deployed_property_value_of(
            resource_logical_id=resource_logical_id,
            property_name=property_name,
            resolved_resources=resolved,
        )
288

289
    def _after_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> str | None:
        """Look up the 'after' value of a deployed property.

        This implementation delegates to the 'before' lookup.
        """
        value = self._before_deployed_property_value_of(
            resource_logical_id=resource_logical_id,
            property_name=property_name,
        )
        return value
295

296
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
        """Return the mapping node named ``map_name``, visiting it first.

        Raises:
            RuntimeError: if the template defines no mapping with that name.
        """
        # TODO: another scenarios suggesting property lookups might be preferable.
        candidates: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
        found = next((candidate for candidate in candidates if candidate.name == map_name), None)
        if found is None:
            raise RuntimeError(f"Undefined '{map_name}' mapping")
        self.visit(found)
        return found
304

305
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
        """Return the parameter node named ``parameter_name`` after visiting it, or ``Nothing``."""
        # TODO: another scenarios suggesting property lookups might be preferable.
        candidates: list[NodeParameter] = (
            self._change_set.update_model.node_template.parameters.parameters
        )
        found = next(
            (candidate for candidate in candidates if candidate.name == parameter_name), None
        )
        if found is None:
            return Nothing
        self.visit(found)
        return found
315

316
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
        """Return the condition node named ``condition_name`` after visiting it, or ``Nothing``."""
        # TODO: another scenarios suggesting property lookups might be preferable.
        candidates: list[NodeCondition] = (
            self._change_set.update_model.node_template.conditions.conditions
        )
        found = next(
            (candidate for candidate in candidates if candidate.name == condition_name), None
        )
        if found is None:
            return Nothing
        self.visit(found)
        return found
326

327
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
        """Return the delta produced by visiting the condition named ``logical_id``.

        Raises:
            RuntimeError: if no such condition is defined in the template.
        """
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError(f"No condition '{logical_id}' was found.")
        return self.visit(node_condition)
333

334
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
        """Return the value of a supported pseudo parameter.

        "AWS::NoValue" resolves to ``None``.

        Raises:
            RuntimeError: for any name without a dedicated branch below.
        """
        if pseudo_parameter_name == "AWS::Partition":
            return get_partition(self._change_set.region_name)
        if pseudo_parameter_name == "AWS::AccountId":
            return self._change_set.stack.account_id
        if pseudo_parameter_name == "AWS::Region":
            return self._change_set.stack.region_name
        if pseudo_parameter_name == "AWS::StackName":
            return self._change_set.stack.stack_name
        if pseudo_parameter_name == "AWS::StackId":
            return self._change_set.stack.stack_id
        if pseudo_parameter_name == "AWS::URLSuffix":
            return _AWS_URL_SUFFIX
        if pseudo_parameter_name == "AWS::NoValue":
            return None
        raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
352

353
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
        """Resolve ``logical_id`` as a pseudo parameter, a parameter or a resource.

        The three lookups are tried in that order. The resource lookup raises
        when no resource matches.
        """
        if logical_id in _PSEUDO_PARAMETERS:
            # Pseudo parameters are constants within the lifecycle of a template.
            constant = self._resolve_pseudo_parameter(pseudo_parameter_name=logical_id)
            return PreprocEntityDelta(before=constant, after=constant)

        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
        if isinstance(node_parameter, NodeParameter):
            return self.visit(node_parameter)

        node_resource = self._get_node_resource_for(
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
        )
        resource_delta = self.visit(node_resource)
        return PreprocEntityDelta(before=resource_delta.before, after=resource_delta.after)
373

374
    def _resolve_mapping(
        self, map_name: str, top_level_key: str, second_level_key
    ) -> PreprocEntityDelta:
        """Return the delta of the value stored at ``map_name[top_level_key][second_level_key]``.

        Raises:
            ValidationError: if either key does not lead to a node of the
                expected kind.
        """
        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.

        def _lookup_failure() -> ValidationError:
            # Built lazily so the key is only joined on the error path.
            error_key = "::".join([map_name, top_level_key, second_level_key])
            return ValidationError(f"Template error: Unable to get mapping for {error_key}")

        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)

        first_level = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(first_level, NodeObject):
            raise _lookup_failure()

        second_level = first_level.bindings.get(second_level_key)
        if not isinstance(second_level, (TerminalValue, NodeArray, NodeObject)):
            raise _lookup_failure()

        return self.visit(second_level)
389

390
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
        """Visit an entity, reusing cached results when both sides are cached.

        A result from the parent visitor is written to both caches only when it
        is a ``PreprocEntityDelta``; any other result is returned uncached.
        """
        scope = change_set_entity.scope
        before_cache, after_cache = self._before_cache, self._after_cache

        cached_on_both_sides = scope in before_cache and scope in after_cache
        if cached_on_both_sides:
            return PreprocEntityDelta(before=before_cache[scope], after=after_cache[scope])

        delta = super().visit(change_set_entity=change_set_entity)
        if isinstance(delta, PreprocEntityDelta):
            before_cache[scope] = delta.before
            after_cache[scope] = delta.after
        return delta
401

402
    def _cached_apply(
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
    ) -> PreprocEntityDelta:
        """
        Resolve the 'before' and 'after' sides of a delta, preferring cached values.

        Each side is handled independently. The cached value for ``scope`` is
        used when present. Otherwise, if the matching argument is not Nothing,
        ``resolver`` is called with that argument. When the resolver returns a
        PreprocEntityDelta, only the component of the side being resolved is
        kept ('before' for the before side, 'after' for the after side).

        The caches are only read here, never written. Storing the result is the
        caller's responsibility, either manually or via the upstream visit
        method of this class.

        Args:
            scope (Scope): The current scope used as a key for cache lookup.
            arguments_delta (PreprocEntityDelta): The delta containing 'before' and 'after' values to resolve.
            resolver (Callable[[Any], Any]): Function to apply on uncached 'before' or 'after' argument values.

        Returns:
            PreprocEntityDelta: A new delta with resolved 'before' and 'after' values.
        """

        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.

        def _resolve_side(cache: dict, argument: Any, pick_before: bool) -> Any:
            value = cache.get(scope, Nothing)
            if is_nothing(value) and not is_nothing(argument):
                value = resolver(argument)
                if isinstance(value, PreprocEntityDelta):
                    value = value.before if pick_before else value.after
            return value

        # Read both arguments up front, before any resolver call.
        before_argument = arguments_delta.before
        after_argument = arguments_delta.after

        resolved_before = _resolve_side(self._before_cache, before_argument, True)
        resolved_after = _resolve_side(self._after_cache, after_argument, False)
        return PreprocEntityDelta(before=resolved_before, after=resolved_after)
449

450
    def visit_terminal_value_modified(
        self, terminal_value_modified: TerminalValueModified
    ) -> PreprocEntityDelta:
        """Return a delta from the node's ``value`` to its ``modified_value``."""
        old_value = terminal_value_modified.value
        new_value = terminal_value_modified.modified_value
        return PreprocEntityDelta(before=old_value, after=new_value)
457

458
    def visit_terminal_value_created(
        self, terminal_value_created: TerminalValueCreated
    ) -> PreprocEntityDelta:
        """Return a delta with only the 'after' side set to the node's value."""
        created_value = terminal_value_created.value
        return PreprocEntityDelta(after=created_value)
462

463
    def visit_terminal_value_removed(
        self, terminal_value_removed: TerminalValueRemoved
    ) -> PreprocEntityDelta:
        """Return a delta with only the 'before' side set to the node's value."""
        removed_value = terminal_value_removed.value
        return PreprocEntityDelta(before=removed_value)
467

468
    def visit_terminal_value_unchanged(
        self, terminal_value_unchanged: TerminalValueUnchanged
    ) -> PreprocEntityDelta:
        """Return a delta whose 'before' and 'after' sides are both the node's value."""
        unchanged_value = terminal_value_unchanged.value
        return PreprocEntityDelta(before=unchanged_value, after=unchanged_value)
475

476
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
        """Combine the 'before' of ``value`` with the 'after' of ``divergence``."""
        value_delta = self.visit(node_divergence.value)
        divergence_delta = self.visit(node_divergence.divergence)
        return PreprocEntityDelta(
            before=value_delta.before,
            after=divergence_delta.after,
        )
480

481
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
        """Build 'before' and 'after' dictionaries from the object's bindings.

        A created object has no 'before' dictionary and a removed object has no
        'after' dictionary. Binding values that are Nothing or ``None`` are left
        out of the dictionaries.
        """
        change_type = node_object.change_type
        before = Nothing if change_type == ChangeType.CREATED else {}
        after = Nothing if change_type == ChangeType.REMOVED else {}

        # Whether each side collects entries does not change during the loop.
        collect_before = not is_nothing(before)
        collect_after = not is_nothing(after)

        for name, entity in node_object.bindings.items():
            delta: PreprocEntityDelta = self.visit(change_set_entity=entity)
            value_before, value_after = delta.before, delta.after
            if collect_before and not is_nothing(value_before) and value_before is not None:
                before[name] = value_before
            if collect_after and not is_nothing(value_after) and value_after is not None:
                after[name] = value_after

        return PreprocEntityDelta(before=before, after=after)
494

495
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> str:
        """Resolve a ``Resource.Attribute`` reference to its value.

        Args:
            arguments: Either a dotted string ("LogicalId.Attribute") or a list
                whose first item is the logical id and whose remaining items
                form the attribute name.
            select_before: Pick the 'before' side when True, the 'after' side
                otherwise.

        Returns:
            The value of the matching template property when the resource
            declares one, otherwise the deployed property value.

        Raises:
            ValidationError: if fewer than two segments are given, or the
                resource is not defined in the template.
        """
        # TODO: add further arguments validation.
        arguments_list: list[str]
        if isinstance(arguments, str):
            arguments_list = arguments.split(".")
        else:
            arguments_list = arguments

        if len(arguments_list) < 2:
            raise ValidationError(
                "Template error: every Fn::GetAtt object requires two non-empty parameters, "
                "the resource name and the resource attribute"
            )

        logical_name_of_resource = arguments_list[0]
        # Keep every segment after the logical id so that dotted attributes
        # (e.g. "Outputs.OutputName") reach the structured-property lookup
        # intact instead of being cut down to their first segment.
        attribute_name = ".".join(arguments_list[1:])

        node_resource = self._get_node_resource_for(
            resource_name=logical_name_of_resource,
            node_template=self._change_set.update_model.node_template,
        )
        node_property: NodeProperty | None = self._get_node_property_for(
            property_name=attribute_name, node_resource=node_resource
        )
        if node_property is not None:
            # The property is statically defined in the template and its value can be computed.
            property_delta = self.visit(node_property)
            return property_delta.before if select_before else property_delta.after

        # The property is not statically defined and must therefore be available in
        # the properties deployed set.
        if select_before:
            return self._before_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        return self._after_deployed_property_value_of(
            resource_logical_id=logical_name_of_resource,
            property_name=attribute_name,
        )
530

531
    def visit_node_intrinsic_function_fn_get_att(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate Fn::GetAtt for both sides, preferring cached values.

        A side is resolved only when it is not cached for this function's scope
        and its arguments are not Nothing.
        """
        # TODO: validate the return value according to the spec.
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        before_arguments: Maybe[str | list[str]] = arguments_delta.before
        after_arguments: Maybe[str | list[str]] = arguments_delta.after
        scope = node_intrinsic_function.scope

        before = self._before_cache.get(scope, Nothing)
        must_resolve_before = is_nothing(before) and not is_nothing(before_arguments)
        if must_resolve_before:
            before = self._resolve_attribute(arguments=before_arguments, select_before=True)

        after = self._after_cache.get(scope, Nothing)
        must_resolve_after = is_nothing(after) and not is_nothing(after_arguments)
        if must_resolve_after:
            after = self._resolve_attribute(arguments=after_arguments, select_before=False)

        return PreprocEntityDelta(before=before, after=after)
548

549
    def visit_node_intrinsic_function_fn_equals(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate Fn::Equals by comparing the first two resolved arguments."""
        # TODO: add argument shape validation.
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=lambda operands: operands[0] == operands[1],
        )
563

564
    def visit_node_intrinsic_function_fn_if(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate Fn::If for both sides, visiting only the selected branch per side.

        The first argument names a condition. For each side whose condition
        name is not Nothing, the condition is resolved and either the second
        argument (condition truthy) or the third argument (condition falsy) is
        visited.

        Raises:
            ValueError: if the function does not have exactly three arguments.
            RuntimeError: if the named condition is not defined in the template.
        """
        # `if` needs to be short-circuiting i.e. if the condition is True we don't evaluate the
        # False branch. If the condition is False, we don't evaluate the True branch.
        arguments = node_intrinsic_function.arguments.array
        if len(arguments) != 3:
            raise ValueError(
                f"Incorrectly constructed Fn::If usage, expected 3 arguments, found {len(arguments)}"
            )
        true_branch = arguments[1]
        false_branch = arguments[2]

        condition_delta = self.visit(arguments[0])
        if_delta = PreprocEntityDelta()

        if not is_nothing(condition_delta.before):
            # _resolve_condition raises a descriptive error for an unknown
            # condition instead of handing a missing node to visit().
            condition_value = self._resolve_condition(logical_id=condition_delta.before).before
            selected = true_branch if condition_value else false_branch
            if_delta.before = self.visit(selected).before

        if not is_nothing(condition_delta.after):
            condition_value = self._resolve_condition(logical_id=condition_delta.after).after
            selected = true_branch if condition_value else false_branch
            if_delta.after = self.visit(selected).after

        return if_delta
599

600
    def visit_node_intrinsic_function_fn_and(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate Fn::And: true when every resolved argument is truthy."""
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        # The builtin all() is the resolver: it receives the argument list directly.
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=all,
        )
614

615
    def visit_node_intrinsic_function_fn_or(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate Fn::Or: true when at least one resolved argument is truthy."""
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        # The builtin any() is the resolver: it receives the argument list directly.
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=any,
        )
629

630
    def visit_node_intrinsic_function_fn_not(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate Fn::Not: the logical negation of the resolved argument."""

        def _negate(arg: list[bool] | bool) -> bool:
            # Is the argument ever a lone boolean?
            # A list argument is negated through its first element.
            operand = arg[0] if isinstance(arg, list) else arg
            return not operand

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_negate,
        )
647

648
    def visit_node_intrinsic_function_fn_sub(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Sub`` for the before and after states of the node.

        A value cached for the node's scope is reused when present; otherwise the
        visited arguments are substituted, with ``select_before`` choosing which
        side of the referenced deltas is read.

        :raises RuntimeError: if the arguments are neither a string nor a
            ``[string, map]`` pair, or if a ``${...}`` variable cannot be resolved.
        """

        def _resolve_variable(name: str, sub_parameters: dict, select_before: bool) -> str:
            # Resolve one ``${name}`` placeholder to the string that replaces it.
            value = Nothing

            # Try to resolve the variable name as pseudo parameter.
            if name in _PSEUDO_PARAMETERS:
                value = self._resolve_pseudo_parameter(pseudo_parameter_name=name)

            # Try to resolve the variable name as an entry to the defined parameters.
            elif name in sub_parameters:
                value = sub_parameters[name]

            # Try to resolve the variable name as GetAtt.
            elif "." in name:
                try:
                    value = self._resolve_attribute(arguments=name, select_before=select_before)
                except RuntimeError:
                    # Left as Nothing; reported below as an undefined variable.
                    pass

            # Try to resolve the variable name as Ref.
            else:
                try:
                    resource_delta = self._resolve_reference(logical_id=name)
                    value = resource_delta.before if select_before else resource_delta.after
                    if isinstance(value, PreprocResource):
                        value = value.physical_resource_id
                except RuntimeError:
                    # Left as Nothing; reported below as an undefined variable.
                    pass

            if is_nothing(value):
                raise RuntimeError(f"Undefined variable name in Fn::Sub string template '{name}'")

            return value if isinstance(value, str) else str(value)

        def _compute_sub(args: str | list[Any], select_before: bool) -> str:
            # TODO: add further schema validation.
            string_template: str
            sub_parameters: dict
            if isinstance(args, str):
                string_template = args
                sub_parameters = {}
            elif (
                isinstance(args, list)
                and len(args) == 2
                and isinstance(args[0], str)
                and isinstance(args[1], dict)
            ):
                string_template = args[0]
                sub_parameters = args[1]
            else:
                raise RuntimeError(
                    "Invalid arguments shape for Fn::Sub, expected a String "
                    f"or a Tuple of String and Map but got '{args}'"
                )

            # Substitute in a single left-to-right pass over the original template so
            # that text coming from an already substituted value is never re-scanned
            # for further ``${...}`` placeholders.
            sub_string = re.sub(
                r"\${([^}]+)}",
                lambda match: _resolve_variable(match.group(1), sub_parameters, select_before),
                string_template,
            )

            # FIXME: the following type reduction is ported from v1; however it appears as though such
            #        reduction is not performed by the engine, and certainly not at this depth given the
            #        lack of context. This section should be removed with Fn::Sub always returning a string
            #        and the resource providers reviewed.
            account_id = self._change_set.account_id
            is_another_account_id = sub_string.isdigit() and len(sub_string) == len(account_id)
            if sub_string == account_id or is_another_account_id:
                result = sub_string
            elif sub_string.isdigit():
                result = int(sub_string)
            else:
                try:
                    result = float(sub_string)
                except ValueError:
                    result = sub_string
            return result

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after
        # Only compute a side when it is not cached and its arguments exist.
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(before) and not is_nothing(arguments_before):
            before = _compute_sub(args=arguments_before, select_before=True)
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(after) and not is_nothing(arguments_after):
            after = _compute_sub(args=arguments_after, select_before=False)
        return PreprocEntityDelta(before=before, after=after)
1✔
746

747
    def visit_node_intrinsic_function_fn_join(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Join`` by joining the stringified values with the delimiter."""

        # TODO: add support for schema validation.
        # TODO: add tests for joining non string values.
        def _compute_fn_join(args: list[Any]) -> str | NothingType:
            # Anything other than a [delimiter, values] pair yields no value.
            if not isinstance(args, list) or len(args) != 2:
                return Nothing
            separator = str(args[0])
            items = args[1]
            if isinstance(items, list):
                # None entries are dropped; everything else is stringified.
                return separator.join(str(item) for item in items if item is not None)
            # shortcut if values is the empty string, for example:
            # {"Fn::Join": ["", {"Ref": <parameter>}]}
            # CDK bootstrap does this
            if items == "":
                return ""
            raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_compute_fn_join,
        )
1✔
780

781
    def visit_node_intrinsic_function_fn_select(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate ``Fn::Select`` by picking the element at the given index.

        :raises RuntimeError: if the values are not a non-empty list, or the index
            falls outside ``0 .. len(values) - 1``.
        """

        # TODO: add further support for schema validation
        def _compute_fn_select(args: list[Any]) -> Any:
            values: list[Any] = args[1]
            if not isinstance(values, list) or not values:
                raise RuntimeError(f"Invalid arguments list value for Fn::Select: '{values}'")
            index: int = int(args[0])
            # The last valid index is len(values) - 1, so index == len(values) is rejected.
            if index < 0 or index >= len(values):
                raise RuntimeError(f"Invalid or out of range index value for Fn::Select: '{index}'")
            return values[index]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_select,
        )
        return delta
1✔
803

804
    def visit_node_intrinsic_function_fn_split(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate ``Fn::Split`` by splitting the source string on the delimiter."""

        # TODO: add further support for schema validation
        def _compute_fn_split(args: list[Any]) -> Any:
            separator = args[0]
            # The delimiter must be a non-empty string.
            if not (isinstance(separator, str) and separator):
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{separator}'")
            text = args[1]
            if isinstance(text, str):
                return text.split(separator)
            raise RuntimeError(f"Invalid source string value for Fn::Split: '{text}'")

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_compute_fn_split,
        )
1✔
825

826
    def visit_node_intrinsic_function_fn_get_a_zs(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::GetAZs`` by listing the zone names reported by EC2.

        :raises RuntimeError: if the region argument is not a string, or the EC2
            ``describe_availability_zones`` call fails with a ``ClientError``.
        """
        # TODO: add further support for schema validation

        def _compute_fn_get_a_zs(region) -> Any:
            if not isinstance(region, str):
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")

            # An empty region string falls back to the change set's own region.
            if not region:
                region = self._change_set.region_name

            account_id = self._change_set.account_id
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
            try:
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
                    ec2_client.describe_availability_zones()
                )
            except ClientError as error:
                # Chain explicitly so the client failure is recorded as the cause.
                raise RuntimeError(
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
                ) from error
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
                "AvailabilityZones"
            ]
            azs = [az["ZoneName"] for az in availability_zones]
            return azs

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_get_a_zs,
        )
        return delta
1✔
861

862
    def visit_node_intrinsic_function_fn_base64(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Base64`` by base64-encoding the string argument."""

        # TODO: add further support for schema validation
        def _compute_fn_base_64(string) -> Any:
            if isinstance(string, str):
                # Ported from v1:
                encoded = base64.b64encode(to_bytes(string))
                return to_str(encoded)
            raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{string}'")

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_compute_fn_base_64,
        )
1✔
880

881
    def visit_node_intrinsic_function_fn_find_in_map(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::FindInMap`` by resolving the mapping lookup per side.

        The before arguments are resolved for the before value and the after
        arguments for the after value; a side with falsy arguments stays Nothing.
        """
        # TODO: add type checking/validation for result unit?
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        lookup_before = arguments_delta.before
        lookup_after = arguments_delta.after

        before = Nothing
        after = Nothing
        if lookup_before:
            before = self._resolve_mapping(*lookup_before).before
        if lookup_after:
            after = self._resolve_mapping(*lookup_after).after
        return PreprocEntityDelta(before=before, after=after)
1✔
897

898
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the mapping's bindings."""
        return self.visit(node_mapping.bindings)
1✔
901

902
    def visit_node_parameters(
        self, node_parameters: NodeParameters
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
        """Collect the before and after values of every parameter, keyed by name.

        A parameter whose value on one side is Nothing is left out of that side.
        """
        before_by_name: dict[str, Any] = {}
        after_by_name: dict[str, Any] = {}
        for parameter in node_parameters.parameters:
            parameter_delta = self.visit(parameter)
            sides = (
                (parameter_delta.before, before_by_name),
                (parameter_delta.after, after_by_name),
            )
            for resolved_value, target in sides:
                if not is_nothing(resolved_value):
                    target[parameter.name] = resolved_value
        return PreprocEntityDelta(before=before_by_name, after=after_by_name)
×
916

917
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
        """Resolve a parameter's before and after values.

        The dynamic value is used when truthy, otherwise the default value. The
        after value of a ``List<String>`` or ``CommaDelimitedList`` parameter is
        split on commas into a list of stripped items.
        """

        def _resolve_parameter_type(value: str, type_: str) -> Any:
            # List-typed parameters arrive as one comma separated string.
            if type_ == "List<String>" or type_ == "CommaDelimitedList":
                return [item.strip() for item in value.split(",")]
            return value

        dynamic_delta = self.visit(node_parameter.dynamic_value)
        default_delta = self.visit(node_parameter.default_value)

        # `or` also falls back to the default when the dynamic value is falsy.
        before = dynamic_delta.before or default_delta.before
        after = dynamic_delta.after or default_delta.after

        parameter_type = self.visit(node_parameter.type_)

        # NOTE(review): only the after value is converted by type; confirm that the
        # before value is meant to stay in its raw form.
        if not is_nothing(after):
            after = _resolve_parameter_type(after, parameter_type.after)

        return PreprocEntityDelta(before=before, after=after)
1✔
939

940
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the ``depends_on`` entity."""
        return self.visit(node_depends_on.depends_on)
1✔
943

944
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition's body."""
        return self.visit(node_condition.body)
1✔
947

948
    def _resource_physical_resource_id_from(
1✔
949
        self, logical_resource_id: str, resolved_resources: dict
950
    ) -> str:
951
        # TODO: typing around resolved resources is needed and should be reflected here.
952
        resolved_resource = resolved_resources.get(logical_resource_id, {})
1✔
953
        physical_resource_id: str | None = resolved_resource.get("PhysicalResourceId")
1✔
954
        if not isinstance(physical_resource_id, str):
1✔
955
            raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
1✔
956
        return physical_resource_id
1✔
957

958
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
        """Return the resource's physical id from the before resolved resources."""
        # TODO: typing around resolved resources is needed and should be reflected here.
        before_resources = self._before_resolved_resources
        return self._resource_physical_resource_id_from(
            logical_resource_id=resource_logical_id,
            resolved_resources=before_resources,
        )
964

965
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
        """Return the resource's physical id for the after state.

        This delegates to the before lookup.
        """
        # NOTE(review): presumably the after id is only known from the before state
        # at this stage; confirm.
        physical_id = self._before_resource_physical_id(resource_logical_id=resource_logical_id)
        return physical_id
1✔
967

968
    def visit_node_intrinsic_function_ref(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Ref`` for the referenced logical id.

        ``AWS::NoValue`` resolves to Nothing. A side that resolves to a
        ``PreprocResource`` is replaced by that resource's physical id.
        """

        def _compute_fn_ref(logical_id: str) -> PreprocEntityDelta:
            if logical_id == "AWS::NoValue":
                return Nothing

            reference_delta: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
            # The resolved delta is updated in place, one side at a time.
            resolved_before = reference_delta.before
            if isinstance(resolved_before, PreprocResource):
                reference_delta.before = resolved_before.physical_resource_id
            resolved_after = reference_delta.after
            if isinstance(resolved_after, PreprocResource):
                reference_delta.after = resolved_after.physical_resource_id
            return reference_delta

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_compute_fn_ref,
        )
1✔
989

990
    def visit_node_intrinsic_function_condition(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a ``Condition`` reference by visiting the named condition.

        :raises RuntimeError: if no condition with the given name exists.
        """
        arguments_delta = self.visit(node_intrinsic_function.arguments)

        def _delta_of_condition(name: str) -> PreprocEntityDelta:
            node_condition = self._get_node_condition_if_exists(condition_name=name)
            if not is_nothing(node_condition):
                return self.visit(node_condition)
            raise RuntimeError(f"Undefined condition '{name}'")

        return self._cached_apply(
            resolver=_delta_of_condition,
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
        )
1✔
1008

1009
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
        """Visit every array element and gather the before and after lists.

        A created array has no before list and a removed array has no after list;
        the missing side is Nothing. Element values that are Nothing are skipped.
        """
        change_type = node_array.change_type
        before = Nothing if change_type == ChangeType.CREATED else []
        after = Nothing if change_type == ChangeType.REMOVED else []
        for element in node_array.array:
            element_delta: PreprocEntityDelta = self.visit(change_set_entity=element)
            element_before = element_delta.before
            element_after = element_delta.after
            if not (is_nothing(before) or is_nothing(element_before)):
                before.append(element_before)
            if not (is_nothing(after) or is_nothing(element_after)):
                after.append(element_after)
        return PreprocEntityDelta(before=before, after=after)
1✔
1022

1023
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
        """Visit the property value and look up any dynamic reference on each side.

        When a side's value contains a dynamic reference, that side is replaced by
        the result of the lookup for the change set's account and region.
        """
        # TODO: what about other positions?
        value = self.visit(node_property.value)
        for side in ("before", "after"):
            current = getattr(value, side)
            if is_nothing(current):
                continue
            dynamic_ref = extract_dynamic_reference(current)
            if dynamic_ref:
                resolved = perform_dynamic_reference_lookup(
                    reference=dynamic_ref,
                    account_id=self._change_set.account_id,
                    region_name=self._change_set.region_name,
                )
                setattr(value, side, resolved)
        return value
1✔
1041

1042
    def visit_node_properties(
        self, node_properties: NodeProperties
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
        """Visit each property and build the before and after property sets.

        Created properties have no before set and removed properties no after
        set; the missing side is Nothing. Values that are Nothing or ``None`` are
        left out of the bindings.
        """
        change_type = node_properties.change_type
        before_bindings = Nothing if change_type == ChangeType.CREATED else {}
        after_bindings = Nothing if change_type == ChangeType.REMOVED else {}

        for node_property in node_properties.properties:
            name = node_property.name
            property_delta = self.visit(node_property)
            value_before = property_delta.before
            value_after = property_delta.after
            if not (
                is_nothing(before_bindings) or is_nothing(value_before) or value_before is None
            ):
                before_bindings[name] = value_before
            if not (is_nothing(after_bindings) or is_nothing(value_after) or value_after is None):
                after_bindings[name] = value_after

        before = (
            Nothing
            if is_nothing(before_bindings)
            else PreprocProperties(properties=before_bindings)
        )
        after = (
            Nothing if is_nothing(after_bindings) else PreprocProperties(properties=after_bindings)
        )
        return PreprocEntityDelta(before=before, after=after)
1✔
1072

1073
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
        """Resolve a condition reference into the condition's before and after values.

        A side is resolved only when the visited reference is a string on that
        side; otherwise that side is Nothing.
        """
        reference_delta = self.visit(reference)
        before = Nothing
        after = Nothing

        name_before = reference_delta.before
        if isinstance(name_before, str):
            before = self._resolve_condition(logical_id=name_before).before

        name_after = reference_delta.after
        if isinstance(name_after, str):
            after = self._resolve_condition(logical_id=name_after).after

        return PreprocEntityDelta(before=before, after=after)
1✔
1086

1087
    def visit_node_resource(
        self, node_resource: NodeResource
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
        """Build the before and after ``PreprocResource`` views of a resource.

        A side is built when the resource exists on that side and has no
        condition, or when its condition value on that side is truthy.

        :raises ValidationError: if the resource name does not match
            ``VALID_LOGICAL_RESOURCE_ID_RE``.
        """
        logical_resource_id = node_resource.name
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(logical_resource_id):
            raise ValidationError(
                f"Template format error: Resource name {logical_resource_id} is non alphanumeric."
            )
        change_type = node_resource.change_type

        condition_before = Nothing
        condition_after = Nothing
        condition_reference = node_resource.condition_reference
        if not is_nothing(condition_reference):
            condition_delta = self._resolve_resource_condition_reference(condition_reference)
            condition_before = condition_delta.before
            condition_after = condition_delta.after

        depends_on_before = Nothing
        depends_on_after = Nothing
        if not is_nothing(node_resource.depends_on):
            depends_on_delta = self.visit(node_resource.depends_on)
            depends_on_before = depends_on_delta.before
            depends_on_after = depends_on_delta.after

        type_delta = self.visit(node_resource.type_)
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
            node_resource.properties
        )

        before = Nothing
        # `and` binds tighter than `or`; the parentheses only make that explicit.
        if (change_type != ChangeType.CREATED and is_nothing(condition_before)) or condition_before:
            # The before side needs a known physical id; a missing one raises here.
            before = PreprocResource(
                logical_id=logical_resource_id,
                physical_resource_id=self._before_resource_physical_id(
                    resource_logical_id=logical_resource_id
                ),
                condition=condition_before,
                resource_type=type_delta.before,
                properties=properties_delta.before,
                depends_on=depends_on_before,
                requires_replacement=False,
            )

        after = Nothing
        if (change_type != ChangeType.REMOVED and is_nothing(condition_after)) or condition_after:
            # On the after side a missing physical id is tolerated and stored as None.
            try:
                after_physical_resource_id = self._after_resource_physical_id(
                    resource_logical_id=logical_resource_id
                )
            except RuntimeError:
                after_physical_resource_id = None
            after = PreprocResource(
                logical_id=logical_resource_id,
                physical_resource_id=after_physical_resource_id,
                condition=condition_after,
                resource_type=type_delta.after,
                properties=properties_delta.after,
                depends_on=depends_on_after,
                requires_replacement=node_resource.requires_replacement,
            )
        return PreprocEntityDelta(before=before, after=after)
1✔
1150

1151
    def visit_node_output(
        self, node_output: NodeOutput
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
        """Build the before and after ``PreprocOutput`` views of an output.

        When the output has a condition, a condition that turns truthy marks the
        output as created and one that turns falsy marks it as removed. A created
        output has no before view and a removed output has no after view.
        """

        def _side_or_none(delta: Any, side: str) -> Any:
            # A falsy delta (for example Nothing) contributes None for either side.
            return getattr(delta, side) if delta else None

        change_type = node_output.change_type
        value_delta = self.visit(node_output.value)

        condition_delta = Nothing
        if not is_nothing(node_output.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_output.condition_reference
            )
            held_before = condition_delta.before
            holds_after = condition_delta.after
            if not held_before and holds_after:
                change_type = ChangeType.CREATED
            elif held_before and not holds_after:
                change_type = ChangeType.REMOVED

        export_delta = Nothing
        if not is_nothing(node_output.export):
            export_delta = self.visit(node_output.export)

        before: Maybe[PreprocOutput] = Nothing
        if change_type != ChangeType.CREATED:
            before = PreprocOutput(
                name=node_output.name,
                value=value_delta.before,
                export=_side_or_none(export_delta, "before"),
                condition=_side_or_none(condition_delta, "before"),
            )

        after: Maybe[PreprocOutput] = Nothing
        if change_type != ChangeType.REMOVED:
            after = PreprocOutput(
                name=node_output.name,
                value=value_delta.after,
                export=_side_or_none(export_delta, "after"),
                condition=_side_or_none(condition_delta, "after"),
            )
        return PreprocEntityDelta(before=before, after=after)
1✔
1190

1191
    def visit_node_outputs(
        self, node_outputs: NodeOutputs
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
        """Visit every output and gather the before and after output lists.

        An output whose view on one side is Nothing is left out of that side.
        """
        outputs_before: list[PreprocOutput] = []
        outputs_after: list[PreprocOutput] = []
        for node_output in node_outputs.outputs:
            output_delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
            sides = (
                (output_delta.before, outputs_before),
                (output_delta.after, outputs_after),
            )
            for output_view, target in sides:
                if not is_nothing(output_view):
                    target.append(output_view)
        return PreprocEntityDelta(before=outputs_before, after=outputs_after)
1✔
1205

1206
    def visit_node_intrinsic_function_fn_import_value(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::ImportValue`` by looking the name up in the exports map.

        The exports are read for the change set's account and region. A missing
        export, or one with a falsy ``Value``, resolves to Nothing.
        """

        def _compute_fn_import_value(string) -> str:
            if not isinstance(string, str):
                raise RuntimeError(f"Invalid parameter for import: '{string}'")

            change_set = self._change_set
            exports = exports_map(
                account_id=change_set.account_id, region_name=change_set.region_name
            )
            export_entry = exports.get(string, {})
            return export_entry.get("Value") or Nothing

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_compute_fn_import_value,
        )
1✔
1226

1227
    def visit_node_intrinsic_function_fn_transform(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Always raise: this visitor does not evaluate ``Fn::Transform``.

        :raises RuntimeError: unconditionally.
        """
        message = "Fn::Transform should have been handled by the Transformer"
        raise RuntimeError(message)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc