• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / d8dc9956-71ea-40c6-95cb-26b2b584943a

08 May 2025 05:15PM UTC coverage: 86.66% (+0.1%) from 86.535%
d8dc9956-71ea-40c6-95cb-26b2b584943a

push

circleci

web-flow
CFn v2: Skip media type assertion (#12597)

64346 of 74251 relevant lines covered (86.66%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.03
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model.py
1
from __future__ import annotations
1✔
2

3
import abc
1✔
4
import enum
1✔
5
from itertools import zip_longest
1✔
6
from typing import Any, Final, Generator, Optional, Union, cast
1✔
7

8
from typing_extensions import TypeVar
1✔
9

10
from localstack.utils.strings import camel_to_snake_case
1✔
11

12
# Generic type variable; it parameterises the Maybe alias defined in this module.
T = TypeVar("T")
1✔
13

14

15
class NothingType:
    """Singleton sentinel meaning 'no value', kept distinct from ``None``.

    The single instance is falsy and iterates as an empty sequence.
    """

    __slots__ = ()
    _singleton = None

    def __new__(cls):
        # Create the instance on first use; afterwards always hand back the same object.
        instance = cls._singleton
        if instance is None:
            instance = super().__new__(cls)
            cls._singleton = instance
        return instance

    def __repr__(self) -> str:
        return "Nothing"

    def __str__(self):
        return repr(self)

    def __bool__(self):
        return False

    def __iter__(self):
        return iter(())
1✔
37

38

39
# Either a value of type T or an instance of the NothingType sentinel.
Maybe = Union[T, NothingType]
1✔
40
# Module-level NothingType instance used as the 'no value' marker.
Nothing = NothingType()
1✔
41

42

43
class Scope(str):
    """A ``str`` subclass holding a separator-delimited path; the default is the empty root."""

    _ROOT_SCOPE: Final[str] = str()
    _SEPARATOR: Final[str] = "/"

    def __new__(cls, scope: str = _ROOT_SCOPE) -> Scope:
        instance = super().__new__(cls, scope)
        return cast(Scope, instance)

    def open_scope(self, name: Scope | str) -> Scope:
        """Return the child scope made of this scope, a separator, and ``name``."""
        parts = [self, name]
        return Scope(self._SEPARATOR.join(parts))

    def open_index(self, index: int) -> Scope:
        """Return the child scope made of this scope, a separator, and ``str(index)``."""
        parts = [self, str(index)]
        return Scope(self._SEPARATOR.join(parts))

    def unwrap(self) -> list[str]:
        """Split this scope on the separator and return the resulting pieces."""
        return self.split(self._SEPARATOR)
×
58

59

60
class ChangeType(enum.Enum):
    """The kinds of change an entity of the model can carry."""

    UNCHANGED = "Unchanged"
    CREATED = "Created"
    MODIFIED = "Modified"
    REMOVED = "Removed"

    def __str__(self):
        return self.value

    def for_child(self, child_change_type: ChangeType) -> ChangeType:
        """Combine this change type with that of a child.

        Equal types are kept as-is, an UNCHANGED parent adopts the child's type,
        and any other combination yields MODIFIED.
        """
        if child_change_type == self:
            return self
        if self == ChangeType.UNCHANGED:
            return child_change_type
        return ChangeType.MODIFIED
1✔
76

77

78
class ChangeSetEntity(abc.ABC):
    """Base class of every element of the change-set model.

    Each entity records the ``scope`` it was created for and its ``change_type``.
    """

    scope: Final[Scope]
    change_type: Final[ChangeType]

    def __init__(self, scope: Scope, change_type: ChangeType):
        self.scope = scope
        self.change_type = change_type

    def get_children(self) -> Generator[ChangeSetEntity]:
        """Yield the entities stored in this instance's attributes.

        Attribute values that are lists or dicts are searched for entities;
        the entities found are yielded but not descended into.
        """
        for child in self.__dict__.values():
            yield from self._get_children_in(child)

    @staticmethod
    def _get_children_in(obj: Any) -> Generator[ChangeSetEntity]:
        """Yield ``obj`` if it is an entity, else the entities inside a list or dict."""
        # TODO: could avoid the inductive logic here, and check for loops?
        if isinstance(obj, ChangeSetEntity):
            yield obj
        elif isinstance(obj, list):
            for item in obj:
                yield from ChangeSetEntity._get_children_in(item)
        elif isinstance(obj, dict):
            for item in obj.values():
                yield from ChangeSetEntity._get_children_in(item)

    def __str__(self):
        # The opening parenthesis is now closed; previously the output was unbalanced.
        return f"({self.__class__.__name__}| {vars(self)})"

    def __repr__(self):
        return str(self)
107

108

109
# Abstract marker base for the non-terminal entities; the Node* classes below derive from it.
class ChangeSetNode(ChangeSetEntity, abc.ABC): ...
1✔
110

111

112
# Abstract marker base for terminal entities; TerminalValue below derives from it.
class ChangeSetTerminal(ChangeSetEntity, abc.ABC): ...
1✔
113

114

115
class NodeTemplate(ChangeSetNode):
    """Node grouping the mappings, parameters, conditions, resources and outputs nodes."""

    mappings: Final[NodeMappings]
    parameters: Final[NodeParameters]
    conditions: Final[NodeConditions]
    resources: Final[NodeResources]
    outputs: Final[NodeOutputs]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        mappings: NodeMappings,
        parameters: NodeParameters,
        conditions: NodeConditions,
        resources: NodeResources,
        outputs: NodeOutputs,
    ):
        super().__init__(scope, change_type)
        # Keep the sections in the same order as the constructor parameters.
        self.mappings = mappings
        self.parameters = parameters
        self.conditions = conditions
        self.resources = resources
        self.outputs = outputs
1✔
138

139

140
class NodeDivergence(ChangeSetNode):
    """Node pairing a ``value`` entity with a ``divergence`` entity; always MODIFIED."""

    value: Final[ChangeSetEntity]
    divergence: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, value: ChangeSetEntity, divergence: ChangeSetEntity):
        # The change type is not a parameter: a divergence is fixed to MODIFIED.
        super().__init__(scope, ChangeType.MODIFIED)
        self.value = value
        self.divergence = divergence
×
148

149

150
class NodeParameter(ChangeSetNode):
    """Node for one named parameter: its type, dynamic value and optional default value."""

    name: Final[str]
    type_: Final[ChangeSetEntity]
    dynamic_value: Final[ChangeSetEntity]
    default_value: Final[Optional[ChangeSetEntity]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        type_: ChangeSetEntity,
        dynamic_value: ChangeSetEntity,
        default_value: Optional[ChangeSetEntity],
    ):
        super().__init__(scope, change_type)
        self.name = name
        self.type_ = type_
        self.dynamic_value = dynamic_value
        self.default_value = default_value
1✔
170

171

172
class NodeParameters(ChangeSetNode):
    """Node holding the list of NodeParameter entries."""

    parameters: Final[list[NodeParameter]]

    def __init__(self, scope: Scope, change_type: ChangeType, parameters: list[NodeParameter]):
        super().__init__(scope, change_type)
        self.parameters = parameters
1✔
178

179

180
class NodeMapping(ChangeSetNode):
    """Node for one named mapping whose content is held as a NodeObject in ``bindings``."""

    name: Final[str]
    bindings: Final[NodeObject]

    def __init__(self, scope: Scope, change_type: ChangeType, name: str, bindings: NodeObject):
        super().__init__(scope, change_type)
        self.name = name
        self.bindings = bindings
1✔
188

189

190
class NodeMappings(ChangeSetNode):
    """Node holding the list of NodeMapping entries."""

    mappings: Final[list[NodeMapping]]

    def __init__(self, scope: Scope, change_type: ChangeType, mappings: list[NodeMapping]):
        super().__init__(scope, change_type)
        self.mappings = mappings
1✔
196

197

198
class NodeOutput(ChangeSetNode):
    """Node for one named output: its value, optional export and optional condition reference."""

    name: Final[str]
    value: Final[ChangeSetEntity]
    export: Final[Optional[ChangeSetEntity]]
    condition_reference: Final[Optional[TerminalValue]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        value: ChangeSetEntity,
        export: Optional[ChangeSetEntity],
        conditional_reference: Optional[TerminalValue],
    ):
        super().__init__(scope, change_type)
        self.name = name
        self.value = value
        self.export = export
        # The parameter is spelled 'conditional_reference'; the attribute is 'condition_reference'.
        self.condition_reference = conditional_reference
1✔
218

219

220
class NodeOutputs(ChangeSetNode):
    """Node holding the list of NodeOutput entries."""

    outputs: Final[list[NodeOutput]]

    def __init__(self, scope: Scope, change_type: ChangeType, outputs: list[NodeOutput]):
        super().__init__(scope, change_type)
        self.outputs = outputs
1✔
226

227

228
class NodeCondition(ChangeSetNode):
    """Node for one named condition whose definition is held in ``body``."""

    name: Final[str]
    body: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, change_type: ChangeType, name: str, body: ChangeSetEntity):
        super().__init__(scope, change_type)
        self.name = name
        self.body = body
1✔
236

237

238
class NodeConditions(ChangeSetNode):
    """Node holding the list of NodeCondition entries."""

    conditions: Final[list[NodeCondition]]

    def __init__(self, scope: Scope, change_type: ChangeType, conditions: list[NodeCondition]):
        super().__init__(scope, change_type)
        self.conditions = conditions
1✔
244

245

246
class NodeResources(ChangeSetNode):
    """Node holding the list of NodeResource entries."""

    resources: Final[list[NodeResource]]

    def __init__(self, scope: Scope, change_type: ChangeType, resources: list[NodeResource]):
        super().__init__(scope, change_type)
        self.resources = resources
1✔
252

253

254
class NodeResource(ChangeSetNode):
    """Node for one named resource: its type, optional condition reference and properties."""

    name: Final[str]
    type_: Final[ChangeSetTerminal]
    condition_reference: Final[Optional[TerminalValue]]
    properties: Final[NodeProperties]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        type_: ChangeSetTerminal,
        condition_reference: Optional[TerminalValue],
        properties: NodeProperties,
    ):
        """Store the resource parts.

        ``condition_reference`` may be ``None``: the attribute is declared Optional and
        the model's resource visitor starts from ``None`` before looking for a condition,
        so the parameter annotation is Optional as well.
        """
        super().__init__(scope=scope, change_type=change_type)
        self.name = name
        self.type_ = type_
        self.condition_reference = condition_reference
        self.properties = properties
1✔
274

275

276
class NodeProperties(ChangeSetNode):
    """Node holding the list of NodeProperty entries."""

    properties: Final[list[NodeProperty]]

    def __init__(self, scope: Scope, change_type: ChangeType, properties: list[NodeProperty]):
        super().__init__(scope, change_type)
        self.properties = properties
1✔
282

283

284
class NodeProperty(ChangeSetNode):
    """Node for one named property and the entity modelling its value."""

    name: Final[str]
    value: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, change_type: ChangeType, name: str, value: ChangeSetEntity):
        super().__init__(scope, change_type)
        self.name = name
        self.value = value
1✔
292

293

294
class NodeIntrinsicFunction(ChangeSetNode):
    """Node for an intrinsic function: the function's name and the entity for its arguments."""

    intrinsic_function: Final[str]
    arguments: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        intrinsic_function: str,
        arguments: ChangeSetEntity,
    ):
        super().__init__(scope, change_type)
        self.intrinsic_function = intrinsic_function
        self.arguments = arguments
1✔
308

309

310
class NodeObject(ChangeSetNode):
    """Node for a dictionary value; ``bindings`` maps each key to the entity for its value."""

    bindings: Final[dict[str, ChangeSetEntity]]

    def __init__(self, scope: Scope, change_type: ChangeType, bindings: dict[str, ChangeSetEntity]):
        super().__init__(scope, change_type)
        self.bindings = bindings
1✔
316

317

318
class NodeArray(ChangeSetNode):
    """Node for a list value; ``array`` holds one entity per position."""

    array: Final[list[ChangeSetEntity]]

    def __init__(self, scope: Scope, change_type: ChangeType, array: list[ChangeSetEntity]):
        super().__init__(scope, change_type)
        self.array = array
1✔
324

325

326
class TerminalValue(ChangeSetTerminal, abc.ABC):
    """Terminal entity wrapping a single value of any type."""

    value: Final[Any]

    def __init__(self, scope: Scope, change_type: ChangeType, value: Any):
        super().__init__(scope, change_type)
        self.value = value
1✔
332

333

334
class TerminalValueModified(TerminalValue):
    """Terminal with change type MODIFIED; keeps ``value`` and a separate ``modified_value``."""

    modified_value: Final[Any]

    def __init__(self, scope: Scope, value: Any, modified_value: Any):
        super().__init__(scope, ChangeType.MODIFIED, value)
        self.modified_value = modified_value
1✔
340

341

342
class TerminalValueCreated(TerminalValue):
    """Terminal whose change type is fixed to CREATED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.CREATED, value)
1✔
345

346

347
class TerminalValueRemoved(TerminalValue):
    """Terminal whose change type is fixed to REMOVED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.REMOVED, value)
1✔
350

351

352
class TerminalValueUnchanged(TerminalValue):
    """Terminal whose change type is fixed to UNCHANGED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.UNCHANGED, value)
1✔
355

356

357
# Dictionary keys of template sections.
ConditionsKey: Final[str] = "Conditions"
MappingsKey: Final[str] = "Mappings"
OutputsKey: Final[str] = "Outputs"
ParametersKey: Final[str] = "Parameters"
ResourcesKey: Final[str] = "Resources"

# Dictionary keys found inside individual template entries.
ConditionKey: Final[str] = "Condition"
DefaultKey: Final[str] = "Default"
ExportKey: Final[str] = "Export"
PropertiesKey: Final[str] = "Properties"
TypeKey: Final[str] = "Type"
ValueKey: Final[str] = "Value"

# Names of the recognised intrinsic functions.
# TODO: expand intrinsic functions set.
RefKey: Final[str] = "Ref"
FnEqualsKey: Final[str] = "Fn::Equals"
FnFindInMapKey: Final[str] = "Fn::FindInMap"
FnGetAttKey: Final[str] = "Fn::GetAtt"
FnIfKey: Final[str] = "Fn::If"
FnJoinKey: Final[str] = "Fn::Join"
FnNotKey: Final[str] = "Fn::Not"
INTRINSIC_FUNCTIONS: Final[set[str]] = {
    RefKey,
    FnEqualsKey,
    FnFindInMapKey,
    FnGetAttKey,
    FnIfKey,
    FnJoinKey,
    FnNotKey,
}
385

386

387
class ChangeSetModel:
1✔
388
    # TODO: should this instead be generalised to work on "Stack" objects instead of just "Template"s?
389

390
    # TODO: can probably improve the typehints to use CFN's 'language' eg. dict -> Template|Properties, etc.
391

392
    # TODO: add support for 'replacement' computation, and ensure this state is propagated in tree traversals
393
    #  such as intrinsic functions.
394

395
    _before_template: Final[Maybe[dict]]
1✔
396
    _after_template: Final[Maybe[dict]]
1✔
397
    _before_parameters: Final[Maybe[dict]]
1✔
398
    _after_parameters: Final[Maybe[dict]]
1✔
399
    _visited_scopes: Final[dict[str, ChangeSetEntity]]
1✔
400
    _node_template: Final[NodeTemplate]
1✔
401

402
    def __init__(
        self,
        before_template: Optional[dict],
        after_template: Optional[dict],
        before_parameters: Optional[dict],
        after_parameters: Optional[dict],
    ):
        """Store the before/after inputs and build the template model from them.

        Any falsy input (``None`` or an empty dict) is stored as ``Nothing``.
        """
        self._before_template = before_template if before_template else Nothing
        self._after_template = after_template if after_template else Nothing
        self._before_parameters = before_parameters if before_parameters else Nothing
        self._after_parameters = after_parameters if after_parameters else Nothing
        # Entities already built, keyed by scope; the visit methods consult it before building.
        self._visited_scopes = {}
        self._node_template = self._model(
            before_template=self._before_template, after_template=self._after_template
        )
        # TODO: need to do template preprocessing e.g. parameter resolution, conditions etc.
418

419
    def get_update_model(self) -> NodeTemplate:
1✔
420
        # TODO: rethink naming of this for outer utils
421
        return self._node_template
1✔
422

423
    def _visit_terminal_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> TerminalValue:
        """Build, or fetch from the scope cache, the terminal for a before/after value pair.

        The terminal subclass is chosen in this order: created, removed, unchanged
        (the two values compare equal), otherwise modified.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, TerminalValue):
            return cached

        created = self._is_created(before=before_value, after=after_value)
        if created:
            terminal = TerminalValueCreated(scope=scope, value=after_value)
        elif self._is_removed(before=before_value, after=after_value):
            terminal = TerminalValueRemoved(scope=scope, value=before_value)
        elif before_value == after_value:
            terminal = TerminalValueUnchanged(scope=scope, value=before_value)
        else:
            terminal = TerminalValueModified(
                scope=scope, value=before_value, modified_value=after_value
            )

        self._visited_scopes[scope] = terminal
        return terminal
1✔
441

442
    def _visit_intrinsic_function(
        self,
        scope: Scope,
        intrinsic_function: str,
        before_arguments: Maybe[Any],
        after_arguments: Maybe[Any],
    ) -> NodeIntrinsicFunction:
        """Build, or fetch from the scope cache, the node for an intrinsic function.

        The arguments are modelled first. If they were created or removed as a whole,
        the function takes that change type. Otherwise a method named
        ``_resolve_intrinsic_function_<name>`` is looked up on this object and, when
        present, decides the change type; without one the arguments' change type is used.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeIntrinsicFunction):
            return cached

        # Note: the arguments are visited under the same scope as the function itself.
        arguments = self._visit_value(
            scope=scope, before_value=before_arguments, after_value=after_arguments
        )

        if self._is_created(before=before_arguments, after=after_arguments):
            change_type = ChangeType.CREATED
        elif self._is_removed(before=before_arguments, after=after_arguments):
            change_type = ChangeType.REMOVED
        else:
            # "::" becomes "_" before snake-casing; presumably "Fn::GetAtt" maps to
            # "fn_get_att", matching the resolver method names defined on this class.
            snake_name = camel_to_snake_case(intrinsic_function.replace("::", "_"))
            resolver_name = f"_resolve_intrinsic_function_{snake_name}"
            if not hasattr(self, resolver_name):
                change_type = arguments.change_type
            else:
                resolver = getattr(self, resolver_name)
                change_type = resolver(arguments)

        function_node = NodeIntrinsicFunction(
            scope=scope,
            change_type=change_type,
            intrinsic_function=intrinsic_function,
            arguments=arguments,
        )
        self._visited_scopes[scope] = function_node
        return function_node
1✔
476

477
    def _resolve_intrinsic_function_fn_get_att(self, arguments: ChangeSetEntity) -> ChangeType:
        """Return the change type of an ``Fn::GetAtt`` call.

        ``arguments`` must be a NodeArray whose first two items are terminals: the
        logical name of a resource (a string) and an attribute name. The result is the
        change type of the resource property whose name equals the attribute name, or
        UNCHANGED when the resource has no such property.

        Raises:
            RuntimeError: if the arguments do not have the shape described above.
        """
        # TODO: add support for nested intrinsic functions.
        # TODO: validate arguments structure and type.
        # TODO: should this check for deletion of resources and/or properties, if so what error should be raised?

        # Both items are read below, so require two up front: a one-element array used
        # to escape as an IndexError instead of the RuntimeError used for bad shapes.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 2:
            raise RuntimeError()
        logical_name_of_resource_entity = arguments.array[0]
        if not isinstance(logical_name_of_resource_entity, TerminalValue):
            raise RuntimeError()
        logical_name_of_resource: str = logical_name_of_resource_entity.value
        if not isinstance(logical_name_of_resource, str):
            raise RuntimeError()
        node_resource: NodeResource = self._retrieve_or_visit_resource(
            resource_name=logical_name_of_resource
        )

        node_property_attribute_name = arguments.array[1]
        if not isinstance(node_property_attribute_name, TerminalValue):
            raise RuntimeError()
        # When the attribute name itself changed, look up the new name.
        if isinstance(node_property_attribute_name, TerminalValueModified):
            attribute_name = node_property_attribute_name.modified_value
        else:
            attribute_name = node_property_attribute_name.value

        # TODO: this is another use case for which properties should be referenced by name
        for node_property in node_resource.properties.properties:
            if node_property.name == attribute_name:
                return node_property.change_type

        return ChangeType.UNCHANGED
×
508

509
    def _resolve_intrinsic_function_ref(self, arguments: ChangeSetEntity) -> ChangeType:
        """Return the change type of a ``Ref`` call.

        Changed or non-terminal arguments decide the result by themselves. Otherwise
        the referenced name is looked up as a condition, then as a parameter, and
        finally as a resource; the first match supplies the change type.
        """
        # TODO: add support for nested functions, here we assume the argument is a logicalID.
        arguments_changed = arguments.change_type != ChangeType.UNCHANGED
        if arguments_changed or not isinstance(arguments, TerminalValue):
            return arguments.change_type

        logical_id = arguments.value

        condition = self._retrieve_condition_if_exists(condition_name=logical_id)
        if isinstance(condition, NodeCondition):
            return condition.change_type

        parameter = self._retrieve_parameter_if_exists(parameter_name=logical_id)
        if isinstance(parameter, NodeParameter):
            return parameter.change_type

        # TODO: this should check the replacement flag for a resource update.
        resource = self._retrieve_or_visit_resource(resource_name=logical_id)
        return resource.change_type
1✔
529

530
    def _resolve_intrinsic_function_fn_find_in_map(self, arguments: ChangeSetEntity) -> ChangeType:
        """Return the change type of an ``Fn::FindInMap`` call.

        Changed arguments decide the result by themselves. Otherwise ``arguments`` must
        be a NodeArray of three terminals (mapping name, top-level key, second-level
        key) and the result is the change type of the mapping entry they select.

        Raises:
            RuntimeError: if the arguments are not an array of at least three items, or
                the keys do not select an entry in the mapping.
            NotImplementedError: if one of the three arguments is not a terminal.
        """
        if arguments.change_type != ChangeType.UNCHANGED:
            return arguments.change_type
        # TODO: validate arguments structure and type.
        # TODO: add support for nested functions, here we assume the arguments are string literals.

        # Three items are read below; a shorter array used to escape as an IndexError.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 3:
            raise RuntimeError()
        argument_mapping_name = arguments.array[0]
        if not isinstance(argument_mapping_name, TerminalValue):
            raise NotImplementedError()
        argument_top_level_key = arguments.array[1]
        if not isinstance(argument_top_level_key, TerminalValue):
            raise NotImplementedError()
        argument_second_level_key = arguments.array[2]
        if not isinstance(argument_second_level_key, TerminalValue):
            raise NotImplementedError()
        mapping_name = argument_mapping_name.value
        top_level_key = argument_top_level_key.value
        second_level_key = argument_second_level_key.value

        node_mapping = self._retrieve_mapping(mapping_name=mapping_name)
        # TODO: a lookup would be beneficial in this scenario too;
        #  consider implications downstream and for replication.
        top_level_object = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_object, NodeObject):
            raise RuntimeError()
        target_map_value = top_level_object.bindings.get(second_level_key)
        # A missing second-level key yields None; fail the same way as a missing
        # top-level key rather than with an AttributeError on `.change_type`.
        if not isinstance(target_map_value, ChangeSetEntity):
            raise RuntimeError()
        return target_map_value.change_type
1✔
559

560
    def _resolve_intrinsic_function_fn_if(self, arguments: ChangeSetEntity) -> ChangeType:
        """Return the change type of an ``Fn::If`` call.

        ``arguments`` must be a non-empty NodeArray whose first item is a terminal
        holding the name (a string) of an existing condition. The result combines the
        change type of that condition with the change types of the remaining arguments.

        Raises:
            RuntimeError: if the arguments do not have the shape described above or the
                condition cannot be retrieved.
        """
        # TODO: validate arguments structure and type.
        if not isinstance(arguments, NodeArray) or not arguments.array:
            raise RuntimeError()
        logical_name_of_condition_entity = arguments.array[0]
        if not isinstance(logical_name_of_condition_entity, TerminalValue):
            raise RuntimeError()
        logical_name_of_condition: str = logical_name_of_condition_entity.value
        if not isinstance(logical_name_of_condition, str):
            raise RuntimeError()

        node_condition = self._retrieve_condition_if_exists(
            condition_name=logical_name_of_condition
        )
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError()
        # The helper is fed change types (as in _visit_array), so pass each branch
        # argument's change type; previously the entities themselves were passed.
        change_types = [
            node_condition.change_type,
            *(argument.change_type for argument in arguments.array[1:]),
        ]
        change_type = self._change_type_for_parent_of(change_types=change_types)
        return change_type
×
579

580
    def _visit_array(
        self, scope: Scope, before_array: Maybe[list], after_array: Maybe[list]
    ) -> NodeArray:
        """Build the node for a list by visiting the before/after items position by position.

        The shorter side is padded with ``Nothing``. The node is CREATED or REMOVED when
        the whole list was, otherwise its change type is derived from the items.
        """
        paired_items = zip_longest(before_array, after_array, fillvalue=Nothing)
        array: list[ChangeSetEntity] = [
            self._visit_value(
                scope=scope.open_index(index=index),
                before_value=before_item,
                after_value=after_item,
            )
            for index, (before_item, after_item) in enumerate(paired_items)
        ]

        if self._is_created(before=before_array, after=after_array):
            change_type = ChangeType.CREATED
        elif self._is_removed(before=before_array, after=after_array):
            change_type = ChangeType.REMOVED
        else:
            item_change_types = [item.change_type for item in array]
            change_type = self._change_type_for_parent_of(item_change_types)
        return NodeArray(scope=scope, change_type=change_type, array=array)
1✔
599

600
    def _visit_object(
        self, scope: Scope, before_object: Maybe[dict], after_object: Maybe[dict]
    ) -> NodeObject:
        """Build, or fetch from the scope cache, the node for a dictionary value.

        The change type starts as CREATED, REMOVED or UNCHANGED for the object as a
        whole and is then combined with every binding's change type via ``for_child``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeObject):
            return cached

        if self._is_created(before=before_object, after=after_object):
            change_type = ChangeType.CREATED
        elif self._is_removed(before=before_object, after=after_object):
            change_type = ChangeType.REMOVED
        else:
            change_type = ChangeType.UNCHANGED

        bindings: dict[str, ChangeSetEntity] = {}
        for key in self._safe_keys_of(before_object, after_object):
            key_scope, (before_value, after_value) = self._safe_access_in(
                scope, key, before_object, after_object
            )
            entity = self._visit_value(
                scope=key_scope, before_value=before_value, after_value=after_value
            )
            bindings[key] = entity
            change_type = change_type.for_child(entity.change_type)

        result = NodeObject(scope=scope, change_type=change_type, bindings=bindings)
        self._visited_scopes[scope] = result
        return result
1✔
626

627
    def _visit_divergence(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> NodeDivergence:
        """Build a NodeDivergence by modelling the two sides independently.

        The before value is visited against ``Nothing`` under the "value" child scope,
        and ``Nothing`` against the after value under the "divergence" child scope.
        """
        value_scope = scope.open_scope("value")
        divergence_scope = scope.open_scope("divergence")
        before_side = self._visit_value(
            scope=value_scope, before_value=before_value, after_value=Nothing
        )
        after_side = self._visit_value(
            scope=divergence_scope, before_value=Nothing, after_value=after_value
        )
        return NodeDivergence(scope=scope, value=before_side, divergence=after_side)
×
637

638
    def _visit_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> ChangeSetEntity:
        """Build, or fetch from the scope cache, the entity for a before/after value pair.

        A "dominant" value selects the visitor: the before value when both sides have
        the same type name or the value was removed, the after value when it was
        created. If none of these holds the pair is modelled as a type divergence.

        Raises:
            RuntimeError: if the dominant value is not a terminal, object, array or
                intrinsic function.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, ChangeSetEntity):
            return cached

        before_type_name = self._type_name_of(before_value)
        after_type_name = self._type_name_of(after_value)
        if before_type_name == after_type_name:
            dominant_value = before_value
        elif self._is_created(before=before_value, after=after_value):
            dominant_value = after_value
        elif self._is_removed(before=before_value, after=after_value):
            dominant_value = before_value
        else:
            # Case: type divergence.
            diverged = self._visit_divergence(
                scope=scope, before_value=before_value, after_value=after_value
            )
            self._visited_scopes[scope] = diverged
            return diverged

        dominant_type_name = self._type_name_of(dominant_value)
        if self._is_terminal(value=dominant_value):
            entity = self._visit_terminal_value(
                scope=scope, before_value=before_value, after_value=after_value
            )
        elif self._is_object(value=dominant_value):
            entity = self._visit_object(
                scope=scope, before_object=before_value, after_object=after_value
            )
        elif self._is_array(value=dominant_value):
            entity = self._visit_array(
                scope=scope, before_array=before_value, after_array=after_value
            )
        elif self._is_intrinsic_function_name(dominant_type_name):
            # Only the arguments of the access are used; the function node is built
            # under the enclosing `scope`, not under the scope returned here.
            _, (before_arguments, after_arguments) = self._safe_access_in(
                scope, dominant_type_name, before_value, after_value
            )
            entity = self._visit_intrinsic_function(
                scope=scope,
                intrinsic_function=dominant_type_name,
                before_arguments=before_arguments,
                after_arguments=after_arguments,
            )
        else:
            raise RuntimeError(f"Unsupported type {type(dominant_value)}")

        self._visited_scopes[scope] = entity
        return entity
1✔
689

690
    def _visit_property(
        self,
        scope: Scope,
        property_name: str,
        before_property: Maybe[Any],
        after_property: Maybe[Any],
    ) -> NodeProperty:
        """Build, or fetch from the scope cache, the node for one named property.

        The property takes the change type of the entity modelling its value.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperty):
            return cached

        property_value = self._visit_value(
            scope=scope, before_value=before_property, after_value=after_property
        )
        result = NodeProperty(
            scope=scope,
            change_type=property_value.change_type,
            name=property_name,
            value=property_value,
        )
        # This replaces the value entity that _visit_value cached under the same scope.
        self._visited_scopes[scope] = result
        return result
1✔
708

709
    def _visit_properties(
        self, scope: Scope, before_properties: Maybe[dict], after_properties: Maybe[dict]
    ) -> NodeProperties:
        """Build, or fetch from the scope cache, the node for a set of properties.

        Every property name obtained from the before/after dictionaries is visited; the
        resulting change type starts as UNCHANGED and is combined with each property's
        change type via ``for_child``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperties):
            return cached

        # TODO: double check we are sure not to have this be a NodeObject
        names: list[str] = self._safe_keys_of(before_properties, after_properties)
        collected: list[NodeProperty] = []
        change_type = ChangeType.UNCHANGED
        for name in names:
            name_scope, (before_property, after_property) = self._safe_access_in(
                scope, name, before_properties, after_properties
            )
            node_property = self._visit_property(
                scope=name_scope,
                property_name=name,
                before_property=before_property,
                after_property=after_property,
            )
            collected.append(node_property)
            change_type = change_type.for_child(node_property.change_type)

        result = NodeProperties(scope=scope, change_type=change_type, properties=collected)
        self._visited_scopes[scope] = result
        return result
1✔
736

737
    def _visit_type(self, scope: Scope, before_type: Any, after_type: Any) -> TerminalValue:
        """Visit a 'Type' entry and require the resulting node to be a TerminalValue.

        The before/after values are delegated to `_visit_value`.

        Raises:
            RuntimeError: if the visited node is not a TerminalValue.
        """
        value = self._visit_value(scope=scope, before_value=before_type, after_value=after_type)
        if isinstance(value, TerminalValue):
            return value
        # TODO: decide where template schema validation should occur.
        # Name the offending scope and node kind so the failure can be diagnosed.
        raise RuntimeError(
            f"Invalid type at scope '{scope}': expected a terminal value, "
            f"got {type(value).__name__}"
        )
743

744
    def _visit_resource(
        self,
        scope: Scope,
        resource_name: str,
        before_resource: Maybe[dict],
        after_resource: Maybe[dict],
    ) -> NodeResource:
        """Build, or fetch from the scope cache, the NodeResource for a single resource.

        The baseline change type is CREATED or REMOVED when the resource exists on
        only one side, otherwise UNCHANGED. The 'Type', optional 'Condition' and
        'Properties' entries are then visited in that order; when at least one
        property exists, the properties' change type is folded into the baseline.
        The node is stored in `self._visited_scopes` under `scope`.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeResource):
            return cached

        # Presence on only one side of the update decides the baseline change type.
        if self._is_created(before=before_resource, after=after_resource):
            resource_change = ChangeType.CREATED
        elif self._is_removed(before=before_resource, after=after_resource):
            resource_change = ChangeType.REMOVED
        else:
            resource_change = ChangeType.UNCHANGED

        type_scope, (type_before, type_after) = self._safe_access_in(
            scope, TypeKey, before_resource, after_resource
        )
        resource_type = self._visit_type(
            scope=type_scope, before_type=type_before, after_type=type_after
        )

        condition_scope, (condition_before, condition_after) = self._safe_access_in(
            scope, ConditionKey, before_resource, after_resource
        )
        # TODO: condition references should be resolved for the condition's change_type?
        condition_reference = (
            self._visit_terminal_value(condition_scope, condition_before, condition_after)
            if condition_before or condition_after
            else None
        )

        properties_scope, (props_before, props_after) = self._safe_access_in(
            scope, PropertiesKey, before_resource, after_resource
        )
        properties = self._visit_properties(
            scope=properties_scope,
            before_properties=props_before,
            after_properties=props_after,
        )
        if properties.properties:
            # Properties were defined in the before or after template, thus must play a role
            # in affecting the change type of this resource.
            resource_change = resource_change.for_child(properties.change_type)

        resource = NodeResource(
            scope=scope,
            change_type=resource_change,
            name=resource_name,
            type_=resource_type,
            condition_reference=condition_reference,
            properties=properties,
        )
        self._visited_scopes[scope] = resource
        return resource
801

802
    def _visit_resources(
        self, scope: Scope, before_resources: Maybe[dict], after_resources: Maybe[dict]
    ) -> NodeResources:
        """Visit every resource named in either version of the 'Resources' section.

        Returns a NodeResources whose change type is the fold, through
        `ChangeType.for_child`, of the change types of all visited resources.
        """
        # TODO: investigate type changes behavior.
        aggregate = ChangeType.UNCHANGED
        visited: list[NodeResource] = []
        for name in self._safe_keys_of(before_resources, after_resources):
            child_scope, (before, after) = self._safe_access_in(
                scope, name, before_resources, after_resources
            )
            node = self._visit_resource(
                scope=child_scope,
                resource_name=name,
                before_resource=before,
                after_resource=after,
            )
            visited.append(node)
            aggregate = aggregate.for_child(node.change_type)
        return NodeResources(scope=scope, change_type=aggregate, resources=visited)
822

823
    def _visit_mapping(
        self, scope: Scope, name: str, before_mapping: Maybe[dict], after_mapping: Maybe[dict]
    ) -> NodeMapping:
        """Wrap the object node visited for one mapping into a NodeMapping.

        The mapping body is visited with `_visit_object`; the NodeMapping takes
        its change type from the resulting bindings node.
        """
        mapping_bindings = self._visit_object(
            scope=scope, before_object=before_mapping, after_object=after_mapping
        )
        change_type = mapping_bindings.change_type
        return NodeMapping(
            scope=scope, change_type=change_type, name=name, bindings=mapping_bindings
        )
832

833
    def _visit_mappings(
        self, scope: Scope, before_mappings: Maybe[dict], after_mappings: Maybe[dict]
    ) -> NodeMappings:
        """Visit every mapping named in either version of the 'Mappings' section.

        Returns a NodeMappings whose change type is the fold, through
        `ChangeType.for_child`, of the change types of all visited mappings.
        """
        change_type = ChangeType.UNCHANGED
        mappings: list[NodeMapping] = []
        for mapping_name in self._safe_keys_of(before_mappings, after_mappings):
            scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
                scope, mapping_name, before_mappings, after_mappings
            )
            # Visit each mapping under its own scope. Passing the parent 'Mappings'
            # scope (as was done before) gave every mapping the same scope and left
            # `scope_mapping` unused; `_retrieve_mapping` already uses the
            # per-mapping scope, so this keeps both code paths consistent.
            mapping = self._visit_mapping(
                scope=scope_mapping,
                name=mapping_name,
                before_mapping=before_mapping,
                after_mapping=after_mapping,
            )
            mappings.append(mapping)
            change_type = change_type.for_child(mapping.change_type)
        return NodeMappings(scope=scope, change_type=change_type, mappings=mappings)
852

853
    def _visit_dynamic_parameter(self, parameter_name: str) -> ChangeSetEntity:
        """Visit the before/after values supplied for `parameter_name`.

        The values are read from `self._before_parameters` and
        `self._after_parameters`, and visited under a scope rooted at
        'Dynamic' / 'Parameters' rather than under the template's own scopes.
        """
        dynamic_root = Scope("Dynamic").open_scope("Parameters")
        value_scope, (before_value, after_value) = self._safe_access_in(
            dynamic_root, parameter_name, self._before_parameters, self._after_parameters
        )
        return self._visit_value(
            scope=value_scope, before_value=before_value, after_value=after_value
        )
862

863
    def _visit_parameter(
        self,
        scope: Scope,
        parameter_name: str,
        before_parameter: Maybe[dict],
        after_parameter: Maybe[dict],
    ) -> NodeParameter:
        """Build, or fetch from the scope cache, the NodeParameter for one parameter.

        The parameter's 'Type' and 'Default' entries and its dynamic (supplied)
        value are visited in that order; the node's change type is derived from
        those three via `_change_type_for_parent_of`. The node is stored in
        `self._visited_scopes` under `scope`.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameter):
            return cached

        scope_of_type, (type_before, type_after) = self._safe_access_in(
            scope, TypeKey, before_parameter, after_parameter
        )
        type_node = self._visit_value(scope_of_type, type_before, type_after)

        scope_of_default, (default_before, default_after) = self._safe_access_in(
            scope, DefaultKey, before_parameter, after_parameter
        )
        default_node = self._visit_value(scope_of_default, default_before, default_after)

        dynamic_node = self._visit_dynamic_parameter(parameter_name=parameter_name)

        child_change_types = [
            type_node.change_type,
            default_node.change_type,
            dynamic_node.change_type,
        ]
        parameter = NodeParameter(
            scope=scope,
            change_type=self._change_type_for_parent_of(change_types=child_change_types),
            name=parameter_name,
            type_=type_node,
            default_value=default_node,
            dynamic_value=dynamic_node,
        )
        self._visited_scopes[scope] = parameter
        return parameter
900

901
    def _visit_parameters(
        self, scope: Scope, before_parameters: Maybe[dict], after_parameters: Maybe[dict]
    ) -> NodeParameters:
        """Build, or fetch from the scope cache, the NodeParameters for 'Parameters'.

        Every parameter named in either version is visited, and each child's
        change type is folded into the aggregate through `ChangeType.for_child`.
        The result is stored in `self._visited_scopes` under `scope`.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameters):
            return cached
        visited: list[NodeParameter] = []
        aggregate = ChangeType.UNCHANGED
        for name in self._safe_keys_of(before_parameters, after_parameters):
            child_scope, (before, after) = self._safe_access_in(
                scope, name, before_parameters, after_parameters
            )
            node = self._visit_parameter(
                scope=child_scope,
                parameter_name=name,
                before_parameter=before,
                after_parameter=after,
            )
            visited.append(node)
            aggregate = aggregate.for_child(node.change_type)
        result = NodeParameters(scope=scope, change_type=aggregate, parameters=visited)
        self._visited_scopes[scope] = result
        return result
927

928
    def _visit_condition(
        self,
        scope: Scope,
        condition_name: str,
        before_condition: Maybe[dict],
        after_condition: Maybe[dict],
    ) -> NodeCondition:
        """Build, or fetch from the scope cache, the NodeCondition for one condition.

        The condition body is visited with `_visit_value`, and the node takes the
        body's change type. The node is stored in `self._visited_scopes` under
        `scope`.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeCondition):
            return cached
        condition_body = self._visit_value(
            scope=scope, before_value=before_condition, after_value=after_condition
        )
        condition = NodeCondition(
            scope=scope,
            change_type=condition_body.change_type,
            name=condition_name,
            body=condition_body,
        )
        self._visited_scopes[scope] = condition
        return condition
946

947
    def _visit_conditions(
        self, scope: Scope, before_conditions: Maybe[dict], after_conditions: Maybe[dict]
    ) -> NodeConditions:
        """Build, or fetch from the scope cache, the NodeConditions for 'Conditions'.

        Every condition named in either version is visited, and each child's
        change type is folded into the aggregate through `ChangeType.for_child`.
        The result is stored in `self._visited_scopes` under `scope`.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeConditions):
            return cached
        visited: list[NodeCondition] = []
        aggregate = ChangeType.UNCHANGED
        for name in self._safe_keys_of(before_conditions, after_conditions):
            child_scope, (before, after) = self._safe_access_in(
                scope, name, before_conditions, after_conditions
            )
            node = self._visit_condition(
                scope=child_scope,
                condition_name=name,
                before_condition=before,
                after_condition=after,
            )
            visited.append(node)
            aggregate = aggregate.for_child(child_change_type=node.change_type)
        result = NodeConditions(scope=scope, change_type=aggregate, conditions=visited)
        self._visited_scopes[scope] = result
        return result
973

974
    def _visit_output(
        self, scope: Scope, name: str, before_output: Maybe[dict], after_output: Maybe[dict]
    ) -> NodeOutput:
        """Build the NodeOutput for a single output definition.

        The 'Value' entry is always visited. The 'Export' and 'Condition' entries
        are visited only when a truthy value exists on either side. The change
        types of all visited children are folded into the output's change type
        through `ChangeType.for_child`.
        """
        value_scope, (value_before, value_after) = self._safe_access_in(
            scope, ValueKey, before_output, after_output
        )
        value = self._visit_value(value_scope, value_before, value_after)
        output_change = ChangeType.UNCHANGED.for_child(value.change_type)

        export_scope, (export_before, export_after) = self._safe_access_in(
            scope, ExportKey, before_output, after_output
        )
        export: Optional[ChangeSetEntity] = None
        if export_before or export_after:
            export = self._visit_value(export_scope, export_before, export_after)
            output_change = output_change.for_child(export.change_type)

        # TODO: condition references should be resolved for the condition's change_type?
        condition_scope, (condition_before, condition_after) = self._safe_access_in(
            scope, ConditionKey, before_output, after_output
        )
        condition_reference: Optional[TerminalValue] = None
        if condition_before or condition_after:
            condition_reference = self._visit_terminal_value(
                condition_scope, condition_before, condition_after
            )
            output_change = output_change.for_child(condition_reference.change_type)

        return NodeOutput(
            scope=scope,
            change_type=output_change,
            name=name,
            value=value,
            export=export,
            conditional_reference=condition_reference,
        )
1011

1012
    def _visit_outputs(
        self, scope: Scope, before_outputs: Maybe[dict], after_outputs: Maybe[dict]
    ) -> NodeOutputs:
        """Visit every output named in either version of the 'Outputs' section.

        Returns a NodeOutputs whose change type is the fold, through
        `ChangeType.for_child`, of the change types of all visited outputs.
        """
        aggregate = ChangeType.UNCHANGED
        visited: list[NodeOutput] = []
        for name in self._safe_keys_of(before_outputs, after_outputs):
            child_scope, (before, after) = self._safe_access_in(
                scope, name, before_outputs, after_outputs
            )
            node = self._visit_output(
                scope=child_scope,
                name=name,
                before_output=before,
                after_output=after,
            )
            visited.append(node)
            aggregate = aggregate.for_child(node.change_type)
        return NodeOutputs(scope=scope, change_type=aggregate, outputs=visited)
1031

1032
    def _model(self, before_template: Maybe[dict], after_template: Maybe[dict]) -> NodeTemplate:
        """Build the NodeTemplate describing the update from `before_template` to `after_template`.

        The Mappings, Parameters, Conditions, Resources and Outputs sections are
        visited in that order, each under its own child scope of the root scope.
        The template's change type is currently taken from the resources node alone.
        """
        root = Scope()
        # TODO: visit other child types

        scope_mappings, (mappings_before, mappings_after) = self._safe_access_in(
            root, MappingsKey, before_template, after_template
        )
        mappings = self._visit_mappings(
            scope=scope_mappings,
            before_mappings=mappings_before,
            after_mappings=mappings_after,
        )

        scope_parameters, (parameters_before, parameters_after) = self._safe_access_in(
            root, ParametersKey, before_template, after_template
        )
        parameters = self._visit_parameters(
            scope=scope_parameters,
            before_parameters=parameters_before,
            after_parameters=parameters_after,
        )

        scope_conditions, (conditions_before, conditions_after) = self._safe_access_in(
            root, ConditionsKey, before_template, after_template
        )
        conditions = self._visit_conditions(
            scope=scope_conditions,
            before_conditions=conditions_before,
            after_conditions=conditions_after,
        )

        scope_resources, (resources_before, resources_after) = self._safe_access_in(
            root, ResourcesKey, before_template, after_template
        )
        resources = self._visit_resources(
            scope=scope_resources,
            before_resources=resources_before,
            after_resources=resources_after,
        )

        scope_outputs, (outputs_before, outputs_after) = self._safe_access_in(
            root, OutputsKey, before_template, after_template
        )
        outputs = self._visit_outputs(
            scope=scope_outputs,
            before_outputs=outputs_before,
            after_outputs=outputs_after,
        )

        # TODO: compute the change_type of the template properly.
        template_change = resources.change_type
        return NodeTemplate(
            scope=root,
            change_type=template_change,
            mappings=mappings,
            parameters=parameters,
            conditions=conditions,
            resources=resources,
            outputs=outputs,
        )
1087

1088
    def _retrieve_condition_if_exists(self, condition_name: str) -> Optional[NodeCondition]:
        """Return the NodeCondition for `condition_name`, or None if it is not defined.

        The condition is looked up in the 'Conditions' section of both
        `self._before_template` and `self._after_template`; it is visited when
        either side defines it.
        """
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
            Scope(), ConditionsKey, self._before_template, self._after_template
        )
        # A missing section comes back as the falsy Nothing sentinel; treat it as empty.
        before_conditions = before_conditions or dict()
        after_conditions = after_conditions or dict()
        if condition_name not in before_conditions and condition_name not in after_conditions:
            return None
        condition_scope, (before_condition, after_condition) = self._safe_access_in(
            conditions_scope, condition_name, before_conditions, after_conditions
        )
        # Visit under the condition's own scope. `_visit_condition` caches its result
        # by scope, so passing the shared 'Conditions' scope (as was done before) made
        # every later lookup return whichever condition had been cached first.
        return self._visit_condition(
            condition_scope,
            condition_name,
            before_condition=before_condition,
            after_condition=after_condition,
        )
1106

1107
    def _retrieve_parameter_if_exists(self, parameter_name: str) -> Optional[NodeParameter]:
        """Return the NodeParameter for `parameter_name`, or None if it is not defined.

        The parameter is looked up in the 'Parameters' section of both
        `self._before_template` and `self._after_template`; it is visited when
        either side defines it.
        """
        parameters_scope, (before_parameters, after_parameters) = self._safe_access_in(
            Scope(), ParametersKey, self._before_template, self._after_template
        )
        # A missing section comes back as the falsy Nothing sentinel; treat it as empty.
        before_parameters = before_parameters or dict()
        after_parameters = after_parameters or dict()
        if parameter_name not in before_parameters and parameter_name not in after_parameters:
            return None
        parameter_scope, (before_parameter, after_parameter) = self._safe_access_in(
            parameters_scope, parameter_name, before_parameters, after_parameters
        )
        # Visit under the parameter's own scope. `_visit_parameter` caches its result
        # by scope, so passing the shared 'Parameters' scope (as was done before) made
        # every later lookup return whichever parameter had been cached first.
        return self._visit_parameter(
            parameter_scope,
            parameter_name,
            before_parameter=before_parameter,
            after_parameter=after_parameter,
        )
1125

1126
    def _retrieve_mapping(self, mapping_name: str) -> NodeMapping:
        """Return the NodeMapping for `mapping_name`.

        The mapping is looked up in the 'Mappings' section of both
        `self._before_template` and `self._after_template` and visited under its
        own scope.

        Raises:
            RuntimeError: if neither template version defines the mapping.
        """
        # TODO: add caching mechanism, and raise appropriate error if missing.
        scope_mappings, (before_mappings, after_mappings) = self._safe_access_in(
            Scope(), MappingsKey, self._before_template, self._after_template
        )
        # A missing section comes back as the falsy Nothing sentinel; treat it as empty.
        before_mappings = before_mappings or dict()
        after_mappings = after_mappings or dict()
        if mapping_name not in before_mappings and mapping_name not in after_mappings:
            # Name the missing mapping so the failure can be diagnosed.
            raise RuntimeError(f"Undefined mapping '{mapping_name}'")
        scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
            scope_mappings, mapping_name, before_mappings, after_mappings
        )
        return self._visit_mapping(scope_mapping, mapping_name, before_mapping, after_mapping)
1142

1143
    def _retrieve_or_visit_resource(self, resource_name: str) -> NodeResource:
        """Return the NodeResource for `resource_name`.

        The resource's before/after definitions are read from the 'Resources'
        section of `self._before_template` and `self._after_template`, then
        handed to `_visit_resource`, which returns a cached node when one
        already exists for the resource's scope.
        """
        root = Scope()
        scope_resources, (all_before, all_after) = self._safe_access_in(
            root, ResourcesKey, self._before_template, self._after_template
        )
        scope_resource, (before, after) = self._safe_access_in(
            scope_resources, resource_name, all_before, all_after
        )
        return self._visit_resource(
            scope=scope_resource,
            resource_name=resource_name,
            before_resource=before,
            after_resource=after,
        )
1159

1160
    @staticmethod
    def _is_intrinsic_function_name(function_name: str) -> bool:
        """Tell whether `function_name` is a member of INTRINSIC_FUNCTIONS."""
        # TODO: are intrinsic functions soft keywords?
        is_known = function_name in INTRINSIC_FUNCTIONS
        return is_known
1164

1165
    @staticmethod
    def _safe_access_in(scope: Scope, key: str, *objects: Maybe[dict]) -> tuple[Scope, Maybe[Any]]:
        """Look up `key` in each of `objects` and open the matching child scope.

        A Nothing object is passed through unchanged; otherwise the object's
        `get` is used, yielding Nothing when the key is absent.

        Returns the child scope together with the looked-up values: the bare
        value when exactly one object was given, otherwise a tuple with one
        entry per object.
        """
        # TODO: raise errors if not dict
        values = [
            obj if isinstance(obj, NothingType) else obj.get(key, Nothing) for obj in objects
        ]
        child_scope = scope.open_scope(name=key)
        if len(objects) == 1:
            return child_scope, values[0]
        return child_scope, tuple(values)
1176

1177
    @staticmethod
1✔
1178
    def _safe_keys_of(*objects: Maybe[dict]) -> list[str]:
1✔
1179
        key_set: set[str] = set()
1✔
1180
        for obj in objects:
1✔
1181
            # TODO: raise errors if not dict
1182
            if isinstance(obj, dict):
1✔
1183
                key_set.update(obj.keys())
1✔
1184
        # The keys list is sorted to increase reproducibility of the
1185
        # update graph build process or downstream logics.
1186
        keys = sorted(key_set)
1✔
1187
        return keys
1✔
1188

1189
    @staticmethod
    def _change_type_for_parent_of(change_types: list[ChangeType]) -> ChangeType:
        """Fold the children's change types into the change type of their parent.

        Starts from UNCHANGED and applies `ChangeType.for_child` per child,
        stopping as soon as the aggregate becomes MODIFIED.
        """
        aggregate = ChangeType.UNCHANGED
        for change_type in change_types:
            aggregate = aggregate.for_child(change_type)
            if aggregate == ChangeType.MODIFIED:
                # The remaining children are not inspected once MODIFIED is reached.
                return aggregate
        return aggregate
1197

1198
    @staticmethod
    def _name_if_intrinsic_function(value: Maybe[Any]) -> Optional[str]:
        """Return the intrinsic function name when `value` has the shape of one, else None.

        A value qualifies when it is a dict with exactly one key and that key
        is an intrinsic function name.
        """
        if not isinstance(value, dict):
            return None
        keys = ChangeSetModel._safe_keys_of(value)
        if len(keys) != 1:
            return None
        (candidate,) = keys
        if ChangeSetModel._is_intrinsic_function_name(candidate):
            return candidate
        return None
1207

1208
    @staticmethod
    def _type_name_of(value: Maybe[Any]) -> str:
        """Return the intrinsic function name of `value`, or its Python type name otherwise."""
        intrinsic_name = ChangeSetModel._name_if_intrinsic_function(value)
        return type(value).__name__ if intrinsic_name is None else intrinsic_name
1214

1215
    @staticmethod
    def _is_terminal(value: Any) -> bool:
        """Tell whether `value` is a terminal (scalar) value.

        Terminal values are those whose exact type is int, float, bool, str,
        NoneType or NothingType. The check is on the exact type, so instances of
        subclasses of these types do not qualify.
        """
        # `type(None)` is required here: a bare `None` in the set is the value None,
        # which never equals `type(value)`, so None values would not be recognised.
        return type(value) in {int, float, bool, str, type(None), NothingType}
1218

1219
    @staticmethod
    def _is_object(value: Any) -> bool:
        """Tell whether `value` is a dict that does not have the shape of an intrinsic function."""
        if not isinstance(value, dict):
            return False
        return ChangeSetModel._name_if_intrinsic_function(value) is None
1222

1223
    @staticmethod
1✔
1224
    def _is_array(value: Any) -> bool:
1✔
1225
        return isinstance(value, list)
1✔
1226

1227
    @staticmethod
    def _is_created(before: Maybe[Any], after: Maybe[Any]) -> bool:
        """Tell whether a value is absent (Nothing) before the update and present after it."""
        if not isinstance(before, NothingType):
            return False
        return not isinstance(after, NothingType)
1230

1231
    @staticmethod
    def _is_removed(before: Maybe[Any], after: Maybe[Any]) -> bool:
        """Tell whether a value is present before the update and absent (Nothing) after it."""
        if isinstance(before, NothingType):
            return False
        return isinstance(after, NothingType)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc