• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / ed350f2e-b6c3-4bef-a7fa-04255aec4056

03 Jun 2025 05:34PM UTC coverage: 86.768% (+0.04%) from 86.729%
ed350f2e-b6c3-4bef-a7fa-04255aec4056

push

circleci

web-flow
CloudFormation v2 Engine: Base Support for Fn::Base64 (#12700)

20 of 22 new or added lines in 3 files covered. (90.91%)

185 existing lines in 14 files now uncovered.

65077 of 75001 relevant lines covered (86.77%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.38
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model.py
1
from __future__ import annotations
1✔
2

3
import abc
1✔
4
import enum
1✔
5
from itertools import zip_longest
1✔
6
from typing import Any, Final, Generator, Optional, Union, cast
1✔
7

8
from typing_extensions import TypeVar
1✔
9

10
from localstack.utils.strings import camel_to_snake_case
1✔
11

12
T = TypeVar("T")
1✔
13

14

15
class NothingType:
    """A sentinel that denotes 'no value' (distinct from None).

    Every call to ``NothingType()`` returns the same shared instance. That
    instance is falsy, iterates as an empty sequence, reports that it contains
    no item, and compares equal only to ``NothingType`` instances.
    """

    _singleton = None
    __slots__ = ()

    def __new__(cls):
        # Lazily create the shared instance on first use and reuse it afterwards.
        if cls._singleton is None:
            cls._singleton = super().__new__(cls)
        return cls._singleton

    def __eq__(self, other):
        return isinstance(other, NothingType)

    def __hash__(self) -> int:
        # Defining __eq__ without __hash__ makes a class unhashable. All
        # instances compare equal, so they must share one constant hash.
        return hash(NothingType)

    def __str__(self):
        return repr(self)

    def __repr__(self) -> str:
        return "Nothing"

    def __bool__(self):
        return False

    def __iter__(self):
        return iter(())

    def __contains__(self, item):
        return False
43

44

45
Maybe = Union[T, NothingType]
1✔
46
Nothing = NothingType()
1✔
47

48

49
def is_nothing(value: Any) -> bool:
    """Return True when ``value`` is an instance of ``NothingType``."""
    result = isinstance(value, NothingType)
    return result
51

52

53
def is_created(before: Maybe[Any], after: Maybe[Any]) -> bool:
    """Return True when ``before`` is Nothing and ``after`` is not."""
    if not is_nothing(before):
        return False
    return not is_nothing(after)
55

56

57
def is_removed(before: Maybe[Any], after: Maybe[Any]) -> bool:
    """Return True when ``before`` is not Nothing and ``after`` is."""
    if is_nothing(before):
        return False
    return is_nothing(after)
59

60

61
def parent_change_type_of(children: list[Maybe[ChangeSetEntity]]):
    """Combine the change types of ``children`` into the change type of their parent.

    Children that are Nothing are ignored. With no remaining children the result
    is ``ChangeType.UNCHANGED``; when all remaining children share one change
    type that type is returned; otherwise the result is ``ChangeType.MODIFIED``.
    """
    distinct_types = {child.change_type for child in children if not is_nothing(child)}
    if not distinct_types:
        return ChangeType.UNCHANGED
    if len(distinct_types) == 1:
        (shared_type,) = distinct_types
        return shared_type
    return ChangeType.MODIFIED
69

70

71
def change_type_of(before: Maybe[Any], after: Maybe[Any], children: list[Maybe[ChangeSetEntity]]):
    """Return the change type of a value given its before/after states and children.

    A value that only exists after is CREATED, one that only exists before is
    REMOVED; in every other case the change type is derived from ``children``.
    """
    if is_created(before, after):
        return ChangeType.CREATED
    if is_removed(before, after):
        return ChangeType.REMOVED
    return parent_change_type_of(children)
79

80

81
class Scope(str):
    """A string path whose segments are joined by ``/``; the root scope is the empty string."""

    _ROOT_SCOPE: Final[str] = str()
    _SEPARATOR: Final[str] = "/"

    def __new__(cls, scope: str = _ROOT_SCOPE) -> Scope:
        instance = str.__new__(cls, scope)
        return cast(Scope, instance)

    def open_scope(self, name: Scope | str) -> Scope:
        """Return a new scope with ``name`` appended as a further segment."""
        segments = (self, name)
        return Scope(self._SEPARATOR.join(segments))

    def open_index(self, index: int) -> Scope:
        """Return a new scope with the string form of ``index`` appended as a segment."""
        segments = (self, str(index))
        return Scope(self._SEPARATOR.join(segments))

    def unwrap(self) -> list[str]:
        """Return the segments of this scope, split on the separator."""
        return str.split(self, self._SEPARATOR)
96

97

98
class ChangeType(enum.Enum):
    """The kinds of change an entity can carry; ``str()`` yields the member's value."""

    UNCHANGED = "Unchanged"
    CREATED = "Created"
    MODIFIED = "Modified"
    REMOVED = "Removed"

    def __str__(self):
        text = self.value
        return text
106

107

108
class ChangeSetEntity(abc.ABC):
    """Base class of every entity in the change set model.

    Each entity records the scope it was created under and its change type.
    """

    scope: Final[Scope]
    change_type: Final[ChangeType]

    def __init__(self, scope: Scope, change_type: ChangeType):
        self.scope = scope
        self.change_type = change_type

    def get_children(self) -> Generator[ChangeSetEntity]:
        """Yield every entity reachable from this instance's attributes.

        Attributes are inspected in ``__dict__`` order; lists and dict values
        are searched recursively.
        """
        for child in self.__dict__.values():
            yield from self._get_children_in(child)

    @staticmethod
    def _get_children_in(obj: Any) -> Generator[ChangeSetEntity]:
        """Yield ``obj`` if it is an entity, or the entities nested in a list/dict ``obj``."""
        # TODO: could avoid the inductive logic here, and check for loops?
        if isinstance(obj, ChangeSetEntity):
            yield obj
        elif isinstance(obj, list):
            for item in obj:
                yield from ChangeSetEntity._get_children_in(item)
        elif isinstance(obj, dict):
            for item in obj.values():
                yield from ChangeSetEntity._get_children_in(item)

    def __str__(self):
        # The closing parenthesis balances the opening one of the rendering.
        return f"({self.__class__.__name__}| {vars(self)})"

    def __repr__(self):
        return str(self)
137

138

139
class ChangeSetNode(ChangeSetEntity, abc.ABC):
    """Common base class of the ``Node*`` entities."""
140

141

142
class ChangeSetTerminal(ChangeSetEntity, abc.ABC):
    """Common base class of the terminal (leaf value) entities."""
143

144

145
class NodeTemplate(ChangeSetNode):
    """Node holding the mappings, parameters, conditions, resources and outputs nodes.

    Its change type is derived from the resources and outputs nodes only.
    """

    mappings: Final[NodeMappings]
    parameters: Final[NodeParameters]
    conditions: Final[NodeConditions]
    resources: Final[NodeResources]
    outputs: Final[NodeOutputs]

    def __init__(
        self,
        scope: Scope,
        mappings: NodeMappings,
        parameters: NodeParameters,
        conditions: NodeConditions,
        resources: NodeResources,
        outputs: NodeOutputs,
    ):
        super().__init__(scope, parent_change_type_of([resources, outputs]))
        # Assignment order is kept: get_children() walks __dict__ in insertion order.
        self.mappings = mappings
        self.parameters = parameters
        self.conditions = conditions
        self.resources = resources
        self.outputs = outputs
168

169

170
class NodeDivergence(ChangeSetNode):
    """Node pairing a ``value`` entity with a ``divergence`` entity; always MODIFIED."""

    value: Final[ChangeSetEntity]
    divergence: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, value: ChangeSetEntity, divergence: ChangeSetEntity):
        super().__init__(scope, ChangeType.MODIFIED)
        self.value = value
        self.divergence = divergence
178

179

180
class NodeParameter(ChangeSetNode):
    """Node for a named parameter with its type, dynamic value and optional default value.

    The change type is derived from the type, default value and dynamic value entities.
    """

    name: Final[str]
    type_: Final[ChangeSetEntity]
    dynamic_value: Final[ChangeSetEntity]
    default_value: Final[Maybe[ChangeSetEntity]]

    def __init__(
        self,
        scope: Scope,
        name: str,
        type_: ChangeSetEntity,
        dynamic_value: ChangeSetEntity,
        default_value: Maybe[ChangeSetEntity],
    ):
        parts = [type_, default_value, dynamic_value]
        super().__init__(scope, parent_change_type_of(parts))
        self.name = name
        self.type_ = type_
        self.dynamic_value = dynamic_value
        self.default_value = default_value
200

201

202
class NodeParameters(ChangeSetNode):
    """Node holding a list of ``NodeParameter``; change type derived from that list."""

    parameters: Final[list[NodeParameter]]

    def __init__(self, scope: Scope, parameters: list[NodeParameter]):
        super().__init__(scope, parent_change_type_of(parameters))
        self.parameters = parameters
209

210

211
class NodeMapping(ChangeSetNode):
    """Node for a named mapping; it takes the change type of its ``bindings`` object."""

    name: Final[str]
    bindings: Final[NodeObject]

    def __init__(self, scope: Scope, name: str, bindings: NodeObject):
        super().__init__(scope, bindings.change_type)
        self.name = name
        self.bindings = bindings
219

220

221
class NodeMappings(ChangeSetNode):
    """Node holding a list of ``NodeMapping``; change type derived from that list."""

    mappings: Final[list[NodeMapping]]

    def __init__(self, scope: Scope, mappings: list[NodeMapping]):
        super().__init__(scope, parent_change_type_of(mappings))
        self.mappings = mappings
228

229

230
class NodeOutput(ChangeSetNode):
    """Node for a named output with its value, optional export and optional condition reference.

    The change type is derived from the value, export and condition reference entities.
    """

    name: Final[str]
    value: Final[ChangeSetEntity]
    export: Final[Maybe[ChangeSetEntity]]
    condition_reference: Final[Maybe[TerminalValue]]

    def __init__(
        self,
        scope: Scope,
        name: str,
        value: ChangeSetEntity,
        export: Maybe[ChangeSetEntity],
        conditional_reference: Maybe[TerminalValue],
    ):
        parts = [value, export, conditional_reference]
        super().__init__(scope, parent_change_type_of(parts))
        self.name = name
        self.value = value
        self.export = export
        # The `conditional_reference` argument is stored as `condition_reference`.
        self.condition_reference = conditional_reference
250

251

252
class NodeOutputs(ChangeSetNode):
    """Node holding a list of ``NodeOutput``; change type derived from that list."""

    outputs: Final[list[NodeOutput]]

    def __init__(self, scope: Scope, outputs: list[NodeOutput]):
        super().__init__(scope, parent_change_type_of(outputs))
        self.outputs = outputs
259

260

261
class NodeCondition(ChangeSetNode):
    """Node for a named condition; it takes the change type of its ``body`` entity."""

    name: Final[str]
    body: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, name: str, body: ChangeSetEntity):
        super().__init__(scope, body.change_type)
        self.name = name
        self.body = body
269

270

271
class NodeConditions(ChangeSetNode):
    """Node holding a list of ``NodeCondition``; change type derived from that list."""

    conditions: Final[list[NodeCondition]]

    def __init__(self, scope: Scope, conditions: list[NodeCondition]):
        super().__init__(scope, parent_change_type_of(conditions))
        self.conditions = conditions
278

279

280
class NodeResources(ChangeSetNode):
    """Node holding a list of ``NodeResource``; change type derived from that list."""

    resources: Final[list[NodeResource]]

    def __init__(self, scope: Scope, resources: list[NodeResource]):
        super().__init__(scope, parent_change_type_of(resources))
        self.resources = resources
287

288

289
class NodeResource(ChangeSetNode):
    """Node for a named resource: its type, properties, optional condition reference and dependencies.

    Unlike the container nodes, the change type is supplied by the caller.
    """

    name: Final[str]
    type_: Final[ChangeSetTerminal]
    properties: Final[NodeProperties]
    condition_reference: Final[Maybe[TerminalValue]]
    depends_on: Final[Maybe[NodeDependsOn]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        type_: ChangeSetTerminal,
        properties: NodeProperties,
        condition_reference: Maybe[TerminalValue],
        depends_on: Maybe[NodeDependsOn],
    ):
        super().__init__(scope, change_type)
        self.name = name
        self.type_ = type_
        self.properties = properties
        self.condition_reference = condition_reference
        self.depends_on = depends_on
312

313

314
class NodeProperties(ChangeSetNode):
    """Node holding a list of ``NodeProperty``; change type derived from that list."""

    properties: Final[list[NodeProperty]]

    def __init__(self, scope: Scope, properties: list[NodeProperty]):
        super().__init__(scope, parent_change_type_of(properties))
        self.properties = properties
321

322

323
class NodeDependsOn(ChangeSetNode):
    """Node wrapping a ``NodeArray`` of dependencies; it takes that array's change type."""

    depends_on: Final[NodeArray]

    def __init__(self, scope: Scope, depends_on: NodeArray):
        super().__init__(scope, depends_on.change_type)
        self.depends_on = depends_on
329

330

331
class NodeProperty(ChangeSetNode):
    """Node for a named property; it takes the change type of its ``value`` entity."""

    name: Final[str]
    value: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, name: str, value: ChangeSetEntity):
        super().__init__(scope, value.change_type)
        self.name = name
        self.value = value
339

340

341
class NodeIntrinsicFunction(ChangeSetNode):
    """Node for an intrinsic function call: the function name and its arguments entity.

    The change type is supplied by the caller.
    """

    intrinsic_function: Final[str]
    arguments: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        intrinsic_function: str,
        arguments: ChangeSetEntity,
    ):
        super().__init__(scope, change_type)
        self.intrinsic_function = intrinsic_function
        self.arguments = arguments
355

356

357
class NodeObject(ChangeSetNode):
    """Node for an object: a dict of binding names to entities, with a caller-supplied change type."""

    bindings: Final[dict[str, ChangeSetEntity]]

    def __init__(self, scope: Scope, change_type: ChangeType, bindings: dict[str, ChangeSetEntity]):
        super().__init__(scope, change_type)
        self.bindings = bindings
363

364

365
class NodeArray(ChangeSetNode):
    """Node for an array: a list of entities, with a caller-supplied change type."""

    array: Final[list[ChangeSetEntity]]

    def __init__(self, scope: Scope, change_type: ChangeType, array: list[ChangeSetEntity]):
        super().__init__(scope, change_type)
        self.array = array
371

372

373
class TerminalValue(ChangeSetTerminal, abc.ABC):
    """Terminal entity carrying a single ``value`` together with its change type."""

    value: Final[Any]

    def __init__(self, scope: Scope, change_type: ChangeType, value: Any):
        super().__init__(scope, change_type)
        self.value = value
379

380

381
class TerminalValueModified(TerminalValue):
    """MODIFIED terminal value: keeps ``value`` and additionally records ``modified_value``."""

    modified_value: Final[Any]

    def __init__(self, scope: Scope, value: Any, modified_value: Any):
        super().__init__(scope, ChangeType.MODIFIED, value)
        self.modified_value = modified_value
387

388

389
class TerminalValueCreated(TerminalValue):
    """Terminal value whose change type is always CREATED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.CREATED, value)
392

393

394
class TerminalValueRemoved(TerminalValue):
    """Terminal value whose change type is always REMOVED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.REMOVED, value)
397

398

399
class TerminalValueUnchanged(TerminalValue):
    """Terminal value whose change type is always UNCHANGED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope, ChangeType.UNCHANGED, value)
402

403

404
TypeKey: Final[str] = "Type"
1✔
405
ConditionKey: Final[str] = "Condition"
1✔
406
ConditionsKey: Final[str] = "Conditions"
1✔
407
MappingsKey: Final[str] = "Mappings"
1✔
408
ResourcesKey: Final[str] = "Resources"
1✔
409
PropertiesKey: Final[str] = "Properties"
1✔
410
ParametersKey: Final[str] = "Parameters"
1✔
411
DefaultKey: Final[str] = "Default"
1✔
412
ValueKey: Final[str] = "Value"
1✔
413
ExportKey: Final[str] = "Export"
1✔
414
OutputsKey: Final[str] = "Outputs"
1✔
415
DependsOnKey: Final[str] = "DependsOn"
1✔
416
# TODO: expand intrinsic functions set.
417
RefKey: Final[str] = "Ref"
1✔
418
FnIfKey: Final[str] = "Fn::If"
1✔
419
FnNotKey: Final[str] = "Fn::Not"
1✔
420
FnJoinKey: Final[str] = "Fn::Join"
1✔
421
FnGetAttKey: Final[str] = "Fn::GetAtt"
1✔
422
FnEqualsKey: Final[str] = "Fn::Equals"
1✔
423
FnFindInMapKey: Final[str] = "Fn::FindInMap"
1✔
424
FnSubKey: Final[str] = "Fn::Sub"
1✔
425
FnTransform: Final[str] = "Fn::Transform"
1✔
426
FnSelect: Final[str] = "Fn::Select"
1✔
427
FnSplit: Final[str] = "Fn::Split"
1✔
428
FnGetAZs: Final[str] = "Fn::GetAZs"
1✔
429
FnBase64: Final[str] = "Fn::Base64"
1✔
430
INTRINSIC_FUNCTIONS: Final[set[str]] = {
1✔
431
    RefKey,
432
    FnIfKey,
433
    FnNotKey,
434
    FnJoinKey,
435
    FnEqualsKey,
436
    FnGetAttKey,
437
    FnFindInMapKey,
438
    FnSubKey,
439
    FnTransform,
440
    FnSelect,
441
    FnSplit,
442
    FnGetAZs,
443
    FnBase64,
444
}
445

446

447
class ChangeSetModel:
1✔
448
    # TODO: should this instead be generalised to work on "Stack" objects instead of just "Template"s?
449

450
    # TODO: can probably improve the typehints to use CFN's 'language' eg. dict -> Template|Properties, etc.
451

452
    # TODO: add support for 'replacement' computation, and ensure this state is propagated in tree traversals
453
    #  such as intrinsic functions.
454

455
    _before_template: Final[Maybe[dict]]
1✔
456
    _after_template: Final[Maybe[dict]]
1✔
457
    _before_parameters: Final[Maybe[dict]]
1✔
458
    _after_parameters: Final[Maybe[dict]]
1✔
459
    _visited_scopes: Final[dict[str, ChangeSetEntity]]
1✔
460
    _node_template: Final[NodeTemplate]
1✔
461

462
    def __init__(
        self,
        before_template: Optional[dict],
        after_template: Optional[dict],
        before_parameters: Optional[dict],
        after_parameters: Optional[dict],
    ):
        """Store the before/after templates and parameters and build the node template.

        Any falsy input (``None`` or an empty dict) is stored as ``Nothing``.
        """
        self._before_template = before_template if before_template else Nothing
        self._after_template = after_template if after_template else Nothing
        self._before_parameters = before_parameters if before_parameters else Nothing
        self._after_parameters = after_parameters if after_parameters else Nothing
        self._visited_scopes = {}
        # The model is built last: it runs with all the state above already set.
        self._node_template = self._model(
            before_template=self._before_template,
            after_template=self._after_template,
        )
        # TODO: need to do template preprocessing e.g. parameter resolution, conditions etc.
478

479
    def get_update_model(self) -> NodeTemplate:
1✔
480
        # TODO: rethink naming of this for outer utils
481
        return self._node_template
1✔
482

483
    def _visit_terminal_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> TerminalValue:
        """Return the terminal value for ``scope``, creating and caching it if needed.

        The concrete class reflects the change: created, removed, unchanged
        (before equals after) or modified.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, TerminalValue):
            return cached

        terminal: TerminalValue
        if is_created(before=before_value, after=after_value):
            terminal = TerminalValueCreated(scope=scope, value=after_value)
        elif is_removed(before=before_value, after=after_value):
            terminal = TerminalValueRemoved(scope=scope, value=before_value)
        elif before_value == after_value:
            terminal = TerminalValueUnchanged(scope=scope, value=before_value)
        else:
            terminal = TerminalValueModified(
                scope=scope,
                value=before_value,
                modified_value=after_value,
            )

        self._visited_scopes[scope] = terminal
        return terminal
501

502
    def _visit_intrinsic_function(
        self,
        scope: Scope,
        intrinsic_function: str,
        before_arguments: Maybe[Any],
        after_arguments: Maybe[Any],
    ) -> NodeIntrinsicFunction:
        """Return the intrinsic function node for ``scope``, creating and caching it if needed.

        The arguments are visited under the same ``scope``. When the arguments
        are neither created nor removed, the change type comes from a
        ``_resolve_intrinsic_function_<name>`` method if this object has one,
        otherwise from the arguments entity.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeIntrinsicFunction):
            return cached

        arguments = self._visit_value(
            scope=scope, before_value=before_arguments, after_value=after_arguments
        )

        if is_created(before=before_arguments, after=after_arguments):
            change_type = ChangeType.CREATED
        elif is_removed(before=before_arguments, after=after_arguments):
            change_type = ChangeType.REMOVED
        else:
            # Resolver names are built from the function name with "::" turned
            # into "_" and then passed through camel_to_snake_case.
            snake_name = camel_to_snake_case(intrinsic_function.replace("::", "_"))
            resolver_name = f"_resolve_intrinsic_function_{snake_name}"
            if hasattr(self, resolver_name):
                change_type = getattr(self, resolver_name)(arguments)
            else:
                change_type = arguments.change_type

        function_node = NodeIntrinsicFunction(
            scope=scope,
            change_type=change_type,
            intrinsic_function=intrinsic_function,
            arguments=arguments,
        )
        self._visited_scopes[scope] = function_node
        return function_node
536

537
    def _resolve_intrinsic_function_fn_get_att(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of an ``Fn::GetAtt`` call.

        ``arguments`` must be a ``NodeArray`` with at least two entries: a
        terminal string naming the resource, then a terminal attribute name.
        The result is the change type of the resource property whose name
        matches the attribute name, or ``ChangeType.UNCHANGED`` if none matches.

        Raises:
            RuntimeError: if the arguments do not have the structure above.
        """
        # TODO: add support for nested intrinsic functions.
        # TODO: validate arguments structure and type.
        # TODO: should this check for deletion of resources and/or properties, if so what error should be raised?

        # Both positions are read below, so require two entries up front
        # rather than failing later with an IndexError.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 2:
            raise RuntimeError()
        logical_name_of_resource_entity = arguments.array[0]
        if not isinstance(logical_name_of_resource_entity, TerminalValue):
            raise RuntimeError()
        logical_name_of_resource: str = logical_name_of_resource_entity.value
        if not isinstance(logical_name_of_resource, str):
            raise RuntimeError()
        node_resource: NodeResource = self._retrieve_or_visit_resource(
            resource_name=logical_name_of_resource
        )

        node_property_attribute_name = arguments.array[1]
        if not isinstance(node_property_attribute_name, TerminalValue):
            raise RuntimeError()
        # For a modified attribute name, the new (modified) name is the one looked up.
        if isinstance(node_property_attribute_name, TerminalValueModified):
            attribute_name = node_property_attribute_name.modified_value
        else:
            attribute_name = node_property_attribute_name.value

        # TODO: this is another use case for which properties should be referenced by name
        for node_property in node_resource.properties.properties:
            if node_property.name == attribute_name:
                return node_property.change_type

        return ChangeType.UNCHANGED
568

569
    def _resolve_intrinsic_function_ref(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of a ``Ref`` call.

        Changed or non-terminal arguments yield the arguments' own change type.
        Otherwise the referenced name is looked up as a condition, then as a
        parameter, and finally as a resource; the first hit's change type wins.
        """
        is_unchanged = arguments.change_type == ChangeType.UNCHANGED
        if not is_unchanged or not isinstance(arguments, TerminalValue):
            return arguments.change_type

        logical_id = arguments.value

        condition = self._retrieve_condition_if_exists(condition_name=logical_id)
        if isinstance(condition, NodeCondition):
            return condition.change_type

        parameter = self._retrieve_parameter_if_exists(parameter_name=logical_id)
        if isinstance(parameter, NodeParameter):
            return parameter.change_type

        # TODO: this should check the replacement flag for a resource update.
        resource = self._retrieve_or_visit_resource(resource_name=logical_id)
        return resource.change_type
588

589
    def _resolve_intrinsic_function_fn_find_in_map(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of an ``Fn::FindInMap`` call.

        Changed arguments yield the arguments' own change type. Otherwise
        ``arguments`` must be a ``NodeArray`` of at least three terminal values
        (mapping name, top-level key, second-level key), and the result is the
        change type of the entity bound at that position in the mapping.

        Raises:
            RuntimeError: if the arguments are not an array of at least three
                entries, or the keys do not lead to an entity in the mapping.
            NotImplementedError: if one of the three arguments is not a terminal value.
        """
        if arguments.change_type != ChangeType.UNCHANGED:
            return arguments.change_type
        # TODO: validate arguments structure and type.
        # TODO: add support for nested functions, here we assume the arguments are string literals.

        # Three positions are read below, so require them up front rather
        # than failing later with an IndexError.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 3:
            raise RuntimeError()
        argument_mapping_name, argument_top_level_key, argument_second_level_key = (
            arguments.array[:3]
        )
        for argument in (
            argument_mapping_name,
            argument_top_level_key,
            argument_second_level_key,
        ):
            if not isinstance(argument, TerminalValue):
                raise NotImplementedError()
        mapping_name = argument_mapping_name.value
        top_level_key = argument_top_level_key.value
        second_level_key = argument_second_level_key.value

        node_mapping = self._retrieve_mapping(mapping_name=mapping_name)
        # TODO: a lookup would be beneficial in this scenario too;
        #  consider implications downstream and for replication.
        top_level_object = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_object, NodeObject):
            raise RuntimeError()
        target_map_value = top_level_object.bindings.get(second_level_key)
        # dict.get returns None for an unknown key; report it like the other
        # lookup failures instead of an AttributeError on `.change_type`.
        if not isinstance(target_map_value, ChangeSetEntity):
            raise RuntimeError()
        return target_map_value.change_type
618

619
    def _resolve_intrinsic_function_fn_if(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of an ``Fn::If`` call.

        ``arguments`` must be a non-empty ``NodeArray`` whose first entry is a
        terminal string naming an existing condition. The result combines the
        change type of that condition with those of the remaining entries.

        Raises:
            RuntimeError: if the arguments do not have the structure above or
                the named condition cannot be retrieved.
        """
        # TODO: validate arguments structure and type.
        if not isinstance(arguments, NodeArray) or not arguments.array:
            raise RuntimeError()
        logical_name_of_condition_entity = arguments.array[0]
        if not isinstance(logical_name_of_condition_entity, TerminalValue):
            raise RuntimeError()
        logical_name_of_condition: str = logical_name_of_condition_entity.value
        if not isinstance(logical_name_of_condition, str):
            raise RuntimeError()

        node_condition = self._retrieve_condition_if_exists(
            condition_name=logical_name_of_condition
        )
        if not isinstance(node_condition, NodeCondition):
            raise RuntimeError()
        # NodeArray is not subscriptable; the remaining entries live in its `array` list.
        change_type = parent_change_type_of([node_condition, *arguments.array[1:]])
        return change_type
637

638
    def _visit_array(
        self, scope: Scope, before_array: Maybe[list], after_array: Maybe[list]
    ) -> NodeArray:
        """Build a ``NodeArray`` by visiting the before/after arrays position by position.

        The shorter side is padded with ``Nothing``, and each position is
        visited under the index scope opened from ``scope``.
        """
        paired_items = zip_longest(before_array, after_array, fillvalue=Nothing)
        array: list[ChangeSetEntity] = [
            self._visit_value(
                scope=scope.open_index(index=position),
                before_value=before_item,
                after_value=after_item,
            )
            for position, (before_item, after_item) in enumerate(paired_items)
        ]
        return NodeArray(
            scope=scope,
            change_type=change_type_of(before_array, after_array, array),
            array=array,
        )
652

653
    def _visit_object(
        self, scope: Scope, before_object: Maybe[dict], after_object: Maybe[dict]
    ) -> NodeObject:
        """Return the ``NodeObject`` for ``scope``, creating and caching it if needed.

        Every binding name returned by ``_safe_keys_of`` for the two objects is
        visited as a value under the scope returned by ``_safe_access_in``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeObject):
            return cached

        bindings: dict[str, ChangeSetEntity] = {}
        for name in self._safe_keys_of(before_object, after_object):
            name_scope, (before_value, after_value) = self._safe_access_in(
                scope, name, before_object, after_object
            )
            bindings[name] = self._visit_value(
                scope=name_scope, before_value=before_value, after_value=after_value
            )

        object_node = NodeObject(
            scope=scope,
            change_type=change_type_of(before_object, after_object, list(bindings.values())),
            bindings=bindings,
        )
        self._visited_scopes[scope] = object_node
        return object_node
673

674
    def _visit_divergence(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> NodeDivergence:
        """Build a ``NodeDivergence`` from a before value and an after value.

        The before value is visited under ``<scope>/value`` with no after side,
        and the after value under ``<scope>/divergence`` with no before side.
        """
        value = self._visit_value(
            scope=scope.open_scope("value"),
            before_value=before_value,
            after_value=Nothing,
        )
        divergence = self._visit_value(
            scope=scope.open_scope("divergence"),
            before_value=Nothing,
            after_value=after_value,
        )
        return NodeDivergence(scope=scope, value=value, divergence=divergence)
684

685
    def _visit_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> ChangeSetEntity:
        """Return the entity for ``scope``, creating and caching it if needed.

        A "dominant" value decides how the pair is visited: the before value
        when both sides have the same type name or the value was removed, the
        after value when it was created. The dominant value is then dispatched
        to the terminal, object, array or intrinsic function visitor. When
        neither case applies the pair is visited as a divergence.

        Raises:
            RuntimeError: if the dominant value matches none of the supported kinds.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, ChangeSetEntity):
            return cached

        before_type_name = self._type_name_of(before_value)
        after_type_name = self._type_name_of(after_value)

        if before_type_name == after_type_name:
            dominant_value = before_value
        elif is_created(before=before_value, after=after_value):
            dominant_value = after_value
        elif is_removed(before=before_value, after=after_value):
            dominant_value = before_value
        else:
            # Case: type divergence.
            diverged = self._visit_divergence(
                scope=scope, before_value=before_value, after_value=after_value
            )
            self._visited_scopes[scope] = diverged
            return diverged

        dominant_type_name = self._type_name_of(dominant_value)
        if self._is_terminal(value=dominant_value):
            entity = self._visit_terminal_value(
                scope=scope, before_value=before_value, after_value=after_value
            )
        elif self._is_object(value=dominant_value):
            entity = self._visit_object(
                scope=scope, before_object=before_value, after_object=after_value
            )
        elif self._is_array(value=dominant_value):
            entity = self._visit_array(
                scope=scope, before_array=before_value, after_array=after_value
            )
        elif self._is_intrinsic_function_name(dominant_type_name):
            # Only the arguments are used here; the function node itself is
            # visited under the enclosing scope.
            _, (before_arguments, after_arguments) = self._safe_access_in(
                scope, dominant_type_name, before_value, after_value
            )
            entity = self._visit_intrinsic_function(
                scope=scope,
                intrinsic_function=dominant_type_name,
                before_arguments=before_arguments,
                after_arguments=after_arguments,
            )
        else:
            raise RuntimeError(f"Unsupported type {type(dominant_value)}")

        self._visited_scopes[scope] = entity
        return entity
736

737
    def _visit_property(
        self,
        scope: Scope,
        property_name: str,
        before_property: Maybe[Any],
        after_property: Maybe[Any],
    ) -> NodeProperty:
        """Return the ``NodeProperty`` for ``scope``, creating and caching it if needed.

        The property's value is visited under the same ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperty):
            return cached

        property_value = self._visit_value(
            scope=scope, before_value=before_property, after_value=after_property
        )
        property_node = NodeProperty(scope=scope, name=property_name, value=property_value)
        self._visited_scopes[scope] = property_node
        return property_node
753

754
    def _visit_properties(
        self, scope: Scope, before_properties: Maybe[dict], after_properties: Maybe[dict]
    ) -> NodeProperties:
        """Return the NodeProperties for the properties section at ``scope``.

        A NodeProperties already stored in ``_visited_scopes`` for this scope is
        reused. Otherwise every property name found in either version is visited
        (in the sorted order produced by ``_safe_keys_of``) and the resulting
        node is stored under ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperties):
            return cached

        visited: list[NodeProperty] = []
        for name in self._safe_keys_of(before_properties, after_properties):
            child_scope, (before_property, after_property) = self._safe_access_in(
                scope, name, before_properties, after_properties
            )
            visited.append(
                self._visit_property(
                    scope=child_scope,
                    property_name=name,
                    before_property=before_property,
                    after_property=after_property,
                )
            )

        result = NodeProperties(scope=scope, properties=visited)
        self._visited_scopes[scope] = result
        return result
1✔
776

777
    def _visit_type(self, scope: Scope, before_type: Any, after_type: Any) -> TerminalValue:
        """Visit a ``Type`` entry and ensure it resolves to a terminal value.

        :param scope: scope of the ``Type`` entry being visited.
        :param before_type: the type value in the before template.
        :param after_type: the type value in the after template.
        :return: the TerminalValue node produced by ``_visit_value``.
        :raises RuntimeError: if the visited node is not a TerminalValue.
        """
        value = self._visit_value(scope=scope, before_value=before_type, after_value=after_type)
        if not isinstance(value, TerminalValue):
            # TODO: decide where template schema validation should occur.
            raise RuntimeError(
                f"Invalid Type definition at '{scope}': expected a terminal value, "
                f"but got a node of type '{type(value).__name__}'"
            )
        return value
1✔
783

784
    def _visit_resource(
        self,
        scope: Scope,
        resource_name: str,
        before_resource: Maybe[dict],
        after_resource: Maybe[dict],
    ) -> NodeResource:
        """Return the NodeResource for the resource declared at ``scope``.

        A NodeResource already stored in ``_visited_scopes`` for this scope is
        reused. Otherwise the resource's type, condition, DependsOn and
        properties are visited in that order. The condition and DependsOn are
        visited only when either version holds a truthy value. The change type
        is computed with ``change_type_of`` and the new node is stored under
        ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeResource):
            return cached

        # Resource type.
        type_scope, (type_before, type_after) = self._safe_access_in(
            scope, TypeKey, before_resource, after_resource
        )
        resource_type = self._visit_type(
            scope=type_scope, before_type=type_before, after_type=type_after
        )

        # Optional condition reference.
        condition_scope, (condition_before, condition_after) = self._safe_access_in(
            scope, ConditionKey, before_resource, after_resource
        )
        condition_reference = (
            self._visit_terminal_value(condition_scope, condition_before, condition_after)
            if condition_before or condition_after
            else Nothing
        )

        # Optional DependsOn declaration.
        depends_on_scope, (depends_on_before, depends_on_after) = self._safe_access_in(
            scope, DependsOnKey, before_resource, after_resource
        )
        depends_on = (
            self._visit_depends_on(depends_on_scope, depends_on_before, depends_on_after)
            if depends_on_before or depends_on_after
            else Nothing
        )

        # Resource properties.
        properties_scope, (properties_before, properties_after) = self._safe_access_in(
            scope, PropertiesKey, before_resource, after_resource
        )
        properties = self._visit_properties(
            scope=properties_scope,
            before_properties=properties_before,
            after_properties=properties_after,
        )

        resource_node = NodeResource(
            scope=scope,
            change_type=change_type_of(
                before_resource, after_resource, [properties, condition_reference, depends_on]
            ),
            name=resource_name,
            type_=resource_type,
            properties=properties,
            condition_reference=condition_reference,
            depends_on=depends_on,
        )
        self._visited_scopes[scope] = resource_node
        return resource_node
1✔
843

844
    def _visit_resources(
        self, scope: Scope, before_resources: Maybe[dict], after_resources: Maybe[dict]
    ) -> NodeResources:
        """Visit every resource named in either version of the Resources section.

        Resource names are processed in the sorted order produced by
        ``_safe_keys_of``; the visited nodes are wrapped in a NodeResources.
        """
        # TODO: investigate type changes behavior.
        visited: list[NodeResource] = []
        for name in self._safe_keys_of(before_resources, after_resources):
            child_scope, (before_resource, after_resource) = self._safe_access_in(
                scope, name, before_resources, after_resources
            )
            visited.append(
                self._visit_resource(
                    scope=child_scope,
                    resource_name=name,
                    before_resource=before_resource,
                    after_resource=after_resource,
                )
            )
        return NodeResources(scope=scope, resources=visited)
1✔
862

863
    def _visit_mapping(
        self, scope: Scope, name: str, before_mapping: Maybe[dict], after_mapping: Maybe[dict]
    ) -> NodeMapping:
        """Model one mapping: visit its body as an object and wrap it in a NodeMapping."""
        mapping_bindings = self._visit_object(
            scope=scope, before_object=before_mapping, after_object=after_mapping
        )
        node_mapping = NodeMapping(scope=scope, name=name, bindings=mapping_bindings)
        return node_mapping
1✔
870

871
    def _visit_mappings(
        self, scope: Scope, before_mappings: Maybe[dict], after_mappings: Maybe[dict]
    ) -> NodeMappings:
        """Visit every mapping named in either version of the Mappings section.

        :param scope: scope of the Mappings section.
        :param before_mappings: the Mappings section of the before template.
        :param after_mappings: the Mappings section of the after template.
        :return: a NodeMappings holding one NodeMapping per mapping name, in the
            sorted order produced by ``_safe_keys_of``.
        """
        mappings: list[NodeMapping] = []
        for mapping_name in self._safe_keys_of(before_mappings, after_mappings):
            scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
                scope, mapping_name, before_mappings, after_mappings
            )
            mapping = self._visit_mapping(
                # Each mapping is modelled under its own scope (as _retrieve_mapping
                # does), not under the scope of the whole Mappings section.
                scope=scope_mapping,
                name=mapping_name,
                before_mapping=before_mapping,
                after_mapping=after_mapping,
            )
            mappings.append(mapping)
        return NodeMappings(scope=scope, mappings=mappings)
1✔
888

889
    def _visit_dynamic_parameter(self, parameter_name: str) -> ChangeSetEntity:
        """Visit the value bound to ``parameter_name`` in the before/after parameter sets.

        The value is looked up in ``_before_parameters`` and ``_after_parameters``
        and visited under a scope rooted at "Dynamic" / "Parameters".
        """
        dynamic_parameters_scope = Scope("Dynamic").open_scope("Parameters")
        value_scope, (before_value, after_value) = self._safe_access_in(
            dynamic_parameters_scope,
            parameter_name,
            self._before_parameters,
            self._after_parameters,
        )
        return self._visit_value(
            scope=value_scope, before_value=before_value, after_value=after_value
        )
1✔
898

899
    def _visit_parameter(
        self,
        scope: Scope,
        parameter_name: str,
        before_parameter: Maybe[dict],
        after_parameter: Maybe[dict],
    ) -> NodeParameter:
        """Return the NodeParameter for the parameter declared at ``scope``.

        A NodeParameter already stored in ``_visited_scopes`` for this scope is
        reused. Otherwise the declaration's type and default are visited, then
        the dynamic value for the parameter name, and the new node is stored
        under ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameter):
            return cached

        scope_of_type, (type_before, type_after) = self._safe_access_in(
            scope, TypeKey, before_parameter, after_parameter
        )
        parameter_type = self._visit_value(scope_of_type, type_before, type_after)

        scope_of_default, (default_before, default_after) = self._safe_access_in(
            scope, DefaultKey, before_parameter, after_parameter
        )
        parameter_default = self._visit_value(scope_of_default, default_before, default_after)

        parameter_dynamic = self._visit_dynamic_parameter(parameter_name=parameter_name)

        parameter_node = NodeParameter(
            scope=scope,
            name=parameter_name,
            type_=parameter_type,
            default_value=parameter_default,
            dynamic_value=parameter_dynamic,
        )
        self._visited_scopes[scope] = parameter_node
        return parameter_node
1✔
931

932
    def _visit_parameters(
        self, scope: Scope, before_parameters: Maybe[dict], after_parameters: Maybe[dict]
    ) -> NodeParameters:
        """Return the NodeParameters for the Parameters section at ``scope``.

        A NodeParameters already stored in ``_visited_scopes`` for this scope is
        reused. Otherwise every parameter name found in either version is
        visited (in the sorted order produced by ``_safe_keys_of``) and the
        resulting node is stored under ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameters):
            return cached

        visited: list[NodeParameter] = []
        for name in self._safe_keys_of(before_parameters, after_parameters):
            child_scope, (before_parameter, after_parameter) = self._safe_access_in(
                scope, name, before_parameters, after_parameters
            )
            visited.append(
                self._visit_parameter(
                    scope=child_scope,
                    parameter_name=name,
                    before_parameter=before_parameter,
                    after_parameter=after_parameter,
                )
            )

        result = NodeParameters(scope=scope, parameters=visited)
        self._visited_scopes[scope] = result
        return result
1✔
954

955
    @staticmethod
1✔
956
    def _normalise_depends_on_value(value: Maybe[str | list[str]]) -> Maybe[list[str]]:
1✔
957
        # To simplify downstream logics, reduce the type options to array of strings.
958
        # TODO: Add integrations tests for DependsOn validations (invalid types, duplicate identifiers, etc.)
959
        if isinstance(value, NothingType):
1✔
960
            return value
1✔
961
        if isinstance(value, str):
1✔
962
            value = [value]
1✔
963
        elif isinstance(value, list):
1✔
964
            value.sort()
1✔
965
        else:
UNCOV
966
            raise RuntimeError(
×
967
                f"Invalid type for DependsOn, expected a String or Array of String, but got: '{value}'"
968
            )
969
        return value
1✔
970

971
    def _visit_depends_on(
        self,
        scope: Scope,
        before_depends_on: Maybe[str | list[str]],
        after_depends_on: Maybe[str | list[str]],
    ) -> NodeDependsOn:
        """Model a DependsOn declaration as a NodeDependsOn wrapping an array node.

        Both versions are first normalised with ``_normalise_depends_on_value``
        and then visited together as an array.
        """
        normalised_before = self._normalise_depends_on_value(value=before_depends_on)
        normalised_after = self._normalise_depends_on_value(value=after_depends_on)
        dependencies = self._visit_array(
            scope=scope, before_array=normalised_before, after_array=normalised_after
        )
        return NodeDependsOn(scope=scope, depends_on=dependencies)
1✔
984

985
    def _visit_condition(
        self,
        scope: Scope,
        condition_name: str,
        before_condition: Maybe[dict],
        after_condition: Maybe[dict],
    ) -> NodeCondition:
        """Return the NodeCondition for the condition declared at ``scope``.

        A NodeCondition already stored in ``_visited_scopes`` for this scope is
        reused. Otherwise the condition body is visited, wrapped in a new
        NodeCondition, and that node is stored under ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if not isinstance(cached, NodeCondition):
            condition_body = self._visit_value(
                scope=scope, before_value=before_condition, after_value=after_condition
            )
            cached = NodeCondition(scope=scope, name=condition_name, body=condition_body)
            self._visited_scopes[scope] = cached
        return cached
1✔
1001

1002
    def _visit_conditions(
        self, scope: Scope, before_conditions: Maybe[dict], after_conditions: Maybe[dict]
    ) -> NodeConditions:
        """Return the NodeConditions for the Conditions section at ``scope``.

        A NodeConditions already stored in ``_visited_scopes`` for this scope is
        reused. Otherwise every condition name found in either version is
        visited (in the sorted order produced by ``_safe_keys_of``) and the
        resulting node is stored under ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeConditions):
            return cached

        visited: list[NodeCondition] = []
        for name in self._safe_keys_of(before_conditions, after_conditions):
            child_scope, (before_condition, after_condition) = self._safe_access_in(
                scope, name, before_conditions, after_conditions
            )
            visited.append(
                self._visit_condition(
                    scope=child_scope,
                    condition_name=name,
                    before_condition=before_condition,
                    after_condition=after_condition,
                )
            )

        result = NodeConditions(scope=scope, conditions=visited)
        self._visited_scopes[scope] = result
        return result
1✔
1024

1025
    def _visit_output(
        self, scope: Scope, name: str, before_output: Maybe[dict], after_output: Maybe[dict]
    ) -> NodeOutput:
        """Model one output declaration as a NodeOutput.

        The output's value is always visited. Its export and condition are
        visited only when either version holds a truthy value for them;
        otherwise they are left as Nothing.
        """
        value_scope, (value_before, value_after) = self._safe_access_in(
            scope, ValueKey, before_output, after_output
        )
        output_value = self._visit_value(value_scope, value_before, value_after)

        export_scope, (export_before, export_after) = self._safe_access_in(
            scope, ExportKey, before_output, after_output
        )
        output_export: Maybe[ChangeSetEntity] = (
            self._visit_value(export_scope, export_before, export_after)
            if export_before or export_after
            else Nothing
        )

        # TODO: condition references should be resolved for the condition's change_type?
        condition_scope, (condition_before, condition_after) = self._safe_access_in(
            scope, ConditionKey, before_output, after_output
        )
        output_condition: Maybe[TerminalValue] = (
            self._visit_terminal_value(condition_scope, condition_before, condition_after)
            if condition_before or condition_after
            else Nothing
        )

        return NodeOutput(
            scope=scope,
            name=name,
            value=output_value,
            export=output_export,
            conditional_reference=output_condition,
        )
1057

1058
    def _visit_outputs(
        self, scope: Scope, before_outputs: Maybe[dict], after_outputs: Maybe[dict]
    ) -> NodeOutputs:
        """Visit every output named in either version of the Outputs section.

        Output names are processed in the sorted order produced by
        ``_safe_keys_of``; the visited nodes are wrapped in a NodeOutputs.
        """
        visited: list[NodeOutput] = []
        for output_name in self._safe_keys_of(before_outputs, after_outputs):
            child_scope, (before_output, after_output) = self._safe_access_in(
                scope, output_name, before_outputs, after_outputs
            )
            visited.append(
                self._visit_output(
                    scope=child_scope,
                    name=output_name,
                    before_output=before_output,
                    after_output=after_output,
                )
            )
        return NodeOutputs(scope=scope, outputs=visited)
1✔
1075

1076
    def _model(self, before_template: Maybe[dict], after_template: Maybe[dict]) -> NodeTemplate:
        """Build the NodeTemplate for a before/after template pair.

        The Mappings, Parameters, Conditions, Resources and Outputs sections
        are visited in that order, each under a scope opened from a fresh root
        scope.
        """
        root = Scope()
        # TODO: visit other child types

        scope_mappings, (mappings_before, mappings_after) = self._safe_access_in(
            root, MappingsKey, before_template, after_template
        )
        node_mappings = self._visit_mappings(
            scope=scope_mappings, before_mappings=mappings_before, after_mappings=mappings_after
        )

        scope_parameters, (parameters_before, parameters_after) = self._safe_access_in(
            root, ParametersKey, before_template, after_template
        )
        node_parameters = self._visit_parameters(
            scope=scope_parameters,
            before_parameters=parameters_before,
            after_parameters=parameters_after,
        )

        scope_conditions, (conditions_before, conditions_after) = self._safe_access_in(
            root, ConditionsKey, before_template, after_template
        )
        node_conditions = self._visit_conditions(
            scope=scope_conditions,
            before_conditions=conditions_before,
            after_conditions=conditions_after,
        )

        scope_resources, (resources_before, resources_after) = self._safe_access_in(
            root, ResourcesKey, before_template, after_template
        )
        node_resources = self._visit_resources(
            scope=scope_resources,
            before_resources=resources_before,
            after_resources=resources_after,
        )

        scope_outputs, (outputs_before, outputs_after) = self._safe_access_in(
            root, OutputsKey, before_template, after_template
        )
        node_outputs = self._visit_outputs(
            scope=scope_outputs, before_outputs=outputs_before, after_outputs=outputs_after
        )

        # TODO: compute the change_type of the template properly.
        node_template = NodeTemplate(
            scope=root,
            mappings=node_mappings,
            parameters=node_parameters,
            conditions=node_conditions,
            resources=node_resources,
            outputs=node_outputs,
        )
        return node_template
1130

1131
    def _retrieve_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
        """Return the NodeCondition named ``condition_name``, or Nothing if undeclared.

        :param condition_name: name to look up in the Conditions section of the
            before and after templates.
        :return: the visited NodeCondition when either template declares the
            condition, Nothing otherwise.
        """
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
            Scope(), ConditionsKey, self._before_template, self._after_template
        )
        # Treat a missing or empty Conditions section as an empty dict.
        before_conditions = before_conditions or dict()
        after_conditions = after_conditions or dict()
        if condition_name not in before_conditions and condition_name not in after_conditions:
            return Nothing

        condition_scope, (before_condition, after_condition) = self._safe_access_in(
            conditions_scope, condition_name, before_conditions, after_conditions
        )
        # _visit_condition reads and writes _visited_scopes under the scope it
        # is given, so it must receive the scope of this condition rather than
        # the scope of the whole Conditions section.
        return self._visit_condition(
            condition_scope,
            condition_name,
            before_condition=before_condition,
            after_condition=after_condition,
        )
1✔
1149

1150
    def _retrieve_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
        """Return the NodeParameter named ``parameter_name``, or Nothing if undeclared.

        The name is looked up in the Parameters section of both the before and
        the after template.
        """
        section_scope, (before_parameters, after_parameters) = self._safe_access_in(
            Scope(), ParametersKey, self._before_template, self._after_template
        )
        is_declared = parameter_name in before_parameters or parameter_name in after_parameters
        if not is_declared:
            return Nothing

        declaration_scope, (before_parameter, after_parameter) = self._safe_access_in(
            section_scope, parameter_name, before_parameters, after_parameters
        )
        return self._visit_parameter(
            declaration_scope,
            parameter_name,
            before_parameter=before_parameter,
            after_parameter=after_parameter,
        )
1✔
1166

1167
    def _retrieve_mapping(self, mapping_name) -> NodeMapping:
        """Return the NodeMapping named ``mapping_name``.

        :param mapping_name: name to look up in the Mappings section of the
            before and after templates.
        :return: the visited NodeMapping.
        :raises RuntimeError: if neither template declares the mapping.
        """
        # TODO: add caching mechanism, and raise appropriate error if missing.
        scope_mappings, (before_mappings, after_mappings) = self._safe_access_in(
            Scope(), MappingsKey, self._before_template, self._after_template
        )
        if mapping_name in before_mappings or mapping_name in after_mappings:
            scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
                scope_mappings, mapping_name, before_mappings, after_mappings
            )
            node_mapping = self._visit_mapping(
                scope_mapping, mapping_name, before_mapping, after_mapping
            )
            return node_mapping
        raise RuntimeError(
            f"Undefined mapping '{mapping_name}': not declared in the before or after template"
        )
×
1181

1182
    def _retrieve_or_visit_resource(self, resource_name: str) -> NodeResource:
        """Return the NodeResource for ``resource_name`` from the Resources section.

        The lookup is delegated to ``_visit_resource``, which reuses a
        previously stored node for the resource's scope when one exists.
        """
        section_scope, (before_resources, after_resources) = self._safe_access_in(
            Scope(), ResourcesKey, self._before_template, self._after_template
        )
        declaration_scope, (before_resource, after_resource) = self._safe_access_in(
            section_scope, resource_name, before_resources, after_resources
        )
        node_resource = self._visit_resource(
            scope=declaration_scope,
            resource_name=resource_name,
            before_resource=before_resource,
            after_resource=after_resource,
        )
        return node_resource
1198

1199
    @staticmethod
    def _is_intrinsic_function_name(function_name: str) -> bool:
        """Tell whether ``function_name`` is listed in INTRINSIC_FUNCTIONS."""
        # TODO: are intrinsic functions soft keywords?
        is_known_function = function_name in INTRINSIC_FUNCTIONS
        return is_known_function
1✔
1203

1204
    @staticmethod
    def _safe_access_in(scope: Scope, key: str, *objects: Maybe[dict]) -> tuple[Scope, Maybe[Any]]:
        """Look ``key`` up in each object and open the matching child scope.

        Objects that are Nothing are passed through unchanged; for every other
        object the value is ``obj.get(key, Nothing)``. Returns the child scope
        together with the single value when exactly one object was given, or
        with a tuple of values otherwise.
        """
        # TODO: raise errors if not dict
        values = [
            obj if isinstance(obj, NothingType) else obj.get(key, Nothing) for obj in objects
        ]
        child_scope = scope.open_scope(name=key)
        if len(objects) == 1:
            return child_scope, values[0]
        return child_scope, tuple(values)
1✔
1215

1216
    @staticmethod
1✔
1217
    def _safe_keys_of(*objects: Maybe[dict]) -> list[str]:
1✔
1218
        key_set: set[str] = set()
1✔
1219
        for obj in objects:
1✔
1220
            # TODO: raise errors if not dict
1221
            if isinstance(obj, dict):
1✔
1222
                key_set.update(obj.keys())
1✔
1223
        # The keys list is sorted to increase reproducibility of the
1224
        # update graph build process or downstream logics.
1225
        keys = sorted(key_set)
1✔
1226
        return keys
1✔
1227

1228
    @staticmethod
1✔
1229
    def _name_if_intrinsic_function(value: Maybe[Any]) -> Optional[str]:
1✔
1230
        if isinstance(value, dict):
1✔
1231
            keys = ChangeSetModel._safe_keys_of(value)
1✔
1232
            if len(keys) == 1:
1✔
1233
                key_name = keys[0]
1✔
1234
                if ChangeSetModel._is_intrinsic_function_name(key_name):
1✔
1235
                    return key_name
1✔
1236
        return None
1✔
1237

1238
    @staticmethod
1✔
1239
    def _type_name_of(value: Maybe[Any]) -> str:
1✔
1240
        maybe_intrinsic_function_name = ChangeSetModel._name_if_intrinsic_function(value)
1✔
1241
        if maybe_intrinsic_function_name is not None:
1✔
1242
            return maybe_intrinsic_function_name
1✔
1243
        return type(value).__name__
1✔
1244

1245
    @staticmethod
1✔
1246
    def _is_terminal(value: Any) -> bool:
1✔
1247
        return type(value) in {int, float, bool, str, None, NothingType}
1✔
1248

1249
    @staticmethod
1✔
1250
    def _is_object(value: Any) -> bool:
1✔
1251
        return isinstance(value, dict) and ChangeSetModel._name_if_intrinsic_function(value) is None
1✔
1252

1253
    @staticmethod
1✔
1254
    def _is_array(value: Any) -> bool:
1✔
1255
        return isinstance(value, list)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc