• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / a63921d2-72eb-4883-bbb8-968104512d0e

02 Apr 2025 01:40PM UTC coverage: 86.807% (-0.08%) from 86.888%
a63921d2-72eb-4883-bbb8-968104512d0e

push

circleci

web-flow
CFn: WIP POC v2 executor (#12396)

Co-authored-by: Marco Edoardo Palma <64580864+MEPalma@users.noreply.github.com>

36 of 112 new or added lines in 6 files covered. (32.14%)

72 existing lines in 7 files now uncovered.

63519 of 73173 relevant lines covered (86.81%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.09
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model.py
1
from __future__ import annotations
1✔
2

3
import abc
1✔
4
import enum
1✔
5
from itertools import zip_longest
1✔
6
from typing import Any, Final, Generator, Optional, Union, cast
1✔
7

8
from typing_extensions import TypeVar
1✔
9

10
from localstack.aws.api.cloudformation import ChangeAction
1✔
11
from localstack.utils.strings import camel_to_snake_case
1✔
12

13
T = TypeVar("T")
1✔
14

15

16
class NothingType:
    """Singleton sentinel meaning 'no value', kept distinct from None.

    The single instance is falsy, iterates as an empty sequence and is
    rendered as "Nothing".
    """

    _singleton = None
    __slots__ = ()

    def __new__(cls):
        # Create the shared instance on first use and return it on every call.
        instance = cls._singleton
        if instance is None:
            instance = super().__new__(cls)
            cls._singleton = instance
        return instance

    def __repr__(self) -> str:
        return "Nothing"

    def __str__(self):
        return self.__repr__()

    def __bool__(self):
        return False

    def __iter__(self):
        return iter(tuple())
1✔
38

39

40
Maybe = Union[T, NothingType]
1✔
41
Nothing = NothingType()
1✔
42

43

44
class Scope(str):
    """String-backed path identifying a position inside the change set model.

    Segments are joined with "/"; the default (root) scope is the empty string.
    """

    _ROOT_SCOPE: Final[str] = str()
    _SEPARATOR: Final[str] = "/"

    def __new__(cls, scope: str = _ROOT_SCOPE) -> Scope:
        instance = super().__new__(cls, scope)
        return cast(Scope, instance)

    def open_scope(self, name: Scope | str) -> Scope:
        """Return the child scope obtained by appending ``name`` to this scope."""
        segments = [self, name]
        return Scope(self._SEPARATOR.join(segments))

    def open_index(self, index: int) -> Scope:
        """Return the child scope for position ``index`` (stored as its decimal string)."""
        return self.open_scope(str(index))

    def unwrap(self) -> list[str]:
        """Split this scope into its separator-delimited segments."""
        separator = self._SEPARATOR
        return self.split(separator)
×
59

60

61
class ChangeType(enum.Enum):
    """Kind of change recorded for an entity of the change set model."""

    UNCHANGED = "Unchanged"
    CREATED = "Created"
    MODIFIED = "Modified"
    REMOVED = "Removed"

    def __str__(self):
        return self.value

    def to_action(self) -> ChangeAction | None:
        """Return the ChangeAction matching this change type, or None for UNCHANGED."""
        if self == ChangeType.CREATED:
            return ChangeAction.Add
        if self == ChangeType.MODIFIED:
            return ChangeAction.Modify
        if self == ChangeType.REMOVED:
            return ChangeAction.Remove
        return None

    def for_child(self, child_change_type: ChangeType) -> ChangeType:
        """Combine this (parent) change type with the change type of one child.

        Equal types are kept, an UNCHANGED parent adopts the child's type, and
        any other combination yields MODIFIED.
        """
        if child_change_type == self:
            return self
        if self == ChangeType.UNCHANGED:
            return child_change_type
        return ChangeType.MODIFIED
1✔
86

87

88
class ChangeSetEntity(abc.ABC):
    """Base class of every entity in the change set model.

    Each entity records the scope it was created for and its change type.
    """

    scope: Final[Scope]
    change_type: Final[ChangeType]

    def __init__(self, scope: Scope, change_type: ChangeType):
        self.scope = scope
        self.change_type = change_type

    def get_children(self) -> Generator[ChangeSetEntity, None, None]:
        """Yield the entities held directly by this instance's attributes.

        Attribute values that are lists or dicts are searched recursively;
        entities found are yielded as-is and are not descended into.
        """
        for child in self.__dict__.values():
            yield from self._get_children_in(child)

    @staticmethod
    def _get_children_in(obj: Any) -> Generator[ChangeSetEntity, None, None]:
        """Yield ``obj`` if it is an entity, else the entities nested in a list/dict."""
        # TODO: could avoid the inductive logic here, and check for loops?
        if isinstance(obj, ChangeSetEntity):
            yield obj
        elif isinstance(obj, list):
            for item in obj:
                yield from ChangeSetEntity._get_children_in(item)
        elif isinstance(obj, dict):
            for item in obj.values():
                yield from ChangeSetEntity._get_children_in(item)

    def __str__(self):
        # The closing parenthesis balances the opening one of the rendering.
        return f"({self.__class__.__name__}| {vars(self)})"

    def __repr__(self):
        return str(self)
117

118

119
class ChangeSetNode(ChangeSetEntity, abc.ABC):
    """Marker base class for the Node* entities; adds no behaviour of its own."""
1✔
120

121

122
class ChangeSetTerminal(ChangeSetEntity, abc.ABC):
    """Marker base class for terminal entities; adds no behaviour of its own."""
1✔
123

124

125
class NodeTemplate(ChangeSetNode):
    """Node for a template: its mappings, parameters, conditions, resources and outputs."""

    mappings: Final[NodeMappings]
    parameters: Final[NodeParameters]
    conditions: Final[NodeConditions]
    resources: Final[NodeResources]
    outputs: Final[NodeOutputs]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        mappings: NodeMappings,
        parameters: NodeParameters,
        conditions: NodeConditions,
        resources: NodeResources,
        outputs: NodeOutputs,
    ):
        super().__init__(scope, change_type)
        # Attribute order matters: get_children() walks them in assignment order.
        self.mappings = mappings
        self.parameters = parameters
        self.conditions = conditions
        self.resources = resources
        self.outputs = outputs
1✔
148

149

150
class NodeDivergence(ChangeSetNode):
    """Node pairing a ``value`` entity with a ``divergence`` entity; always MODIFIED."""

    value: Final[ChangeSetEntity]
    divergence: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, value: ChangeSetEntity, divergence: ChangeSetEntity):
        # A divergence is by construction a modification.
        super().__init__(scope, ChangeType.MODIFIED)
        self.value = value
        self.divergence = divergence
1✔
158

159

160
class NodeParameter(ChangeSetNode):
    """Node for one named parameter with its ``value`` and ``dynamic_value`` entities."""

    name: Final[str]
    value: Final[ChangeSetEntity]
    dynamic_value: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        value: ChangeSetEntity,
        dynamic_value: ChangeSetEntity,
    ):
        super().__init__(scope, change_type)
        self.name = name
        self.value = value
        self.dynamic_value = dynamic_value
1✔
177

178

179
class NodeParameters(ChangeSetNode):
    """Node holding the list of NodeParameter entries."""

    parameters: Final[list[NodeParameter]]

    def __init__(self, scope: Scope, change_type: ChangeType, parameters: list[NodeParameter]):
        super().__init__(scope, change_type)
        self.parameters = parameters
1✔
185

186

187
class NodeMapping(ChangeSetNode):
    """Node for one named mapping whose content is a NodeObject of bindings."""

    name: Final[str]
    bindings: Final[NodeObject]

    def __init__(self, scope: Scope, change_type: ChangeType, name: str, bindings: NodeObject):
        super().__init__(scope, change_type)
        self.name = name
        self.bindings = bindings
1✔
195

196

197
class NodeMappings(ChangeSetNode):
    """Node holding the list of NodeMapping entries."""

    mappings: Final[list[NodeMapping]]

    def __init__(self, scope: Scope, change_type: ChangeType, mappings: list[NodeMapping]):
        super().__init__(scope, change_type)
        self.mappings = mappings
1✔
203

204

205
class NodeOutput(ChangeSetNode):
    """Node for one named output: its value, optional export and optional condition reference."""

    name: Final[str]
    value: Final[ChangeSetEntity]
    export: Final[Optional[ChangeSetEntity]]
    condition_reference: Final[Optional[TerminalValue]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        value: ChangeSetEntity,
        export: Optional[ChangeSetEntity],
        conditional_reference: Optional[TerminalValue],
    ):
        super().__init__(scope, change_type)
        self.name = name
        self.value = value
        self.export = export
        # The parameter is spelled ``conditional_reference`` but is stored as
        # ``condition_reference``; the parameter name is kept for callers.
        self.condition_reference = conditional_reference
1✔
225

226

227
class NodeOutputs(ChangeSetNode):
    """Node holding the list of NodeOutput entries."""

    outputs: Final[list[NodeOutput]]

    def __init__(self, scope: Scope, change_type: ChangeType, outputs: list[NodeOutput]):
        super().__init__(scope, change_type)
        self.outputs = outputs
1✔
233

234

235
class NodeCondition(ChangeSetNode):
    """Node for one named condition and the entity forming its body."""

    name: Final[str]
    body: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, change_type: ChangeType, name: str, body: ChangeSetEntity):
        super().__init__(scope, change_type)
        self.name = name
        self.body = body
1✔
243

244

245
class NodeConditions(ChangeSetNode):
    """Node holding the list of NodeCondition entries."""

    conditions: Final[list[NodeCondition]]

    def __init__(self, scope: Scope, change_type: ChangeType, conditions: list[NodeCondition]):
        super().__init__(scope, change_type)
        self.conditions = conditions
1✔
251

252

253
class NodeResources(ChangeSetNode):
    """Node holding the list of NodeResource entries."""

    resources: Final[list[NodeResource]]

    def __init__(self, scope: Scope, change_type: ChangeType, resources: list[NodeResource]):
        super().__init__(scope, change_type)
        self.resources = resources
1✔
259

260

261
class NodeResource(ChangeSetNode):
    """Node for one named resource: its type, optional condition reference and properties."""

    name: Final[str]
    type_: Final[ChangeSetTerminal]
    condition_reference: Final[Optional[TerminalValue]]
    properties: Final[NodeProperties]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        type_: ChangeSetTerminal,
        # Optional to agree with the attribute declaration above: the
        # reference may be None.
        condition_reference: Optional[TerminalValue],
        properties: NodeProperties,
    ):
        super().__init__(scope=scope, change_type=change_type)
        self.name = name
        self.type_ = type_
        self.condition_reference = condition_reference
        self.properties = properties
1✔
281

282

283
class NodeProperties(ChangeSetNode):
    """Node holding the list of NodeProperty entries."""

    properties: Final[list[NodeProperty]]

    def __init__(self, scope: Scope, change_type: ChangeType, properties: list[NodeProperty]):
        super().__init__(scope, change_type)
        self.properties = properties
1✔
289

290

291
class NodeProperty(ChangeSetNode):
    """Node for one named property and the entity holding its value."""

    name: Final[str]
    value: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, change_type: ChangeType, name: str, value: ChangeSetEntity):
        super().__init__(scope, change_type)
        self.name = name
        self.value = value
1✔
299

300

301
class NodeIntrinsicFunction(ChangeSetNode):
    """Node for an intrinsic function call: the function name and its arguments entity."""

    intrinsic_function: Final[str]
    arguments: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        intrinsic_function: str,
        arguments: ChangeSetEntity,
    ):
        super().__init__(scope, change_type)
        self.intrinsic_function = intrinsic_function
        self.arguments = arguments
1✔
315

316

317
class NodeObject(ChangeSetNode):
    """Node for an object: a dict from binding name to the entity bound to it."""

    bindings: Final[dict[str, ChangeSetEntity]]

    def __init__(self, scope: Scope, change_type: ChangeType, bindings: dict[str, ChangeSetEntity]):
        super().__init__(scope, change_type)
        self.bindings = bindings
1✔
323

324

325
class NodeArray(ChangeSetNode):
    """Node for an array: an ordered list of entities."""

    array: Final[list[ChangeSetEntity]]

    def __init__(self, scope: Scope, change_type: ChangeType, array: list[ChangeSetEntity]):
        super().__init__(scope, change_type)
        self.array = array
1✔
331

332

333
class TerminalValue(ChangeSetTerminal, abc.ABC):
    """Terminal entity carrying a single value."""

    value: Final[Any]

    def __init__(self, scope: Scope, change_type: ChangeType, value: Any):
        super().__init__(scope, change_type)
        self.value = value
1✔
339

340

341
class TerminalValueModified(TerminalValue):
    """MODIFIED terminal value: ``value`` plus the ``modified_value`` that replaces it."""

    modified_value: Final[Any]

    def __init__(self, scope: Scope, value: Any, modified_value: Any):
        super().__init__(scope, ChangeType.MODIFIED, value)
        self.modified_value = modified_value
1✔
347

348

349
class TerminalValueCreated(TerminalValue):
    """Terminal value whose change type is fixed to CREATED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.CREATED, value)
1✔
352

353

354
class TerminalValueRemoved(TerminalValue):
    """Terminal value whose change type is fixed to REMOVED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.REMOVED, value)
1✔
357

358

359
class TerminalValueUnchanged(TerminalValue):
    """Terminal value whose change type is fixed to UNCHANGED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.UNCHANGED, value)
1✔
362

363

364
# Template and resource keys.
TypeKey: Final[str] = "Type"
ConditionKey: Final[str] = "Condition"
ConditionsKey: Final[str] = "Conditions"
MappingsKey: Final[str] = "Mappings"
ResourcesKey: Final[str] = "Resources"
PropertiesKey: Final[str] = "Properties"
ParametersKey: Final[str] = "Parameters"
ValueKey: Final[str] = "Value"
ExportKey: Final[str] = "Export"
OutputsKey: Final[str] = "Outputs"

# Intrinsic function names.
# TODO: expand intrinsic functions set.
RefKey: Final[str] = "Ref"
FnIf: Final[str] = "Fn::If"
FnNot: Final[str] = "Fn::Not"
FnGetAttKey: Final[str] = "Fn::GetAtt"
FnEqualsKey: Final[str] = "Fn::Equals"
FnFindInMapKey: Final[str] = "Fn::FindInMap"

# Names treated as intrinsic functions by the model.
INTRINSIC_FUNCTIONS: Final[set[str]] = {
    RefKey,
    FnGetAttKey,
    FnFindInMapKey,
    FnEqualsKey,
    FnIf,
    FnNot,
}
389

390

391
class ChangeSetModel:
1✔
392
    # TODO: should this instead be generalised to work on "Stack" objects instead of just "Template"s?
393

394
    # TODO: can probably improve the typehints to use CFN's 'language' eg. dict -> Template|Properties, etc.
395

396
    # TODO: add support for 'replacement' computation, and ensure this state is propagated in tree traversals
397
    #  such as intrinsic functions.
398

399
    _before_template: Final[Maybe[dict]]
1✔
400
    _after_template: Final[Maybe[dict]]
1✔
401
    _before_parameters: Final[Maybe[dict]]
1✔
402
    _after_parameters: Final[Maybe[dict]]
1✔
403
    _visited_scopes: Final[dict[str, ChangeSetEntity]]
1✔
404
    _node_template: Final[NodeTemplate]
1✔
405

406
    def __init__(
        self,
        before_template: Optional[dict],
        after_template: Optional[dict],
        before_parameters: Optional[dict],
        after_parameters: Optional[dict],
    ):
        """Store the before/after inputs and build the template model.

        Any falsy input (None or an empty dict) is replaced by ``Nothing``.
        """
        self._before_template = before_template if before_template else Nothing
        self._after_template = after_template if after_template else Nothing
        self._before_parameters = before_parameters if before_parameters else Nothing
        self._after_parameters = after_parameters if after_parameters else Nothing
        # Cache of entities already built, keyed by scope.
        self._visited_scopes = {}
        self._node_template = self._model(
            before_template=self._before_template,
            after_template=self._after_template,
        )
        # TODO: need to do template preprocessing e.g. parameter resolution, conditions etc.
422

423
    def get_update_model(self) -> NodeTemplate:
1✔
424
        # TODO: rethink naming of this for outer utils
425
        return self._node_template
1✔
426

427
    def _visit_terminal_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> TerminalValue:
        """Return the terminal entity for ``scope``, building and caching it when absent.

        The concrete class is chosen from the created / removed / equal /
        different relation between ``before_value`` and ``after_value``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, TerminalValue):
            return cached

        if self._is_created(before=before_value, after=after_value):
            terminal = TerminalValueCreated(scope=scope, value=after_value)
        elif self._is_removed(before=before_value, after=after_value):
            terminal = TerminalValueRemoved(scope=scope, value=before_value)
        elif before_value == after_value:
            terminal = TerminalValueUnchanged(scope=scope, value=before_value)
        else:
            terminal = TerminalValueModified(
                scope=scope, value=before_value, modified_value=after_value
            )

        self._visited_scopes[scope] = terminal
        return terminal
1✔
445

446
    def _visit_intrinsic_function(
        self,
        scope: Scope,
        intrinsic_function: str,
        before_arguments: Maybe[Any],
        after_arguments: Maybe[Any],
    ) -> NodeIntrinsicFunction:
        """Return the intrinsic-function node for ``scope``, building and caching it when absent.

        Created or removed arguments fix the change type directly. Otherwise a
        ``_resolve_intrinsic_function_<snake_case_name>`` method is used when this
        object has one, and the arguments' own change type is used when it has not.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeIntrinsicFunction):
            return cached

        arguments = self._visit_value(
            scope=scope, before_value=before_arguments, after_value=after_arguments
        )

        if self._is_created(before=before_arguments, after=after_arguments):
            change_type = ChangeType.CREATED
        elif self._is_removed(before=before_arguments, after=after_arguments):
            change_type = ChangeType.REMOVED
        else:
            # e.g. "Fn::GetAtt" -> "Fn_GetAtt" -> snake case resolver suffix.
            snake_name = camel_to_snake_case(intrinsic_function.replace("::", "_"))
            resolver_name = f"_resolve_intrinsic_function_{snake_name}"
            if hasattr(self, resolver_name):
                change_type = getattr(self, resolver_name)(arguments)
            else:
                change_type = arguments.change_type

        function_node = NodeIntrinsicFunction(
            scope=scope,
            change_type=change_type,
            intrinsic_function=intrinsic_function,
            arguments=arguments,
        )
        self._visited_scopes[scope] = function_node
        return function_node
1✔
480

481
    def _resolve_intrinsic_function_fn_get_att(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of an Fn::GetAtt call from the referenced property.

        ``arguments`` must be a NodeArray whose first two items are terminal
        values: the logical resource name (a string) and the attribute name.
        The change type of the resource property with that name is returned,
        or UNCHANGED when the resource has no such property.

        :raises RuntimeError: if the arguments do not have the shape above.
        """
        # TODO: add support for nested intrinsic functions.
        # TODO: validate arguments structure and type.
        # TODO: should this check for deletion of resources and/or properties, if so what error should be raised?

        # Both positions are read below, so require two items up front rather
        # than failing later with an IndexError.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 2:
            raise RuntimeError(
                "Fn::GetAtt expects an array of [logical resource name, attribute name]"
            )
        logical_name_of_resource_entity = arguments.array[0]
        if not isinstance(logical_name_of_resource_entity, TerminalValue):
            raise RuntimeError("Fn::GetAtt logical resource name must be a terminal value")
        logical_name_of_resource: str = logical_name_of_resource_entity.value
        if not isinstance(logical_name_of_resource, str):
            raise RuntimeError("Fn::GetAtt logical resource name must be a string")
        node_property_attribute_name = arguments.array[1]
        if not isinstance(node_property_attribute_name, TerminalValue):
            raise RuntimeError("Fn::GetAtt attribute name must be a terminal value")

        node_resource: NodeResource = self._retrieve_or_visit_resource(
            resource_name=logical_name_of_resource
        )

        # For a modified attribute name, look up the new name.
        if isinstance(node_property_attribute_name, TerminalValueModified):
            attribute_name = node_property_attribute_name.modified_value
        else:
            attribute_name = node_property_attribute_name.value

        # TODO: this is another use case for which properties should be referenced by name
        for node_property in node_resource.properties.properties:
            if node_property.name == attribute_name:
                return node_property.change_type

        return ChangeType.UNCHANGED
×
512

513
    def _resolve_intrinsic_function_ref(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of a Ref call.

        A changed or non-terminal argument decides the result on its own.
        Otherwise the argument value is looked up, in order, as a condition, a
        parameter (using its dynamic value) and finally a resource.
        """
        # TODO: add support for nested functions, here we assume the argument is a logicalID.
        argument_changed = arguments.change_type != ChangeType.UNCHANGED
        if argument_changed or not isinstance(arguments, TerminalValue):
            return arguments.change_type

        logical_id = arguments.value

        condition = self._retrieve_condition_if_exists(condition_name=logical_id)
        if isinstance(condition, NodeCondition):
            return condition.change_type

        parameter = self._retrieve_parameter_if_exists(parameter_name=logical_id)
        if isinstance(parameter, NodeParameter):
            return parameter.dynamic_value.change_type

        # TODO: this should check the replacement flag for a resource update.
        resource = self._retrieve_or_visit_resource(resource_name=logical_id)
        return resource.change_type
1✔
533

534
    def _resolve_intrinsic_function_fn_find_in_map(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of an Fn::FindInMap call from the mapped value.

        Changed arguments decide the result on their own. Otherwise ``arguments``
        must be a NodeArray whose first three items are terminal values: mapping
        name, top-level key and second-level key. The change type of the entity
        stored under those keys is returned.

        :raises NotImplementedError: if one of the three arguments is not a terminal value.
        :raises RuntimeError: if the argument array is malformed or a key is not found.
        """
        if arguments.change_type != ChangeType.UNCHANGED:
            return arguments.change_type
        # TODO: validate arguments structure and type.
        # TODO: add support for nested functions, here we assume the arguments are string literals.

        # Three positions are read below, so require them up front rather than
        # failing later with an IndexError.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 3:
            raise RuntimeError(
                "Fn::FindInMap expects an array of [mapping name, top-level key, second-level key]"
            )
        argument_mapping_name, argument_top_level_key, argument_second_level_key = (
            arguments.array[:3]
        )
        if not isinstance(argument_mapping_name, TerminalValue):
            raise NotImplementedError()
        if not isinstance(argument_top_level_key, TerminalValue):
            raise NotImplementedError()
        if not isinstance(argument_second_level_key, TerminalValue):
            raise NotImplementedError()
        mapping_name = argument_mapping_name.value
        top_level_key = argument_top_level_key.value
        second_level_key = argument_second_level_key.value

        node_mapping = self._retrieve_mapping(mapping_name=mapping_name)
        # TODO: a lookup would be beneficial in this scenario too;
        #  consider implications downstream and for replication.
        top_level_object = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_object, NodeObject):
            raise RuntimeError(
                f"Top-level key '{top_level_key}' of mapping '{mapping_name}' is not an object"
            )
        target_map_value = top_level_object.bindings.get(second_level_key)
        if target_map_value is None:
            # A missing key would otherwise surface as an AttributeError on None.
            raise RuntimeError(
                f"Second-level key '{second_level_key}' not found under "
                f"'{top_level_key}' of mapping '{mapping_name}'"
            )
        return target_map_value.change_type
1✔
563

564
    def _resolve_intrinsic_function_fn_if(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of an Fn::If call.

        ``arguments`` must be a NodeArray whose first item is a terminal value
        naming an existing condition. The result combines the change type of
        that condition with the change types of the remaining arguments.

        :raises RuntimeError: if the arguments are malformed or the condition is unknown.
        """
        # TODO: validate arguments structure and type.
        if not isinstance(arguments, NodeArray) or not arguments.array:
            raise RuntimeError("Fn::If expects a non-empty array of arguments")
        logical_name_of_condition_entity = arguments.array[0]
        if not isinstance(logical_name_of_condition_entity, TerminalValue):
            raise RuntimeError("Fn::If condition name must be a terminal value")
        logical_name_of_condition: str = logical_name_of_condition_entity.value
        if not isinstance(logical_name_of_condition, str):
            raise RuntimeError("Fn::If condition name must be a string")

        node_condition = self._retrieve_condition_if_exists(
            condition_name=logical_name_of_condition
        )
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError(f"Fn::If references unknown condition '{logical_name_of_condition}'")

        # Collect change types only: the branch arguments are entities, so use
        # their change_type rather than the entities themselves.
        # NOTE(review): _change_type_for_parent_of is defined outside this view;
        # its keyword name suggests it expects ChangeType values - confirm.
        change_types = [
            node_condition.change_type,
            *(branch.change_type for branch in arguments.array[1:]),
        ]
        return self._change_type_for_parent_of(change_types=change_types)
1✔
583

584
    def _visit_array(
        self, scope: Scope, before_array: Maybe[list], after_array: Maybe[list]
    ) -> NodeArray:
        """Build a NodeArray by visiting the before/after items position by position.

        The shorter side is padded with ``Nothing``. The array is MODIFIED as
        soon as one item is not UNCHANGED, and UNCHANGED otherwise.
        """
        entries: list[ChangeSetEntity] = []
        paired_items = zip_longest(before_array, after_array, fillvalue=Nothing)
        for index, (before_value, after_value) in enumerate(paired_items):
            # TODO: should extract this scoping logic.
            entry = self._visit_value(
                scope=scope.open_index(index=index),
                before_value=before_value,
                after_value=after_value,
            )
            entries.append(entry)

        any_changed = any(entry.change_type != ChangeType.UNCHANGED for entry in entries)
        array_change_type = ChangeType.MODIFIED if any_changed else ChangeType.UNCHANGED
        return NodeArray(scope=scope, change_type=array_change_type, array=entries)
1✔
601

602
    def _visit_object(
        self, scope: Scope, before_object: Maybe[dict], after_object: Maybe[dict]
    ) -> NodeObject:
        """Return the NodeObject for ``scope``, building and caching it when absent.

        Every key returned by ``_safe_keys_of`` is visited as a value; the
        object's change type folds the children's change types with
        ``ChangeType.for_child``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeObject):
            return cached

        children: dict[str, ChangeSetEntity] = {}
        object_change_type = ChangeType.UNCHANGED
        for name in self._safe_keys_of(before_object, after_object):
            child_scope, (before_value, after_value) = self._safe_access_in(
                scope, name, before_object, after_object
            )
            child = self._visit_value(
                scope=child_scope, before_value=before_value, after_value=after_value
            )
            children[name] = child
            object_change_type = object_change_type.for_child(child.change_type)

        result = NodeObject(scope=scope, change_type=object_change_type, bindings=children)
        self._visited_scopes[scope] = result
        return result
1✔
624

625
    def _visit_divergence(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> NodeDivergence:
        """Build a NodeDivergence from separately visited before and after values.

        The before value is visited under ``<scope>/value`` paired with
        ``Nothing``; the after value is visited under ``<scope>/divergence``
        paired with ``Nothing``.
        """
        before_entity = self._visit_value(
            scope=scope.open_scope("value"), before_value=before_value, after_value=Nothing
        )
        after_entity = self._visit_value(
            scope=scope.open_scope("divergence"), before_value=Nothing, after_value=after_value
        )
        return NodeDivergence(scope=scope, value=before_entity, divergence=after_entity)
1✔
635

636
    def _visit_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> ChangeSetEntity:
        """Return the entity for ``scope``, dispatching on the kind of value found.

        A cached entity is returned as-is. When both sides have the same type
        name, or the value was created or removed, the dominant value selects
        the visitor (terminal, object, array or intrinsic function). Any other
        combination is a type divergence. The result is cached under ``scope``.

        :raises RuntimeError: if the dominant value matches no supported kind.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, ChangeSetEntity):
            return cached

        before_type_name = self._type_name_of(before_value)
        after_type_name = self._type_name_of(after_value)
        if before_type_name == after_type_name:
            dominant_value = before_value
        elif self._is_created(before=before_value, after=after_value):
            dominant_value = after_value
        elif self._is_removed(before=before_value, after=after_value):
            dominant_value = before_value
        else:
            # Case: type divergence.
            diverged = self._visit_divergence(
                scope=scope, before_value=before_value, after_value=after_value
            )
            self._visited_scopes[scope] = diverged
            return diverged

        dominant_type_name = self._type_name_of(dominant_value)
        if self._is_terminal(value=dominant_value):
            entity = self._visit_terminal_value(
                scope=scope, before_value=before_value, after_value=after_value
            )
        elif self._is_object(value=dominant_value):
            entity = self._visit_object(
                scope=scope, before_object=before_value, after_object=after_value
            )
        elif self._is_array(value=dominant_value):
            entity = self._visit_array(
                scope=scope, before_array=before_value, after_array=after_value
            )
        elif self._is_intrinsic_function_name(dominant_type_name):
            # Only the argument pair is used; the function node keeps ``scope``.
            _, (before_arguments, after_arguments) = self._safe_access_in(
                scope, dominant_type_name, before_value, after_value
            )
            entity = self._visit_intrinsic_function(
                scope=scope,
                intrinsic_function=dominant_type_name,
                before_arguments=before_arguments,
                after_arguments=after_arguments,
            )
        else:
            raise RuntimeError(f"Unsupported type {type(dominant_value)}")

        self._visited_scopes[scope] = entity
        return entity
1✔
687

688
    def _visit_property(
        self,
        scope: Scope,
        property_name: str,
        before_property: Maybe[Any],
        after_property: Maybe[Any],
    ) -> NodeProperty:
        """Return the NodeProperty for ``scope``, building and caching it when absent.

        The property is CREATED or REMOVED when the before/after pair says so;
        otherwise it takes the change type of its visited value.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperty):
            return cached

        value = self._visit_value(
            scope=scope, before_value=before_property, after_value=after_property
        )

        if self._is_created(before=before_property, after=after_property):
            property_change_type = ChangeType.CREATED
        elif self._is_removed(before=before_property, after=after_property):
            property_change_type = ChangeType.REMOVED
        else:
            property_change_type = value.change_type

        result = NodeProperty(
            scope=scope,
            change_type=property_change_type,
            name=property_name,
            value=value,
        )
        self._visited_scopes[scope] = result
        return result
1✔
722

723
    def _visit_properties(
        self, scope: Scope, before_properties: Maybe[dict], after_properties: Maybe[dict]
    ) -> NodeProperties:
        """Return the NodeProperties for ``scope``, building and caching it when absent.

        Every key returned by ``_safe_keys_of`` is visited as a property; the
        overall change type folds the properties' change types with
        ``ChangeType.for_child``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperties):
            return cached

        # TODO: double check we are sure not to have this be a NodeObject
        names: list[str] = self._safe_keys_of(before_properties, after_properties)
        visited: list[NodeProperty] = []
        overall_change_type = ChangeType.UNCHANGED
        for name in names:
            name_scope, (before_property, after_property) = self._safe_access_in(
                scope, name, before_properties, after_properties
            )
            node_property = self._visit_property(
                scope=name_scope,
                property_name=name,
                before_property=before_property,
                after_property=after_property,
            )
            visited.append(node_property)
            overall_change_type = overall_change_type.for_child(node_property.change_type)

        result = NodeProperties(scope=scope, change_type=overall_change_type, properties=visited)
        self._visited_scopes[scope] = result
        return result
1✔
750

751
    def _visit_resource(
        self,
        scope: Scope,
        resource_name: str,
        before_resource: Maybe[dict],
        after_resource: Maybe[dict],
    ) -> NodeResource:
        """Build the NodeResource entity for a single resource definition.

        A NodeResource already stored in ``self._visited_scopes`` for ``scope`` is
        returned as is. Otherwise the resource's type, optional condition reference
        and properties are visited, and the resulting node is cached under ``scope``.

        :param scope: scope of the resource being visited.
        :param resource_name: logical name of the resource.
        :param before_resource: resource definition in the previous template, or Nothing.
        :param after_resource: resource definition in the new template, or Nothing.
        :return: the cached or newly created NodeResource for ``scope``.
        """
        node_resource = self._visited_scopes.get(scope)
        if isinstance(node_resource, NodeResource):
            return node_resource

        if self._is_created(before=before_resource, after=after_resource):
            change_type = ChangeType.CREATED
        elif self._is_removed(before=before_resource, after=after_resource):
            change_type = ChangeType.REMOVED
        else:
            change_type = ChangeType.UNCHANGED

        # TODO: investigate behaviour with type changes, for now this is filler code.
        # The new template's type wins; a resource that no longer exists in the new
        # template (removed) keeps the type it had before instead of Nothing.
        _, (before_type, after_type) = self._safe_access_in(
            scope, TypeKey, before_resource, after_resource
        )
        type_str = before_type if isinstance(after_type, NothingType) else after_type

        condition_reference = None
        scope_condition, (before_condition, after_condition) = self._safe_access_in(
            scope, ConditionKey, before_resource, after_resource
        )
        # TODO: condition references should be resolved for the condition's change_type?
        if before_condition or after_condition:
            condition_reference = self._visit_terminal_value(
                scope_condition, before_condition, after_condition
            )

        scope_properties, (before_properties, after_properties) = self._safe_access_in(
            scope, PropertiesKey, before_resource, after_resource
        )
        properties = self._visit_properties(
            scope=scope_properties,
            before_properties=before_properties,
            after_properties=after_properties,
        )
        # Fold the change type of the properties into the resource's own change type.
        change_type = change_type.for_child(properties.change_type)
        node_resource = NodeResource(
            scope=scope,
            change_type=change_type,
            name=resource_name,
            type_=TerminalValueUnchanged(scope=scope, value=type_str),
            condition_reference=condition_reference,
            properties=properties,
        )
        self._visited_scopes[scope] = node_resource
        return node_resource
801

802
    def _visit_resources(
        self, scope: Scope, before_resources: Maybe[dict], after_resources: Maybe[dict]
    ) -> NodeResources:
        """Visit every resource of a Resources section and wrap them in a NodeResources.

        Resource names are taken from both versions (sorted by ``_safe_keys_of``); the
        change types of the visited resources are folded into the section's change type.

        :param scope: scope of the resources section.
        :param before_resources: resources of the previous template, or Nothing.
        :param after_resources: resources of the new template, or Nothing.
        :return: a new NodeResources holding one NodeResource per resource name.
        """
        # TODO: investigate type changes behavior.
        aggregate_change = ChangeType.UNCHANGED
        visited: list[NodeResource] = []
        names = self._safe_keys_of(before_resources, after_resources)
        for name in names:
            child_scope, (resource_before, resource_after) = self._safe_access_in(
                scope, name, before_resources, after_resources
            )
            node_resource = self._visit_resource(
                scope=child_scope,
                resource_name=name,
                before_resource=resource_before,
                after_resource=resource_after,
            )
            visited.append(node_resource)
            aggregate_change = aggregate_change.for_child(node_resource.change_type)
        return NodeResources(scope=scope, change_type=aggregate_change, resources=visited)
822

823
    def _visit_mapping(
        self, scope: Scope, name: str, before_mapping: Maybe[dict], after_mapping: Maybe[dict]
    ) -> NodeMapping:
        """Build the NodeMapping entity for a single mapping.

        The mapping body is visited as an object; the mapping inherits the change
        type of that object.

        :param scope: scope under which the mapping body is visited.
        :param name: name of the mapping.
        :param before_mapping: mapping body in the previous template, or Nothing.
        :param after_mapping: mapping body in the new template, or Nothing.
        :return: a new NodeMapping wrapping the visited bindings.
        """
        mapping_bindings = self._visit_object(
            scope=scope, before_object=before_mapping, after_object=after_mapping
        )
        mapping_change = mapping_bindings.change_type
        return NodeMapping(
            scope=scope, change_type=mapping_change, name=name, bindings=mapping_bindings
        )
832

833
    def _visit_mappings(
        self, scope: Scope, before_mappings: Maybe[dict], after_mappings: Maybe[dict]
    ) -> NodeMappings:
        """Visit every mapping of a Mappings section and wrap them in a NodeMappings.

        Mapping names are taken from both versions (sorted by ``_safe_keys_of``); the
        change types of the visited mappings are folded into the section's change type.

        :param scope: scope of the mappings section.
        :param before_mappings: mappings of the previous template, or Nothing.
        :param after_mappings: mappings of the new template, or Nothing.
        :return: a new NodeMappings holding one NodeMapping per mapping name.
        """
        change_type = ChangeType.UNCHANGED
        mappings: list[NodeMapping] = list()
        mapping_names = self._safe_keys_of(before_mappings, after_mappings)
        for mapping_name in mapping_names:
            scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
                scope, mapping_name, before_mappings, after_mappings
            )
            # Each mapping must be visited under its own child scope (as done in
            # _retrieve_mapping); reusing the section scope would give every mapping
            # the same scope.
            mapping = self._visit_mapping(
                scope=scope_mapping,
                name=mapping_name,
                before_mapping=before_mapping,
                after_mapping=after_mapping,
            )
            mappings.append(mapping)
            change_type = change_type.for_child(mapping.change_type)
        return NodeMappings(scope=scope, change_type=change_type, mappings=mappings)
852

853
    def _visit_dynamic_parameter(self, parameter_name: str) -> ChangeSetEntity:
        """Visit the value of ``parameter_name`` held in the before/after parameter sets.

        The values are read from ``self._before_parameters`` and
        ``self._after_parameters`` and visited under the ``Dynamic/Parameters`` scope.

        :param parameter_name: name of the parameter to look up.
        :return: the entity produced by visiting the before/after values.
        """
        dynamic_scope = Scope("Dynamic").open_scope("Parameters")
        value_scope, (value_before, value_after) = self._safe_access_in(
            dynamic_scope, parameter_name, self._before_parameters, self._after_parameters
        )
        return self._visit_value(
            scope=value_scope, before_value=value_before, after_value=value_after
        )
862

863
    def _visit_parameter(
        self,
        scope: Scope,
        parameter_name: str,
        before_parameter: Maybe[dict],
        after_parameter: Maybe[dict],
    ) -> NodeParameter:
        """Build the NodeParameter entity for a single template parameter.

        A NodeParameter already stored in ``self._visited_scopes`` for ``scope`` is
        returned as is. A created or removed declaration is wrapped in the matching
        terminal value; otherwise the declaration is visited and the parameter's
        change type is derived from both the dynamic value and the declaration.

        :param scope: scope of the parameter being visited.
        :param parameter_name: name of the parameter.
        :param before_parameter: declaration in the previous template, or Nothing.
        :param after_parameter: declaration in the new template, or Nothing.
        :return: the cached or newly created NodeParameter for ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameter):
            return cached

        # TODO: add logic to compute defaults already in the graph building process?
        dynamic_value = self._visit_dynamic_parameter(parameter_name=parameter_name)

        if self._is_created(before=before_parameter, after=after_parameter):
            change_type = ChangeType.CREATED
            value = TerminalValueCreated(scope=scope, value=after_parameter)
        elif self._is_removed(before=before_parameter, after=after_parameter):
            change_type = ChangeType.REMOVED
            value = TerminalValueRemoved(scope=scope, value=before_parameter)
        else:
            value = self._visit_value(
                scope=scope, before_value=before_parameter, after_value=after_parameter
            )
            change_type = self._change_type_for_parent_of(
                change_types=[dynamic_value.change_type, value.change_type]
            )

        result = NodeParameter(
            scope=scope,
            change_type=change_type,
            name=parameter_name,
            value=value,
            dynamic_value=dynamic_value,
        )
        self._visited_scopes[scope] = result
        return result
907

908
    def _visit_parameters(
        self, scope: Scope, before_parameters: Maybe[dict], after_parameters: Maybe[dict]
    ) -> NodeParameters:
        """Build the NodeParameters entity for a Parameters section.

        A NodeParameters already stored in ``self._visited_scopes`` for ``scope`` is
        returned as is. Otherwise every parameter name found in either version is
        visited in sorted order and the children's change types are folded together.

        :param scope: scope of the parameters section.
        :param before_parameters: parameters of the previous template, or Nothing.
        :param after_parameters: parameters of the new template, or Nothing.
        :return: the cached or newly created NodeParameters for ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameters):
            return cached

        names: list[str] = self._safe_keys_of(before_parameters, after_parameters)
        visited: list[NodeParameter] = []
        aggregate_change = ChangeType.UNCHANGED
        for name in names:
            child_scope, (declaration_before, declaration_after) = self._safe_access_in(
                scope, name, before_parameters, after_parameters
            )
            node_parameter = self._visit_parameter(
                scope=child_scope,
                parameter_name=name,
                before_parameter=declaration_before,
                after_parameter=declaration_after,
            )
            visited.append(node_parameter)
            aggregate_change = aggregate_change.for_child(node_parameter.change_type)

        result = NodeParameters(scope=scope, change_type=aggregate_change, parameters=visited)
        self._visited_scopes[scope] = result
        return result
934

935
    def _visit_condition(
        self,
        scope: Scope,
        condition_name: str,
        before_condition: Maybe[dict],
        after_condition: Maybe[dict],
    ) -> NodeCondition:
        """Build the NodeCondition entity for a single condition.

        A NodeCondition already stored in ``self._visited_scopes`` for ``scope`` is
        returned as is. Otherwise the condition body is visited and the condition
        inherits the body's change type; the new node is cached under ``scope``.

        :param scope: scope of the condition being visited.
        :param condition_name: name of the condition.
        :param before_condition: condition body in the previous template, or Nothing.
        :param after_condition: condition body in the new template, or Nothing.
        :return: the cached or newly created NodeCondition for ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeCondition):
            return cached

        condition_body = self._visit_value(
            scope=scope, before_value=before_condition, after_value=after_condition
        )
        result = NodeCondition(
            scope=scope,
            change_type=condition_body.change_type,
            name=condition_name,
            body=condition_body,
        )
        self._visited_scopes[scope] = result
        return result
953

954
    def _visit_conditions(
        self, scope: Scope, before_conditions: Maybe[dict], after_conditions: Maybe[dict]
    ) -> NodeConditions:
        """Build the NodeConditions entity for a Conditions section.

        A NodeConditions already stored in ``self._visited_scopes`` for ``scope`` is
        returned as is. Otherwise every condition name found in either version is
        visited in sorted order and the children's change types are folded together.

        :param scope: scope of the conditions section.
        :param before_conditions: conditions of the previous template, or Nothing.
        :param after_conditions: conditions of the new template, or Nothing.
        :return: the cached or newly created NodeConditions for ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeConditions):
            return cached

        names: list[str] = self._safe_keys_of(before_conditions, after_conditions)
        visited: list[NodeCondition] = []
        aggregate_change = ChangeType.UNCHANGED
        for name in names:
            child_scope, (body_before, body_after) = self._safe_access_in(
                scope, name, before_conditions, after_conditions
            )
            node_condition = self._visit_condition(
                scope=child_scope,
                condition_name=name,
                before_condition=body_before,
                after_condition=body_after,
            )
            visited.append(node_condition)
            aggregate_change = aggregate_change.for_child(
                child_change_type=node_condition.change_type
            )

        result = NodeConditions(scope=scope, change_type=aggregate_change, conditions=visited)
        self._visited_scopes[scope] = result
        return result
980

981
    def _visit_output(
        self, scope: Scope, name: str, before_output: Maybe[dict], after_output: Maybe[dict]
    ) -> NodeOutput:
        """Build the NodeOutput entity for a single output.

        The output's value is always visited; its export and condition reference
        are visited only when at least one of the two versions is truthy. The change
        types of all visited children are folded into the output's change type.

        :param scope: scope of the output being visited.
        :param name: name of the output.
        :param before_output: output definition in the previous template, or Nothing.
        :param after_output: output definition in the new template, or Nothing.
        :return: a new NodeOutput for the given definitions.
        """
        value_scope, (value_before, value_after) = self._safe_access_in(
            scope, ValueKey, before_output, after_output
        )
        node_value = self._visit_value(value_scope, value_before, value_after)
        aggregate_change = ChangeType.UNCHANGED.for_child(node_value.change_type)

        node_export: Optional[ChangeSetEntity] = None
        export_scope, (export_before, export_after) = self._safe_access_in(
            scope, ExportKey, before_output, after_output
        )
        if export_before or export_after:
            node_export = self._visit_value(export_scope, export_before, export_after)
            aggregate_change = aggregate_change.for_child(node_export.change_type)

        # TODO: condition references should be resolved for the condition's change_type?
        node_condition_reference: Optional[TerminalValue] = None
        condition_scope, (condition_before, condition_after) = self._safe_access_in(
            scope, ConditionKey, before_output, after_output
        )
        if condition_before or condition_after:
            node_condition_reference = self._visit_terminal_value(
                condition_scope, condition_before, condition_after
            )
            aggregate_change = aggregate_change.for_child(node_condition_reference.change_type)

        return NodeOutput(
            scope=scope,
            change_type=aggregate_change,
            name=name,
            value=node_value,
            export=node_export,
            conditional_reference=node_condition_reference,
        )
1018

1019
    def _visit_outputs(
        self, scope: Scope, before_outputs: Maybe[dict], after_outputs: Maybe[dict]
    ) -> NodeOutputs:
        """Visit every output of an Outputs section and wrap them in a NodeOutputs.

        Output names are taken from both versions (sorted by ``_safe_keys_of``); the
        change types of the visited outputs are folded into the section's change type.

        :param scope: scope of the outputs section.
        :param before_outputs: outputs of the previous template, or Nothing.
        :param after_outputs: outputs of the new template, or Nothing.
        :return: a new NodeOutputs holding one NodeOutput per output name.
        """
        aggregate_change = ChangeType.UNCHANGED
        visited: list[NodeOutput] = []
        names: list[str] = self._safe_keys_of(before_outputs, after_outputs)
        for name in names:
            child_scope, (output_before, output_after) = self._safe_access_in(
                scope, name, before_outputs, after_outputs
            )
            node_output = self._visit_output(
                scope=child_scope,
                name=name,
                before_output=output_before,
                after_output=output_after,
            )
            visited.append(node_output)
            aggregate_change = aggregate_change.for_child(node_output.change_type)
        return NodeOutputs(scope=scope, change_type=aggregate_change, outputs=visited)
1038

1039
    def _model(self, before_template: Maybe[dict], after_template: Maybe[dict]) -> NodeTemplate:
        """Build the NodeTemplate describing the changes between two templates.

        The Mappings, Parameters, Conditions, Resources and Outputs sections are
        visited in that order, starting from the root scope. The template's change
        type is currently the change type of its resources section only.

        :param before_template: the previous template, or Nothing.
        :param after_template: the new template, or Nothing.
        :return: the root NodeTemplate of the change set model.
        """
        template_scope = Scope()
        # TODO: visit other child types

        scope_mappings, (mappings_before, mappings_after) = self._safe_access_in(
            template_scope, MappingsKey, before_template, after_template
        )
        node_mappings = self._visit_mappings(
            scope=scope_mappings, before_mappings=mappings_before, after_mappings=mappings_after
        )

        scope_parameters, (parameters_before, parameters_after) = self._safe_access_in(
            template_scope, ParametersKey, before_template, after_template
        )
        node_parameters = self._visit_parameters(
            scope=scope_parameters,
            before_parameters=parameters_before,
            after_parameters=parameters_after,
        )

        scope_conditions, (conditions_before, conditions_after) = self._safe_access_in(
            template_scope, ConditionsKey, before_template, after_template
        )
        node_conditions = self._visit_conditions(
            scope=scope_conditions,
            before_conditions=conditions_before,
            after_conditions=conditions_after,
        )

        scope_resources, (resources_before, resources_after) = self._safe_access_in(
            template_scope, ResourcesKey, before_template, after_template
        )
        node_resources = self._visit_resources(
            scope=scope_resources,
            before_resources=resources_before,
            after_resources=resources_after,
        )

        scope_outputs, (outputs_before, outputs_after) = self._safe_access_in(
            template_scope, OutputsKey, before_template, after_template
        )
        node_outputs = self._visit_outputs(
            scope=scope_outputs, before_outputs=outputs_before, after_outputs=outputs_after
        )

        # TODO: compute the change_type of the template properly.
        template_change = node_resources.change_type
        return NodeTemplate(
            scope=template_scope,
            change_type=template_change,
            mappings=node_mappings,
            parameters=node_parameters,
            conditions=node_conditions,
            resources=node_resources,
            outputs=node_outputs,
        )
1094

1095
    def _retrieve_condition_if_exists(self, condition_name: str) -> Optional[NodeCondition]:
        """Return the NodeCondition named ``condition_name``, or None if it is not declared.

        The condition is looked up in the Conditions section of
        ``self._before_template`` and ``self._after_template``; it is visited when it
        appears in at least one of them.

        :param condition_name: name of the condition to retrieve.
        :return: the visited NodeCondition, or None when neither template declares it.
        """
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
            Scope(), ConditionsKey, self._before_template, self._after_template
        )
        # A missing section (Nothing) is falsy; treat it as an empty mapping.
        before_conditions = before_conditions or dict()
        after_conditions = after_conditions or dict()
        if condition_name in before_conditions or condition_name in after_conditions:
            condition_scope, (before_condition, after_condition) = self._safe_access_in(
                conditions_scope, condition_name, before_conditions, after_conditions
            )
            # Visit under the condition's own scope: _visit_condition caches its result
            # by scope, so using the shared section scope would overwrite the section's
            # cache entry and make distinct conditions resolve to the same node.
            node_condition = self._visit_condition(
                condition_scope,
                condition_name,
                before_condition=before_condition,
                after_condition=after_condition,
            )
            return node_condition
        return None
1113

1114
    def _retrieve_parameter_if_exists(self, parameter_name: str) -> Optional[NodeParameter]:
        """Return the NodeParameter named ``parameter_name``, or None if it is not declared.

        The parameter is looked up in the Parameters section of
        ``self._before_template`` and ``self._after_template``; it is visited when it
        appears in at least one of them.

        :param parameter_name: name of the parameter to retrieve.
        :return: the visited NodeParameter, or None when neither template declares it.
        """
        parameters_scope, (before_parameters, after_parameters) = self._safe_access_in(
            Scope(), ParametersKey, self._before_template, self._after_template
        )
        # A missing section (Nothing) is falsy; treat it as an empty mapping.
        before_parameters = before_parameters or dict()
        after_parameters = after_parameters or dict()
        if parameter_name in before_parameters or parameter_name in after_parameters:
            parameter_scope, (before_parameter, after_parameter) = self._safe_access_in(
                parameters_scope, parameter_name, before_parameters, after_parameters
            )
            # Visit under the parameter's own scope: _visit_parameter caches its result
            # by scope, so using the shared section scope would overwrite the section's
            # cache entry and make distinct parameters resolve to the same node.
            node_parameter = self._visit_parameter(
                parameter_scope,
                parameter_name,
                before_parameter=before_parameter,
                after_parameter=after_parameter,
            )
            return node_parameter
        return None
1132

1133
    def _retrieve_mapping(self, mapping_name: str) -> NodeMapping:
        """Return the NodeMapping named ``mapping_name``.

        The mapping is looked up in the Mappings section of ``self._before_template``
        and ``self._after_template`` and visited under its own scope.

        :param mapping_name: name of the mapping to retrieve.
        :return: the visited NodeMapping.
        :raises RuntimeError: if neither template declares the mapping.
        """
        # TODO: add caching mechanism, and raise appropriate error if missing.
        scope_mappings, (before_mappings, after_mappings) = self._safe_access_in(
            Scope(), MappingsKey, self._before_template, self._after_template
        )
        # A missing section (Nothing) is falsy; treat it as an empty mapping.
        before_mappings = before_mappings or dict()
        after_mappings = after_mappings or dict()
        if mapping_name in before_mappings or mapping_name in after_mappings:
            scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
                scope_mappings, mapping_name, before_mappings, after_mappings
            )
            node_mapping = self._visit_mapping(
                scope_mapping, mapping_name, before_mapping, after_mapping
            )
            return node_mapping
        # Same exception type as before, now naming the offending mapping.
        raise RuntimeError(f"Undefined mapping '{mapping_name}'")
1149

1150
    def _retrieve_or_visit_resource(self, resource_name: str) -> NodeResource:
        """Return the NodeResource named ``resource_name``, visiting it if necessary.

        The resource is looked up in the Resources section of
        ``self._before_template`` and ``self._after_template`` and handed to
        ``_visit_resource``, which reuses a previously cached node for the scope.

        :param resource_name: logical name of the resource.
        :return: the NodeResource for the resource's scope.
        """
        section_scope, (section_before, section_after) = self._safe_access_in(
            Scope(), ResourcesKey, self._before_template, self._after_template
        )
        target_scope, (definition_before, definition_after) = self._safe_access_in(
            section_scope, resource_name, section_before, section_after
        )
        node_resource = self._visit_resource(
            scope=target_scope,
            resource_name=resource_name,
            before_resource=definition_before,
            after_resource=definition_after,
        )
        return node_resource
1166

1167
    @staticmethod
    def _is_intrinsic_function_name(function_name: str) -> bool:
        """Tell whether ``function_name`` is listed in INTRINSIC_FUNCTIONS."""
        # TODO: are intrinsic functions soft keywords?
        is_listed = function_name in INTRINSIC_FUNCTIONS
        return is_listed
1171

1172
    @staticmethod
    def _safe_access_in(scope: Scope, key: str, *objects: Maybe[dict]) -> tuple[Scope, Maybe[Any]]:
        """Look ``key`` up in each of ``objects`` and open the matching child scope.

        An object that is Nothing yields Nothing; any other object is queried with
        ``.get(key, Nothing)``.

        :param scope: parent scope; the returned scope is ``scope.open_scope(name=key)``.
        :param key: key to read from every object.
        :param objects: the objects to read from, each either a dict or Nothing.
        :return: the child scope and the looked-up value when exactly one object is
            given, otherwise the child scope and a tuple with one value per object.
        """
        # TODO: raise errors if not dict
        values = [
            candidate if isinstance(candidate, NothingType) else candidate.get(key, Nothing)
            for candidate in objects
        ]
        child_scope = scope.open_scope(name=key)
        if len(objects) == 1:
            return child_scope, values[0]
        return child_scope, tuple(values)
1183

1184
    @staticmethod
1✔
1185
    def _safe_keys_of(*objects: Maybe[dict]) -> list[str]:
1✔
1186
        key_set: set[str] = set()
1✔
1187
        for obj in objects:
1✔
1188
            # TODO: raise errors if not dict
1189
            if isinstance(obj, dict):
1✔
1190
                key_set.update(obj.keys())
1✔
1191
        # The keys list is sorted to increase reproducibility of the
1192
        # update graph build process or downstream logics.
1193
        keys = sorted(key_set)
1✔
1194
        return keys
1✔
1195

1196
    @staticmethod
    def _change_type_for_parent_of(change_types: list[ChangeType]) -> ChangeType:
        """Fold the change types of a node's children into the parent's change type.

        Starts from UNCHANGED and applies ``for_child`` for each child in order,
        stopping as soon as the result is MODIFIED.

        :param change_types: change types of the children, in visiting order.
        :return: the resulting change type for the parent.
        """
        aggregate = ChangeType.UNCHANGED
        for child in change_types:
            aggregate = aggregate.for_child(child)
            if aggregate == ChangeType.MODIFIED:
                # Remaining children are not inspected once the parent is MODIFIED.
                return aggregate
        return aggregate
1204

1205
    @staticmethod
1✔
1206
    def _name_if_intrinsic_function(value: Maybe[Any]) -> Optional[str]:
1✔
1207
        if isinstance(value, dict):
1✔
1208
            keys = ChangeSetModel._safe_keys_of(value)
1✔
1209
            if len(keys) == 1:
1✔
1210
                key_name = keys[0]
1✔
1211
                if ChangeSetModel._is_intrinsic_function_name(key_name):
1✔
1212
                    return key_name
1✔
1213
        return None
1✔
1214

1215
    @staticmethod
1✔
1216
    def _type_name_of(value: Maybe[Any]) -> str:
1✔
1217
        maybe_intrinsic_function_name = ChangeSetModel._name_if_intrinsic_function(value)
1✔
1218
        if maybe_intrinsic_function_name is not None:
1✔
1219
            return maybe_intrinsic_function_name
1✔
1220
        return type(value).__name__
1✔
1221

1222
    @staticmethod
    def _is_terminal(value: Any) -> bool:
        """Tell whether ``value`` is a terminal (leaf) value.

        Terminal values are those whose exact type is int, float, bool, str,
        NoneType or NothingType. The check uses the exact type, so instances of
        subclasses of these types are not considered terminal.
        """
        # type(None) is required here: the literal None is a value, not a type, and
        # would never compare equal to type(value), leaving None non-terminal.
        return type(value) in {int, float, bool, str, type(None), NothingType}
1225

1226
    @staticmethod
1✔
1227
    def _is_object(value: Any) -> bool:
1✔
1228
        return isinstance(value, dict) and ChangeSetModel._name_if_intrinsic_function(value) is None
1✔
1229

1230
    @staticmethod
1✔
1231
    def _is_array(value: Any) -> bool:
1✔
1232
        return isinstance(value, list)
1✔
1233

1234
    @staticmethod
    def _is_created(before: Maybe[Any], after: Maybe[Any]) -> bool:
        """Tell whether a value is new: absent (Nothing) before and present after."""
        absent_before = isinstance(before, NothingType)
        absent_after = isinstance(after, NothingType)
        return absent_before and not absent_after
1237

1238
    @staticmethod
    def _is_removed(before: Maybe[Any], after: Maybe[Any]) -> bool:
        """Tell whether a value was dropped: present before and absent (Nothing) after."""
        absent_before = isinstance(before, NothingType)
        absent_after = isinstance(after, NothingType)
        return absent_after and not absent_before
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc