• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 274ae585-9ad2-4b5f-8087-866ef08d3d6e

24 Apr 2025 05:15PM UTC coverage: 85.262% (-1.0%) from 86.266%
274ae585-9ad2-4b5f-8087-866ef08d3d6e

push

circleci

web-flow
CFn v2: support outputs (#12536)

10 of 29 new or added lines in 3 files covered. (34.48%)

1105 existing lines in 26 files now uncovered.

63256 of 74190 relevant lines covered (85.26%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.76
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model.py
1
from __future__ import annotations
1✔
2

3
import abc
1✔
4
import enum
1✔
5
from itertools import zip_longest
1✔
6
from typing import Any, Final, Generator, Optional, Union, cast
1✔
7

8
from typing_extensions import TypeVar
1✔
9

10
from localstack.utils.strings import camel_to_snake_case
1✔
11

12
T = TypeVar("T")
1✔
13

14

15
class NothingType:
    """Singleton sentinel type meaning 'no value at all', as opposed to ``None``.

    Every ``NothingType()`` call returns the same instance. That instance is
    falsy, iterates as an empty sequence, and prints as ``Nothing``.
    """

    __slots__ = ()
    _singleton = None

    def __new__(cls):
        # Create the shared instance lazily on first use, then always reuse it.
        instance = cls._singleton
        if instance is None:
            instance = cls._singleton = super().__new__(cls)
        return instance

    def __repr__(self) -> str:
        return "Nothing"

    def __str__(self):
        return repr(self)

    def __bool__(self):
        # The sentinel is always falsy.
        return False

    def __iter__(self):
        # Iterating the sentinel yields no items.
        return iter(tuple())
×
37

38

39
# ``Maybe[X]`` is either an ``X`` or the ``Nothing`` sentinel.
Maybe = Union[T, NothingType]
# The shared sentinel instance (NothingType always returns this same object).
Nothing = NothingType()
1✔
41

42

43
class Scope(str):
    """A ``/``-separated path string identifying a position in the model tree.

    The default (root) scope is the empty string; child scopes are built by
    appending ``/<name>`` or ``/<index>`` to the parent scope.
    """

    _ROOT_SCOPE: Final[str] = str()
    _SEPARATOR: Final[str] = "/"

    def __new__(cls, scope: str = _ROOT_SCOPE) -> Scope:
        instance = super().__new__(cls, scope)
        return cast(Scope, instance)

    def open_scope(self, name: Scope | str) -> Scope:
        """Return the child scope called ``name`` under this scope."""
        return Scope(self._SEPARATOR.join((self, name)))

    def open_index(self, index: int) -> Scope:
        """Return the child scope for position ``index`` under this scope."""
        return Scope(self._SEPARATOR.join((self, str(index))))

    def unwrap(self) -> list[str]:
        """Split this scope on the separator and return the segments."""
        separator = self._SEPARATOR
        return self.split(separator)
×
58

59

60
class ChangeType(enum.Enum):
    """Kind of change between the 'before' and 'after' versions of an element."""

    UNCHANGED = "Unchanged"
    CREATED = "Created"
    MODIFIED = "Modified"
    REMOVED = "Removed"

    def __str__(self):
        return self.value

    def for_child(self, child_change_type: ChangeType) -> ChangeType:
        """Combine this (parent) change type with the change type of a child.

        Equal types are kept as they are, an ``UNCHANGED`` parent adopts the
        child's type, and every other combination yields ``MODIFIED``.
        """
        if child_change_type == self:
            return self
        if self == ChangeType.UNCHANGED:
            return child_change_type
        return ChangeType.MODIFIED
×
76

77

78
class ChangeSetEntity(abc.ABC):
    """Base class of every element in the change set model tree.

    Each entity stores the ``scope`` and the ``change_type`` it was
    constructed with.
    """

    scope: Final[Scope]
    change_type: Final[ChangeType]

    def __init__(self, scope: Scope, change_type: ChangeType):
        self.scope = scope
        self.change_type = change_type

    def get_children(self) -> Generator[ChangeSetEntity]:
        """Yield the entities held by this entity's instance attributes.

        Attribute values are visited in assignment order; entities nested in
        lists or dicts are found recursively by ``_get_children_in``.
        """
        for child in self.__dict__.values():
            yield from self._get_children_in(child)

    @staticmethod
    def _get_children_in(obj: Any) -> Generator[ChangeSetEntity]:
        """Yield ``obj`` if it is an entity, else the entities nested in a list/dict.

        Any other kind of object yields nothing.
        """
        # TODO: could avoid the inductive logic here, and check for loops?
        if isinstance(obj, ChangeSetEntity):
            yield obj
        elif isinstance(obj, list):
            for item in obj:
                yield from ChangeSetEntity._get_children_in(item)
        elif isinstance(obj, dict):
            for item in obj.values():
                yield from ChangeSetEntity._get_children_in(item)

    def __str__(self):
        # The trailing ")" balances the opening "(" of the rendering.
        return f"({self.__class__.__name__}| {vars(self)})"

    def __repr__(self):
        return str(self)
107

108

109
class ChangeSetNode(ChangeSetEntity, abc.ABC):
    """Marker base class for the ``Node*`` entities of the model."""
1✔
110

111

112
class ChangeSetTerminal(ChangeSetEntity, abc.ABC):
    """Marker base class for terminal entities such as ``TerminalValue``."""
1✔
113

114

115
class NodeTemplate(ChangeSetNode):
    """Node grouping the mappings, parameters, conditions, resources and outputs nodes."""

    mappings: Final[NodeMappings]
    parameters: Final[NodeParameters]
    conditions: Final[NodeConditions]
    resources: Final[NodeResources]
    outputs: Final[NodeOutputs]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        mappings: NodeMappings,
        parameters: NodeParameters,
        conditions: NodeConditions,
        resources: NodeResources,
        outputs: NodeOutputs,
    ):
        super().__init__(scope, change_type)
        # Assignment order matters: get_children() walks the instance
        # attributes in the order they were set.
        self.mappings = mappings
        self.parameters = parameters
        self.conditions = conditions
        self.resources = resources
        self.outputs = outputs
×
138

139

140
class NodeDivergence(ChangeSetNode):
    """Node pairing a ``value`` entity with a ``divergence`` entity.

    A divergence is always created with change type ``MODIFIED``.
    """

    value: Final[ChangeSetEntity]
    divergence: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        value: ChangeSetEntity,
        divergence: ChangeSetEntity,
    ):
        super().__init__(scope, ChangeType.MODIFIED)
        self.value = value
        self.divergence = divergence
×
148

149

150
class NodeParameter(ChangeSetNode):
    """Node for a single named parameter.

    Holds the parameter's name, its type entity, its dynamic value entity and
    an optional default value entity.
    """

    name: Final[str]
    type_: Final[ChangeSetEntity]
    dynamic_value: Final[ChangeSetEntity]
    default_value: Final[Optional[ChangeSetEntity]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        type_: ChangeSetEntity,
        dynamic_value: ChangeSetEntity,
        default_value: Optional[ChangeSetEntity],
    ):
        super().__init__(scope, change_type)
        # Assignment order is kept stable because get_children() relies on it.
        self.name = name
        self.type_ = type_
        self.dynamic_value = dynamic_value
        self.default_value = default_value
×
170

171

172
class NodeParameters(ChangeSetNode):
    """Node holding the list of ``NodeParameter`` entries."""

    parameters: Final[list[NodeParameter]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        parameters: list[NodeParameter],
    ):
        super().__init__(scope, change_type)
        self.parameters = parameters
×
178

179

180
class NodeMapping(ChangeSetNode):
    """Node for a single named mapping and the object node with its bindings."""

    name: Final[str]
    bindings: Final[NodeObject]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        bindings: NodeObject,
    ):
        super().__init__(scope, change_type)
        self.name = name
        self.bindings = bindings
×
188

189

190
class NodeMappings(ChangeSetNode):
    """Node holding the list of ``NodeMapping`` entries."""

    mappings: Final[list[NodeMapping]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        mappings: list[NodeMapping],
    ):
        super().__init__(scope, change_type)
        self.mappings = mappings
×
196

197

198
class NodeOutput(ChangeSetNode):
    """Node for a single named output.

    Holds the output's name, its value entity, an optional export entity and
    an optional condition reference.
    """

    name: Final[str]
    value: Final[ChangeSetEntity]
    export: Final[Optional[ChangeSetEntity]]
    condition_reference: Final[Optional[TerminalValue]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        value: ChangeSetEntity,
        export: Optional[ChangeSetEntity],
        conditional_reference: Optional[TerminalValue],
    ):
        super().__init__(scope, change_type)
        self.name = name
        self.value = value
        self.export = export
        # NOTE: the parameter is spelled ``conditional_reference`` while the
        # attribute it is stored in is ``condition_reference``.
        self.condition_reference = conditional_reference
×
218

219

220
class NodeOutputs(ChangeSetNode):
    """Node holding the list of ``NodeOutput`` entries."""

    outputs: Final[list[NodeOutput]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        outputs: list[NodeOutput],
    ):
        super().__init__(scope, change_type)
        self.outputs = outputs
×
226

227

228
class NodeCondition(ChangeSetNode):
    """Node for a single named condition and the entity forming its body."""

    name: Final[str]
    body: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        body: ChangeSetEntity,
    ):
        super().__init__(scope, change_type)
        self.name = name
        self.body = body
×
236

237

238
class NodeConditions(ChangeSetNode):
    """Node holding the list of ``NodeCondition`` entries."""

    conditions: Final[list[NodeCondition]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        conditions: list[NodeCondition],
    ):
        super().__init__(scope, change_type)
        self.conditions = conditions
×
244

245

246
class NodeResources(ChangeSetNode):
    """Node holding the list of ``NodeResource`` entries."""

    resources: Final[list[NodeResource]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        resources: list[NodeResource],
    ):
        super().__init__(scope, change_type)
        self.resources = resources
×
252

253

254
class NodeResource(ChangeSetNode):
    """Node for a single named resource.

    Holds the resource's name, its type terminal, an optional condition
    reference and the node with its properties.
    """

    name: Final[str]
    type_: Final[ChangeSetTerminal]
    condition_reference: Final[Optional[TerminalValue]]
    properties: Final[NodeProperties]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        type_: ChangeSetTerminal,
        # Optional, matching the attribute declaration above.
        condition_reference: Optional[TerminalValue],
        properties: NodeProperties,
    ):
        super().__init__(scope=scope, change_type=change_type)
        # Assignment order is kept stable because get_children() relies on it.
        self.name = name
        self.type_ = type_
        self.condition_reference = condition_reference
        self.properties = properties
×
274

275

276
class NodeProperties(ChangeSetNode):
    """Node holding the list of ``NodeProperty`` entries."""

    properties: Final[list[NodeProperty]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        properties: list[NodeProperty],
    ):
        super().__init__(scope, change_type)
        self.properties = properties
×
282

283

284
class NodeProperty(ChangeSetNode):
    """Node for a single named property and the entity holding its value."""

    name: Final[str]
    value: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        value: ChangeSetEntity,
    ):
        super().__init__(scope, change_type)
        self.name = name
        self.value = value
×
292

293

294
class NodeIntrinsicFunction(ChangeSetNode):
    """Node for an intrinsic function call: the function name and its arguments entity."""

    intrinsic_function: Final[str]
    arguments: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        intrinsic_function: str,
        arguments: ChangeSetEntity,
    ):
        super().__init__(scope, change_type)
        self.intrinsic_function = intrinsic_function
        self.arguments = arguments
×
308

309

310
class NodeObject(ChangeSetNode):
    """Node for an object: a dict from binding name to the bound entity."""

    bindings: Final[dict[str, ChangeSetEntity]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        bindings: dict[str, ChangeSetEntity],
    ):
        super().__init__(scope, change_type)
        self.bindings = bindings
×
316

317

318
class NodeArray(ChangeSetNode):
    """Node for an array: a list of entities, one per position."""

    array: Final[list[ChangeSetEntity]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        array: list[ChangeSetEntity],
    ):
        super().__init__(scope, change_type)
        self.array = array
×
324

325

326
class TerminalValue(ChangeSetTerminal, abc.ABC):
    """Terminal entity carrying a single value of any type."""

    value: Final[Any]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        value: Any,
    ):
        super().__init__(scope, change_type)
        self.value = value
×
332

333

334
class TerminalValueModified(TerminalValue):
    """Terminal with change type ``MODIFIED`` carrying ``value`` and ``modified_value``."""

    modified_value: Final[Any]

    def __init__(
        self,
        scope: Scope,
        value: Any,
        modified_value: Any,
    ):
        super().__init__(scope, ChangeType.MODIFIED, value)
        self.modified_value = modified_value
×
340

341

342
class TerminalValueCreated(TerminalValue):
    """Terminal value whose change type is always ``CREATED``."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.CREATED, value)
×
345

346

347
class TerminalValueRemoved(TerminalValue):
    """Terminal value whose change type is always ``REMOVED``."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.REMOVED, value)
×
350

351

352
class TerminalValueUnchanged(TerminalValue):
    """Terminal value whose change type is always ``UNCHANGED``."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.UNCHANGED, value)
×
355

356

357
# Key names used when reading templates.
TypeKey: Final[str] = "Type"
ConditionKey: Final[str] = "Condition"
ConditionsKey: Final[str] = "Conditions"
MappingsKey: Final[str] = "Mappings"
ResourcesKey: Final[str] = "Resources"
PropertiesKey: Final[str] = "Properties"
ParametersKey: Final[str] = "Parameters"
DefaultKey: Final[str] = "Default"
ValueKey: Final[str] = "Value"
ExportKey: Final[str] = "Export"
OutputsKey: Final[str] = "Outputs"

# Names of the intrinsic functions known to this module.
# TODO: expand intrinsic functions set.
RefKey: Final[str] = "Ref"
FnIf: Final[str] = "Fn::If"
FnNot: Final[str] = "Fn::Not"
FnGetAttKey: Final[str] = "Fn::GetAtt"
FnEqualsKey: Final[str] = "Fn::Equals"
FnFindInMapKey: Final[str] = "Fn::FindInMap"

# Collected set of all the intrinsic function names above.
INTRINSIC_FUNCTIONS: Final[set[str]] = {
    RefKey,
    FnIf,
    FnNot,
    FnEqualsKey,
    FnGetAttKey,
    FnFindInMapKey,
}
383

384

385
class ChangeSetModel:
1✔
386
    # TODO: should this instead be generalised to work on "Stack" objects instead of just "Template"s?
387

388
    # TODO: can probably improve the typehints to use CFN's 'language' eg. dict -> Template|Properties, etc.
389

390
    # TODO: add support for 'replacement' computation, and ensure this state is propagated in tree traversals
391
    #  such as intrinsic functions.
392

393
    _before_template: Final[Maybe[dict]]
1✔
394
    _after_template: Final[Maybe[dict]]
1✔
395
    _before_parameters: Final[Maybe[dict]]
1✔
396
    _after_parameters: Final[Maybe[dict]]
1✔
397
    _visited_scopes: Final[dict[str, ChangeSetEntity]]
1✔
398
    _node_template: Final[NodeTemplate]
1✔
399

400
    def __init__(
        self,
        before_template: Optional[dict],
        after_template: Optional[dict],
        before_parameters: Optional[dict],
        after_parameters: Optional[dict],
    ):
        """Store the inputs and build the template node right away.

        Falsy inputs (``None`` or an empty dict) are stored as ``Nothing``.
        The node returned by ``_model`` is kept for ``get_update_model``.
        """
        # Normalise every falsy input to the Nothing sentinel.
        self._before_template = before_template if before_template else Nothing
        self._after_template = after_template if after_template else Nothing
        self._before_parameters = before_parameters if before_parameters else Nothing
        self._after_parameters = after_parameters if after_parameters else Nothing
        # Cache of the entities already built, keyed by scope.
        self._visited_scopes = {}
        self._node_template = self._model(
            before_template=self._before_template,
            after_template=self._after_template,
        )
        # TODO: need to do template preprocessing e.g. parameter resolution, conditions etc.
416

417
    def get_update_model(self) -> NodeTemplate:
1✔
418
        # TODO: rethink naming of this for outer utils
UNCOV
419
        return self._node_template
×
420

421
    def _visit_terminal_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> TerminalValue:
        """Return the terminal entity for ``scope``, building and caching it if needed.

        The terminal is Created or Removed when ``_is_created`` / ``_is_removed``
        say so, Unchanged when both values compare equal, and Modified otherwise.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, TerminalValue):
            return cached

        if self._is_created(before=before_value, after=after_value):
            terminal = TerminalValueCreated(scope=scope, value=after_value)
        elif self._is_removed(before=before_value, after=after_value):
            terminal = TerminalValueRemoved(scope=scope, value=before_value)
        elif before_value == after_value:
            terminal = TerminalValueUnchanged(scope=scope, value=before_value)
        else:
            # ``value`` keeps the before value; ``modified_value`` the after value.
            terminal = TerminalValueModified(
                scope=scope, value=before_value, modified_value=after_value
            )

        self._visited_scopes[scope] = terminal
        return terminal
×
439

440
    def _visit_intrinsic_function(
        self,
        scope: Scope,
        intrinsic_function: str,
        before_arguments: Maybe[Any],
        after_arguments: Maybe[Any],
    ) -> NodeIntrinsicFunction:
        """Return the intrinsic function node for ``scope``, building and caching it if needed.

        The node is Created or Removed when the arguments are. Otherwise the
        change type comes from a ``_resolve_intrinsic_function_<name>`` method
        when this object has one, and from the arguments entity when it does not.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeIntrinsicFunction):
            return cached

        arguments = self._visit_value(
            scope=scope, before_value=before_arguments, after_value=after_arguments
        )

        if self._is_created(before=before_arguments, after=after_arguments):
            change_type = ChangeType.CREATED
        elif self._is_removed(before=before_arguments, after=after_arguments):
            change_type = ChangeType.REMOVED
        else:
            # Derive the resolver method name from the function name:
            # "::" becomes "_" and the result is converted to snake case.
            resolver_suffix = camel_to_snake_case(intrinsic_function.replace("::", "_"))
            resolver = getattr(self, f"_resolve_intrinsic_function_{resolver_suffix}", None)
            if resolver is None:
                change_type = arguments.change_type
            else:
                change_type = resolver(arguments)

        function_node = NodeIntrinsicFunction(
            scope=scope,
            change_type=change_type,
            intrinsic_function=intrinsic_function,
            arguments=arguments,
        )
        self._visited_scopes[scope] = function_node
        return function_node
×
474

475
    def _resolve_intrinsic_function_fn_get_att(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of an ``Fn::GetAtt`` call from its arguments.

        The first argument names a resource and the second names an attribute.
        The result is the change type of the resource property whose name
        equals the attribute name, or ``UNCHANGED`` when no property matches.

        Raises:
            RuntimeError: if ``arguments`` is not an array with at least two
                elements, if either of the first two elements is not a
                terminal value, or if the resource name is not a string.
        """
        # TODO: add support for nested intrinsic functions.
        # TODO: validate arguments structure and type.
        # TODO: should this check for deletion of resources and/or properties, if so what error should be raised?

        # Both index 0 (resource name) and index 1 (attribute name) are read
        # below, so require two elements up front instead of failing later
        # with an IndexError.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 2:
            raise RuntimeError()
        logical_name_of_resource_entity = arguments.array[0]
        if not isinstance(logical_name_of_resource_entity, TerminalValue):
            raise RuntimeError()
        logical_name_of_resource: str = logical_name_of_resource_entity.value
        if not isinstance(logical_name_of_resource, str):
            raise RuntimeError()
        node_resource: NodeResource = self._retrieve_or_visit_resource(
            resource_name=logical_name_of_resource
        )

        node_property_attribute_name = arguments.array[1]
        if not isinstance(node_property_attribute_name, TerminalValue):
            raise RuntimeError()
        # For a modified terminal the new (after) attribute name is used.
        if isinstance(node_property_attribute_name, TerminalValueModified):
            attribute_name = node_property_attribute_name.modified_value
        else:
            attribute_name = node_property_attribute_name.value

        # TODO: this is another use case for which properties should be referenced by name
        for node_property in node_resource.properties.properties:
            if node_property.name == attribute_name:
                return node_property.change_type

        return ChangeType.UNCHANGED
×
506

507
    def _resolve_intrinsic_function_ref(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of a ``Ref`` call from its argument.

        A changed argument, or one that is not a terminal value, decides the
        result by itself. Otherwise the argument's value is looked up as a
        condition, then as a parameter, and finally as a resource; the change
        type of the first match is returned.
        """
        # TODO: add support for nested functions, here we assume the argument is a logicalID.
        argument_change_type = arguments.change_type
        if argument_change_type != ChangeType.UNCHANGED or not isinstance(
            arguments, TerminalValue
        ):
            return argument_change_type

        logical_id = arguments.value

        condition = self._retrieve_condition_if_exists(condition_name=logical_id)
        if isinstance(condition, NodeCondition):
            return condition.change_type

        parameter = self._retrieve_parameter_if_exists(parameter_name=logical_id)
        if isinstance(parameter, NodeParameter):
            return parameter.change_type

        # TODO: this should check the replacement flag for a resource update.
        resource = self._retrieve_or_visit_resource(resource_name=logical_id)
        return resource.change_type
×
527

528
    def _resolve_intrinsic_function_fn_find_in_map(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of an ``Fn::FindInMap`` call from its arguments.

        Changed arguments decide the result by themselves. Otherwise the three
        arguments (mapping name, top-level key, second-level key) are used to
        look up the mapped entity, whose change type is returned.

        Raises:
            RuntimeError: if ``arguments`` is not an array with at least three
                elements, or if either key cannot be found in the mapping.
            NotImplementedError: if one of the three arguments is not a
                terminal value.
        """
        if arguments.change_type != ChangeType.UNCHANGED:
            return arguments.change_type
        # TODO: validate arguments structure and type.
        # TODO: add support for nested functions, here we assume the arguments are string literals.

        # Indices 0, 1 and 2 are all read below, so require three elements up
        # front instead of failing later with an IndexError.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 3:
            raise RuntimeError()
        argument_mapping_name = arguments.array[0]
        if not isinstance(argument_mapping_name, TerminalValue):
            raise NotImplementedError()
        argument_top_level_key = arguments.array[1]
        if not isinstance(argument_top_level_key, TerminalValue):
            raise NotImplementedError()
        argument_second_level_key = arguments.array[2]
        if not isinstance(argument_second_level_key, TerminalValue):
            raise NotImplementedError()
        mapping_name = argument_mapping_name.value
        top_level_key = argument_top_level_key.value
        second_level_key = argument_second_level_key.value

        node_mapping = self._retrieve_mapping(mapping_name=mapping_name)
        # TODO: a lookup would be beneficial in this scenario too;
        #  consider implications downstream and for replication.
        top_level_object = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_object, NodeObject):
            raise RuntimeError()
        target_map_value = top_level_object.bindings.get(second_level_key)
        # A missing second-level key is reported like a missing top-level key,
        # rather than surfacing as an AttributeError on None.
        if not isinstance(target_map_value, ChangeSetEntity):
            raise RuntimeError()
        return target_map_value.change_type
×
557

558
    def _resolve_intrinsic_function_fn_if(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of an ``Fn::If`` call from its arguments.

        The first argument names a condition. The result combines the change
        type of that condition with the change types of the remaining
        arguments.

        Raises:
            RuntimeError: if ``arguments`` is not a non-empty array, if the
                first element is not a terminal holding a string, or if no
                condition with that name exists.
        """
        # TODO: validate arguments structure and type.
        if not isinstance(arguments, NodeArray) or not arguments.array:
            raise RuntimeError()
        logical_name_of_condition_entity = arguments.array[0]
        if not isinstance(logical_name_of_condition_entity, TerminalValue):
            raise RuntimeError()
        logical_name_of_condition: str = logical_name_of_condition_entity.value
        if not isinstance(logical_name_of_condition, str):
            raise RuntimeError()

        node_condition = self._retrieve_condition_if_exists(
            condition_name=logical_name_of_condition
        )
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError()
        # Pass the change type of each remaining argument, not the argument
        # entity itself: the list is a list of change types, as it is where
        # _visit_array calls the same helper.
        change_types = [
            node_condition.change_type,
            *(branch.change_type for branch in arguments.array[1:]),
        ]
        change_type = self._change_type_for_parent_of(change_types=change_types)
        return change_type
×
577

578
    def _visit_array(
        self, scope: Scope, before_array: Maybe[list], after_array: Maybe[list]
    ) -> NodeArray:
        """Build the array node for ``scope`` by visiting the items position by position.

        The two arrays are walked in lockstep; positions missing on one side
        are filled with ``Nothing``.
        """
        paired_items = zip_longest(before_array, after_array, fillvalue=Nothing)
        items: list[ChangeSetEntity] = [
            self._visit_value(
                scope=scope.open_index(index=position),
                before_value=before_item,
                after_value=after_item,
            )
            for position, (before_item, after_item) in enumerate(paired_items)
        ]
        item_change_types = [item.change_type for item in items]
        change_type = self._change_type_for_parent_of(item_change_types)
        return NodeArray(scope=scope, change_type=change_type, array=items)
×
592

593
    def _visit_object(
        self, scope: Scope, before_object: Maybe[dict], after_object: Maybe[dict]
    ) -> NodeObject:
        """Return the object node for ``scope``, building and caching it if needed.

        Every key reported by ``_safe_keys_of`` is visited; the node's change
        type is folded from the change types of the visited values.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeObject):
            return cached

        bindings: dict[str, ChangeSetEntity] = {}
        change_type = ChangeType.UNCHANGED
        for binding_name in self._safe_keys_of(before_object, after_object):
            binding_scope, (before_value, after_value) = self._safe_access_in(
                scope, binding_name, before_object, after_object
            )
            bound_entity = self._visit_value(
                scope=binding_scope, before_value=before_value, after_value=after_value
            )
            bindings[binding_name] = bound_entity
            change_type = change_type.for_child(bound_entity.change_type)

        node_object = NodeObject(scope=scope, change_type=change_type, bindings=bindings)
        self._visited_scopes[scope] = node_object
        return node_object
×
615

616
    def _visit_divergence(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> NodeDivergence:
        """Build a divergence node from two separately visited halves.

        The before value is visited under ``<scope>/value`` with ``Nothing``
        as its after value; the after value is visited under
        ``<scope>/divergence`` with ``Nothing`` as its before value.
        """
        value = self._visit_value(
            scope=scope.open_scope("value"), before_value=before_value, after_value=Nothing
        )
        divergence = self._visit_value(
            scope=scope.open_scope("divergence"), before_value=Nothing, after_value=after_value
        )
        return NodeDivergence(scope=scope, value=value, divergence=divergence)
×
626

627
    def _visit_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> ChangeSetEntity:
        """Return the entity for ``scope``, dispatching on the kind of value found there.

        One side is chosen as the dominant value (the before value when both
        type names match or the value was removed, the after value when it was
        created) and decides whether a terminal, object, array or intrinsic
        function is visited. When no side can be chosen the types diverge and
        a divergence node is built. The result is cached under ``scope``.

        Raises:
            RuntimeError: if the dominant value is of no supported kind.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, ChangeSetEntity):
            return cached

        before_type_name = self._type_name_of(before_value)
        after_type_name = self._type_name_of(after_value)

        # Choose the side that drives the dispatch, if there is one.
        has_dominant_value = True
        dominant_value = None
        if before_type_name == after_type_name:
            dominant_value = before_value
        elif self._is_created(before=before_value, after=after_value):
            dominant_value = after_value
        elif self._is_removed(before=before_value, after=after_value):
            dominant_value = before_value
        else:
            has_dominant_value = False

        if not has_dominant_value:
            # Case: type divergence.
            value = self._visit_divergence(
                scope=scope, before_value=before_value, after_value=after_value
            )
        else:
            dominant_type_name = self._type_name_of(dominant_value)
            if self._is_terminal(value=dominant_value):
                value = self._visit_terminal_value(
                    scope=scope, before_value=before_value, after_value=after_value
                )
            elif self._is_object(value=dominant_value):
                value = self._visit_object(
                    scope=scope, before_object=before_value, after_object=after_value
                )
            elif self._is_array(value=dominant_value):
                value = self._visit_array(
                    scope=scope, before_array=before_value, after_array=after_value
                )
            elif self._is_intrinsic_function_name(dominant_type_name):
                # Only the argument pair is used; the function node is visited
                # under ``scope`` itself, not under the returned scope.
                _, (before_arguments, after_arguments) = self._safe_access_in(
                    scope, dominant_type_name, before_value, after_value
                )
                value = self._visit_intrinsic_function(
                    scope=scope,
                    intrinsic_function=dominant_type_name,
                    before_arguments=before_arguments,
                    after_arguments=after_arguments,
                )
            else:
                raise RuntimeError(f"Unsupported type {type(dominant_value)}")

        self._visited_scopes[scope] = value
        return value
×
678

679
    def _visit_property(
        self,
        scope: Scope,
        property_name: str,
        before_property: Maybe[Any],
        after_property: Maybe[Any],
    ) -> NodeProperty:
        """Return the property node for ``scope``, building and caching it if needed.

        The property takes its change type from the entity visited for its value.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperty):
            return cached

        property_value = self._visit_value(
            scope=scope, before_value=before_property, after_value=after_property
        )
        node_property = NodeProperty(
            scope=scope,
            change_type=property_value.change_type,
            name=property_name,
            value=property_value,
        )
        self._visited_scopes[scope] = node_property
        return node_property
×
697

698
    def _visit_properties(
        self, scope: Scope, before_properties: Maybe[dict], after_properties: Maybe[dict]
    ) -> NodeProperties:
        """Return the properties node for ``scope``, building and caching it if needed.

        Every name reported by ``_safe_keys_of`` is visited as a property; the
        node's change type is folded from the change types of those properties.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperties):
            return cached

        # TODO: double check we are sure not to have this be a NodeObject
        property_names: list[str] = self._safe_keys_of(before_properties, after_properties)
        visited_properties: list[NodeProperty] = []
        change_type = ChangeType.UNCHANGED
        for property_name in property_names:
            property_scope, (before_property, after_property) = self._safe_access_in(
                scope, property_name, before_properties, after_properties
            )
            node_property = self._visit_property(
                scope=property_scope,
                property_name=property_name,
                before_property=before_property,
                after_property=after_property,
            )
            visited_properties.append(node_property)
            change_type = change_type.for_child(node_property.change_type)

        node_properties = NodeProperties(
            scope=scope, change_type=change_type, properties=visited_properties
        )
        self._visited_scopes[scope] = node_properties
        return node_properties
×
725

726
    def _visit_type(self, scope: Scope, before_type: Any, after_type: Any) -> TerminalValue:
        """Visit a ``Type`` entry and require the result to be a terminal value.

        :param scope: scope of the type entry.
        :param before_type: type value in the before template.
        :param after_type: type value in the after template.
        :return: the TerminalValue produced by ``_visit_value``.
        :raises RuntimeError: if the visited value is not a TerminalValue.
        """
        value = self._visit_value(scope=scope, before_value=before_type, after_value=after_type)
        if not isinstance(value, TerminalValue):
            # TODO: decide where template schema validation should occur.
            raise RuntimeError(
                f"Expected a terminal value at '{scope}', got {type(value).__name__}"
            )
        return value
×
732

733
    def _visit_resource(
        self,
        scope: Scope,
        resource_name: str,
        before_resource: Maybe[dict],
        after_resource: Maybe[dict],
    ) -> NodeResource:
        """Return the NodeResource for ``scope``, building and caching it if needed.

        The resource's ``Type``, optional ``Condition`` and ``Properties`` entries
        are visited in that order. The base change type is CREATED, REMOVED or
        UNCHANGED depending on which side is Nothing; it is then combined with
        the properties' change type when at least one property exists.

        :param scope: scope of the resource.
        :param resource_name: logical name recorded on the node.
        :param before_resource: resource definition in the before template (or Nothing).
        :param after_resource: resource definition in the after template (or Nothing).
        :return: the cached or newly built NodeResource.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeResource):
            return cached

        type_scope, (before_type, after_type) = self._safe_access_in(
            scope, TypeKey, before_resource, after_resource
        )
        resource_type = self._visit_type(
            scope=type_scope, before_type=before_type, after_type=after_type
        )

        condition_scope, (before_condition, after_condition) = self._safe_access_in(
            scope, ConditionKey, before_resource, after_resource
        )
        # TODO: condition references should be resolved for the condition's change_type?
        condition_reference = (
            self._visit_terminal_value(condition_scope, before_condition, after_condition)
            if before_condition or after_condition
            else None
        )

        properties_scope, (before_properties, after_properties) = self._safe_access_in(
            scope, PropertiesKey, before_resource, after_resource
        )
        resource_properties = self._visit_properties(
            scope=properties_scope,
            before_properties=before_properties,
            after_properties=after_properties,
        )

        # Existence-based change type of the resource itself.
        if self._is_created(before=before_resource, after=after_resource):
            resource_change_type = ChangeType.CREATED
        elif self._is_removed(before=before_resource, after=after_resource):
            resource_change_type = ChangeType.REMOVED
        else:
            resource_change_type = ChangeType.UNCHANGED

        if resource_properties.properties:
            # Properties were defined in the before or after template, thus must play a role
            # in affecting the change type of this resource.
            resource_change_type = resource_change_type.for_child(
                resource_properties.change_type
            )

        built = NodeResource(
            scope=scope,
            change_type=resource_change_type,
            name=resource_name,
            type_=resource_type,
            condition_reference=condition_reference,
            properties=resource_properties,
        )
        self._visited_scopes[scope] = built
        return built
×
790

791
    def _visit_resources(
        self, scope: Scope, before_resources: Maybe[dict], after_resources: Maybe[dict]
    ) -> NodeResources:
        """Visit every resource named in either template and wrap them in a NodeResources.

        :param scope: scope of the resources section.
        :param before_resources: resources mapping of the before template (or Nothing).
        :param after_resources: resources mapping of the after template (or Nothing).
        :return: a NodeResources whose change type is UNCHANGED folded with each
            resource's change type through ``for_child``.
        """
        # TODO: investigate type changes behavior.
        aggregate_change_type = ChangeType.UNCHANGED
        visited: list[NodeResource] = []
        for name in self._safe_keys_of(before_resources, after_resources):
            child_scope, (before_child, after_child) = self._safe_access_in(
                scope, name, before_resources, after_resources
            )
            node = self._visit_resource(
                scope=child_scope,
                resource_name=name,
                before_resource=before_child,
                after_resource=after_child,
            )
            visited.append(node)
            aggregate_change_type = aggregate_change_type.for_child(node.change_type)
        return NodeResources(scope=scope, change_type=aggregate_change_type, resources=visited)
×
811

812
    def _visit_mapping(
        self, scope: Scope, name: str, before_mapping: Maybe[dict], after_mapping: Maybe[dict]
    ) -> NodeMapping:
        """Build a NodeMapping whose bindings are the mapping body visited as an object.

        :param scope: scope used both for the bindings and for the mapping node.
        :param name: name recorded on the mapping node.
        :param before_mapping: mapping body in the before template (or Nothing).
        :param after_mapping: mapping body in the after template (or Nothing).
        :return: a NodeMapping that takes its change type from the bindings.
        """
        mapping_bindings = self._visit_object(
            scope=scope, before_object=before_mapping, after_object=after_mapping
        )
        return NodeMapping(
            scope=scope,
            change_type=mapping_bindings.change_type,
            name=name,
            bindings=mapping_bindings,
        )
821

822
    def _visit_mappings(
        self, scope: Scope, before_mappings: Maybe[dict], after_mappings: Maybe[dict]
    ) -> NodeMappings:
        """Visit every mapping named in either template and wrap them in a NodeMappings.

        :param scope: scope of the mappings section.
        :param before_mappings: mappings of the before template (or Nothing).
        :param after_mappings: mappings of the after template (or Nothing).
        :return: a NodeMappings whose change type is UNCHANGED folded with each
            mapping's change type through ``for_child``.
        """
        change_type = ChangeType.UNCHANGED
        mappings: list[NodeMapping] = []
        for mapping_name in self._safe_keys_of(before_mappings, after_mappings):
            scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
                scope, mapping_name, before_mappings, after_mappings
            )
            # Visit each mapping under its own child scope (as _retrieve_mapping does)
            # rather than the section scope, so distinct mappings never share a scope.
            mapping = self._visit_mapping(
                scope=scope_mapping,
                name=mapping_name,
                before_mapping=before_mapping,
                after_mapping=after_mapping,
            )
            mappings.append(mapping)
            change_type = change_type.for_child(mapping.change_type)
        return NodeMappings(scope=scope, change_type=change_type, mappings=mappings)
×
841

842
    def _visit_dynamic_parameter(self, parameter_name: str) -> ChangeSetEntity:
        """Visit the before/after values supplied for a parameter.

        The values are read from ``self._before_parameters`` and
        ``self._after_parameters`` and visited under the ``Dynamic`` /
        ``Parameters`` / ``<parameter_name>`` scope.

        :param parameter_name: name of the parameter to look up.
        :return: the entity returned by ``_visit_value`` for the looked-up values.
        """
        dynamic_parameters_scope = Scope("Dynamic").open_scope("Parameters")
        value_scope, (before_value, after_value) = self._safe_access_in(
            dynamic_parameters_scope,
            parameter_name,
            self._before_parameters,
            self._after_parameters,
        )
        return self._visit_value(
            scope=value_scope, before_value=before_value, after_value=after_value
        )
×
851

852
    def _visit_parameter(
        self,
        scope: Scope,
        parameter_name: str,
        before_parameter: Maybe[dict],
        after_parameter: Maybe[dict],
    ) -> NodeParameter:
        """Return the NodeParameter for ``scope``, building and caching it if needed.

        The parameter's ``Type`` and ``Default`` entries and its dynamic value
        (see ``_visit_dynamic_parameter``) are visited in that order; the node's
        change type is derived from those three via ``_change_type_for_parent_of``.

        :param scope: scope of the parameter declaration.
        :param parameter_name: name recorded on the node and used for the dynamic lookup.
        :param before_parameter: declaration in the before template (or Nothing).
        :param after_parameter: declaration in the after template (or Nothing).
        :return: the cached or newly built NodeParameter.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameter):
            return cached

        scope_of_type, (type_before, type_after) = self._safe_access_in(
            scope, TypeKey, before_parameter, after_parameter
        )
        parameter_type = self._visit_value(scope_of_type, type_before, type_after)

        scope_of_default, (default_before, default_after) = self._safe_access_in(
            scope, DefaultKey, before_parameter, after_parameter
        )
        parameter_default = self._visit_value(scope_of_default, default_before, default_after)

        parameter_dynamic = self._visit_dynamic_parameter(parameter_name=parameter_name)

        child_change_types = [
            parameter_type.change_type,
            parameter_default.change_type,
            parameter_dynamic.change_type,
        ]
        built = NodeParameter(
            scope=scope,
            change_type=self._change_type_for_parent_of(change_types=child_change_types),
            name=parameter_name,
            type_=parameter_type,
            default_value=parameter_default,
            dynamic_value=parameter_dynamic,
        )
        self._visited_scopes[scope] = built
        return built
×
889

890
    def _visit_parameters(
        self, scope: Scope, before_parameters: Maybe[dict], after_parameters: Maybe[dict]
    ) -> NodeParameters:
        """Return the NodeParameters for ``scope``, visiting every parameter once.

        :param scope: scope of the parameters section.
        :param before_parameters: parameter declarations of the before template (or Nothing).
        :param after_parameters: parameter declarations of the after template (or Nothing).
        :return: the cached node for ``scope`` if present; otherwise a new node whose
            change type is UNCHANGED folded with each parameter's change type.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameters):
            return cached
        aggregate_change_type = ChangeType.UNCHANGED
        visited: list[NodeParameter] = []
        for name in self._safe_keys_of(before_parameters, after_parameters):
            child_scope, (before_child, after_child) = self._safe_access_in(
                scope, name, before_parameters, after_parameters
            )
            node = self._visit_parameter(
                scope=child_scope,
                parameter_name=name,
                before_parameter=before_child,
                after_parameter=after_child,
            )
            visited.append(node)
            aggregate_change_type = aggregate_change_type.for_child(node.change_type)
        result = NodeParameters(
            scope=scope, change_type=aggregate_change_type, parameters=visited
        )
        self._visited_scopes[scope] = result
        return result
×
916

917
    def _visit_condition(
        self,
        scope: Scope,
        condition_name: str,
        before_condition: Maybe[dict],
        after_condition: Maybe[dict],
    ) -> NodeCondition:
        """Return the NodeCondition for ``scope``, building and caching it if needed.

        :param scope: scope under which the condition is cached and its body visited.
        :param condition_name: name recorded on the node.
        :param before_condition: condition body in the before template (or Nothing).
        :param after_condition: condition body in the after template (or Nothing).
        :return: the cached NodeCondition, or a new one that takes its change type
            from the visited body.
        """
        cached = self._visited_scopes.get(scope)
        if not isinstance(cached, NodeCondition):
            condition_body = self._visit_value(
                scope=scope, before_value=before_condition, after_value=after_condition
            )
            cached = NodeCondition(
                scope=scope,
                change_type=condition_body.change_type,
                name=condition_name,
                body=condition_body,
            )
            self._visited_scopes[scope] = cached
        return cached
×
935

936
    def _visit_conditions(
        self, scope: Scope, before_conditions: Maybe[dict], after_conditions: Maybe[dict]
    ) -> NodeConditions:
        """Return the NodeConditions for ``scope``, visiting every condition once.

        :param scope: scope of the conditions section.
        :param before_conditions: conditions of the before template (or Nothing).
        :param after_conditions: conditions of the after template (or Nothing).
        :return: the cached node for ``scope`` if present; otherwise a new node whose
            change type is UNCHANGED folded with each condition's change type.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeConditions):
            return cached
        aggregate_change_type = ChangeType.UNCHANGED
        visited: list[NodeCondition] = []
        for name in self._safe_keys_of(before_conditions, after_conditions):
            child_scope, (before_child, after_child) = self._safe_access_in(
                scope, name, before_conditions, after_conditions
            )
            node = self._visit_condition(
                scope=child_scope,
                condition_name=name,
                before_condition=before_child,
                after_condition=after_child,
            )
            visited.append(node)
            aggregate_change_type = aggregate_change_type.for_child(
                child_change_type=node.change_type
            )
        result = NodeConditions(
            scope=scope, change_type=aggregate_change_type, conditions=visited
        )
        self._visited_scopes[scope] = result
        return result
×
962

963
    def _visit_output(
        self, scope: Scope, name: str, before_output: Maybe[dict], after_output: Maybe[dict]
    ) -> NodeOutput:
        """Build the NodeOutput for a single output declaration.

        The ``Value`` entry is always visited; ``Export`` and ``Condition`` are
        visited only when truthy on at least one side. The output's change type
        is UNCHANGED folded, in that order, with the change type of every
        visited part.

        :param scope: scope of the output declaration.
        :param name: name recorded on the node.
        :param before_output: declaration in the before template (or Nothing).
        :param after_output: declaration in the after template (or Nothing).
        :return: the newly built NodeOutput (this method does not cache).
        """
        value_scope, (value_before, value_after) = self._safe_access_in(
            scope, ValueKey, before_output, after_output
        )
        output_value = self._visit_value(value_scope, value_before, value_after)
        output_change_type = ChangeType.UNCHANGED.for_child(output_value.change_type)

        output_export: Optional[ChangeSetEntity] = None
        export_scope, (export_before, export_after) = self._safe_access_in(
            scope, ExportKey, before_output, after_output
        )
        if export_before or export_after:
            output_export = self._visit_value(export_scope, export_before, export_after)
            output_change_type = output_change_type.for_child(output_export.change_type)

        # TODO: condition references should be resolved for the condition's change_type?
        output_condition: Optional[TerminalValue] = None
        condition_scope, (condition_before, condition_after) = self._safe_access_in(
            scope, ConditionKey, before_output, after_output
        )
        if condition_before or condition_after:
            output_condition = self._visit_terminal_value(
                condition_scope, condition_before, condition_after
            )
            output_change_type = output_change_type.for_child(output_condition.change_type)

        return NodeOutput(
            scope=scope,
            change_type=output_change_type,
            name=name,
            value=output_value,
            export=output_export,
            conditional_reference=output_condition,
        )
1000

1001
    def _visit_outputs(
        self, scope: Scope, before_outputs: Maybe[dict], after_outputs: Maybe[dict]
    ) -> NodeOutputs:
        """Visit every output named in either template and wrap them in a NodeOutputs.

        :param scope: scope of the outputs section.
        :param before_outputs: outputs of the before template (or Nothing).
        :param after_outputs: outputs of the after template (or Nothing).
        :return: a NodeOutputs whose change type is UNCHANGED folded with each
            output's change type through ``for_child``.
        """
        aggregate_change_type = ChangeType.UNCHANGED
        visited: list[NodeOutput] = []
        for name in self._safe_keys_of(before_outputs, after_outputs):
            child_scope, (before_child, after_child) = self._safe_access_in(
                scope, name, before_outputs, after_outputs
            )
            node = self._visit_output(
                scope=child_scope,
                name=name,
                before_output=before_child,
                after_output=after_child,
            )
            visited.append(node)
            aggregate_change_type = aggregate_change_type.for_child(node.change_type)
        return NodeOutputs(scope=scope, change_type=aggregate_change_type, outputs=visited)
×
1020

1021
    def _model(self, before_template: Maybe[dict], after_template: Maybe[dict]) -> NodeTemplate:
        """Build the NodeTemplate describing the differences between two templates.

        The Mappings, Parameters, Conditions, Resources and Outputs sections are
        visited in that order, each under its own child of the root scope.

        :param before_template: the before template (or Nothing).
        :param after_template: the after template (or Nothing).
        :return: a NodeTemplate whose change type is currently the resources' change type.
        """
        root = Scope()
        # TODO: visit other child types

        # Mappings
        scope_mappings, (mappings_before, mappings_after) = self._safe_access_in(
            root, MappingsKey, before_template, after_template
        )
        node_mappings = self._visit_mappings(
            scope=scope_mappings, before_mappings=mappings_before, after_mappings=mappings_after
        )

        # Parameters
        scope_parameters, (parameters_before, parameters_after) = self._safe_access_in(
            root, ParametersKey, before_template, after_template
        )
        node_parameters = self._visit_parameters(
            scope=scope_parameters,
            before_parameters=parameters_before,
            after_parameters=parameters_after,
        )

        # Conditions
        scope_conditions, (conditions_before, conditions_after) = self._safe_access_in(
            root, ConditionsKey, before_template, after_template
        )
        node_conditions = self._visit_conditions(
            scope=scope_conditions,
            before_conditions=conditions_before,
            after_conditions=conditions_after,
        )

        # Resources
        scope_resources, (resources_before, resources_after) = self._safe_access_in(
            root, ResourcesKey, before_template, after_template
        )
        node_resources = self._visit_resources(
            scope=scope_resources,
            before_resources=resources_before,
            after_resources=resources_after,
        )

        # Outputs
        scope_outputs, (outputs_before, outputs_after) = self._safe_access_in(
            root, OutputsKey, before_template, after_template
        )
        node_outputs = self._visit_outputs(
            scope=scope_outputs, before_outputs=outputs_before, after_outputs=outputs_after
        )

        # TODO: compute the change_type of the template properly.
        return NodeTemplate(
            scope=root,
            change_type=node_resources.change_type,
            mappings=node_mappings,
            parameters=node_parameters,
            conditions=node_conditions,
            resources=node_resources,
            outputs=node_outputs,
        )
1076

1077
    def _retrieve_condition_if_exists(self, condition_name: str) -> Optional[NodeCondition]:
        """Return the NodeCondition named ``condition_name``, or None if undeclared.

        The condition is looked up in the ``Conditions`` section of
        ``self._before_template`` and ``self._after_template``. A missing or
        falsy section is treated as an empty dict.

        :param condition_name: name of the condition to retrieve.
        :return: the visited NodeCondition when the name is declared in either
            template, otherwise None.
        """
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
            Scope(), ConditionsKey, self._before_template, self._after_template
        )
        before_conditions = before_conditions or dict()
        after_conditions = after_conditions or dict()
        if condition_name not in before_conditions and condition_name not in after_conditions:
            return None
        condition_scope, (before_condition, after_condition) = self._safe_access_in(
            conditions_scope, condition_name, before_conditions, after_conditions
        )
        # Use the condition's own scope (the same one _visit_conditions passes), not the
        # section scope: _visit_condition caches by scope, so distinct conditions must
        # not share a key.
        return self._visit_condition(
            condition_scope,
            condition_name,
            before_condition=before_condition,
            after_condition=after_condition,
        )
×
1095

1096
    def _retrieve_parameter_if_exists(self, parameter_name: str) -> Optional[NodeParameter]:
        """Return the NodeParameter named ``parameter_name``, or None if undeclared.

        The parameter is looked up in the ``Parameters`` section of
        ``self._before_template`` and ``self._after_template``. A missing or
        falsy section is treated as an empty dict.

        :param parameter_name: name of the parameter to retrieve.
        :return: the visited NodeParameter when the name is declared in either
            template, otherwise None.
        """
        parameters_scope, (before_parameters, after_parameters) = self._safe_access_in(
            Scope(), ParametersKey, self._before_template, self._after_template
        )
        before_parameters = before_parameters or dict()
        after_parameters = after_parameters or dict()
        if parameter_name not in before_parameters and parameter_name not in after_parameters:
            return None
        parameter_scope, (before_parameter, after_parameter) = self._safe_access_in(
            parameters_scope, parameter_name, before_parameters, after_parameters
        )
        # Use the parameter's own scope (the same one _visit_parameters passes), not the
        # section scope: _visit_parameter caches by scope and derives the scopes of the
        # Type/Default entries from it, so distinct parameters must not share a key.
        return self._visit_parameter(
            parameter_scope,
            parameter_name,
            before_parameter=before_parameter,
            after_parameter=after_parameter,
        )
×
1114

1115
    def _retrieve_mapping(self, mapping_name) -> NodeMapping:
        """Return the NodeMapping named ``mapping_name``.

        The mapping is looked up in the ``Mappings`` section of
        ``self._before_template`` and ``self._after_template``. A missing or
        falsy section is treated as an empty dict.

        :param mapping_name: name of the mapping to retrieve.
        :return: the visited NodeMapping.
        :raises RuntimeError: if the name is declared in neither template.
        """
        # TODO: add caching mechanism, and raise appropriate error if missing.
        scope_mappings, (before_mappings, after_mappings) = self._safe_access_in(
            Scope(), MappingsKey, self._before_template, self._after_template
        )
        before_mappings = before_mappings or dict()
        after_mappings = after_mappings or dict()
        if mapping_name not in before_mappings and mapping_name not in after_mappings:
            raise RuntimeError(f"Undefined mapping '{mapping_name}'")
        scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
            scope_mappings, mapping_name, before_mappings, after_mappings
        )
        return self._visit_mapping(scope_mapping, mapping_name, before_mapping, after_mapping)
×
1131

1132
    def _retrieve_or_visit_resource(self, resource_name: str) -> NodeResource:
        """Return the NodeResource for ``resource_name`` via ``_visit_resource``.

        The resource is looked up in the ``Resources`` section of
        ``self._before_template`` and ``self._after_template``; no existence
        check is made here before visiting.

        :param resource_name: logical name of the resource.
        :return: the node returned by ``_visit_resource`` for the resource's scope.
        """
        section_scope, (section_before, section_after) = self._safe_access_in(
            Scope(), ResourcesKey, self._before_template, self._after_template
        )
        target_scope, (target_before, target_after) = self._safe_access_in(
            section_scope, resource_name, section_before, section_after
        )
        return self._visit_resource(
            scope=target_scope,
            resource_name=resource_name,
            before_resource=target_before,
            after_resource=target_after,
        )
1148

1149
    @staticmethod
    def _is_intrinsic_function_name(function_name: str) -> bool:
        """Tell whether ``function_name`` is a member of ``INTRINSIC_FUNCTIONS``."""
        # TODO: are intrinsic functions soft keywords?
        is_known = function_name in INTRINSIC_FUNCTIONS
        return is_known
×
1153

1154
    @staticmethod
    def _safe_access_in(scope: Scope, key: str, *objects: Maybe[dict]) -> tuple[Scope, Maybe[Any]]:
        """Look ``key`` up in each of ``objects`` and open the matching child scope.

        An object that is Nothing yields itself; any other object is queried with
        ``.get(key, Nothing)``.

        :param scope: parent scope; the returned scope is ``scope.open_scope(name=key)``.
        :param key: key to look up.
        :param objects: the containers to query, in order.
        :return: ``(child_scope, value)`` when exactly one object is given, otherwise
            ``(child_scope, values)`` with ``values`` a tuple in the order of ``objects``.
        """
        # TODO: raise errors if not dict
        looked_up = [
            obj if isinstance(obj, NothingType) else obj.get(key, Nothing) for obj in objects
        ]
        child_scope = scope.open_scope(name=key)
        if len(objects) == 1:
            return child_scope, looked_up[0]
        return child_scope, tuple(looked_up)
×
1165

1166
    @staticmethod
1✔
1167
    def _safe_keys_of(*objects: Maybe[dict]) -> list[str]:
1✔
UNCOV
1168
        key_set: set[str] = set()
×
UNCOV
1169
        for obj in objects:
×
1170
            # TODO: raise errors if not dict
UNCOV
1171
            if isinstance(obj, dict):
×
UNCOV
1172
                key_set.update(obj.keys())
×
1173
        # The keys list is sorted to increase reproducibility of the
1174
        # update graph build process or downstream logics.
UNCOV
1175
        keys = sorted(key_set)
×
UNCOV
1176
        return keys
×
1177

1178
    @staticmethod
    def _change_type_for_parent_of(change_types: list[ChangeType]) -> ChangeType:
        """Fold children change types into the change type of their parent.

        Starting from UNCHANGED, each child's change type is applied through
        ``for_child``; folding stops as soon as the result is MODIFIED.

        :param change_types: change types of the children, in order.
        :return: the resulting parent change type (UNCHANGED for an empty list).
        """
        folded = ChangeType.UNCHANGED
        for child in change_types:
            folded = folded.for_child(child)
            if folded == ChangeType.MODIFIED:
                # The remaining children are not consulted once MODIFIED is reached.
                return folded
        return folded
×
1186

1187
    @staticmethod
1✔
1188
    def _name_if_intrinsic_function(value: Maybe[Any]) -> Optional[str]:
1✔
UNCOV
1189
        if isinstance(value, dict):
×
UNCOV
1190
            keys = ChangeSetModel._safe_keys_of(value)
×
UNCOV
1191
            if len(keys) == 1:
×
UNCOV
1192
                key_name = keys[0]
×
UNCOV
1193
                if ChangeSetModel._is_intrinsic_function_name(key_name):
×
UNCOV
1194
                    return key_name
×
UNCOV
1195
        return None
×
1196

1197
    @staticmethod
1✔
1198
    def _type_name_of(value: Maybe[Any]) -> str:
1✔
UNCOV
1199
        maybe_intrinsic_function_name = ChangeSetModel._name_if_intrinsic_function(value)
×
UNCOV
1200
        if maybe_intrinsic_function_name is not None:
×
UNCOV
1201
            return maybe_intrinsic_function_name
×
UNCOV
1202
        return type(value).__name__
×
1203

1204
    @staticmethod
    def _is_terminal(value: Any) -> bool:
        """Tell whether ``value`` is a terminal (leaf) value.

        Terminal values are those whose exact type is int, float, bool, str,
        NoneType or NothingType; subclasses of these types do not qualify.
        """
        # type(None) is listed rather than None itself: the check compares types,
        # so the None singleton in this collection could never match type(value).
        return type(value) in (int, float, bool, str, type(None), NothingType)
×
1207

1208
    @staticmethod
1✔
1209
    def _is_object(value: Any) -> bool:
1✔
UNCOV
1210
        return isinstance(value, dict) and ChangeSetModel._name_if_intrinsic_function(value) is None
×
1211

1212
    @staticmethod
1✔
1213
    def _is_array(value: Any) -> bool:
1✔
UNCOV
1214
        return isinstance(value, list)
×
1215

1216
    @staticmethod
    def _is_created(before: Maybe[Any], after: Maybe[Any]) -> bool:
        """Tell whether a value is absent (Nothing) before and present after."""
        absent_before = isinstance(before, NothingType)
        absent_after = isinstance(after, NothingType)
        return absent_before and not absent_after
×
1219

1220
    @staticmethod
    def _is_removed(before: Maybe[Any], after: Maybe[Any]) -> bool:
        """Tell whether a value is present before and absent (Nothing) after."""
        absent_before = isinstance(before, NothingType)
        absent_after = isinstance(after, NothingType)
        return absent_after and not absent_before
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc