• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 4975d127-9f1f-4d33-85f4-2ecd5a0c8f82

31 Mar 2025 04:19PM UTC coverage: 86.913% (+0.06%) from 86.858%
4975d127-9f1f-4d33-85f4-2ecd5a0c8f82

push

circleci

web-flow
CloudFormation: POC Support for Modeling of Outputs Blocks in the Update Graph, Improved Handling of Intrinsic Function Types (#12443)

159 of 177 new or added lines in 3 files covered. (89.83%)

77 existing lines in 8 files now uncovered.

63485 of 73044 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.06
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model.py
1
from __future__ import annotations
1✔
2

3
import abc
1✔
4
import enum
1✔
5
from itertools import zip_longest
1✔
6
from typing import Any, Final, Generator, Optional, Union, cast
1✔
7

8
from typing_extensions import TypeVar
1✔
9

10
from localstack.utils.strings import camel_to_snake_case
1✔
11

12
T = TypeVar("T")
1✔
13

14

15
class NothingType:
1✔
16
    """A sentinel that denotes 'no value' (distinct from None)."""
17

18
    _singleton = None
1✔
19
    __slots__ = ()
1✔
20

21
    def __new__(cls):
1✔
22
        if cls._singleton is None:
1✔
23
            cls._singleton = super().__new__(cls)
1✔
24
        return cls._singleton
1✔
25

26
    def __str__(self):
1✔
27
        return repr(self)
×
28

29
    def __repr__(self) -> str:
30
        return "Nothing"
31

32
    def __bool__(self):
1✔
33
        return False
1✔
34

35
    def __iter__(self):
1✔
36
        return iter(())
1✔
37

38

39
Maybe = Union[T, NothingType]
1✔
40
Nothing = NothingType()
1✔
41

42

43
class Scope(str):
1✔
44
    _ROOT_SCOPE: Final[str] = str()
1✔
45
    _SEPARATOR: Final[str] = "/"
1✔
46

47
    def __new__(cls, scope: str = _ROOT_SCOPE) -> Scope:
1✔
48
        return cast(Scope, super().__new__(cls, scope))
1✔
49

50
    def open_scope(self, name: Scope | str) -> Scope:
1✔
51
        return Scope(self._SEPARATOR.join([self, name]))
1✔
52

53
    def open_index(self, index: int) -> Scope:
1✔
54
        return Scope(self._SEPARATOR.join([self, str(index)]))
1✔
55

56
    def unwrap(self) -> list[str]:
1✔
57
        return self.split(self._SEPARATOR)
×
58

59

60
class ChangeType(enum.Enum):
1✔
61
    UNCHANGED = "Unchanged"
1✔
62
    CREATED = "Created"
1✔
63
    MODIFIED = "Modified"
1✔
64
    REMOVED = "Removed"
1✔
65

66
    def __str__(self):
1✔
67
        return self.value
×
68

69
    def for_child(self, child_change_type: ChangeType) -> ChangeType:
1✔
70
        if child_change_type == self:
1✔
71
            return self
1✔
72
        elif self == ChangeType.UNCHANGED:
1✔
73
            return child_change_type
1✔
74
        else:
75
            return ChangeType.MODIFIED
1✔
76

77

78
class ChangeSetEntity(abc.ABC):
1✔
79
    scope: Final[Scope]
1✔
80
    change_type: Final[ChangeType]
1✔
81

82
    def __init__(self, scope: Scope, change_type: ChangeType):
1✔
83
        self.scope = scope
1✔
84
        self.change_type = change_type
1✔
85

86
    def get_children(self) -> Generator[ChangeSetEntity]:
1✔
87
        for child in self.__dict__.values():
1✔
88
            yield from self._get_children_in(child)
1✔
89

90
    @staticmethod
1✔
91
    def _get_children_in(obj: Any) -> Generator[ChangeSetEntity]:
1✔
92
        # TODO: could avoid the inductive logic here, and check for loops?
93
        if isinstance(obj, ChangeSetEntity):
1✔
94
            yield obj
1✔
95
        elif isinstance(obj, list):
1✔
96
            for item in obj:
1✔
97
                yield from ChangeSetEntity._get_children_in(item)
1✔
98
        elif isinstance(obj, dict):
1✔
99
            for item in obj.values():
×
100
                yield from ChangeSetEntity._get_children_in(item)
×
101

102
    def __str__(self):
1✔
103
        return f"({self.__class__.__name__}| {vars(self)}"
×
104

105
    def __repr__(self):
106
        return str(self)
107

108

109
class ChangeSetNode(ChangeSetEntity, abc.ABC): ...
1✔
110

111

112
class ChangeSetTerminal(ChangeSetEntity, abc.ABC): ...
1✔
113

114

115
class NodeTemplate(ChangeSetNode):
1✔
116
    mappings: Final[NodeMappings]
1✔
117
    parameters: Final[NodeParameters]
1✔
118
    conditions: Final[NodeConditions]
1✔
119
    resources: Final[NodeResources]
1✔
120
    outputs: Final[NodeOutputs]
1✔
121

122
    def __init__(
1✔
123
        self,
124
        scope: Scope,
125
        change_type: ChangeType,
126
        mappings: NodeMappings,
127
        parameters: NodeParameters,
128
        conditions: NodeConditions,
129
        resources: NodeResources,
130
        outputs: NodeOutputs,
131
    ):
132
        super().__init__(scope=scope, change_type=change_type)
1✔
133
        self.mappings = mappings
1✔
134
        self.parameters = parameters
1✔
135
        self.conditions = conditions
1✔
136
        self.resources = resources
1✔
137
        self.outputs = outputs
1✔
138

139

140
class NodeDivergence(ChangeSetNode):
1✔
141
    value: Final[ChangeSetEntity]
1✔
142
    divergence: Final[ChangeSetEntity]
1✔
143

144
    def __init__(self, scope: Scope, value: ChangeSetEntity, divergence: ChangeSetEntity):
1✔
145
        super().__init__(scope=scope, change_type=ChangeType.MODIFIED)
1✔
146
        self.value = value
1✔
147
        self.divergence = divergence
1✔
148

149

150
class NodeParameter(ChangeSetNode):
1✔
151
    name: Final[str]
1✔
152
    value: Final[ChangeSetEntity]
1✔
153
    dynamic_value: Final[ChangeSetEntity]
1✔
154

155
    def __init__(
1✔
156
        self,
157
        scope: Scope,
158
        change_type: ChangeType,
159
        name: str,
160
        value: ChangeSetEntity,
161
        dynamic_value: ChangeSetEntity,
162
    ):
163
        super().__init__(scope=scope, change_type=change_type)
1✔
164
        self.name = name
1✔
165
        self.value = value
1✔
166
        self.dynamic_value = dynamic_value
1✔
167

168

169
class NodeParameters(ChangeSetNode):
1✔
170
    parameters: Final[list[NodeParameter]]
1✔
171

172
    def __init__(self, scope: Scope, change_type: ChangeType, parameters: list[NodeParameter]):
1✔
173
        super().__init__(scope=scope, change_type=change_type)
1✔
174
        self.parameters = parameters
1✔
175

176

177
class NodeMapping(ChangeSetNode):
1✔
178
    name: Final[str]
1✔
179
    bindings: Final[NodeObject]
1✔
180

181
    def __init__(self, scope: Scope, change_type: ChangeType, name: str, bindings: NodeObject):
1✔
182
        super().__init__(scope=scope, change_type=change_type)
1✔
183
        self.name = name
1✔
184
        self.bindings = bindings
1✔
185

186

187
class NodeMappings(ChangeSetNode):
1✔
188
    mappings: Final[list[NodeMapping]]
1✔
189

190
    def __init__(self, scope: Scope, change_type: ChangeType, mappings: list[NodeMapping]):
1✔
191
        super().__init__(scope=scope, change_type=change_type)
1✔
192
        self.mappings = mappings
1✔
193

194

195
class NodeOutput(ChangeSetNode):
1✔
196
    name: Final[str]
1✔
197
    value: Final[ChangeSetEntity]
1✔
198
    export: Final[Optional[ChangeSetEntity]]
1✔
199
    condition_reference: Final[Optional[TerminalValue]]
1✔
200

201
    def __init__(
1✔
202
        self,
203
        scope: Scope,
204
        change_type: ChangeType,
205
        name: str,
206
        value: ChangeSetEntity,
207
        export: Optional[ChangeSetEntity],
208
        conditional_reference: Optional[TerminalValue],
209
    ):
210
        super().__init__(scope=scope, change_type=change_type)
1✔
211
        self.name = name
1✔
212
        self.value = value
1✔
213
        self.export = export
1✔
214
        self.condition_reference = conditional_reference
1✔
215

216

217
class NodeOutputs(ChangeSetNode):
1✔
218
    outputs: Final[list[NodeOutput]]
1✔
219

220
    def __init__(self, scope: Scope, change_type: ChangeType, outputs: list[NodeOutput]):
1✔
221
        super().__init__(scope=scope, change_type=change_type)
1✔
222
        self.outputs = outputs
1✔
223

224

225
class NodeCondition(ChangeSetNode):
1✔
226
    name: Final[str]
1✔
227
    body: Final[ChangeSetEntity]
1✔
228

229
    def __init__(self, scope: Scope, change_type: ChangeType, name: str, body: ChangeSetEntity):
1✔
230
        super().__init__(scope=scope, change_type=change_type)
1✔
231
        self.name = name
1✔
232
        self.body = body
1✔
233

234

235
class NodeConditions(ChangeSetNode):
1✔
236
    conditions: Final[list[NodeCondition]]
1✔
237

238
    def __init__(self, scope: Scope, change_type: ChangeType, conditions: list[NodeCondition]):
1✔
239
        super().__init__(scope=scope, change_type=change_type)
1✔
240
        self.conditions = conditions
1✔
241

242

243
class NodeResources(ChangeSetNode):
1✔
244
    resources: Final[list[NodeResource]]
1✔
245

246
    def __init__(self, scope: Scope, change_type: ChangeType, resources: list[NodeResource]):
1✔
247
        super().__init__(scope=scope, change_type=change_type)
1✔
248
        self.resources = resources
1✔
249

250

251
class NodeResource(ChangeSetNode):
1✔
252
    name: Final[str]
1✔
253
    type_: Final[ChangeSetTerminal]
1✔
254
    condition_reference: Final[Optional[TerminalValue]]
1✔
255
    properties: Final[NodeProperties]
1✔
256

257
    def __init__(
1✔
258
        self,
259
        scope: Scope,
260
        change_type: ChangeType,
261
        name: str,
262
        type_: ChangeSetTerminal,
263
        condition_reference: TerminalValue,
264
        properties: NodeProperties,
265
    ):
266
        super().__init__(scope=scope, change_type=change_type)
1✔
267
        self.name = name
1✔
268
        self.type_ = type_
1✔
269
        self.condition_reference = condition_reference
1✔
270
        self.properties = properties
1✔
271

272

273
class NodeProperties(ChangeSetNode):
1✔
274
    properties: Final[list[NodeProperty]]
1✔
275

276
    def __init__(self, scope: Scope, change_type: ChangeType, properties: list[NodeProperty]):
1✔
277
        super().__init__(scope=scope, change_type=change_type)
1✔
278
        self.properties = properties
1✔
279

280

281
class NodeProperty(ChangeSetNode):
1✔
282
    name: Final[str]
1✔
283
    value: Final[ChangeSetEntity]
1✔
284

285
    def __init__(self, scope: Scope, change_type: ChangeType, name: str, value: ChangeSetEntity):
1✔
286
        super().__init__(scope=scope, change_type=change_type)
1✔
287
        self.name = name
1✔
288
        self.value = value
1✔
289

290

291
class NodeIntrinsicFunction(ChangeSetNode):
1✔
292
    intrinsic_function: Final[str]
1✔
293
    arguments: Final[ChangeSetEntity]
1✔
294

295
    def __init__(
1✔
296
        self,
297
        scope: Scope,
298
        change_type: ChangeType,
299
        intrinsic_function: str,
300
        arguments: ChangeSetEntity,
301
    ):
302
        super().__init__(scope=scope, change_type=change_type)
1✔
303
        self.intrinsic_function = intrinsic_function
1✔
304
        self.arguments = arguments
1✔
305

306

307
class NodeObject(ChangeSetNode):
1✔
308
    bindings: Final[dict[str, ChangeSetEntity]]
1✔
309

310
    def __init__(self, scope: Scope, change_type: ChangeType, bindings: dict[str, ChangeSetEntity]):
1✔
311
        super().__init__(scope=scope, change_type=change_type)
1✔
312
        self.bindings = bindings
1✔
313

314

315
class NodeArray(ChangeSetNode):
1✔
316
    array: Final[list[ChangeSetEntity]]
1✔
317

318
    def __init__(self, scope: Scope, change_type: ChangeType, array: list[ChangeSetEntity]):
1✔
319
        super().__init__(scope=scope, change_type=change_type)
1✔
320
        self.array = array
1✔
321

322

323
class TerminalValue(ChangeSetTerminal, abc.ABC):
1✔
324
    value: Final[Any]
1✔
325

326
    def __init__(self, scope: Scope, change_type: ChangeType, value: Any):
1✔
327
        super().__init__(scope=scope, change_type=change_type)
1✔
328
        self.value = value
1✔
329

330

331
class TerminalValueModified(TerminalValue):
1✔
332
    modified_value: Final[Any]
1✔
333

334
    def __init__(self, scope: Scope, value: Any, modified_value: Any):
1✔
335
        super().__init__(scope=scope, change_type=ChangeType.MODIFIED, value=value)
1✔
336
        self.modified_value = modified_value
1✔
337

338

339
class TerminalValueCreated(TerminalValue):
1✔
340
    def __init__(self, scope: Scope, value: Any):
1✔
341
        super().__init__(scope=scope, change_type=ChangeType.CREATED, value=value)
1✔
342

343

344
class TerminalValueRemoved(TerminalValue):
1✔
345
    def __init__(self, scope: Scope, value: Any):
1✔
346
        super().__init__(scope=scope, change_type=ChangeType.REMOVED, value=value)
1✔
347

348

349
class TerminalValueUnchanged(TerminalValue):
1✔
350
    def __init__(self, scope: Scope, value: Any):
1✔
351
        super().__init__(scope=scope, change_type=ChangeType.UNCHANGED, value=value)
1✔
352

353

354
TypeKey: Final[str] = "Type"
1✔
355
ConditionKey: Final[str] = "Condition"
1✔
356
ConditionsKey: Final[str] = "Conditions"
1✔
357
MappingsKey: Final[str] = "Mappings"
1✔
358
ResourcesKey: Final[str] = "Resources"
1✔
359
PropertiesKey: Final[str] = "Properties"
1✔
360
ParametersKey: Final[str] = "Parameters"
1✔
361
ValueKey: Final[str] = "Value"
1✔
362
ExportKey: Final[str] = "Export"
1✔
363
OutputsKey: Final[str] = "Outputs"
1✔
364
# TODO: expand intrinsic functions set.
365
RefKey: Final[str] = "Ref"
1✔
366
FnIf: Final[str] = "Fn::If"
1✔
367
FnNot: Final[str] = "Fn::Not"
1✔
368
FnGetAttKey: Final[str] = "Fn::GetAtt"
1✔
369
FnEqualsKey: Final[str] = "Fn::Equals"
1✔
370
FnFindInMapKey: Final[str] = "Fn::FindInMap"
1✔
371
INTRINSIC_FUNCTIONS: Final[set[str]] = {
1✔
372
    RefKey,
373
    FnIf,
374
    FnNot,
375
    FnEqualsKey,
376
    FnGetAttKey,
377
    FnFindInMapKey,
378
}
379

380

381
class ChangeSetModel:
1✔
382
    # TODO: should this instead be generalised to work on "Stack" objects instead of just "Template"s?
383

384
    # TODO: can probably improve the typehints to use CFN's 'language' eg. dict -> Template|Properties, etc.
385

386
    # TODO: add support for 'replacement' computation, and ensure this state is propagated in tree traversals
387
    #  such as intrinsic functions.
388

389
    _before_template: Final[Maybe[dict]]
1✔
390
    _after_template: Final[Maybe[dict]]
1✔
391
    _before_parameters: Final[Maybe[dict]]
1✔
392
    _after_parameters: Final[Maybe[dict]]
1✔
393
    _visited_scopes: Final[dict[str, ChangeSetEntity]]
1✔
394
    _node_template: Final[NodeTemplate]
1✔
395

396
    def __init__(
1✔
397
        self,
398
        before_template: Optional[dict],
399
        after_template: Optional[dict],
400
        before_parameters: Optional[dict],
401
        after_parameters: Optional[dict],
402
    ):
403
        self._before_template = before_template or Nothing
1✔
404
        self._after_template = after_template or Nothing
1✔
405
        self._before_parameters = before_parameters or Nothing
1✔
406
        self._after_parameters = after_parameters or Nothing
1✔
407
        self._visited_scopes = dict()
1✔
408
        self._node_template = self._model(
1✔
409
            before_template=self._before_template, after_template=self._after_template
410
        )
411
        # TODO: need to do template preprocessing e.g. parameter resolution, conditions etc.
412

413
    def get_update_model(self) -> NodeTemplate:
1✔
414
        # TODO: rethink naming of this for outer utils
415
        return self._node_template
1✔
416

417
    def _visit_terminal_value(
1✔
418
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
419
    ) -> TerminalValue:
420
        terminal_value = self._visited_scopes.get(scope)
1✔
421
        if isinstance(terminal_value, TerminalValue):
1✔
UNCOV
422
            return terminal_value
×
423
        if self._is_created(before=before_value, after=after_value):
1✔
424
            terminal_value = TerminalValueCreated(scope=scope, value=after_value)
1✔
425
        elif self._is_removed(before=before_value, after=after_value):
1✔
426
            terminal_value = TerminalValueRemoved(scope=scope, value=before_value)
1✔
427
        elif before_value == after_value:
1✔
428
            terminal_value = TerminalValueUnchanged(scope=scope, value=before_value)
1✔
429
        else:
430
            terminal_value = TerminalValueModified(
1✔
431
                scope=scope, value=before_value, modified_value=after_value
432
            )
433
        self._visited_scopes[scope] = terminal_value
1✔
434
        return terminal_value
1✔
435

436
    def _visit_intrinsic_function(
1✔
437
        self,
438
        scope: Scope,
439
        intrinsic_function: str,
440
        before_arguments: Maybe[Any],
441
        after_arguments: Maybe[Any],
442
    ) -> NodeIntrinsicFunction:
443
        node_intrinsic_function = self._visited_scopes.get(scope)
1✔
444
        if isinstance(node_intrinsic_function, NodeIntrinsicFunction):
1✔
UNCOV
445
            return node_intrinsic_function
×
446
        arguments = self._visit_value(
1✔
447
            scope=scope, before_value=before_arguments, after_value=after_arguments
448
        )
449
        if self._is_created(before=before_arguments, after=after_arguments):
1✔
450
            change_type = ChangeType.CREATED
1✔
451
        elif self._is_removed(before=before_arguments, after=after_arguments):
1✔
452
            change_type = ChangeType.REMOVED
1✔
453
        else:
454
            function_name = intrinsic_function.replace("::", "_")
1✔
455
            function_name = camel_to_snake_case(function_name)
1✔
456
            resolve_function_name = f"_resolve_intrinsic_function_{function_name}"
1✔
457
            if hasattr(self, resolve_function_name):
1✔
458
                resolve_function = getattr(self, resolve_function_name)
1✔
459
                change_type = resolve_function(arguments)
1✔
460
            else:
461
                change_type = arguments.change_type
1✔
462
        node_intrinsic_function = NodeIntrinsicFunction(
1✔
463
            scope=scope,
464
            change_type=change_type,
465
            intrinsic_function=intrinsic_function,
466
            arguments=arguments,
467
        )
468
        self._visited_scopes[scope] = node_intrinsic_function
1✔
469
        return node_intrinsic_function
1✔
470

471
    def _resolve_intrinsic_function_fn_get_att(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
472
        # TODO: add support for nested intrinsic functions.
473
        # TODO: validate arguments structure and type.
474
        # TODO: should this check for deletion of resources and/or properties, if so what error should be raised?
475

476
        if not isinstance(arguments, NodeArray) or not arguments.array:
1✔
UNCOV
477
            raise RuntimeError()
×
478
        logical_name_of_resource_entity = arguments.array[0]
1✔
479
        if not isinstance(logical_name_of_resource_entity, TerminalValue):
1✔
UNCOV
480
            raise RuntimeError()
×
481
        logical_name_of_resource: str = logical_name_of_resource_entity.value
1✔
482
        if not isinstance(logical_name_of_resource, str):
1✔
UNCOV
483
            raise RuntimeError()
×
484
        node_resource: NodeResource = self._retrieve_or_visit_resource(
1✔
485
            resource_name=logical_name_of_resource
486
        )
487

488
        node_property_attribute_name = arguments.array[1]
1✔
489
        if not isinstance(node_property_attribute_name, TerminalValue):
1✔
UNCOV
490
            raise RuntimeError()
×
491
        if isinstance(node_property_attribute_name, TerminalValueModified):
1✔
492
            attribute_name = node_property_attribute_name.modified_value
×
493
        else:
494
            attribute_name = node_property_attribute_name.value
1✔
495

496
        # TODO: this is another use case for which properties should be referenced by name
497
        for node_property in node_resource.properties.properties:
1✔
498
            if node_property.name == attribute_name:
1✔
499
                return node_property.change_type
1✔
500

UNCOV
501
        return ChangeType.UNCHANGED
×
502

503
    def _resolve_intrinsic_function_ref(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
504
        if arguments.change_type != ChangeType.UNCHANGED:
1✔
UNCOV
505
            return arguments.change_type
×
506
        # TODO: add support for nested functions, here we assume the argument is a logicalID.
507
        if not isinstance(arguments, TerminalValue):
1✔
UNCOV
508
            return arguments.change_type
×
509

510
        logical_id = arguments.value
1✔
511

512
        node_condition = self._retrieve_condition_if_exists(condition_name=logical_id)
1✔
513
        if isinstance(node_condition, NodeCondition):
1✔
UNCOV
514
            return node_condition.change_type
×
515

516
        node_parameter = self._retrieve_parameter_if_exists(parameter_name=logical_id)
1✔
517
        if isinstance(node_parameter, NodeParameter):
1✔
518
            return node_parameter.dynamic_value.change_type
1✔
519

520
        # TODO: this should check the replacement flag for a resource update.
521
        node_resource = self._retrieve_or_visit_resource(resource_name=logical_id)
1✔
522
        return node_resource.change_type
1✔
523

524
    def _resolve_intrinsic_function_fn_find_in_map(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
525
        if arguments.change_type != ChangeType.UNCHANGED:
1✔
UNCOV
526
            return arguments.change_type
×
527
        # TODO: validate arguments structure and type.
528
        # TODO: add support for nested functions, here we assume the arguments are string literals.
529

530
        if not isinstance(arguments, NodeArray) or not arguments.array:
1✔
UNCOV
531
            raise RuntimeError()
×
532
        argument_mapping_name = arguments.array[0]
1✔
533
        if not isinstance(argument_mapping_name, TerminalValue):
1✔
534
            raise NotImplementedError()
535
        argument_top_level_key = arguments.array[1]
1✔
536
        if not isinstance(argument_top_level_key, TerminalValue):
1✔
537
            raise NotImplementedError()
538
        argument_second_level_key = arguments.array[2]
1✔
539
        if not isinstance(argument_second_level_key, TerminalValue):
1✔
540
            raise NotImplementedError()
541
        mapping_name = argument_mapping_name.value
1✔
542
        top_level_key = argument_top_level_key.value
1✔
543
        second_level_key = argument_second_level_key.value
1✔
544

545
        node_mapping = self._retrieve_mapping(mapping_name=mapping_name)
1✔
546
        # TODO: a lookup would be beneficial in this scenario too;
547
        #  consider implications downstream and for replication.
548
        top_level_object = node_mapping.bindings.bindings.get(top_level_key)
1✔
549
        if not isinstance(top_level_object, NodeObject):
1✔
UNCOV
550
            raise RuntimeError()
×
551
        target_map_value = top_level_object.bindings.get(second_level_key)
1✔
552
        return target_map_value.change_type
1✔
553

554
    def _resolve_intrinsic_function_fn_if(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
555
        # TODO: validate arguments structure and type.
556
        if not isinstance(arguments, NodeArray) or not arguments.array:
1✔
UNCOV
557
            raise RuntimeError()
×
558
        logical_name_of_condition_entity = arguments.array[0]
1✔
559
        if not isinstance(logical_name_of_condition_entity, TerminalValue):
1✔
UNCOV
560
            raise RuntimeError()
×
561
        logical_name_of_condition: str = logical_name_of_condition_entity.value
1✔
562
        if not isinstance(logical_name_of_condition, str):
1✔
UNCOV
563
            raise RuntimeError()
×
564

565
        node_condition = self._retrieve_condition_if_exists(
1✔
566
            condition_name=logical_name_of_condition
567
        )
568
        if not isinstance(node_condition, NodeCondition):
1✔
UNCOV
569
            raise RuntimeError()
×
570
        change_types = [node_condition.change_type, *arguments.array[1:]]
1✔
571
        change_type = self._change_type_for_parent_of(change_types=change_types)
1✔
572
        return change_type
1✔
573

574
    def _visit_array(
1✔
575
        self, scope: Scope, before_array: Maybe[list], after_array: Maybe[list]
576
    ) -> NodeArray:
577
        change_type = ChangeType.UNCHANGED
1✔
578
        array: list[ChangeSetEntity] = list()
1✔
579
        for index, (before_value, after_value) in enumerate(
1✔
580
            zip_longest(before_array, after_array, fillvalue=Nothing)
581
        ):
582
            # TODO: should extract this scoping logic.
583
            value_scope = scope.open_index(index=index)
1✔
584
            value = self._visit_value(
1✔
585
                scope=value_scope, before_value=before_value, after_value=after_value
586
            )
587
            array.append(value)
1✔
588
            if value.change_type != ChangeType.UNCHANGED:
1✔
589
                change_type = ChangeType.MODIFIED
1✔
590
        return NodeArray(scope=scope, change_type=change_type, array=array)
1✔
591

592
    def _visit_object(
1✔
593
        self, scope: Scope, before_object: Maybe[dict], after_object: Maybe[dict]
594
    ) -> NodeObject:
595
        node_object = self._visited_scopes.get(scope)
1✔
596
        if isinstance(node_object, NodeObject):
1✔
UNCOV
597
            return node_object
×
598

599
        change_type = ChangeType.UNCHANGED
1✔
600
        binding_names = self._safe_keys_of(before_object, after_object)
1✔
601
        bindings: dict[str, ChangeSetEntity] = dict()
1✔
602
        for binding_name in binding_names:
1✔
603
            binding_scope, (before_value, after_value) = self._safe_access_in(
1✔
604
                scope, binding_name, before_object, after_object
605
            )
606
            value = self._visit_value(
1✔
607
                scope=binding_scope, before_value=before_value, after_value=after_value
608
            )
609
            bindings[binding_name] = value
1✔
610
            change_type = change_type.for_child(value.change_type)
1✔
611
        node_object = NodeObject(scope=scope, change_type=change_type, bindings=bindings)
1✔
612
        self._visited_scopes[scope] = node_object
1✔
613
        return node_object
1✔
614

615
    def _visit_divergence(
1✔
616
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
617
    ) -> NodeDivergence:
618
        scope_value = scope.open_scope("value")
1✔
619
        value = self._visit_value(scope=scope_value, before_value=before_value, after_value=Nothing)
1✔
620
        scope_divergence = scope.open_scope("divergence")
1✔
621
        divergence = self._visit_value(
1✔
622
            scope=scope_divergence, before_value=Nothing, after_value=after_value
623
        )
624
        return NodeDivergence(scope=scope, value=value, divergence=divergence)
1✔
625

626
    def _visit_value(
1✔
627
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
628
    ) -> ChangeSetEntity:
629
        value = self._visited_scopes.get(scope)
1✔
630
        if isinstance(value, ChangeSetEntity):
1✔
631
            return value
1✔
632

633
        before_type_name = self._type_name_of(before_value)
1✔
634
        after_type_name = self._type_name_of(after_value)
1✔
635
        unset = object()
1✔
636
        if before_type_name == after_type_name:
1✔
637
            dominant_value = before_value
1✔
638
        elif self._is_created(before=before_value, after=after_value):
1✔
639
            dominant_value = after_value
1✔
640
        elif self._is_removed(before=before_value, after=after_value):
1✔
641
            dominant_value = before_value
1✔
642
        else:
643
            dominant_value = unset
1✔
644
        if dominant_value is not unset:
1✔
645
            dominant_type_name = self._type_name_of(dominant_value)
1✔
646
            if self._is_terminal(value=dominant_value):
1✔
647
                value = self._visit_terminal_value(
1✔
648
                    scope=scope, before_value=before_value, after_value=after_value
649
                )
650
            elif self._is_object(value=dominant_value):
1✔
651
                value = self._visit_object(
1✔
652
                    scope=scope, before_object=before_value, after_object=after_value
653
                )
654
            elif self._is_array(value=dominant_value):
1✔
655
                value = self._visit_array(
1✔
656
                    scope=scope, before_array=before_value, after_array=after_value
657
                )
658
            elif self._is_intrinsic_function_name(dominant_type_name):
1✔
659
                intrinsic_function_scope, (before_arguments, after_arguments) = (
1✔
660
                    self._safe_access_in(scope, dominant_type_name, before_value, after_value)
661
                )
662
                value = self._visit_intrinsic_function(
1✔
663
                    scope=scope,
664
                    intrinsic_function=dominant_type_name,
665
                    before_arguments=before_arguments,
666
                    after_arguments=after_arguments,
667
                )
668
            else:
UNCOV
669
                raise RuntimeError(f"Unsupported type {type(dominant_value)}")
×
670
        # Case: type divergence.
671
        else:
672
            value = self._visit_divergence(
1✔
673
                scope=scope, before_value=before_value, after_value=after_value
674
            )
675
        self._visited_scopes[scope] = value
1✔
676
        return value
1✔
677

678
    def _visit_property(
1✔
679
        self,
680
        scope: Scope,
681
        property_name: str,
682
        before_property: Maybe[Any],
683
        after_property: Maybe[Any],
684
    ) -> NodeProperty:
685
        node_property = self._visited_scopes.get(scope)
1✔
686
        if isinstance(node_property, NodeProperty):
1✔
UNCOV
687
            return node_property
×
688

689
        if self._is_created(before=before_property, after=after_property):
1✔
690
            node_property = NodeProperty(
1✔
691
                scope=scope,
692
                change_type=ChangeType.CREATED,
693
                name=property_name,
694
                value=TerminalValueCreated(scope=scope, value=after_property),
695
            )
696
        elif self._is_removed(before=before_property, after=after_property):
1✔
697
            node_property = NodeProperty(
1✔
698
                scope=scope,
699
                change_type=ChangeType.REMOVED,
700
                name=property_name,
701
                value=TerminalValueRemoved(scope=scope, value=before_property),
702
            )
703
        else:
704
            value = self._visit_value(
1✔
705
                scope=scope, before_value=before_property, after_value=after_property
706
            )
707
            node_property = NodeProperty(
1✔
708
                scope=scope, change_type=value.change_type, name=property_name, value=value
709
            )
710
        self._visited_scopes[scope] = node_property
1✔
711
        return node_property
1✔
712

713
    def _visit_properties(
1✔
714
        self, scope: Scope, before_properties: Maybe[dict], after_properties: Maybe[dict]
715
    ) -> NodeProperties:
716
        node_properties = self._visited_scopes.get(scope)
1✔
717
        if isinstance(node_properties, NodeProperties):
1✔
UNCOV
718
            return node_properties
×
719
        # TODO: double check we are sure not to have this be a NodeObject
720
        property_names: list[str] = self._safe_keys_of(before_properties, after_properties)
1✔
721
        properties: list[NodeProperty] = list()
1✔
722
        change_type = ChangeType.UNCHANGED
1✔
723
        for property_name in property_names:
1✔
724
            property_scope, (before_property, after_property) = self._safe_access_in(
1✔
725
                scope, property_name, before_properties, after_properties
726
            )
727
            property_ = self._visit_property(
1✔
728
                scope=property_scope,
729
                property_name=property_name,
730
                before_property=before_property,
731
                after_property=after_property,
732
            )
733
            properties.append(property_)
1✔
734
            change_type = change_type.for_child(property_.change_type)
1✔
735
        node_properties = NodeProperties(
1✔
736
            scope=scope, change_type=change_type, properties=properties
737
        )
738
        self._visited_scopes[scope] = node_properties
1✔
739
        return node_properties
1✔
740

741
    def _visit_resource(
1✔
742
        self,
743
        scope: Scope,
744
        resource_name: str,
745
        before_resource: Maybe[dict],
746
        after_resource: Maybe[dict],
747
    ) -> NodeResource:
748
        node_resource = self._visited_scopes.get(scope)
1✔
749
        if isinstance(node_resource, NodeResource):
1✔
750
            return node_resource
1✔
751

752
        if self._is_created(before=before_resource, after=after_resource):
1✔
753
            change_type = ChangeType.CREATED
1✔
754
        elif self._is_removed(before=before_resource, after=after_resource):
1✔
755
            change_type = ChangeType.REMOVED
1✔
756
        else:
757
            change_type = ChangeType.UNCHANGED
1✔
758

759
        # TODO: investigate behaviour with type changes, for now this is filler code.
760
        _, type_str = self._safe_access_in(scope, TypeKey, before_resource)
1✔
761

762
        condition_reference = None
1✔
763
        scope_condition, (before_condition, after_condition) = self._safe_access_in(
1✔
764
            scope, ConditionKey, before_resource, after_resource
765
        )
766
        # TODO: condition references should be resolved for the condition's change_type?
767
        if before_condition or after_condition:
1✔
768
            condition_reference = self._visit_terminal_value(
1✔
769
                scope_condition, before_condition, after_condition
770
            )
771

772
        scope_properties, (before_properties, after_properties) = self._safe_access_in(
1✔
773
            scope, PropertiesKey, before_resource, after_resource
774
        )
775
        properties = self._visit_properties(
1✔
776
            scope=scope_properties,
777
            before_properties=before_properties,
778
            after_properties=after_properties,
779
        )
780
        change_type = change_type.for_child(properties.change_type)
1✔
781
        node_resource = NodeResource(
1✔
782
            scope=scope,
783
            change_type=change_type,
784
            name=resource_name,
785
            type_=TerminalValueUnchanged(scope=scope, value=type_str),
786
            condition_reference=condition_reference,
787
            properties=properties,
788
        )
789
        self._visited_scopes[scope] = node_resource
1✔
790
        return node_resource
1✔
791

792
    def _visit_resources(
1✔
793
        self, scope: Scope, before_resources: Maybe[dict], after_resources: Maybe[dict]
794
    ) -> NodeResources:
795
        # TODO: investigate type changes behavior.
796
        change_type = ChangeType.UNCHANGED
1✔
797
        resources: list[NodeResource] = list()
1✔
798
        resource_names = self._safe_keys_of(before_resources, after_resources)
1✔
799
        for resource_name in resource_names:
1✔
800
            resource_scope, (before_resource, after_resource) = self._safe_access_in(
1✔
801
                scope, resource_name, before_resources, after_resources
802
            )
803
            resource = self._visit_resource(
1✔
804
                scope=resource_scope,
805
                resource_name=resource_name,
806
                before_resource=before_resource,
807
                after_resource=after_resource,
808
            )
809
            resources.append(resource)
1✔
810
            change_type = change_type.for_child(resource.change_type)
1✔
811
        return NodeResources(scope=scope, change_type=change_type, resources=resources)
1✔
812

813
    def _visit_mapping(
1✔
814
        self, scope: Scope, name: str, before_mapping: Maybe[dict], after_mapping: Maybe[dict]
815
    ) -> NodeMapping:
816
        bindings = self._visit_object(
1✔
817
            scope=scope, before_object=before_mapping, after_object=after_mapping
818
        )
819
        return NodeMapping(
1✔
820
            scope=scope, change_type=bindings.change_type, name=name, bindings=bindings
821
        )
822

823
    def _visit_mappings(
1✔
824
        self, scope: Scope, before_mappings: Maybe[dict], after_mappings: Maybe[dict]
825
    ) -> NodeMappings:
826
        change_type = ChangeType.UNCHANGED
1✔
827
        mappings: list[NodeMapping] = list()
1✔
828
        mapping_names = self._safe_keys_of(before_mappings, after_mappings)
1✔
829
        for mapping_name in mapping_names:
1✔
830
            scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
1✔
831
                scope, mapping_name, before_mappings, after_mappings
832
            )
833
            mapping = self._visit_mapping(
1✔
834
                scope=scope,
835
                name=mapping_name,
836
                before_mapping=before_mapping,
837
                after_mapping=after_mapping,
838
            )
839
            mappings.append(mapping)
1✔
840
            change_type = change_type.for_child(mapping.change_type)
1✔
841
        return NodeMappings(scope=scope, change_type=change_type, mappings=mappings)
1✔
842

843
    def _visit_dynamic_parameter(self, parameter_name: str) -> ChangeSetEntity:
1✔
844
        scope = Scope("Dynamic").open_scope("Parameters")
1✔
845
        scope_parameter, (before_parameter, after_parameter) = self._safe_access_in(
1✔
846
            scope, parameter_name, self._before_parameters, self._after_parameters
847
        )
848
        parameter = self._visit_value(
1✔
849
            scope=scope_parameter, before_value=before_parameter, after_value=after_parameter
850
        )
851
        return parameter
1✔
852

853
    def _visit_parameter(
1✔
854
        self,
855
        scope: Scope,
856
        parameter_name: str,
857
        before_parameter: Maybe[dict],
858
        after_parameter: Maybe[dict],
859
    ) -> NodeParameter:
860
        node_parameter = self._visited_scopes.get(scope)
1✔
861
        if isinstance(node_parameter, NodeParameter):
1✔
UNCOV
862
            return node_parameter
×
863
        # TODO: add logic to compute defaults already in the graph building process?
864
        dynamic_value = self._visit_dynamic_parameter(parameter_name=parameter_name)
1✔
865
        if self._is_created(before=before_parameter, after=after_parameter):
1✔
UNCOV
866
            node_parameter = NodeParameter(
×
867
                scope=scope,
868
                change_type=ChangeType.CREATED,
869
                name=parameter_name,
870
                value=TerminalValueCreated(scope=scope, value=after_parameter),
871
                dynamic_value=dynamic_value,
872
            )
873
        elif self._is_removed(before=before_parameter, after=after_parameter):
1✔
UNCOV
874
            node_parameter = NodeParameter(
×
875
                scope=scope,
876
                change_type=ChangeType.REMOVED,
877
                name=parameter_name,
878
                value=TerminalValueRemoved(scope=scope, value=before_parameter),
879
                dynamic_value=dynamic_value,
880
            )
881
        else:
882
            value = self._visit_value(
1✔
883
                scope=scope, before_value=before_parameter, after_value=after_parameter
884
            )
885
            change_type = self._change_type_for_parent_of(
1✔
886
                change_types=[dynamic_value.change_type, value.change_type]
887
            )
888
            node_parameter = NodeParameter(
1✔
889
                scope=scope,
890
                change_type=change_type,
891
                name=parameter_name,
892
                value=value,
893
                dynamic_value=dynamic_value,
894
            )
895
        self._visited_scopes[scope] = node_parameter
1✔
896
        return node_parameter
1✔
897

898
    def _visit_parameters(
1✔
899
        self, scope: Scope, before_parameters: Maybe[dict], after_parameters: Maybe[dict]
900
    ) -> NodeParameters:
901
        node_parameters = self._visited_scopes.get(scope)
1✔
902
        if isinstance(node_parameters, NodeParameters):
1✔
UNCOV
903
            return node_parameters
×
904
        parameter_names: list[str] = self._safe_keys_of(before_parameters, after_parameters)
1✔
905
        parameters: list[NodeParameter] = list()
1✔
906
        change_type = ChangeType.UNCHANGED
1✔
907
        for parameter_name in parameter_names:
1✔
908
            parameter_scope, (before_parameter, after_parameter) = self._safe_access_in(
1✔
909
                scope, parameter_name, before_parameters, after_parameters
910
            )
911
            parameter = self._visit_parameter(
1✔
912
                scope=parameter_scope,
913
                parameter_name=parameter_name,
914
                before_parameter=before_parameter,
915
                after_parameter=after_parameter,
916
            )
917
            parameters.append(parameter)
1✔
918
            change_type = change_type.for_child(parameter.change_type)
1✔
919
        node_parameters = NodeParameters(
1✔
920
            scope=scope, change_type=change_type, parameters=parameters
921
        )
922
        self._visited_scopes[scope] = node_parameters
1✔
923
        return node_parameters
1✔
924

925
    def _visit_condition(
1✔
926
        self,
927
        scope: Scope,
928
        condition_name: str,
929
        before_condition: Maybe[dict],
930
        after_condition: Maybe[dict],
931
    ) -> NodeCondition:
932
        node_condition = self._visited_scopes.get(scope)
1✔
933
        if isinstance(node_condition, NodeCondition):
1✔
UNCOV
934
            return node_condition
×
935
        body = self._visit_value(
1✔
936
            scope=scope, before_value=before_condition, after_value=after_condition
937
        )
938
        node_condition = NodeCondition(
1✔
939
            scope=scope, change_type=body.change_type, name=condition_name, body=body
940
        )
941
        self._visited_scopes[scope] = node_condition
1✔
942
        return node_condition
1✔
943

944
    def _visit_conditions(
1✔
945
        self, scope: Scope, before_conditions: Maybe[dict], after_conditions: Maybe[dict]
946
    ) -> NodeConditions:
947
        node_conditions = self._visited_scopes.get(scope)
1✔
948
        if isinstance(node_conditions, NodeConditions):
1✔
UNCOV
949
            return node_conditions
×
950
        condition_names: list[str] = self._safe_keys_of(before_conditions, after_conditions)
1✔
951
        conditions: list[NodeCondition] = list()
1✔
952
        change_type = ChangeType.UNCHANGED
1✔
953
        for condition_name in condition_names:
1✔
954
            condition_scope, (before_condition, after_condition) = self._safe_access_in(
1✔
955
                scope, condition_name, before_conditions, after_conditions
956
            )
957
            condition = self._visit_condition(
1✔
958
                scope=condition_scope,
959
                condition_name=condition_name,
960
                before_condition=before_condition,
961
                after_condition=after_condition,
962
            )
963
            conditions.append(condition)
1✔
964
            change_type = change_type.for_child(child_change_type=condition.change_type)
1✔
965
        node_conditions = NodeConditions(
1✔
966
            scope=scope, change_type=change_type, conditions=conditions
967
        )
968
        self._visited_scopes[scope] = node_conditions
1✔
969
        return node_conditions
1✔
970

971
    def _visit_output(
1✔
972
        self, scope: Scope, name: str, before_output: Maybe[dict], after_output: Maybe[dict]
973
    ) -> NodeOutput:
974
        change_type = ChangeType.UNCHANGED
1✔
975
        scope_value, (before_value, after_value) = self._safe_access_in(
1✔
976
            scope, ValueKey, before_output, after_output
977
        )
978
        value = self._visit_value(scope_value, before_value, after_value)
1✔
979
        change_type = change_type.for_child(value.change_type)
1✔
980

981
        export: Optional[ChangeSetEntity] = None
1✔
982
        scope_export, (before_export, after_export) = self._safe_access_in(
1✔
983
            scope, ExportKey, before_output, after_output
984
        )
985
        if before_export or after_export:
1✔
NEW
986
            export = self._visit_value(scope_export, before_export, after_export)
×
NEW
987
            change_type = change_type.for_child(export.change_type)
×
988

989
        # TODO: condition references should be resolved for the condition's change_type?
990
        condition_reference: Optional[TerminalValue] = None
1✔
991
        scope_condition, (before_condition, after_condition) = self._safe_access_in(
1✔
992
            scope, ConditionKey, before_output, after_output
993
        )
994
        if before_condition or after_condition:
1✔
NEW
995
            condition_reference = self._visit_terminal_value(
×
996
                scope_condition, before_condition, after_condition
997
            )
NEW
998
            change_type = change_type.for_child(condition_reference.change_type)
×
999

1000
        return NodeOutput(
1✔
1001
            scope=scope,
1002
            change_type=change_type,
1003
            name=name,
1004
            value=value,
1005
            export=export,
1006
            conditional_reference=condition_reference,
1007
        )
1008

1009
    def _visit_outputs(
1✔
1010
        self, scope: Scope, before_outputs: Maybe[dict], after_outputs: Maybe[dict]
1011
    ) -> NodeOutputs:
1012
        change_type = ChangeType.UNCHANGED
1✔
1013
        outputs: list[NodeOutput] = list()
1✔
1014
        output_names: list[str] = self._safe_keys_of(before_outputs, after_outputs)
1✔
1015
        for output_name in output_names:
1✔
1016
            scope_output, (before_output, after_output) = self._safe_access_in(
1✔
1017
                scope, output_name, before_outputs, after_outputs
1018
            )
1019
            output = self._visit_output(
1✔
1020
                scope=scope_output,
1021
                name=output_name,
1022
                before_output=before_output,
1023
                after_output=after_output,
1024
            )
1025
            outputs.append(output)
1✔
1026
            change_type = change_type.for_child(output.change_type)
1✔
1027
        return NodeOutputs(scope=scope, change_type=change_type, outputs=outputs)
1✔
1028

1029
    def _model(self, before_template: Maybe[dict], after_template: Maybe[dict]) -> NodeTemplate:
1✔
1030
        root_scope = Scope()
1✔
1031
        # TODO: visit other child types
1032

1033
        mappings_scope, (before_mappings, after_mappings) = self._safe_access_in(
1✔
1034
            root_scope, MappingsKey, before_template, after_template
1035
        )
1036
        mappings = self._visit_mappings(
1✔
1037
            scope=mappings_scope, before_mappings=before_mappings, after_mappings=after_mappings
1038
        )
1039

1040
        parameters_scope, (before_parameters, after_parameters) = self._safe_access_in(
1✔
1041
            root_scope, ParametersKey, before_template, after_template
1042
        )
1043
        parameters = self._visit_parameters(
1✔
1044
            scope=parameters_scope,
1045
            before_parameters=before_parameters,
1046
            after_parameters=after_parameters,
1047
        )
1048

1049
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
1✔
1050
            root_scope, ConditionsKey, before_template, after_template
1051
        )
1052
        conditions = self._visit_conditions(
1✔
1053
            scope=conditions_scope,
1054
            before_conditions=before_conditions,
1055
            after_conditions=after_conditions,
1056
        )
1057

1058
        resources_scope, (before_resources, after_resources) = self._safe_access_in(
1✔
1059
            root_scope, ResourcesKey, before_template, after_template
1060
        )
1061
        resources = self._visit_resources(
1✔
1062
            scope=resources_scope,
1063
            before_resources=before_resources,
1064
            after_resources=after_resources,
1065
        )
1066

1067
        outputs_scope, (before_outputs, after_outputs) = self._safe_access_in(
1✔
1068
            root_scope, OutputsKey, before_template, after_template
1069
        )
1070
        outputs = self._visit_outputs(
1✔
1071
            scope=outputs_scope, before_outputs=before_outputs, after_outputs=after_outputs
1072
        )
1073

1074
        # TODO: compute the change_type of the template properly.
1075
        return NodeTemplate(
1✔
1076
            scope=root_scope,
1077
            change_type=resources.change_type,
1078
            mappings=mappings,
1079
            parameters=parameters,
1080
            conditions=conditions,
1081
            resources=resources,
1082
            outputs=outputs,
1083
        )
1084

1085
    def _retrieve_condition_if_exists(self, condition_name: str) -> Optional[NodeCondition]:
1✔
1086
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
1✔
1087
            Scope(), ConditionsKey, self._before_template, self._after_template
1088
        )
1089
        before_conditions = before_conditions or dict()
1✔
1090
        after_conditions = after_conditions or dict()
1✔
1091
        if condition_name in before_conditions or condition_name in after_conditions:
1✔
1092
            condition_scope, (before_condition, after_condition) = self._safe_access_in(
1✔
1093
                conditions_scope, condition_name, before_conditions, after_conditions
1094
            )
1095
            node_condition = self._visit_condition(
1✔
1096
                conditions_scope,
1097
                condition_name,
1098
                before_condition=before_condition,
1099
                after_condition=after_condition,
1100
            )
1101
            return node_condition
1✔
1102
        return None
1✔
1103

1104
    def _retrieve_parameter_if_exists(self, parameter_name: str) -> Optional[NodeParameter]:
1✔
1105
        parameters_scope, (before_parameters, after_parameters) = self._safe_access_in(
1✔
1106
            Scope(), ParametersKey, self._before_template, self._after_template
1107
        )
1108
        before_parameters = before_parameters or dict()
1✔
1109
        after_parameters = after_parameters or dict()
1✔
1110
        if parameter_name in before_parameters or parameter_name in after_parameters:
1✔
1111
            parameter_scope, (before_parameter, after_parameter) = self._safe_access_in(
1✔
1112
                parameters_scope, parameter_name, before_parameters, after_parameters
1113
            )
1114
            node_parameter = self._visit_parameter(
1✔
1115
                parameters_scope,
1116
                parameter_name,
1117
                before_parameter=before_parameter,
1118
                after_parameter=after_parameter,
1119
            )
1120
            return node_parameter
1✔
1121
        return None
1✔
1122

1123
    def _retrieve_mapping(self, mapping_name) -> NodeMapping:
1✔
1124
        # TODO: add caching mechanism, and raise appropriate error if missing.
1125
        scope_mappings, (before_mappings, after_mappings) = self._safe_access_in(
1✔
1126
            Scope(), MappingsKey, self._before_template, self._after_template
1127
        )
1128
        before_mappings = before_mappings or dict()
1✔
1129
        after_mappings = after_mappings or dict()
1✔
1130
        if mapping_name in before_mappings or mapping_name in after_mappings:
1✔
1131
            scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
1✔
1132
                scope_mappings, mapping_name, before_mappings, after_mappings
1133
            )
1134
            node_mapping = self._visit_mapping(
1✔
1135
                scope_mapping, mapping_name, before_mapping, after_mapping
1136
            )
1137
            return node_mapping
1✔
UNCOV
1138
        raise RuntimeError()
×
1139

1140
    def _retrieve_or_visit_resource(self, resource_name: str) -> NodeResource:
1✔
1141
        resources_scope, (before_resources, after_resources) = self._safe_access_in(
1✔
1142
            Scope(),
1143
            ResourcesKey,
1144
            self._before_template,
1145
            self._after_template,
1146
        )
1147
        resource_scope, (before_resource, after_resource) = self._safe_access_in(
1✔
1148
            resources_scope, resource_name, before_resources, after_resources
1149
        )
1150
        return self._visit_resource(
1✔
1151
            scope=resource_scope,
1152
            resource_name=resource_name,
1153
            before_resource=before_resource,
1154
            after_resource=after_resource,
1155
        )
1156

1157
    @staticmethod
1✔
1158
    def _is_intrinsic_function_name(function_name: str) -> bool:
1✔
1159
        # TODO: are intrinsic functions soft keywords?
1160
        return function_name in INTRINSIC_FUNCTIONS
1✔
1161

1162
    @staticmethod
1✔
1163
    def _safe_access_in(scope: Scope, key: str, *objects: Maybe[dict]) -> tuple[Scope, Maybe[Any]]:
1✔
1164
        results = list()
1✔
1165
        for obj in objects:
1✔
1166
            # TODO: raise errors if not dict
1167
            if not isinstance(obj, NothingType):
1✔
1168
                results.append(obj.get(key, Nothing))
1✔
1169
            else:
1170
                results.append(obj)
1✔
1171
        new_scope = scope.open_scope(name=key)
1✔
1172
        return new_scope, results[0] if len(objects) == 1 else tuple(results)
1✔
1173

1174
    @staticmethod
1✔
1175
    def _safe_keys_of(*objects: Maybe[dict]) -> list[str]:
1✔
1176
        key_set: set[str] = set()
1✔
1177
        for obj in objects:
1✔
1178
            # TODO: raise errors if not dict
1179
            if isinstance(obj, dict):
1✔
1180
                key_set.update(obj.keys())
1✔
1181
        # The keys list is sorted to increase reproducibility of the
1182
        # update graph build process or downstream logics.
1183
        keys = sorted(key_set)
1✔
1184
        return keys
1✔
1185

1186
    @staticmethod
1✔
1187
    def _change_type_for_parent_of(change_types: list[ChangeType]) -> ChangeType:
1✔
1188
        parent_change_type = ChangeType.UNCHANGED
1✔
1189
        for child_change_type in change_types:
1✔
1190
            parent_change_type = parent_change_type.for_child(child_change_type)
1✔
1191
            if parent_change_type == ChangeType.MODIFIED:
1✔
1192
                break
1✔
1193
        return parent_change_type
1✔
1194

1195
    @staticmethod
1✔
1196
    def _name_if_intrinsic_function(value: Maybe[Any]) -> Optional[str]:
1✔
1197
        if isinstance(value, dict):
1✔
1198
            keys = ChangeSetModel._safe_keys_of(value)
1✔
1199
            if len(keys) == 1:
1✔
1200
                key_name = keys[0]
1✔
1201
                if ChangeSetModel._is_intrinsic_function_name(key_name):
1✔
1202
                    return key_name
1✔
1203
        return None
1✔
1204

1205
    @staticmethod
1✔
1206
    def _type_name_of(value: Maybe[Any]) -> str:
1✔
1207
        maybe_intrinsic_function_name = ChangeSetModel._name_if_intrinsic_function(value)
1✔
1208
        if maybe_intrinsic_function_name is not None:
1✔
1209
            return maybe_intrinsic_function_name
1✔
1210
        return type(value).__name__
1✔
1211

1212
    @staticmethod
1✔
1213
    def _is_terminal(value: Any) -> bool:
1✔
1214
        return type(value) in {int, float, bool, str, None, NothingType}
1✔
1215

1216
    @staticmethod
1✔
1217
    def _is_object(value: Any) -> bool:
1✔
1218
        return isinstance(value, dict) and ChangeSetModel._name_if_intrinsic_function(value) is None
1✔
1219

1220
    @staticmethod
1✔
1221
    def _is_array(value: Any) -> bool:
1✔
1222
        return isinstance(value, list)
1✔
1223

1224
    @staticmethod
1✔
1225
    def _is_created(before: Maybe[Any], after: Maybe[Any]) -> bool:
1✔
1226
        return isinstance(before, NothingType) and not isinstance(after, NothingType)
1✔
1227

1228
    @staticmethod
1✔
1229
    def _is_removed(before: Maybe[Any], after: Maybe[Any]) -> bool:
1✔
1230
        return not isinstance(before, NothingType) and isinstance(after, NothingType)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc