• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21697093787

04 Feb 2026 09:56PM UTC coverage: 86.962% (-0.004%) from 86.966%
21697093787

push

github

web-flow
improve system information sent in session and container_info (#13680)

10 of 17 new or added lines in 2 files covered. (58.82%)

222 existing lines in 17 files now uncovered.

70560 of 81139 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.29
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model.py
1
from __future__ import annotations
1✔
2

3
import abc
1✔
4
import enum
1✔
5
from collections.abc import Generator
1✔
6
from itertools import zip_longest
1✔
7
from typing import Any, Final, TypedDict, TypeVar, cast
1✔
8

9
from localstack.aws.api.cloudformation import ChangeAction
1✔
10
from localstack.services.cloudformation.resource_provider import ResourceProviderExecutor
1✔
11
from localstack.services.cloudformation.v2.types import (
1✔
12
    EngineParameter,
13
    engine_parameter_value,
14
)
15
from localstack.utils.json import extract_jsonpath
1✔
16
from localstack.utils.strings import camel_to_snake_case
1✔
17

18
T = TypeVar("T")
1✔
19

20

21
class NothingType:
    """A singleton sentinel that denotes 'no value' (distinct from ``None``).

    Every call to ``NothingType()`` returns the same instance. The sentinel is falsy,
    iterates as an empty sequence, contains nothing, and compares equal only to other
    ``NothingType`` instances.
    """

    _singleton = None
    __slots__ = ()

    def __new__(cls):
        # Create the shared instance on first use and hand the same object back afterwards.
        if cls._singleton is None:
            cls._singleton = super().__new__(cls)
        return cls._singleton

    def __eq__(self, other):
        return isinstance(other, NothingType)

    def __hash__(self):
        # Defining __eq__ without __hash__ makes instances unhashable; all instances compare
        # equal, so they must share one hash value to be usable in sets and as dict keys.
        return hash(NothingType)

    def __str__(self):
        return repr(self)

    def __repr__(self) -> str:
        return "Nothing"

    def __bool__(self):
        return False

    def __iter__(self):
        return iter(())

    def __contains__(self, item):
        return False
49

50

51
Maybe = T | NothingType
1✔
52
Nothing = NothingType()
1✔
53

54

55
def is_nothing(value: Any) -> bool:
    """Return True when *value* is an instance of the ``NothingType`` sentinel."""
    nothing_like = isinstance(value, NothingType)
    return nothing_like
57

58

59
def is_created(before: Maybe[Any], after: Maybe[Any]) -> bool:
    """Return True when *before* is Nothing and *after* is not."""
    if not is_nothing(before):
        return False
    return not is_nothing(after)
61

62

63
def is_removed(before: Maybe[Any], after: Maybe[Any]) -> bool:
    """Return True when *before* is not Nothing and *after* is."""
    if is_nothing(before):
        return False
    return is_nothing(after)
65

66

67
def parent_change_type_of(children: list[Maybe[ChangeSetEntity]]):
    """Derive a parent's change type from the change types of its children.

    Children that are Nothing are ignored. If no children remain the result is
    ``ChangeType.UNCHANGED``; if all remaining children share one change type, that type
    is returned; otherwise the result is ``ChangeType.MODIFIED``.
    """
    present_types = [child.change_type for child in children if not is_nothing(child)]
    if not present_types:
        return ChangeType.UNCHANGED
    # TODO: rework this logic. Currently if any values are different then we consider it
    #  modified, but e.g. if everything is unchanged or created, the result should probably be
    #  "created"
    reference, *others = present_types
    uniform = all(other == reference for other in others)
    return reference if uniform else ChangeType.MODIFIED
78

79

80
def change_type_of(before: Maybe[Any], after: Maybe[Any], children: list[Maybe[ChangeSetEntity]]):
    """Classify a before/after pair, deferring to the children when both sides exist.

    Returns ``ChangeType.CREATED`` or ``ChangeType.REMOVED`` when exactly one side is
    Nothing; in every other case the result of ``parent_change_type_of(children)``.
    """
    if is_created(before, after):
        return ChangeType.CREATED
    if is_removed(before, after):
        return ChangeType.REMOVED
    return parent_change_type_of(children)
88

89

90
class NormalisedGlobalTransformDefinition(TypedDict):
    """Typed dictionary with a ``Name`` entry and a ``Parameters`` entry that may be Nothing."""

    Name: Any
    Parameters: Maybe[Any]
93

94

95
class Scope(str):
    """A ``/``-separated path string identifying a position in the modelled template.

    The root scope is the empty string; nested scopes are produced by appending a name or
    an index after the separator.
    """

    _ROOT_SCOPE: Final[str] = ""
    _SEPARATOR: Final[str] = "/"

    def __new__(cls, scope: str = _ROOT_SCOPE) -> Scope:
        return cast(Scope, super().__new__(cls, scope))

    def open_scope(self, name: Scope | str) -> Scope:
        """Return the child scope obtained by appending *name* to this scope."""
        return Scope(self._SEPARATOR.join([self, name]))

    def open_index(self, index: int) -> Scope:
        """Return the child scope obtained by appending the string form of *index*."""
        return Scope(self._SEPARATOR.join([self, str(index)]))

    def unwrap(self) -> list[str]:
        """Return the scope split on the separator (a leading separator yields a leading '')."""
        return self.split(self._SEPARATOR)

    @property
    def parent(self) -> Scope:
        """The scope with its last separator-delimited segment removed."""
        return Scope(self._SEPARATOR.join(self.split(self._SEPARATOR)[:-1]))

    @property
    def jsonpath(self) -> str:
        """A ``$.``-prefixed, dot-joined path built from this scope's segments.

        Empty segments and segments equal to ``"divergence"`` are dropped; segments
        containing a colon are wrapped in double quotes.
        """
        json_parts = []
        # Split on the class separator, as every other method of this class does.
        for part in self.split(self._SEPARATOR):
            # Skip empty strings from leading/trailing separators and divergence markers.
            if not part or part == "divergence":
                continue
            # Wrap keys with special characters (e.g., colon) in quotes
            json_parts.append(f'"{part}"' if ":" in part else part)
        return f"$.{'.'.join(json_parts)}"
134

135

136
class ChangeType(enum.Enum):
    """The kind of change recorded for an entity: unchanged, created, modified or removed."""

    UNCHANGED = "Unchanged"
    CREATED = "Created"
    MODIFIED = "Modified"
    REMOVED = "Removed"

    def __str__(self):
        return self.value

    def to_change_action(self) -> ChangeAction:
        """Convert this change type into the change action used throughout the CFn API.

        ``MODIFIED`` maps to ``ChangeAction.Modify`` and ``REMOVED`` to
        ``ChangeAction.Remove``; every other member maps to ``ChangeAction.Add``.
        """
        if self is ChangeType.MODIFIED:
            return ChangeAction.Modify
        if self is ChangeType.REMOVED:
            return ChangeAction.Remove
        return ChangeAction.Add
152

153

154
class ChangeSetEntity(abc.ABC):
    """Base class for every entity of the change set model.

    Each entity records the scope it was created for and its change type.
    """

    scope: Final[Scope]
    change_type: ChangeType

    def __init__(self, scope: Scope, change_type: ChangeType):
        self.scope = scope
        self.change_type = change_type

    def get_children(self) -> Generator[ChangeSetEntity]:
        """Yield the entities reachable from this instance's attribute values.

        Attributes are visited in ``__dict__`` order; see ``_get_children_in`` for how
        each attribute value is searched.
        """
        for child in self.__dict__.values():
            yield from self._get_children_in(child)

    @staticmethod
    def _get_children_in(obj: Any) -> Generator[ChangeSetEntity]:
        """Yield *obj* if it is an entity, or the entities found in a list or dict's values.

        Lists and dicts are searched recursively; an entity is yielded itself without
        descending into it; any other object yields nothing.
        """
        # TODO: could avoid the inductive logic here, and check for loops?
        if isinstance(obj, ChangeSetEntity):
            yield obj
        elif isinstance(obj, list):
            for item in obj:
                yield from ChangeSetEntity._get_children_in(item)
        elif isinstance(obj, dict):
            for item in obj.values():
                yield from ChangeSetEntity._get_children_in(item)

    def __str__(self):
        # The closing parenthesis balances the opening one, which was previously left open.
        return f"({self.__class__.__name__}| {vars(self)})"

    def __repr__(self):
        return str(self)
183

184

185
class ChangeSetNode(ChangeSetEntity, abc.ABC):
    """Abstract base for the ``Node*`` entities of the change set model."""
186

187

188
class ChangeSetTerminal(ChangeSetEntity, abc.ABC):
    """Abstract base for the terminal entities of the change set model."""
189

190

191
class UpdateModel:
    """Bundles a node template with two initially empty runtime caches (before and after)."""

    # TODO: may be expanded to keep track of other runtime values such as resolved_parameters.

    node_template: Final[NodeTemplate]
    before_runtime_cache: Final[dict]
    after_runtime_cache: Final[dict]

    def __init__(
        self,
        node_template: NodeTemplate,
    ):
        self.node_template = node_template
        # Two distinct dictionaries; they must not alias each other.
        self.before_runtime_cache = dict()
        self.after_runtime_cache = dict()
205

206

207
class NodeTemplate(ChangeSetNode):
    """Node holding the transform, mappings, parameters, conditions, resources and outputs nodes.

    Its change type is derived from those six children via ``parent_change_type_of``.
    """

    transform: Final[NodeTransform]
    mappings: Final[NodeMappings]
    parameters: Final[NodeParameters]
    conditions: Final[NodeConditions]
    resources: Final[NodeResources]
    outputs: Final[NodeOutputs]

    def __init__(
        self,
        scope: Scope,
        transform: NodeTransform,
        mappings: NodeMappings,
        parameters: NodeParameters,
        conditions: NodeConditions,
        resources: NodeResources,
        outputs: NodeOutputs,
    ):
        sections = [transform, mappings, parameters, conditions, resources, outputs]
        super().__init__(scope=scope, change_type=parent_change_type_of(sections))
        self.transform = transform
        self.mappings = mappings
        self.parameters = parameters
        self.conditions = conditions
        self.resources = resources
        self.outputs = outputs
235

236

237
class NodeDivergence(ChangeSetNode):
    """Node pairing a ``value`` entity with a ``divergence`` entity; always ``MODIFIED``."""

    value: Final[ChangeSetEntity]
    divergence: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, value: ChangeSetEntity, divergence: ChangeSetEntity):
        # A divergence is unconditionally reported as a modification.
        modified = ChangeType.MODIFIED
        super().__init__(scope=scope, change_type=modified)
        self.value = value
        self.divergence = divergence
245

246

247
class NodeParameter(ChangeSetNode):
    """Node for one named parameter: its type, dynamic value and optional default value.

    The change type is derived from the type, default value and dynamic value entities.
    """

    name: Final[str]
    type_: Final[ChangeSetEntity]
    dynamic_value: Final[ChangeSetEntity]
    default_value: Final[Maybe[ChangeSetEntity]]

    def __init__(
        self,
        scope: Scope,
        name: str,
        type_: ChangeSetEntity,
        dynamic_value: ChangeSetEntity,
        default_value: Maybe[ChangeSetEntity],
    ):
        parts = [type_, default_value, dynamic_value]
        super().__init__(scope=scope, change_type=parent_change_type_of(parts))
        self.name = name
        self.type_ = type_
        self.dynamic_value = dynamic_value
        self.default_value = default_value
267

268

269
class NodeParameters(ChangeSetNode):
    """Node grouping a list of ``NodeParameter`` children, from which its change type derives."""

    parameters: Final[list[NodeParameter]]

    def __init__(self, scope: Scope, parameters: list[NodeParameter]):
        super().__init__(scope=scope, change_type=parent_change_type_of(parameters))
        self.parameters = parameters
276

277

278
class NodeMapping(ChangeSetNode):
    """Node for one named mapping; it takes the change type of its ``bindings`` object."""

    name: Final[str]
    bindings: Final[NodeObject]

    def __init__(self, scope: Scope, name: str, bindings: NodeObject):
        inherited_change_type = bindings.change_type
        super().__init__(scope=scope, change_type=inherited_change_type)
        self.name = name
        self.bindings = bindings
286

287

288
class NodeMappings(ChangeSetNode):
    """Node grouping a list of ``NodeMapping`` children, from which its change type derives."""

    mappings: Final[list[NodeMapping]]

    def __init__(self, scope: Scope, mappings: list[NodeMapping]):
        super().__init__(scope=scope, change_type=parent_change_type_of(mappings))
        self.mappings = mappings
295

296

297
class NodeOutput(ChangeSetNode):
    """Node for one named output: its value, optional export and optional condition reference.

    The change type is derived from those three entities. Note that the constructor
    parameter is called ``conditional_reference`` while the attribute it populates is
    ``condition_reference``.
    """

    name: Final[str]
    value: Final[ChangeSetEntity]
    export: Final[Maybe[ChangeSetEntity]]
    condition_reference: Final[Maybe[TerminalValue]]

    def __init__(
        self,
        scope: Scope,
        name: str,
        value: ChangeSetEntity,
        export: Maybe[ChangeSetEntity],
        conditional_reference: Maybe[TerminalValue],
    ):
        parts = [value, export, conditional_reference]
        super().__init__(scope=scope, change_type=parent_change_type_of(parts))
        self.name = name
        self.value = value
        self.export = export
        self.condition_reference = conditional_reference
317

318

319
class NodeOutputs(ChangeSetNode):
    """Node grouping a list of ``NodeOutput`` children, from which its change type derives."""

    outputs: Final[list[NodeOutput]]

    def __init__(self, scope: Scope, outputs: list[NodeOutput]):
        super().__init__(scope=scope, change_type=parent_change_type_of(outputs))
        self.outputs = outputs
326

327

328
class NodeCondition(ChangeSetNode):
    """Node for one named condition; it takes the change type of its ``body`` entity."""

    name: Final[str]
    body: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, name: str, body: ChangeSetEntity):
        inherited_change_type = body.change_type
        super().__init__(scope=scope, change_type=inherited_change_type)
        self.name = name
        self.body = body
336

337

338
class NodeConditions(ChangeSetNode):
    """Node grouping a list of ``NodeCondition`` children, from which its change type derives."""

    conditions: Final[list[NodeCondition]]

    def __init__(self, scope: Scope, conditions: list[NodeCondition]):
        super().__init__(scope=scope, change_type=parent_change_type_of(conditions))
        self.conditions = conditions
345

346

347
class NodeGlobalTransform(ChangeSetNode):
    """Node for one global transform: a name terminal and optional parameters.

    Without parameters the node takes the change type of ``name``; otherwise the change
    type is derived from both the name and the parameters.
    """

    name: Final[TerminalValue]
    parameters: Final[Maybe[ChangeSetEntity]]

    def __init__(self, scope: Scope, name: TerminalValue, parameters: Maybe[ChangeSetEntity]):
        if is_nothing(parameters):
            change_type = name.change_type
        else:
            change_type = parent_change_type_of([name, parameters])
        super().__init__(scope=scope, change_type=change_type)
        self.name = name
        self.parameters = parameters
359

360

361
class NodeTransform(ChangeSetNode):
    """Node grouping a list of ``NodeGlobalTransform`` children, from which its change type derives."""

    global_transforms: Final[list[NodeGlobalTransform]]

    def __init__(self, scope: Scope, global_transforms: list[NodeGlobalTransform]):
        super().__init__(scope=scope, change_type=parent_change_type_of(global_transforms))
        self.global_transforms = global_transforms
368

369

370
class NodeResources(ChangeSetNode):
    """Node grouping resource nodes, an optional Fn::Transform node and for-each nodes.

    The change type is derived from all of them together, in that order.
    """

    resources: Final[list[NodeResource]]
    fn_transform: Final[Maybe[NodeIntrinsicFunctionFnTransform]]
    fn_foreaches: Final[list[NodeForEach]]

    def __init__(
        self,
        scope: Scope,
        resources: list[NodeResource],
        fn_transform: Maybe[NodeIntrinsicFunctionFnTransform],
        fn_foreaches: list[NodeForEach],
    ):
        members = resources + [fn_transform]
        members = members + fn_foreaches
        super().__init__(scope=scope, change_type=parent_change_type_of(members))
        self.resources = resources
        self.fn_transform = fn_transform
        self.fn_foreaches = fn_foreaches
387

388

389
class NodeResource(ChangeSetNode):
    """Node for one named resource.

    Unlike the grouping nodes, the change type is supplied by the caller rather than
    derived here. The node stores the resource type, its properties, and the optional
    condition reference, dependencies, deletion/update-replace policies and Fn::Transform,
    together with a flag stating whether the resource requires replacement.
    """

    name: Final[str]
    type_: Final[ChangeSetTerminal]
    properties: Final[NodeProperties]
    condition_reference: Final[Maybe[TerminalValue]]
    depends_on: Final[Maybe[NodeDependsOn]]
    requires_replacement: Final[bool]
    deletion_policy: Final[Maybe[ChangeSetTerminal]]
    update_replace_policy: Final[Maybe[ChangeSetTerminal]]
    fn_transform: Final[Maybe[NodeIntrinsicFunctionFnTransform]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        type_: ChangeSetTerminal,
        properties: NodeProperties,
        condition_reference: Maybe[TerminalValue],
        depends_on: Maybe[NodeDependsOn],
        requires_replacement: bool,
        deletion_policy: Maybe[ChangeSetTerminal],
        update_replace_policy: Maybe[ChangeSetTerminal],
        fn_transform: Maybe[NodeIntrinsicFunctionFnTransform],
    ):
        super().__init__(scope=scope, change_type=change_type)
        # Assignment order is kept as-is: it determines the order of ``vars(self)`` and
        # therefore the order in which ``get_children`` yields entities.
        self.name = name
        self.type_ = type_
        self.properties = properties
        self.condition_reference = condition_reference
        self.depends_on = depends_on
        self.requires_replacement = requires_replacement
        self.deletion_policy = deletion_policy
        self.update_replace_policy = update_replace_policy
        self.fn_transform = fn_transform
424

425

426
class NodeProperties(ChangeSetNode):
    """Node grouping ``NodeProperty`` children plus an optional Fn::Transform node.

    The change type is derived from the properties only; ``fn_transform`` is stored but
    does not take part in that computation.
    """

    properties: Final[list[NodeProperty]]
    fn_transform: Final[Maybe[NodeIntrinsicFunctionFnTransform]]

    def __init__(
        self,
        scope: Scope,
        properties: list[NodeProperty],
        fn_transform: Maybe[NodeIntrinsicFunctionFnTransform],
    ):
        super().__init__(scope=scope, change_type=parent_change_type_of(properties))
        self.properties = properties
        self.fn_transform = fn_transform
440

441

442
class NodeDependsOn(ChangeSetNode):
    """Node wrapping the ``depends_on`` array; it takes that array's change type."""

    depends_on: Final[NodeArray]

    def __init__(self, scope: Scope, depends_on: NodeArray):
        inherited_change_type = depends_on.change_type
        super().__init__(scope=scope, change_type=inherited_change_type)
        self.depends_on = depends_on
448

449

450
class NodeProperty(ChangeSetNode):
    """Node for one named property; it takes the change type of its ``value`` entity."""

    name: Final[str]
    value: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, name: str, value: ChangeSetEntity):
        inherited_change_type = value.change_type
        super().__init__(scope=scope, change_type=inherited_change_type)
        self.name = name
        self.value = value
458

459

460
class NodeIntrinsicFunction(ChangeSetNode):
    """Node for an intrinsic function call: the function's name and its arguments entity.

    The change type is supplied by the caller.
    """

    intrinsic_function: Final[str]
    arguments: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        intrinsic_function: str,
        arguments: ChangeSetEntity,
    ):
        ChangeSetNode.__init__(self, scope=scope, change_type=change_type)
        self.intrinsic_function = intrinsic_function
        self.arguments = arguments
474

475

476
class NodeIntrinsicFunctionFnTransform(NodeIntrinsicFunction):
    """Intrinsic function node that additionally carries before/after sibling data.

    ``before_siblings`` and ``after_siblings`` are stored as given by the caller.
    """

    # Declared at class level like the attributes of every other node in this module.
    before_siblings: Final[list[Any]]
    after_siblings: Final[list[Any]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        intrinsic_function: str,
        arguments: ChangeSetEntity,
        before_siblings: list[Any],
        after_siblings: list[Any],
    ):
        super().__init__(
            scope=scope,
            change_type=change_type,
            intrinsic_function=intrinsic_function,
            arguments=arguments,
        )
        self.before_siblings = before_siblings
        self.after_siblings = after_siblings
494

495

496
class NodeForEach(ChangeSetNode):
    """Node for a for-each construct, holding its ``arguments`` entity.

    The change type is supplied by the caller.
    """

    # ``Final`` belongs on the attribute declaration; it is not a valid qualifier for
    # function parameters, where it was previously placed.
    arguments: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        arguments: ChangeSetEntity,
    ):
        super().__init__(
            scope=scope,
            change_type=change_type,
        )
        self.arguments = arguments
508

509

510
class NodeObject(ChangeSetNode):
    """Node for an object: a dictionary of string keys to entities, with a caller-given change type."""

    bindings: Final[dict[str, ChangeSetEntity]]

    def __init__(self, scope: Scope, change_type: ChangeType, bindings: dict[str, ChangeSetEntity]):
        ChangeSetNode.__init__(self, scope=scope, change_type=change_type)
        self.bindings = bindings
516

517

518
class NodeArray(ChangeSetNode):
    """Node for an array: a list of entities, with a caller-given change type."""

    array: Final[list[ChangeSetEntity]]

    def __init__(self, scope: Scope, change_type: ChangeType, array: list[ChangeSetEntity]):
        ChangeSetNode.__init__(self, scope=scope, change_type=change_type)
        self.array = array
524

525

526
class TerminalValue(ChangeSetTerminal, abc.ABC):
    """Terminal entity that carries a single ``value`` alongside its scope and change type."""

    value: Final[Any]

    def __init__(self, scope: Scope, change_type: ChangeType, value: Any):
        ChangeSetTerminal.__init__(self, scope=scope, change_type=change_type)
        self.value = value
532

533

534
class TerminalValueModified(TerminalValue):
    """Terminal with change type ``MODIFIED`` that stores ``value`` and ``modified_value``."""

    modified_value: Final[Any]

    def __init__(self, scope: Scope, value: Any, modified_value: Any):
        modified = ChangeType.MODIFIED
        super().__init__(scope=scope, change_type=modified, value=value)
        self.modified_value = modified_value
540

541

542
class TerminalValueCreated(TerminalValue):
    """Terminal value whose change type is always ``ChangeType.CREATED``."""

    def __init__(self, scope: Scope, value: Any):
        created = ChangeType.CREATED
        super().__init__(scope=scope, change_type=created, value=value)
545

546

547
class TerminalValueRemoved(TerminalValue):
    """Terminal value whose change type is always ``ChangeType.REMOVED``."""

    def __init__(self, scope: Scope, value: Any):
        removed = ChangeType.REMOVED
        super().__init__(scope=scope, change_type=removed, value=value)
550

551

552
class TerminalValueUnchanged(TerminalValue):
    """Terminal value whose change type is always ``ChangeType.UNCHANGED``."""

    def __init__(self, scope: Scope, value: Any):
        unchanged = ChangeType.UNCHANGED
        super().__init__(scope=scope, change_type=unchanged, value=value)
555

556

557
NameKey: Final[str] = "Name"
1✔
558
TransformKey: Final[str] = "Transform"
1✔
559
TypeKey: Final[str] = "Type"
1✔
560
ConditionKey: Final[str] = "Condition"
1✔
561
ConditionsKey: Final[str] = "Conditions"
1✔
562
MappingsKey: Final[str] = "Mappings"
1✔
563
ResourcesKey: Final[str] = "Resources"
1✔
564
PropertiesKey: Final[str] = "Properties"
1✔
565
ParametersKey: Final[str] = "Parameters"
1✔
566
DefaultKey: Final[str] = "Default"
1✔
567
ValueKey: Final[str] = "Value"
1✔
568
ExportKey: Final[str] = "Export"
1✔
569
OutputsKey: Final[str] = "Outputs"
1✔
570
DependsOnKey: Final[str] = "DependsOn"
1✔
571
DeletionPolicyKey: Final[str] = "DeletionPolicy"
1✔
572
UpdateReplacePolicyKey: Final[str] = "UpdateReplacePolicy"
1✔
573
# TODO: expand intrinsic functions set.
574
RefKey: Final[str] = "Ref"
1✔
575
RefConditionKey: Final[str] = "Condition"
1✔
576
FnIfKey: Final[str] = "Fn::If"
1✔
577
FnAnd: Final[str] = "Fn::And"
1✔
578
FnOr: Final[str] = "Fn::Or"
1✔
579
FnNotKey: Final[str] = "Fn::Not"
1✔
580
FnJoinKey: Final[str] = "Fn::Join"
1✔
581
FnGetAttKey: Final[str] = "Fn::GetAtt"
1✔
582
FnEqualsKey: Final[str] = "Fn::Equals"
1✔
583
FnFindInMapKey: Final[str] = "Fn::FindInMap"
1✔
584
FnSubKey: Final[str] = "Fn::Sub"
1✔
585
FnTransform: Final[str] = "Fn::Transform"
1✔
586
FnSelect: Final[str] = "Fn::Select"
1✔
587
FnSplit: Final[str] = "Fn::Split"
1✔
588
FnGetAZs: Final[str] = "Fn::GetAZs"
1✔
589
FnBase64: Final[str] = "Fn::Base64"
1✔
590
FnImportValue: Final[str] = "Fn::ImportValue"
1✔
591
INTRINSIC_FUNCTIONS: Final[set[str]] = {
1✔
592
    RefKey,
593
    RefConditionKey,
594
    FnIfKey,
595
    FnAnd,
596
    FnOr,
597
    FnNotKey,
598
    FnJoinKey,
599
    FnEqualsKey,
600
    FnGetAttKey,
601
    FnFindInMapKey,
602
    FnSubKey,
603
    FnTransform,
604
    FnSelect,
605
    FnSplit,
606
    FnGetAZs,
607
    FnBase64,
608
    FnImportValue,
609
}
610

611

612
class ChangeSetModel:
1✔
613
    # TODO: should this instead be generalised to work on "Stack" objects instead of just "Template"s?
614

615
    # TODO: can probably improve the typehints to use CFN's 'language' eg. dict -> Template|Properties, etc.
616

617
    # TODO: add support for 'replacement' computation, and ensure this state is propagated in tree traversals
618
    #  such as intrinsic functions.
619

620
    _before_template: Final[Maybe[dict]]
1✔
621
    _after_template: Final[Maybe[dict]]
1✔
622
    _before_parameters: Final[Maybe[dict]]
1✔
623
    _after_parameters: Final[Maybe[dict]]
1✔
624
    _visited_scopes: Final[dict[str, ChangeSetEntity]]
1✔
625
    _node_template: Final[NodeTemplate]
1✔
626

627
    def __init__(
        self,
        before_template: dict | None,
        after_template: dict | None,
        before_parameters: dict | None,
        after_parameters: dict[str, EngineParameter] | None,
    ):
        """Store the inputs and eagerly build the node template.

        Any falsy input (``None`` or an empty dict) is stored as ``Nothing``. The visited
        scopes cache is created before ``self._model`` is called to build the template.
        """
        self._before_template = before_template if before_template else Nothing
        self._after_template = after_template if after_template else Nothing
        self._before_parameters = before_parameters if before_parameters else Nothing
        self._after_parameters = after_parameters if after_parameters else Nothing
        self._visited_scopes = dict()
        # TODO: move this modeling process to the `get_update_model` method as constructors shouldn't do work
        self._node_template = self._model(
            before_template=self._before_template,
            after_template=self._after_template,
        )
        # TODO: need to do template preprocessing e.g. parameter resolution, conditions etc.
644

645
    def get_update_model(self) -> UpdateModel:
        """Return a new ``UpdateModel`` wrapping the node template built by the constructor."""
        update_model = UpdateModel(node_template=self._node_template)
        return update_model
647

648
    def _visit_terminal_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> TerminalValue:
        """Return the terminal entity for *scope*, creating and caching it when needed.

        A cached ``TerminalValue`` for the scope is returned as-is. Otherwise the pair is
        classified as created, removed, unchanged (equal values) or modified, and the
        matching terminal is stored in the visited scopes and returned.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, TerminalValue):
            return cached

        if is_created(before=before_value, after=after_value):
            node = TerminalValueCreated(scope=scope, value=after_value)
        elif is_removed(before=before_value, after=after_value):
            node = TerminalValueRemoved(scope=scope, value=before_value)
        elif before_value == after_value:
            node = TerminalValueUnchanged(scope=scope, value=before_value)
        else:
            node = TerminalValueModified(
                scope=scope, value=before_value, modified_value=after_value
            )

        self._visited_scopes[scope] = node
        return node
666

667
    def _visit_intrinsic_function(
        self,
        scope: Scope,
        intrinsic_function: str,
        before_arguments: Maybe[Any],
        after_arguments: Maybe[Any],
    ) -> NodeIntrinsicFunction:
        """Return the intrinsic function node for *scope*, creating and caching it when needed.

        The arguments are visited under the ``args`` child scope. When the arguments were
        neither created nor removed, the change type comes from a
        ``_resolve_intrinsic_function_<name>`` method of this object if one exists (the
        name is the function name with ``::`` replaced by ``_`` and passed through
        ``camel_to_snake_case``), and from the arguments' change type otherwise.

        Raises:
            RuntimeError: if the function is ``Fn::Transform`` and the scope contains
                ``Fn::Transform`` more than once.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeIntrinsicFunction):
            return cached

        arguments = self._visit_value(
            scope=scope.open_scope("args"),
            before_value=before_arguments,
            after_value=after_arguments,
        )

        if is_created(before=before_arguments, after=after_arguments):
            change_type = ChangeType.CREATED
        elif is_removed(before=before_arguments, after=after_arguments):
            change_type = ChangeType.REMOVED
        else:
            resolver_suffix = camel_to_snake_case(intrinsic_function.replace("::", "_"))
            resolver = getattr(self, f"_resolve_intrinsic_function_{resolver_suffix}", None)
            if resolver is None:
                change_type = arguments.change_type
            else:
                change_type = resolver(arguments)

        if intrinsic_function != FnTransform:
            node = NodeIntrinsicFunction(
                scope=scope,
                change_type=change_type,
                intrinsic_function=intrinsic_function,
                arguments=arguments,
            )
        else:
            if scope.count(FnTransform) > 1:
                raise RuntimeError(
                    "Invalid: Fn::Transforms cannot be nested inside another Fn::Transform"
                )

            # The siblings are looked up in both templates at the parent scope's JSON path.
            path = scope.parent.jsonpath
            before_siblings = extract_jsonpath(self._before_template, path)
            after_siblings = extract_jsonpath(self._after_template, path)

            node = NodeIntrinsicFunctionFnTransform(
                scope=scope,
                change_type=change_type,
                arguments=arguments,
                intrinsic_function=intrinsic_function,
                before_siblings=before_siblings,
                after_siblings=after_siblings,
            )

        self._visited_scopes[scope] = node
        return node
723

724
    def _visit_foreach(
        self, scope: Scope, before_arguments: Maybe[list], after_arguments: Maybe[list]
    ) -> NodeForEach:
        """Return the for-each node for *scope*, creating and caching it when needed.

        The arguments are visited as an array under the ``args`` child scope, and the new
        node takes the change type of that array.
        """
        node_foreach = self._visited_scopes.get(scope)
        if isinstance(node_foreach, NodeForEach):
            return node_foreach
        arguments_scope = scope.open_scope("args")
        arguments = self._visit_array(
            arguments_scope, before_array=before_arguments, after_array=after_arguments
        )
        node_foreach = NodeForEach(
            scope=scope, change_type=arguments.change_type, arguments=arguments
        )
        # Record the node so the cache lookup above can hit on a later visit; without this
        # the lookup never succeeds, unlike in the other visitors of this class.
        self._visited_scopes[scope] = node_foreach
        return node_foreach
735

736
    def _resolve_intrinsic_function_fn_sub(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
737
        # TODO: This routine should instead export the implicit Ref and GetAtt calls within the first
738
        #       string template parameter and compute the respective change set types. Currently,
739
        #       changes referenced by Fn::Sub templates are only picked up during preprocessing; not
740
        #       at modelling.
741
        return arguments.change_type
1✔
742

743
    def _resolve_intrinsic_function_fn_get_att(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of an ``Fn::GetAtt`` call from the referenced property.

        The arguments must be an array whose first two items are terminal values: the
        logical name of a resource (a string) and an attribute name. The change type of the
        resource property with that name is returned; if the resource has no such
        property, the result is ``ChangeType.UNCHANGED``.

        Raises:
            RuntimeError: if the arguments do not have the shape described above.
        """
        # TODO: add support for nested intrinsic functions.
        # TODO: validate arguments structure and type.
        # TODO: should this check for deletion of resources and/or properties, if so what error should be raised?

        # Both items are read below, so require two of them up front instead of failing
        # later with an IndexError on a one-element array.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 2:
            raise RuntimeError(
                "Fn::GetAtt arguments must be an array holding a resource logical name "
                "and an attribute name"
            )
        logical_name_of_resource_entity = arguments.array[0]
        if not isinstance(logical_name_of_resource_entity, TerminalValue):
            raise RuntimeError("Fn::GetAtt resource logical name must be a terminal value")
        logical_name_of_resource: str = logical_name_of_resource_entity.value
        if not isinstance(logical_name_of_resource, str):
            raise RuntimeError("Fn::GetAtt resource logical name must be a string")
        node_resource: NodeResource = self._retrieve_or_visit_resource(
            resource_name=logical_name_of_resource
        )

        node_property_attribute_name = arguments.array[1]
        if not isinstance(node_property_attribute_name, TerminalValue):
            raise RuntimeError("Fn::GetAtt attribute name must be a terminal value")
        # For a modified terminal, the lookup uses the modified value rather than ``value``.
        if isinstance(node_property_attribute_name, TerminalValueModified):
            attribute_name = node_property_attribute_name.modified_value
        else:
            attribute_name = node_property_attribute_name.value

        # TODO: this is another use case for which properties should be referenced by name
        for node_property in node_resource.properties.properties:
            if node_property.name == attribute_name:
                return node_property.change_type

        return ChangeType.UNCHANGED
774

775
    def _resolve_intrinsic_function_ref(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of a ``Ref`` call from the entity it refers to.

        The arguments' own change type is returned when it is not ``UNCHANGED``, when the
        arguments are not a terminal value, or when the referenced id is a string starting
        with ``AWS::``. Otherwise the id is looked up as a condition, then as a parameter,
        and finally as a resource, and the change type of the first match is returned.
        """
        own_change_type = arguments.change_type
        if own_change_type != ChangeType.UNCHANGED or not isinstance(arguments, TerminalValue):
            return own_change_type

        logical_id = arguments.value
        if isinstance(logical_id, str) and logical_id.startswith("AWS::"):
            return arguments.change_type

        condition = self._retrieve_condition_if_exists(condition_name=logical_id)
        if isinstance(condition, NodeCondition):
            return condition.change_type

        parameter = self._retrieve_parameter_if_exists(parameter_name=logical_id)
        if isinstance(parameter, NodeParameter):
            return parameter.change_type

        # TODO: this should check the replacement flag for a resource update.
        resource = self._retrieve_or_visit_resource(resource_name=logical_id)
        return resource.change_type
797

798
    def _resolve_intrinsic_function_condition(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of a ``Condition`` reference.

        The arguments' own change type wins when it is not ``UNCHANGED`` or when
        the arguments are not a terminal value; otherwise the change type of the
        named condition is returned.

        Raises:
            RuntimeError: if no condition with the referenced name is found.
        """
        settled_by_arguments = arguments.change_type != ChangeType.UNCHANGED or not isinstance(
            arguments, TerminalValue
        )
        if settled_by_arguments:
            return arguments.change_type

        condition_name = arguments.value
        condition = self._retrieve_condition_if_exists(condition_name=condition_name)
        if not isinstance(condition, NodeCondition):
            raise RuntimeError(f"Undefined condition '{condition_name}'")
        return condition.change_type
×
809

810
    def _resolve_intrinsic_function_fn_find_in_map(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of an ``Fn::FindInMap`` call.

        When the arguments themselves changed, their change type is returned.
        When any of the three lookup arguments is not a terminal value (e.g. it
        contains a nested intrinsic function), the static mapping lookup cannot be
        performed and the parent change type of the three arguments is returned.
        Otherwise the change type of the mapping entry they address is returned.

        Raises:
            RuntimeError: if the arguments are not an array of at least three
                elements, if the top-level key does not resolve to an object in
                the mapping, or if the second-level key is not present in it.
        """
        if arguments.change_type != ChangeType.UNCHANGED:
            return arguments.change_type
        # TODO: validate arguments structure and type.

        # Three positional arguments are read below; reject shorter inputs here
        # rather than failing later with an IndexError.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 3:
            raise RuntimeError(
                "Fn::FindInMap expects an array of at least three arguments: "
                "mapping name, top-level key and second-level key"
            )
        lookup_arguments = arguments.array[:3]

        # If any argument is not a terminal value (e.g., it contains nested intrinsic functions
        # like Ref, Fn::Sub, etc.), we cannot perform the static mapping lookup at this stage.
        # Instead, return the parent change type based on all arguments' change states.
        if any(not isinstance(arg, TerminalValue) for arg in lookup_arguments):
            return parent_change_type_of(lookup_arguments)

        mapping_name, top_level_key, second_level_key = (arg.value for arg in lookup_arguments)

        node_mapping = self._retrieve_mapping(mapping_name=mapping_name)
        # TODO: a lookup would be beneficial in this scenario too;
        #  consider implications downstream and for replication.
        top_level_object = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_object, NodeObject):
            raise RuntimeError(
                f"Top-level key '{top_level_key}' of mapping '{mapping_name}' "
                "does not resolve to an object"
            )
        target_map_value = top_level_object.bindings.get(second_level_key)
        if target_map_value is None:
            # Without this guard a missing key surfaces as an AttributeError on None.
            raise RuntimeError(
                f"Second-level key '{second_level_key}' not found under "
                f"'{top_level_key}' in mapping '{mapping_name}'"
            )
        return target_map_value.change_type
1✔
844

845
    def _resolve_intrinsic_function_fn_if(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of an ``Fn::If`` call.

        The result is the parent change type of the referenced condition together
        with every argument after the condition name.

        Raises:
            RuntimeError: if the arguments are not a non-empty array, if the first
                element is not a terminal string value, or if it does not name a
                known condition.
        """
        # TODO: validate arguments structure and type.
        if not (isinstance(arguments, NodeArray) and arguments.array):
            raise RuntimeError()

        condition_name_entity = arguments.array[0]
        if not isinstance(condition_name_entity, TerminalValue):
            raise RuntimeError()

        condition_name: str = condition_name_entity.value
        if not isinstance(condition_name, str):
            raise RuntimeError()

        condition = self._retrieve_condition_if_exists(condition_name=condition_name)
        if not isinstance(condition, NodeCondition):
            raise RuntimeError()

        return parent_change_type_of([condition, *arguments.array[1:]])
1✔
863

864
    def _resolve_requires_replacement(
        self, node_properties: NodeProperties, resource_type: TerminalValue
    ) -> bool:
        """Report whether any changed property is listed as create-only in the schema.

        Returns ``False`` when no resource provider can be loaded for the type, or
        when no property named in the provider schema's ``createOnlyProperties``
        has a change type other than ``UNCHANGED``.
        """
        # a bit hacky but we have to load the resource provider executor _and_ resource provider to get the schema
        # Note: we don't log the attempt to load the resource provider, we need to make sure this is only done once and we already do this in the executor
        provider = ResourceProviderExecutor.try_load_resource_provider(resource_type.value)
        if not provider:
            # if we don't support a resource, assume an in-place update for simplicity
            return False

        # TODO: also hacky: strip the leading `/properties/` string from the definition
        #       ideally we should use a jsonpath or similar
        create_only_names: list[str] = [
            pointer.replace("/properties/", "", 1)
            for pointer in provider.SCHEMA.get("createOnlyProperties", [])
        ]
        return any(
            candidate.name in create_only_names and candidate.change_type != ChangeType.UNCHANGED
            for candidate in node_properties.properties
        )
1✔
888

889
    def _visit_array(
        self, scope: Scope, before_array: Maybe[list], after_array: Maybe[list]
    ) -> NodeArray:
        """Model two arrays position by position as a ``NodeArray``.

        Elements are paired by index; positions present on only one side are
        paired with ``Nothing``. Each pair is visited under its own index scope.
        """
        paired_items = zip_longest(before_array, after_array, fillvalue=Nothing)
        elements: list[ChangeSetEntity] = [
            self._visit_value(
                scope=scope.open_index(index=position),
                before_value=before_item,
                after_value=after_item,
            )
            for position, (before_item, after_item) in enumerate(paired_items)
        ]
        return NodeArray(
            scope=scope,
            change_type=change_type_of(before_array, after_array, elements),
            array=elements,
        )
1✔
903

904
    def _visit_object(
        self, scope: Scope, before_object: Maybe[dict], after_object: Maybe[dict]
    ) -> NodeObject:
        """Model two objects key by key as a ``NodeObject``, memoised per scope.

        Each key reported by ``_safe_keys_of`` for the two objects is visited
        under its own scope; the resulting node is stored in the visited-scopes
        cache before being returned.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeObject):
            return cached

        entries: dict[str, ChangeSetEntity] = {}
        for key in self._safe_keys_of(before_object, after_object):
            key_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, key, before_object, after_object
            )
            entries[key] = self._visit_value(
                scope=key_scope, before_value=before_entry, after_value=after_entry
            )

        result = NodeObject(
            scope=scope,
            change_type=change_type_of(before_object, after_object, list(entries.values())),
            bindings=entries,
        )
        self._visited_scopes[scope] = result
        return result
1✔
924

925
    def _visit_divergence(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> NodeDivergence:
        """Model a before/after pair as two independent sides of a ``NodeDivergence``.

        The before value is visited against ``Nothing`` under the ``value``
        sub-scope, and the after value is visited against ``Nothing`` under the
        ``divergence`` sub-scope.
        """
        before_side = self._visit_value(
            scope=scope.open_scope("value"), before_value=before_value, after_value=Nothing
        )
        after_side = self._visit_value(
            scope=scope.open_scope("divergence"), before_value=Nothing, after_value=after_value
        )
        return NodeDivergence(scope=scope, value=before_side, divergence=after_side)
1✔
935

936
    def _visit_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> ChangeSetEntity:
        """Model a before/after value pair as a change set entity, memoised per scope.

        A "dominant" value selects the visitor for the pair: the before value when
        both sides share a type name or when the value was removed, the after
        value when it was created. When none of these hold, the pair is modelled
        as a type divergence.

        Raises:
            RuntimeError: if the dominant value is neither a terminal, an object,
                an array, nor an intrinsic function.
        """
        memoised = self._visited_scopes.get(scope)
        if isinstance(memoised, ChangeSetEntity):
            return memoised

        type_name_before = self._type_name_of(before_value)
        type_name_after = self._type_name_of(after_value)

        # Start from the "same type" / "removed" outcome and adjust from there.
        has_dominant = True
        dominant = before_value
        if type_name_before != type_name_after:
            if is_created(before=before_value, after=after_value):
                dominant = after_value
            elif not is_removed(before=before_value, after=after_value):
                has_dominant = False

        if not has_dominant:
            # Case: type divergence.
            entity = self._visit_divergence(
                scope=scope, before_value=before_value, after_value=after_value
            )
        else:
            dominant_type_name = self._type_name_of(dominant)
            if self._is_terminal(value=dominant):
                entity = self._visit_terminal_value(
                    scope=scope, before_value=before_value, after_value=after_value
                )
            elif self._is_object(value=dominant):
                entity = self._visit_object(
                    scope=scope, before_object=before_value, after_object=after_value
                )
            elif self._is_array(value=dominant):
                entity = self._visit_array(
                    scope=scope, before_array=before_value, after_array=after_value
                )
            elif self._is_intrinsic_function_name(dominant_type_name):
                function_scope, (before_arguments, after_arguments) = self._safe_access_in(
                    scope, dominant_type_name, before_value, after_value
                )
                entity = self._visit_intrinsic_function(
                    scope=function_scope,
                    intrinsic_function=dominant_type_name,
                    before_arguments=before_arguments,
                    after_arguments=after_arguments,
                )
            else:
                raise RuntimeError(f"Unsupported type {type(dominant)}")

        self._visited_scopes[scope] = entity
        return entity
1✔
987

988
    def _visit_property(
        self,
        scope: Scope,
        property_name: str,
        before_property: Maybe[Any],
        after_property: Maybe[Any],
    ) -> NodeProperty:
        """Model a single resource property as a ``NodeProperty``, memoised per scope."""
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperty):
            return cached

        # TODO: Review the use of Fn::Transform as resource properties.
        property_value = self._visit_value(
            scope=scope, before_value=before_property, after_value=after_property
        )
        result = NodeProperty(scope=scope, name=property_name, value=property_value)
        # The property node is stored under the same scope that was used to visit its value.
        self._visited_scopes[scope] = result
        return result
1✔
1005

1006
    def _visit_properties(
        self, scope: Scope, before_properties: Maybe[dict], after_properties: Maybe[dict]
    ) -> NodeProperties:
        """Model a resource's properties as a ``NodeProperties``, memoised per scope.

        A key equal to ``FnTransform`` is visited as an intrinsic function and
        kept separately from the regular properties; every other key becomes a
        ``NodeProperty``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperties):
            return cached

        names: list[str] = self._safe_keys_of(before_properties, after_properties)
        collected: list[NodeProperty] = []
        transform = Nothing

        for name in names:
            name_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, name, before_properties, after_properties
            )
            if name == FnTransform:
                transform = self._visit_intrinsic_function(
                    name_scope, FnTransform, before_entry, after_entry
                )
            else:
                collected.append(
                    self._visit_property(
                        scope=name_scope,
                        property_name=name,
                        before_property=before_entry,
                        after_property=after_entry,
                    )
                )

        result = NodeProperties(scope=scope, properties=collected, fn_transform=transform)
        self._visited_scopes[scope] = result
        return result
1✔
1039

1040
    def _visit_type(self, scope: Scope, before_type: Any, after_type: Any) -> TerminalValue:
        """Visit a type declaration and require the result to be a terminal value.

        Raises:
            RuntimeError: if the visited value is not a ``TerminalValue``.
        """
        type_entity = self._visit_value(
            scope=scope, before_value=before_type, after_value=after_type
        )
        if isinstance(type_entity, TerminalValue):
            return type_entity
        # TODO: decide where template schema validation should occur.
        raise RuntimeError()
1✔
1046

1047
    def _visit_deletion_policy(
        self, scope: Scope, before_deletion_policy: Any, after_deletion_policy: Any
    ) -> TerminalValue:
        """Visit a deletion policy and require the result to be a terminal value.

        Raises:
            RuntimeError: if the visited value is not a ``TerminalValue``.
        """
        policy_entity = self._visit_value(
            scope=scope, before_value=before_deletion_policy, after_value=after_deletion_policy
        )
        if isinstance(policy_entity, TerminalValue):
            return policy_entity
        # TODO: decide where template schema validation should occur.
        raise RuntimeError()
1✔
1057

1058
    def _visit_update_replace_policy(
        self, scope: Scope, before_update_replace_policy: Any, after_deletion_policy: Any
    ) -> TerminalValue:
        """Visit an update-replace policy and require the result to be a terminal value.

        NOTE(review): ``after_deletion_policy`` is used as the *after* value of the
        update-replace policy; the name looks like a copy-paste leftover but is
        kept because it is part of the signature.

        Raises:
            RuntimeError: if the visited value is not a ``TerminalValue``.
        """
        policy_entity = self._visit_value(
            scope=scope,
            before_value=before_update_replace_policy,
            after_value=after_deletion_policy,
        )
        if isinstance(policy_entity, TerminalValue):
            return policy_entity
        # TODO: decide where template schema validation should occur.
        raise RuntimeError()
1✔
1070

1071
    def _visit_resource(
        self,
        scope: Scope,
        resource_name: str,
        before_resource: Maybe[dict],
        after_resource: Maybe[dict],
    ) -> NodeResource:
        """Model a single resource declaration as a ``NodeResource``, memoised per scope.

        Visits, in order: the type, the optional condition reference, the optional
        ``DependsOn``, the properties, the optional deletion and update-replace
        policies, and an optional ``Fn::Transform`` entry. The optional parts stay
        ``Nothing`` when absent on both sides.

        Raises:
            RuntimeError: if the ``Fn::Transform`` scope contains the transform
                key more than once (nested transforms).
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeResource):
            return cached

        def access(key):
            # Scope and before/after pair for one top-level key of the resource.
            return self._safe_access_in(scope, key, before_resource, after_resource)

        type_scope, (before_type, after_type) = access(TypeKey)
        resource_type = self._visit_type(
            scope=type_scope, before_type=before_type, after_type=after_type
        )

        condition_scope, (before_condition, after_condition) = access(ConditionKey)
        condition_reference = (
            self._visit_terminal_value(condition_scope, before_condition, after_condition)
            if before_condition or after_condition
            else Nothing
        )

        depends_on_scope, (before_depends_on, after_depends_on) = access(DependsOnKey)
        depends_on = (
            self._visit_depends_on(depends_on_scope, before_depends_on, after_depends_on)
            if before_depends_on or after_depends_on
            else Nothing
        )

        properties_scope, (before_properties, after_properties) = access(PropertiesKey)
        properties = self._visit_properties(
            scope=properties_scope,
            before_properties=before_properties,
            after_properties=after_properties,
        )

        deletion_scope, (before_deletion_policy, after_deletion_policy) = access(
            DeletionPolicyKey
        )
        deletion_policy = (
            self._visit_deletion_policy(
                deletion_scope, before_deletion_policy, after_deletion_policy
            )
            if before_deletion_policy or after_deletion_policy
            else Nothing
        )

        replace_scope, (before_replace_policy, after_replace_policy) = access(
            UpdateReplacePolicyKey
        )
        update_replace_policy = (
            self._visit_update_replace_policy(
                replace_scope, before_replace_policy, after_replace_policy
            )
            if before_replace_policy or after_replace_policy
            else Nothing
        )

        transform_scope, (before_transform_args, after_transform_args) = access(FnTransform)
        fn_transform = Nothing
        if not (is_nothing(before_transform_args) and is_nothing(after_transform_args)):
            if transform_scope.count(FnTransform) > 1:
                raise RuntimeError(
                    "Invalid: Fn::Transforms cannot be nested inside another Fn::Transform"
                )
            # JSONPath built from the transform scope with its last segment dropped.
            siblings_path = "$" + ".".join(transform_scope.split("/")[:-1])
            before_siblings = extract_jsonpath(self._before_template, siblings_path)
            after_siblings = extract_jsonpath(self._after_template, siblings_path)
            transform_arguments = self._visit_value(
                scope=scope.open_scope("args"),
                before_value=before_transform_args,
                after_value=after_transform_args,
            )
            fn_transform = NodeIntrinsicFunctionFnTransform(
                scope=transform_scope,
                change_type=ChangeType.MODIFIED,  # TODO
                arguments=transform_arguments,  # TODO
                intrinsic_function=FnTransform,
                before_siblings=before_siblings,
                after_siblings=after_siblings,
            )

        children = [
            properties,
            condition_reference,
            depends_on,
            deletion_policy,
            update_replace_policy,
            fn_transform,
        ]
        change_type = change_type_of(before_resource, after_resource, children)

        # special case of where either the before or after state does not specify properties but
        # the resource was in the previous template
        type_unchanged = resource_type.change_type == ChangeType.UNCHANGED
        if type_unchanged and properties.change_type != ChangeType.UNCHANGED:
            change_type = ChangeType.MODIFIED

        requires_replacement = self._resolve_requires_replacement(
            node_properties=properties, resource_type=resource_type
        )
        result = NodeResource(
            scope=scope,
            change_type=change_type,
            name=resource_name,
            type_=resource_type,
            properties=properties,
            condition_reference=condition_reference,
            depends_on=depends_on,
            requires_replacement=requires_replacement,
            deletion_policy=deletion_policy,
            update_replace_policy=update_replace_policy,
            fn_transform=fn_transform,
        )
        self._visited_scopes[scope] = result
        return result
1✔
1202

1203
    def _visit_resources(
        self, scope: Scope, before_resources: Maybe[dict], after_resources: Maybe[dict]
    ) -> NodeResources:
        """Model the resources section as a ``NodeResources``.

        A key equal to ``FnTransform`` is visited as an intrinsic function, keys
        starting with ``Fn::ForEach`` are visited as for-each constructs, and all
        remaining keys are visited as regular resources.
        """
        # TODO: investigate type changes behavior.
        plain_resources: list[NodeResource] = []
        names = self._safe_keys_of(before_resources, after_resources)
        transform = Nothing
        for_each_nodes = []

        for name in names:
            name_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, name, before_resources, after_resources
            )
            if name == FnTransform:
                transform = self._visit_intrinsic_function(
                    scope=name_scope,
                    intrinsic_function=name,
                    before_arguments=before_entry,
                    after_arguments=after_entry,
                )
            elif name.startswith("Fn::ForEach"):
                for_each_nodes.append(
                    self._visit_foreach(
                        scope=name_scope,
                        before_arguments=before_entry,
                        after_arguments=after_entry,
                    )
                )
            else:
                plain_resources.append(
                    self._visit_resource(
                        scope=name_scope,
                        resource_name=name,
                        before_resource=before_entry,
                        after_resource=after_entry,
                    )
                )

        return NodeResources(
            scope=scope,
            resources=plain_resources,
            fn_transform=transform,
            fn_foreaches=for_each_nodes,
        )
1244

1245
    def _visit_mapping(
        self, scope: Scope, name: str, before_mapping: Maybe[dict], after_mapping: Maybe[dict]
    ) -> NodeMapping:
        """Model a single mapping as a ``NodeMapping`` whose bindings are a visited object."""
        return NodeMapping(
            scope=scope,
            name=name,
            bindings=self._visit_object(
                scope=scope, before_object=before_mapping, after_object=after_mapping
            ),
        )
1✔
1252

1253
    def _visit_mappings(
        self, scope: Scope, before_mappings: Maybe[dict], after_mappings: Maybe[dict]
    ) -> NodeMappings:
        """Model the mappings section as a ``NodeMappings``, one node per mapping name."""

        def visit_named(mapping_name: str) -> NodeMapping:
            # Open the mapping's scope and visit its before/after definitions.
            mapping_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, mapping_name, before_mappings, after_mappings
            )
            return self._visit_mapping(
                scope=mapping_scope,
                name=mapping_name,
                before_mapping=before_entry,
                after_mapping=after_entry,
            )

        names = self._safe_keys_of(before_mappings, after_mappings)
        visited: list[NodeMapping] = [visit_named(name) for name in names]
        return NodeMappings(scope=scope, mappings=visited)
1✔
1270

1271
    def _visit_dynamic_parameter(self, parameter_name: str) -> ChangeSetEntity:
        """Model the supplied value of a parameter under the ``Dynamic``/``Parameters`` scope.

        For each side that is present, the value used is the entry's
        ``resolved_value`` when truthy, otherwise the result of
        ``engine_parameter_value`` for that entry. Absent sides stay ``Nothing``.
        """
        parameters_scope = Scope("Dynamic").open_scope("Parameters")
        parameter_scope, (before_entry, after_entry) = self._safe_access_in(
            parameters_scope, parameter_name, self._before_parameters, self._after_parameters
        )

        def effective_value(entry):
            # Absent entries are propagated unchanged as Nothing.
            if is_nothing(entry):
                return Nothing
            return entry.get("resolved_value") or engine_parameter_value(entry)

        return self._visit_value(
            scope=parameter_scope,
            before_value=effective_value(before_entry),
            after_value=effective_value(after_entry),
        )
1✔
1293

1294
    def _visit_parameter(
        self,
        scope: Scope,
        parameter_name: str,
        before_parameter: Maybe[dict],
        after_parameter: Maybe[dict],
    ) -> NodeParameter:
        """Model a parameter declaration as a ``NodeParameter``, memoised per scope.

        The node combines the declared type, the declared default and the
        dynamic value obtained from ``_visit_dynamic_parameter``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameter):
            return cached

        declared_type_scope, (before_type, after_type) = self._safe_access_in(
            scope, TypeKey, before_parameter, after_parameter
        )
        declared_type = self._visit_value(declared_type_scope, before_type, after_type)

        declared_default_scope, (before_default, after_default) = self._safe_access_in(
            scope, DefaultKey, before_parameter, after_parameter
        )
        declared_default = self._visit_value(
            declared_default_scope, before_default, after_default
        )

        result = NodeParameter(
            scope=scope,
            name=parameter_name,
            type_=declared_type,
            default_value=declared_default,
            dynamic_value=self._visit_dynamic_parameter(parameter_name=parameter_name),
        )
        self._visited_scopes[scope] = result
        return result
1✔
1326

1327
    def _visit_parameters(
        self, scope: Scope, before_parameters: Maybe[dict], after_parameters: Maybe[dict]
    ) -> NodeParameters:
        """Model the parameters section as a ``NodeParameters``, memoised per scope."""
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameters):
            return cached

        def visit_named(name: str) -> NodeParameter:
            # Open the parameter's scope and visit its before/after declarations.
            name_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, name, before_parameters, after_parameters
            )
            return self._visit_parameter(
                scope=name_scope,
                parameter_name=name,
                before_parameter=before_entry,
                after_parameter=after_entry,
            )

        names: list[str] = self._safe_keys_of(before_parameters, after_parameters)
        visited: list[NodeParameter] = [visit_named(name) for name in names]
        result = NodeParameters(scope=scope, parameters=visited)
        self._visited_scopes[scope] = result
        return result
1✔
1349

1350
    @staticmethod
    def _normalise_depends_on_value(value: Maybe[str | list[str]]) -> Maybe[list[str]]:
        """Reduce a ``DependsOn`` value to a sorted list of strings.

        A ``NothingType`` value is returned unchanged, a single string is wrapped
        in a one-element list, and a list is returned as a new sorted list. The
        input list is never modified, so the caller's structure keeps its
        original order.

        Raises:
            RuntimeError: if the value is neither ``Nothing``, a string nor a list.
        """
        # To simplify downstream logics, reduce the type options to array of strings.
        # TODO: Add integrations tests for DependsOn validations (invalid types, duplicate identifiers, etc.)
        if isinstance(value, NothingType):
            return value
        if isinstance(value, str):
            return [value]
        if isinstance(value, list):
            # sorted() builds a copy; list.sort() would reorder the caller's list in place.
            return sorted(value)
        raise RuntimeError(
            f"Invalid type for DependsOn, expected a String or Array of String, but got: '{value}'"
        )
1✔
1365

1366
    def _visit_depends_on(
        self,
        scope: Scope,
        before_depends_on: Maybe[str | list[str]],
        after_depends_on: Maybe[str | list[str]],
    ) -> NodeDependsOn:
        """Model a ``DependsOn`` declaration as a ``NodeDependsOn``.

        Both sides are normalised with ``_normalise_depends_on_value`` before
        being visited as arrays.
        """
        normalised_before = self._normalise_depends_on_value(value=before_depends_on)
        normalised_after = self._normalise_depends_on_value(value=after_depends_on)
        return NodeDependsOn(
            scope=scope,
            depends_on=self._visit_array(
                scope=scope, before_array=normalised_before, after_array=normalised_after
            ),
        )
1✔
1379

1380
    def _visit_condition(
        self,
        scope: Scope,
        condition_name: str,
        before_condition: Maybe[dict],
        after_condition: Maybe[dict],
    ) -> NodeCondition:
        """Model a condition declaration as a ``NodeCondition``, memoised per scope."""
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeCondition):
            return cached

        result = NodeCondition(
            scope=scope,
            name=condition_name,
            body=self._visit_value(
                scope=scope, before_value=before_condition, after_value=after_condition
            ),
        )
        # The condition node is stored under the same scope that was used to visit its body.
        self._visited_scopes[scope] = result
        return result
1✔
1396

1397
    def _visit_conditions(
        self, scope: Scope, before_conditions: Maybe[dict], after_conditions: Maybe[dict]
    ) -> NodeConditions:
        """Model the conditions section as a ``NodeConditions``, memoised per scope."""
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeConditions):
            return cached

        def visit_named(name: str) -> NodeCondition:
            # Open the condition's scope and visit its before/after definitions.
            name_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, name, before_conditions, after_conditions
            )
            return self._visit_condition(
                scope=name_scope,
                condition_name=name,
                before_condition=before_entry,
                after_condition=after_entry,
            )

        names: list[str] = self._safe_keys_of(before_conditions, after_conditions)
        visited: list[NodeCondition] = [visit_named(name) for name in names]
        result = NodeConditions(scope=scope, conditions=visited)
        self._visited_scopes[scope] = result
        return result
1✔
1419

1420
    def _visit_output(
        self, scope: Scope, name: str, before_output: Maybe[dict], after_output: Maybe[dict]
    ) -> NodeOutput:
        """Model a single output declaration as a ``NodeOutput``.

        The value is always visited; the export and the condition reference are
        only visited when present (truthy) on at least one side and otherwise
        stay ``Nothing``.
        """

        def access(key):
            # Scope and before/after pair for one key of the output declaration.
            return self._safe_access_in(scope, key, before_output, after_output)

        value_scope, (before_value, after_value) = access(ValueKey)
        output_value = self._visit_value(value_scope, before_value, after_value)

        export_scope, (before_export, after_export) = access(ExportKey)
        export: Maybe[ChangeSetEntity] = (
            self._visit_value(export_scope, before_export, after_export)
            if before_export or after_export
            else Nothing
        )

        # TODO: condition references should be resolved for the condition's change_type?
        condition_scope, (before_condition, after_condition) = access(ConditionKey)
        condition_reference: Maybe[TerminalValue] = (
            self._visit_terminal_value(condition_scope, before_condition, after_condition)
            if before_condition or after_condition
            else Nothing
        )

        return NodeOutput(
            scope=scope,
            name=name,
            value=output_value,
            export=export,
            conditional_reference=condition_reference,
        )
1452

1453
    def _visit_outputs(
        self, scope: Scope, before_outputs: Maybe[dict], after_outputs: Maybe[dict]
    ) -> NodeOutputs:
        """Build the node describing the Outputs section at ``scope``.

        Every output name present in either the before or the after section is
        visited, in sorted name order.

        :param scope: scope of the Outputs section.
        :param before_outputs: the section as found in the before template, or Nothing.
        :param after_outputs: the section as found in the after template, or Nothing.
        :return: the node aggregating one ``NodeOutput`` per output name.
        """

        def visit_named(name: str) -> NodeOutput:
            # Each output lives in its own child scope named after the output.
            child_scope, (before, after) = self._safe_access_in(
                scope, name, before_outputs, after_outputs
            )
            return self._visit_output(
                scope=child_scope,
                name=name,
                before_output=before,
                after_output=after,
            )

        names: list[str] = self._safe_keys_of(before_outputs, after_outputs)
        visited: list[NodeOutput] = [visit_named(name) for name in names]
        return NodeOutputs(scope=scope, outputs=visited)
1✔
1470

1471
    def _visit_global_transform(
        self,
        scope: Scope,
        before_global_transform: Maybe[NormalisedGlobalTransformDefinition],
        after_global_transform: Maybe[NormalisedGlobalTransformDefinition],
    ) -> NodeGlobalTransform:
        """Build the node for one normalised global transform definition.

        The transform's name is visited as a terminal value and its parameters
        as a generic value, each in its own child scope of ``scope``.

        :param scope: scope of this transform entry.
        :param before_global_transform: the definition in the before template, or Nothing.
        :param after_global_transform: the definition in the after template, or Nothing.
        :return: the ``NodeGlobalTransform`` for this entry.
        """
        definitions = (before_global_transform, after_global_transform)

        name_scope, (name_before, name_after) = self._safe_access_in(
            scope, NameKey, *definitions
        )
        name = self._visit_terminal_value(name_scope, name_before, name_after)

        parameters_scope, (parameters_before, parameters_after) = self._safe_access_in(
            scope, ParametersKey, *definitions
        )
        parameters = self._visit_value(parameters_scope, parameters_before, parameters_after)

        return NodeGlobalTransform(scope=scope, name=name, parameters=parameters)
1✔
1492

1493
    @staticmethod
    def _normalise_transformer_value(value: Maybe[str | list[Any]]) -> Maybe[list[Any]]:
        """Reduce the accepted shapes of a Transform declaration to a list.

        - ``Nothing`` is returned unchanged.
        - A string becomes a one-element list holding a definition with that name.
        - A list has its string items turned into definitions; other items are kept as they are.
        - A dict must contain ``"Name"`` and becomes a one-element list.

        :param value: the raw Transform value taken from a template.
        :return: ``Nothing``, or the list of transform definitions.
        :raises RuntimeError: if a dict value lacks ``"Name"``, or the value is
            of any other unsupported type.
        """
        # To simplify downstream logics, reduce the type options to array of transformations.
        # TODO: add further validation logic
        # TODO: should we sort to avoid detecting user-side ordering changes as template changes?
        if isinstance(value, NothingType):
            return value

        if isinstance(value, str):
            return [NormalisedGlobalTransformDefinition(Name=value, Parameters=Nothing)]

        if isinstance(value, list):
            return [
                NormalisedGlobalTransformDefinition(Name=item, Parameters=Nothing)
                if isinstance(item, str)
                else item
                for item in value
            ]

        if isinstance(value, dict):
            if "Name" not in value:
                raise RuntimeError(f"Missing 'Name' field in Transform definition '{value}'")
            return [
                NormalisedGlobalTransformDefinition(
                    Name=value["Name"], Parameters=value.get("Parameters", Nothing)
                )
            ]

        raise RuntimeError(f"Invalid Transform definition: '{value}'")
1✔
1521

1522
    def _visit_transform(
        self, scope: Scope, before_transform: Maybe[Any], after_transform: Maybe[Any]
    ) -> NodeTransform:
        """Build the node describing the template's Transform declaration.

        Both sides are normalised to lists and paired up by position; when one
        side is shorter, the missing entries are filled with ``Nothing``. Each
        pair is visited in a child scope opened for its index.

        :param scope: scope of the Transform section.
        :param before_transform: raw Transform value of the before template, or Nothing.
        :param after_transform: raw Transform value of the after template, or Nothing.
        :return: the node aggregating one ``NodeGlobalTransform`` per position.
        """
        before_definitions = self._normalise_transformer_value(before_transform)
        after_definitions = self._normalise_transformer_value(after_transform)
        paired_definitions = zip_longest(
            before_definitions, after_definitions, fillvalue=Nothing
        )
        visited = [
            self._visit_global_transform(
                scope=scope.open_index(index=position),
                before_global_transform=before_definition,
                after_global_transform=after_definition,
            )
            for position, (before_definition, after_definition) in enumerate(paired_definitions)
        ]
        return NodeTransform(scope=scope, global_transforms=visited)
1✔
1539

1540
    def _model(self, before_template: Maybe[dict], after_template: Maybe[dict]) -> NodeTemplate:
        """Build the root template node from the before and after templates.

        The top-level sections are visited in a fixed order: Transform,
        Mappings, Parameters, Conditions, Resources, Outputs. Each section is
        visited in its own child scope of a fresh root scope.

        :param before_template: the before template, or Nothing.
        :param after_template: the after template, or Nothing.
        :return: the ``NodeTemplate`` aggregating all visited sections.
        """
        root_scope = Scope()
        templates = (before_template, after_template)
        # TODO: visit other child types

        transform_scope, (transform_before, transform_after) = self._safe_access_in(
            root_scope, TransformKey, *templates
        )
        transform = self._visit_transform(
            scope=transform_scope,
            before_transform=transform_before,
            after_transform=transform_after,
        )

        mappings_scope, (mappings_before, mappings_after) = self._safe_access_in(
            root_scope, MappingsKey, *templates
        )
        mappings = self._visit_mappings(
            scope=mappings_scope,
            before_mappings=mappings_before,
            after_mappings=mappings_after,
        )

        parameters_scope, (parameters_before, parameters_after) = self._safe_access_in(
            root_scope, ParametersKey, *templates
        )
        parameters = self._visit_parameters(
            scope=parameters_scope,
            before_parameters=parameters_before,
            after_parameters=parameters_after,
        )

        conditions_scope, (conditions_before, conditions_after) = self._safe_access_in(
            root_scope, ConditionsKey, *templates
        )
        conditions = self._visit_conditions(
            scope=conditions_scope,
            before_conditions=conditions_before,
            after_conditions=conditions_after,
        )

        resources_scope, (resources_before, resources_after) = self._safe_access_in(
            root_scope, ResourcesKey, *templates
        )
        resources = self._visit_resources(
            scope=resources_scope,
            before_resources=resources_before,
            after_resources=resources_after,
        )

        outputs_scope, (outputs_before, outputs_after) = self._safe_access_in(
            root_scope, OutputsKey, *templates
        )
        outputs = self._visit_outputs(
            scope=outputs_scope,
            before_outputs=outputs_before,
            after_outputs=outputs_after,
        )

        return NodeTemplate(
            scope=root_scope,
            transform=transform,
            mappings=mappings,
            parameters=parameters,
            conditions=conditions,
            resources=resources,
            outputs=outputs,
        )
1603

1604
    def _retrieve_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
        """Return the node of the named condition, or ``Nothing`` if it is not declared.

        The condition is looked up in the Conditions sections of the before and
        after templates held by this instance. A missing or falsy section is
        treated as an empty one.

        :param condition_name: logical name of the condition.
        :return: the ``NodeCondition`` for the name, or ``Nothing`` when neither
            template declares it.
        """
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
            Scope(), ConditionsKey, self._before_template, self._after_template
        )
        before_conditions = before_conditions or {}
        after_conditions = after_conditions or {}
        if condition_name not in before_conditions and condition_name not in after_conditions:
            return Nothing

        condition_scope, (before_condition, after_condition) = self._safe_access_in(
            conditions_scope, condition_name, before_conditions, after_conditions
        )
        # The condition must be visited in its own scope, not in the scope of the
        # whole Conditions section: _visit_condition records its result in
        # _visited_scopes under the scope it is given, and _visit_conditions
        # visits each condition under this same per-condition scope.
        return self._visit_condition(
            condition_scope,
            condition_name,
            before_condition=before_condition,
            after_condition=after_condition,
        )
1✔
1622

1623
    def _retrieve_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
        """Return the node of the named parameter, or ``Nothing`` if it is not declared.

        The parameter is looked up in the Parameters sections of the before and
        after templates held by this instance.

        :param parameter_name: logical name of the parameter.
        :return: the ``NodeParameter`` for the name, or ``Nothing`` when neither
            template declares it.
        """
        parameters_scope, (before_parameters, after_parameters) = self._safe_access_in(
            Scope(), ParametersKey, self._before_template, self._after_template
        )
        if parameter_name not in before_parameters and parameter_name not in after_parameters:
            return Nothing

        parameter_scope, (before_definition, after_definition) = self._safe_access_in(
            parameters_scope, parameter_name, before_parameters, after_parameters
        )
        return self._visit_parameter(
            parameter_scope,
            parameter_name,
            before_parameter=before_definition,
            after_parameter=after_definition,
        )
1✔
1639

1640
    def _retrieve_mapping(self, mapping_name) -> NodeMapping:
        """Return the node of the named mapping.

        The mapping is looked up in the Mappings sections of the before and
        after templates held by this instance.

        :param mapping_name: logical name of the mapping.
        :return: the ``NodeMapping`` for the name.
        :raises RuntimeError: if neither template declares the mapping.
        """
        # TODO: add caching mechanism, and raise appropriate error if missing.
        scope_mappings, (before_mappings, after_mappings) = self._safe_access_in(
            Scope(), MappingsKey, self._before_template, self._after_template
        )
        if mapping_name not in before_mappings and mapping_name not in after_mappings:
            # Name the missing mapping so the failure can be diagnosed.
            raise RuntimeError(f"Undefined mapping '{mapping_name}'")

        scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
            scope_mappings, mapping_name, before_mappings, after_mappings
        )
        return self._visit_mapping(scope_mapping, mapping_name, before_mapping, after_mapping)
×
1654

1655
    def _retrieve_or_visit_resource(self, resource_name: str) -> NodeResource:
        """Visit the named resource of the before/after templates held by this instance.

        No existence check is made here: if the resource is absent from a
        template, ``Nothing`` is passed on for that side.

        :param resource_name: logical name of the resource.
        :return: the ``NodeResource`` produced by ``_visit_resource``.
        """
        resources_scope, resources_pair = self._safe_access_in(
            Scope(), ResourcesKey, self._before_template, self._after_template
        )
        resource_scope, (before_definition, after_definition) = self._safe_access_in(
            resources_scope, resource_name, *resources_pair
        )
        return self._visit_resource(
            scope=resource_scope,
            resource_name=resource_name,
            before_resource=before_definition,
            after_resource=after_definition,
        )
1671

1672
    @staticmethod
    def _is_intrinsic_function_name(function_name: str) -> bool:
        """Tell whether ``function_name`` is listed in ``INTRINSIC_FUNCTIONS``."""
        # TODO: are intrinsic functions soft keywords?
        is_listed = function_name in INTRINSIC_FUNCTIONS
        return is_listed
1✔
1676

1677
    @staticmethod
    def _safe_access_in(scope: Scope, key: str, *objects: Maybe[dict]) -> tuple[Scope, Maybe[Any]]:
        """Read ``key`` from each of ``objects`` and open the matching child scope.

        :param scope: the scope of the objects being accessed.
        :param key: the key to read; also the name of the child scope.
        :param objects: dicts or ``Nothing`` values to read the key from.
        :return: the child scope and the looked-up value(s). The value is
            ``Nothing`` for a ``Nothing`` object or a missing key. When exactly
            one object is given its value is returned bare; otherwise a tuple
            with one value per object is returned.
        :raises RuntimeError: if an object is neither a dict nor ``Nothing``.
        """

        def lookup(obj: Maybe[dict]) -> Maybe[Any]:
            if isinstance(obj, NothingType):
                # Nothing propagates: accessing into nothing yields nothing.
                return obj
            if isinstance(obj, dict):
                return obj.get(key, Nothing)
            raise RuntimeError(f"Invalid definition type at '{obj}'")

        values = [lookup(obj) for obj in objects]
        child_scope = scope.open_scope(name=key)
        if len(objects) == 1:
            return child_scope, values[0]
        return child_scope, tuple(values)
1✔
1689

1690
    @staticmethod
1✔
1691
    def _safe_keys_of(*objects: Maybe[dict]) -> list[str]:
1✔
1692
        key_set: set[str] = set()
1✔
1693
        for obj in objects:
1✔
1694
            # TODO: raise errors if not dict
1695
            if isinstance(obj, dict):
1✔
1696
                key_set.update(obj.keys())
1✔
1697
        # The keys list is sorted to increase reproducibility of the
1698
        # update graph build process or downstream logics.
1699
        keys = sorted(key_set)
1✔
1700
        return keys
1✔
1701

1702
    @staticmethod
1✔
1703
    def _name_if_intrinsic_function(value: Maybe[Any]) -> str | None:
1✔
1704
        if isinstance(value, dict):
1✔
1705
            keys = ChangeSetModel._safe_keys_of(value)
1✔
1706
            if len(keys) == 1:
1✔
1707
                key_name = keys[0]
1✔
1708
                if ChangeSetModel._is_intrinsic_function_name(key_name):
1✔
1709
                    return key_name
1✔
1710
        return None
1✔
1711

1712
    @staticmethod
1✔
1713
    def _type_name_of(value: Maybe[Any]) -> str:
1✔
1714
        maybe_intrinsic_function_name = ChangeSetModel._name_if_intrinsic_function(value)
1✔
1715
        if maybe_intrinsic_function_name is not None:
1✔
1716
            return maybe_intrinsic_function_name
1✔
1717
        return type(value).__name__
1✔
1718

1719
    @staticmethod
    def _is_terminal(value: Any) -> bool:
        """Tell whether ``value`` is a terminal (leaf) value.

        Terminal values are those whose exact type is ``int``, ``float``,
        ``bool``, ``str``, ``NoneType`` or ``NothingType``. Instances of
        subclasses of these types are not considered terminal.

        :param value: any template value.
        :return: ``True`` if the value is terminal, ``False`` otherwise.
        """
        # `type(None)` is required here: `type(value)` is never the object
        # `None` itself, so listing `None` would make a None value non-terminal.
        return type(value) in {int, float, bool, str, type(None), NothingType}
1✔
1722

1723
    @staticmethod
1✔
1724
    def _is_object(value: Any) -> bool:
1✔
1725
        return isinstance(value, dict) and ChangeSetModel._name_if_intrinsic_function(value) is None
1✔
1726

1727
    @staticmethod
1✔
1728
    def _is_array(value: Any) -> bool:
1✔
1729
        return isinstance(value, list)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc