• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.07
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model.py
1
from __future__ import annotations
1✔
2

3
import abc
1✔
4
import enum
1✔
5
from collections.abc import Generator
1✔
6
from itertools import zip_longest
1✔
7
from typing import TYPE_CHECKING, Any, Final, TypedDict, cast
1✔
8

9
from typing_extensions import TypeVar
1✔
10

11
from localstack.aws.api.cloudformation import ChangeAction
1✔
12
from localstack.services.cloudformation.resource_provider import ResourceProviderExecutor
1✔
13
from localstack.utils.strings import camel_to_snake_case
1✔
14

15
if TYPE_CHECKING:
1✔
UNCOV
16
    from localstack.services.cloudformation.v2.entities import EngineParameter
×
17

18
T = TypeVar("T")
1✔
19

20

21
class NothingType:
1✔
22
    """A sentinel that denotes 'no value' (distinct from None)."""
23

24
    _singleton = None
1✔
25
    __slots__ = ()
1✔
26

27
    def __new__(cls):
1✔
28
        if cls._singleton is None:
1✔
29
            cls._singleton = super().__new__(cls)
1✔
30
        return cls._singleton
1✔
31

32
    def __eq__(self, other):
1✔
33
        return is_nothing(other)
1✔
34

35
    def __str__(self):
1✔
UNCOV
36
        return repr(self)
×
37

38
    def __repr__(self) -> str:
39
        return "Nothing"
40

41
    def __bool__(self):
1✔
42
        return False
1✔
43

44
    def __iter__(self):
1✔
45
        return iter(())
1✔
46

47
    def __contains__(self, item):
1✔
48
        return False
1✔
49

50

51
Maybe = T | NothingType
1✔
52
Nothing = NothingType()
1✔
53

54

55
def is_nothing(value: Any) -> bool:
1✔
56
    return isinstance(value, NothingType)
1✔
57

58

59
def is_created(before: Maybe[Any], after: Maybe[Any]) -> bool:
1✔
60
    return is_nothing(before) and not is_nothing(after)
1✔
61

62

63
def is_removed(before: Maybe[Any], after: Maybe[Any]) -> bool:
1✔
64
    return not is_nothing(before) and is_nothing(after)
1✔
65

66

67
def parent_change_type_of(children: list[Maybe[ChangeSetEntity]]):
1✔
68
    change_types = [c.change_type for c in children if not is_nothing(c)]
1✔
69
    if not change_types:
1✔
70
        return ChangeType.UNCHANGED
1✔
71
    first_type = change_types[0]
1✔
72
    if all(ct == first_type for ct in change_types):
1✔
73
        return first_type
1✔
74
    return ChangeType.MODIFIED
1✔
75

76

77
def change_type_of(before: Maybe[Any], after: Maybe[Any], children: list[Maybe[ChangeSetEntity]]):
1✔
78
    if is_created(before, after):
1✔
79
        change_type = ChangeType.CREATED
1✔
80
    elif is_removed(before, after):
1✔
81
        change_type = ChangeType.REMOVED
1✔
82
    else:
83
        change_type = parent_change_type_of(children)
1✔
84
    return change_type
1✔
85

86

87
class NormalisedGlobalTransformDefinition(TypedDict):
1✔
88
    Name: Any
1✔
89
    Parameters: Maybe[Any]
1✔
90

91

92
class Scope(str):
1✔
93
    _ROOT_SCOPE: Final[str] = ""
1✔
94
    _SEPARATOR: Final[str] = "/"
1✔
95

96
    def __new__(cls, scope: str = _ROOT_SCOPE) -> Scope:
1✔
97
        return cast(Scope, super().__new__(cls, scope))
1✔
98

99
    def open_scope(self, name: Scope | str) -> Scope:
1✔
100
        return Scope(self._SEPARATOR.join([self, name]))
1✔
101

102
    def open_index(self, index: int) -> Scope:
1✔
103
        return Scope(self._SEPARATOR.join([self, str(index)]))
1✔
104

105
    def unwrap(self) -> list[str]:
1✔
UNCOV
106
        return self.split(self._SEPARATOR)
×
107

108

109
class ChangeType(enum.Enum):
1✔
110
    UNCHANGED = "Unchanged"
1✔
111
    CREATED = "Created"
1✔
112
    MODIFIED = "Modified"
1✔
113
    REMOVED = "Removed"
1✔
114

115
    def __str__(self):
1✔
UNCOV
116
        return self.value
×
117

118
    def to_change_action(self) -> ChangeAction:
1✔
119
        # Convert this change type into the change action used throughout the CFn API
UNCOV
120
        return {
×
121
            ChangeType.CREATED: ChangeAction.Add,
122
            ChangeType.MODIFIED: ChangeAction.Modify,
123
            ChangeType.REMOVED: ChangeAction.Remove,
124
        }.get(self, ChangeAction.Add)
125

126

127
class ChangeSetEntity(abc.ABC):
1✔
128
    scope: Final[Scope]
1✔
129
    change_type: Final[ChangeType]
1✔
130

131
    def __init__(self, scope: Scope, change_type: ChangeType):
1✔
132
        self.scope = scope
1✔
133
        self.change_type = change_type
1✔
134

135
    def get_children(self) -> Generator[ChangeSetEntity]:
1✔
136
        for child in self.__dict__.values():
1✔
137
            yield from self._get_children_in(child)
1✔
138

139
    @staticmethod
1✔
140
    def _get_children_in(obj: Any) -> Generator[ChangeSetEntity]:
1✔
141
        # TODO: could avoid the inductive logic here, and check for loops?
142
        if isinstance(obj, ChangeSetEntity):
1✔
143
            yield obj
1✔
144
        elif isinstance(obj, list):
1✔
145
            for item in obj:
1✔
146
                yield from ChangeSetEntity._get_children_in(item)
1✔
147
        elif isinstance(obj, dict):
1✔
148
            for item in obj.values():
1✔
149
                yield from ChangeSetEntity._get_children_in(item)
1✔
150

151
    def __str__(self):
1✔
UNCOV
152
        return f"({self.__class__.__name__}| {vars(self)}"
×
153

154
    def __repr__(self):
155
        return str(self)
156

157

158
class ChangeSetNode(ChangeSetEntity, abc.ABC): ...
1✔
159

160

161
class ChangeSetTerminal(ChangeSetEntity, abc.ABC): ...
1✔
162

163

164
class UpdateModel:
1✔
165
    # TODO: may be expanded to keep track of other runtime values such as resolved_parameters.
166

167
    node_template: Final[NodeTemplate]
1✔
168
    before_runtime_cache: Final[dict]
1✔
169
    after_runtime_cache: Final[dict]
1✔
170

171
    def __init__(
1✔
172
        self,
173
        node_template: NodeTemplate,
174
    ):
175
        self.node_template = node_template
1✔
176
        self.before_runtime_cache = dict()
1✔
177
        self.after_runtime_cache = dict()
1✔
178

179

180
class NodeTemplate(ChangeSetNode):
1✔
181
    transform: Final[NodeTransform]
1✔
182
    mappings: Final[NodeMappings]
1✔
183
    parameters: Final[NodeParameters]
1✔
184
    conditions: Final[NodeConditions]
1✔
185
    resources: Final[NodeResources]
1✔
186
    outputs: Final[NodeOutputs]
1✔
187

188
    def __init__(
1✔
189
        self,
190
        scope: Scope,
191
        transform: NodeTransform,
192
        mappings: NodeMappings,
193
        parameters: NodeParameters,
194
        conditions: NodeConditions,
195
        resources: NodeResources,
196
        outputs: NodeOutputs,
197
    ):
198
        change_type = parent_change_type_of(
1✔
199
            [transform, mappings, parameters, conditions, resources, outputs]
200
        )
201
        super().__init__(scope=scope, change_type=change_type)
1✔
202
        self.transform = transform
1✔
203
        self.mappings = mappings
1✔
204
        self.parameters = parameters
1✔
205
        self.conditions = conditions
1✔
206
        self.resources = resources
1✔
207
        self.outputs = outputs
1✔
208

209

210
class NodeDivergence(ChangeSetNode):
1✔
211
    value: Final[ChangeSetEntity]
1✔
212
    divergence: Final[ChangeSetEntity]
1✔
213

214
    def __init__(self, scope: Scope, value: ChangeSetEntity, divergence: ChangeSetEntity):
1✔
215
        super().__init__(scope=scope, change_type=ChangeType.MODIFIED)
1✔
216
        self.value = value
1✔
217
        self.divergence = divergence
1✔
218

219

220
class NodeParameter(ChangeSetNode):
1✔
221
    name: Final[str]
1✔
222
    type_: Final[ChangeSetEntity]
1✔
223
    dynamic_value: Final[ChangeSetEntity]
1✔
224
    default_value: Final[Maybe[ChangeSetEntity]]
1✔
225

226
    def __init__(
1✔
227
        self,
228
        scope: Scope,
229
        name: str,
230
        type_: ChangeSetEntity,
231
        dynamic_value: ChangeSetEntity,
232
        default_value: Maybe[ChangeSetEntity],
233
    ):
234
        change_type = parent_change_type_of([type_, default_value, dynamic_value])
1✔
235
        super().__init__(scope=scope, change_type=change_type)
1✔
236
        self.name = name
1✔
237
        self.type_ = type_
1✔
238
        self.dynamic_value = dynamic_value
1✔
239
        self.default_value = default_value
1✔
240

241

242
class NodeParameters(ChangeSetNode):
1✔
243
    parameters: Final[list[NodeParameter]]
1✔
244

245
    def __init__(self, scope: Scope, parameters: list[NodeParameter]):
1✔
246
        change_type = parent_change_type_of(parameters)
1✔
247
        super().__init__(scope=scope, change_type=change_type)
1✔
248
        self.parameters = parameters
1✔
249

250

251
class NodeMapping(ChangeSetNode):
1✔
252
    name: Final[str]
1✔
253
    bindings: Final[NodeObject]
1✔
254

255
    def __init__(self, scope: Scope, name: str, bindings: NodeObject):
1✔
256
        super().__init__(scope=scope, change_type=bindings.change_type)
1✔
257
        self.name = name
1✔
258
        self.bindings = bindings
1✔
259

260

261
class NodeMappings(ChangeSetNode):
1✔
262
    mappings: Final[list[NodeMapping]]
1✔
263

264
    def __init__(self, scope: Scope, mappings: list[NodeMapping]):
1✔
265
        change_type = parent_change_type_of(mappings)
1✔
266
        super().__init__(scope=scope, change_type=change_type)
1✔
267
        self.mappings = mappings
1✔
268

269

270
class NodeOutput(ChangeSetNode):
1✔
271
    name: Final[str]
1✔
272
    value: Final[ChangeSetEntity]
1✔
273
    export: Final[Maybe[ChangeSetEntity]]
1✔
274
    condition_reference: Final[Maybe[TerminalValue]]
1✔
275

276
    def __init__(
1✔
277
        self,
278
        scope: Scope,
279
        name: str,
280
        value: ChangeSetEntity,
281
        export: Maybe[ChangeSetEntity],
282
        conditional_reference: Maybe[TerminalValue],
283
    ):
284
        change_type = parent_change_type_of([value, export, conditional_reference])
1✔
285
        super().__init__(scope=scope, change_type=change_type)
1✔
286
        self.name = name
1✔
287
        self.value = value
1✔
288
        self.export = export
1✔
289
        self.condition_reference = conditional_reference
1✔
290

291

292
class NodeOutputs(ChangeSetNode):
1✔
293
    outputs: Final[list[NodeOutput]]
1✔
294

295
    def __init__(self, scope: Scope, outputs: list[NodeOutput]):
1✔
296
        change_type = parent_change_type_of(outputs)
1✔
297
        super().__init__(scope=scope, change_type=change_type)
1✔
298
        self.outputs = outputs
1✔
299

300

301
class NodeCondition(ChangeSetNode):
1✔
302
    name: Final[str]
1✔
303
    body: Final[ChangeSetEntity]
1✔
304

305
    def __init__(self, scope: Scope, name: str, body: ChangeSetEntity):
1✔
306
        super().__init__(scope=scope, change_type=body.change_type)
1✔
307
        self.name = name
1✔
308
        self.body = body
1✔
309

310

311
class NodeConditions(ChangeSetNode):
1✔
312
    conditions: Final[list[NodeCondition]]
1✔
313

314
    def __init__(self, scope: Scope, conditions: list[NodeCondition]):
1✔
315
        change_type = parent_change_type_of(conditions)
1✔
316
        super().__init__(scope=scope, change_type=change_type)
1✔
317
        self.conditions = conditions
1✔
318

319

320
class NodeGlobalTransform(ChangeSetNode):
1✔
321
    name: Final[TerminalValue]
1✔
322
    parameters: Final[Maybe[ChangeSetEntity]]
1✔
323

324
    def __init__(self, scope: Scope, name: TerminalValue, parameters: Maybe[ChangeSetEntity]):
1✔
325
        if not is_nothing(parameters):
1✔
326
            change_type = parent_change_type_of([name, parameters])
1✔
327
        else:
UNCOV
328
            change_type = name.change_type
×
329
        super().__init__(scope=scope, change_type=change_type)
1✔
330
        self.name = name
1✔
331
        self.parameters = parameters
1✔
332

333

334
class NodeTransform(ChangeSetNode):
1✔
335
    global_transforms: Final[list[NodeGlobalTransform]]
1✔
336

337
    def __init__(self, scope: Scope, global_transforms: list[NodeGlobalTransform]):
1✔
338
        change_type = parent_change_type_of(global_transforms)
1✔
339
        super().__init__(scope=scope, change_type=change_type)
1✔
340
        self.global_transforms = global_transforms
1✔
341

342

343
class NodeResources(ChangeSetNode):
1✔
344
    resources: Final[list[NodeResource]]
1✔
345

346
    def __init__(self, scope: Scope, resources: list[NodeResource]):
1✔
347
        change_type = parent_change_type_of(resources)
1✔
348
        super().__init__(scope=scope, change_type=change_type)
1✔
349
        self.resources = resources
1✔
350

351

352
class NodeResource(ChangeSetNode):
1✔
353
    name: Final[str]
1✔
354
    type_: Final[ChangeSetTerminal]
1✔
355
    properties: Final[NodeProperties]
1✔
356
    condition_reference: Final[Maybe[TerminalValue]]
1✔
357
    depends_on: Final[Maybe[NodeDependsOn]]
1✔
358
    requires_replacement: Final[bool]
1✔
359

360
    def __init__(
1✔
361
        self,
362
        scope: Scope,
363
        change_type: ChangeType,
364
        name: str,
365
        type_: ChangeSetTerminal,
366
        properties: NodeProperties,
367
        condition_reference: Maybe[TerminalValue],
368
        depends_on: Maybe[NodeDependsOn],
369
        requires_replacement: bool,
370
    ):
371
        super().__init__(scope=scope, change_type=change_type)
1✔
372
        self.name = name
1✔
373
        self.type_ = type_
1✔
374
        self.properties = properties
1✔
375
        self.condition_reference = condition_reference
1✔
376
        self.depends_on = depends_on
1✔
377
        self.requires_replacement = requires_replacement
1✔
378

379

380
class NodeProperties(ChangeSetNode):
1✔
381
    properties: Final[list[NodeProperty]]
1✔
382

383
    def __init__(self, scope: Scope, properties: list[NodeProperty]):
1✔
384
        change_type = parent_change_type_of(properties)
1✔
385
        super().__init__(scope=scope, change_type=change_type)
1✔
386
        self.properties = properties
1✔
387

388

389
class NodeDependsOn(ChangeSetNode):
1✔
390
    depends_on: Final[NodeArray]
1✔
391

392
    def __init__(self, scope: Scope, depends_on: NodeArray):
1✔
393
        super().__init__(scope=scope, change_type=depends_on.change_type)
1✔
394
        self.depends_on = depends_on
1✔
395

396

397
class NodeProperty(ChangeSetNode):
1✔
398
    name: Final[str]
1✔
399
    value: Final[ChangeSetEntity]
1✔
400

401
    def __init__(self, scope: Scope, name: str, value: ChangeSetEntity):
1✔
402
        super().__init__(scope=scope, change_type=value.change_type)
1✔
403
        self.name = name
1✔
404
        self.value = value
1✔
405

406

407
class NodeIntrinsicFunction(ChangeSetNode):
1✔
408
    intrinsic_function: Final[str]
1✔
409
    arguments: Final[ChangeSetEntity]
1✔
410

411
    def __init__(
1✔
412
        self,
413
        scope: Scope,
414
        change_type: ChangeType,
415
        intrinsic_function: str,
416
        arguments: ChangeSetEntity,
417
    ):
418
        super().__init__(scope=scope, change_type=change_type)
1✔
419
        self.intrinsic_function = intrinsic_function
1✔
420
        self.arguments = arguments
1✔
421

422

423
class NodeObject(ChangeSetNode):
1✔
424
    bindings: Final[dict[str, ChangeSetEntity]]
1✔
425

426
    def __init__(self, scope: Scope, change_type: ChangeType, bindings: dict[str, ChangeSetEntity]):
1✔
427
        super().__init__(scope=scope, change_type=change_type)
1✔
428
        self.bindings = bindings
1✔
429

430

431
class NodeArray(ChangeSetNode):
1✔
432
    array: Final[list[ChangeSetEntity]]
1✔
433

434
    def __init__(self, scope: Scope, change_type: ChangeType, array: list[ChangeSetEntity]):
1✔
435
        super().__init__(scope=scope, change_type=change_type)
1✔
436
        self.array = array
1✔
437

438

439
class TerminalValue(ChangeSetTerminal, abc.ABC):
1✔
440
    value: Final[Any]
1✔
441

442
    def __init__(self, scope: Scope, change_type: ChangeType, value: Any):
1✔
443
        super().__init__(scope=scope, change_type=change_type)
1✔
444
        self.value = value
1✔
445

446

447
class TerminalValueModified(TerminalValue):
1✔
448
    modified_value: Final[Any]
1✔
449

450
    def __init__(self, scope: Scope, value: Any, modified_value: Any):
1✔
451
        super().__init__(scope=scope, change_type=ChangeType.MODIFIED, value=value)
1✔
452
        self.modified_value = modified_value
1✔
453

454

455
class TerminalValueCreated(TerminalValue):
1✔
456
    def __init__(self, scope: Scope, value: Any):
1✔
457
        super().__init__(scope=scope, change_type=ChangeType.CREATED, value=value)
1✔
458

459

460
class TerminalValueRemoved(TerminalValue):
1✔
461
    def __init__(self, scope: Scope, value: Any):
1✔
462
        super().__init__(scope=scope, change_type=ChangeType.REMOVED, value=value)
1✔
463

464

465
class TerminalValueUnchanged(TerminalValue):
1✔
466
    def __init__(self, scope: Scope, value: Any):
1✔
467
        super().__init__(scope=scope, change_type=ChangeType.UNCHANGED, value=value)
1✔
468

469

470
NameKey: Final[str] = "Name"
1✔
471
TransformKey: Final[str] = "Transform"
1✔
472
TypeKey: Final[str] = "Type"
1✔
473
ConditionKey: Final[str] = "Condition"
1✔
474
ConditionsKey: Final[str] = "Conditions"
1✔
475
MappingsKey: Final[str] = "Mappings"
1✔
476
ResourcesKey: Final[str] = "Resources"
1✔
477
PropertiesKey: Final[str] = "Properties"
1✔
478
ParametersKey: Final[str] = "Parameters"
1✔
479
DefaultKey: Final[str] = "Default"
1✔
480
ValueKey: Final[str] = "Value"
1✔
481
ExportKey: Final[str] = "Export"
1✔
482
OutputsKey: Final[str] = "Outputs"
1✔
483
DependsOnKey: Final[str] = "DependsOn"
1✔
484
# TODO: expand intrinsic functions set.
485
RefKey: Final[str] = "Ref"
1✔
486
RefConditionKey: Final[str] = "Condition"
1✔
487
FnIfKey: Final[str] = "Fn::If"
1✔
488
FnAnd: Final[str] = "Fn::And"
1✔
489
FnOr: Final[str] = "Fn::Or"
1✔
490
FnNotKey: Final[str] = "Fn::Not"
1✔
491
FnJoinKey: Final[str] = "Fn::Join"
1✔
492
FnGetAttKey: Final[str] = "Fn::GetAtt"
1✔
493
FnEqualsKey: Final[str] = "Fn::Equals"
1✔
494
FnFindInMapKey: Final[str] = "Fn::FindInMap"
1✔
495
FnSubKey: Final[str] = "Fn::Sub"
1✔
496
FnTransform: Final[str] = "Fn::Transform"
1✔
497
FnSelect: Final[str] = "Fn::Select"
1✔
498
FnSplit: Final[str] = "Fn::Split"
1✔
499
FnGetAZs: Final[str] = "Fn::GetAZs"
1✔
500
FnBase64: Final[str] = "Fn::Base64"
1✔
501
FnImportValue: Final[str] = "Fn::ImportValue"
1✔
502
INTRINSIC_FUNCTIONS: Final[set[str]] = {
1✔
503
    RefKey,
504
    RefConditionKey,
505
    FnIfKey,
506
    FnAnd,
507
    FnOr,
508
    FnNotKey,
509
    FnJoinKey,
510
    FnEqualsKey,
511
    FnGetAttKey,
512
    FnFindInMapKey,
513
    FnSubKey,
514
    FnTransform,
515
    FnSelect,
516
    FnSplit,
517
    FnGetAZs,
518
    FnBase64,
519
    FnImportValue,
520
}
521

522

523
class ChangeSetModel:
1✔
524
    # TODO: should this instead be generalised to work on "Stack" objects instead of just "Template"s?
525

526
    # TODO: can probably improve the typehints to use CFN's 'language' eg. dict -> Template|Properties, etc.
527

528
    # TODO: add support for 'replacement' computation, and ensure this state is propagated in tree traversals
529
    #  such as intrinsic functions.
530

531
    _before_template: Final[Maybe[dict]]
1✔
532
    _after_template: Final[Maybe[dict]]
1✔
533
    _before_parameters: Final[Maybe[dict]]
1✔
534
    _after_parameters: Final[Maybe[dict]]
1✔
535
    _visited_scopes: Final[dict[str, ChangeSetEntity]]
1✔
536
    _node_template: Final[NodeTemplate]
1✔
537

538
    def __init__(
1✔
539
        self,
540
        before_template: dict | None,
541
        after_template: dict | None,
542
        before_parameters: dict | None,
543
        after_parameters: dict[str, EngineParameter] | None,
544
    ):
545
        self._before_template = before_template or Nothing
1✔
546
        self._after_template = after_template or Nothing
1✔
547
        self._before_parameters = before_parameters or Nothing
1✔
548
        self._after_parameters = after_parameters or Nothing
1✔
549
        self._visited_scopes = dict()
1✔
550
        # TODO: move this modeling process to the `get_update_model` method as constructors shouldn't do work
551
        self._node_template = self._model(
1✔
552
            before_template=self._before_template, after_template=self._after_template
553
        )
554
        # TODO: need to do template preprocessing e.g. parameter resolution, conditions etc.
555

556
    def get_update_model(self) -> UpdateModel:
1✔
557
        return UpdateModel(node_template=self._node_template)
1✔
558

559
    def _visit_terminal_value(
1✔
560
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
561
    ) -> TerminalValue:
562
        terminal_value = self._visited_scopes.get(scope)
1✔
563
        if isinstance(terminal_value, TerminalValue):
1✔
UNCOV
564
            return terminal_value
×
565
        if is_created(before=before_value, after=after_value):
1✔
566
            terminal_value = TerminalValueCreated(scope=scope, value=after_value)
1✔
567
        elif is_removed(before=before_value, after=after_value):
1✔
568
            terminal_value = TerminalValueRemoved(scope=scope, value=before_value)
1✔
569
        elif before_value == after_value:
1✔
570
            terminal_value = TerminalValueUnchanged(scope=scope, value=before_value)
1✔
571
        else:
572
            terminal_value = TerminalValueModified(
1✔
573
                scope=scope, value=before_value, modified_value=after_value
574
            )
575
        self._visited_scopes[scope] = terminal_value
1✔
576
        return terminal_value
1✔
577

578
    def _visit_intrinsic_function(
1✔
579
        self,
580
        scope: Scope,
581
        intrinsic_function: str,
582
        before_arguments: Maybe[Any],
583
        after_arguments: Maybe[Any],
584
    ) -> NodeIntrinsicFunction:
585
        node_intrinsic_function = self._visited_scopes.get(scope)
1✔
586
        if isinstance(node_intrinsic_function, NodeIntrinsicFunction):
1✔
UNCOV
587
            return node_intrinsic_function
×
588
        arguments_scope = scope.open_scope("args")
1✔
589
        arguments = self._visit_value(
1✔
590
            scope=arguments_scope, before_value=before_arguments, after_value=after_arguments
591
        )
592
        if is_created(before=before_arguments, after=after_arguments):
1✔
593
            change_type = ChangeType.CREATED
1✔
594
        elif is_removed(before=before_arguments, after=after_arguments):
1✔
595
            change_type = ChangeType.REMOVED
1✔
596
        else:
597
            function_name = intrinsic_function.replace("::", "_")
1✔
598
            function_name = camel_to_snake_case(function_name)
1✔
599
            resolve_function_name = f"_resolve_intrinsic_function_{function_name}"
1✔
600
            if hasattr(self, resolve_function_name):
1✔
601
                resolve_function = getattr(self, resolve_function_name)
1✔
602
                change_type = resolve_function(arguments)
1✔
603
            else:
604
                change_type = arguments.change_type
1✔
605
        node_intrinsic_function = NodeIntrinsicFunction(
1✔
606
            scope=scope,
607
            change_type=change_type,
608
            intrinsic_function=intrinsic_function,
609
            arguments=arguments,
610
        )
611
        self._visited_scopes[scope] = node_intrinsic_function
1✔
612
        return node_intrinsic_function
1✔
613

614
    def _resolve_intrinsic_function_fn_sub(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
615
        # TODO: This routine should instead export the implicit Ref and GetAtt calls within the first
616
        #       string template parameter and compute the respective change set types. Currently,
617
        #       changes referenced by Fn::Sub templates are only picked up during preprocessing; not
618
        #       at modelling.
619
        return arguments.change_type
1✔
620

621
    def _resolve_intrinsic_function_fn_get_att(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
622
        # TODO: add support for nested intrinsic functions.
623
        # TODO: validate arguments structure and type.
624
        # TODO: should this check for deletion of resources and/or properties, if so what error should be raised?
625

626
        if not isinstance(arguments, NodeArray) or not arguments.array:
1✔
UNCOV
627
            raise RuntimeError()
×
628
        logical_name_of_resource_entity = arguments.array[0]
1✔
629
        if not isinstance(logical_name_of_resource_entity, TerminalValue):
1✔
UNCOV
630
            raise RuntimeError()
×
631
        logical_name_of_resource: str = logical_name_of_resource_entity.value
1✔
632
        if not isinstance(logical_name_of_resource, str):
1✔
UNCOV
633
            raise RuntimeError()
×
634
        node_resource: NodeResource = self._retrieve_or_visit_resource(
1✔
635
            resource_name=logical_name_of_resource
636
        )
637

638
        node_property_attribute_name = arguments.array[1]
1✔
639
        if not isinstance(node_property_attribute_name, TerminalValue):
1✔
UNCOV
640
            raise RuntimeError()
×
641
        if isinstance(node_property_attribute_name, TerminalValueModified):
1✔
UNCOV
642
            attribute_name = node_property_attribute_name.modified_value
×
643
        else:
644
            attribute_name = node_property_attribute_name.value
1✔
645

646
        # TODO: this is another use case for which properties should be referenced by name
647
        for node_property in node_resource.properties.properties:
1✔
648
            if node_property.name == attribute_name:
1✔
649
                return node_property.change_type
1✔
650

651
        return ChangeType.UNCHANGED
1✔
652

653
    def _resolve_intrinsic_function_ref(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
654
        if arguments.change_type != ChangeType.UNCHANGED:
1✔
UNCOV
655
            return arguments.change_type
×
656
        if not isinstance(arguments, TerminalValue):
1✔
UNCOV
657
            return arguments.change_type
×
658

659
        logical_id = arguments.value
1✔
660

661
        node_condition = self._retrieve_condition_if_exists(condition_name=logical_id)
1✔
662
        if isinstance(node_condition, NodeCondition):
1✔
UNCOV
663
            return node_condition.change_type
×
664

665
        node_parameter = self._retrieve_parameter_if_exists(parameter_name=logical_id)
1✔
666
        if isinstance(node_parameter, NodeParameter):
1✔
667
            return node_parameter.change_type
1✔
668

669
        # TODO: this should check the replacement flag for a resource update.
670
        node_resource = self._retrieve_or_visit_resource(resource_name=logical_id)
1✔
671
        return node_resource.change_type
1✔
672

673
    def _resolve_intrinsic_function_condition(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
674
        if arguments.change_type != ChangeType.UNCHANGED:
×
675
            return arguments.change_type
×
676
        if not isinstance(arguments, TerminalValue):
×
677
            return arguments.change_type
×
678

UNCOV
679
        condition_name = arguments.value
×
UNCOV
680
        node_condition = self._retrieve_condition_if_exists(condition_name=condition_name)
×
UNCOV
681
        if isinstance(node_condition, NodeCondition):
×
UNCOV
682
            return node_condition.change_type
×
UNCOV
683
        raise RuntimeError(f"Undefined condition '{condition_name}'")
×
684

685
    def _resolve_intrinsic_function_fn_find_in_map(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
686
        if arguments.change_type != ChangeType.UNCHANGED:
1✔
687
            return arguments.change_type
1✔
688
        # TODO: validate arguments structure and type.
689
        # TODO: add support for nested functions, here we assume the arguments are string literals.
690

691
        if not isinstance(arguments, NodeArray) or not arguments.array:
1✔
UNCOV
692
            raise RuntimeError()
×
693
        argument_mapping_name = arguments.array[0]
1✔
694
        if not isinstance(argument_mapping_name, TerminalValue):
1✔
695
            raise NotImplementedError()
696
        argument_top_level_key = arguments.array[1]
1✔
697
        if not isinstance(argument_top_level_key, TerminalValue):
1✔
698
            raise NotImplementedError()
699
        argument_second_level_key = arguments.array[2]
1✔
700
        if not isinstance(argument_second_level_key, TerminalValue):
1✔
701
            raise NotImplementedError()
702
        mapping_name = argument_mapping_name.value
1✔
703
        top_level_key = argument_top_level_key.value
1✔
704
        second_level_key = argument_second_level_key.value
1✔
705

706
        node_mapping = self._retrieve_mapping(mapping_name=mapping_name)
1✔
707
        # TODO: a lookup would be beneficial in this scenario too;
708
        #  consider implications downstream and for replication.
709
        top_level_object = node_mapping.bindings.bindings.get(top_level_key)
1✔
710
        if not isinstance(top_level_object, NodeObject):
1✔
UNCOV
711
            raise RuntimeError()
×
712
        target_map_value = top_level_object.bindings.get(second_level_key)
1✔
713
        return target_map_value.change_type
1✔
714

715
    def _resolve_intrinsic_function_fn_if(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
716
        # TODO: validate arguments structure and type.
717
        if not isinstance(arguments, NodeArray) or not arguments.array:
×
718
            raise RuntimeError()
×
719
        logical_name_of_condition_entity = arguments.array[0]
×
UNCOV
720
        if not isinstance(logical_name_of_condition_entity, TerminalValue):
×
721
            raise RuntimeError()
×
UNCOV
722
        logical_name_of_condition: str = logical_name_of_condition_entity.value
×
UNCOV
723
        if not isinstance(logical_name_of_condition, str):
×
724
            raise RuntimeError()
×
725

726
        node_condition = self._retrieve_condition_if_exists(
×
727
            condition_name=logical_name_of_condition
728
        )
UNCOV
729
        if not isinstance(node_condition, NodeCondition):
×
UNCOV
730
            raise RuntimeError()
×
NEW
731
        change_type = parent_change_type_of([node_condition, *arguments.array[1:]])
×
UNCOV
732
        return change_type
×
733

734
    def _resolve_requires_replacement(
1✔
735
        self, node_properties: NodeProperties, resource_type: TerminalValue
736
    ) -> bool:
737
        # a bit hacky but we have to load the resource provider executor _and_ resource provider to get the schema
738
        # Note: we don't log the attempt to load the resource provider, we need to make sure this is only done once and we already do this in the executor
739
        resource_provider = ResourceProviderExecutor.try_load_resource_provider(resource_type.value)
1✔
740
        if not resource_provider:
1✔
741
            # if we don't support a resource, assume an in-place update for simplicity
742
            return False
1✔
743

744
        create_only_properties: list[str] = resource_provider.SCHEMA.get("createOnlyProperties", [])
1✔
745
        # TODO: also hacky: strip the leading `/properties/` string from the definition
746
        #       ideally we should use a jsonpath or similar
747
        create_only_properties = [
1✔
748
            property.replace("/properties/", "", 1) for property in create_only_properties
749
        ]
750
        for node_property in node_properties.properties:
1✔
751
            if (
1✔
752
                node_property.change_type == ChangeType.MODIFIED
753
                and node_property.name in create_only_properties
754
            ):
755
                return True
1✔
756
        return False
1✔
757

758
    def _visit_array(
1✔
759
        self, scope: Scope, before_array: Maybe[list], after_array: Maybe[list]
760
    ) -> NodeArray:
761
        array: list[ChangeSetEntity] = list()
1✔
762
        for index, (before_value, after_value) in enumerate(
1✔
763
            zip_longest(before_array, after_array, fillvalue=Nothing)
764
        ):
765
            value_scope = scope.open_index(index=index)
1✔
766
            value = self._visit_value(
1✔
767
                scope=value_scope, before_value=before_value, after_value=after_value
768
            )
769
            array.append(value)
1✔
770
        change_type = change_type_of(before_array, after_array, array)
1✔
771
        return NodeArray(scope=scope, change_type=change_type, array=array)
1✔
772

773
    def _visit_object(
1✔
774
        self, scope: Scope, before_object: Maybe[dict], after_object: Maybe[dict]
775
    ) -> NodeObject:
776
        node_object = self._visited_scopes.get(scope)
1✔
777
        if isinstance(node_object, NodeObject):
1✔
778
            return node_object
1✔
779
        binding_names = self._safe_keys_of(before_object, after_object)
1✔
780
        bindings: dict[str, ChangeSetEntity] = dict()
1✔
781
        for binding_name in binding_names:
1✔
782
            binding_scope, (before_value, after_value) = self._safe_access_in(
1✔
783
                scope, binding_name, before_object, after_object
784
            )
785
            value = self._visit_value(
1✔
786
                scope=binding_scope, before_value=before_value, after_value=after_value
787
            )
788
            bindings[binding_name] = value
1✔
789
        change_type = change_type_of(before_object, after_object, list(bindings.values()))
1✔
790
        node_object = NodeObject(scope=scope, change_type=change_type, bindings=bindings)
1✔
791
        self._visited_scopes[scope] = node_object
1✔
792
        return node_object
1✔
793

794
    def _visit_divergence(
1✔
795
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
796
    ) -> NodeDivergence:
797
        scope_value = scope.open_scope("value")
1✔
798
        value = self._visit_value(scope=scope_value, before_value=before_value, after_value=Nothing)
1✔
799
        scope_divergence = scope.open_scope("divergence")
1✔
800
        divergence = self._visit_value(
1✔
801
            scope=scope_divergence, before_value=Nothing, after_value=after_value
802
        )
803
        return NodeDivergence(scope=scope, value=value, divergence=divergence)
1✔
804

805
    def _visit_value(
1✔
806
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
807
    ) -> ChangeSetEntity:
808
        value = self._visited_scopes.get(scope)
1✔
809
        if isinstance(value, ChangeSetEntity):
1✔
UNCOV
810
            return value
×
811

812
        before_type_name = self._type_name_of(before_value)
1✔
813
        after_type_name = self._type_name_of(after_value)
1✔
814
        unset = object()
1✔
815
        if before_type_name == after_type_name:
1✔
816
            dominant_value = before_value
1✔
817
        elif is_created(before=before_value, after=after_value):
1✔
818
            dominant_value = after_value
1✔
819
        elif is_removed(before=before_value, after=after_value):
1✔
820
            dominant_value = before_value
1✔
821
        else:
822
            dominant_value = unset
1✔
823
        if dominant_value is not unset:
1✔
824
            dominant_type_name = self._type_name_of(dominant_value)
1✔
825
            if self._is_terminal(value=dominant_value):
1✔
826
                value = self._visit_terminal_value(
1✔
827
                    scope=scope, before_value=before_value, after_value=after_value
828
                )
829
            elif self._is_object(value=dominant_value):
1✔
830
                value = self._visit_object(
1✔
831
                    scope=scope, before_object=before_value, after_object=after_value
832
                )
833
            elif self._is_array(value=dominant_value):
1✔
834
                value = self._visit_array(
1✔
835
                    scope=scope, before_array=before_value, after_array=after_value
836
                )
837
            elif self._is_intrinsic_function_name(dominant_type_name):
1✔
838
                intrinsic_function_scope, (before_arguments, after_arguments) = (
1✔
839
                    self._safe_access_in(scope, dominant_type_name, before_value, after_value)
840
                )
841
                value = self._visit_intrinsic_function(
1✔
842
                    scope=intrinsic_function_scope,
843
                    intrinsic_function=dominant_type_name,
844
                    before_arguments=before_arguments,
845
                    after_arguments=after_arguments,
846
                )
847
            else:
UNCOV
848
                raise RuntimeError(f"Unsupported type {type(dominant_value)}")
×
849
        # Case: type divergence.
850
        else:
851
            value = self._visit_divergence(
1✔
852
                scope=scope, before_value=before_value, after_value=after_value
853
            )
854
        self._visited_scopes[scope] = value
1✔
855
        return value
1✔
856

857
    def _visit_property(
1✔
858
        self,
859
        scope: Scope,
860
        property_name: str,
861
        before_property: Maybe[Any],
862
        after_property: Maybe[Any],
863
    ) -> NodeProperty:
864
        node_property = self._visited_scopes.get(scope)
1✔
865
        if isinstance(node_property, NodeProperty):
1✔
UNCOV
866
            return node_property
×
867
        # TODO: Review the use of Fn::Transform as resource properties.
868
        value = self._visit_value(
1✔
869
            scope=scope, before_value=before_property, after_value=after_property
870
        )
871
        node_property = NodeProperty(scope=scope, name=property_name, value=value)
1✔
872
        self._visited_scopes[scope] = node_property
1✔
873
        return node_property
1✔
874

875
    def _visit_properties(
1✔
876
        self, scope: Scope, before_properties: Maybe[dict], after_properties: Maybe[dict]
877
    ) -> NodeProperties:
878
        node_properties = self._visited_scopes.get(scope)
1✔
879
        if isinstance(node_properties, NodeProperties):
1✔
UNCOV
880
            return node_properties
×
881
        property_names: list[str] = self._safe_keys_of(before_properties, after_properties)
1✔
882
        properties: list[NodeProperty] = list()
1✔
883
        for property_name in property_names:
1✔
884
            property_scope, (before_property, after_property) = self._safe_access_in(
1✔
885
                scope, property_name, before_properties, after_properties
886
            )
887
            property_ = self._visit_property(
1✔
888
                scope=property_scope,
889
                property_name=property_name,
890
                before_property=before_property,
891
                after_property=after_property,
892
            )
893
            properties.append(property_)
1✔
894
        node_properties = NodeProperties(scope=scope, properties=properties)
1✔
895
        self._visited_scopes[scope] = node_properties
1✔
896
        return node_properties
1✔
897

898
    def _visit_type(self, scope: Scope, before_type: Any, after_type: Any) -> TerminalValue:
1✔
899
        value = self._visit_value(scope=scope, before_value=before_type, after_value=after_type)
1✔
900
        if not isinstance(value, TerminalValue):
1✔
901
            # TODO: decide where template schema validation should occur.
UNCOV
902
            raise RuntimeError()
×
903
        return value
1✔
904

905
    def _visit_resource(
1✔
906
        self,
907
        scope: Scope,
908
        resource_name: str,
909
        before_resource: Maybe[dict],
910
        after_resource: Maybe[dict],
911
    ) -> NodeResource:
912
        node_resource = self._visited_scopes.get(scope)
1✔
913
        if isinstance(node_resource, NodeResource):
1✔
914
            return node_resource
1✔
915

916
        scope_type, (before_type, after_type) = self._safe_access_in(
1✔
917
            scope, TypeKey, before_resource, after_resource
918
        )
919
        terminal_value_type = self._visit_type(
1✔
920
            scope=scope_type, before_type=before_type, after_type=after_type
921
        )
922

923
        condition_reference = Nothing
1✔
924
        scope_condition, (before_condition, after_condition) = self._safe_access_in(
1✔
925
            scope, ConditionKey, before_resource, after_resource
926
        )
927
        if before_condition or after_condition:
1✔
928
            condition_reference = self._visit_terminal_value(
1✔
929
                scope_condition, before_condition, after_condition
930
            )
931

932
        depends_on = Nothing
1✔
933
        scope_depends_on, (before_depends_on, after_depends_on) = self._safe_access_in(
1✔
934
            scope, DependsOnKey, before_resource, after_resource
935
        )
936
        if before_depends_on or after_depends_on:
1✔
937
            depends_on = self._visit_depends_on(
1✔
938
                scope_depends_on, before_depends_on, after_depends_on
939
            )
940

941
        scope_properties, (before_properties, after_properties) = self._safe_access_in(
1✔
942
            scope, PropertiesKey, before_resource, after_resource
943
        )
944
        properties = self._visit_properties(
1✔
945
            scope=scope_properties,
946
            before_properties=before_properties,
947
            after_properties=after_properties,
948
        )
949

950
        change_type = change_type_of(
1✔
951
            before_resource, after_resource, [properties, condition_reference, depends_on]
952
        )
953
        requires_replacement = self._resolve_requires_replacement(
1✔
954
            node_properties=properties, resource_type=terminal_value_type
955
        )
956
        node_resource = NodeResource(
1✔
957
            scope=scope,
958
            change_type=change_type,
959
            name=resource_name,
960
            type_=terminal_value_type,
961
            properties=properties,
962
            condition_reference=condition_reference,
963
            depends_on=depends_on,
964
            requires_replacement=requires_replacement,
965
        )
966
        self._visited_scopes[scope] = node_resource
1✔
967
        return node_resource
1✔
968

969
    def _visit_resources(
1✔
970
        self, scope: Scope, before_resources: Maybe[dict], after_resources: Maybe[dict]
971
    ) -> NodeResources:
972
        # TODO: investigate type changes behavior.
973
        resources: list[NodeResource] = list()
1✔
974
        resource_names = self._safe_keys_of(before_resources, after_resources)
1✔
975
        for resource_name in resource_names:
1✔
976
            resource_scope, (before_resource, after_resource) = self._safe_access_in(
1✔
977
                scope, resource_name, before_resources, after_resources
978
            )
979
            resource = self._visit_resource(
1✔
980
                scope=resource_scope,
981
                resource_name=resource_name,
982
                before_resource=before_resource,
983
                after_resource=after_resource,
984
            )
985
            resources.append(resource)
1✔
986
        return NodeResources(scope=scope, resources=resources)
1✔
987

988
    def _visit_mapping(
1✔
989
        self, scope: Scope, name: str, before_mapping: Maybe[dict], after_mapping: Maybe[dict]
990
    ) -> NodeMapping:
991
        bindings = self._visit_object(
1✔
992
            scope=scope, before_object=before_mapping, after_object=after_mapping
993
        )
994
        return NodeMapping(scope=scope, name=name, bindings=bindings)
1✔
995

996
    def _visit_mappings(
1✔
997
        self, scope: Scope, before_mappings: Maybe[dict], after_mappings: Maybe[dict]
998
    ) -> NodeMappings:
999
        mappings: list[NodeMapping] = list()
1✔
1000
        mapping_names = self._safe_keys_of(before_mappings, after_mappings)
1✔
1001
        for mapping_name in mapping_names:
1✔
1002
            scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
1✔
1003
                scope, mapping_name, before_mappings, after_mappings
1004
            )
1005
            mapping = self._visit_mapping(
1✔
1006
                scope=scope_mapping,
1007
                name=mapping_name,
1008
                before_mapping=before_mapping,
1009
                after_mapping=after_mapping,
1010
            )
1011
            mappings.append(mapping)
1✔
1012
        return NodeMappings(scope=scope, mappings=mappings)
1✔
1013

1014
    def _visit_dynamic_parameter(self, parameter_name: str) -> ChangeSetEntity:
1✔
1015
        scope = Scope("Dynamic").open_scope("Parameters")
1✔
1016
        scope_parameter, (before_parameter_dct, after_parameter_dct) = self._safe_access_in(
1✔
1017
            scope, parameter_name, self._before_parameters, self._after_parameters
1018
        )
1019

1020
        before_parameter = Nothing
1✔
1021
        if not is_nothing(before_parameter_dct):
1✔
1022
            before_parameter = (
1✔
1023
                before_parameter_dct.get("resolved_value")
1024
                or before_parameter_dct["given_value"]
1025
                or before_parameter_dct.get("default_value")
1026
            )
1027

1028
        after_parameter = Nothing
1✔
1029
        if not is_nothing(after_parameter_dct):
1✔
1030
            after_parameter = (
1✔
1031
                after_parameter_dct.get("resolved_value")
1032
                or after_parameter_dct["given_value"]
1033
                or after_parameter_dct.get("default_value")
1034
            )
1035

1036
        parameter = self._visit_value(
1✔
1037
            scope=scope_parameter, before_value=before_parameter, after_value=after_parameter
1038
        )
1039
        return parameter
1✔
1040

1041
    def _visit_parameter(
1✔
1042
        self,
1043
        scope: Scope,
1044
        parameter_name: str,
1045
        before_parameter: Maybe[dict],
1046
        after_parameter: Maybe[dict],
1047
    ) -> NodeParameter:
1048
        node_parameter = self._visited_scopes.get(scope)
1✔
1049
        if isinstance(node_parameter, NodeParameter):
1✔
1050
            return node_parameter
1✔
1051

1052
        type_scope, (before_type, after_type) = self._safe_access_in(
1✔
1053
            scope, TypeKey, before_parameter, after_parameter
1054
        )
1055
        type_ = self._visit_value(type_scope, before_type, after_type)
1✔
1056

1057
        default_scope, (before_default, after_default) = self._safe_access_in(
1✔
1058
            scope, DefaultKey, before_parameter, after_parameter
1059
        )
1060
        default_value = self._visit_value(default_scope, before_default, after_default)
1✔
1061

1062
        dynamic_value = self._visit_dynamic_parameter(parameter_name=parameter_name)
1✔
1063

1064
        node_parameter = NodeParameter(
1✔
1065
            scope=scope,
1066
            name=parameter_name,
1067
            type_=type_,
1068
            default_value=default_value,
1069
            dynamic_value=dynamic_value,
1070
        )
1071
        self._visited_scopes[scope] = node_parameter
1✔
1072
        return node_parameter
1✔
1073

1074
    def _visit_parameters(
1✔
1075
        self, scope: Scope, before_parameters: Maybe[dict], after_parameters: Maybe[dict]
1076
    ) -> NodeParameters:
1077
        node_parameters = self._visited_scopes.get(scope)
1✔
1078
        if isinstance(node_parameters, NodeParameters):
1✔
UNCOV
1079
            return node_parameters
×
1080
        parameter_names: list[str] = self._safe_keys_of(before_parameters, after_parameters)
1✔
1081
        parameters: list[NodeParameter] = list()
1✔
1082
        for parameter_name in parameter_names:
1✔
1083
            parameter_scope, (before_parameter, after_parameter) = self._safe_access_in(
1✔
1084
                scope, parameter_name, before_parameters, after_parameters
1085
            )
1086
            parameter = self._visit_parameter(
1✔
1087
                scope=parameter_scope,
1088
                parameter_name=parameter_name,
1089
                before_parameter=before_parameter,
1090
                after_parameter=after_parameter,
1091
            )
1092
            parameters.append(parameter)
1✔
1093
        node_parameters = NodeParameters(scope=scope, parameters=parameters)
1✔
1094
        self._visited_scopes[scope] = node_parameters
1✔
1095
        return node_parameters
1✔
1096

1097
    @staticmethod
1✔
1098
    def _normalise_depends_on_value(value: Maybe[str | list[str]]) -> Maybe[list[str]]:
1✔
1099
        # To simplify downstream logics, reduce the type options to array of strings.
1100
        # TODO: Add integrations tests for DependsOn validations (invalid types, duplicate identifiers, etc.)
1101
        if isinstance(value, NothingType):
1✔
1102
            return value
1✔
1103
        if isinstance(value, str):
1✔
1104
            value = [value]
1✔
1105
        elif isinstance(value, list):
1✔
1106
            value.sort()
1✔
1107
        else:
UNCOV
1108
            raise RuntimeError(
×
1109
                f"Invalid type for DependsOn, expected a String or Array of String, but got: '{value}'"
1110
            )
1111
        return value
1✔
1112

1113
    def _visit_depends_on(
1✔
1114
        self,
1115
        scope: Scope,
1116
        before_depends_on: Maybe[str | list[str]],
1117
        after_depends_on: Maybe[str | list[str]],
1118
    ) -> NodeDependsOn:
1119
        before_depends_on = self._normalise_depends_on_value(value=before_depends_on)
1✔
1120
        after_depends_on = self._normalise_depends_on_value(value=after_depends_on)
1✔
1121
        node_array = self._visit_array(
1✔
1122
            scope=scope, before_array=before_depends_on, after_array=after_depends_on
1123
        )
1124
        node_depends_on = NodeDependsOn(scope=scope, depends_on=node_array)
1✔
1125
        return node_depends_on
1✔
1126

1127
    def _visit_condition(
1✔
1128
        self,
1129
        scope: Scope,
1130
        condition_name: str,
1131
        before_condition: Maybe[dict],
1132
        after_condition: Maybe[dict],
1133
    ) -> NodeCondition:
1134
        node_condition = self._visited_scopes.get(scope)
1✔
1135
        if isinstance(node_condition, NodeCondition):
1✔
UNCOV
1136
            return node_condition
×
1137
        body = self._visit_value(
1✔
1138
            scope=scope, before_value=before_condition, after_value=after_condition
1139
        )
1140
        node_condition = NodeCondition(scope=scope, name=condition_name, body=body)
1✔
1141
        self._visited_scopes[scope] = node_condition
1✔
1142
        return node_condition
1✔
1143

1144
    def _visit_conditions(
1✔
1145
        self, scope: Scope, before_conditions: Maybe[dict], after_conditions: Maybe[dict]
1146
    ) -> NodeConditions:
1147
        node_conditions = self._visited_scopes.get(scope)
1✔
1148
        if isinstance(node_conditions, NodeConditions):
1✔
UNCOV
1149
            return node_conditions
×
1150
        condition_names: list[str] = self._safe_keys_of(before_conditions, after_conditions)
1✔
1151
        conditions: list[NodeCondition] = list()
1✔
1152
        for condition_name in condition_names:
1✔
1153
            condition_scope, (before_condition, after_condition) = self._safe_access_in(
1✔
1154
                scope, condition_name, before_conditions, after_conditions
1155
            )
1156
            condition = self._visit_condition(
1✔
1157
                scope=condition_scope,
1158
                condition_name=condition_name,
1159
                before_condition=before_condition,
1160
                after_condition=after_condition,
1161
            )
1162
            conditions.append(condition)
1✔
1163
        node_conditions = NodeConditions(scope=scope, conditions=conditions)
1✔
1164
        self._visited_scopes[scope] = node_conditions
1✔
1165
        return node_conditions
1✔
1166

1167
    def _visit_output(
1✔
1168
        self, scope: Scope, name: str, before_output: Maybe[dict], after_output: Maybe[dict]
1169
    ) -> NodeOutput:
1170
        scope_value, (before_value, after_value) = self._safe_access_in(
1✔
1171
            scope, ValueKey, before_output, after_output
1172
        )
1173
        value = self._visit_value(scope_value, before_value, after_value)
1✔
1174

1175
        export: Maybe[ChangeSetEntity] = Nothing
1✔
1176
        scope_export, (before_export, after_export) = self._safe_access_in(
1✔
1177
            scope, ExportKey, before_output, after_output
1178
        )
1179
        if before_export or after_export:
1✔
1180
            export = self._visit_value(scope_export, before_export, after_export)
1✔
1181

1182
        # TODO: condition references should be resolved for the condition's change_type?
1183
        condition_reference: Maybe[TerminalValue] = Nothing
1✔
1184
        scope_condition, (before_condition, after_condition) = self._safe_access_in(
1✔
1185
            scope, ConditionKey, before_output, after_output
1186
        )
1187
        if before_condition or after_condition:
1✔
1188
            condition_reference = self._visit_terminal_value(
1✔
1189
                scope_condition, before_condition, after_condition
1190
            )
1191

1192
        return NodeOutput(
1✔
1193
            scope=scope,
1194
            name=name,
1195
            value=value,
1196
            export=export,
1197
            conditional_reference=condition_reference,
1198
        )
1199

1200
    def _visit_outputs(
1✔
1201
        self, scope: Scope, before_outputs: Maybe[dict], after_outputs: Maybe[dict]
1202
    ) -> NodeOutputs:
1203
        outputs: list[NodeOutput] = list()
1✔
1204
        output_names: list[str] = self._safe_keys_of(before_outputs, after_outputs)
1✔
1205
        for output_name in output_names:
1✔
1206
            scope_output, (before_output, after_output) = self._safe_access_in(
1✔
1207
                scope, output_name, before_outputs, after_outputs
1208
            )
1209
            output = self._visit_output(
1✔
1210
                scope=scope_output,
1211
                name=output_name,
1212
                before_output=before_output,
1213
                after_output=after_output,
1214
            )
1215
            outputs.append(output)
1✔
1216
        return NodeOutputs(scope=scope, outputs=outputs)
1✔
1217

1218
    def _visit_global_transform(
1✔
1219
        self,
1220
        scope: Scope,
1221
        before_global_transform: Maybe[NormalisedGlobalTransformDefinition],
1222
        after_global_transform: Maybe[NormalisedGlobalTransformDefinition],
1223
    ) -> NodeGlobalTransform:
1224
        name_scope, (before_name, after_name) = self._safe_access_in(
1✔
1225
            scope, NameKey, before_global_transform, after_global_transform
1226
        )
1227
        name = self._visit_terminal_value(
1✔
1228
            scope=name_scope, before_value=before_name, after_value=after_name
1229
        )
1230

1231
        parameters_scope, (before_parameters, after_parameters) = self._safe_access_in(
1✔
1232
            scope, ParametersKey, before_global_transform, after_global_transform
1233
        )
1234
        parameters = self._visit_value(
1✔
1235
            scope=parameters_scope, before_value=before_parameters, after_value=after_parameters
1236
        )
1237

1238
        return NodeGlobalTransform(scope=scope, name=name, parameters=parameters)
1✔
1239

1240
    @staticmethod
1✔
1241
    def _normalise_transformer_value(value: Maybe[str | list[Any]]) -> Maybe[list[Any]]:
1✔
1242
        # To simplify downstream logics, reduce the type options to array of transformations.
1243
        # TODO: add further validation logic
1244
        # TODO: should we sort to avoid detecting user-side ordering changes as template changes?
1245
        if isinstance(value, NothingType):
1✔
1246
            return value
1✔
1247
        elif isinstance(value, str):
1✔
1248
            value = [NormalisedGlobalTransformDefinition(Name=value, Parameters=Nothing)]
1✔
1249
        elif isinstance(value, list):
1✔
1250
            tmp_value = list()
1✔
1251
            for item in value:
1✔
1252
                if isinstance(item, str):
1✔
1253
                    tmp_value.append(
1✔
1254
                        NormalisedGlobalTransformDefinition(Name=item, Parameters=Nothing)
1255
                    )
1256
                else:
UNCOV
1257
                    tmp_value.append(item)
×
1258
            value = tmp_value
1✔
UNCOV
1259
        elif isinstance(value, dict):
×
UNCOV
1260
            if "Name" not in value:
×
UNCOV
1261
                raise RuntimeError(f"Missing 'Name' field in Transform definition '{value}'")
×
UNCOV
1262
            name = value["Name"]
×
UNCOV
1263
            parameters = value.get("Parameters", Nothing)
×
UNCOV
1264
            value = [NormalisedGlobalTransformDefinition(Name=name, Parameters=parameters)]
×
1265
        else:
UNCOV
1266
            raise RuntimeError(f"Invalid Transform definition: '{value}'")
×
1267
        return value
1✔
1268

1269
    def _visit_transform(
1✔
1270
        self, scope: Scope, before_transform: Maybe[Any], after_transform: Maybe[Any]
1271
    ) -> NodeTransform:
1272
        before_transform_normalised = self._normalise_transformer_value(before_transform)
1✔
1273
        after_transform_normalised = self._normalise_transformer_value(after_transform)
1✔
1274
        global_transforms = list()
1✔
1275
        for index, (before_global_transform, after_global_transform) in enumerate(
1✔
1276
            zip_longest(before_transform_normalised, after_transform_normalised, fillvalue=Nothing)
1277
        ):
1278
            global_transform_scope = scope.open_index(index=index)
1✔
1279
            global_transform: NodeGlobalTransform = self._visit_global_transform(
1✔
1280
                scope=global_transform_scope,
1281
                before_global_transform=before_global_transform,
1282
                after_global_transform=after_global_transform,
1283
            )
1284
            global_transforms.append(global_transform)
1✔
1285
        return NodeTransform(scope=scope, global_transforms=global_transforms)
1✔
1286

1287
    def _model(self, before_template: Maybe[dict], after_template: Maybe[dict]) -> NodeTemplate:
1✔
1288
        root_scope = Scope()
1✔
1289
        # TODO: visit other child types
1290

1291
        transform_scope, (before_transform, after_transform) = self._safe_access_in(
1✔
1292
            root_scope, TransformKey, before_template, after_template
1293
        )
1294
        transform = self._visit_transform(
1✔
1295
            scope=transform_scope,
1296
            before_transform=before_transform,
1297
            after_transform=after_transform,
1298
        )
1299

1300
        mappings_scope, (before_mappings, after_mappings) = self._safe_access_in(
1✔
1301
            root_scope, MappingsKey, before_template, after_template
1302
        )
1303
        mappings = self._visit_mappings(
1✔
1304
            scope=mappings_scope, before_mappings=before_mappings, after_mappings=after_mappings
1305
        )
1306

1307
        parameters_scope, (before_parameters, after_parameters) = self._safe_access_in(
1✔
1308
            root_scope, ParametersKey, before_template, after_template
1309
        )
1310
        parameters = self._visit_parameters(
1✔
1311
            scope=parameters_scope,
1312
            before_parameters=before_parameters,
1313
            after_parameters=after_parameters,
1314
        )
1315

1316
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
1✔
1317
            root_scope, ConditionsKey, before_template, after_template
1318
        )
1319
        conditions = self._visit_conditions(
1✔
1320
            scope=conditions_scope,
1321
            before_conditions=before_conditions,
1322
            after_conditions=after_conditions,
1323
        )
1324

1325
        resources_scope, (before_resources, after_resources) = self._safe_access_in(
1✔
1326
            root_scope, ResourcesKey, before_template, after_template
1327
        )
1328
        resources = self._visit_resources(
1✔
1329
            scope=resources_scope,
1330
            before_resources=before_resources,
1331
            after_resources=after_resources,
1332
        )
1333

1334
        outputs_scope, (before_outputs, after_outputs) = self._safe_access_in(
1✔
1335
            root_scope, OutputsKey, before_template, after_template
1336
        )
1337
        outputs = self._visit_outputs(
1✔
1338
            scope=outputs_scope, before_outputs=before_outputs, after_outputs=after_outputs
1339
        )
1340

1341
        return NodeTemplate(
1✔
1342
            scope=root_scope,
1343
            transform=transform,
1344
            mappings=mappings,
1345
            parameters=parameters,
1346
            conditions=conditions,
1347
            resources=resources,
1348
            outputs=outputs,
1349
        )
1350

1351
    def _retrieve_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
1✔
1352
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
1✔
1353
            Scope(), ConditionsKey, self._before_template, self._after_template
1354
        )
1355
        before_conditions = before_conditions or dict()
1✔
1356
        after_conditions = after_conditions or dict()
1✔
1357
        if condition_name in before_conditions or condition_name in after_conditions:
1✔
UNCOV
1358
            condition_scope, (before_condition, after_condition) = self._safe_access_in(
×
1359
                conditions_scope, condition_name, before_conditions, after_conditions
1360
            )
UNCOV
1361
            node_condition = self._visit_condition(
×
1362
                conditions_scope,
1363
                condition_name,
1364
                before_condition=before_condition,
1365
                after_condition=after_condition,
1366
            )
UNCOV
1367
            return node_condition
×
1368
        return Nothing
1✔
1369

1370
    def _retrieve_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
1✔
1371
        parameters_scope, (before_parameters, after_parameters) = self._safe_access_in(
1✔
1372
            Scope(), ParametersKey, self._before_template, self._after_template
1373
        )
1374
        if parameter_name in before_parameters or parameter_name in after_parameters:
1✔
1375
            parameter_scope, (before_parameter, after_parameter) = self._safe_access_in(
1✔
1376
                parameters_scope, parameter_name, before_parameters, after_parameters
1377
            )
1378
            node_parameter = self._visit_parameter(
1✔
1379
                parameter_scope,
1380
                parameter_name,
1381
                before_parameter=before_parameter,
1382
                after_parameter=after_parameter,
1383
            )
1384
            return node_parameter
1✔
1385
        return Nothing
1✔
1386

1387
    def _retrieve_mapping(self, mapping_name) -> NodeMapping:
1✔
1388
        # TODO: add caching mechanism, and raise appropriate error if missing.
1389
        scope_mappings, (before_mappings, after_mappings) = self._safe_access_in(
1✔
1390
            Scope(), MappingsKey, self._before_template, self._after_template
1391
        )
1392
        if mapping_name in before_mappings or mapping_name in after_mappings:
1✔
1393
            scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
1✔
1394
                scope_mappings, mapping_name, before_mappings, after_mappings
1395
            )
1396
            node_mapping = self._visit_mapping(
1✔
1397
                scope_mapping, mapping_name, before_mapping, after_mapping
1398
            )
1399
            return node_mapping
1✔
UNCOV
1400
        raise RuntimeError()
×
1401

1402
    def _retrieve_or_visit_resource(self, resource_name: str) -> NodeResource:
1✔
1403
        resources_scope, (before_resources, after_resources) = self._safe_access_in(
1✔
1404
            Scope(),
1405
            ResourcesKey,
1406
            self._before_template,
1407
            self._after_template,
1408
        )
1409
        resource_scope, (before_resource, after_resource) = self._safe_access_in(
1✔
1410
            resources_scope, resource_name, before_resources, after_resources
1411
        )
1412
        return self._visit_resource(
1✔
1413
            scope=resource_scope,
1414
            resource_name=resource_name,
1415
            before_resource=before_resource,
1416
            after_resource=after_resource,
1417
        )
1418

1419
    @staticmethod
1✔
1420
    def _is_intrinsic_function_name(function_name: str) -> bool:
1✔
1421
        # TODO: are intrinsic functions soft keywords?
1422
        return function_name in INTRINSIC_FUNCTIONS
1✔
1423

1424
    @staticmethod
1✔
1425
    def _safe_access_in(scope: Scope, key: str, *objects: Maybe[dict]) -> tuple[Scope, Maybe[Any]]:
1✔
1426
        results = list()
1✔
1427
        for obj in objects:
1✔
1428
            if not isinstance(obj, (dict, NothingType)):
1✔
UNCOV
1429
                raise RuntimeError(f"Invalid definition type at '{obj}'")
×
1430
            if not isinstance(obj, NothingType):
1✔
1431
                results.append(obj.get(key, Nothing))
1✔
1432
            else:
1433
                results.append(obj)
1✔
1434
        new_scope = scope.open_scope(name=key)
1✔
1435
        return new_scope, results[0] if len(objects) == 1 else tuple(results)
1✔
1436

1437
    @staticmethod
1✔
1438
    def _safe_keys_of(*objects: Maybe[dict]) -> list[str]:
1✔
1439
        key_set: set[str] = set()
1✔
1440
        for obj in objects:
1✔
1441
            # TODO: raise errors if not dict
1442
            if isinstance(obj, dict):
1✔
1443
                key_set.update(obj.keys())
1✔
1444
        # The keys list is sorted to increase reproducibility of the
1445
        # update graph build process or downstream logics.
1446
        keys = sorted(key_set)
1✔
1447
        return keys
1✔
1448

1449
    @staticmethod
1✔
1450
    def _name_if_intrinsic_function(value: Maybe[Any]) -> str | None:
1✔
1451
        if isinstance(value, dict):
1✔
1452
            keys = ChangeSetModel._safe_keys_of(value)
1✔
1453
            if len(keys) == 1:
1✔
1454
                key_name = keys[0]
1✔
1455
                if ChangeSetModel._is_intrinsic_function_name(key_name):
1✔
1456
                    return key_name
1✔
1457
        return None
1✔
1458

1459
    @staticmethod
1✔
1460
    def _type_name_of(value: Maybe[Any]) -> str:
1✔
1461
        maybe_intrinsic_function_name = ChangeSetModel._name_if_intrinsic_function(value)
1✔
1462
        if maybe_intrinsic_function_name is not None:
1✔
1463
            return maybe_intrinsic_function_name
1✔
1464
        return type(value).__name__
1✔
1465

1466
    @staticmethod
1✔
1467
    def _is_terminal(value: Any) -> bool:
1✔
1468
        return type(value) in {int, float, bool, str, None, NothingType}
1✔
1469

1470
    @staticmethod
1✔
1471
    def _is_object(value: Any) -> bool:
1✔
1472
        return isinstance(value, dict) and ChangeSetModel._name_if_intrinsic_function(value) is None
1✔
1473

1474
    @staticmethod
1✔
1475
    def _is_array(value: Any) -> bool:
1✔
1476
        return isinstance(value, list)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc