• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21697093787

04 Feb 2026 09:56PM UTC coverage: 86.962% (-0.004%) from 86.966%
21697093787

push

github

web-flow
improve system information sent in session and container_info (#13680)

10 of 17 new or added lines in 2 files covered. (58.82%)

222 existing lines in 17 files now uncovered.

70560 of 81139 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.63
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import re
1✔
6
from collections.abc import Callable
1✔
7
from typing import Any, Final
1✔
8

9
from botocore.exceptions import ClientError
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api.cloudformation import ResourceStatus
1✔
13
from localstack.aws.api.ec2 import AvailabilityZoneList, DescribeAvailabilityZonesResult
1✔
14
from localstack.aws.connect import connect_to
1✔
15
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
16
    ChangeSetEntity,
17
    ChangeType,
18
    Maybe,
19
    NodeArray,
20
    NodeCondition,
21
    NodeDependsOn,
22
    NodeDivergence,
23
    NodeIntrinsicFunction,
24
    NodeMapping,
25
    NodeObject,
26
    NodeOutput,
27
    NodeOutputs,
28
    NodeParameter,
29
    NodeParameters,
30
    NodeProperties,
31
    NodeProperty,
32
    NodeResource,
33
    NodeResources,
34
    NodeTemplate,
35
    Nothing,
36
    NothingType,
37
    Scope,
38
    TerminalValue,
39
    TerminalValueCreated,
40
    TerminalValueModified,
41
    TerminalValueRemoved,
42
    TerminalValueUnchanged,
43
    is_nothing,
44
)
45
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
46
    ChangeSetModelVisitor,
47
)
48
from localstack.services.cloudformation.engine.v2.resolving import (
1✔
49
    REGEX_DYNAMIC_REF,
50
    extract_dynamic_reference,
51
    perform_dynamic_reference_lookup,
52
)
53
from localstack.services.cloudformation.engine.v2.unsupported_resource import (
1✔
54
    should_ignore_unsupported_resource_type,
55
)
56
from localstack.services.cloudformation.engine.validations import ValidationError
1✔
57
from localstack.services.cloudformation.stores import (
1✔
58
    exports_map,
59
)
60
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
61
from localstack.services.cloudformation.v2.types import ResolvedResource
1✔
62
from localstack.utils.aws.arns import get_partition
1✔
63
from localstack.utils.numbers import to_number
1✔
64
from localstack.utils.objects import get_value_from_path
1✔
65
from localstack.utils.run import to_str
1✔
66
from localstack.utils.strings import to_bytes
1✔
67
from localstack.utils.urls import localstack_host
1✔
68

69
# Host name LocalStack is served on; the equivalent value in AWS is "amazonaws.com".
_AWS_URL_SUFFIX = localstack_host().host

# Names that `_resolve_reference` treats as pseudo parameters rather than as
# template parameters or resources.
_PSEUDO_PARAMETERS: Final[set[str]] = {
    "AWS::Partition",
    "AWS::AccountId",
    "AWS::Region",
    "AWS::StackName",
    "AWS::StackId",
    "AWS::URLSuffix",
    "AWS::NoValue",
    "AWS::NotificationARNs",
}

# Matches API Gateway execute-api URLs. Capture groups:
#   1. everything up to and including "execute-api."
#   2. the host suffix ("amazonaws.com" or the configured LocalStack host)
#   3. the path following the optional "/"
# The configured host is escaped so that its dots only match literal dots.
REGEX_OUTPUT_APIGATEWAY = re.compile(
    rf"^(https?://.+\.execute-api\.)(?:[^-]+-){{2,3}}\d\.(amazonaws\.com|{re.escape(_AWS_URL_SUFFIX)})/?(.*)$"
)
# Placeholder returned by `_deployed_property_value_of` for ignored unsupported resource types.
MOCKED_REFERENCE = "unknown"

# Accepts identifiers made exclusively of ASCII letters and digits.
VALID_LOGICAL_RESOURCE_ID_RE = re.compile(r"^[A-Za-z0-9]+$")
1✔
89

90

91
class PreprocEntityDelta[TBefore, TAfter]:
1✔
92
    before: Maybe[TBefore]
1✔
93
    after: Maybe[TAfter]
1✔
94

95
    def __init__(self, before: Maybe[TBefore] = Nothing, after: Maybe[TAfter] = Nothing):
1✔
96
        self.before = before
1✔
97
        self.after = after
1✔
98

99
    def __eq__(self, other):
1✔
UNCOV
100
        if not isinstance(other, PreprocEntityDelta):
×
UNCOV
101
            return False
×
UNCOV
102
        return self.before == other.before and self.after == other.after
×
103

104

105
class PreprocProperties:
    """Wrapper around the preprocessed properties mapping of a resource."""

    # Mapping of property name to its preprocessed value.
    properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        self.properties = properties

    def __eq__(self, other):
        # Equality is defined by the wrapped mapping; foreign types never match.
        if isinstance(other, PreprocProperties):
            return self.properties == other.properties
        return False
1✔
115

116

117
class PreprocResource:
    """Preprocessed view of a single template resource.

    Equality only considers the logical id, the condition, the resource type and
    the properties; the remaining attributes are ignored when comparing.
    """

    logical_id: str
    physical_resource_id: str | None
    condition: bool | None
    resource_type: str
    properties: PreprocProperties
    depends_on: list[str] | None
    requires_replacement: bool
    status: ResourceStatus | None

    def __init__(
        self,
        logical_id: str,
        physical_resource_id: str | None,
        condition: bool | None,
        resource_type: str,
        properties: PreprocProperties,
        depends_on: list[str] | None,
        requires_replacement: bool,
        status: ResourceStatus | None = None,
    ):
        self.logical_id = logical_id
        self.physical_resource_id = physical_resource_id
        self.condition = condition
        self.resource_type = resource_type
        self.properties = properties
        self.depends_on = depends_on
        self.requires_replacement = requires_replacement
        self.status = status

    @staticmethod
    def _compare_conditions(c1: bool | None, c2: bool | None) -> bool:
        """Compare two condition values, treating any non-bool value as ``True``."""
        # The lack of condition equates to a true condition.
        c1 = c1 if isinstance(c1, bool) else True
        c2 = c2 if isinstance(c2, bool) else True
        return c1 == c2

    def __eq__(self, other):
        if not isinstance(other, PreprocResource):
            return False
        return all(
            [
                self.logical_id == other.logical_id,
                self._compare_conditions(self.condition, other.condition),
                self.resource_type == other.resource_type,
                self.properties == other.properties,
            ]
        )
165

166

167
class PreprocOutput:
    """Preprocessed view of a single template output."""

    name: str
    value: Any
    export: Any | None
    condition: bool | None

    def __init__(self, name: str, value: Any, export: Any | None, condition: bool | None):
        self.name, self.value = name, value
        self.export, self.condition = export, condition

    def __eq__(self, other):
        # Outputs are equal when all four attributes match; foreign types never match.
        if isinstance(other, PreprocOutput):
            comparisons = [
                self.name == other.name,
                self.value == other.value,
                self.export == other.export,
                self.condition == other.condition,
            ]
            return all(comparisons)
        return False
190

191

192
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
193
    _change_set: Final[ChangeSet]
1✔
194
    _before_resolved_resources: Final[dict]
1✔
195
    _before_cache: Final[dict[Scope, Any]]
1✔
196
    _after_cache: Final[dict[Scope, Any]]
1✔
197

198
    def __init__(self, change_set: ChangeSet):
        """Bind the preprocessor to a change set and start with empty value caches."""
        self._change_set = change_set
        # Resources resolved by the stack, used as the source of 'before' values.
        stack = change_set.stack
        self._before_resolved_resources = stack.resolved_resources
        # Per-scope caches of the values computed while visiting.
        self._before_cache = {}
        self._after_cache = {}
1✔
203

204
    def _setup_runtime_cache(self) -> None:
        """Reset the local caches and seed them from the update model's runtime caches."""
        cache_key = self.__class__.__name__

        self._before_cache.clear()
        self._after_cache.clear()

        # Entries are stored under this class' name; empty or missing entries are ignored.
        stored_before = self._change_set.update_model.before_runtime_cache.get(cache_key)
        if stored_before:
            self._before_cache.update(stored_before)

        stored_after = self._change_set.update_model.after_runtime_cache.get(cache_key)
        if stored_after:
            self._after_cache.update(stored_after)
×
217

218
    def _save_runtime_cache(self) -> None:
        """Store deep copies of the local caches in the update model's runtime caches."""
        cache_key = self.__class__.__name__

        # Deep copies keep the stored snapshots independent of the local caches.
        before_snapshot = copy.deepcopy(self._before_cache)
        self._change_set.update_model.before_runtime_cache[cache_key] = before_snapshot

        after_snapshot = copy.deepcopy(self._after_cache)
        self._change_set.update_model.after_runtime_cache[cache_key] = after_snapshot
1✔
226

227
    def process(self) -> None:
        """Visit the template of the change set, conditions first, and persist the caches."""
        self._setup_runtime_cache()
        template = self._change_set.update_model.node_template
        conditions = self._change_set.update_model.node_template.conditions
        # The conditions node is visited ahead of the full template.
        for node in (conditions, template):
            self.visit(node)
        self._save_runtime_cache()
1✔
234

235
    def _get_node_resource_for(
        self, resource_name: str, node_template: NodeTemplate
    ) -> NodeResource:
        """Return the resource node named ``resource_name``, visiting it first.

        Raises:
            ValidationError: if the template declares no resource with that name.
        """
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        for candidate in node_template.resources.resources:
            if candidate.name != resource_name:
                continue
            self.visit(candidate)
            return candidate
        raise ValidationError(
            f"Template format error: Unresolved resource dependencies [{resource_name}] in the Resources block of the template"
        )
246

247
    def _get_node_property_for(
        self, property_name: str, node_resource: NodeResource
    ) -> NodeProperty | None:
        """Return the property node named ``property_name`` after visiting it, or ``None``."""
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
        for candidate in node_resource.properties.properties:
            if candidate.name != property_name:
                continue
            self.visit(candidate)
            return candidate
        return None
1✔
256

257
    def _deployed_property_value_of(
        self, resource_logical_id: str, property_name: str, resolved_resources: dict
    ) -> Any:
        """Look up ``property_name`` in the resolved properties of a resource.

        Raises:
            RuntimeError: if the resource has no entry in ``resolved_resources``, or
                if a truthy value is found that is neither a str, a list nor a dict.
        """
        # The resource must be processed before its deployed values can be read, so it
        # is looked up (and thereby visited) first. Ideally, values should only be
        # accessible through delta objects, to ensure computation is always complete
        # at every level.
        self._get_node_resource_for(
            resource_name=resource_logical_id,
            node_template=self._change_set.update_model.node_template,
        )

        resolved_resource = resolved_resources.get(resource_logical_id)
        if resolved_resource is None:
            raise RuntimeError(
                f"No deployed instances of resource '{resource_logical_id}' were found"
            )

        properties = resolved_resource.get("Properties", {})
        resource_type = resolved_resource.get("Type")
        # TODO support structured properties, e.g. NestedStack.Outputs.OutputName
        property_value: Any | None = get_value_from_path(properties, property_name)

        if not property_value:
            # Missing or falsy value: ignored unsupported resource types get a placeholder.
            if resource_type and should_ignore_unsupported_resource_type(
                resource_type=resource_type,
                change_set_type=self._change_set.change_set_type,
            ):
                return MOCKED_REFERENCE
            return property_value

        # Str: Standard expected type. TODO validate bools and numbers
        # List: Multiple resource types can return a list of values e.g. AWS::EC2::VPC.
        # Dict: Custom resources in CloudFormation can return arbitrary data structures.
        if not isinstance(property_value, (str, list, dict)):
            raise RuntimeError(
                f"Accessing property '{property_name}' from '{resource_logical_id}' resulted in a non-string value nor list"
            )
        return property_value
×
297

298
    def _before_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> Any:
        """Read a property value from the resources resolved before this change set."""
        before_resources = self._before_resolved_resources
        return self._deployed_property_value_of(
            resource_logical_id=resource_logical_id,
            property_name=property_name,
            resolved_resources=before_resources,
        )
306

307
    def _after_deployed_property_value_of(
        self, resource_logical_id: str, property_name: str
    ) -> str | None:
        """Read the 'after' property value; here it delegates to the 'before' lookup."""
        value = self._before_deployed_property_value_of(
            resource_logical_id=resource_logical_id, property_name=property_name
        )
        return value
313

314
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
        """Return the mapping node named ``map_name`` after visiting it.

        Raises:
            RuntimeError: if the template declares no mapping with that name.
        """
        mappings: list[NodeMapping] = self._change_set.update_model.node_template.mappings.mappings
        # TODO: another scenarios suggesting property lookups might be preferable.
        for candidate in mappings:
            if candidate.name != map_name:
                continue
            self.visit(candidate)
            return candidate
        raise RuntimeError(f"Undefined '{map_name}' mapping")
×
322

323
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
        """Return the visited parameter node named ``parameter_name``, or ``Nothing``."""
        template = self._change_set.update_model.node_template
        parameters: list[NodeParameter] = template.parameters.parameters
        # TODO: another scenarios suggesting property lookups might be preferable.
        for candidate in parameters:
            if candidate.name != parameter_name:
                continue
            self.visit(candidate)
            return candidate
        return Nothing
1✔
333

334
    def _get_node_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
        """Return the visited condition node named ``condition_name``, or ``Nothing``."""
        template = self._change_set.update_model.node_template
        conditions: list[NodeCondition] = template.conditions.conditions
        # TODO: another scenarios suggesting property lookups might be preferable.
        for candidate in conditions:
            if candidate.name != condition_name:
                continue
            self.visit(candidate)
            return candidate
        return Nothing
1✔
344

345
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition named ``logical_id``.

        Raises:
            RuntimeError: if no such condition node is found.
        """
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError(f"No condition '{logical_id}' was found.")
        return self.visit(node_condition)
×
351

352
    def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> Any:
        """Return the value of a pseudo parameter for the current change set.

        Raises:
            RuntimeError: if the pseudo parameter is not handled here.
        """
        change_set = self._change_set
        if pseudo_parameter_name == "AWS::Partition":
            return get_partition(change_set.region_name)
        if pseudo_parameter_name == "AWS::AccountId":
            return change_set.stack.account_id
        if pseudo_parameter_name == "AWS::Region":
            return change_set.stack.region_name
        if pseudo_parameter_name == "AWS::StackName":
            return change_set.stack.stack_name
        if pseudo_parameter_name == "AWS::StackId":
            return change_set.stack.stack_id
        if pseudo_parameter_name == "AWS::URLSuffix":
            return _AWS_URL_SUFFIX
        if pseudo_parameter_name == "AWS::NoValue":
            return None
        raise RuntimeError(f"The use of '{pseudo_parameter_name}' is currently unsupported")
×
370

371
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
        """Resolve a reference to a pseudo parameter, a template parameter or a resource.

        The lookup is attempted in that order; the resource lookup raises if no
        resource with the given name exists.
        """
        if logical_id in _PSEUDO_PARAMETERS:
            # Pseudo parameters are constants within the lifecycle of a template.
            constant = self._resolve_pseudo_parameter(pseudo_parameter_name=logical_id)
            return PreprocEntityDelta(before=constant, after=constant)

        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
        if isinstance(node_parameter, NodeParameter):
            return self.visit(node_parameter)

        node_resource = self._get_node_resource_for(
            resource_name=logical_id, node_template=self._change_set.update_model.node_template
        )
        resource_delta = self.visit(node_resource)
        # A fresh delta is returned rather than the one produced by the visit.
        return PreprocEntityDelta(before=resource_delta.before, after=resource_delta.after)
1✔
391

392
    def _resolve_mapping(
        self, map_name: str, top_level_key: str, second_level_key
    ) -> PreprocEntityDelta:
        """Resolve a two-level mapping lookup and return the delta of the found value.

        Raises:
            ValidationError: if either key does not lead to a usable node.
        """

        def _unresolved() -> ValidationError:
            # Builds the error reported for either of the two failed lookups.
            error_key = "::".join([map_name, top_level_key, second_level_key])
            return ValidationError(f"Template error: Unable to get mapping for {error_key}")

        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)

        first_level = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(first_level, NodeObject):
            raise _unresolved()

        second_level = first_level.bindings.get(second_level_key)
        if not isinstance(second_level, (TerminalValue, NodeArray, NodeObject)):
            raise _unresolved()

        return self.visit(second_level)
1✔
407

408
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
        """Visit an entity, reusing and updating the per-scope value caches.

        A cached result is only used when both the 'before' and the 'after' cache hold
        a value for the entity's scope. Otherwise the parent visitor is invoked and,
        if it yields a delta, replacements are applied and both caches are updated.
        """
        scope = change_set_entity.scope
        fully_cached = scope in self._before_cache and scope in self._after_cache
        if fully_cached:
            return PreprocEntityDelta(
                before=self._before_cache[scope], after=self._after_cache[scope]
            )

        result = super().visit(change_set_entity=change_set_entity)
        if not isinstance(result, PreprocEntityDelta):
            # Non-delta results are handed back without caching.
            return result

        result = self._maybe_perform_replacements(result)
        self._before_cache[scope] = result.before
        self._after_cache[scope] = result.after
        return result
1✔
420

421
    def _maybe_perform_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
        """Apply the static replacements followed by the dynamic ones to ``delta``."""
        after_static = self._maybe_perform_static_replacements(delta)
        return self._maybe_perform_dynamic_replacements(after_static)
1✔
425

426
    def _maybe_perform_static_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
        """Run ``_perform_static_replacements`` over the string sides of ``delta``."""
        transform = self._perform_static_replacements
        return self._maybe_perform_on_delta(delta, transform)
1✔
428

429
    def _maybe_perform_dynamic_replacements(self, delta: PreprocEntityDelta) -> PreprocEntityDelta:
        """Run ``_perform_dynamic_replacements`` over the string sides of ``delta``."""
        transform = self._perform_dynamic_replacements
        return self._maybe_perform_on_delta(delta, transform)
1✔
431

432
    def _maybe_perform_on_delta[T](
1✔
433
        self, delta: PreprocEntityDelta | None, f: Callable[[T], T]
434
    ) -> PreprocEntityDelta | None:
435
        if isinstance(delta.before, str):
1✔
436
            delta.before = f(delta.before)
1✔
437
        if isinstance(delta.after, str):
1✔
438
            delta.after = f(delta.after)
1✔
439
        return delta
1✔
440

441
    def _perform_dynamic_replacements[T](self, value: T) -> T:
1✔
442
        if not isinstance(value, str):
1✔
UNCOV
443
            return value
×
444

445
        if dynamic_ref := extract_dynamic_reference(value):
1✔
446
            new_value = perform_dynamic_reference_lookup(
1✔
447
                reference=dynamic_ref,
448
                account_id=self._change_set.account_id,
449
                region_name=self._change_set.region_name,
450
            )
451
            if new_value:
1✔
452
                # We need to use a function here, to avoid backslash processing by regex.
453
                # From the regex sub documentation:
454
                # repl can be a string or a function; if it is a string, any backslash escapes in it are processed.
455
                # Using a function, we can avoid this processing.
456
                return REGEX_DYNAMIC_REF.sub(lambda _: new_value, value)
1✔
457

458
        return value
1✔
459

460
    @staticmethod
    def _perform_static_replacements(value: str) -> str:
        """Rewrite an API Gateway URL so that its host carries the LocalStack port.

        Values that do not match ``REGEX_OUTPUT_APIGATEWAY``, or that are listed in
        ``config.CFN_STRING_REPLACEMENT_DENY_LIST``, are returned unchanged.
        """
        api_match = REGEX_OUTPUT_APIGATEWAY.match(value)
        if not api_match or value in config.CFN_STRING_REPLACEMENT_DENY_LIST:
            return value

        # The pattern has exactly three capture groups: prefix, host and path.
        prefix, host, path = api_match.groups()
        port = localstack_host().port
        return f"{prefix}{host}:{port}/{path}"
1✔
472

473
    def _cached_apply(
        self, scope: Scope, arguments_delta: PreprocEntityDelta, resolver: Callable[[Any], Any]
    ) -> PreprocEntityDelta:
        """
        Resolve the 'before' and 'after' sides of a delta, preferring cached values.

        Each side is handled independently: when the runtime cache already holds a
        value for the scope, that value is used; otherwise, if the matching argument
        is present, the resolver is called with it. When the resolver itself returns
        a PreprocEntityDelta, only the component of the side being resolved is kept
        ('before' for the before side, 'after' for the after side).

        The caches are only read here, never written. Storing the result is left to
        the caller, either manually or through the visit method of this class.

        Args:
            scope (Scope): Cache key of the entity being resolved.
            arguments_delta (PreprocEntityDelta): 'before' and 'after' arguments to resolve.
            resolver (Callable[[Any], Any]): Function applied to an uncached argument value.

        Returns:
            PreprocEntityDelta: A new delta with the resolved 'before' and 'after' values.
        """

        # TODO: Update all visit_* methods in this class and its subclasses to use this function.
        #       This ensures maximal reuse of precomputed 'before' (and 'after') values from
        #       prior runtimes on the change sets template, thus avoiding unnecessary recomputation.

        def _resolve_side(cache: dict, argument: Any, side: str) -> Any:
            cached = cache.get(scope, Nothing)
            if not is_nothing(cached) or is_nothing(argument):
                # Either a cached value exists, or there is nothing to resolve from.
                return cached
            resolved = resolver(argument)
            if isinstance(resolved, PreprocEntityDelta):
                return getattr(resolved, side)
            return resolved

        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after

        before = _resolve_side(self._before_cache, arguments_before, "before")
        after = _resolve_side(self._after_cache, arguments_after, "after")
        return PreprocEntityDelta(before=before, after=after)
1✔
520

521
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
        """Return the delta of the value held by the property node."""
        property_value = node_property.value
        return self.visit(property_value)
1✔
523

524
    def visit_terminal_value_modified(
        self, terminal_value_modified: TerminalValueModified
    ) -> PreprocEntityDelta:
        """Return a delta going from the node's value to its modified value."""
        old_value = terminal_value_modified.value
        new_value = terminal_value_modified.modified_value
        return PreprocEntityDelta(before=old_value, after=new_value)
531

532
    def visit_terminal_value_created(
        self, terminal_value_created: TerminalValueCreated
    ) -> PreprocEntityDelta:
        """Return a delta with only the 'after' side set to the node's value."""
        created_value = terminal_value_created.value
        return PreprocEntityDelta(after=created_value)
1✔
536

537
    def visit_terminal_value_removed(
        self, terminal_value_removed: TerminalValueRemoved
    ) -> PreprocEntityDelta:
        """Return a delta with only the 'before' side set to the node's value."""
        removed_value = terminal_value_removed.value
        return PreprocEntityDelta(before=removed_value)
1✔
541

542
    def visit_terminal_value_unchanged(
        self, terminal_value_unchanged: TerminalValueUnchanged
    ) -> PreprocEntityDelta:
        """Return a delta whose 'before' and 'after' sides both read the node's value."""
        return PreprocEntityDelta(
            after=terminal_value_unchanged.value,
            before=terminal_value_unchanged.value,
        )
549

550
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
        """Combine the 'before' of the node's value with the 'after' of its divergence."""
        value_delta = self.visit(node_divergence.value)
        divergence_delta = self.visit(node_divergence.divergence)
        return PreprocEntityDelta(before=value_delta.before, after=divergence_delta.after)
1✔
554

555
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
        """Build the 'before' and 'after' dictionaries of an object node.

        A created object has no 'before' side and a removed object has no 'after'
        side; those sides stay ``Nothing``. Bindings whose visited value is
        ``Nothing`` on a side are left out of that side's dictionary.
        """
        change_type = node_object.change_type
        before = Nothing if not change_type != ChangeType.CREATED else {}
        after = Nothing if not change_type != ChangeType.REMOVED else {}

        for key, entity in node_object.bindings.items():
            entity_delta: PreprocEntityDelta = self.visit(change_set_entity=entity)
            value_before, value_after = entity_delta.before, entity_delta.after
            if not (is_nothing(before) or is_nothing(value_before)):
                before[key] = value_before
            if not (is_nothing(after) or is_nothing(value_after)):
                after[key] = value_after

        return PreprocEntityDelta(before=before, after=after)
1✔
568

569
    def _resolve_attribute(self, arguments: str | list[str], select_before: bool) -> Any:
        """Resolve the value addressed by the arguments of an ``Fn::GetAtt`` call.

        Args:
            arguments: either a dotted string ("Resource.Attribute") or a list whose
                first item is the resource name and whose remaining items form the
                attribute name.
            select_before: resolve against the 'before' state when true, otherwise
                against the 'after' state.

        Returns:
            The deployed value of the attribute when one is found; otherwise the value
            of the identically named template property for the selected side, or
            ``None`` when the resource declares no such property.

        Raises:
            ValidationError: if fewer than two arguments are given, if the resource is
                not declared, or if the resource's condition is falsy for the
                selected side.
            RuntimeError: if the resource references a condition that does not exist,
                or if the deployed value lookup fails.
        """
        # TODO: add arguments validation.
        arguments_list: list[str] = (
            arguments.split(".") if isinstance(arguments, str) else arguments
        )

        if len(arguments_list) < 2:
            raise ValidationError(
                "Template error: every Fn::GetAtt object requires two non-empty parameters, the resource name and the resource attribute"
            )

        logical_name_of_resource = arguments_list[0]
        attribute_name = ".".join(arguments_list[1:])

        node_resource = self._get_node_resource_for(
            resource_name=logical_name_of_resource,
            node_template=self._change_set.update_model.node_template,
        )

        if not is_nothing(node_resource.condition_reference):
            # Resolve the condition by name in one step. A dangling condition reference
            # is reported by _resolve_condition instead of failing on attribute access.
            evaluation_result = self._resolve_condition(node_resource.condition_reference.value)
            condition_holds = (
                evaluation_result.before if select_before else evaluation_result.after
            )
            if not condition_holds:
                raise ValidationError(
                    f"Template format error: Unresolved resource dependencies [{logical_name_of_resource}] in the Resources block of the template"
                )

        # Custom Resources can mutate their definition
        # So the preproc should search first in the resource values and then check the template
        if select_before:
            value = self._before_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        else:
            value = self._after_deployed_property_value_of(
                resource_logical_id=logical_name_of_resource,
                property_name=attribute_name,
            )
        if value is not None:
            return value

        node_property: NodeProperty | None = self._get_node_property_for(
            property_name=attribute_name, node_resource=node_resource
        )
        if node_property is not None:
            # The property is statically defined in the template and its value can be computed.
            property_delta = self.visit(node_property)
            value = property_delta.before if select_before else property_delta.after

        return value
×
628

629
    def visit_node_intrinsic_function_fn_get_att(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve ``Fn::GetAtt`` for both sides, preferring values already cached.

        A side is only resolved when the cache holds no value for the function's
        scope and the corresponding arguments are present.
        """
        # TODO: validate the return value according to the spec.
        scope = node_intrinsic_function.scope
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        before_arguments: Maybe[str | list[str]] = arguments_delta.before
        after_arguments: Maybe[str | list[str]] = arguments_delta.after

        before = self._before_cache.get(scope, Nothing)
        if not (not is_nothing(before) or is_nothing(before_arguments)):
            before = self._resolve_attribute(arguments=before_arguments, select_before=True)

        after = self._after_cache.get(scope, Nothing)
        if not (not is_nothing(after) or is_nothing(after_arguments)):
            after = self._resolve_attribute(arguments=after_arguments, select_before=False)

        return PreprocEntityDelta(before=before, after=after)
1✔
646

647
    def visit_node_intrinsic_function_fn_equals(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate ``Fn::Equals`` on the 'before' and 'after' arguments.

        Raises:
            ValidationError: if the arguments to compare are not a sequence of
                exactly two items.
        """
        error_message = (
            "Template error: every Fn::Equals object requires a list of 2 string parameters."
        )

        def _compute_fn_equals(args: list[Any]) -> bool:
            # Reject malformed arguments here as well, so that a non-list value (e.g. a
            # plain string) is never silently indexed, whichever side is resolved.
            if not isinstance(args, (list, tuple)) or len(args) != 2:
                raise ValidationError(error_message)
            return args[0] == args[1]

        arguments_delta = self.visit(node_intrinsic_function.arguments)

        # Kept ahead of the cached resolution so that a malformed 'after' argument list
        # is reported even when the 'after' value is already cached.
        if isinstance(arguments_delta.after, list) and len(arguments_delta.after) != 2:
            raise ValidationError(error_message)

        delta = self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_equals,
        )
        return delta
1✔
667

668
    def visit_node_intrinsic_function_fn_if(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::If`` node, visiting only the branch its condition selects.

        The evaluation is short-circuiting: for each side (before/after) the named condition
        is looked up and evaluated, and only the matching branch argument is visited.

        :param node_intrinsic_function: the ``Fn::If`` node; it must carry exactly 3 arguments.
        :return: a delta whose sides hold the selected branch values.
        :raises ValueError: if the node does not have exactly 3 arguments.
        :raises ValidationError: if a referenced condition cannot be found.
        """
        if len(node_intrinsic_function.arguments.array) != 3:
            raise ValueError(
                f"Incorrectly constructed Fn::If usage, expected 3 arguments, found {len(node_intrinsic_function.arguments.array)}"
            )

        branches = node_intrinsic_function.arguments.array
        condition_delta = self.visit(branches[0])
        if_delta = PreprocEntityDelta()

        if not is_nothing(condition_delta.before):
            node_condition = self._get_node_condition_if_exists(
                condition_name=condition_delta.before
            )
            if is_nothing(node_condition):
                # TODO: I don't think this is a possible state since for us to be evaluating the before state,
                #  we must have successfully deployed the stack and as such this case was not reached before
                raise ValidationError(
                    f"Template error: unresolved condition dependency {condition_delta.before} in Fn::If"
                )

            # Index 1 is the "true" branch, index 2 the "false" branch.
            selected = branches[1] if self.visit(node_condition).before else branches[2]
            if_delta.before = self.visit(selected).before

        if not is_nothing(condition_delta.after):
            node_condition = self._get_node_condition_if_exists(
                condition_name=condition_delta.after
            )
            if is_nothing(node_condition):
                raise ValidationError(
                    f"Template error: unresolved condition dependency {condition_delta.after} in Fn::If"
                )

            selected = branches[1] if self.visit(node_condition).after else branches[2]
            if_delta.after = self.visit(selected).after

        return if_delta
715

716
    def visit_node_intrinsic_function_fn_and(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::And`` node: true only when every argument is truthy.

        :param node_intrinsic_function: the ``Fn::And`` node to evaluate.
        :return: whatever ``_cached_apply`` produces for the conjunction resolver.
        """

        def _all_hold(conditions: list[bool]) -> bool:
            return all(conditions)

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_all_hold,
        )
730

731
    def visit_node_intrinsic_function_fn_or(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::Or`` node: true when at least one argument is truthy.

        :param node_intrinsic_function: the ``Fn::Or`` node to evaluate.
        :return: whatever ``_cached_apply`` produces for the disjunction resolver.
        """

        def _any_holds(conditions: list[bool]):
            return any(conditions)

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_any_holds,
        )
745

746
    def visit_node_intrinsic_function_fn_not(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::Not`` node by negating its operand.

        The operand is the first element when the argument is a list, otherwise the argument
        itself.

        :param node_intrinsic_function: the ``Fn::Not`` node to evaluate.
        :return: whatever ``_cached_apply`` produces for the negation resolver.
        """

        def _negate(arg: list[bool] | bool) -> bool:
            # Is the argument ever a lone boolean?
            operand = arg[0] if isinstance(arg, list) else arg
            return not operand

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_negate,
        )
763

764
    def visit_node_intrinsic_function_fn_sub(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::Sub`` node for its before and after states.

        The arguments are either a template string, or a two-element list holding a template
        string and a map of substitution values. Each ``${Name}`` placeholder is resolved, in
        order of precedence, as a pseudo parameter, an entry of the substitution map, an
        attribute (when the name contains a dot), or a plain reference. ``${!Name}`` is an
        escaped literal and is emitted as ``${Name}`` without being resolved.

        Substitution is performed in a single pass over the template, so text introduced by a
        substituted value is never itself interpreted as a placeholder.

        :param node_intrinsic_function: the ``Fn::Sub`` node to evaluate.
        :return: a delta holding the substituted before and after values.
        :raises RuntimeError: if the arguments have an unsupported shape or a placeholder
            cannot be resolved.
        """

        def _resolve_variable(name: str, sub_parameters: dict, select_before: bool) -> str:
            # Resolve one placeholder name to the string that replaces it.
            value = Nothing

            # Try to resolve the variable name as pseudo parameter.
            if name in _PSEUDO_PARAMETERS:
                value = self._resolve_pseudo_parameter(pseudo_parameter_name=name)

            # Try to resolve the variable name as an entry to the defined parameters.
            elif name in sub_parameters:
                value = sub_parameters[name]

            # Try to resolve the variable name as GetAtt.
            elif "." in name:
                try:
                    value = self._resolve_attribute(arguments=name, select_before=select_before)
                except RuntimeError:
                    # Leave the value unset; the undefined-variable error below reports it.
                    pass

            # Try to resolve the variable name as Ref.
            else:
                try:
                    resource_delta = self._resolve_reference(logical_id=name)
                    value = resource_delta.before if select_before else resource_delta.after
                    if isinstance(value, PreprocResource):
                        value = value.physical_resource_id
                except RuntimeError:
                    # Leave the value unset; the undefined-variable error below reports it.
                    pass

            if is_nothing(value):
                raise RuntimeError(
                    f"Undefined variable name in Fn::Sub string template '{name}'"
                )
            return value if isinstance(value, str) else str(value)

        def _compute_sub(args: str | list[Any], select_before: bool) -> str:
            # TODO: add further schema validation.
            string_template: str
            sub_parameters: dict
            if isinstance(args, str):
                string_template = args
                sub_parameters = {}
            elif (
                isinstance(args, list)
                and len(args) == 2
                and isinstance(args[0], str)
                and isinstance(args[1], dict)
            ):
                string_template = args[0]
                sub_parameters = args[1]
            else:
                raise RuntimeError(
                    "Invalid arguments shape for Fn::Sub, expected a String "
                    f"or a Tuple of String and Map but got '{args}'"
                )

            # Memoise per call so that a placeholder used several times is resolved only once.
            resolved: dict[str, str] = {}

            def _substitute(match: re.Match[str]) -> str:
                name = match.group(1)
                if name.startswith("!"):
                    # Escaped literal: `${!Name}` must be emitted verbatim as `${Name}`.
                    return f"${{{name[1:]}}}"
                if name not in resolved:
                    resolved[name] = _resolve_variable(
                        name=name, sub_parameters=sub_parameters, select_before=select_before
                    )
                return resolved[name]

            # A callable replacement is inserted as-is (no backslash/group expansion) and the
            # scan never revisits replaced text, unlike chained `str.replace` calls.
            sub_string = re.sub(r"\$\{([^}]+)\}", _substitute, string_template)

            # FIXME: the following type reduction is ported from v1; however it appears as though such
            #        reduction is not performed by the engine, and certainly not at this depth given the
            #        lack of context. This section should be removed with Fn::Sub always returning a string
            #        and the resource providers reviewed.
            account_id = self._change_set.account_id
            is_another_account_id = sub_string.isdigit() and len(sub_string) == len(account_id)
            if sub_string == account_id or is_another_account_id:
                result = sub_string
            elif sub_string.isdigit():
                result = int(sub_string)
            else:
                try:
                    result = float(sub_string)
                except ValueError:
                    result = sub_string
            return result

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        arguments_before = arguments_delta.before
        arguments_after = arguments_delta.after
        before = self._before_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(before) and not is_nothing(arguments_before):
            before = _compute_sub(args=arguments_before, select_before=True)
        after = self._after_cache.get(node_intrinsic_function.scope, Nothing)
        if is_nothing(after) and not is_nothing(arguments_after):
            after = _compute_sub(args=arguments_after, select_before=False)
        return PreprocEntityDelta(before=before, after=after)
862

863
    def visit_node_intrinsic_function_fn_join(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::Join`` node by joining its values with the given delimiter.

        ``None`` entries are skipped and every other value is converted with ``str``.
        Arguments that are not a two-element list resolve to ``Nothing``.

        :param node_intrinsic_function: the ``Fn::Join`` node to evaluate.
        :return: whatever ``_cached_apply`` produces for the join resolver.
        :raises RuntimeError: if the values argument is neither a list nor the empty string.
        """
        # TODO: add support for schema validation.
        # TODO: add tests for joining non string values.

        def _join(args: list[Any]) -> str | NothingType:
            if not isinstance(args, list) or len(args) != 2:
                return Nothing
            separator: str = str(args[0])
            items: list[Any] = args[1]
            if isinstance(items, list):
                return separator.join(str(item) for item in items if item is not None)
            # shortcut if values is the empty string, for example:
            # {"Fn::Join": ["", {"Ref": <parameter>}]}
            # CDK bootstrap does this
            if items == "":
                return ""
            raise RuntimeError(f"Invalid arguments list definition for Fn::Join: '{args}'")

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_join,
        )
896

897
    def visit_node_intrinsic_function_fn_select(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate an ``Fn::Select`` node by picking one element out of a list.

        :param node_intrinsic_function: the ``Fn::Select`` node to evaluate.
        :return: whatever ``_cached_apply`` produces for the selection resolver.
        :raises RuntimeError: if the list contains elements that are not strings.
        :raises ValidationError: if the list is missing or empty, or the index is not an
            integer within the bounds of the list.
        """
        # TODO: add further support for schema validation
        invalid_arguments_message = "Template error: Fn::Select requires a list argument with two elements: an integer index and a list"

        def _pick(args: list[Any]) -> Any:
            candidates = args[1]
            if not isinstance(candidates, list):
                raise ValidationError(invalid_arguments_message)
            # defer evaluation if the selection list contains unresolved elements (e.g., unresolved intrinsics)
            if any(not isinstance(candidate, str) for candidate in candidates):
                raise RuntimeError("Fn::Select list contains unresolved elements")
            if not candidates:
                raise ValidationError(invalid_arguments_message)

            try:
                index: int = int(args[0])
            except ValueError as e:
                raise ValidationError(invalid_arguments_message) from e

            if not 0 <= index < len(candidates):
                raise ValidationError(invalid_arguments_message)
            return candidates[index]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_pick,
        )
933

934
    def visit_node_intrinsic_function_fn_split(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Evaluate an ``Fn::Split`` node by splitting a string on a delimiter.

        :param node_intrinsic_function: the ``Fn::Split`` node to evaluate.
        :return: whatever ``_cached_apply`` produces for the split resolver.
        :raises ValidationError: if the after arguments exist but are not a two-element list.
        :raises RuntimeError: if the delimiter is not a non-empty string, or the source is not
            a string.
        """
        # TODO: add further support for schema validation

        def _split(args: list[Any]) -> Any:
            separator = args[0]
            if not (isinstance(separator, str) and separator):
                raise RuntimeError(f"Invalid delimiter value for Fn::Split: '{separator}'")
            text = args[1]
            if isinstance(text, str):
                return text.split(separator)
            raise RuntimeError(f"Invalid source string value for Fn::Split: '{text}'")

        arguments_delta = self.visit(node_intrinsic_function.arguments)

        after_arguments = arguments_delta.after
        has_valid_shape = is_nothing(after_arguments) or (
            isinstance(after_arguments, list) and len(after_arguments) == 2
        )
        if not has_valid_shape:
            raise ValidationError(
                "Template error: every Fn::Split object requires two parameters, "
                "(1) a string delimiter and (2) a string to be split or a function that returns a string to be split."
            )

        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_split,
        )
966

967
    def visit_node_intrinsic_function_fn_get_a_zs(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::GetAZs`` node into the zone names of a region.

        An empty region argument falls back to the change set's region. The zones are read
        through an EC2 ``describe_availability_zones`` call made with the change set's account.

        :param node_intrinsic_function: the ``Fn::GetAZs`` node to evaluate.
        :return: whatever ``_cached_apply`` produces for the zone-listing resolver.
        :raises RuntimeError: if the region is not a string, or the EC2 call fails with a
            ``ClientError`` (which is kept as the explicit cause of the raised error).
        """
        # TODO: add further support for schema validation

        def _compute_fn_get_a_zs(region) -> Any:
            if not isinstance(region, str):
                raise RuntimeError(f"Invalid region value for Fn::GetAZs: '{region}'")

            if not region:
                region = self._change_set.region_name

            account_id = self._change_set.account_id
            ec2_client = connect_to(aws_access_key_id=account_id, region_name=region).ec2
            try:
                get_availability_zones_result: DescribeAvailabilityZonesResult = (
                    ec2_client.describe_availability_zones()
                )
            except ClientError as error:
                # Chain explicitly so the underlying client failure is reported as the cause.
                raise RuntimeError(
                    "Could not describe zones availability whilst evaluating Fn::GetAZs"
                ) from error
            availability_zones: AvailabilityZoneList = get_availability_zones_result[
                "AvailabilityZones"
            ]
            return [az["ZoneName"] for az in availability_zones]

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_get_a_zs,
        )
1002

1003
    def visit_node_intrinsic_function_fn_base64(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::Base64`` node by Base64-encoding its string argument.

        :param node_intrinsic_function: the ``Fn::Base64`` node to evaluate.
        :return: whatever ``_cached_apply`` produces for the encoding resolver.
        :raises RuntimeError: if the value to encode is not a string.
        """
        # TODO: add further support for schema validation

        def _encode(string) -> Any:
            if isinstance(string, str):
                # Ported from v1:
                encoded_bytes = base64.b64encode(to_bytes(string))
                return to_str(encoded_bytes)
            raise RuntimeError(f"Invalid valueToEncode for Fn::Base64: '{string}'")

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_encode,
        )
1021

1022
    def visit_node_intrinsic_function_fn_find_in_map(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate an ``Fn::FindInMap`` node through ``_resolve_mapping``.

        For each side, truthy arguments are unpacked into ``_resolve_mapping`` and the same
        side of the returned delta is kept; otherwise that side is ``Nothing``.

        :param node_intrinsic_function: the ``Fn::FindInMap`` node to evaluate.
        :return: a delta holding the looked-up before and after values.
        """
        # TODO: add type checking/validation for result unit?
        arguments_delta = self.visit(node_intrinsic_function.arguments)
        lookup_before = arguments_delta.before
        lookup_after = arguments_delta.after

        before = self._resolve_mapping(*lookup_before).before if lookup_before else Nothing
        after = self._resolve_mapping(*lookup_after).after if lookup_after else Nothing
        return PreprocEntityDelta(before=before, after=after)
1038

1039
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the mapping's bindings."""
        return self.visit(node_mapping.bindings)
1042

1043
    def visit_node_parameters(
        self, node_parameters: NodeParameters
    ) -> PreprocEntityDelta[dict[str, Any], dict[str, Any]]:
        """Collect the before and after values of every parameter, keyed by parameter name.

        A parameter is left out of a side's dictionary when its value on that side is
        ``Nothing``.

        :param node_parameters: the parameters node to evaluate.
        :return: a delta whose sides are the name-to-value dictionaries.
        """
        values_before: dict[str, Any] = {}
        values_after: dict[str, Any] = {}
        for parameter in node_parameters.parameters:
            parameter_delta = self.visit(parameter)
            for values, value in (
                (values_before, parameter_delta.before),
                (values_after, parameter_delta.after),
            ):
                if not is_nothing(value):
                    values[parameter.name] = value
        return PreprocEntityDelta(before=values_before, after=values_after)
1057

1058
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
        """Evaluate a single parameter into its before and after values.

        On each side the dynamic value is used when truthy, otherwise the default value.
        Only the after value is converted according to the declared parameter type.

        :param node_parameter: the parameter node to evaluate.
        :return: a delta holding the before and after parameter values.
        :raises ValidationError: if the parameter name does not match
            ``VALID_LOGICAL_RESOURCE_ID_RE``.
        """
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_parameter.name):
            raise ValidationError(
                f"Template format error: Parameter name {node_parameter.name} is non alphanumeric."
            )

        def _coerce_to_declared_type(value: str, type_: str) -> Any:
            # List-like types are split on commas with surrounding whitespace stripped.
            if re.match(r"List<[^>]+>", type_) or type_ == "CommaDelimitedList":
                return [item.strip() for item in value.split(",")]
            if type_ == "Number":
                # TODO: validate the parameter type at template parse time (or whatever is in parity with AWS) so we know this cannot fail
                return to_number(value)
            return value

        dynamic_delta = self.visit(node_parameter.dynamic_value)
        default_delta = self.visit(node_parameter.default_value)

        # NOTE(review): `or` also falls back to the default for falsy dynamic values such as
        # an empty string — confirm this is intended.
        before = dynamic_delta.before or default_delta.before
        after = dynamic_delta.after or default_delta.after

        parameter_type = self.visit(node_parameter.type_)
        if not is_nothing(after):
            after = _coerce_to_declared_type(after, parameter_type.after)

        return PreprocEntityDelta(before=before, after=after)
1089

1090
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the node's ``depends_on`` entity."""
        return self.visit(node_depends_on.depends_on)
1093

1094
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
        """Return the delta obtained by visiting the condition's body."""
        return self.visit(node_condition.body)
1097

1098
    def _resource_physical_resource_id_from(
        self, logical_resource_id: str, resolved_resources: dict[str, ResolvedResource]
    ) -> str | None:
        """Look up the physical id of a resource in a collection of resolved resources.

        :param logical_resource_id: the logical id of the resource to look up.
        :param resolved_resources: resolved resources keyed by logical id.
        :return: the physical resource id, or ``None`` when the resource is missing or its
            status is neither ``CREATE_COMPLETE`` nor ``UPDATE_COMPLETE``.
        :raises RuntimeError: if the resource has a completed status but its
            ``PhysicalResourceId`` is not a string.
        """
        # TODO: typing around resolved resources is needed and should be reflected here.
        completed_statuses = {
            ResourceStatus.CREATE_COMPLETE,
            ResourceStatus.UPDATE_COMPLETE,
        }
        entry = resolved_resources.get(logical_resource_id, {})
        if entry.get("ResourceStatus") not in completed_statuses:
            return None

        physical_resource_id = entry.get("PhysicalResourceId")
        if isinstance(physical_resource_id, str):
            return physical_resource_id
        raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
1113

1114
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
1115
        # TODO: typing around resolved resources is needed and should be reflected here.
1116
        return self._resource_physical_resource_id_from(
1✔
1117
            logical_resource_id=resource_logical_id,
1118
            resolved_resources=self._before_resolved_resources,
1119
        )
1120

1121
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
1122
        return self._before_resource_physical_id(resource_logical_id=resource_logical_id)
1✔
1123

1124
    def visit_node_intrinsic_function_ref(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a ``Ref`` node into the referenced before and after values.

        ``AWS::NoValue`` resolves to ``Nothing``. When a side of the resolved reference is a
        ``PreprocResource``, that side is replaced by the resource's physical id.

        :param node_intrinsic_function: the ``Ref`` node to evaluate.
        :return: whatever ``_cached_apply`` produces for the reference resolver.
        """

        def _dereference(logical_id: str) -> PreprocEntityDelta:
            if logical_id == "AWS::NoValue":
                return Nothing

            reference_delta: PreprocEntityDelta = self._resolve_reference(logical_id=logical_id)
            # Resources are referenced by their physical id; the delta is updated in place.
            referenced_before = reference_delta.before
            if isinstance(referenced_before, PreprocResource):
                reference_delta.before = referenced_before.physical_resource_id
            referenced_after = reference_delta.after
            if isinstance(referenced_after, PreprocResource):
                reference_delta.after = referenced_after.physical_resource_id
            return reference_delta

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_dereference,
        )
1145

1146
    def visit_node_intrinsic_function_condition(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Evaluate a ``Condition`` reference by visiting the condition it names.

        :param node_intrinsic_function: the ``Condition`` node to evaluate.
        :return: whatever ``_cached_apply`` produces for the condition resolver.
        :raises RuntimeError: if the named condition does not exist.
        """
        arguments_delta = self.visit(node_intrinsic_function.arguments)

        def _evaluate_named_condition(name: str) -> PreprocEntityDelta:
            node_condition = self._get_node_condition_if_exists(condition_name=name)
            if not is_nothing(node_condition):
                return self.visit(node_condition)
            raise RuntimeError(f"Undefined condition '{name}'")

        return self._cached_apply(
            resolver=_evaluate_named_condition,
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
        )
1164

1165
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
        """Evaluate an array node into lists of its elements' before and after values.

        The before side is ``Nothing`` for a created array and the after side is ``Nothing``
        for a removed one. Element values that are ``Nothing`` are not appended.

        :param node_array: the array node to evaluate.
        :return: a delta holding the before and after lists.
        """
        change_type = node_array.change_type
        before = Nothing if change_type == ChangeType.CREATED else []
        after = Nothing if change_type == ChangeType.REMOVED else []
        collect_before = not is_nothing(before)
        collect_after = not is_nothing(after)

        for change_set_entity in node_array.array:
            element_delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
            element_before = element_delta.before
            element_after = element_delta.after
            if collect_before and not is_nothing(element_before):
                before.append(element_before)
            if collect_after and not is_nothing(element_after):
                after.append(element_after)

        return PreprocEntityDelta(before=before, after=after)
1178

1179
    def visit_node_properties(
        self, node_properties: NodeProperties
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
        """Evaluate a properties node into before and after ``PreprocProperties``.

        The before side is ``Nothing`` for created properties and the after side is
        ``Nothing`` for removed ones. Property values that are ``Nothing`` or ``None`` are
        left out of the bindings.

        :param node_properties: the properties node to evaluate.
        :return: a delta holding the before and after properties.
        """
        change_type = node_properties.change_type
        before_bindings = Nothing if change_type == ChangeType.CREATED else {}
        after_bindings = Nothing if change_type == ChangeType.REMOVED else {}

        def _bind(bindings: Any, name: str, value: Any) -> None:
            # Skip sides that do not exist and values that are unset.
            if is_nothing(bindings) or is_nothing(value) or value is None:
                return
            bindings[name] = value

        for node_property in node_properties.properties:
            property_name = node_property.name
            property_delta = self.visit(node_property)
            _bind(before_bindings, property_name, property_delta.before)
            _bind(after_bindings, property_name, property_delta.after)

        before = (
            Nothing
            if is_nothing(before_bindings)
            else PreprocProperties(properties=before_bindings)
        )
        after = (
            Nothing if is_nothing(after_bindings) else PreprocProperties(properties=after_bindings)
        )
        return PreprocEntityDelta(before=before, after=after)
1209

1210
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
        """Resolve a condition reference into the condition's before and after values.

        A side is resolved through ``_resolve_condition`` only when the visited reference is
        a string on that side; otherwise the side stays ``Nothing``.

        :param reference: the terminal value naming the condition.
        :return: a delta holding the before and after condition values.
        """
        reference_delta = self.visit(reference)

        before = Nothing
        condition_name_before = reference_delta.before
        if isinstance(condition_name_before, str):
            before = self._resolve_condition(logical_id=condition_name_before).before

        after = Nothing
        condition_name_after = reference_delta.after
        if isinstance(condition_name_after, str):
            after = self._resolve_condition(logical_id=condition_name_after).after

        return PreprocEntityDelta(before=before, after=after)
1223

1224
    def visit_node_resources(self, node_resources: NodeResources):
        """
        Visit every resource, skipping those whose condition evaluates to False
        in the after state.
        """
        for node_resource in node_resources.resources:
            condition_reference = node_resource.condition_reference
            if not is_nothing(condition_reference):
                condition_delta = self._resolve_resource_condition_reference(condition_reference)
                # Only an explicit False skips the resource; other values still get visited.
                if condition_delta.after is False:
                    continue
            self.visit(node_resource)
1237

1238
    def visit_node_resource(
        self, node_resource: NodeResource
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
        """Evaluate a resource node into its before and after ``PreprocResource`` values.

        The resource's condition, dependencies, type and properties are evaluated first; a
        side is then built only when the change type and that side's condition allow it.

        :param node_resource: the resource node to evaluate.
        :return: a delta holding the before and after resources (``Nothing`` where absent).
        :raises ValidationError: if the resource name does not match
            ``VALID_LOGICAL_RESOURCE_ID_RE``.
        """
        if not VALID_LOGICAL_RESOURCE_ID_RE.match(node_resource.name):
            raise ValidationError(
                f"Template format error: Resource name {node_resource.name} is non alphanumeric."
            )
        change_type = node_resource.change_type

        condition_before, condition_after = Nothing, Nothing
        if not is_nothing(node_resource.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_resource.condition_reference
            )
            condition_before = condition_delta.before
            condition_after = condition_delta.after

        depends_on_before, depends_on_after = Nothing, Nothing
        if not is_nothing(node_resource.depends_on):
            depends_on_delta = self.visit(node_resource.depends_on)
            depends_on_before = depends_on_delta.before
            depends_on_after = depends_on_delta.after

        type_delta = self.visit(node_resource.type_)
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
            node_resource.properties
        )

        # NOTE(review): `and` binds tighter than `or`, so a truthy condition includes the side
        # regardless of the change type — confirm this is intended.
        include_before = (
            change_type != ChangeType.CREATED and is_nothing(condition_before)
        ) or condition_before
        include_after = (
            change_type != ChangeType.REMOVED and is_nothing(condition_after)
        ) or condition_after

        before = Nothing
        if include_before:
            before = PreprocResource(
                logical_id=node_resource.name,
                physical_resource_id=self._before_resource_physical_id(
                    resource_logical_id=node_resource.name
                ),
                condition=condition_before,
                resource_type=type_delta.before,
                properties=properties_delta.before,
                depends_on=depends_on_before,
                requires_replacement=False,
            )

        after = Nothing
        if include_after:
            try:
                after_physical_resource_id = self._after_resource_physical_id(
                    resource_logical_id=node_resource.name
                )
            except RuntimeError:
                # The after resource is still built, just without a physical id.
                after_physical_resource_id = None
            after = PreprocResource(
                logical_id=node_resource.name,
                physical_resource_id=after_physical_resource_id,
                condition=condition_after,
                resource_type=type_delta.after,
                properties=properties_delta.after,
                depends_on=depends_on_after,
                requires_replacement=node_resource.requires_replacement,
            )
        return PreprocEntityDelta(before=before, after=after)
1301

1302
    def visit_node_output(
        self, node_output: NodeOutput
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
        """Compute the before/after preprocessed views of a single output node.

        The node's own change type is the starting point. When the output
        carries a condition reference, the resolved condition may override it:
        a condition that is falsy before and truthy after makes the output
        CREATED, and the reverse makes it REMOVED.

        :param node_output: the output node to preprocess.
        :return: a delta whose ``before`` is ``Nothing`` for created outputs and
            whose ``after`` is ``Nothing`` for removed outputs.
        """
        effective_change_type = node_output.change_type
        value_delta = self.visit(node_output.value)

        # Resolve the optional condition; it may override the change type.
        condition_delta = Nothing
        if not is_nothing(node_output.condition_reference):
            condition_delta = self._resolve_resource_condition_reference(
                node_output.condition_reference
            )
            held_before = condition_delta.before
            holds_after = condition_delta.after
            if not held_before and holds_after:
                effective_change_type = ChangeType.CREATED
            elif held_before and not holds_after:
                effective_change_type = ChangeType.REMOVED

        # Resolve the optional export declaration.
        export_delta = Nothing
        if not is_nothing(node_output.export):
            export_delta = self.visit(node_output.export)

        def _snapshot(side: str) -> PreprocOutput:
            # ``side`` is either "before" or "after" and selects which half of
            # each delta is read; missing export/condition deltas yield None.
            return PreprocOutput(
                name=node_output.name,
                value=getattr(value_delta, side),
                export=getattr(export_delta, side) if export_delta else None,
                condition=getattr(condition_delta, side) if condition_delta else None,
            )

        before: Maybe[PreprocOutput] = Nothing
        if effective_change_type != ChangeType.CREATED:
            before = _snapshot("before")

        after: Maybe[PreprocOutput] = Nothing
        if effective_change_type != ChangeType.REMOVED:
            after = _snapshot("after")

        return PreprocEntityDelta(before=before, after=after)
1✔
1341

1342
    def visit_node_outputs(
        self, node_outputs: NodeOutputs
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
        """Collect the before/after preprocessed outputs of the template.

        Outputs whose condition reference resolves to exactly ``False`` in the
        after state are skipped entirely (neither their before nor their after
        view is collected). For every other output, whichever of its
        before/after views is not ``Nothing`` is appended to the matching list.

        :param node_outputs: the container node holding all output nodes.
        :return: a delta holding the list of before outputs and the list of
            after outputs.
        """
        collected_before: list[PreprocOutput] = []
        collected_after: list[PreprocOutput] = []

        for node_output in node_outputs.outputs:
            condition_reference = node_output.condition_reference
            if not is_nothing(condition_reference):
                resolved_condition = self._resolve_resource_condition_reference(
                    condition_reference
                )
                # Identity check on purpose: only a literal False disables the output.
                if resolved_condition.after is False:
                    continue

            output_delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
            pairs = (
                (collected_before, output_delta.before),
                (collected_after, output_delta.after),
            )
            for target, preproc_output in pairs:
                if not is_nothing(preproc_output):
                    target.append(preproc_output)

        return PreprocEntityDelta(before=collected_before, after=collected_after)
1✔
1364

1365
    def visit_node_intrinsic_function_fn_import_value(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ) -> PreprocEntityDelta:
        """Resolve an ``Fn::ImportValue`` intrinsic function node.

        The function's arguments are visited first, then the export lookup is
        applied to them through ``_cached_apply`` under the node's scope.

        :param node_intrinsic_function: the intrinsic function node to resolve.
        :return: the delta produced by ``_cached_apply``.
        """

        def _compute_fn_import_value(string) -> str:
            # Look up the export named ``string`` for the change set's
            # account and region; only string export names are accepted.
            if not isinstance(string, str):
                raise RuntimeError(f"Invalid parameter for import: '{string}'")

            change_set = self._change_set
            exports = exports_map(
                account_id=change_set.account_id, region_name=change_set.region_name
            )
            export_entry = exports.get(string, {})
            # A missing export or a falsy "Value" both collapse to Nothing.
            return export_entry.get("Value") or Nothing

        arguments_delta = self.visit(node_intrinsic_function.arguments)
        return self._cached_apply(
            scope=node_intrinsic_function.scope,
            arguments_delta=arguments_delta,
            resolver=_compute_fn_import_value,
        )
1✔
1385

1386
    def visit_node_intrinsic_function_fn_transform(
        self, node_intrinsic_function: NodeIntrinsicFunction
    ):
        """Reject ``Fn::Transform`` nodes.

        This visitor never resolves the node; it unconditionally raises, with a
        message stating the function should already have been handled by the
        Transformer.

        :param node_intrinsic_function: the intrinsic function node (unused).
        :raises RuntimeError: always.
        """
        message = "Fn::Transform should have been handled by the Transformer"
        raise RuntimeError(message)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc