• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / d8dc9956-71ea-40c6-95cb-26b2b584943a

08 May 2025 05:15PM UTC coverage: 86.66% (+0.1%) from 86.535%
d8dc9956-71ea-40c6-95cb-26b2b584943a

push

circleci

web-flow
CFn v2: Skip media type assertion (#12597)

64346 of 74251 relevant lines covered (86.66%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.5
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_preproc.py
1
from __future__ import annotations
1✔
2

3
from typing import Any, Final, Generic, Optional, TypeVar
1✔
4

5
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
6
    ChangeSetEntity,
7
    ChangeType,
8
    NodeArray,
9
    NodeCondition,
10
    NodeDivergence,
11
    NodeIntrinsicFunction,
12
    NodeMapping,
13
    NodeObject,
14
    NodeOutput,
15
    NodeOutputs,
16
    NodeParameter,
17
    NodeProperties,
18
    NodeProperty,
19
    NodeResource,
20
    NodeTemplate,
21
    Scope,
22
    TerminalValue,
23
    TerminalValueCreated,
24
    TerminalValueModified,
25
    TerminalValueRemoved,
26
    TerminalValueUnchanged,
27
)
28
from localstack.services.cloudformation.engine.v2.change_set_model_visitor import (
1✔
29
    ChangeSetModelVisitor,
30
)
31

32
TBefore = TypeVar("TBefore")
TAfter = TypeVar("TAfter")


class PreprocEntityDelta(Generic[TBefore, TAfter]):
    """Pair of values describing an entity before and after a change.

    Both sides default to ``None`` when not supplied.
    """

    before: Optional[TBefore]
    after: Optional[TAfter]

    def __init__(self, before: Optional[TBefore] = None, after: Optional[TAfter] = None):
        self.before = before
        self.after = after

    def __eq__(self, other):
        # Only another delta with equal before and after values compares equal.
        if isinstance(other, PreprocEntityDelta):
            return self.before == other.before and self.after == other.after
        return False
×
48

49

50
class PreprocProperties:
    """Wrapper around a mapping of property names to their values."""

    properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        self.properties = properties

    def __eq__(self, other):
        # Equality is defined by the wrapped mapping; foreign types never match.
        if isinstance(other, PreprocProperties):
            return self.properties == other.properties
        return False
1✔
60

61

62
class PreprocResource:
    """Computed view of a resource on one side (before or after) of a change.

    ``physical_resource_id`` may be ``None`` when no physical id is known.
    Equality compares the logical id, condition, resource type and properties;
    it does not compare ``physical_resource_id``.
    """

    logical_id: str
    physical_resource_id: Optional[str]
    condition: Optional[bool]
    resource_type: str
    properties: PreprocProperties

    def __init__(
        self,
        logical_id: str,
        physical_resource_id: Optional[str],
        condition: Optional[bool],
        resource_type: str,
        properties: PreprocProperties,
    ):
        self.logical_id = logical_id
        self.physical_resource_id = physical_resource_id
        self.condition = condition
        self.resource_type = resource_type
        self.properties = properties

    @staticmethod
    def _compare_conditions(c1: Optional[bool], c2: Optional[bool]) -> bool:
        """Compare two conditions, treating any non-bool value as ``True``."""
        # The lack of condition equates to a true condition.
        c1 = c1 if isinstance(c1, bool) else True
        c2 = c2 if isinstance(c2, bool) else True
        return c1 == c2

    def __eq__(self, other):
        if not isinstance(other, PreprocResource):
            return False
        return all(
            (
                self.logical_id == other.logical_id,
                self._compare_conditions(self.condition, other.condition),
                self.resource_type == other.resource_type,
                self.properties == other.properties,
            )
        )
101

102

103
class PreprocOutput:
    """Computed view of a template output: name, value, export and condition."""

    name: str
    value: Any
    export: Optional[Any]
    condition: Optional[bool]

    def __init__(self, name: str, value: Any, export: Optional[Any], condition: Optional[bool]):
        self.name = name
        self.value = value
        self.export = export
        self.condition = condition

    def __eq__(self, other):
        if isinstance(other, PreprocOutput):
            # All four fields are compared up front, then combined.
            field_matches = [
                self.name == other.name,
                self.value == other.value,
                self.export == other.export,
                self.condition == other.condition,
            ]
            return all(field_matches)
        return False
126

127

128
class ChangeSetModelPreproc(ChangeSetModelVisitor):
1✔
129
    _node_template: Final[NodeTemplate]
1✔
130
    _before_resolved_resources: Final[dict]
1✔
131
    _processed: dict[Scope, Any]
1✔
132

133
    def __init__(self, node_template: NodeTemplate, before_resolved_resources: dict):
1✔
134
        self._node_template = node_template
1✔
135
        self._before_resolved_resources = before_resolved_resources
1✔
136
        self._processed = dict()
1✔
137

138
    def process(self) -> None:
1✔
139
        self._processed.clear()
1✔
140
        self.visit(self._node_template)
1✔
141

142
    def _get_node_resource_for(
1✔
143
        self, resource_name: str, node_template: NodeTemplate
144
    ) -> NodeResource:
145
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
146
        for node_resource in node_template.resources.resources:
1✔
147
            if node_resource.name == resource_name:
1✔
148
                return node_resource
1✔
149
        raise RuntimeError(f"No resource '{resource_name}' was found")
×
150

151
    def _get_node_property_for(
1✔
152
        self, property_name: str, node_resource: NodeResource
153
    ) -> Optional[NodeProperty]:
154
        # TODO: this could be improved with hashmap lookups if the Node contained bindings and not lists.
155
        for node_property in node_resource.properties.properties:
1✔
156
            if node_property.name == property_name:
1✔
157
                return node_property
1✔
158
        return None
1✔
159

160
    @staticmethod
1✔
161
    def _deployed_property_value_of(
1✔
162
        resource_logical_id: str, property_name: str, resolved_resources: dict
163
    ) -> Any:
164
        # TODO: typing around resolved resources is needed and should be reflected here.
165
        resolved_resource = resolved_resources.get(resource_logical_id)
1✔
166
        if resolved_resource is None:
1✔
167
            raise RuntimeError(
×
168
                f"No deployed instances of resource '{resource_logical_id}' were found"
169
            )
170
        properties = resolved_resource.get("Properties", dict())
1✔
171
        property_value: Optional[Any] = properties.get(property_name)
1✔
172
        if property_value is None:
1✔
173
            raise RuntimeError(
×
174
                f"No '{property_name}' found for deployed resource '{resource_logical_id}' was found"
175
            )
176
        return property_value
1✔
177

178
    def _before_deployed_property_value_of(
1✔
179
        self, resource_logical_id: str, property_name: str
180
    ) -> Any:
181
        return self._deployed_property_value_of(
×
182
            resource_logical_id=resource_logical_id,
183
            property_name=property_name,
184
            resolved_resources=self._before_resolved_resources,
185
        )
186

187
    def _after_deployed_property_value_of(
1✔
188
        self, resource_logical_id: str, property_name: str
189
    ) -> Optional[str]:
190
        return self._before_deployed_property_value_of(
×
191
            resource_logical_id=resource_logical_id, property_name=property_name
192
        )
193

194
    def _get_node_mapping(self, map_name: str) -> NodeMapping:
1✔
195
        mappings: list[NodeMapping] = self._node_template.mappings.mappings
1✔
196
        # TODO: another scenarios suggesting property lookups might be preferable.
197
        for mapping in mappings:
1✔
198
            if mapping.name == map_name:
1✔
199
                return mapping
1✔
200
        # TODO
201
        raise RuntimeError()
×
202

203
    def _get_node_parameter_if_exists(self, parameter_name: str) -> Optional[NodeParameter]:
1✔
204
        parameters: list[NodeParameter] = self._node_template.parameters.parameters
1✔
205
        # TODO: another scenarios suggesting property lookups might be preferable.
206
        for parameter in parameters:
1✔
207
            if parameter.name == parameter_name:
1✔
208
                return parameter
1✔
209
        return None
1✔
210

211
    def _get_node_condition_if_exists(self, condition_name: str) -> Optional[NodeCondition]:
1✔
212
        conditions: list[NodeCondition] = self._node_template.conditions.conditions
1✔
213
        # TODO: another scenarios suggesting property lookups might be preferable.
214
        for condition in conditions:
1✔
215
            if condition.name == condition_name:
1✔
216
                return condition
1✔
217
        return None
×
218

219
    def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
1✔
220
        node_condition = self._get_node_condition_if_exists(condition_name=logical_id)
1✔
221
        if isinstance(node_condition, NodeCondition):
1✔
222
            condition_delta = self.visit(node_condition)
1✔
223
            return condition_delta
1✔
224
        raise RuntimeError(f"No condition '{logical_id}' was found.")
×
225

226
    def _resolve_reference(self, logical_id: str) -> PreprocEntityDelta:
1✔
227
        node_parameter = self._get_node_parameter_if_exists(parameter_name=logical_id)
1✔
228
        if isinstance(node_parameter, NodeParameter):
1✔
229
            parameter_delta = self.visit(node_parameter)
1✔
230
            return parameter_delta
1✔
231

232
        node_resource = self._get_node_resource_for(
1✔
233
            resource_name=logical_id, node_template=self._node_template
234
        )
235
        resource_delta = self.visit(node_resource)
1✔
236
        before = resource_delta.before
1✔
237
        after = resource_delta.after
1✔
238
        return PreprocEntityDelta(before=before, after=after)
1✔
239

240
    def _resolve_mapping(
1✔
241
        self, map_name: str, top_level_key: str, second_level_key
242
    ) -> PreprocEntityDelta:
243
        # TODO: add support for nested intrinsic functions, and KNOWN AFTER APPLY logical ids.
244
        node_mapping: NodeMapping = self._get_node_mapping(map_name=map_name)
1✔
245
        top_level_value = node_mapping.bindings.bindings.get(top_level_key)
1✔
246
        if not isinstance(top_level_value, NodeObject):
1✔
247
            raise RuntimeError()
×
248
        second_level_value = top_level_value.bindings.get(second_level_key)
1✔
249
        mapping_value_delta = self.visit(second_level_value)
1✔
250
        return mapping_value_delta
1✔
251

252
    def visit(self, change_set_entity: ChangeSetEntity) -> PreprocEntityDelta:
1✔
253
        delta = self._processed.get(change_set_entity.scope)
1✔
254
        if delta is not None:
1✔
255
            return delta
1✔
256
        delta = super().visit(change_set_entity=change_set_entity)
1✔
257
        self._processed[change_set_entity.scope] = delta
1✔
258
        return delta
1✔
259

260
    def visit_terminal_value_modified(
1✔
261
        self, terminal_value_modified: TerminalValueModified
262
    ) -> PreprocEntityDelta:
263
        return PreprocEntityDelta(
1✔
264
            before=terminal_value_modified.value,
265
            after=terminal_value_modified.modified_value,
266
        )
267

268
    def visit_terminal_value_created(
1✔
269
        self, terminal_value_created: TerminalValueCreated
270
    ) -> PreprocEntityDelta:
271
        return PreprocEntityDelta(after=terminal_value_created.value)
1✔
272

273
    def visit_terminal_value_removed(
1✔
274
        self, terminal_value_removed: TerminalValueRemoved
275
    ) -> PreprocEntityDelta:
276
        return PreprocEntityDelta(before=terminal_value_removed.value)
1✔
277

278
    def visit_terminal_value_unchanged(
1✔
279
        self, terminal_value_unchanged: TerminalValueUnchanged
280
    ) -> PreprocEntityDelta:
281
        return PreprocEntityDelta(
1✔
282
            before=terminal_value_unchanged.value,
283
            after=terminal_value_unchanged.value,
284
        )
285

286
    def visit_node_divergence(self, node_divergence: NodeDivergence) -> PreprocEntityDelta:
1✔
287
        before_delta = self.visit(node_divergence.value)
×
288
        after_delta = self.visit(node_divergence.divergence)
×
289
        return PreprocEntityDelta(before=before_delta.before, after=after_delta.after)
×
290

291
    def visit_node_object(self, node_object: NodeObject) -> PreprocEntityDelta:
1✔
292
        before = dict()
1✔
293
        after = dict()
1✔
294
        for name, change_set_entity in node_object.bindings.items():
1✔
295
            delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
1✔
296
            match change_set_entity.change_type:
1✔
297
                case ChangeType.MODIFIED:
1✔
298
                    before[name] = delta.before
1✔
299
                    after[name] = delta.after
1✔
300
                case ChangeType.CREATED:
1✔
301
                    after[name] = delta.after
1✔
302
                case ChangeType.REMOVED:
1✔
303
                    before[name] = delta.before
1✔
304
                case ChangeType.UNCHANGED:
1✔
305
                    before[name] = delta.before
1✔
306
                    after[name] = delta.before
1✔
307
        return PreprocEntityDelta(before=before, after=after)
1✔
308

309
    def visit_node_intrinsic_function_fn_get_att(
1✔
310
        self, node_intrinsic_function: NodeIntrinsicFunction
311
    ) -> PreprocEntityDelta:
312
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
313
        # TODO: validate the return value according to the spec.
314
        before_argument_list = arguments_delta.before
1✔
315
        after_argument_list = arguments_delta.after
1✔
316

317
        before = None
1✔
318
        if before_argument_list:
1✔
319
            before_logical_name_of_resource = before_argument_list[0]
1✔
320
            before_attribute_name = before_argument_list[1]
1✔
321

322
            before_node_resource = self._get_node_resource_for(
1✔
323
                resource_name=before_logical_name_of_resource, node_template=self._node_template
324
            )
325
            before_node_property: Optional[NodeProperty] = self._get_node_property_for(
1✔
326
                property_name=before_attribute_name, node_resource=before_node_resource
327
            )
328
            if before_node_property is not None:
1✔
329
                # The property is statically defined in the template and its value can be computed.
330
                before_property_delta = self.visit(before_node_property)
1✔
331
                before = before_property_delta.before
1✔
332
            else:
333
                # The property is not statically defined and must therefore be available in
334
                # the properties deployed set.
335
                before = self._before_deployed_property_value_of(
×
336
                    resource_logical_id=before_logical_name_of_resource,
337
                    property_name=before_attribute_name,
338
                )
339

340
        after = None
1✔
341
        if after_argument_list:
1✔
342
            after_logical_name_of_resource = after_argument_list[0]
1✔
343
            after_attribute_name = after_argument_list[1]
1✔
344
            after_node_resource = self._get_node_resource_for(
1✔
345
                resource_name=after_logical_name_of_resource, node_template=self._node_template
346
            )
347
            after_node_property = self._get_node_property_for(
1✔
348
                property_name=after_attribute_name, node_resource=after_node_resource
349
            )
350
            if after_node_property is not None:
1✔
351
                # The property is statically defined in the template and its value can be computed.
352
                after_property_delta = self.visit(after_node_property)
1✔
353
                after = after_property_delta.after
1✔
354
            else:
355
                # The property is not statically defined and must therefore be available in
356
                # the properties deployed set.
357
                after = self._after_deployed_property_value_of(
1✔
358
                    resource_logical_id=after_logical_name_of_resource,
359
                    property_name=after_attribute_name,
360
                )
361

362
        return PreprocEntityDelta(before=before, after=after)
1✔
363

364
    def visit_node_intrinsic_function_fn_equals(
1✔
365
        self, node_intrinsic_function: NodeIntrinsicFunction
366
    ) -> PreprocEntityDelta:
367
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
368
        before_values = arguments_delta.before
1✔
369
        after_values = arguments_delta.after
1✔
370
        before = None
1✔
371
        if before_values:
1✔
372
            before = before_values[0] == before_values[1]
1✔
373
        after = None
1✔
374
        if after_values:
1✔
375
            after = after_values[0] == after_values[1]
1✔
376
        return PreprocEntityDelta(before=before, after=after)
1✔
377

378
    def visit_node_intrinsic_function_fn_if(
1✔
379
        self, node_intrinsic_function: NodeIntrinsicFunction
380
    ) -> PreprocEntityDelta:
381
        arguments_delta = self.visit(node_intrinsic_function.arguments)
×
382

383
        def _compute_delta_for_if_statement(args: list[Any]) -> PreprocEntityDelta:
×
384
            condition_name = args[0]
×
385
            boolean_expression_delta = self._resolve_condition(logical_id=condition_name)
×
386
            return PreprocEntityDelta(
×
387
                before=args[1] if boolean_expression_delta.before else args[2],
388
                after=args[1] if boolean_expression_delta.after else args[2],
389
            )
390

391
        # TODO: add support for this being created or removed.
392
        before_outcome_delta = _compute_delta_for_if_statement(arguments_delta.before)
×
393
        before = before_outcome_delta.before
×
394
        after_outcome_delta = _compute_delta_for_if_statement(arguments_delta.after)
×
395
        after = after_outcome_delta.after
×
396
        return PreprocEntityDelta(before=before, after=after)
×
397

398
    def visit_node_intrinsic_function_fn_not(
1✔
399
        self, node_intrinsic_function: NodeIntrinsicFunction
400
    ) -> PreprocEntityDelta:
401
        arguments_delta = self.visit(node_intrinsic_function.arguments)
×
402
        before_condition = arguments_delta.before
×
403
        after_condition = arguments_delta.after
×
404
        if before_condition:
×
405
            before_condition_outcome = before_condition[0]
×
406
            before = not before_condition_outcome
×
407
        else:
408
            before = None
×
409

410
        if after_condition:
×
411
            after_condition_outcome = after_condition[0]
×
412
            after = not after_condition_outcome
×
413
        else:
414
            after = None
×
415
        # Implicit change type computation.
416
        return PreprocEntityDelta(before=before, after=after)
×
417

418
    def visit_node_intrinsic_function_fn_join(
1✔
419
        self, node_intrinsic_function: NodeIntrinsicFunction
420
    ) -> PreprocEntityDelta:
421
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
422
        arguments_before = arguments_delta.before
1✔
423
        arguments_after = arguments_delta.after
1✔
424

425
        def _compute_join(args: list[Any]) -> str:
1✔
426
            # TODO: add support for schema validation.
427
            # TODO: add tests for joining non string values.
428
            delimiter: str = str(args[0])
1✔
429
            values: list[Any] = args[1]
1✔
430
            if not isinstance(values, list):
1✔
431
                raise RuntimeError("Invalid arguments list definition for Fn::Join")
×
432
            join_result = delimiter.join(map(str, values))
1✔
433
            return join_result
1✔
434

435
        before = None
1✔
436
        if isinstance(arguments_before, list) and len(arguments_before) == 2:
1✔
437
            before = _compute_join(arguments_before)
1✔
438
        after = None
1✔
439
        if isinstance(arguments_after, list) and len(arguments_after) == 2:
1✔
440
            after = _compute_join(arguments_after)
1✔
441
        return PreprocEntityDelta(before=before, after=after)
1✔
442

443
    def visit_node_intrinsic_function_fn_find_in_map(
1✔
444
        self, node_intrinsic_function: NodeIntrinsicFunction
445
    ) -> PreprocEntityDelta:
446
        # TODO: add type checking/validation for result unit?
447
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
448
        before_arguments = arguments_delta.before
1✔
449
        after_arguments = arguments_delta.after
1✔
450
        if before_arguments:
1✔
451
            before_value_delta = self._resolve_mapping(*before_arguments)
1✔
452
            before = before_value_delta.before
1✔
453
        else:
454
            before = None
1✔
455
        if after_arguments:
1✔
456
            after_value_delta = self._resolve_mapping(*after_arguments)
1✔
457
            after = after_value_delta.after
1✔
458
        else:
459
            after = None
×
460
        return PreprocEntityDelta(before=before, after=after)
1✔
461

462
    def visit_node_mapping(self, node_mapping: NodeMapping) -> PreprocEntityDelta:
1✔
463
        bindings_delta = self.visit(node_mapping.bindings)
1✔
464
        return bindings_delta
1✔
465

466
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
1✔
467
        dynamic_value = node_parameter.dynamic_value
1✔
468
        dynamic_delta = self.visit(dynamic_value)
1✔
469

470
        default_value = node_parameter.default_value
1✔
471
        default_delta = self.visit(default_value)
1✔
472

473
        before = dynamic_delta.before or default_delta.before
1✔
474
        after = dynamic_delta.after or default_delta.after
1✔
475

476
        return PreprocEntityDelta(before=before, after=after)
1✔
477

478
    def visit_node_condition(self, node_condition: NodeCondition) -> PreprocEntityDelta:
1✔
479
        delta = self.visit(node_condition.body)
1✔
480
        return delta
1✔
481

482
    def _resource_physical_resource_id_from(
1✔
483
        self, logical_resource_id: str, resolved_resources: dict
484
    ) -> str:
485
        # TODO: typing around resolved resources is needed and should be reflected here.
486
        resolved_resource = resolved_resources.get(logical_resource_id, dict())
1✔
487
        physical_resource_id: Optional[str] = resolved_resource.get("PhysicalResourceId")
1✔
488
        if not isinstance(physical_resource_id, str):
1✔
489
            raise RuntimeError(f"No PhysicalResourceId found for resource '{logical_resource_id}'")
1✔
490
        return physical_resource_id
1✔
491

492
    def _before_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
493
        # TODO: typing around resolved resources is needed and should be reflected here.
494
        return self._resource_physical_resource_id_from(
1✔
495
            logical_resource_id=resource_logical_id,
496
            resolved_resources=self._before_resolved_resources,
497
        )
498

499
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
500
        return self._before_resource_physical_id(resource_logical_id=resource_logical_id)
1✔
501

502
    def visit_node_intrinsic_function_ref(
1✔
503
        self, node_intrinsic_function: NodeIntrinsicFunction
504
    ) -> PreprocEntityDelta:
505
        arguments_delta = self.visit(node_intrinsic_function.arguments)
1✔
506
        before_logical_id = arguments_delta.before
1✔
507
        after_logical_id = arguments_delta.after
1✔
508

509
        # TODO: extend this to support references to other types.
510
        before = None
1✔
511
        if before_logical_id is not None:
1✔
512
            before_delta = self._resolve_reference(logical_id=before_logical_id)
1✔
513
            before = before_delta.before
1✔
514
            if isinstance(before, PreprocResource):
1✔
515
                before = before.physical_resource_id
1✔
516

517
        after = None
1✔
518
        if after_logical_id is not None:
1✔
519
            after_delta = self._resolve_reference(logical_id=after_logical_id)
1✔
520
            after = after_delta.after
1✔
521
            if isinstance(after, PreprocResource):
1✔
522
                after = after.physical_resource_id
1✔
523

524
        return PreprocEntityDelta(before=before, after=after)
1✔
525

526
    def visit_node_array(self, node_array: NodeArray) -> PreprocEntityDelta:
1✔
527
        before = list()
1✔
528
        after = list()
1✔
529
        for change_set_entity in node_array.array:
1✔
530
            delta: PreprocEntityDelta = self.visit(change_set_entity=change_set_entity)
1✔
531
            if delta.before is not None:
1✔
532
                before.append(delta.before)
1✔
533
            if delta.after is not None:
1✔
534
                after.append(delta.after)
1✔
535
        return PreprocEntityDelta(before=before, after=after)
1✔
536

537
    def visit_node_property(self, node_property: NodeProperty) -> PreprocEntityDelta:
1✔
538
        return self.visit(node_property.value)
1✔
539

540
    def visit_node_properties(
1✔
541
        self, node_properties: NodeProperties
542
    ) -> PreprocEntityDelta[PreprocProperties, PreprocProperties]:
543
        before_bindings: dict[str, Any] = dict()
1✔
544
        after_bindings: dict[str, Any] = dict()
1✔
545
        for node_property in node_properties.properties:
1✔
546
            delta = self.visit(node_property)
1✔
547
            property_name = node_property.name
1✔
548
            if node_property.change_type != ChangeType.CREATED:
1✔
549
                before_bindings[property_name] = delta.before
1✔
550
            if node_property.change_type != ChangeType.REMOVED:
1✔
551
                after_bindings[property_name] = delta.after
1✔
552
        before = PreprocProperties(properties=before_bindings)
1✔
553
        after = PreprocProperties(properties=after_bindings)
1✔
554
        return PreprocEntityDelta(before=before, after=after)
1✔
555

556
    def _resolve_resource_condition_reference(self, reference: TerminalValue) -> PreprocEntityDelta:
1✔
557
        reference_delta = self.visit(reference)
1✔
558
        before_reference = reference_delta.before
1✔
559
        before = None
1✔
560
        if before_reference is not None:
1✔
561
            before_delta = self._resolve_condition(logical_id=before_reference)
1✔
562
            before = before_delta.before
1✔
563
        after = None
1✔
564
        after_reference = reference_delta.after
1✔
565
        if after_reference is not None:
1✔
566
            after_delta = self._resolve_condition(logical_id=after_reference)
1✔
567
            after = after_delta.after
1✔
568
        return PreprocEntityDelta(before=before, after=after)
1✔
569

570
    def visit_node_resource(
1✔
571
        self, node_resource: NodeResource
572
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
573
        change_type = node_resource.change_type
1✔
574
        condition_before = None
1✔
575
        condition_after = None
1✔
576
        if node_resource.condition_reference is not None:
1✔
577
            condition_delta = self._resolve_resource_condition_reference(
1✔
578
                node_resource.condition_reference
579
            )
580
            condition_before = condition_delta.before
1✔
581
            condition_after = condition_delta.after
1✔
582

583
        type_delta = self.visit(node_resource.type_)
1✔
584
        properties_delta: PreprocEntityDelta[PreprocProperties, PreprocProperties] = self.visit(
1✔
585
            node_resource.properties
586
        )
587

588
        before = None
1✔
589
        after = None
1✔
590
        if change_type != ChangeType.CREATED and condition_before is None or condition_before:
1✔
591
            logical_resource_id = node_resource.name
1✔
592
            before_physical_resource_id = self._before_resource_physical_id(
1✔
593
                resource_logical_id=logical_resource_id
594
            )
595
            before = PreprocResource(
1✔
596
                logical_id=logical_resource_id,
597
                physical_resource_id=before_physical_resource_id,
598
                condition=condition_before,
599
                resource_type=type_delta.before,
600
                properties=properties_delta.before,
601
            )
602
        if change_type != ChangeType.REMOVED and condition_after is None or condition_after:
1✔
603
            logical_resource_id = node_resource.name
1✔
604
            try:
1✔
605
                after_physical_resource_id = self._after_resource_physical_id(
1✔
606
                    resource_logical_id=logical_resource_id
607
                )
608
            except RuntimeError:
1✔
609
                after_physical_resource_id = None
1✔
610
            after = PreprocResource(
1✔
611
                logical_id=logical_resource_id,
612
                physical_resource_id=after_physical_resource_id,
613
                condition=condition_after,
614
                resource_type=type_delta.after,
615
                properties=properties_delta.after,
616
            )
617
        return PreprocEntityDelta(before=before, after=after)
1✔
618

619
    def visit_node_output(
1✔
620
        self, node_output: NodeOutput
621
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
622
        change_type = node_output.change_type
1✔
623
        value_delta = self.visit(node_output.value)
1✔
624

625
        condition_delta = None
1✔
626
        if node_output.condition_reference is not None:
1✔
627
            condition_delta = self._resolve_resource_condition_reference(
×
628
                node_output.condition_reference
629
            )
630
            condition_before = condition_delta.before
×
631
            condition_after = condition_delta.after
×
632
            if not condition_before and condition_after:
×
633
                change_type = ChangeType.CREATED
×
634
            elif condition_before and not condition_after:
×
635
                change_type = ChangeType.REMOVED
×
636

637
        export_delta = None
1✔
638
        if node_output.export is not None:
1✔
639
            export_delta = self.visit(node_output.export)
×
640

641
        before: Optional[PreprocOutput] = None
1✔
642
        if change_type != ChangeType.CREATED:
1✔
643
            before = PreprocOutput(
×
644
                name=node_output.name,
645
                value=value_delta.before,
646
                export=export_delta.before if export_delta else None,
647
                condition=condition_delta.before if condition_delta else None,
648
            )
649
        after: Optional[PreprocOutput] = None
1✔
650
        if change_type != ChangeType.REMOVED:
1✔
651
            after = PreprocOutput(
1✔
652
                name=node_output.name,
653
                value=value_delta.after,
654
                export=export_delta.after if export_delta else None,
655
                condition=condition_delta.after if condition_delta else None,
656
            )
657
        return PreprocEntityDelta(before=before, after=after)
1✔
658

659
    def visit_node_outputs(
1✔
660
        self, node_outputs: NodeOutputs
661
    ) -> PreprocEntityDelta[list[PreprocOutput], list[PreprocOutput]]:
662
        before: list[PreprocOutput] = list()
1✔
663
        after: list[PreprocOutput] = list()
1✔
664
        for node_output in node_outputs.outputs:
1✔
665
            output_delta: PreprocEntityDelta[PreprocOutput, PreprocOutput] = self.visit(node_output)
1✔
666
            output_before = output_delta.before
1✔
667
            output_after = output_delta.after
1✔
668
            if output_before:
1✔
669
                before.append(output_before)
×
670
            if output_after:
1✔
671
                after.append(output_after)
1✔
672
        return PreprocEntityDelta(before=before, after=after)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc