• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17144436094

21 Aug 2025 11:28PM UTC coverage: 86.843% (-0.03%) from 86.876%
17144436094

push

github

web-flow
APIGW: internalize DeleteIntegrationResponse (#13046)

40 of 45 new or added lines in 1 file covered. (88.89%)

235 existing lines in 11 files now uncovered.

67068 of 77229 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.82
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model.py
1
from __future__ import annotations
1✔
2

3
import abc
1✔
4
import enum
1✔
5
from collections.abc import Generator
1✔
6
from itertools import zip_longest
1✔
7
from typing import Any, Final, TypedDict, cast
1✔
8

9
from typing_extensions import TypeVar
1✔
10

11
from localstack.aws.api.cloudformation import ChangeAction
1✔
12
from localstack.services.cloudformation.resource_provider import ResourceProviderExecutor
1✔
13
from localstack.services.cloudformation.v2.types import (
1✔
14
    EngineParameter,
15
    engine_parameter_value,
16
)
17
from localstack.utils.json import extract_jsonpath
1✔
18
from localstack.utils.strings import camel_to_snake_case
1✔
19

20
T = TypeVar("T")
1✔
21

22

23
class NothingType:
    """A sentinel that denotes 'no value' (distinct from None).

    Construction always returns the instance cached on the class, so the
    sentinel can be compared by identity as well as by equality. The instance
    is falsy, iterates as an empty sequence, contains no items, and compares
    equal only to other ``NothingType`` instances.
    """

    _singleton = None
    __slots__ = ()

    def __new__(cls):
        # Create the shared instance lazily on first construction.
        if cls._singleton is None:
            cls._singleton = super().__new__(cls)
        return cls._singleton

    def __eq__(self, other):
        # Equality is purely type-based: any NothingType equals any other.
        return isinstance(other, NothingType)

    def __hash__(self):
        # Defining __eq__ without __hash__ makes instances unhashable; provide
        # a hash consistent with __eq__ so the sentinel can be used in sets
        # and as a dictionary key.
        return hash(NothingType)

    def __str__(self):
        return repr(self)

    def __repr__(self) -> str:
        return "Nothing"

    def __bool__(self):
        return False

    def __iter__(self):
        return iter(())

    def __contains__(self, item):
        return False
51

52

53
Maybe = T | NothingType
1✔
54
Nothing = NothingType()
1✔
55

56

57
def is_nothing(value: Any) -> bool:
1✔
58
    return isinstance(value, NothingType)
1✔
59

60

61
def is_created(before: Maybe[Any], after: Maybe[Any]) -> bool:
1✔
62
    return is_nothing(before) and not is_nothing(after)
1✔
63

64

65
def is_removed(before: Maybe[Any], after: Maybe[Any]) -> bool:
1✔
66
    return not is_nothing(before) and is_nothing(after)
1✔
67

68

69
def parent_change_type_of(children: list[Maybe[ChangeSetEntity]]):
1✔
70
    change_types = [c.change_type for c in children if not is_nothing(c)]
1✔
71
    if not change_types:
1✔
72
        return ChangeType.UNCHANGED
1✔
73
    # TODO: rework this logic. Currently if any values are different then we consider it
74
    #  modified, but e.g. if everything is unchanged or created, the result should probably be
75
    #  "created"
76
    first_type = change_types[0]
1✔
77
    if all(ct == first_type for ct in change_types):
1✔
78
        return first_type
1✔
79
    return ChangeType.MODIFIED
1✔
80

81

82
def change_type_of(before: Maybe[Any], after: Maybe[Any], children: list[Maybe[ChangeSetEntity]]):
1✔
83
    if is_created(before, after):
1✔
84
        change_type = ChangeType.CREATED
1✔
85
    elif is_removed(before, after):
1✔
86
        change_type = ChangeType.REMOVED
1✔
87
    else:
88
        change_type = parent_change_type_of(children)
1✔
89
    return change_type
1✔
90

91

92
class NormalisedGlobalTransformDefinition(TypedDict):
1✔
93
    Name: Any
1✔
94
    Parameters: Maybe[Any]
1✔
95

96

97
class Scope(str):
    """A ``/``-separated path string identifying a position in the model.

    The root scope is the empty string; child scopes are formed by appending
    ``/<name>`` or ``/<index>`` to the parent scope.
    """

    _ROOT_SCOPE: Final[str] = ""
    _SEPARATOR: Final[str] = "/"

    def __new__(cls, scope: str = _ROOT_SCOPE) -> Scope:
        return cast(Scope, super().__new__(cls, scope))

    def open_scope(self, name: Scope | str) -> Scope:
        """Return the child scope obtained by appending ``name``."""
        return Scope(self._SEPARATOR.join([self, name]))

    def open_index(self, index: int) -> Scope:
        """Return the child scope obtained by appending the given index."""
        return Scope(self._SEPARATOR.join([self, str(index)]))

    def unwrap(self) -> list[str]:
        """Return the scope split on the separator (leading empty part included)."""
        return self.split(self._SEPARATOR)

    @property
    def parent(self) -> Scope:
        """The scope with its last path component removed."""
        return Scope(self._SEPARATOR.join(self.split(self._SEPARATOR)[:-1]))

    @property
    def jsonpath(self) -> str:
        """The scope rendered as a dotted JSONPath expression rooted at ``$``.

        Empty components and ``divergence`` components are dropped, and
        components containing a colon are wrapped in double quotes. A scope
        with no remaining components maps to the bare root ``$``.
        """
        json_parts = []
        for part in self.split(self._SEPARATOR):
            # Skip empty strings from leading/trailing separators, and the
            # "divergence" marker component.
            if not part or part == "divergence":
                continue
            # Wrap keys with special characters (e.g., colon) in quotes.
            json_parts.append(f'"{part}"' if ":" in part else part)

        if not json_parts:
            # "$." is not a valid JSONPath; the root is addressed as "$".
            return "$"
        return f"$.{'.'.join(json_parts)}"
136

137

138
class ChangeType(enum.Enum):
1✔
139
    UNCHANGED = "Unchanged"
1✔
140
    CREATED = "Created"
1✔
141
    MODIFIED = "Modified"
1✔
142
    REMOVED = "Removed"
1✔
143

144
    def __str__(self):
1✔
145
        return self.value
×
146

147
    def to_change_action(self) -> ChangeAction:
1✔
148
        # Convert this change type into the change action used throughout the CFn API
149
        return {
×
150
            ChangeType.CREATED: ChangeAction.Add,
151
            ChangeType.MODIFIED: ChangeAction.Modify,
152
            ChangeType.REMOVED: ChangeAction.Remove,
153
        }.get(self, ChangeAction.Add)
154

155

156
class ChangeSetEntity(abc.ABC):
    """Base class of every entity in the change set model.

    Each entity records the scope it was found at and the change type that
    was computed for it.
    """

    scope: Final[Scope]
    change_type: ChangeType

    def __init__(self, scope: Scope, change_type: ChangeType):
        self.scope = scope
        self.change_type = change_type

    def get_children(self) -> Generator[ChangeSetEntity]:
        """Yield every ChangeSetEntity reachable from this entity's attributes.

        Attribute values are searched directly and, recursively, inside lists
        and dictionary values.
        """
        for child in self.__dict__.values():
            yield from self._get_children_in(child)

    @staticmethod
    def _get_children_in(obj: Any) -> Generator[ChangeSetEntity]:
        """Yield the entities found in ``obj``, descending into lists and dict values."""
        # TODO: could avoid the inductive logic here, and check for loops?
        if isinstance(obj, ChangeSetEntity):
            yield obj
        elif isinstance(obj, list):
            for item in obj:
                yield from ChangeSetEntity._get_children_in(item)
        elif isinstance(obj, dict):
            for item in obj.values():
                yield from ChangeSetEntity._get_children_in(item)

    def __str__(self):
        # The opening parenthesis is balanced by the trailing one.
        return f"({self.__class__.__name__}| {vars(self)})"

    def __repr__(self):
        return str(self)
185

186

187
class ChangeSetNode(ChangeSetEntity, abc.ABC): ...
1✔
188

189

190
class ChangeSetTerminal(ChangeSetEntity, abc.ABC): ...
1✔
191

192

193
class UpdateModel:
1✔
194
    # TODO: may be expanded to keep track of other runtime values such as resolved_parameters.
195

196
    node_template: Final[NodeTemplate]
1✔
197
    before_runtime_cache: Final[dict]
1✔
198
    after_runtime_cache: Final[dict]
1✔
199

200
    def __init__(
1✔
201
        self,
202
        node_template: NodeTemplate,
203
    ):
204
        self.node_template = node_template
1✔
205
        self.before_runtime_cache = {}
1✔
206
        self.after_runtime_cache = {}
1✔
207

208

209
class NodeTemplate(ChangeSetNode):
1✔
210
    transform: Final[NodeTransform]
1✔
211
    mappings: Final[NodeMappings]
1✔
212
    parameters: Final[NodeParameters]
1✔
213
    conditions: Final[NodeConditions]
1✔
214
    resources: Final[NodeResources]
1✔
215
    outputs: Final[NodeOutputs]
1✔
216

217
    def __init__(
1✔
218
        self,
219
        scope: Scope,
220
        transform: NodeTransform,
221
        mappings: NodeMappings,
222
        parameters: NodeParameters,
223
        conditions: NodeConditions,
224
        resources: NodeResources,
225
        outputs: NodeOutputs,
226
    ):
227
        change_type = parent_change_type_of(
1✔
228
            [transform, mappings, parameters, conditions, resources, outputs]
229
        )
230
        super().__init__(scope=scope, change_type=change_type)
1✔
231
        self.transform = transform
1✔
232
        self.mappings = mappings
1✔
233
        self.parameters = parameters
1✔
234
        self.conditions = conditions
1✔
235
        self.resources = resources
1✔
236
        self.outputs = outputs
1✔
237

238

239
class NodeDivergence(ChangeSetNode):
1✔
240
    value: Final[ChangeSetEntity]
1✔
241
    divergence: Final[ChangeSetEntity]
1✔
242

243
    def __init__(self, scope: Scope, value: ChangeSetEntity, divergence: ChangeSetEntity):
1✔
244
        super().__init__(scope=scope, change_type=ChangeType.MODIFIED)
1✔
245
        self.value = value
1✔
246
        self.divergence = divergence
1✔
247

248

249
class NodeParameter(ChangeSetNode):
1✔
250
    name: Final[str]
1✔
251
    type_: Final[ChangeSetEntity]
1✔
252
    dynamic_value: Final[ChangeSetEntity]
1✔
253
    default_value: Final[Maybe[ChangeSetEntity]]
1✔
254

255
    def __init__(
1✔
256
        self,
257
        scope: Scope,
258
        name: str,
259
        type_: ChangeSetEntity,
260
        dynamic_value: ChangeSetEntity,
261
        default_value: Maybe[ChangeSetEntity],
262
    ):
263
        change_type = parent_change_type_of([type_, default_value, dynamic_value])
1✔
264
        super().__init__(scope=scope, change_type=change_type)
1✔
265
        self.name = name
1✔
266
        self.type_ = type_
1✔
267
        self.dynamic_value = dynamic_value
1✔
268
        self.default_value = default_value
1✔
269

270

271
class NodeParameters(ChangeSetNode):
1✔
272
    parameters: Final[list[NodeParameter]]
1✔
273

274
    def __init__(self, scope: Scope, parameters: list[NodeParameter]):
1✔
275
        change_type = parent_change_type_of(parameters)
1✔
276
        super().__init__(scope=scope, change_type=change_type)
1✔
277
        self.parameters = parameters
1✔
278

279

280
class NodeMapping(ChangeSetNode):
1✔
281
    name: Final[str]
1✔
282
    bindings: Final[NodeObject]
1✔
283

284
    def __init__(self, scope: Scope, name: str, bindings: NodeObject):
1✔
285
        super().__init__(scope=scope, change_type=bindings.change_type)
1✔
286
        self.name = name
1✔
287
        self.bindings = bindings
1✔
288

289

290
class NodeMappings(ChangeSetNode):
1✔
291
    mappings: Final[list[NodeMapping]]
1✔
292

293
    def __init__(self, scope: Scope, mappings: list[NodeMapping]):
1✔
294
        change_type = parent_change_type_of(mappings)
1✔
295
        super().__init__(scope=scope, change_type=change_type)
1✔
296
        self.mappings = mappings
1✔
297

298

299
class NodeOutput(ChangeSetNode):
1✔
300
    name: Final[str]
1✔
301
    value: Final[ChangeSetEntity]
1✔
302
    export: Final[Maybe[ChangeSetEntity]]
1✔
303
    condition_reference: Final[Maybe[TerminalValue]]
1✔
304

305
    def __init__(
1✔
306
        self,
307
        scope: Scope,
308
        name: str,
309
        value: ChangeSetEntity,
310
        export: Maybe[ChangeSetEntity],
311
        conditional_reference: Maybe[TerminalValue],
312
    ):
313
        change_type = parent_change_type_of([value, export, conditional_reference])
1✔
314
        super().__init__(scope=scope, change_type=change_type)
1✔
315
        self.name = name
1✔
316
        self.value = value
1✔
317
        self.export = export
1✔
318
        self.condition_reference = conditional_reference
1✔
319

320

321
class NodeOutputs(ChangeSetNode):
1✔
322
    outputs: Final[list[NodeOutput]]
1✔
323

324
    def __init__(self, scope: Scope, outputs: list[NodeOutput]):
1✔
325
        change_type = parent_change_type_of(outputs)
1✔
326
        super().__init__(scope=scope, change_type=change_type)
1✔
327
        self.outputs = outputs
1✔
328

329

330
class NodeCondition(ChangeSetNode):
1✔
331
    name: Final[str]
1✔
332
    body: Final[ChangeSetEntity]
1✔
333

334
    def __init__(self, scope: Scope, name: str, body: ChangeSetEntity):
1✔
335
        super().__init__(scope=scope, change_type=body.change_type)
1✔
336
        self.name = name
1✔
337
        self.body = body
1✔
338

339

340
class NodeConditions(ChangeSetNode):
1✔
341
    conditions: Final[list[NodeCondition]]
1✔
342

343
    def __init__(self, scope: Scope, conditions: list[NodeCondition]):
1✔
344
        change_type = parent_change_type_of(conditions)
1✔
345
        super().__init__(scope=scope, change_type=change_type)
1✔
346
        self.conditions = conditions
1✔
347

348

349
class NodeGlobalTransform(ChangeSetNode):
1✔
350
    name: Final[TerminalValue]
1✔
351
    parameters: Final[Maybe[ChangeSetEntity]]
1✔
352

353
    def __init__(self, scope: Scope, name: TerminalValue, parameters: Maybe[ChangeSetEntity]):
1✔
354
        if not is_nothing(parameters):
1✔
355
            change_type = parent_change_type_of([name, parameters])
1✔
356
        else:
357
            change_type = name.change_type
×
358
        super().__init__(scope=scope, change_type=change_type)
1✔
359
        self.name = name
1✔
360
        self.parameters = parameters
1✔
361

362

363
class NodeTransform(ChangeSetNode):
1✔
364
    global_transforms: Final[list[NodeGlobalTransform]]
1✔
365

366
    def __init__(self, scope: Scope, global_transforms: list[NodeGlobalTransform]):
1✔
367
        change_type = parent_change_type_of(global_transforms)
1✔
368
        super().__init__(scope=scope, change_type=change_type)
1✔
369
        self.global_transforms = global_transforms
1✔
370

371

372
class NodeResources(ChangeSetNode):
1✔
373
    resources: Final[list[NodeResource]]
1✔
374
    fn_transform: Final[Maybe[NodeIntrinsicFunctionFnTransform]]
1✔
375
    fn_foreaches: Final[list[NodeForEach]]
1✔
376

377
    def __init__(
1✔
378
        self,
379
        scope: Scope,
380
        resources: list[NodeResource],
381
        fn_transform: Maybe[NodeIntrinsicFunctionFnTransform],
382
        fn_foreaches: list[NodeForEach],
383
    ):
384
        change_type = parent_change_type_of(resources + [fn_transform] + fn_foreaches)
1✔
385
        super().__init__(scope=scope, change_type=change_type)
1✔
386
        self.resources = resources
1✔
387
        self.fn_transform = fn_transform
1✔
388
        self.fn_foreaches = fn_foreaches
1✔
389

390

391
class NodeResource(ChangeSetNode):
1✔
392
    name: Final[str]
1✔
393
    type_: Final[ChangeSetTerminal]
1✔
394
    properties: Final[NodeProperties]
1✔
395
    condition_reference: Final[Maybe[TerminalValue]]
1✔
396
    depends_on: Final[Maybe[NodeDependsOn]]
1✔
397
    requires_replacement: Final[bool]
1✔
398
    deletion_policy: Final[Maybe[ChangeSetTerminal]]
1✔
399
    update_replace_policy: Final[Maybe[ChangeSetTerminal]]
1✔
400
    fn_transform: Final[Maybe[NodeIntrinsicFunctionFnTransform]]
1✔
401

402
    def __init__(
1✔
403
        self,
404
        scope: Scope,
405
        change_type: ChangeType,
406
        name: str,
407
        type_: ChangeSetTerminal,
408
        properties: NodeProperties,
409
        condition_reference: Maybe[TerminalValue],
410
        depends_on: Maybe[NodeDependsOn],
411
        requires_replacement: bool,
412
        deletion_policy: Maybe[ChangeSetTerminal],
413
        update_replace_policy: Maybe[ChangeSetTerminal],
414
        fn_transform: Maybe[NodeIntrinsicFunctionFnTransform],
415
    ):
416
        super().__init__(scope=scope, change_type=change_type)
1✔
417
        self.name = name
1✔
418
        self.type_ = type_
1✔
419
        self.properties = properties
1✔
420
        self.condition_reference = condition_reference
1✔
421
        self.depends_on = depends_on
1✔
422
        self.requires_replacement = requires_replacement
1✔
423
        self.deletion_policy = deletion_policy
1✔
424
        self.update_replace_policy = update_replace_policy
1✔
425
        self.fn_transform = fn_transform
1✔
426

427

428
class NodeProperties(ChangeSetNode):
1✔
429
    properties: Final[list[NodeProperty]]
1✔
430
    fn_transform: Final[Maybe[NodeIntrinsicFunctionFnTransform]]
1✔
431

432
    def __init__(
1✔
433
        self,
434
        scope: Scope,
435
        properties: list[NodeProperty],
436
        fn_transform: Maybe[NodeIntrinsicFunctionFnTransform],
437
    ):
438
        change_type = parent_change_type_of(properties)
1✔
439
        super().__init__(scope=scope, change_type=change_type)
1✔
440
        self.properties = properties
1✔
441
        self.fn_transform = fn_transform
1✔
442

443

444
class NodeDependsOn(ChangeSetNode):
1✔
445
    depends_on: Final[NodeArray]
1✔
446

447
    def __init__(self, scope: Scope, depends_on: NodeArray):
1✔
448
        super().__init__(scope=scope, change_type=depends_on.change_type)
1✔
449
        self.depends_on = depends_on
1✔
450

451

452
class NodeProperty(ChangeSetNode):
1✔
453
    name: Final[str]
1✔
454
    value: Final[ChangeSetEntity]
1✔
455

456
    def __init__(self, scope: Scope, name: str, value: ChangeSetEntity):
1✔
457
        super().__init__(scope=scope, change_type=value.change_type)
1✔
458
        self.name = name
1✔
459
        self.value = value
1✔
460

461

462
class NodeIntrinsicFunction(ChangeSetNode):
1✔
463
    intrinsic_function: Final[str]
1✔
464
    arguments: Final[ChangeSetEntity]
1✔
465

466
    def __init__(
1✔
467
        self,
468
        scope: Scope,
469
        change_type: ChangeType,
470
        intrinsic_function: str,
471
        arguments: ChangeSetEntity,
472
    ):
473
        super().__init__(scope=scope, change_type=change_type)
1✔
474
        self.intrinsic_function = intrinsic_function
1✔
475
        self.arguments = arguments
1✔
476

477

478
class NodeIntrinsicFunctionFnTransform(NodeIntrinsicFunction):
1✔
479
    def __init__(
1✔
480
        self,
481
        scope: Scope,
482
        change_type: ChangeType,
483
        intrinsic_function: str,
484
        arguments: ChangeSetEntity,
485
        before_siblings: list[Any],
486
        after_siblings: list[Any],
487
    ):
488
        super().__init__(
1✔
489
            scope=scope,
490
            change_type=change_type,
491
            intrinsic_function=intrinsic_function,
492
            arguments=arguments,
493
        )
494
        self.before_siblings = before_siblings
1✔
495
        self.after_siblings = after_siblings
1✔
496

497

498
class NodeForEach(ChangeSetNode):
    """Change set node holding the arguments of a for-each construct."""

    # Declared at class level like the attributes of the sibling node classes.
    arguments: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        arguments: ChangeSetEntity,
    ):
        # ``Final`` is not a valid qualifier for function parameters (PEP 591),
        # so the parameters are annotated with their plain types.
        super().__init__(
            scope=scope,
            change_type=change_type,
        )
        self.arguments = arguments
510

511

512
class NodeObject(ChangeSetNode):
1✔
513
    bindings: Final[dict[str, ChangeSetEntity]]
1✔
514

515
    def __init__(self, scope: Scope, change_type: ChangeType, bindings: dict[str, ChangeSetEntity]):
1✔
516
        super().__init__(scope=scope, change_type=change_type)
1✔
517
        self.bindings = bindings
1✔
518

519

520
class NodeArray(ChangeSetNode):
1✔
521
    array: Final[list[ChangeSetEntity]]
1✔
522

523
    def __init__(self, scope: Scope, change_type: ChangeType, array: list[ChangeSetEntity]):
1✔
524
        super().__init__(scope=scope, change_type=change_type)
1✔
525
        self.array = array
1✔
526

527

528
class TerminalValue(ChangeSetTerminal, abc.ABC):
1✔
529
    value: Final[Any]
1✔
530

531
    def __init__(self, scope: Scope, change_type: ChangeType, value: Any):
1✔
532
        super().__init__(scope=scope, change_type=change_type)
1✔
533
        self.value = value
1✔
534

535

536
class TerminalValueModified(TerminalValue):
1✔
537
    modified_value: Final[Any]
1✔
538

539
    def __init__(self, scope: Scope, value: Any, modified_value: Any):
1✔
540
        super().__init__(scope=scope, change_type=ChangeType.MODIFIED, value=value)
1✔
541
        self.modified_value = modified_value
1✔
542

543

544
class TerminalValueCreated(TerminalValue):
1✔
545
    def __init__(self, scope: Scope, value: Any):
1✔
546
        super().__init__(scope=scope, change_type=ChangeType.CREATED, value=value)
1✔
547

548

549
class TerminalValueRemoved(TerminalValue):
1✔
550
    def __init__(self, scope: Scope, value: Any):
1✔
551
        super().__init__(scope=scope, change_type=ChangeType.REMOVED, value=value)
1✔
552

553

554
class TerminalValueUnchanged(TerminalValue):
1✔
555
    def __init__(self, scope: Scope, value: Any):
1✔
556
        super().__init__(scope=scope, change_type=ChangeType.UNCHANGED, value=value)
1✔
557

558

559
NameKey: Final[str] = "Name"
1✔
560
TransformKey: Final[str] = "Transform"
1✔
561
TypeKey: Final[str] = "Type"
1✔
562
ConditionKey: Final[str] = "Condition"
1✔
563
ConditionsKey: Final[str] = "Conditions"
1✔
564
MappingsKey: Final[str] = "Mappings"
1✔
565
ResourcesKey: Final[str] = "Resources"
1✔
566
PropertiesKey: Final[str] = "Properties"
1✔
567
ParametersKey: Final[str] = "Parameters"
1✔
568
DefaultKey: Final[str] = "Default"
1✔
569
ValueKey: Final[str] = "Value"
1✔
570
ExportKey: Final[str] = "Export"
1✔
571
OutputsKey: Final[str] = "Outputs"
1✔
572
DependsOnKey: Final[str] = "DependsOn"
1✔
573
DeletionPolicyKey: Final[str] = "DeletionPolicy"
1✔
574
UpdateReplacePolicyKey: Final[str] = "UpdateReplacePolicy"
1✔
575
# TODO: expand intrinsic functions set.
576
RefKey: Final[str] = "Ref"
1✔
577
RefConditionKey: Final[str] = "Condition"
1✔
578
FnIfKey: Final[str] = "Fn::If"
1✔
579
FnAnd: Final[str] = "Fn::And"
1✔
580
FnOr: Final[str] = "Fn::Or"
1✔
581
FnNotKey: Final[str] = "Fn::Not"
1✔
582
FnJoinKey: Final[str] = "Fn::Join"
1✔
583
FnGetAttKey: Final[str] = "Fn::GetAtt"
1✔
584
FnEqualsKey: Final[str] = "Fn::Equals"
1✔
585
FnFindInMapKey: Final[str] = "Fn::FindInMap"
1✔
586
FnSubKey: Final[str] = "Fn::Sub"
1✔
587
FnTransform: Final[str] = "Fn::Transform"
1✔
588
FnSelect: Final[str] = "Fn::Select"
1✔
589
FnSplit: Final[str] = "Fn::Split"
1✔
590
FnGetAZs: Final[str] = "Fn::GetAZs"
1✔
591
FnBase64: Final[str] = "Fn::Base64"
1✔
592
FnImportValue: Final[str] = "Fn::ImportValue"
1✔
593
INTRINSIC_FUNCTIONS: Final[set[str]] = {
1✔
594
    RefKey,
595
    RefConditionKey,
596
    FnIfKey,
597
    FnAnd,
598
    FnOr,
599
    FnNotKey,
600
    FnJoinKey,
601
    FnEqualsKey,
602
    FnGetAttKey,
603
    FnFindInMapKey,
604
    FnSubKey,
605
    FnTransform,
606
    FnSelect,
607
    FnSplit,
608
    FnGetAZs,
609
    FnBase64,
610
    FnImportValue,
611
}
612

613

614
class ChangeSetModel:
1✔
615
    # TODO: should this instead be generalised to work on "Stack" objects instead of just "Template"s?
616

617
    # TODO: can probably improve the typehints to use CFN's 'language' eg. dict -> Template|Properties, etc.
618

619
    # TODO: add support for 'replacement' computation, and ensure this state is propagated in tree traversals
620
    #  such as intrinsic functions.
621

622
    _before_template: Final[Maybe[dict]]
1✔
623
    _after_template: Final[Maybe[dict]]
1✔
624
    _before_parameters: Final[Maybe[dict]]
1✔
625
    _after_parameters: Final[Maybe[dict]]
1✔
626
    _visited_scopes: Final[dict[str, ChangeSetEntity]]
1✔
627
    _node_template: Final[NodeTemplate]
1✔
628

629
    def __init__(
1✔
630
        self,
631
        before_template: dict | None,
632
        after_template: dict | None,
633
        before_parameters: dict | None,
634
        after_parameters: dict[str, EngineParameter] | None,
635
    ):
636
        self._before_template = before_template or Nothing
1✔
637
        self._after_template = after_template or Nothing
1✔
638
        self._before_parameters = before_parameters or Nothing
1✔
639
        self._after_parameters = after_parameters or Nothing
1✔
640
        self._visited_scopes = {}
1✔
641
        # TODO: move this modeling process to the `get_update_model` method as constructors shouldn't do work
642
        self._node_template = self._model(
1✔
643
            before_template=self._before_template, after_template=self._after_template
644
        )
645
        # TODO: need to do template preprocessing e.g. parameter resolution, conditions etc.
646

647
    def get_update_model(self) -> UpdateModel:
1✔
648
        return UpdateModel(node_template=self._node_template)
1✔
649

650
    def _visit_terminal_value(
1✔
651
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
652
    ) -> TerminalValue:
653
        terminal_value = self._visited_scopes.get(scope)
1✔
654
        if isinstance(terminal_value, TerminalValue):
1✔
UNCOV
655
            return terminal_value
×
656
        if is_created(before=before_value, after=after_value):
1✔
657
            terminal_value = TerminalValueCreated(scope=scope, value=after_value)
1✔
658
        elif is_removed(before=before_value, after=after_value):
1✔
659
            terminal_value = TerminalValueRemoved(scope=scope, value=before_value)
1✔
660
        elif before_value == after_value:
1✔
661
            terminal_value = TerminalValueUnchanged(scope=scope, value=before_value)
1✔
662
        else:
663
            terminal_value = TerminalValueModified(
1✔
664
                scope=scope, value=before_value, modified_value=after_value
665
            )
666
        self._visited_scopes[scope] = terminal_value
1✔
667
        return terminal_value
1✔
668

669
    def _visit_intrinsic_function(
        self,
        scope: Scope,
        intrinsic_function: str,
        before_arguments: Maybe[Any],
        after_arguments: Maybe[Any],
    ) -> NodeIntrinsicFunction:
        """Model an intrinsic function call found at ``scope``.

        The arguments are visited under the ``args`` child scope. The change
        type is CREATED/REMOVED when the arguments appear or disappear;
        otherwise it is delegated to a ``_resolve_intrinsic_function_<name>``
        method when one exists, falling back to the arguments' change type.
        The resulting node is cached in ``_visited_scopes``.

        :raises RuntimeError: if an Fn::Transform is nested in another one.
        """
        node_intrinsic_function = self._visited_scopes.get(scope)
        if isinstance(node_intrinsic_function, NodeIntrinsicFunction):
            return node_intrinsic_function
        arguments_scope = scope.open_scope("args")
        arguments = self._visit_value(
            scope=arguments_scope, before_value=before_arguments, after_value=after_arguments
        )

        # Only terminal arguments carry a ``value``; guard the access so that
        # non-terminal arguments (arrays, objects, nested functions) do not
        # raise AttributeError here.
        if (
            intrinsic_function == "Ref"
            and isinstance(arguments, TerminalValue)
            and arguments.value == "AWS::NoValue"
        ):
            arguments.value = Nothing

        if is_created(before=before_arguments, after=after_arguments):
            change_type = ChangeType.CREATED
        elif is_removed(before=before_arguments, after=after_arguments):
            change_type = ChangeType.REMOVED
        else:
            # Derive the resolver method name, e.g. "Fn::GetAtt" -> "fn_get_att".
            function_name = intrinsic_function.replace("::", "_")
            function_name = camel_to_snake_case(function_name)
            resolve_function_name = f"_resolve_intrinsic_function_{function_name}"
            if hasattr(self, resolve_function_name):
                resolve_function = getattr(self, resolve_function_name)
                change_type = resolve_function(arguments)
            else:
                change_type = arguments.change_type

        if intrinsic_function == FnTransform:
            if scope.count(FnTransform) > 1:
                raise RuntimeError(
                    "Invalid: Fn::Transforms cannot be nested inside another Fn::Transform"
                )

            # Address the parent of the Fn::Transform key in both templates.
            path = "$" + ".".join(scope.split("/")[:-1])
            before_siblings = extract_jsonpath(self._before_template, path)
            after_siblings = extract_jsonpath(self._after_template, path)

            node_intrinsic_function = NodeIntrinsicFunctionFnTransform(
                scope=scope,
                change_type=change_type,
                arguments=arguments,
                intrinsic_function=intrinsic_function,
                before_siblings=before_siblings,
                after_siblings=after_siblings,
            )
        else:
            node_intrinsic_function = NodeIntrinsicFunction(
                scope=scope,
                change_type=change_type,
                intrinsic_function=intrinsic_function,
                arguments=arguments,
            )
        self._visited_scopes[scope] = node_intrinsic_function
        return node_intrinsic_function
728

729
    def _visit_foreach(
        self, scope: Scope, before_arguments: Maybe[list], after_arguments: Maybe[list]
    ) -> NodeForEach:
        """Model a for-each construct found at ``scope``.

        The arguments are visited as an array under the ``args`` child scope
        and the node inherits that array's change type. The node is cached in
        ``_visited_scopes`` so that a repeated visit of the same scope returns
        the same instance.
        """
        node_foreach = self._visited_scopes.get(scope)
        if isinstance(node_foreach, NodeForEach):
            return node_foreach
        arguments_scope = scope.open_scope("args")
        arguments = self._visit_array(
            arguments_scope, before_array=before_arguments, after_array=after_arguments
        )
        node_foreach = NodeForEach(
            scope=scope, change_type=arguments.change_type, arguments=arguments
        )
        # Record the node; without this the cache lookup above can never hit.
        self._visited_scopes[scope] = node_foreach
        return node_foreach
740

741
    def _resolve_intrinsic_function_fn_sub(self, arguments: ChangeSetEntity) -> ChangeType:
1✔
742
        # TODO: This routine should instead export the implicit Ref and GetAtt calls within the first
743
        #       string template parameter and compute the respective change set types. Currently,
744
        #       changes referenced by Fn::Sub templates are only picked up during preprocessing; not
745
        #       at modelling.
746
        return arguments.change_type
1✔
747

748
    def _resolve_intrinsic_function_fn_get_att(self, arguments: ChangeSetEntity) -> ChangeType:
        """Compute the change type of an Fn::GetAtt call from its arguments.

        The first argument names the resource and the second the attribute.
        If the resource has a property with the attribute's name, that
        property's change type is returned; otherwise UNCHANGED.

        :raises RuntimeError: if the arguments are not an array of at least
            two terminal values, or the resource name is not a string.
        """
        # TODO: add support for nested intrinsic functions.
        # TODO: validate arguments structure and type.
        # TODO: should this check for deletion of resources and/or properties, if so what error should be raised?

        # Both the resource name and the attribute name are indexed below, so
        # require two elements up front rather than failing with IndexError.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 2:
            raise RuntimeError(
                "Invalid Fn::GetAtt arguments: expected an array of at least two elements"
            )
        logical_name_of_resource_entity = arguments.array[0]
        if not isinstance(logical_name_of_resource_entity, TerminalValue):
            raise RuntimeError(
                "Invalid Fn::GetAtt arguments: the resource name must be a terminal value"
            )
        logical_name_of_resource: str = logical_name_of_resource_entity.value
        if not isinstance(logical_name_of_resource, str):
            raise RuntimeError("Invalid Fn::GetAtt arguments: the resource name must be a string")
        node_resource: NodeResource = self._retrieve_or_visit_resource(
            resource_name=logical_name_of_resource
        )

        node_property_attribute_name = arguments.array[1]
        if not isinstance(node_property_attribute_name, TerminalValue):
            raise RuntimeError(
                "Invalid Fn::GetAtt arguments: the attribute name must be a terminal value"
            )
        # For a modified terminal, compare against the new attribute name.
        if isinstance(node_property_attribute_name, TerminalValueModified):
            attribute_name = node_property_attribute_name.modified_value
        else:
            attribute_name = node_property_attribute_name.value

        # TODO: this is another use case for which properties should be referenced by name
        for node_property in node_resource.properties.properties:
            if node_property.name == attribute_name:
                return node_property.change_type

        return ChangeType.UNCHANGED
779

780
    def _resolve_intrinsic_function_ref(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of a Ref call from its argument entity.

        The argument's own change type is returned when it is anything other than
        UNCHANGED, or when the argument is not a TerminalValue. Otherwise the
        referenced name is looked up as a condition, then as a parameter, and
        lastly as a resource; the change type of the first match is returned.
        """
        own_change_type = arguments.change_type
        if own_change_type != ChangeType.UNCHANGED or not isinstance(arguments, TerminalValue):
            return own_change_type

        referenced_name = arguments.value

        condition = self._retrieve_condition_if_exists(condition_name=referenced_name)
        if isinstance(condition, NodeCondition):
            return condition.change_type

        parameter = self._retrieve_parameter_if_exists(parameter_name=referenced_name)
        if isinstance(parameter, NodeParameter):
            return parameter.change_type

        # TODO: this should check the replacement flag for a resource update.
        resource = self._retrieve_or_visit_resource(resource_name=referenced_name)
        return resource.change_type
1✔
799

800
    def _resolve_intrinsic_function_condition(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of a Condition reference.

        The argument's own change type wins when it is not UNCHANGED or when the
        argument is not a TerminalValue. Otherwise the named condition is looked
        up and its change type is returned.

        Raises:
            RuntimeError: if no NodeCondition is found for the referenced name.
        """
        own_change_type = arguments.change_type
        if own_change_type != ChangeType.UNCHANGED or not isinstance(arguments, TerminalValue):
            return own_change_type

        condition_name = arguments.value
        found = self._retrieve_condition_if_exists(condition_name=condition_name)
        if not isinstance(found, NodeCondition):
            raise RuntimeError(f"Undefined condition '{condition_name}'")
        return found.change_type
×
811

812
    def _resolve_intrinsic_function_fn_find_in_map(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of an Fn::FindInMap call.

        If the arguments themselves changed, their change type is returned.
        Otherwise the mapping name, top-level key and second-level key are read
        from the first three array elements and the change type of the mapping
        entry they select is returned.

        Raises:
            RuntimeError: if the arguments are not a NodeArray with at least
                three elements, if the top-level key does not select a
                NodeObject, or if the second-level key is not present.
            NotImplementedError: if any of the three arguments is not a
                TerminalValue (nested functions are not supported here).
        """
        if arguments.change_type != ChangeType.UNCHANGED:
            return arguments.change_type
        # TODO: validate arguments structure and type.
        # TODO: add support for nested functions, here we assume the arguments are string literals.

        # Three positional arguments are indexed below; reject shorter arrays
        # up front instead of failing later with an IndexError.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 3:
            raise RuntimeError(
                "Fn::FindInMap requires a mapping name, a top-level key and a second-level key"
            )

        argument_mapping_name, argument_top_level_key, argument_second_level_key = (
            arguments.array[:3]
        )
        for argument in (
            argument_mapping_name,
            argument_top_level_key,
            argument_second_level_key,
        ):
            if not isinstance(argument, TerminalValue):
                raise NotImplementedError()

        mapping_name = argument_mapping_name.value
        top_level_key = argument_top_level_key.value
        second_level_key = argument_second_level_key.value

        node_mapping = self._retrieve_mapping(mapping_name=mapping_name)
        # TODO: a lookup would be beneficial in this scenario too;
        #  consider implications downstream and for replication.
        top_level_object = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_object, NodeObject):
            raise RuntimeError()

        target_map_value = top_level_object.bindings.get(second_level_key)
        if target_map_value is None:
            # .get() yields None for an unknown key; fail explicitly rather than
            # with an AttributeError on the attribute access below.
            raise RuntimeError(
                f"Undefined second-level key '{second_level_key}' "
                f"under '{top_level_key}' in mapping '{mapping_name}'"
            )
        return target_map_value.change_type
1✔
841

842
    def _resolve_intrinsic_function_fn_if(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of an Fn::If call.

        The first array element names the condition; the result is the parent
        change type computed over that condition node followed by the remaining
        array elements.

        Raises:
            RuntimeError: if the arguments are not a non-empty NodeArray, if the
                first element is not a TerminalValue holding a string, or if no
                NodeCondition exists for that name.
        """
        # TODO: validate arguments structure and type.
        if not (isinstance(arguments, NodeArray) and arguments.array):
            raise RuntimeError()

        condition_entity, *branch_entities = arguments.array
        if not isinstance(condition_entity, TerminalValue):
            raise RuntimeError()

        condition_name: str = condition_entity.value
        if not isinstance(condition_name, str):
            raise RuntimeError()

        condition = self._retrieve_condition_if_exists(condition_name=condition_name)
        if not isinstance(condition, NodeCondition):
            raise RuntimeError()

        return parent_change_type_of([condition, *branch_entities])
×
860

861
    def _resolve_requires_replacement(
        self, node_properties: NodeProperties, resource_type: TerminalValue
    ) -> bool:
        """Tell whether any modified property is listed as create-only.

        Returns False when no resource provider can be loaded for the type, or
        when none of the MODIFIED properties appears in the provider schema's
        "createOnlyProperties" list.
        """
        # A bit hacky: the resource provider is loaded only to reach its schema.
        # Note: the load attempt is not logged here; that must happen only once
        # and is already done in the executor.
        provider = ResourceProviderExecutor.try_load_resource_provider(resource_type.value)
        if not provider:
            # Unsupported resource: assume an in-place update for simplicity.
            return False

        schema_paths: list[str] = provider.SCHEMA.get("createOnlyProperties", [])
        # TODO: also hacky: strip the leading `/properties/` string from the definition
        #       ideally we should use a jsonpath or similar
        create_only_names = [path.replace("/properties/", "", 1) for path in schema_paths]

        return any(
            node_property.change_type == ChangeType.MODIFIED
            and node_property.name in create_only_names
            for node_property in node_properties.properties
        )
1✔
884

885
    def _visit_array(
        self, scope: Scope, before_array: Maybe[list], after_array: Maybe[list]
    ) -> NodeArray:
        """Visit two arrays position by position and wrap the result in a NodeArray.

        The shorter side is padded with Nothing so that every index present on
        either side is visited under its own index scope.
        """
        paired_items = zip_longest(before_array, after_array, fillvalue=Nothing)
        entries: list[ChangeSetEntity] = [
            self._visit_value(
                scope=scope.open_index(index=position),
                before_value=before_item,
                after_value=after_item,
            )
            for position, (before_item, after_item) in enumerate(paired_items)
        ]
        return NodeArray(
            scope=scope,
            change_type=change_type_of(before_array, after_array, entries),
            array=entries,
        )
1✔
899

900
    def _visit_object(
        self, scope: Scope, before_object: Maybe[dict], after_object: Maybe[dict]
    ) -> NodeObject:
        """Visit every binding of two objects and return the resulting NodeObject.

        A NodeObject already stored for this scope is returned as-is; otherwise
        the new node is stored in the visited-scopes cache before returning.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeObject):
            return cached

        bindings: dict[str, ChangeSetEntity] = {}
        for key in self._safe_keys_of(before_object, after_object):
            child_scope, child_values = self._safe_access_in(
                scope, key, before_object, after_object
            )
            before_child, after_child = child_values
            bindings[key] = self._visit_value(
                scope=child_scope, before_value=before_child, after_value=after_child
            )

        result = NodeObject(
            scope=scope,
            change_type=change_type_of(before_object, after_object, list(bindings.values())),
            bindings=bindings,
        )
        self._visited_scopes[scope] = result
        return result
1✔
920

921
    def _visit_divergence(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> NodeDivergence:
        """Model a before/after pair as two independent one-sided visits.

        The before value is visited against Nothing under the "value" sub-scope,
        and Nothing is visited against the after value under the "divergence"
        sub-scope.
        """
        removed_side = self._visit_value(
            scope=scope.open_scope("value"), before_value=before_value, after_value=Nothing
        )
        created_side = self._visit_value(
            scope=scope.open_scope("divergence"), before_value=Nothing, after_value=after_value
        )
        return NodeDivergence(scope=scope, value=removed_side, divergence=created_side)
1✔
931

932
    def _visit_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> ChangeSetEntity:
        """Visit a before/after value pair and return the matching entity node.

        A ChangeSetEntity already stored for the scope is returned directly.
        Otherwise a "dominant" value decides how the pair is visited: the before
        value when both sides share a type name or the value was removed, the
        after value when it was created. When none of these hold, the pair is
        visited as a type divergence. The result is stored under the scope.

        Raises:
            RuntimeError: if the dominant value is not a terminal, object, array
                or intrinsic function.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, ChangeSetEntity):
            return cached

        before_type_name = self._type_name_of(before_value)
        after_type_name = self._type_name_of(after_value)

        # Pick the side that determines the node kind, or flag a divergence.
        diverged = False
        dominant_value = before_value
        if before_type_name != after_type_name:
            if is_created(before=before_value, after=after_value):
                dominant_value = after_value
            elif not is_removed(before=before_value, after=after_value):
                diverged = True

        if diverged:
            # Case: type divergence.
            node = self._visit_divergence(
                scope=scope, before_value=before_value, after_value=after_value
            )
        else:
            dominant_type_name = self._type_name_of(dominant_value)
            if self._is_terminal(value=dominant_value):
                node = self._visit_terminal_value(
                    scope=scope, before_value=before_value, after_value=after_value
                )
            elif self._is_object(value=dominant_value):
                node = self._visit_object(
                    scope=scope, before_object=before_value, after_object=after_value
                )
            elif self._is_array(value=dominant_value):
                node = self._visit_array(
                    scope=scope, before_array=before_value, after_array=after_value
                )
            elif self._is_intrinsic_function_name(dominant_type_name):
                function_scope, function_arguments = self._safe_access_in(
                    scope, dominant_type_name, before_value, after_value
                )
                before_arguments, after_arguments = function_arguments
                node = self._visit_intrinsic_function(
                    scope=function_scope,
                    intrinsic_function=dominant_type_name,
                    before_arguments=before_arguments,
                    after_arguments=after_arguments,
                )
            else:
                raise RuntimeError(f"Unsupported type {type(dominant_value)}")

        self._visited_scopes[scope] = node
        return node
1✔
983

984
    def _visit_property(
        self,
        scope: Scope,
        property_name: str,
        before_property: Maybe[Any],
        after_property: Maybe[Any],
    ) -> NodeProperty:
        """Return the NodeProperty for a property, visiting its value if needed.

        A NodeProperty already stored for the scope is reused; a newly built one
        is stored under the scope before being returned.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperty):
            return cached

        # TODO: Review the use of Fn::Transform as resource properties.
        property_value = self._visit_value(
            scope=scope, before_value=before_property, after_value=after_property
        )
        result = NodeProperty(scope=scope, name=property_name, value=property_value)
        self._visited_scopes[scope] = result
        return result
1✔
1001

1002
    def _visit_properties(
        self, scope: Scope, before_properties: Maybe[dict], after_properties: Maybe[dict]
    ) -> NodeProperties:
        """Visit all properties of a resource and return them as NodeProperties.

        A key equal to FnTransform is visited as an intrinsic function and kept
        separately in ``fn_transform``; every other key becomes a NodeProperty.
        The result is cached under the scope.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeProperties):
            return cached

        visited_properties: list[NodeProperty] = []
        fn_transform = Nothing
        names: list[str] = self._safe_keys_of(before_properties, after_properties)

        for name in names:
            child_scope, (before_child, after_child) = self._safe_access_in(
                scope, name, before_properties, after_properties
            )
            if name == FnTransform:
                fn_transform = self._visit_intrinsic_function(
                    child_scope, FnTransform, before_child, after_child
                )
            else:
                visited_properties.append(
                    self._visit_property(
                        scope=child_scope,
                        property_name=name,
                        before_property=before_child,
                        after_property=after_child,
                    )
                )

        result = NodeProperties(
            scope=scope, properties=visited_properties, fn_transform=fn_transform
        )
        self._visited_scopes[scope] = result
        return result
1✔
1035

1036
    def _visit_type(self, scope: Scope, before_type: Any, after_type: Any) -> TerminalValue:
        """Visit a Type entry and require the result to be a TerminalValue.

        Raises:
            RuntimeError: if the visited value is not a TerminalValue.
        """
        visited = self._visit_value(scope=scope, before_value=before_type, after_value=after_type)
        if isinstance(visited, TerminalValue):
            return visited
        # TODO: decide where template schema validation should occur.
        raise RuntimeError()
1✔
1042

1043
    def _visit_deletion_policy(
        self, scope: Scope, before_deletion_policy: Any, after_deletion_policy: Any
    ) -> TerminalValue:
        """Visit a DeletionPolicy entry and require a TerminalValue result.

        Raises:
            RuntimeError: if the visited value is not a TerminalValue.
        """
        visited = self._visit_value(
            scope=scope, before_value=before_deletion_policy, after_value=after_deletion_policy
        )
        if isinstance(visited, TerminalValue):
            return visited
        # TODO: decide where template schema validation should occur.
        raise RuntimeError()
1✔
1053

1054
    def _visit_update_replace_policy(
        self, scope: Scope, before_update_replace_policy: Any, after_deletion_policy: Any
    ) -> TerminalValue:
        """Visit an UpdateReplacePolicy entry and require a TerminalValue result.

        NOTE(review): ``after_deletion_policy`` is used as the *after* value of
        the update-replace policy; the parameter name looks like a copy-paste
        leftover but is kept so existing callers are unaffected.

        Raises:
            RuntimeError: if the visited value is not a TerminalValue.
        """
        visited = self._visit_value(
            scope=scope,
            before_value=before_update_replace_policy,
            after_value=after_deletion_policy,
        )
        if isinstance(visited, TerminalValue):
            return visited
        # TODO: decide where template schema validation should occur.
        raise RuntimeError()
1✔
1066

1067
    def _visit_resource(
        self,
        scope: Scope,
        resource_name: str,
        before_resource: Maybe[dict],
        after_resource: Maybe[dict],
    ) -> NodeResource:
        """Build the NodeResource for one resource definition.

        A NodeResource already stored for the scope is returned directly.
        Otherwise the resource's type, condition, DependsOn, properties,
        deletion policy, update-replace policy and Fn::Transform entries are
        visited in that order, the overall change type and replacement flag are
        computed, and the resulting node is cached under the scope.

        Raises:
            RuntimeError: if the Fn::Transform scope contains FnTransform more
                than once (nested transforms).
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeResource):
            return cached

        # Type (always visited).
        type_scope, (type_before, type_after) = self._safe_access_in(
            scope, TypeKey, before_resource, after_resource
        )
        resource_type = self._visit_type(
            scope=type_scope, before_type=type_before, after_type=type_after
        )

        # Condition (only when either side holds a truthy value).
        condition_scope, (condition_before, condition_after) = self._safe_access_in(
            scope, ConditionKey, before_resource, after_resource
        )
        condition_reference = (
            self._visit_terminal_value(condition_scope, condition_before, condition_after)
            if condition_before or condition_after
            else Nothing
        )

        # DependsOn (only when either side holds a truthy value).
        depends_on_scope, (depends_on_before, depends_on_after) = self._safe_access_in(
            scope, DependsOnKey, before_resource, after_resource
        )
        depends_on = (
            self._visit_depends_on(depends_on_scope, depends_on_before, depends_on_after)
            if depends_on_before or depends_on_after
            else Nothing
        )

        # Properties (always visited).
        properties_scope, (properties_before, properties_after) = self._safe_access_in(
            scope, PropertiesKey, before_resource, after_resource
        )
        properties = self._visit_properties(
            scope=properties_scope,
            before_properties=properties_before,
            after_properties=properties_after,
        )

        # DeletionPolicy (only when either side holds a truthy value).
        deletion_scope, (deletion_before, deletion_after) = self._safe_access_in(
            scope, DeletionPolicyKey, before_resource, after_resource
        )
        deletion_policy = (
            self._visit_deletion_policy(deletion_scope, deletion_before, deletion_after)
            if deletion_before or deletion_after
            else Nothing
        )

        # UpdateReplacePolicy (only when either side holds a truthy value).
        replace_scope, (replace_before, replace_after) = self._safe_access_in(
            scope, UpdateReplacePolicyKey, before_resource, after_resource
        )
        update_replace_policy = (
            self._visit_update_replace_policy(replace_scope, replace_before, replace_after)
            if replace_before or replace_after
            else Nothing
        )

        # Fn::Transform (visited when either side is present at all).
        fn_transform = Nothing
        transform_scope, (transform_before, transform_after) = self._safe_access_in(
            scope, FnTransform, before_resource, after_resource
        )
        if not (is_nothing(transform_before) and is_nothing(transform_after)):
            if transform_scope.count(FnTransform) > 1:
                raise RuntimeError(
                    "Invalid: Fn::Transforms cannot be nested inside another Fn::Transform"
                )
            # Drop the last scope segment and join the rest with dots to form
            # the JSONPath used to extract the transform's siblings.
            parent_segments = transform_scope.split("/")[:-1]
            path = "$" + ".".join(parent_segments)
            before_siblings = extract_jsonpath(self._before_template, path)
            after_siblings = extract_jsonpath(self._after_template, path)
            # NOTE(review): the arguments scope is opened on the resource scope,
            # not on the transform scope — confirm this is intended.
            transform_arguments = self._visit_value(
                scope=scope.open_scope("args"),
                before_value=transform_before,
                after_value=transform_after,
            )
            fn_transform = NodeIntrinsicFunctionFnTransform(
                scope=transform_scope,
                change_type=ChangeType.MODIFIED,  # TODO
                arguments=transform_arguments,  # TODO
                intrinsic_function=FnTransform,
                before_siblings=before_siblings,
                after_siblings=after_siblings,
            )

        children = [
            properties,
            condition_reference,
            depends_on,
            deletion_policy,
            update_replace_policy,
            fn_transform,
        ]
        change_type = change_type_of(before_resource, after_resource, children)
        requires_replacement = self._resolve_requires_replacement(
            node_properties=properties, resource_type=resource_type
        )

        result = NodeResource(
            scope=scope,
            change_type=change_type,
            name=resource_name,
            type_=resource_type,
            properties=properties,
            condition_reference=condition_reference,
            depends_on=depends_on,
            requires_replacement=requires_replacement,
            deletion_policy=deletion_policy,
            update_replace_policy=update_replace_policy,
            fn_transform=fn_transform,
        )
        self._visited_scopes[scope] = result
        return result
1✔
1189

1190
    def _visit_resources(
        self, scope: Scope, before_resources: Maybe[dict], after_resources: Maybe[dict]
    ) -> NodeResources:
        """Visit every entry of the Resources section.

        An entry named FnTransform is visited as an intrinsic function, entries
        whose name starts with "Fn::ForEach" are visited as for-each blocks, and
        everything else is visited as a regular resource.
        """
        # TODO: investigate type changes behavior.
        visited_resources: list[NodeResource] = []
        fn_foreaches = []
        fn_transform = Nothing

        for name in self._safe_keys_of(before_resources, after_resources):
            entry_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, name, before_resources, after_resources
            )
            if name == FnTransform:
                fn_transform = self._visit_intrinsic_function(
                    scope=entry_scope,
                    intrinsic_function=name,
                    before_arguments=before_entry,
                    after_arguments=after_entry,
                )
            elif name.startswith("Fn::ForEach"):
                fn_foreaches.append(
                    self._visit_foreach(
                        scope=entry_scope,
                        before_arguments=before_entry,
                        after_arguments=after_entry,
                    )
                )
            else:
                visited_resources.append(
                    self._visit_resource(
                        scope=entry_scope,
                        resource_name=name,
                        before_resource=before_entry,
                        after_resource=after_entry,
                    )
                )

        return NodeResources(
            scope=scope,
            resources=visited_resources,
            fn_transform=fn_transform,
            fn_foreaches=fn_foreaches,
        )
1231

1232
    def _visit_mapping(
        self, scope: Scope, name: str, before_mapping: Maybe[dict], after_mapping: Maybe[dict]
    ) -> NodeMapping:
        """Visit a single mapping as an object and wrap it in a NodeMapping."""
        return NodeMapping(
            scope=scope,
            name=name,
            bindings=self._visit_object(
                scope=scope, before_object=before_mapping, after_object=after_mapping
            ),
        )
1✔
1239

1240
    def _visit_mappings(
        self, scope: Scope, before_mappings: Maybe[dict], after_mappings: Maybe[dict]
    ) -> NodeMappings:
        """Visit every mapping named on either side and collect them in a NodeMappings."""
        visited: list[NodeMapping] = []
        for name in self._safe_keys_of(before_mappings, after_mappings):
            child_scope, child_values = self._safe_access_in(
                scope, name, before_mappings, after_mappings
            )
            before_child, after_child = child_values
            visited.append(
                self._visit_mapping(
                    scope=child_scope,
                    name=name,
                    before_mapping=before_child,
                    after_mapping=after_child,
                )
            )
        return NodeMappings(scope=scope, mappings=visited)
1✔
1257

1258
    def _visit_dynamic_parameter(self, parameter_name: str) -> ChangeSetEntity:
        """Visit the runtime-supplied value of a parameter.

        The parameter is looked up in the before/after parameter sets under the
        "Dynamic"/"Parameters" scope. For each side that is present, the value
        used is its "resolved_value" entry when truthy, otherwise the result of
        ``engine_parameter_value``; an absent side stays Nothing.
        """

        def effective_value(parameter_dct):
            # Absent on this side: keep the Nothing sentinel.
            if is_nothing(parameter_dct):
                return Nothing
            return parameter_dct.get("resolved_value") or engine_parameter_value(parameter_dct)

        parameters_scope = Scope("Dynamic").open_scope("Parameters")
        value_scope, (before_dct, after_dct) = self._safe_access_in(
            parameters_scope, parameter_name, self._before_parameters, self._after_parameters
        )

        before_effective = effective_value(before_dct)
        after_effective = effective_value(after_dct)

        return self._visit_value(
            scope=value_scope, before_value=before_effective, after_value=after_effective
        )
1✔
1280

1281
    def _visit_parameter(
        self,
        scope: Scope,
        parameter_name: str,
        before_parameter: Maybe[dict],
        after_parameter: Maybe[dict],
    ) -> NodeParameter:
        """Build the NodeParameter for one parameter declaration.

        A NodeParameter already stored for the scope is reused. Otherwise the
        declared type, the declared default and the dynamic value are visited,
        in that order, and the new node is cached under the scope.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameter):
            return cached

        type_scope, type_values = self._safe_access_in(
            scope, TypeKey, before_parameter, after_parameter
        )
        declared_type = self._visit_value(type_scope, *type_values)

        default_scope, default_values = self._safe_access_in(
            scope, DefaultKey, before_parameter, after_parameter
        )
        declared_default = self._visit_value(default_scope, *default_values)

        result = NodeParameter(
            scope=scope,
            name=parameter_name,
            type_=declared_type,
            default_value=declared_default,
            dynamic_value=self._visit_dynamic_parameter(parameter_name=parameter_name),
        )
        self._visited_scopes[scope] = result
        return result
1✔
1313

1314
    def _visit_parameters(
        self, scope: Scope, before_parameters: Maybe[dict], after_parameters: Maybe[dict]
    ) -> NodeParameters:
        """Visit every declared parameter and return them as NodeParameters.

        The result is cached under the scope; a cached NodeParameters is
        returned without revisiting.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeParameters):
            return cached

        names: list[str] = self._safe_keys_of(before_parameters, after_parameters)
        visited: list[NodeParameter] = []
        for name in names:
            child_scope, child_values = self._safe_access_in(
                scope, name, before_parameters, after_parameters
            )
            before_child, after_child = child_values
            visited.append(
                self._visit_parameter(
                    scope=child_scope,
                    parameter_name=name,
                    before_parameter=before_child,
                    after_parameter=after_child,
                )
            )

        result = NodeParameters(scope=scope, parameters=visited)
        self._visited_scopes[scope] = result
        return result
1✔
1336

1337
    @staticmethod
    def _normalise_depends_on_value(value: Maybe[str | list[str]]) -> Maybe[list[str]]:
        """Reduce a DependsOn value to a sorted list of strings.

        To simplify downstream logic the accepted shapes are narrowed to one:
        a NothingType value is returned unchanged, a string becomes a
        one-element list, and a list is returned as a sorted copy.

        Raises:
            RuntimeError: if the value is neither NothingType, str nor list.
        """
        # TODO: Add integrations tests for DependsOn validations (invalid types, duplicate identifiers, etc.)
        if isinstance(value, NothingType):
            return value
        if isinstance(value, str):
            return [value]
        if isinstance(value, list):
            # Sort a copy: the list belongs to the caller's template data and
            # must not be reordered in place as a side effect of normalisation.
            return sorted(value)
        raise RuntimeError(
            f"Invalid type for DependsOn, expected a String or Array of String, but got: '{value}'"
        )
1✔
1352

1353
    def _visit_depends_on(
        self,
        scope: Scope,
        before_depends_on: Maybe[str | list[str]],
        after_depends_on: Maybe[str | list[str]],
    ) -> NodeDependsOn:
        """Normalise both DependsOn values, visit them as arrays and wrap the result."""
        normalised_before = self._normalise_depends_on_value(value=before_depends_on)
        normalised_after = self._normalise_depends_on_value(value=after_depends_on)
        return NodeDependsOn(
            scope=scope,
            depends_on=self._visit_array(
                scope=scope, before_array=normalised_before, after_array=normalised_after
            ),
        )
1✔
1366

1367
    def _visit_condition(
        self,
        scope: Scope,
        condition_name: str,
        before_condition: Maybe[dict],
        after_condition: Maybe[dict],
    ) -> NodeCondition:
        """Return the NodeCondition for a condition, visiting its body if needed.

        A NodeCondition already stored for the scope is reused; a newly built
        one is stored under the scope before being returned.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeCondition):
            return cached

        result = NodeCondition(
            scope=scope,
            name=condition_name,
            body=self._visit_value(
                scope=scope, before_value=before_condition, after_value=after_condition
            ),
        )
        self._visited_scopes[scope] = result
        return result
1✔
1383

1384
    def _visit_conditions(
        self, scope: Scope, before_conditions: Maybe[dict], after_conditions: Maybe[dict]
    ) -> NodeConditions:
        """Visit every declared condition and return them as NodeConditions.

        The result is cached under the scope; a cached NodeConditions is
        returned without revisiting.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeConditions):
            return cached

        names: list[str] = self._safe_keys_of(before_conditions, after_conditions)
        visited: list[NodeCondition] = []
        for name in names:
            child_scope, child_values = self._safe_access_in(
                scope, name, before_conditions, after_conditions
            )
            before_child, after_child = child_values
            visited.append(
                self._visit_condition(
                    scope=child_scope,
                    condition_name=name,
                    before_condition=before_child,
                    after_condition=after_child,
                )
            )

        result = NodeConditions(scope=scope, conditions=visited)
        self._visited_scopes[scope] = result
        return result
1✔
1406

1407
    def _visit_output(
        self, scope: Scope, name: str, before_output: Maybe[dict], after_output: Maybe[dict]
    ) -> NodeOutput:
        """Build the NodeOutput for one output declaration.

        The output's value is always visited. Its export and condition entries
        are visited only when either side holds a truthy value; otherwise they
        are left as Nothing.
        """
        value_scope, value_pair = self._safe_access_in(
            scope, ValueKey, before_output, after_output
        )
        output_value = self._visit_value(value_scope, *value_pair)

        export_scope, (export_before, export_after) = self._safe_access_in(
            scope, ExportKey, before_output, after_output
        )
        export: Maybe[ChangeSetEntity] = (
            self._visit_value(export_scope, export_before, export_after)
            if export_before or export_after
            else Nothing
        )

        # TODO: condition references should be resolved for the condition's change_type?
        condition_scope, (condition_before, condition_after) = self._safe_access_in(
            scope, ConditionKey, before_output, after_output
        )
        condition_reference: Maybe[TerminalValue] = (
            self._visit_terminal_value(condition_scope, condition_before, condition_after)
            if condition_before or condition_after
            else Nothing
        )

        return NodeOutput(
            scope=scope,
            name=name,
            value=output_value,
            export=export,
            conditional_reference=condition_reference,
        )
1439

1440
    def _visit_outputs(
        self, scope: Scope, before_outputs: Maybe[dict], after_outputs: Maybe[dict]
    ) -> NodeOutputs:
        """Visit every output named in either the before or after outputs.

        Output names are taken from ``_safe_keys_of``, so they are processed in
        sorted order.
        """

        def visit_named(output_name: str) -> NodeOutput:
            # Resolve the per-output scope and definitions, then delegate.
            output_scope, (output_before, output_after) = self._safe_access_in(
                scope, output_name, before_outputs, after_outputs
            )
            return self._visit_output(
                scope=output_scope,
                name=output_name,
                before_output=output_before,
                after_output=output_after,
            )

        visited: list[NodeOutput] = [
            visit_named(output_name)
            for output_name in self._safe_keys_of(before_outputs, after_outputs)
        ]
        return NodeOutputs(scope=scope, outputs=visited)
1457

1458
    def _visit_global_transform(
        self,
        scope: Scope,
        before_global_transform: Maybe[NormalisedGlobalTransformDefinition],
        after_global_transform: Maybe[NormalisedGlobalTransformDefinition],
    ) -> NodeGlobalTransform:
        """Build the node of one global transform entry.

        The entry under ``NameKey`` is visited as a terminal value and the entry
        under ``ParametersKey`` as a generic value.
        """
        definitions = (before_global_transform, after_global_transform)

        scope_of_name, (name_before, name_after) = self._safe_access_in(
            scope, NameKey, *definitions
        )
        name_node = self._visit_terminal_value(
            scope=scope_of_name, before_value=name_before, after_value=name_after
        )

        scope_of_parameters, (parameters_before, parameters_after) = self._safe_access_in(
            scope, ParametersKey, *definitions
        )
        parameters_node = self._visit_value(
            scope=scope_of_parameters,
            before_value=parameters_before,
            after_value=parameters_after,
        )

        return NodeGlobalTransform(scope=scope, name=name_node, parameters=parameters_node)
1479

1480
    @staticmethod
    def _normalise_transformer_value(value: Maybe[str | list[Any]]) -> Maybe[list[Any]]:
        """Reduce a Transform definition to a list of transform entries.

        - ``Nothing`` is returned unchanged.
        - A string becomes a one-element list holding a definition with that name.
        - A list has its string items wrapped into definitions; any other item is
          kept as it is.
        - A dict must contain ``"Name"`` and becomes a one-element list.

        Raises ``RuntimeError`` for a dict without ``"Name"`` and for any other
        value type.
        """
        # To simplify downstream logics, reduce the type options to array of transformations.
        # TODO: add further validation logic
        # TODO: should we sort to avoid detecting user-side ordering changes as template changes?
        if isinstance(value, NothingType):
            return value

        if isinstance(value, str):
            return [NormalisedGlobalTransformDefinition(Name=value, Parameters=Nothing)]

        if isinstance(value, list):
            return [
                NormalisedGlobalTransformDefinition(Name=entry, Parameters=Nothing)
                if isinstance(entry, str)
                else entry
                for entry in value
            ]

        if not isinstance(value, dict):
            raise RuntimeError(f"Invalid Transform definition: '{value}'")

        if "Name" not in value:
            raise RuntimeError(f"Missing 'Name' field in Transform definition '{value}'")
        return [
            NormalisedGlobalTransformDefinition(
                Name=value["Name"], Parameters=value.get("Parameters", Nothing)
            )
        ]
1508

1509
    def _visit_transform(
        self, scope: Scope, before_transform: Maybe[Any], after_transform: Maybe[Any]
    ) -> NodeTransform:
        """Build the Transform node from the before/after Transform definitions.

        Both definitions are normalised and then paired up by position. When one
        side has fewer entries, the missing ones are filled with ``Nothing``.
        Each pair is visited under an index scope opened from ``scope``.
        """
        before_entries = self._normalise_transformer_value(before_transform)
        after_entries = self._normalise_transformer_value(after_transform)
        paired_entries = zip_longest(before_entries, after_entries, fillvalue=Nothing)

        visited: list[NodeGlobalTransform] = [
            self._visit_global_transform(
                scope=scope.open_index(index=position),
                before_global_transform=before_entry,
                after_global_transform=after_entry,
            )
            for position, (before_entry, after_entry) in enumerate(paired_entries)
        ]
        return NodeTransform(scope=scope, global_transforms=visited)
1526

1527
    def _model(self, before_template: Maybe[dict], after_template: Maybe[dict]) -> NodeTemplate:
        """Build the root template node from the before/after templates.

        The top-level sections are visited in a fixed order, keyed by
        ``TransformKey``, ``MappingsKey``, ``ParametersKey``, ``ConditionsKey``,
        ``ResourcesKey`` and ``OutputsKey``.
        """
        root_scope = Scope()
        # TODO: visit other child types

        def section(key: str):
            # Scope and (before, after) values of one top-level template section.
            return self._safe_access_in(root_scope, key, before_template, after_template)

        scope_of_transform, (transform_before, transform_after) = section(TransformKey)
        transform_node = self._visit_transform(
            scope=scope_of_transform,
            before_transform=transform_before,
            after_transform=transform_after,
        )

        scope_of_mappings, (mappings_before, mappings_after) = section(MappingsKey)
        mappings_node = self._visit_mappings(
            scope=scope_of_mappings,
            before_mappings=mappings_before,
            after_mappings=mappings_after,
        )

        scope_of_parameters, (parameters_before, parameters_after) = section(ParametersKey)
        parameters_node = self._visit_parameters(
            scope=scope_of_parameters,
            before_parameters=parameters_before,
            after_parameters=parameters_after,
        )

        scope_of_conditions, (conditions_before, conditions_after) = section(ConditionsKey)
        conditions_node = self._visit_conditions(
            scope=scope_of_conditions,
            before_conditions=conditions_before,
            after_conditions=conditions_after,
        )

        scope_of_resources, (resources_before, resources_after) = section(ResourcesKey)
        resources_node = self._visit_resources(
            scope=scope_of_resources,
            before_resources=resources_before,
            after_resources=resources_after,
        )

        scope_of_outputs, (outputs_before, outputs_after) = section(OutputsKey)
        outputs_node = self._visit_outputs(
            scope=scope_of_outputs,
            before_outputs=outputs_before,
            after_outputs=outputs_after,
        )

        return NodeTemplate(
            scope=root_scope,
            transform=transform_node,
            mappings=mappings_node,
            parameters=parameters_node,
            conditions=conditions_node,
            resources=resources_node,
            outputs=outputs_node,
        )
1590

1591
    def _retrieve_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
        """Return the node of the named condition, or ``Nothing`` if it is undefined.

        The condition is looked up in the Conditions section of both the before
        and after templates; it is visited when either side defines it.

        :param condition_name: name of the condition to look up.
        :return: the visited condition node, or ``Nothing`` when neither template
            defines a condition with that name.
        """
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
            Scope(), ConditionsKey, self._before_template, self._after_template
        )
        # A missing (falsy) Conditions section is treated as an empty mapping.
        before_conditions = before_conditions or {}
        after_conditions = after_conditions or {}
        if condition_name not in before_conditions and condition_name not in after_conditions:
            return Nothing

        condition_scope, (before_condition, after_condition) = self._safe_access_in(
            conditions_scope, condition_name, before_conditions, after_conditions
        )
        # Visit under the condition's own scope, not the scope of the whole
        # Conditions section, matching how _visit_conditions scopes each condition.
        return self._visit_condition(
            scope=condition_scope,
            condition_name=condition_name,
            before_condition=before_condition,
            after_condition=after_condition,
        )
1609

1610
    def _retrieve_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
        """Return the node of the named parameter, or ``Nothing`` if it is undefined.

        The parameter is looked up under ``ParametersKey`` in both the before and
        after templates and is visited when either side defines it.
        """
        section_scope, (section_before, section_after) = self._safe_access_in(
            Scope(), ParametersKey, self._before_template, self._after_template
        )
        if not (parameter_name in section_before or parameter_name in section_after):
            return Nothing

        own_scope, (definition_before, definition_after) = self._safe_access_in(
            section_scope, parameter_name, section_before, section_after
        )
        return self._visit_parameter(
            own_scope,
            parameter_name,
            before_parameter=definition_before,
            after_parameter=definition_after,
        )
1626

1627
    def _retrieve_mapping(self, mapping_name) -> NodeMapping:
        """Return the node of the named mapping.

        The mapping is looked up under ``MappingsKey`` in both the before and
        after templates and is visited when either side defines it.

        :param mapping_name: name of the mapping to look up.
        :raises RuntimeError: if neither template defines the mapping.
        """
        # TODO: add caching mechanism, and raise appropriate error if missing.
        scope_mappings, (before_mappings, after_mappings) = self._safe_access_in(
            Scope(), MappingsKey, self._before_template, self._after_template
        )
        if mapping_name not in before_mappings and mapping_name not in after_mappings:
            # Name the missing mapping so the failure can be diagnosed.
            raise RuntimeError(f"Undefined mapping '{mapping_name}'")

        scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
            scope_mappings, mapping_name, before_mappings, after_mappings
        )
        return self._visit_mapping(scope_mapping, mapping_name, before_mapping, after_mapping)
1641

1642
    def _retrieve_or_visit_resource(self, resource_name: str) -> NodeResource:
        """Visit the named resource using the stored before/after templates.

        The resource definitions are read from under ``ResourcesKey`` and passed
        to ``_visit_resource`` together with the resource's own scope.
        """
        templates = (self._before_template, self._after_template)
        section_scope, (section_before, section_after) = self._safe_access_in(
            Scope(), ResourcesKey, *templates
        )
        own_scope, (definition_before, definition_after) = self._safe_access_in(
            section_scope, resource_name, section_before, section_after
        )
        node = self._visit_resource(
            scope=own_scope,
            resource_name=resource_name,
            before_resource=definition_before,
            after_resource=definition_after,
        )
        return node
1658

1659
    @staticmethod
    def _is_intrinsic_function_name(function_name: str) -> bool:
        """Tell whether ``function_name`` is a member of ``INTRINSIC_FUNCTIONS``."""
        # TODO: are intrinsic functions soft keywords?
        is_known_function = function_name in INTRINSIC_FUNCTIONS
        return is_known_function
1663

1664
    @staticmethod
    def _safe_access_in(scope: Scope, key: str, *objects: Maybe[dict]) -> tuple[Scope, Maybe[Any]]:
        """Look up ``key`` in each of ``objects`` and open the matching child scope.

        For a dict the result is ``obj.get(key, Nothing)``; a ``Nothing`` object
        is passed through unchanged. Any other type raises ``RuntimeError``.

        Returns the scope opened for ``key`` together with the looked-up values:
        the bare value when exactly one object was given, otherwise a tuple with
        one value per object.
        """

        def lookup(container: Maybe[dict]) -> Maybe[Any]:
            # Nothing propagates as-is; dicts are queried; everything else is rejected.
            if isinstance(container, NothingType):
                return container
            if isinstance(container, dict):
                return container.get(key, Nothing)
            raise RuntimeError(f"Invalid definition type at '{container}'")

        values = [lookup(candidate) for candidate in objects]
        child_scope = scope.open_scope(name=key)
        if len(objects) == 1:
            return child_scope, values[0]
        return child_scope, tuple(values)
1676

1677
    @staticmethod
1✔
1678
    def _safe_keys_of(*objects: Maybe[dict]) -> list[str]:
1✔
1679
        key_set: set[str] = set()
1✔
1680
        for obj in objects:
1✔
1681
            # TODO: raise errors if not dict
1682
            if isinstance(obj, dict):
1✔
1683
                key_set.update(obj.keys())
1✔
1684
        # The keys list is sorted to increase reproducibility of the
1685
        # update graph build process or downstream logics.
1686
        keys = sorted(key_set)
1✔
1687
        return keys
1✔
1688

1689
    @staticmethod
1✔
1690
    def _name_if_intrinsic_function(value: Maybe[Any]) -> str | None:
1✔
1691
        if isinstance(value, dict):
1✔
1692
            keys = ChangeSetModel._safe_keys_of(value)
1✔
1693
            if len(keys) == 1:
1✔
1694
                key_name = keys[0]
1✔
1695
                if ChangeSetModel._is_intrinsic_function_name(key_name):
1✔
1696
                    return key_name
1✔
1697
        return None
1✔
1698

1699
    @staticmethod
1✔
1700
    def _type_name_of(value: Maybe[Any]) -> str:
1✔
1701
        maybe_intrinsic_function_name = ChangeSetModel._name_if_intrinsic_function(value)
1✔
1702
        if maybe_intrinsic_function_name is not None:
1✔
1703
            return maybe_intrinsic_function_name
1✔
1704
        return type(value).__name__
1✔
1705

1706
    @staticmethod
1✔
1707
    def _is_terminal(value: Any) -> bool:
1✔
1708
        return type(value) in {int, float, bool, str, None, NothingType}
1✔
1709

1710
    @staticmethod
1✔
1711
    def _is_object(value: Any) -> bool:
1✔
1712
        return isinstance(value, dict) and ChangeSetModel._name_if_intrinsic_function(value) is None
1✔
1713

1714
    @staticmethod
1✔
1715
    def _is_array(value: Any) -> bool:
1✔
1716
        return isinstance(value, list)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc