• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 18083953951

26 Sep 2025 07:54PM UTC coverage: 86.89% (+0.01%) from 86.88%
18083953951

push

github

web-flow
fix sqs dev endpoint to show invisible fifo messages correctly (#13196)

5 of 5 new or added lines in 1 file covered. (100.0%)

59 existing lines in 3 files now uncovered.

67757 of 77980 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.32
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from typing import Any, Final, Generic, TypeVar
1✔
8

9
from botocore.exceptions import ClientError
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api.cloudformation import ResourceStatus
1✔
13
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
14
from localstack.aws.connect import connect_to
1✔
15
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
16
    ChangeSetEntity,
17
    ChangeType,
18
    Maybe,
19
    NodeArray,
20
    NodeCondition,
21
    NodeDependsOn,
22
    NodeDivergence,
23
    NodeIntrinsicFunction,
24
    NodeMapping,
25
    NodeObject,
26
    NodeOutput,
27
    NodeOutputs,
28
    NodeParameter,
29
    NodeParameters,
30
    NodeProperties,
31
    NodeProperty,
32
    NodeResource,
33
    NodeTemplate,
34
    Nothing,
35
    NothingType,
36
    Scope,
37
    TerminalValue,
38
    TerminalValueCreated,
39
    TerminalValueModified,
40
    TerminalValueRemoved,
41
    TerminalValueUnchanged,
42
    is_nothing,
43
)
44
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
45
    ChangeSetModelVisitor,
46
)
47
from localstack.services.cloudformation.engine.v2.resolving import (
1✔
48
    REGEX_DYNAMIC_REF,
49
    extract_dynamic_reference,
50
    perform_dynamic_reference_lookup,
51
)
52
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
53
from localstack.services.cloudformation.stores import (
1✔
54
    exports_map,
55
)
56
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
57
from localstack.services.cloudformation.v2.types import ResolvedResource
1✔
58
from localstack.utils.aws.arns import get_partition
1✔
59
from localstack.utils.objects import get_value_from_path
1✔
60
from localstack.utils.run import to_str
1✔
61
from localstack.utils.strings import to_bytes
1✔
62
from localstack.utils.urls import localstack_host
1✔
63

64
TBefore = TypeVar("TBefore")
TAfter = TypeVar("TAfter")
_T = TypeVar("_T")

# Host suffix returned for AWS::URLSuffix. The value in AWS is "amazonaws.com".
_AWS_URL_SUFFIX = localstack_host().host

# Reference names that are treated as pseudo parameters instead of being looked up
# among the template's parameters and resources.
_PSEUDO_PARAMETERS: Final[set[str]] = {
    "AWS::Partition",
    "AWS::AccountId",
    "AWS::Region",
    "AWS::StackName",
    "AWS::StackId",
    "AWS::URLSuffix",
    "AWS::NoValue",
    "AWS::NotificationARNs",
}

# Placeholder returned for missing deployed property values when
# config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES is enabled.
MOCKED_REFERENCE = "unknown"

# Matches execute-api URLs. Groups: (1) scheme and API prefix, (2) host suffix, (3) path.
REGEX_OUTPUT_APIGATEWAY = re.compile(
    rf"^(https?://.+\.execute-api\.)(?:[^-]+-){{2,3}}\d\.(amazonaws\.com|{_AWS_URL_SUFFIX})/?(.*)$"
)

VALID_LOGICAL_RESOURCE_ID_RE = re.compile(r"^[A-Za-z0-9]+$")
1✔
87

88

89
class PreprocEntityDelta(Generic[TBefore, TAfter]):
    """Holds the 'before' and 'after' values computed for a change set entity.

    Each side defaults to ``Nothing`` when it is not supplied.
    """

    before: Maybe[TBefore]
    after: Maybe[TAfter]

    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
        self.before, self.after = before, after

    def __eq__(self, other):
        # Only another delta with the same before/after pair compares equal.
        if isinstance(other, PreprocEntityDelta):
            return self.before == other.before and self.after == other.after
        return False
×
101

102

103
class PreprocProperties:
    """Wrapper around a resource's property dictionary."""

    properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        self.properties = properties

    def __eq__(self, other):
        # Equality is defined by the wrapped dictionaries; other types never match.
        if isinstance(other, PreprocProperties):
            return self.properties == other.properties
        return False
1✔
113

114

115
class PreprocResource:
    """Preprocessed view of a template resource.

    Equality considers only the logical id, the condition, the resource type and the
    properties; the remaining fields are ignored by ``__eq__``.
    """

    logical_id: str
    physical_resource_id: str | None
    condition: bool | None
    resource_type: str
    properties: PreprocProperties
    depends_on: list[str] | None
    requires_replacement: bool
    status: ResourceStatus | None

    def __init__(
        self,
        logical_id: str,
        # Annotated as optional to agree with the attribute declaration above.
        physical_resource_id: str | None,
        condition: bool | None,
        resource_type: str,
        properties: PreprocProperties,
        depends_on: list[str] | None,
        requires_replacement: bool,
        status: ResourceStatus | None = None,
    ):
        self.logical_id = logical_id
        self.physical_resource_id = physical_resource_id
        self.condition = condition
        self.resource_type = resource_type
        self.properties = properties
        self.depends_on = depends_on
        self.requires_replacement = requires_replacement
        self.status = status

    @staticmethod
    def _compare_conditions(c1: bool | None, c2: bool | None) -> bool:
        """Compare two condition values, treating any non-bool value as ``True``."""
        # The lack of condition equates to a true condition.
        c1 = c1 if isinstance(c1, bool) else True
        c2 = c2 if isinstance(c2, bool) else True
        return c1 == c2

    def __eq__(self, other):
        if not isinstance(other, PreprocResource):
            return False
        return all(
            (
                self.logical_id == other.logical_id,
                self._compare_conditions(self.condition, other.condition),
                self.resource_type == other.resource_type,
                self.properties == other.properties,
            )
        )
163

164

165
class PreprocOutput:
    """Preprocessed view of a template output: name, value, export and condition."""

    name: str
    value: Any
    export: Any | None
    condition: bool | None

    def __init__(self, name: str, value: Any, export: Any | None, condition: bool | None):
        self.name, self.value = name, value
        self.export, self.condition = export, condition

    def __eq__(self, other):
        # All four fields are compared directly; other types never match.
        if not isinstance(other, PreprocOutput):
            return False
        field_matches = [
            self.name == other.name,
            self.value == other.value,
            self.export == other.export,
            self.condition == other.condition,
        ]
        return all(field_matches)
188

189

190
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
191
    _change_set: Final[ChangeSet]
1✔
192
    _before_resolved_resources: Final[dict]
1✔
193
    _before_cache: Final[dict[Scope, Any]]
1✔
194
    _after_cache: Final[dict[Scope, Any]]
1✔
195

196
    def __init__(self, change_set: ChangeSet):
        """Bind the preprocessor to ``change_set`` and start with empty value caches."""
        self._change_set = change_set
        # Resolved resources of the change set's stack, used for 'before' lookups.
        self._before_resolved_resources = change_set.stack.resolved_resources
        self._before_cache, self._after_cache = {}, {}
1✔
201

202
    def _setup_runtime_cache(self) -> None:
        """Reset both local caches and seed them from the update model's runtime caches.

        Entries are looked up under this class's name; a missing or empty entry leaves
        the corresponding local cache empty.
        """
        cache_key = self.__class__.__name__

        self._before_cache.clear()
        self._after_cache.clear()

        stored_before = self._change_set.update_model.before_runtime_cache.get(cache_key)
        if stored_before:
            self._before_cache.update(stored_before)

        stored_after = self._change_set.update_model.after_runtime_cache.get(cache_key)
        if stored_after:
            self._after_cache.update(stored_after)
×
215

216
    def _save_runtime_cache(self) -> None:
        """Store deep copies of both local caches in the update model, keyed by class name."""
        cache_key = self.__class__.__name__

        before_store = self._change_set.update_model.before_runtime_cache
        before_store[cache_key] = copy.deepcopy(self._before_cache)

        after_store = self._change_set.update_model.after_runtime_cache
        after_store[cache_key] = copy.deepcopy(self._after_cache)
1✔
224

225
    def process(self) -> None:
        """Run the preprocessor over the change set's template.

        Loads the runtime caches, visits the template root node, then saves the caches.
        """
        self._setup_runtime_cache()
        self.visit(self._change_set.update_model.node_template)
        self._save_runtime_cache()
1✔
230

231
    def _get_node_resource_for(
        self, resource_name: str, node_template: NodeTemplate
    ) -> NodeResource:
        """Return the template resource named ``resource_name``, visiting it first.

        Raises:
            ValidationError: if the template declares no resource with that name.
        """
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        found = next(
            (
                candidate
                for candidate in node_template.resources.resources
                if candidate.name == resource_name
            ),
            None,
        )
        if found is None:
            raise ValidationError(
                f"Template format error: Unresolved resource dependencies [{resource_name}] in the Resources block of the template"
            )
        self.visit(found)
        return found
242

243
    def _get_node_property_for(
        self, property_name: str, node_resource: NodeResource
    ) -> NodeProperty | None:
        """Return the property named ``property_name`` of the resource, visiting it first.

        Returns ``None`` when the resource declares no such property.
        """
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        found = next(
            (
                candidate
                for candidate in node_resource.properties.properties
                if candidate.name == property_name
            ),
            None,
        )
        if found is not None:
            self.visit(found)
        return found
1✔
252

253
    def _deployed_property_value_of(
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
    ) -> Any:
        """Look up a property value of a resource in ``resolved_resources``.

        A falsy value yields ``MOCKED_REFERENCE`` when
        ``config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES`` is set, and is returned
        unchanged otherwise.

        Raises:
            ValidationError: from the node lookup, if the resource is not in the template.
            RuntimeError: if the resource has no entry in ``resolved_resources``, or the
                value found is truthy but neither a string nor a list.
        """
        # We have to override this function to make sure it does not try to access the
        # resolved resource

        # Before we can obtain deployed value for a resource, we need to first ensure to
        # process the resource if this wasn't processed already. Ideally, values should only
        # be accessible through delta objects, to ensure computation is always complete at
        # every level.
        self._get_node_resource_for(
            resource_name=resource_logical_id,
            node_template=self._change_set.update_model.node_template,
        )
        deployed = resolved_resources.get(resource_logical_id)
        if deployed is None:
            raise RuntimeError(
                f"No deployed instances of resource '{resource_logical_id}' were found"
            )
        # TODO support structured properties, e.g. NestedStack.Outputs.OutputName
        value: Any | None = get_value_from_path(deployed.get("Properties", {}), property_name)

        if not value:
            return MOCKED_REFERENCE if config.CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES else value
        if isinstance(value, (str, list)):
            return value
        # TODO: is this correct? If there is a bug in the logic here, it's probably
        #  better to know about it with a clear error message than to receive some form
        #  of message about trying to use a dictionary in place of a string
        raise RuntimeError(
            f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value nor list"
        )
×
289

290
    def _before_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> Any:
        """Look up a property value in the resources resolved before this change set."""
        before_resources = self._before_resolved_resources
        return self._deployed_property_value_of(
            resource_logical_id=resource_logical_id,
            property_name=property_name,
            resolved_resources=before_resources,
        )
298

299
    def _after_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> str | None:
        """Look up the 'after' value of a deployed property.

        In this class the lookup is delegated to the 'before' lookup.
        """
        lookup = self._before_deployed_property_value_of
        return lookup(resource_logical_id=resource_logical_id, property_name=property_name)
305

306
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
        """Return the template mapping named ``map_name``, visiting it first.

        Raises:
            RuntimeError: if the template declares no mapping with that name.
        """
        mappings: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
        # TODO: another scenarios suggesting property lookups might be preferable.
        found = next((candidate for candidate in mappings if candidate.name == map_name), None)
        if found is None:
            raise RuntimeError(f"Undefined '{map_name}' mapping")
        self.visit(found)
        return found
×
314

315
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
        """Return the template parameter named ``parameter_name``, visiting it first.

        Returns ``Nothing`` when the template declares no such parameter.
        """
        node_template = self._change_set.update_model.node_template
        parameters: list[NodeParameter] = node_template.parameters.parameters
        # TODO: another scenarios suggesting property lookups might be preferable.
        found = next(
            (candidate for candidate in parameters if candidate.name == parameter_name), None
        )
        if found is None:
            return Nothing
        self.visit(found)
        return found
1✔
325

326
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
        """Return the template condition named ``condition_name``, visiting it first.

        Returns ``Nothing`` when the template declares no such condition.
        """
        node_template = self._change_set.update_model.node_template
        conditions: list[NodeCondition] = node_template.conditions.conditions
        # TODO: another scenarios suggesting property lookups might be preferable.
        found = next(
            (candidate for candidate in conditions if candidate.name == condition_name), None
        )
        if found is None:
            return Nothing
        self.visit(found)
        return found
×
336

337
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition named ``logical_id``.

        Raises:
            RuntimeError: if the template declares no such condition.
        """
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError(f"No condition '{logical_id}' was found.")
        return self.visit(node_condition)
×
343

344
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
        """Return the value of a pseudo parameter for the current change set.

        ``AWS::NoValue`` resolves to ``None``.

        Raises:
            RuntimeError: for any name not handled below.
        """
        if pseudo_parameter_name == "AWS::Partition":
            return get_partition(self._change_set.region_name)
        if pseudo_parameter_name == "AWS::AccountId":
            return self._change_set.stack.account_id
        if pseudo_parameter_name == "AWS::Region":
            return self._change_set.stack.region_name
        if pseudo_parameter_name == "AWS::StackName":
            return self._change_set.stack.stack_name
        if pseudo_parameter_name == "AWS::StackId":
            return self._change_set.stack.stack_id
        if pseudo_parameter_name == "AWS::URLSuffix":
            return _AWS_URL_SUFFIX
        if pseudo_parameter_name == "AWS::NoValue":
            return None
        raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
×
362

363
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
        """Resolve a reference to a pseudo parameter, a parameter or a resource.

        The lookup is attempted in that order; the resource lookup raises
        ``ValidationError`` when nothing in the template matches ``logical_id``.
        """
        if logical_id in _PSEUDO_PARAMETERS:
            # Pseudo parameters are constants within the lifecycle of a template.
            constant = self._resolve_pseudo_parameter(pseudo_parameter_name=logical_id)
            return PreprocEntityDelta(before=constant, after=constant)

        parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
        if isinstance(parameter, NodeParameter):
            return self.visit(parameter)

        resource = self._get_node_resource_for(
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
        )
        resource_delta = self.visit(resource)
        return PreprocEntityDelta(before=resource_delta.before, after=resource_delta.after)
1✔
383

384
    def _resolve_mapping(
        self, map_name: str, top_level_key: str, second_level_key
    ) -> PreprocEntityDelta:
        """Resolve ``map_name[top_level_key][second_level_key]`` to a delta.

        Raises:
            ValidationError: if either key does not lead to a value of a supported node type.
        """

        def _unresolved() -> ValidationError:
            error_key = "::".join([map_name, top_level_key, second_level_key])
            return ValidationError(f"Template error: Unable to get mapping for {error_key}")

        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)

        first_level = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(first_level, NodeObject):
            raise _unresolved()

        second_level = first_level.bindings.get(second_level_key)
        if not isinstance(second_level, (TerminalValue, NodeArray, NodeObject)):
            raise _unresolved()

        return self.visit(second_level)
1✔
399

400
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
        """Visit an entity, reusing cached values when both sides are cached for its scope.

        Otherwise the visit is delegated to the parent visitor. A resulting
        ``PreprocEntityDelta`` has replacements applied and is stored in both caches;
        any other result is returned untouched and not cached.
        """
        scope = change_set_entity.scope
        fully_cached = scope in self._before_cache and scope in self._after_cache
        if fully_cached:
            return PreprocEntityDelta(
                before=self._before_cache[scope], after=self._after_cache[scope]
            )

        result = super().visit(change_set_entity=change_set_entity)
        if not isinstance(result, PreprocEntityDelta):
            return result

        result = self._maybe_perform_replacements(result)
        self._before_cache[scope] = result.before
        self._after_cache[scope] = result.after
        return result
1✔
412

413
    def _maybe_perform_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
        """Apply static replacements, then dynamic replacements, to the delta."""
        after_static = self._maybe_perform_static_replacements(delta)
        return self._maybe_perform_dynamic_replacements(after_static)
1✔
417

418
    def _maybe_perform_static_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
        """Run ``_perform_static_replacements`` on the string sides of the delta."""
        replace = self._perform_static_replacements
        return self._maybe_perform_on_delta(delta, replace)
1✔
420

421
    def _maybe_perform_dynamic_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
        """Run ``_perform_dynamic_replacements`` on the string sides of the delta."""
        replace = self._perform_dynamic_replacements
        return self._maybe_perform_on_delta(delta, replace)
1✔
423

424
    def _maybe_perform_on_delta(
1✔
425
        self, delta: PreprocEntityDelta | None, f: Callable[[_T], _T]
426
    ) -> PreprocEntityDelta | None:
427
        if isinstance(delta.before, str):
1✔
428
            delta.before = f(delta.before)
1✔
429
        if isinstance(delta.after, str):
1✔
430
            delta.after = f(delta.after)
1✔
431
        return delta
1✔
432

433
    def _perform_dynamic_replacements(self, value: _T) -> _T:
        """Replace dynamic references in a string with their looked-up value.

        Non-string values, strings without a dynamic reference, and references whose
        lookup yields a falsy result are returned unchanged.
        """
        if not isinstance(value, str):
            return value

        dynamic_ref = extract_dynamic_reference(value)
        if not dynamic_ref:
            return value

        new_value = perform_dynamic_reference_lookup(
            reference=dynamic_ref,
            account_id=self._change_set.account_id,
            region_name=self._change_set.region_name,
        )
        if not new_value:
            return value

        # Use a callable replacement so the resolved value is inserted verbatim. Passed
        # as a replacement string, re.sub would process backslash escapes and group
        # references in it, corrupting the value or raising re.error.
        return REGEX_DYNAMIC_REF.sub(lambda _match: new_value, value)
1✔
447

448
    @staticmethod
    def _perform_static_replacements(value: str) -> str:
        """Rewrite an execute-api URL so that its host carries the LocalStack port.

        Values that do not match ``REGEX_OUTPUT_APIGATEWAY``, or that are listed in
        ``config.CFN_STRING_REPLACEMENT_DENY_LIST``, are returned unchanged.
        """
        api_match = REGEX_OUTPUT_APIGATEWAY.match(value)
        if not api_match or value in config.CFN_STRING_REPLACEMENT_DENY_LIST:
            return value

        prefix, host, path = api_match[1], api_match[2], api_match[3]
        port = localstack_host().port
        return f"{prefix}{host}:{port}/{path}"
1✔
460

461
    def _cached_apply(
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
    ) -> PreprocEntityDelta:
        """
        Resolve the 'before' and 'after' sides of a delta, preferring cached values.

        For each side independently, the value cached under the given scope is used when
        present. Otherwise, if the corresponding argument is not Nothing, the resolver is
        called with that argument. When the resolver returns a PreprocEntityDelta, only
        the matching component is kept: 'before' for the before side, 'after' for the
        after side.

        The caches are only read here, never written. Caching the result is left to the
        caller, either manually or via the upstream visit method of this class.

        Args:
            scope (Scope): The current scope used as a key for cache lookup.
            arguments_delta (PreprocEntityDelta): The delta containing 'before' and 'after' values to resolve.
            resolver (Callable[[Any], Any]): Function to apply on uncached 'before' or 'after' argument values.

        Returns:
            PreprocEntityDelta: A new delta with resolved 'before' and 'after' values.
        """

        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.

        def _resolve_side(cache: dict, argument: Any, pick_before: bool) -> Any:
            cached = cache.get(scope, Nothing)
            if not is_nothing(cached) or is_nothing(argument):
                return cached
            resolved = resolver(argument)
            if isinstance(resolved, PreprocEntityDelta):
                return resolved.before if pick_before else resolved.after
            return resolved

        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after

        before = _resolve_side(self._before_cache, arguments_before, True)
        after = _resolve_side(self._after_cache, arguments_after, False)
        return PreprocEntityDelta(before=before, after=after)
1✔
508

509
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
        """Return the delta of the property's value node."""
        value_node = node_property.value
        return self.visit(value_node)
1✔
511

512
    def visit_terminal_value_modified(
        self, terminal_value_modified: TerminalValueModified
    ) -> PreprocEntityDelta:
        """Return a delta from the node's ``value`` to its ``modified_value``."""
        old_value = terminal_value_modified.value
        new_value = terminal_value_modified.modified_value
        return PreprocEntityDelta(before=old_value, after=new_value)
519

520
    def visit_terminal_value_created(
        self, terminal_value_created: TerminalValueCreated
    ) -> PreprocEntityDelta:
        """Return a delta with only the 'after' side set to the created value."""
        created = terminal_value_created.value
        return PreprocEntityDelta(after=created)
1✔
524

525
    def visit_terminal_value_removed(
        self, terminal_value_removed: TerminalValueRemoved
    ) -> PreprocEntityDelta:
        """Return a delta with only the 'before' side set to the removed value."""
        removed = terminal_value_removed.value
        return PreprocEntityDelta(before=removed)
1✔
529

530
    def visit_terminal_value_unchanged(
        self, terminal_value_unchanged: TerminalValueUnchanged
    ) -> PreprocEntityDelta:
        """Return a delta whose 'before' and 'after' sides both read the node's value."""
        return PreprocEntityDelta(
            after=terminal_value_unchanged.value,
            before=terminal_value_unchanged.value,
        )
537

538
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
        """Combine the 'before' of the node's value with the 'after' of its divergence."""
        before = self.visit(node_divergence.value).before
        after = self.visit(node_divergence.divergence).after
        return PreprocEntityDelta(before=before, after=after)
1✔
542

543
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
        """Build 'before' and 'after' dictionaries from the object's bindings.

        A created object has no 'before' side and a removed object has no 'after' side
        (``Nothing``). Binding values that are ``Nothing`` on a side are left out of
        that side's dictionary.
        """
        change_type = node_object.change_type
        before = {} if change_type != ChangeType.CREATED else Nothing
        after = {} if change_type != ChangeType.REMOVED else Nothing
        collect_before = not is_nothing(before)
        collect_after = not is_nothing(after)

        for name, entity in node_object.bindings.items():
            delta: PreprocEntityDelta = self.visit(change_set_entity=entity)
            entry_before, entry_after = delta.before, delta.after
            if collect_before and not is_nothing(entry_before):
                before[name] = entry_before
            if collect_after and not is_nothing(entry_after):
                after[name] = entry_after
        return PreprocEntityDelta(before=before, after=after)
1✔
556

557
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> str:
        """Resolve the arguments of a GetAtt to the value of the addressed attribute.

        Args:
            arguments: either ``"LogicalId.Attribute"`` or ``[logical_id, attribute, ...]``.
            select_before: whether to return the 'before' or the 'after' value.

        Returns:
            The value from the template when the attribute is a declared property of the
            resource, otherwise the value from the deployed properties lookup.
        """
        # TODO: add arguments validation.
        arguments_list: list[str]
        if isinstance(arguments, str):
            arguments_list = arguments.split(".")
        else:
            arguments_list = arguments
        logical_name_of_resource = arguments_list[0]
        # Keep every component after the logical id: the attribute name may itself be
        # dotted (e.g. "Outputs.Name"), and taking only the second element drops the rest.
        attribute_name = ".".join(arguments_list[1:]) if len(arguments_list) > 2 else arguments_list[1]

        node_resource = self._get_node_resource_for(
            resource_name=logical_name_of_resource,
            node_template=self._change_set.update_model.node_template,
        )
        node_property: NodeProperty | None = self._get_node_property_for(
            property_name=attribute_name, node_resource=node_resource
        )
        if node_property is not None:
            # The property is statically defined in the template and its value can be computed.
            property_delta = self.visit(node_property)
            return property_delta.before if select_before else property_delta.after

        # The property is not statically defined and must therefore be available in
        # the properties deployed set.
        if select_before:
            return self._before_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        return self._after_deployed_property_value_of(
            resource_logical_id=logical_name_of_resource,
            property_name=attribute_name,
        )
1✔
592

593
    def visit_node_intrinsic_function_fn_get_att(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve the function's arguments to attribute values for both sides.

        A side is taken from the cache when present; otherwise it is resolved from its
        arguments, unless those are ``Nothing``.
        """
        # TODO: validate the return value according to the spec.
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        before_arguments: Maybe[str | list[str]] = arguments_delta.before
        after_arguments: Maybe[str | list[str]] = arguments_delta.after

        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
        needs_before = is_nothing(before) and not is_nothing(before_arguments)
        if needs_before:
            before = self._resolve_attribute(arguments=before_arguments, select_before=True)

        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
        needs_after = is_nothing(after) and not is_nothing(after_arguments)
        if needs_after:
            after = self._resolve_attribute(arguments=after_arguments, select_before=False)

        return PreprocEntityDelta(before=before, after=after)
1✔
610

611
    def visit_node_intrinsic_function_fn_equals(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Compare the first two resolved arguments for equality on each side."""

        # TODO: add argument shape validation.
        def _compute_fn_equals(args: list[Any]) -> bool:
            left, right = args[0], args[1]
            return left == right

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_compute_fn_equals,
        )
1✔
625

626
    def visit_node_intrinsic_function_fn_if(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate the function for both sides, visiting only the selected branch.

        The first argument names a condition; its value picks the second argument when
        truthy and the third otherwise. Sides whose condition name is ``Nothing`` stay
        ``Nothing`` in the result.

        Raises:
            ValueError: if the function does not have exactly three arguments.
            RuntimeError: if the named condition is not declared in the template.
        """
        # `if` needs to be short-circuiting i.e. if the condition is True we don't evaluate the
        # False branch. If the condition is False, we don't evaluate the True branch.
        if len(node_intrinsic_function.arguments.array) != 3:
            raise ValueError(
                f"Incorrectly constructed Fn::If usage, expected 3 arguments, found {len(node_intrinsic_function.arguments.array)}"
            )
        condition_node, true_node, false_node = node_intrinsic_function.arguments.array

        condition_delta = self.visit(condition_node)
        if_delta = PreprocEntityDelta()

        if not is_nothing(condition_delta.before):
            # _resolve_condition raises a clear error for unknown condition names, instead
            # of passing Nothing on to visit.
            holds = self._resolve_condition(logical_id=condition_delta.before).before
            if_delta.before = self.visit(true_node if holds else false_node).before

        if not is_nothing(condition_delta.after):
            holds = self._resolve_condition(logical_id=condition_delta.after).after
            if_delta.after = self.visit(true_node if holds else false_node).after

        return if_delta
1✔
661

662
    def visit_node_intrinsic_function_fn_and(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve each side to whether all of its arguments are truthy."""

        def _compute_fn_and(args: list[bool]) -> bool:
            return all(args)

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=self.visit(node_intrinsic_function.arguments),
            resolver=_compute_fn_and,
        )
1✔
676

677
    def visit_node_intrinsic_function_fn_or(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::Or intrinsic function.

        The function's arguments are visited first; the disjunction resolver is
        then handed to ``self._cached_apply`` together with the function scope.
        """

        def _compute_fn_or(args: list[bool]):
            # True as soon as any operand is truthy; False for an empty list.
            return any(args)

        operands_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=operands_delta,
            resolver=_compute_fn_or,
        )
691

692
    def visit_node_intrinsic_function_fn_not(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::Not intrinsic function.

        The operand is negated; when the visited arguments are a list, only its
        first element is considered.
        """

        def _compute_fn_not(arg: list[bool] | bool) -> bool:
            # Is the argument ever a lone boolean?
            operand = arg[0] if isinstance(arg, list) else arg
            return not operand

        operand_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=operand_delta,
            resolver=_compute_fn_not,
        )
709

710
    def visit_node_intrinsic_function_fn_sub(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::Sub intrinsic function for the before and after states.

        Each side is taken from the matching cache when an entry exists for the
        function's scope; otherwise it is computed from the visited arguments,
        provided that side of the arguments is not Nothing.

        :raises RuntimeError: if the arguments are neither a string nor a
            ``[string, map]`` pair, or if a template variable cannot be resolved.
        """

        def _compute_sub(args: str | list[Any], select_before: bool) -> str | int | float:
            # TODO: add further schema validation.
            string_template: str
            sub_parameters: dict
            if isinstance(args, str):
                string_template = args
                sub_parameters = {}
            elif (
                isinstance(args, list)
                and len(args) == 2
                and isinstance(args[0], str)
                and isinstance(args[1], dict)
            ):
                string_template = args[0]
                sub_parameters = args[1]
            else:
                raise RuntimeError(
                    "Invalid arguments shape for Fn::Sub, expected a String "
                    f"or a Tuple of String and Map but got '{args}'"
                )

            def _substitute(match: re.Match) -> str:
                """Return the replacement text for a single ``${...}`` placeholder."""
                template_variable_name = match.group(1)

                # `${!Name}` is the documented escape for a literal `${Name}`: drop the
                # exclamation mark and do not attempt to resolve anything.
                if template_variable_name.startswith("!"):
                    return f"${{{template_variable_name[1:]}}}"

                template_variable_value = Nothing

                # Try to resolve the variable name as pseudo parameter.
                if template_variable_name in _PSEUDO_PARAMETERS:
                    template_variable_value = self._resolve_pseudo_parameter(
                        pseudo_parameter_name=template_variable_name
                    )

                # Try to resolve the variable name as an entry to the defined parameters.
                elif template_variable_name in sub_parameters:
                    template_variable_value = sub_parameters[template_variable_name]

                # Try to resolve the variable name as GetAtt.
                elif "." in template_variable_name:
                    try:
                        template_variable_value = self._resolve_attribute(
                            arguments=template_variable_name, select_before=select_before
                        )
                    except RuntimeError:
                        # The value stays Nothing and is reported as undefined below.
                        pass

                # Try to resolve the variable name as Ref.
                else:
                    try:
                        resource_delta = self._resolve_reference(logical_id=template_variable_name)
                        template_variable_value = (
                            resource_delta.before if select_before else resource_delta.after
                        )
                        if isinstance(template_variable_value, PreprocResource):
                            template_variable_value = template_variable_value.physical_resource_id
                    except RuntimeError:
                        # The value stays Nothing and is reported as undefined below.
                        pass

                if is_nothing(template_variable_value):
                    raise RuntimeError(
                        f"Undefined variable name in Fn::Sub string template '{template_variable_name}'"
                    )

                if not isinstance(template_variable_value, str):
                    template_variable_value = str(template_variable_value)
                return template_variable_value

            # Substitute in a single pass over the original template so that text coming
            # from an already substituted value is never scanned for placeholders again.
            sub_string = re.sub(r"\$\{([^}]+)\}", _substitute, string_template)

            # FIXME: the following type reduction is ported from v1; however it appears as though such
            #        reduction is not performed by the engine, and certainly not at this depth given the
            #        lack of context. This section should be removed with Fn::Sub always returning a string
            #        and the resource providers reviewed.
            account_id = self._change_set.account_id
            is_another_account_id = sub_string.isdigit() and len(sub_string) == len(account_id)
            if sub_string == account_id or is_another_account_id:
                result = sub_string
            elif sub_string.isdigit():
                result = int(sub_string)
            else:
                try:
                    result = float(sub_string)
                except ValueError:
                    result = sub_string
            return result

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(before) and not is_nothing(arguments_before):
            before = _compute_sub(args=arguments_before, select_before=True)
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(after) and not is_nothing(arguments_after):
            after = _compute_sub(args=arguments_after, select_before=False)
        return PreprocEntityDelta(before=before, after=after)
808

809
    def visit_node_intrinsic_function_fn_join(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::Join intrinsic function.

        The resolver expects a ``[delimiter, values]`` pair, stringifies every
        value that is not ``None`` and joins them with the stringified delimiter.
        """

        # TODO: add support for schema validation.
        # TODO: add tests for joining non string values.
        def _compute_fn_join(args: list[Any]) -> str | NothingType:
            if not isinstance(args, list) or len(args) != 2:
                return Nothing
            separator: str = str(args[0])
            items: list[Any] = args[1]
            if isinstance(items, list):
                # None entries are dropped rather than rendered as "None".
                return separator.join(str(item) for item in items if item is not None)
            # shortcut if values is the empty string, for example:
            # {"Fn::Join": ["", {"Ref": <parameter>}]}
            # CDK bootstrap does this
            if items == "":
                return ""
            raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")

        join_arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=join_arguments_delta,
            resolver=_compute_fn_join,
        )
842

843
    def visit_node_intrinsic_function_fn_select(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::Select intrinsic function.

        The resolver expects an ``[index, values]`` pair and returns
        ``values[index]``.

        :raises RuntimeError: if the values list holds non-string elements,
            which is treated as "not resolved yet".
        :raises ValidationError: if the values are not a non-empty list, or if
            the index is not convertible to an integer within the list bounds.
        """
        invalid_arguments_message = (
            "Template error: Fn::Select requires a list argument with two elements: "
            "an integer index and a list"
        )

        # TODO: add further support for schema validation
        def _compute_fn_select(args: list[Any]) -> Any:
            values = args[1]
            # defer evaluation if the selection list contains unresolved elements (e.g., unresolved intrinsics)
            if isinstance(values, list) and not all(isinstance(value, str) for value in values):
                raise RuntimeError("Fn::Select list contains unresolved elements")

            if not isinstance(values, list) or not values:
                raise ValidationError(invalid_arguments_message)
            try:
                index: int = int(args[0])
            except (TypeError, ValueError) as e:
                # int() raises TypeError (not ValueError) for None, lists and dicts;
                # both mean that the index is not a usable integer.
                raise ValidationError(invalid_arguments_message) from e

            if not 0 <= index < len(values):
                raise ValidationError(invalid_arguments_message)
            return values[index]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_select,
        )
        return delta
879

880
    def visit_node_intrinsic_function_fn_split(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate an Fn::Split intrinsic function.

        The resolver expects a ``[delimiter, source_string]`` pair and returns
        the list produced by splitting the source string on the delimiter.

        :raises RuntimeError: if the delimiter is not a non-empty string or the
            source is not a string.
        """

        # TODO: add further support for schema validation
        def _compute_fn_split(args: list[Any]) -> Any:
            separator = args[0]
            if not (isinstance(separator, str) and separator):
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{separator}'")
            text = args[1]
            if isinstance(text, str):
                return text.split(separator)
            raise RuntimeError(f"Invalid source string value for Fn::Split: '{text}'")

        split_arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=split_arguments_delta,
            resolver=_compute_fn_split,
        )
901

902
    def visit_node_intrinsic_function_fn_get_a_zs(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::GetAZs intrinsic function.

        The resolver queries EC2 ``describe_availability_zones`` in the given
        region and returns the list of zone names.

        :raises RuntimeError: if the region is not a string, or if the EC2 call
            fails with a ``ClientError`` (kept as the exception cause).
        """
        # TODO: add further support for schema validation

        def _compute_fn_get_a_zs(region) -> Any:
            if not isinstance(region, str):
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")

            # An empty region string falls back to the region of the change set.
            if not region:
                region = self._change_set.region_name

            account_id = self._change_set.account_id
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
            try:
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
                    ec2_client.describe_availability_zones()
                )
            except ClientError as error:
                # Chain explicitly so the underlying service error stays visible.
                raise RuntimeError(
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
                ) from error
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
                "AvailabilityZones"
            ]
            return [az["ZoneName"] for az in availability_zones]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_get_a_zs,
        )
        return delta
937

938
    def visit_node_intrinsic_function_fn_base64(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::Base64 intrinsic function.

        :raises RuntimeError: if the value to encode is not a string.
        """

        # TODO: add further support for schema validation
        def _compute_fn_base_64(string) -> Any:
            if isinstance(string, str):
                # Ported from v1:
                encoded = base64.b64encode(to_bytes(string))
                return to_str(encoded)
            raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{string}'")

        value_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=value_delta,
            resolver=_compute_fn_base_64,
        )
956

957
    def visit_node_intrinsic_function_fn_find_in_map(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::FindInMap intrinsic function.

        For each side whose visited arguments are truthy, the arguments are
        unpacked into ``self._resolve_mapping`` and the matching side of the
        returned delta is kept; otherwise that side is Nothing.
        """
        # TODO: add type checking/validation for result unit?
        lookup_delta = self.visit(node_intrinsic_function.arguments)
        lookup_before, lookup_after = lookup_delta.before, lookup_delta.after
        before = self._resolve_mapping(*lookup_before).before if lookup_before else Nothing
        after = self._resolve_mapping(*lookup_after).after if lookup_after else Nothing
        return PreprocEntityDelta(before=before, after=after)
973

974
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the mapping's bindings."""
        return self.visit(node_mapping.bindings)
977

978
    def visit_node_parameters(
        self, node_parameters: NodeParameters
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
        """Collect the before and after values of every parameter by name.

        A parameter only appears in a side's dictionary when its value on that
        side is not Nothing.
        """
        values_before: dict[str, Any] = {}
        values_after: dict[str, Any] = {}
        for node_parameter in node_parameters.parameters:
            delta = self.visit(node_parameter)
            for value, target in (
                (delta.before, values_before),
                (delta.after, values_after),
            ):
                if not is_nothing(value):
                    target[node_parameter.name] = value
        return PreprocEntityDelta(before=values_before, after=values_after)
992

993
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
        """Resolve the before and after values of a single template parameter.

        The dynamic value is preferred on each side and the default value is the
        fallback. The after value is additionally split into a list of stripped
        items for the ``List<String>`` and ``CommaDelimitedList`` types.

        :raises ValidationError: if the parameter name does not match
            ``VALID_LOGICAL_RESOURCE_ID_RE``.
        """
        parameter_name = node_parameter.name
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(parameter_name):
            raise ValidationError(
                f"Template format error: Parameter name {parameter_name} is non alphanumeric."
            )
        dynamic_delta = self.visit(node_parameter.dynamic_value)
        default_delta = self.visit(node_parameter.default_value)

        # NOTE(review): `or` also falls back to the default for falsy dynamic values
        # such as "" or 0 - confirm that this is intended.
        before = dynamic_delta.before or default_delta.before
        after = dynamic_delta.after or default_delta.after

        parameter_type = self.visit(node_parameter.type_)

        def _resolve_parameter_type(value: str, type_: str) -> Any:
            if type_ in ("List<String>", "CommaDelimitedList"):
                return [item.strip() for item in value.split(",")]
            return value

        # NOTE(review): only the after value is converted; the before value keeps its
        # raw form - confirm against the consumers of this delta.
        if not is_nothing(after):
            after = _resolve_parameter_type(after, parameter_type.after)

        return PreprocEntityDelta(before=before, after=after)
1019

1020
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the DependsOn identifiers."""
        return self.visit(node_depends_on.depends_on)
1023

1024
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition's body."""
        return self.visit(node_condition.body)
1027

1028
    def _resource_physical_resource_id_from(
        self, logical_resource_id: str, resolved_resources: dict[str, ResolvedResource]
    ) -> str | None:
        """Look up the physical id of a resource in a resolved-resources mapping.

        :return: the physical resource id, or ``None`` when the resource is
            unknown or its status is neither CREATE_COMPLETE nor UPDATE_COMPLETE.
        :raises RuntimeError: if the resource has one of those statuses but no
            string ``PhysicalResourceId``.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        entry = resolved_resources.get(logical_resource_id, {})
        completed_statuses = {
            ResourceStatus.CREATE_COMPLETE,
            ResourceStatus.UPDATE_COMPLETE,
        }
        if entry.get("ResourceStatus") not in completed_statuses:
            return None

        physical_id = entry.get("PhysicalResourceId")
        if isinstance(physical_id, str):
            return physical_id
        raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
1043

1044
    def _before_resource_physical_id(self, resource_logical_id: str) -> str | None:
        """Return the physical id of a resource as recorded in the before state.

        The lookup is delegated to ``_resource_physical_resource_id_from`` with
        ``self._before_resolved_resources``, so the result is ``None`` whenever
        that helper returns ``None``.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        return self._resource_physical_resource_id_from(
            logical_resource_id=resource_logical_id,
            resolved_resources=self._before_resolved_resources,
        )
1050

1051
    def _after_resource_physical_id(self, resource_logical_id: str) -> str | None:
        """Return the physical id to use for a resource in the after state.

        This simply delegates to ``_before_resource_physical_id``, so the
        result may be ``None``.
        """
        # NOTE(review): the after lookup reads the *before* state here; presumably
        # subclasses override this once after-state ids exist - confirm.
        return self._before_resource_physical_id(resource_logical_id=resource_logical_id)
1053

1054
    def visit_node_intrinsic_function_ref(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a Ref intrinsic function.

        ``AWS::NoValue`` resolves to Nothing. Any other logical id is resolved
        through ``self._resolve_reference``; sides that resolve to a
        ``PreprocResource`` are replaced by that resource's physical id.
        """

        def _compute_fn_ref(logical_id: str) -> PreprocEntityDelta:
            if logical_id == "AWS::NoValue":
                return Nothing

            resolved: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
            # The resolved delta is updated in place, one side at a time.
            resolved_before = resolved.before
            if isinstance(resolved_before, PreprocResource):
                resolved.before = resolved_before.physical_resource_id
            resolved_after = resolved.after
            if isinstance(resolved_after, PreprocResource):
                resolved.after = resolved_after.physical_resource_id
            return resolved

        logical_id_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=logical_id_delta,
            resolver=_compute_fn_ref,
        )
1075

1076
    def visit_node_intrinsic_function_condition(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a Condition intrinsic function by visiting the named condition.

        :raises RuntimeError: if no condition with the given name exists.
        """
        condition_name_delta = self.visit(node_intrinsic_function.arguments)

        def _delta_of_condition(name: str) -> PreprocEntityDelta:
            node_condition = self._get_node_condition_if_exists(condition_name=name)
            if not is_nothing(node_condition):
                return self.visit(node_condition)
            raise RuntimeError(f"Undefined condition '{name}'")

        return self._cached_apply(
            resolver=_delta_of_condition,
            scope=node_intrinsic_function.scope,
            arguments_delta=condition_name_delta,
        )
1094

1095
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
        """Visit every element of an array node and gather both sides.

        The before side is Nothing for a CREATED array and the after side is
        Nothing for a REMOVED one; otherwise each side is a list holding the
        element values that are not Nothing.
        """
        change_type = node_array.change_type
        before = [] if change_type != ChangeType.CREATED else Nothing
        after = [] if change_type != ChangeType.REMOVED else Nothing
        collect_before = not is_nothing(before)
        collect_after = not is_nothing(after)
        for element in node_array.array:
            element_delta: PreprocEntityDelta = self.visit(change_set_entity=element)
            element_before, element_after = element_delta.before, element_delta.after
            if collect_before and not is_nothing(element_before):
                before.append(element_before)
            if collect_after and not is_nothing(element_after):
                after.append(element_after)
        return PreprocEntityDelta(before=before, after=after)
1108

1109
    def visit_node_properties(
        self, node_properties: NodeProperties
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
        """Visit every property and build the before and after property sets.

        The before side is Nothing for CREATED properties and the after side is
        Nothing for REMOVED ones. Property values that are Nothing or ``None``
        are left out of the bindings.
        """
        change_type = node_properties.change_type
        before_bindings = {} if change_type != ChangeType.CREATED else Nothing
        after_bindings = {} if change_type != ChangeType.REMOVED else Nothing
        keep_before = not is_nothing(before_bindings)
        keep_after = not is_nothing(after_bindings)

        def _is_bindable(value: Any) -> bool:
            return not is_nothing(value) and value is not None

        for node_property in node_properties.properties:
            name = node_property.name
            property_delta = self.visit(node_property)
            value_before, value_after = property_delta.before, property_delta.after
            if keep_before and _is_bindable(value_before):
                before_bindings[name] = value_before
            if keep_after and _is_bindable(value_after):
                after_bindings[name] = value_after

        before = PreprocProperties(properties=before_bindings) if keep_before else Nothing
        after = PreprocProperties(properties=after_bindings) if keep_after else Nothing
        return PreprocEntityDelta(before=before, after=after)
1139

1140
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
        """Resolve a condition reference into the condition's before/after values.

        Each side of the visited reference that is a string is treated as a
        condition logical id and resolved with ``self._resolve_condition``;
        sides that are not strings stay Nothing.
        """
        name_delta = self.visit(reference)

        before = Nothing
        name_before = name_delta.before
        if isinstance(name_before, str):
            before = self._resolve_condition(logical_id=name_before).before

        after = Nothing
        name_after = name_delta.after
        if isinstance(name_after, str):
            after = self._resolve_condition(logical_id=name_after).after

        return PreprocEntityDelta(before=before, after=after)
1153

1154
    def visit_node_resource(
        self, node_resource: NodeResource
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
        """Build the before and after ``PreprocResource`` of a resource node.

        A side is produced when its condition value is truthy, or when the
        resource has no condition value on that side and was not CREATED (for
        before) / REMOVED (for after). Otherwise the side is Nothing.

        :raises ValidationError: if the resource name does not match
            ``VALID_LOGICAL_RESOURCE_ID_RE``.
        """
        logical_resource_id = node_resource.name
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(logical_resource_id):
            raise ValidationError(
                f"Template format error: Resource name {logical_resource_id} is non alphanumeric."
            )
        change_type = node_resource.change_type

        condition_before = condition_after = Nothing
        condition_reference = node_resource.condition_reference
        if not is_nothing(condition_reference):
            condition_delta = self._resolve_resource_condition_reference(condition_reference)
            condition_before, condition_after = condition_delta.before, condition_delta.after

        depends_on_before = depends_on_after = Nothing
        depends_on = node_resource.depends_on
        if not is_nothing(depends_on):
            depends_on_delta = self.visit(depends_on)
            depends_on_before, depends_on_after = depends_on_delta.before, depends_on_delta.after

        type_delta = self.visit(node_resource.type_)
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
            node_resource.properties
        )

        # `and` binds tighter than `or`; the parentheses only spell that out.
        has_before = (
            change_type != ChangeType.CREATED and is_nothing(condition_before)
        ) or condition_before
        before = Nothing
        if has_before:
            before = PreprocResource(
                logical_id=logical_resource_id,
                physical_resource_id=self._before_resource_physical_id(
                    resource_logical_id=logical_resource_id
                ),
                condition=condition_before,
                resource_type=type_delta.before,
                properties=properties_delta.before,
                depends_on=depends_on_before,
                requires_replacement=False,
            )

        has_after = (
            change_type != ChangeType.REMOVED and is_nothing(condition_after)
        ) or condition_after
        after = Nothing
        if has_after:
            try:
                after_physical_resource_id = self._after_resource_physical_id(
                    resource_logical_id=logical_resource_id
                )
            except RuntimeError:
                # A failed lookup is not fatal for the after state: fall back to no id.
                after_physical_resource_id = None
            after = PreprocResource(
                logical_id=logical_resource_id,
                physical_resource_id=after_physical_resource_id,
                condition=condition_after,
                resource_type=type_delta.after,
                properties=properties_delta.after,
                depends_on=depends_on_after,
                requires_replacement=node_resource.requires_replacement,
            )
        return PreprocEntityDelta(before=before, after=after)
1217

1218
    def visit_node_output(
        self, node_output: NodeOutput
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
        """Build the before and after ``PreprocOutput`` of an output node.

        When the output has a condition, a condition that turns truthy makes
        the output count as CREATED and one that turns falsy makes it count as
        REMOVED, overriding the node's own change type.
        """
        change_type = node_output.change_type
        value_delta = self.visit(node_output.value)

        condition_delta = Nothing
        if not is_nothing(node_output.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_output.condition_reference
            )
            was_enabled = condition_delta.before
            is_enabled = condition_delta.after
            if not was_enabled and is_enabled:
                change_type = ChangeType.CREATED
            elif was_enabled and not is_enabled:
                change_type = ChangeType.REMOVED

        export_delta = (
            Nothing if is_nothing(node_output.export) else self.visit(node_output.export)
        )

        before: Maybe[PreprocOutput] = Nothing
        after: Maybe[PreprocOutput] = Nothing
        if change_type != ChangeType.CREATED:
            before = PreprocOutput(
                name=node_output.name,
                value=value_delta.before,
                export=export_delta.before if export_delta else None,
                condition=condition_delta.before if condition_delta else None,
            )
        if change_type != ChangeType.REMOVED:
            after = PreprocOutput(
                name=node_output.name,
                value=value_delta.after,
                export=export_delta.after if export_delta else None,
                condition=condition_delta.after if condition_delta else None,
            )
        return PreprocEntityDelta(before=before, after=after)
1257

1258
    def visit_node_outputs(
        self, node_outputs: NodeOutputs
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
        """Visit every output and gather the before and after outputs as lists.

        Sides that are Nothing for a given output are left out of that list.
        """
        outputs_before: list[PreprocOutput] = []
        outputs_after: list[PreprocOutput] = []
        for node_output in node_outputs.outputs:
            delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
            for output, target in (
                (delta.before, outputs_before),
                (delta.after, outputs_after),
            ):
                if not is_nothing(output):
                    target.append(output)
        return PreprocEntityDelta(before=outputs_before, after=outputs_after)
1272

1273
    def visit_node_intrinsic_function_fn_import_value(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an Fn::ImportValue intrinsic function.

        The export name is looked up in the exports of the change set's account
        and region; a missing or falsy ``Value`` resolves to Nothing.

        :raises RuntimeError: if the export name is not a string.
        """

        def _compute_fn_import_value(string) -> str:
            if not isinstance(string, str):
                raise RuntimeError(f"Invalid parameter for import: '{string}'")

            change_set = self._change_set
            exports = exports_map(
                account_id=change_set.account_id, region_name=change_set.region_name
            )
            export_entry = exports.get(string, {})
            return export_entry.get("Value") or Nothing

        export_name_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=export_name_delta,
            resolver=_compute_fn_import_value,
        )
1293

1294
    def visit_node_intrinsic_function_fn_transform(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Reject Fn::Transform nodes, which must not reach this visitor.

        :raises RuntimeError: always.
        """
        message = "Fn::Transform should have been handled by the Transformer"
        raise RuntimeError(message)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc