• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17086927072

19 Aug 2025 10:02PM UTC coverage: 86.889% (+0.01%) from 86.875%
17086927072

push

github

web-flow
APIGW: fix TestInvokeMethod path logic (#13030)

4 of 23 new or added lines in 1 file covered. (17.39%)

264 existing lines in 17 files now uncovered.

67018 of 77131 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.78
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model.py
1
from __future__ import annotations
1✔
2

3
import abc
1✔
4
import enum
1✔
5
from collections.abc import Generator
1✔
6
from itertools import zip_longest
1✔
7
from typing import Any, Final, TypedDict, cast
1✔
8

9
from typing_extensions import TypeVar
1✔
10

11
from localstack.aws.api.cloudformation import ChangeAction
1✔
12
from localstack.services.cloudformation.resource_provider import ResourceProviderExecutor
1✔
13
from localstack.services.cloudformation.v2.types import (
1✔
14
    EngineParameter,
15
    engine_parameter_value,
16
)
17
from localstack.utils.json import extract_jsonpath
1✔
18
from localstack.utils.strings import camel_to_snake_case
1✔
19

20
T = TypeVar("T")
1✔
21

22

23
class NothingType:
    """Singleton sentinel that denotes 'no value' (distinct from None).

    The single shared instance is falsy, iterates as an empty sequence, contains no
    items, and compares equal only to instances of ``NothingType``.
    """

    _singleton = None
    __slots__ = ()

    def __new__(cls):
        # Create the shared instance on first use and return it on every later call.
        if cls._singleton is None:
            cls._singleton = super().__new__(cls)
        return cls._singleton

    def __eq__(self, other):
        return isinstance(other, NothingType)

    def __hash__(self):
        # Defining __eq__ without __hash__ makes instances unhashable; keep the
        # sentinel usable as a dict key / set member, consistently with __eq__.
        return hash(NothingType)

    def __str__(self):
        return repr(self)

    def __repr__(self) -> str:
        return "Nothing"

    def __bool__(self):
        return False

    def __iter__(self):
        return iter(())

    def __contains__(self, item):
        return False
1✔
51

52

53
Maybe = T | NothingType
1✔
54
Nothing = NothingType()
1✔
55

56

57
def is_nothing(value: Any) -> bool:
    """Return True when ``value`` is an instance of ``NothingType``."""
    return isinstance(value, NothingType)
1✔
59

60

61
def is_created(before: Maybe[Any], after: Maybe[Any]) -> bool:
    """Return True when ``before`` is Nothing and ``after`` is not."""
    if not is_nothing(before):
        return False
    return not is_nothing(after)
1✔
63

64

65
def is_removed(before: Maybe[Any], after: Maybe[Any]) -> bool:
    """Return True when ``before`` is not Nothing and ``after`` is."""
    if is_nothing(before):
        return False
    return is_nothing(after)
1✔
67

68

69
def parent_change_type_of(children: list[Maybe[ChangeSetEntity]]):
    """Derive a parent's change type from the change types of its children.

    Children that are Nothing are ignored. With no remaining children the result is
    ``ChangeType.UNCHANGED``; when every remaining child shares one change type that
    type is returned; otherwise the result is ``ChangeType.MODIFIED``.
    """
    present_types = []
    for child in children:
        if is_nothing(child):
            continue
        present_types.append(child.change_type)

    if len(present_types) == 0:
        return ChangeType.UNCHANGED

    # TODO: rework this logic. Currently if any values are different then we consider it
    #  modified, but e.g. if everything is unchanged or created, the result should probably be
    #  "created"
    reference_type = present_types[0]
    for candidate in present_types:
        if not (candidate == reference_type):
            return ChangeType.MODIFIED
    return reference_type
1✔
80

81

82
def change_type_of(before: Maybe[Any], after: Maybe[Any], children: list[Maybe[ChangeSetEntity]]):
    """Compute the change type for a before/after pair and its child entities.

    Creation and removal are decided from ``before``/``after`` alone; in every other
    case the result is delegated to ``parent_change_type_of(children)``.
    """
    if is_created(before, after):
        return ChangeType.CREATED
    if is_removed(before, after):
        return ChangeType.REMOVED
    return parent_change_type_of(children)
1✔
90

91

92
class NormalisedGlobalTransformDefinition(TypedDict):
    """Typed dictionary with a ``Name`` entry and a ``Parameters`` entry that may be Nothing."""

    Name: Any
    Parameters: Maybe[Any]
1✔
95

96

97
class Scope(str):
    """A ``str`` subclass holding a "/"-separated path; the root scope is the empty string."""

    _ROOT_SCOPE: Final[str] = ""
    _SEPARATOR: Final[str] = "/"

    def __new__(cls, scope: str = _ROOT_SCOPE) -> Scope:
        instance = super().__new__(cls, scope)
        return cast(Scope, instance)

    def open_scope(self, name: Scope | str) -> Scope:
        """Return a new scope with ``name`` appended after a separator."""
        return Scope(self._SEPARATOR.join((self, name)))

    def open_index(self, index: int) -> Scope:
        """Return a new scope with the string form of ``index`` appended after a separator."""
        return self.open_scope(str(index))

    def unwrap(self) -> list[str]:
        """Return the scope split on the separator."""
        return self.split(self._SEPARATOR)

    @property
    def parent(self) -> Scope:
        """The scope without its last separator-delimited component."""
        # Everything before the last separator equals re-joining all but the last part.
        head, _, _ = self.rpartition(self._SEPARATOR)
        return Scope(head)

    @property
    def jsonpath(self) -> str:
        """The scope rendered as a "$."-prefixed, dot-joined path.

        Empty components and components equal to "divergence" are dropped; components
        containing a colon are wrapped in double quotes.
        """

        def _render(segment: str) -> str:
            # Wrap keys with special characters (e.g., colon) in quotes
            return f'"{segment}"' if ":" in segment else segment

        segments = [
            _render(segment)
            for segment in self.split(self._SEPARATOR)
            # Skip empty strings from leading/trailing slashes, and divergence markers.
            if segment and segment != "divergence"
        ]
        return "$." + ".".join(segments)
1✔
136

137

138
class ChangeType(enum.Enum):
    """The kinds of change an entity of the change set model can carry."""

    UNCHANGED = "Unchanged"
    CREATED = "Created"
    MODIFIED = "Modified"
    REMOVED = "Removed"

    def __str__(self):
        return self.value

    def to_change_action(self) -> ChangeAction:
        """Convert this change type into the change action used throughout the CFn API.

        MODIFIED maps to ``ChangeAction.Modify`` and REMOVED to ``ChangeAction.Remove``;
        every other member (including UNCHANGED) maps to ``ChangeAction.Add``.
        """
        if self is ChangeType.MODIFIED:
            return ChangeAction.Modify
        if self is ChangeType.REMOVED:
            return ChangeAction.Remove
        return ChangeAction.Add
154

155

156
class ChangeSetEntity(abc.ABC):
    """Base class for entities of the change set model.

    Every entity records the ``scope`` it was created for and its ``change_type``.
    """

    scope: Final[Scope]
    change_type: ChangeType

    def __init__(self, scope: Scope, change_type: ChangeType):
        self.scope = scope
        self.change_type = change_type

    def get_children(self) -> Generator[ChangeSetEntity]:
        """Yield every ChangeSetEntity reachable from this instance's attribute values."""
        for child in self.__dict__.values():
            yield from self._get_children_in(child)

    @staticmethod
    def _get_children_in(obj: Any) -> Generator[ChangeSetEntity]:
        """Yield ``obj`` if it is an entity, else the entities nested in its list items or dict values."""
        # TODO: could avoid the inductive logic here, and check for loops?
        if isinstance(obj, ChangeSetEntity):
            yield obj
        elif isinstance(obj, list):
            for item in obj:
                yield from ChangeSetEntity._get_children_in(item)
        elif isinstance(obj, dict):
            for item in obj.values():
                yield from ChangeSetEntity._get_children_in(item)

    def __str__(self):
        # The closing parenthesis was previously missing, producing unbalanced output.
        return f"({self.__class__.__name__}| {vars(self)})"

    def __repr__(self):
        return str(self)
185

186

187
class ChangeSetNode(ChangeSetEntity, abc.ABC):
    """Marker base class adding no members; the Node* classes of this module derive from it."""
1✔
188

189

190
class ChangeSetTerminal(ChangeSetEntity, abc.ABC):
    """Marker base class adding no members; ``TerminalValue`` derives from it."""
1✔
191

192

193
class UpdateModel:
    """Bundles a modelled template with two runtime caches that start out empty."""

    # TODO: may be expanded to keep track of other runtime values such as resolved_parameters.

    node_template: Final[NodeTemplate]
    before_runtime_cache: Final[dict]
    after_runtime_cache: Final[dict]

    def __init__(self, node_template: NodeTemplate):
        self.node_template = node_template
        # Two independent dictionaries, one per side of the update.
        self.before_runtime_cache = dict()
        self.after_runtime_cache = dict()
1✔
207

208

209
class NodeTemplate(ChangeSetNode):
    """Node grouping the transform, mappings, parameters, conditions, resources and outputs nodes.

    Its change type is derived from those six children via ``parent_change_type_of``.
    """

    transform: Final[NodeTransform]
    mappings: Final[NodeMappings]
    parameters: Final[NodeParameters]
    conditions: Final[NodeConditions]
    resources: Final[NodeResources]
    outputs: Final[NodeOutputs]

    def __init__(
        self,
        scope: Scope,
        transform: NodeTransform,
        mappings: NodeMappings,
        parameters: NodeParameters,
        conditions: NodeConditions,
        resources: NodeResources,
        outputs: NodeOutputs,
    ):
        sections = [transform, mappings, parameters, conditions, resources, outputs]
        super().__init__(scope=scope, change_type=parent_change_type_of(sections))
        self.transform = transform
        self.mappings = mappings
        self.parameters = parameters
        self.conditions = conditions
        self.resources = resources
        self.outputs = outputs
1✔
237

238

239
class NodeDivergence(ChangeSetNode):
    """Node pairing a ``value`` entity with a ``divergence`` entity; always MODIFIED."""

    value: Final[ChangeSetEntity]
    divergence: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, value: ChangeSetEntity, divergence: ChangeSetEntity):
        # The change type is fixed rather than derived from the two children.
        fixed_change_type = ChangeType.MODIFIED
        super().__init__(scope=scope, change_type=fixed_change_type)
        self.value = value
        self.divergence = divergence
1✔
247

248

249
class NodeParameter(ChangeSetNode):
    """Node for a named parameter with type, dynamic value and optional default value entities.

    Its change type is derived from those three children via ``parent_change_type_of``.
    """

    name: Final[str]
    type_: Final[ChangeSetEntity]
    dynamic_value: Final[ChangeSetEntity]
    default_value: Final[Maybe[ChangeSetEntity]]

    def __init__(
        self,
        scope: Scope,
        name: str,
        type_: ChangeSetEntity,
        dynamic_value: ChangeSetEntity,
        default_value: Maybe[ChangeSetEntity],
    ):
        components = [type_, default_value, dynamic_value]
        super().__init__(scope=scope, change_type=parent_change_type_of(components))
        self.name = name
        self.type_ = type_
        self.dynamic_value = dynamic_value
        self.default_value = default_value
1✔
269

270

271
class NodeParameters(ChangeSetNode):
    """Node holding a list of ``NodeParameter``; its change type is derived from them."""

    parameters: Final[list[NodeParameter]]

    def __init__(self, scope: Scope, parameters: list[NodeParameter]):
        super().__init__(scope=scope, change_type=parent_change_type_of(parameters))
        self.parameters = parameters
1✔
278

279

280
class NodeMapping(ChangeSetNode):
    """Node for a named mapping; it takes the change type of its ``bindings`` object."""

    name: Final[str]
    bindings: Final[NodeObject]

    def __init__(self, scope: Scope, name: str, bindings: NodeObject):
        inherited_change_type = bindings.change_type
        super().__init__(scope=scope, change_type=inherited_change_type)
        self.name = name
        self.bindings = bindings
1✔
288

289

290
class NodeMappings(ChangeSetNode):
    """Node holding a list of ``NodeMapping``; its change type is derived from them."""

    mappings: Final[list[NodeMapping]]

    def __init__(self, scope: Scope, mappings: list[NodeMapping]):
        super().__init__(scope=scope, change_type=parent_change_type_of(mappings))
        self.mappings = mappings
1✔
297

298

299
class NodeOutput(ChangeSetNode):
    """Node for a named output with a value, an optional export and an optional condition reference.

    Its change type is derived from those three children via ``parent_change_type_of``.
    Note that the constructor argument ``conditional_reference`` is stored as the
    attribute ``condition_reference``.
    """

    name: Final[str]
    value: Final[ChangeSetEntity]
    export: Final[Maybe[ChangeSetEntity]]
    condition_reference: Final[Maybe[TerminalValue]]

    def __init__(
        self,
        scope: Scope,
        name: str,
        value: ChangeSetEntity,
        export: Maybe[ChangeSetEntity],
        conditional_reference: Maybe[TerminalValue],
    ):
        components = [value, export, conditional_reference]
        super().__init__(scope=scope, change_type=parent_change_type_of(components))
        self.name = name
        self.value = value
        self.export = export
        self.condition_reference = conditional_reference
1✔
319

320

321
class NodeOutputs(ChangeSetNode):
    """Node holding a list of ``NodeOutput``; its change type is derived from them."""

    outputs: Final[list[NodeOutput]]

    def __init__(self, scope: Scope, outputs: list[NodeOutput]):
        super().__init__(scope=scope, change_type=parent_change_type_of(outputs))
        self.outputs = outputs
1✔
328

329

330
class NodeCondition(ChangeSetNode):
    """Node for a named condition; it takes the change type of its ``body`` entity."""

    name: Final[str]
    body: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, name: str, body: ChangeSetEntity):
        inherited_change_type = body.change_type
        super().__init__(scope=scope, change_type=inherited_change_type)
        self.name = name
        self.body = body
1✔
338

339

340
class NodeConditions(ChangeSetNode):
    """Node holding a list of ``NodeCondition``; its change type is derived from them."""

    conditions: Final[list[NodeCondition]]

    def __init__(self, scope: Scope, conditions: list[NodeCondition]):
        super().__init__(scope=scope, change_type=parent_change_type_of(conditions))
        self.conditions = conditions
1✔
347

348

349
class NodeGlobalTransform(ChangeSetNode):
    """Node for a global transform with a ``name`` terminal and optional ``parameters``.

    When ``parameters`` is Nothing the change type is that of ``name``; otherwise it is
    derived from both children via ``parent_change_type_of``.
    """

    name: Final[TerminalValue]
    parameters: Final[Maybe[ChangeSetEntity]]

    def __init__(self, scope: Scope, name: TerminalValue, parameters: Maybe[ChangeSetEntity]):
        if is_nothing(parameters):
            change_type = name.change_type
        else:
            change_type = parent_change_type_of([name, parameters])
        super().__init__(scope=scope, change_type=change_type)
        self.name = name
        self.parameters = parameters
1✔
361

362

363
class NodeTransform(ChangeSetNode):
    """Node holding a list of ``NodeGlobalTransform``; its change type is derived from them."""

    global_transforms: Final[list[NodeGlobalTransform]]

    def __init__(self, scope: Scope, global_transforms: list[NodeGlobalTransform]):
        super().__init__(scope=scope, change_type=parent_change_type_of(global_transforms))
        self.global_transforms = global_transforms
1✔
370

371

372
class NodeResources(ChangeSetNode):
    """Node holding a list of ``NodeResource`` plus an optional Fn::Transform node.

    The change type is derived from ``resources`` only; ``fn_transform`` does not
    contribute to it.
    """

    resources: Final[list[NodeResource]]
    fn_transform: Final[Maybe[NodeIntrinsicFunctionFnTransform]]

    def __init__(
        self,
        scope: Scope,
        resources: list[NodeResource],
        fn_transform: Maybe[NodeIntrinsicFunctionFnTransform],
    ):
        super().__init__(scope=scope, change_type=parent_change_type_of(resources))
        self.resources = resources
        self.fn_transform = fn_transform
1✔
386

387

388
class NodeResource(ChangeSetNode):
    """Node for a named resource and the entities attached to it.

    Unlike the collection nodes, the change type is supplied by the caller rather than
    computed here.
    """

    name: Final[str]
    type_: Final[ChangeSetTerminal]
    properties: Final[NodeProperties]
    condition_reference: Final[Maybe[TerminalValue]]
    depends_on: Final[Maybe[NodeDependsOn]]
    requires_replacement: Final[bool]
    deletion_policy: Final[Maybe[ChangeSetTerminal]]
    update_replace_policy: Final[Maybe[ChangeSetTerminal]]
    fn_transform: Final[Maybe[NodeIntrinsicFunctionFnTransform]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        name: str,
        type_: ChangeSetTerminal,
        properties: NodeProperties,
        condition_reference: Maybe[TerminalValue],
        depends_on: Maybe[NodeDependsOn],
        requires_replacement: bool,
        deletion_policy: Maybe[ChangeSetTerminal],
        update_replace_policy: Maybe[ChangeSetTerminal],
        fn_transform: Maybe[NodeIntrinsicFunctionFnTransform],
    ):
        super().__init__(scope=scope, change_type=change_type)
        # Identity and body of the resource.
        self.name = name
        self.type_ = type_
        self.properties = properties
        # Optional attributes; each may be Nothing.
        self.condition_reference = condition_reference
        self.depends_on = depends_on
        self.deletion_policy = deletion_policy
        self.update_replace_policy = update_replace_policy
        self.fn_transform = fn_transform
        self.requires_replacement = requires_replacement
1✔
423

424

425
class NodeProperties(ChangeSetNode):
    """Node holding a list of ``NodeProperty`` plus an optional Fn::Transform node.

    The change type is derived from ``properties`` only; ``fn_transform`` does not
    contribute to it.
    """

    properties: Final[list[NodeProperty]]
    fn_transform: Final[Maybe[NodeIntrinsicFunctionFnTransform]]

    def __init__(
        self,
        scope: Scope,
        properties: list[NodeProperty],
        fn_transform: Maybe[NodeIntrinsicFunctionFnTransform],
    ):
        super().__init__(scope=scope, change_type=parent_change_type_of(properties))
        self.properties = properties
        self.fn_transform = fn_transform
1✔
439

440

441
class NodeDependsOn(ChangeSetNode):
    """Node wrapping a ``NodeArray`` of dependencies; it takes that array's change type."""

    depends_on: Final[NodeArray]

    def __init__(self, scope: Scope, depends_on: NodeArray):
        inherited_change_type = depends_on.change_type
        super().__init__(scope=scope, change_type=inherited_change_type)
        self.depends_on = depends_on
1✔
447

448

449
class NodeProperty(ChangeSetNode):
    """Node for a named property; it takes the change type of its ``value`` entity."""

    name: Final[str]
    value: Final[ChangeSetEntity]

    def __init__(self, scope: Scope, name: str, value: ChangeSetEntity):
        inherited_change_type = value.change_type
        super().__init__(scope=scope, change_type=inherited_change_type)
        self.name = name
        self.value = value
1✔
457

458

459
class NodeIntrinsicFunction(ChangeSetNode):
    """Node for an intrinsic function call: the function name and its arguments entity.

    The change type is supplied by the caller.
    """

    intrinsic_function: Final[str]
    arguments: Final[ChangeSetEntity]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        intrinsic_function: str,
        arguments: ChangeSetEntity,
    ):
        super().__init__(scope=scope, change_type=change_type)
        self.arguments = arguments
        self.intrinsic_function = intrinsic_function
1✔
473

474

475
class NodeIntrinsicFunctionFnTransform(NodeIntrinsicFunction):
    """Intrinsic function node that additionally stores ``before_siblings`` and ``after_siblings``."""

    before_siblings: Final[list[Any]]
    after_siblings: Final[list[Any]]

    def __init__(
        self,
        scope: Scope,
        change_type: ChangeType,
        intrinsic_function: str,
        arguments: ChangeSetEntity,
        before_siblings: list[Any],
        after_siblings: list[Any],
    ):
        super().__init__(
            scope=scope,
            change_type=change_type,
            intrinsic_function=intrinsic_function,
            arguments=arguments,
        )
        self.before_siblings = before_siblings
        self.after_siblings = after_siblings
1✔
493

494

495
class NodeObject(ChangeSetNode):
    """Node holding a dictionary of named entities; the change type is supplied by the caller."""

    bindings: Final[dict[str, ChangeSetEntity]]

    def __init__(self, scope: Scope, change_type: ChangeType, bindings: dict[str, ChangeSetEntity]):
        super().__init__(change_type=change_type, scope=scope)
        self.bindings = bindings
1✔
501

502

503
class NodeArray(ChangeSetNode):
    """Node holding a list of entities; the change type is supplied by the caller."""

    array: Final[list[ChangeSetEntity]]

    def __init__(self, scope: Scope, change_type: ChangeType, array: list[ChangeSetEntity]):
        super().__init__(change_type=change_type, scope=scope)
        self.array = array
1✔
509

510

511
class TerminalValue(ChangeSetTerminal, abc.ABC):
    """Terminal entity carrying a single ``value``; the change type is supplied by the caller."""

    value: Final[Any]

    def __init__(self, scope: Scope, change_type: ChangeType, value: Any):
        super().__init__(change_type=change_type, scope=scope)
        self.value = value
1✔
517

518

519
class TerminalValueModified(TerminalValue):
    """MODIFIED terminal keeping ``value`` alongside an additional ``modified_value``."""

    modified_value: Final[Any]

    def __init__(self, scope: Scope, value: Any, modified_value: Any):
        super().__init__(scope=scope, value=value, change_type=ChangeType.MODIFIED)
        self.modified_value = modified_value
1✔
525

526

527
class TerminalValueCreated(TerminalValue):
    """Terminal value whose change type is always CREATED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope=scope, value=value, change_type=ChangeType.CREATED)
1✔
530

531

532
class TerminalValueRemoved(TerminalValue):
    """Terminal value whose change type is always REMOVED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope=scope, value=value, change_type=ChangeType.REMOVED)
1✔
535

536

537
class TerminalValueUnchanged(TerminalValue):
    """Terminal value whose change type is always UNCHANGED."""

    def __init__(self, scope: Scope, value: Any):
        super().__init__(scope=scope, value=value, change_type=ChangeType.UNCHANGED)
1✔
540

541

542
NameKey: Final[str] = "Name"
1✔
543
TransformKey: Final[str] = "Transform"
1✔
544
TypeKey: Final[str] = "Type"
1✔
545
ConditionKey: Final[str] = "Condition"
1✔
546
ConditionsKey: Final[str] = "Conditions"
1✔
547
MappingsKey: Final[str] = "Mappings"
1✔
548
ResourcesKey: Final[str] = "Resources"
1✔
549
PropertiesKey: Final[str] = "Properties"
1✔
550
ParametersKey: Final[str] = "Parameters"
1✔
551
DefaultKey: Final[str] = "Default"
1✔
552
ValueKey: Final[str] = "Value"
1✔
553
ExportKey: Final[str] = "Export"
1✔
554
OutputsKey: Final[str] = "Outputs"
1✔
555
DependsOnKey: Final[str] = "DependsOn"
1✔
556
DeletionPolicyKey: Final[str] = "DeletionPolicy"
1✔
557
UpdateReplacePolicyKey: Final[str] = "UpdateReplacePolicy"
1✔
558
# TODO: expand intrinsic functions set.
559
RefKey: Final[str] = "Ref"
1✔
560
RefConditionKey: Final[str] = "Condition"
1✔
561
FnIfKey: Final[str] = "Fn::If"
1✔
562
FnAnd: Final[str] = "Fn::And"
1✔
563
FnOr: Final[str] = "Fn::Or"
1✔
564
FnNotKey: Final[str] = "Fn::Not"
1✔
565
FnJoinKey: Final[str] = "Fn::Join"
1✔
566
FnGetAttKey: Final[str] = "Fn::GetAtt"
1✔
567
FnEqualsKey: Final[str] = "Fn::Equals"
1✔
568
FnFindInMapKey: Final[str] = "Fn::FindInMap"
1✔
569
FnSubKey: Final[str] = "Fn::Sub"
1✔
570
FnTransform: Final[str] = "Fn::Transform"
1✔
571
FnSelect: Final[str] = "Fn::Select"
1✔
572
FnSplit: Final[str] = "Fn::Split"
1✔
573
FnGetAZs: Final[str] = "Fn::GetAZs"
1✔
574
FnBase64: Final[str] = "Fn::Base64"
1✔
575
FnImportValue: Final[str] = "Fn::ImportValue"
1✔
576
INTRINSIC_FUNCTIONS: Final[set[str]] = {
1✔
577
    RefKey,
578
    RefConditionKey,
579
    FnIfKey,
580
    FnAnd,
581
    FnOr,
582
    FnNotKey,
583
    FnJoinKey,
584
    FnEqualsKey,
585
    FnGetAttKey,
586
    FnFindInMapKey,
587
    FnSubKey,
588
    FnTransform,
589
    FnSelect,
590
    FnSplit,
591
    FnGetAZs,
592
    FnBase64,
593
    FnImportValue,
594
}
595

596

597
class ChangeSetModel:
1✔
598
    # TODO: should this instead be generalised to work on "Stack" objects instead of just "Template"s?
599

600
    # TODO: can probably improve the typehints to use CFN's 'language' eg. dict -> Template|Properties, etc.
601

602
    # TODO: add support for 'replacement' computation, and ensure this state is propagated in tree traversals
603
    #  such as intrinsic functions.
604

605
    _before_template: Final[Maybe[dict]]
1✔
606
    _after_template: Final[Maybe[dict]]
1✔
607
    _before_parameters: Final[Maybe[dict]]
1✔
608
    _after_parameters: Final[Maybe[dict]]
1✔
609
    _visited_scopes: Final[dict[str, ChangeSetEntity]]
1✔
610
    _node_template: Final[NodeTemplate]
1✔
611

612
    def __init__(
        self,
        before_template: dict | None,
        after_template: dict | None,
        before_parameters: dict | None,
        after_parameters: dict[str, EngineParameter] | None,
    ):
        """Store the before/after inputs and immediately build the template node.

        Any falsy input (``None`` or an empty dict) is replaced by ``Nothing``.
        """
        self._before_template = before_template if before_template else Nothing
        self._after_template = after_template if after_template else Nothing
        self._before_parameters = before_parameters if before_parameters else Nothing
        self._after_parameters = after_parameters if after_parameters else Nothing
        self._visited_scopes = {}
        # TODO: move this modeling process to the `get_update_model` method as constructors shouldn't do work
        self._node_template = self._model(
            before_template=self._before_template,
            after_template=self._after_template,
        )
        # TODO: need to do template preprocessing e.g. parameter resolution, conditions etc.
629

630
    def get_update_model(self) -> UpdateModel:
        """Return a new ``UpdateModel`` wrapping the template node built at construction."""
        update_model = UpdateModel(node_template=self._node_template)
        return update_model
1✔
632

633
    def _visit_terminal_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> TerminalValue:
        """Return the terminal value for ``scope``, building and caching it when absent.

        A ``TerminalValue`` already recorded for ``scope`` in ``_visited_scopes`` is
        returned as is. Otherwise the concrete class is picked in this order: created,
        removed, unchanged (``before_value == after_value``), modified.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, TerminalValue):
            return cached

        if is_created(before=before_value, after=after_value):
            built = TerminalValueCreated(scope=scope, value=after_value)
        elif is_removed(before=before_value, after=after_value):
            built = TerminalValueRemoved(scope=scope, value=before_value)
        elif before_value == after_value:
            built = TerminalValueUnchanged(scope=scope, value=before_value)
        else:
            built = TerminalValueModified(
                scope=scope, value=before_value, modified_value=after_value
            )

        self._visited_scopes[scope] = built
        return built
1✔
651

652
    def _visit_intrinsic_function(
        self,
        scope: Scope,
        intrinsic_function: str,
        before_arguments: Maybe[Any],
        after_arguments: Maybe[Any],
    ) -> NodeIntrinsicFunction:
        """Return the intrinsic function node for ``scope``, building and caching it when absent.

        The arguments are visited under the ``args`` sub-scope. When the arguments are
        neither created nor removed, the change type comes from a method named
        ``_resolve_intrinsic_function_<snake_case_name>`` if this object has one, and
        from the arguments' own change type otherwise. ``Fn::Transform`` produces a
        ``NodeIntrinsicFunctionFnTransform`` that also carries the sibling values
        extracted from the before and after templates.

        Raises:
            RuntimeError: if ``Fn::Transform`` occurs more than once in ``scope``.
        """
        cached = self._visited_scopes.get(scope)
        if isinstance(cached, NodeIntrinsicFunction):
            return cached

        arguments = self._visit_value(
            scope=scope.open_scope("args"),
            before_value=before_arguments,
            after_value=after_arguments,
        )

        # A reference to AWS::NoValue is rewritten in place to the Nothing sentinel.
        if intrinsic_function == "Ref" and arguments.value == "AWS::NoValue":
            arguments.value = Nothing

        if is_created(before=before_arguments, after=after_arguments):
            change_type = ChangeType.CREATED
        elif is_removed(before=before_arguments, after=after_arguments):
            change_type = ChangeType.REMOVED
        else:
            # e.g. "Fn::GetAtt" -> "_resolve_intrinsic_function_fn_get_att"
            resolver_suffix = camel_to_snake_case(intrinsic_function.replace("::", "_"))
            resolver_name = f"_resolve_intrinsic_function_{resolver_suffix}"
            if hasattr(self, resolver_name):
                change_type = getattr(self, resolver_name)(arguments)
            else:
                change_type = arguments.change_type

        if intrinsic_function != FnTransform:
            node = NodeIntrinsicFunction(
                scope=scope,
                change_type=change_type,
                intrinsic_function=intrinsic_function,
                arguments=arguments,
            )
        else:
            if scope.count(FnTransform) > 1:
                raise RuntimeError(
                    "Invalid: Fn::Transforms cannot be nested inside another Fn::Transform"
                )

            # JSONPath of the element enclosing the transform: the scope without its
            # last component, with separators turned into dots.
            enclosing_parts = scope.split("/")[:-1]
            path = "$" + ".".join(enclosing_parts)
            before_siblings = extract_jsonpath(self._before_template, path)
            after_siblings = extract_jsonpath(self._after_template, path)

            node = NodeIntrinsicFunctionFnTransform(
                scope=scope,
                change_type=change_type,
                arguments=arguments,
                intrinsic_function=intrinsic_function,
                before_siblings=before_siblings,
                after_siblings=after_siblings,
            )

        self._visited_scopes[scope] = node
        return node
1✔
711

712
    def _resolve_intrinsic_function_fn_sub(self, arguments: ChangeSetEntity) -> ChangeType:
        """Return the change type of the Fn::Sub arguments unchanged."""
        # TODO: This routine should instead export the implicit Ref and GetAtt calls within the first
        #       string template parameter and compute the respective change set types. Currently,
        #       changes referenced by Fn::Sub templates are only picked up during preprocessing; not
        #       at modelling.
        arguments_change_type = arguments.change_type
        return arguments_change_type
1✔
718

719
    def _resolve_intrinsic_function_fn_get_att(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of an Fn::GetAtt call from the referenced resource property.

        The first array element names the resource and the second the attribute. If the
        resource has a property with the attribute's name, that property's change type is
        returned; otherwise ``ChangeType.UNCHANGED``.

        Raises:
            RuntimeError: if the arguments are not an array of at least two terminal
                values, or the resource name is not a string.
        """
        # TODO: add support for nested intrinsic functions.
        # TODO: validate arguments structure and type.
        # TODO: should this check for deletion of resources and/or properties, if so what error should be raised?

        # Require both elements up front so that a short array fails with a descriptive
        # RuntimeError instead of an IndexError further down.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 2:
            raise RuntimeError(
                "Fn::GetAtt arguments must be an array of [resource name, attribute name], "
                f"got {type(arguments).__name__}"
            )
        logical_name_of_resource_entity = arguments.array[0]
        if not isinstance(logical_name_of_resource_entity, TerminalValue):
            raise RuntimeError(
                "Fn::GetAtt resource name must be a terminal value, "
                f"got {type(logical_name_of_resource_entity).__name__}"
            )
        logical_name_of_resource: str = logical_name_of_resource_entity.value
        if not isinstance(logical_name_of_resource, str):
            raise RuntimeError(
                "Fn::GetAtt resource name must be a string, "
                f"got {type(logical_name_of_resource).__name__}"
            )
        node_resource: NodeResource = self._retrieve_or_visit_resource(
            resource_name=logical_name_of_resource
        )

        node_property_attribute_name = arguments.array[1]
        if not isinstance(node_property_attribute_name, TerminalValue):
            raise RuntimeError(
                "Fn::GetAtt attribute name must be a terminal value, "
                f"got {type(node_property_attribute_name).__name__}"
            )
        # For a modified attribute name, look up the new (after) name.
        if isinstance(node_property_attribute_name, TerminalValueModified):
            attribute_name = node_property_attribute_name.modified_value
        else:
            attribute_name = node_property_attribute_name.value

        # TODO: this is another use case for which properties should be referenced by name
        for node_property in node_resource.properties.properties:
            if node_property.name == attribute_name:
                return node_property.change_type

        return ChangeType.UNCHANGED
1✔
750

751
    def _resolve_intrinsic_function_ref(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of a Ref call from the entity it refers to.

        Arguments that are themselves changed, or that are not a terminal value, yield
        their own change type. Otherwise the referenced name is looked up as a
        condition, then as a parameter, and finally as a resource.
        """
        arguments_changed = arguments.change_type != ChangeType.UNCHANGED
        if arguments_changed or not isinstance(arguments, TerminalValue):
            return arguments.change_type

        logical_id = arguments.value

        condition = self._retrieve_condition_if_exists(condition_name=logical_id)
        if isinstance(condition, NodeCondition):
            return condition.change_type

        parameter = self._retrieve_parameter_if_exists(parameter_name=logical_id)
        if isinstance(parameter, NodeParameter):
            return parameter.change_type

        # TODO: this should check the replacement flag for a resource update.
        resource = self._retrieve_or_visit_resource(resource_name=logical_id)
        return resource.change_type
1✔
770

771
    def _resolve_intrinsic_function_condition(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of a Condition reference from the referenced condition.

        Arguments that are themselves changed, or that are not a terminal value, yield
        their own change type.

        Raises:
            RuntimeError: if no condition with the referenced name is found.
        """
        arguments_changed = arguments.change_type != ChangeType.UNCHANGED
        if arguments_changed or not isinstance(arguments, TerminalValue):
            return arguments.change_type

        condition_name = arguments.value
        condition = self._retrieve_condition_if_exists(condition_name=condition_name)
        if not isinstance(condition, NodeCondition):
            raise RuntimeError(f"Undefined condition '{condition_name}'")
        return condition.change_type
×
782

783
    def _resolve_intrinsic_function_fn_find_in_map(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of an Fn::FindInMap call from the addressed mapping value.

        Arguments that are themselves changed yield their own change type. Otherwise the
        first three array elements (mapping name, top-level key, second-level key) are
        used to locate the value inside the mapping, and its change type is returned.

        Raises:
            RuntimeError: if the arguments are not an array of at least three elements,
                or if either key is not present in the mapping.
            NotImplementedError: if any of the three elements is not a terminal value.
        """
        if arguments.change_type != ChangeType.UNCHANGED:
            return arguments.change_type
        # TODO: validate arguments structure and type.
        # TODO: add support for nested functions, here we assume the arguments are string literals.

        # Require all three elements up front so that a short array fails with a
        # descriptive RuntimeError instead of an IndexError further down.
        if not isinstance(arguments, NodeArray) or len(arguments.array) < 3:
            raise RuntimeError(
                "Fn::FindInMap arguments must be an array of "
                "[mapping name, top-level key, second-level key], "
                f"got {type(arguments).__name__}"
            )
        argument_mapping_name = arguments.array[0]
        if not isinstance(argument_mapping_name, TerminalValue):
            raise NotImplementedError(
                "Fn::FindInMap with a non-literal mapping name is not supported"
            )
        argument_top_level_key = arguments.array[1]
        if not isinstance(argument_top_level_key, TerminalValue):
            raise NotImplementedError(
                "Fn::FindInMap with a non-literal top-level key is not supported"
            )
        argument_second_level_key = arguments.array[2]
        if not isinstance(argument_second_level_key, TerminalValue):
            raise NotImplementedError(
                "Fn::FindInMap with a non-literal second-level key is not supported"
            )
        mapping_name = argument_mapping_name.value
        top_level_key = argument_top_level_key.value
        second_level_key = argument_second_level_key.value

        node_mapping = self._retrieve_mapping(mapping_name=mapping_name)
        # TODO: a lookup would be beneficial in this scenario too;
        #  consider implications downstream and for replication.
        top_level_object = node_mapping.bindings.bindings.get(top_level_key)
        if not isinstance(top_level_object, NodeObject):
            raise RuntimeError(
                f"Fn::FindInMap: top-level key '{top_level_key}' "
                f"not found in mapping '{mapping_name}'"
            )
        target_map_value = top_level_object.bindings.get(second_level_key)
        # A missing second-level key used to surface as an AttributeError on None;
        # report it the same way as a missing top-level key.
        if target_map_value is None:
            raise RuntimeError(
                f"Fn::FindInMap: second-level key '{second_level_key}' not found under "
                f"'{top_level_key}' in mapping '{mapping_name}'"
            )
        return target_map_value.change_type
1✔
812

813
    def _resolve_intrinsic_function_fn_if(self, arguments: ChangeSetEntity) -> ChangeType:
        """Resolve the change type of an ``Fn::If`` call.

        The first array item must be a ``TerminalValue`` holding the string name
        of an existing condition. The result is ``parent_change_type_of`` applied
        to that condition node followed by the remaining array items.

        Raises:
            RuntimeError: if the arguments are not a non-empty array, the first
                item is not a terminal string, or the condition does not exist.
        """
        # TODO: validate arguments structure and type.
        if not (isinstance(arguments, NodeArray) and arguments.array):
            raise RuntimeError()

        condition_name_entity, *branch_entities = arguments.array
        if not isinstance(condition_name_entity, TerminalValue):
            raise RuntimeError()

        condition_name: str = condition_name_entity.value
        if not isinstance(condition_name, str):
            raise RuntimeError()

        condition = self._retrieve_condition_if_exists(condition_name=condition_name)
        if not isinstance(condition, NodeCondition):
            raise RuntimeError()

        return parent_change_type_of([condition, *branch_entities])
×
831

832
    def _resolve_requires_replacement(
        self, node_properties: NodeProperties, resource_type: TerminalValue
    ) -> bool:
        """Tell whether a modified property forces the resource to be replaced.

        The resource provider for ``resource_type`` is loaded to read the
        ``createOnlyProperties`` entry of its schema. The resource requires
        replacement when any property whose change type is ``MODIFIED`` is named
        in that entry.

        Returns:
            ``True`` if a modified property is create-only; ``False`` otherwise,
            including when no resource provider can be loaded.
        """
        # a bit hacky but we have to load the resource provider executor _and_ resource provider to get the schema
        # Note: we don't log the attempt to load the resource provider, we need to make sure this is only done once and we already do this in the executor
        resource_provider = ResourceProviderExecutor.try_load_resource_provider(resource_type.value)
        if not resource_provider:
            # if we don't support a resource, assume an in-place update for simplicity
            return False

        create_only_pointers: list[str] = resource_provider.SCHEMA.get("createOnlyProperties", [])
        # TODO: also hacky: strip the leading `/properties/` string from the definition
        #       ideally we should use a jsonpath or similar
        # removeprefix only touches a leading "/properties/", so a pointer that
        # contains the segment further in is left intact. A set gives constant
        # time membership checks in the scan below.
        create_only_names = {
            pointer.removeprefix("/properties/") for pointer in create_only_pointers
        }
        return any(
            node_property.change_type == ChangeType.MODIFIED
            and node_property.name in create_only_names
            for node_property in node_properties.properties
        )
1✔
855

856
    def _visit_array(
        self, scope: Scope, before_array: Maybe[list], after_array: Maybe[list]
    ) -> NodeArray:
        """Build a ``NodeArray`` by visiting the before/after items pairwise.

        Items are paired by position; where one side is shorter, the missing
        item is filled with ``Nothing``. Each pair is visited under an index
        scope opened from ``scope``.
        """
        paired_items = zip_longest(before_array, after_array, fillvalue=Nothing)
        elements: list[ChangeSetEntity] = [
            self._visit_value(
                scope=scope.open_index(index=position),
                before_value=before_item,
                after_value=after_item,
            )
            for position, (before_item, after_item) in enumerate(paired_items)
        ]
        return NodeArray(
            scope=scope,
            change_type=change_type_of(before_array, after_array, elements),
            array=elements,
        )
1✔
870

871
    def _visit_object(
        self, scope: Scope, before_object: Maybe[dict], after_object: Maybe[dict]
    ) -> NodeObject:
        """Build (or fetch from the visited-scope cache) the ``NodeObject`` for a scope.

        Every key returned by ``_safe_keys_of`` for the two objects is visited as
        a value and stored under that key. The resulting node is cached in
        ``_visited_scopes`` before being returned.
        """
        cached_object = self._visited_scopes.get(scope)
        if isinstance(cached_object, NodeObject):
            return cached_object

        entries: dict[str, ChangeSetEntity] = {}
        for key in self._safe_keys_of(before_object, after_object):
            entry_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, key, before_object, after_object
            )
            entries[key] = self._visit_value(
                scope=entry_scope, before_value=before_entry, after_value=after_entry
            )

        result = NodeObject(
            scope=scope,
            change_type=change_type_of(before_object, after_object, list(entries.values())),
            bindings=entries,
        )
        self._visited_scopes[scope] = result
        return result
1✔
891

892
    def _visit_divergence(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> NodeDivergence:
        """Model a before/after pair as two independent one-sided values.

        The before value is visited against ``Nothing`` under the ``value``
        sub-scope, and the after value is visited against ``Nothing`` under the
        ``divergence`` sub-scope.
        """
        # The before side is visited first, then the after side.
        before_side = self._visit_value(
            scope=scope.open_scope("value"), before_value=before_value, after_value=Nothing
        )
        after_side = self._visit_value(
            scope=scope.open_scope("divergence"), before_value=Nothing, after_value=after_value
        )
        return NodeDivergence(scope=scope, value=before_side, divergence=after_side)
1✔
902

903
    def _visit_value(
        self, scope: Scope, before_value: Maybe[Any], after_value: Maybe[Any]
    ) -> ChangeSetEntity:
        """Visit a before/after value pair and return the entity modelling it.

        A cached entity for ``scope`` is returned as-is. Otherwise a dominant
        value is chosen: the before value when both type names match or the value
        was removed, the after value when it was created. The dominant value
        decides whether the pair is visited as a terminal, object, array or
        intrinsic function. When no side dominates, the pair is visited as a
        divergence. The result is cached in ``_visited_scopes``.

        Raises:
            RuntimeError: if the dominant value is of an unsupported type.
        """
        cached_entity = self._visited_scopes.get(scope)
        if isinstance(cached_entity, ChangeSetEntity):
            return cached_entity

        before_type_name = self._type_name_of(before_value)
        after_type_name = self._type_name_of(after_value)

        # Private marker meaning "neither side dominates".
        no_dominant = object()
        if before_type_name == after_type_name:
            dominant = before_value
        elif is_created(before=before_value, after=after_value):
            dominant = after_value
        elif is_removed(before=before_value, after=after_value):
            dominant = before_value
        else:
            dominant = no_dominant

        if dominant is no_dominant:
            # Case: type divergence.
            entity = self._visit_divergence(
                scope=scope, before_value=before_value, after_value=after_value
            )
        else:
            dominant_type_name = self._type_name_of(dominant)
            if self._is_terminal(value=dominant):
                entity = self._visit_terminal_value(
                    scope=scope, before_value=before_value, after_value=after_value
                )
            elif self._is_object(value=dominant):
                entity = self._visit_object(
                    scope=scope, before_object=before_value, after_object=after_value
                )
            elif self._is_array(value=dominant):
                entity = self._visit_array(
                    scope=scope, before_array=before_value, after_array=after_value
                )
            elif self._is_intrinsic_function_name(dominant_type_name):
                function_scope, (before_arguments, after_arguments) = self._safe_access_in(
                    scope, dominant_type_name, before_value, after_value
                )
                entity = self._visit_intrinsic_function(
                    scope=function_scope,
                    intrinsic_function=dominant_type_name,
                    before_arguments=before_arguments,
                    after_arguments=after_arguments,
                )
            else:
                raise RuntimeError(f"Unsupported type {type(dominant)}")

        self._visited_scopes[scope] = entity
        return entity
1✔
954

955
    def _visit_property(
        self,
        scope: Scope,
        property_name: str,
        before_property: Maybe[Any],
        after_property: Maybe[Any],
    ) -> NodeProperty:
        """Build (or fetch from the visited-scope cache) the ``NodeProperty`` for a scope.

        The property value is visited under the same scope as the property
        itself; the resulting node is then cached in ``_visited_scopes``.
        """
        cached_property_node = self._visited_scopes.get(scope)
        if isinstance(cached_property_node, NodeProperty):
            return cached_property_node

        # TODO: Review the use of Fn::Transform as resource properties.
        result = NodeProperty(
            scope=scope,
            name=property_name,
            value=self._visit_value(
                scope=scope, before_value=before_property, after_value=after_property
            ),
        )
        self._visited_scopes[scope] = result
        return result
1✔
972

973
    def _visit_properties(
        self, scope: Scope, before_properties: Maybe[dict], after_properties: Maybe[dict]
    ) -> NodeProperties:
        """Build (or fetch from the visited-scope cache) the ``NodeProperties`` for a scope.

        Each property name is visited as a ``NodeProperty``, except the
        ``FnTransform`` key, which is visited as an intrinsic function and stored
        separately as the node's ``fn_transform``.
        """
        cached_properties = self._visited_scopes.get(scope)
        if isinstance(cached_properties, NodeProperties):
            return cached_properties

        names: list[str] = self._safe_keys_of(before_properties, after_properties)
        visited: list[NodeProperty] = []
        transform = Nothing

        for name in names:
            name_scope, (before_property, after_property) = self._safe_access_in(
                scope, name, before_properties, after_properties
            )
            if name != FnTransform:
                visited.append(
                    self._visit_property(
                        scope=name_scope,
                        property_name=name,
                        before_property=before_property,
                        after_property=after_property,
                    )
                )
            else:
                transform = self._visit_intrinsic_function(
                    name_scope, FnTransform, before_property, after_property
                )

        result = NodeProperties(scope=scope, properties=visited, fn_transform=transform)
        self._visited_scopes[scope] = result
        return result
1✔
1006

1007
    def _visit_type(self, scope: Scope, before_type: Any, after_type: Any) -> TerminalValue:
        """Visit a ``Type`` value pair, which must resolve to a ``TerminalValue``.

        Raises:
            RuntimeError: if the visited value is not a ``TerminalValue``.
        """
        type_node = self._visit_value(scope=scope, before_value=before_type, after_value=after_type)
        if isinstance(type_node, TerminalValue):
            return type_node
        # TODO: decide where template schema validation should occur.
        raise RuntimeError()
1✔
1013

1014
    def _visit_deletion_policy(
        self, scope: Scope, before_deletion_policy: Any, after_deletion_policy: Any
    ) -> TerminalValue:
        """Visit a ``DeletionPolicy`` value pair, which must resolve to a ``TerminalValue``.

        Raises:
            RuntimeError: if the visited value is not a ``TerminalValue``.
        """
        policy_node = self._visit_value(
            scope=scope, before_value=before_deletion_policy, after_value=after_deletion_policy
        )
        if isinstance(policy_node, TerminalValue):
            return policy_node
        # TODO: decide where template schema validation should occur.
        raise RuntimeError()
1✔
1024

1025
    def _visit_update_replace_policy(
        self, scope: Scope, before_update_replace_policy: Any, after_deletion_policy: Any
    ) -> TerminalValue:
        """Visit an ``UpdateReplacePolicy`` value pair, which must resolve to a ``TerminalValue``.

        Raises:
            RuntimeError: if the visited value is not a ``TerminalValue``.
        """
        # NOTE(review): `after_deletion_policy` is used as the after-state value of
        # the update-replace policy despite its name; the name is kept so that
        # keyword callers keep working.
        policy_node = self._visit_value(
            scope=scope,
            before_value=before_update_replace_policy,
            after_value=after_deletion_policy,
        )
        if isinstance(policy_node, TerminalValue):
            return policy_node
        # TODO: decide where template schema validation should occur.
        raise RuntimeError()
1✔
1037

1038
    def _visit_resource(
        self,
        scope: Scope,
        resource_name: str,
        before_resource: Maybe[dict],
        after_resource: Maybe[dict],
    ) -> NodeResource:
        """Build (or fetch from the visited-scope cache) the ``NodeResource`` for a scope.

        The resource's type, condition, DependsOn, properties, deletion policy,
        update-replace policy and resource-level ``Fn::Transform`` are visited in
        that order. Optional parts that are absent on both sides stay
        ``Nothing``. The resource change type is derived from the before/after
        resource and all visited parts except the type; the replacement flag is
        derived from the visited properties and the resource type.

        Raises:
            RuntimeError: if the ``Fn::Transform`` scope contains more than one
                ``Fn::Transform`` segment (nested transforms).
        """
        cached_resource = self._visited_scopes.get(scope)
        if isinstance(cached_resource, NodeResource):
            return cached_resource

        # Type
        type_scope, (old_type, new_type) = self._safe_access_in(
            scope, TypeKey, before_resource, after_resource
        )
        resource_type = self._visit_type(scope=type_scope, before_type=old_type, after_type=new_type)

        # Condition (optional)
        condition_scope, (old_condition, new_condition) = self._safe_access_in(
            scope, ConditionKey, before_resource, after_resource
        )
        condition_reference = (
            self._visit_terminal_value(condition_scope, old_condition, new_condition)
            if old_condition or new_condition
            else Nothing
        )

        # DependsOn (optional)
        depends_on_scope, (old_depends_on, new_depends_on) = self._safe_access_in(
            scope, DependsOnKey, before_resource, after_resource
        )
        depends_on = (
            self._visit_depends_on(depends_on_scope, old_depends_on, new_depends_on)
            if old_depends_on or new_depends_on
            else Nothing
        )

        # Properties
        properties_scope, (old_properties, new_properties) = self._safe_access_in(
            scope, PropertiesKey, before_resource, after_resource
        )
        properties = self._visit_properties(
            scope=properties_scope,
            before_properties=old_properties,
            after_properties=new_properties,
        )

        # DeletionPolicy (optional)
        deletion_scope, (old_deletion_policy, new_deletion_policy) = self._safe_access_in(
            scope, DeletionPolicyKey, before_resource, after_resource
        )
        deletion_policy = (
            self._visit_deletion_policy(deletion_scope, old_deletion_policy, new_deletion_policy)
            if old_deletion_policy or new_deletion_policy
            else Nothing
        )

        # UpdateReplacePolicy (optional)
        replace_scope, (old_replace_policy, new_replace_policy) = self._safe_access_in(
            scope, UpdateReplacePolicyKey, before_resource, after_resource
        )
        update_replace_policy = (
            self._visit_update_replace_policy(
                replace_scope, old_replace_policy, new_replace_policy
            )
            if old_replace_policy or new_replace_policy
            else Nothing
        )

        # Resource-level Fn::Transform (optional)
        fn_transform = Nothing
        transform_scope, (old_transform_args, new_transform_args) = self._safe_access_in(
            scope, FnTransform, before_resource, after_resource
        )
        if not (is_nothing(old_transform_args) and is_nothing(new_transform_args)):
            if transform_scope.count(FnTransform) > 1:
                raise RuntimeError(
                    "Invalid: Fn::Transforms cannot be nested inside another Fn::Transform"
                )
            # Drop the last scope segment and join the rest with dots to form
            # the path passed to extract_jsonpath for both templates.
            parent_segments = transform_scope.split("/")[:-1]
            siblings_path = "$" + ".".join(parent_segments)
            before_siblings = extract_jsonpath(self._before_template, siblings_path)
            after_siblings = extract_jsonpath(self._after_template, siblings_path)
            transform_arguments = self._visit_value(
                scope=scope.open_scope("args"),
                before_value=old_transform_args,
                after_value=new_transform_args,
            )
            fn_transform = NodeIntrinsicFunctionFnTransform(
                scope=transform_scope,
                change_type=ChangeType.MODIFIED,  # TODO
                arguments=transform_arguments,  # TODO
                intrinsic_function=FnTransform,
                before_siblings=before_siblings,
                after_siblings=after_siblings,
            )

        resource_parts = [
            properties,
            condition_reference,
            depends_on,
            deletion_policy,
            update_replace_policy,
            fn_transform,
        ]
        change_type = change_type_of(before_resource, after_resource, resource_parts)
        requires_replacement = self._resolve_requires_replacement(
            node_properties=properties, resource_type=resource_type
        )
        result = NodeResource(
            scope=scope,
            change_type=change_type,
            name=resource_name,
            type_=resource_type,
            properties=properties,
            condition_reference=condition_reference,
            depends_on=depends_on,
            requires_replacement=requires_replacement,
            deletion_policy=deletion_policy,
            update_replace_policy=update_replace_policy,
            fn_transform=fn_transform,
        )
        self._visited_scopes[scope] = result
        return result
1✔
1160

1161
    def _visit_resources(
        self, scope: Scope, before_resources: Maybe[dict], after_resources: Maybe[dict]
    ) -> NodeResources:
        """Visit every entry of the ``Resources`` section.

        Each key is visited as a ``NodeResource``, except the ``FnTransform``
        key, which is visited as an intrinsic function and stored as the
        section's ``fn_transform``.
        """
        # TODO: investigate type changes behavior.
        visited: list[NodeResource] = []
        transform = Nothing
        for name in self._safe_keys_of(before_resources, after_resources):
            entry_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, name, before_resources, after_resources
            )
            if name != FnTransform:
                visited.append(
                    self._visit_resource(
                        scope=entry_scope,
                        resource_name=name,
                        before_resource=before_entry,
                        after_resource=after_entry,
                    )
                )
            else:
                transform = self._visit_intrinsic_function(
                    scope=entry_scope,
                    intrinsic_function=name,
                    before_arguments=before_entry,
                    after_arguments=after_entry,
                )
        return NodeResources(scope=scope, resources=visited, fn_transform=transform)
1✔
1188

1189
    def _visit_mapping(
        self, scope: Scope, name: str, before_mapping: Maybe[dict], after_mapping: Maybe[dict]
    ) -> NodeMapping:
        """Wrap the visited before/after mapping object in a ``NodeMapping``."""
        return NodeMapping(
            scope=scope,
            name=name,
            bindings=self._visit_object(
                scope=scope, before_object=before_mapping, after_object=after_mapping
            ),
        )
1✔
1196

1197
    def _visit_mappings(
        self, scope: Scope, before_mappings: Maybe[dict], after_mappings: Maybe[dict]
    ) -> NodeMappings:
        """Visit every entry of the ``Mappings`` section as a ``NodeMapping``."""
        visited: list[NodeMapping] = []
        for name in self._safe_keys_of(before_mappings, after_mappings):
            entry_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, name, before_mappings, after_mappings
            )
            visited.append(
                self._visit_mapping(
                    scope=entry_scope,
                    name=name,
                    before_mapping=before_entry,
                    after_mapping=after_entry,
                )
            )
        return NodeMappings(scope=scope, mappings=visited)
1✔
1214

1215
    def _visit_dynamic_parameter(self, parameter_name: str) -> ChangeSetEntity:
        """Visit the before/after values supplied for a parameter.

        The parameter entries are read from ``_before_parameters`` and
        ``_after_parameters`` under the ``Dynamic/Parameters`` scope. For each
        side that is present, the ``resolved_value`` entry is used when truthy;
        otherwise ``engine_parameter_value`` of the entry is used.
        """

        def effective_value(parameter_dct):
            # An absent side stays Nothing; a falsy resolved value falls back to
            # the engine parameter value.
            if is_nothing(parameter_dct):
                return Nothing
            return parameter_dct.get("resolved_value") or engine_parameter_value(parameter_dct)

        parameters_scope = Scope("Dynamic").open_scope("Parameters")
        entry_scope, (before_entry, after_entry) = self._safe_access_in(
            parameters_scope, parameter_name, self._before_parameters, self._after_parameters
        )

        before_parameter = effective_value(before_entry)
        after_parameter = effective_value(after_entry)

        return self._visit_value(
            scope=entry_scope, before_value=before_parameter, after_value=after_parameter
        )
1✔
1237

1238
    def _visit_parameter(
        self,
        scope: Scope,
        parameter_name: str,
        before_parameter: Maybe[dict],
        after_parameter: Maybe[dict],
    ) -> NodeParameter:
        """Build (or fetch from the visited-scope cache) the ``NodeParameter`` for a scope.

        The declared type and default are visited from the parameter definition;
        the dynamic value is visited separately by parameter name. The resulting
        node is cached in ``_visited_scopes``.
        """
        cached_parameter = self._visited_scopes.get(scope)
        if isinstance(cached_parameter, NodeParameter):
            return cached_parameter

        type_scope, (old_type, new_type) = self._safe_access_in(
            scope, TypeKey, before_parameter, after_parameter
        )
        declared_type = self._visit_value(type_scope, old_type, new_type)

        default_scope, (old_default, new_default) = self._safe_access_in(
            scope, DefaultKey, before_parameter, after_parameter
        )
        declared_default = self._visit_value(default_scope, old_default, new_default)

        result = NodeParameter(
            scope=scope,
            name=parameter_name,
            type_=declared_type,
            default_value=declared_default,
            dynamic_value=self._visit_dynamic_parameter(parameter_name=parameter_name),
        )
        self._visited_scopes[scope] = result
        return result
1✔
1270

1271
    def _visit_parameters(
        self, scope: Scope, before_parameters: Maybe[dict], after_parameters: Maybe[dict]
    ) -> NodeParameters:
        """Build (or fetch from the visited-scope cache) the ``NodeParameters`` for a scope.

        Every parameter name found on either side is visited as a
        ``NodeParameter``; the resulting section node is cached in
        ``_visited_scopes``.
        """
        cached_parameters = self._visited_scopes.get(scope)
        if isinstance(cached_parameters, NodeParameters):
            return cached_parameters

        names: list[str] = self._safe_keys_of(before_parameters, after_parameters)
        visited: list[NodeParameter] = []
        for name in names:
            entry_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, name, before_parameters, after_parameters
            )
            visited.append(
                self._visit_parameter(
                    scope=entry_scope,
                    parameter_name=name,
                    before_parameter=before_entry,
                    after_parameter=after_entry,
                )
            )

        result = NodeParameters(scope=scope, parameters=visited)
        self._visited_scopes[scope] = result
        return result
1✔
1293

1294
    @staticmethod
    def _normalise_depends_on_value(value: Maybe[str | list[str]]) -> Maybe[list[str]]:
        """Normalise a ``DependsOn`` value to a sorted list of strings.

        Args:
            value: ``Nothing``, a single logical id, or a list of logical ids.

        Returns:
            ``value`` itself when it is a ``NothingType`` instance, a one-item
            list for a string, or a new sorted list for a list. The input list
            is never modified.

        Raises:
            RuntimeError: if ``value`` is of any other type.
        """
        # To simplify downstream logics, reduce the type options to array of strings.
        # TODO: Add integrations tests for DependsOn validations (invalid types, duplicate identifiers, etc.)
        if isinstance(value, NothingType):
            return value
        if isinstance(value, str):
            return [value]
        if isinstance(value, list):
            # sorted() builds a new list; list.sort() would reorder the caller's
            # list in place.
            return sorted(value)
        raise RuntimeError(
            f"Invalid type for DependsOn, expected a String or Array of String, but got: '{value}'"
        )
1✔
1309

1310
    def _visit_depends_on(
        self,
        scope: Scope,
        before_depends_on: Maybe[str | list[str]],
        after_depends_on: Maybe[str | list[str]],
    ) -> NodeDependsOn:
        """Visit a ``DependsOn`` value pair.

        Both sides are first normalised with ``_normalise_depends_on_value`` and
        then visited as an array under ``scope``.
        """
        normalised_before = self._normalise_depends_on_value(value=before_depends_on)
        normalised_after = self._normalise_depends_on_value(value=after_depends_on)
        return NodeDependsOn(
            scope=scope,
            depends_on=self._visit_array(
                scope=scope, before_array=normalised_before, after_array=normalised_after
            ),
        )
1✔
1323

1324
    def _visit_condition(
        self,
        scope: Scope,
        condition_name: str,
        before_condition: Maybe[dict],
        after_condition: Maybe[dict],
    ) -> NodeCondition:
        """Build (or fetch from the visited-scope cache) the ``NodeCondition`` for a scope.

        The condition body is visited under the same scope as the condition
        itself; the resulting node is then cached in ``_visited_scopes``.
        """
        cached_condition = self._visited_scopes.get(scope)
        if isinstance(cached_condition, NodeCondition):
            return cached_condition

        result = NodeCondition(
            scope=scope,
            name=condition_name,
            body=self._visit_value(
                scope=scope, before_value=before_condition, after_value=after_condition
            ),
        )
        self._visited_scopes[scope] = result
        return result
1✔
1340

1341
    def _visit_conditions(
        self, scope: Scope, before_conditions: Maybe[dict], after_conditions: Maybe[dict]
    ) -> NodeConditions:
        """Build (or fetch from the visited-scope cache) the ``NodeConditions`` for a scope.

        Every condition name found on either side is visited as a
        ``NodeCondition``; the resulting section node is cached in
        ``_visited_scopes``.
        """
        cached_conditions = self._visited_scopes.get(scope)
        if isinstance(cached_conditions, NodeConditions):
            return cached_conditions

        names: list[str] = self._safe_keys_of(before_conditions, after_conditions)
        visited: list[NodeCondition] = []
        for name in names:
            entry_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, name, before_conditions, after_conditions
            )
            visited.append(
                self._visit_condition(
                    scope=entry_scope,
                    condition_name=name,
                    before_condition=before_entry,
                    after_condition=after_entry,
                )
            )

        result = NodeConditions(scope=scope, conditions=visited)
        self._visited_scopes[scope] = result
        return result
1✔
1363

1364
    def _visit_output(
        self, scope: Scope, name: str, before_output: Maybe[dict], after_output: Maybe[dict]
    ) -> NodeOutput:
        """Visit a single output definition.

        The output value is always visited. The export and the condition
        reference are visited only when at least one side is truthy; otherwise
        they stay ``Nothing``.
        """
        value_scope, (old_value, new_value) = self._safe_access_in(
            scope, ValueKey, before_output, after_output
        )
        output_value = self._visit_value(value_scope, old_value, new_value)

        export_scope, (old_export, new_export) = self._safe_access_in(
            scope, ExportKey, before_output, after_output
        )
        export: Maybe[ChangeSetEntity] = (
            self._visit_value(export_scope, old_export, new_export)
            if old_export or new_export
            else Nothing
        )

        # TODO: condition references should be resolved for the condition's change_type?
        condition_scope, (old_condition, new_condition) = self._safe_access_in(
            scope, ConditionKey, before_output, after_output
        )
        condition_reference: Maybe[TerminalValue] = (
            self._visit_terminal_value(condition_scope, old_condition, new_condition)
            if old_condition or new_condition
            else Nothing
        )

        return NodeOutput(
            scope=scope,
            name=name,
            value=output_value,
            export=export,
            conditional_reference=condition_reference,
        )
1396

1397
    def _visit_outputs(
        self, scope: Scope, before_outputs: Maybe[dict], after_outputs: Maybe[dict]
    ) -> NodeOutputs:
        """Visit every entry of the ``Outputs`` section as a ``NodeOutput``."""
        names: list[str] = self._safe_keys_of(before_outputs, after_outputs)
        visited: list[NodeOutput] = []
        for name in names:
            entry_scope, (before_entry, after_entry) = self._safe_access_in(
                scope, name, before_outputs, after_outputs
            )
            visited.append(
                self._visit_output(
                    scope=entry_scope,
                    name=name,
                    before_output=before_entry,
                    after_output=after_entry,
                )
            )
        return NodeOutputs(scope=scope, outputs=visited)
1✔
1414

1415
    def _visit_global_transform(
        self,
        scope: Scope,
        before_global_transform: Maybe[NormalisedGlobalTransformDefinition],
        after_global_transform: Maybe[NormalisedGlobalTransformDefinition],
    ) -> NodeGlobalTransform:
        """Visit one normalised global transform definition.

        The name entry is visited as a terminal value and the parameters entry
        as a generic value; both are wrapped in a ``NodeGlobalTransform``.
        """
        name_scope, (old_name, new_name) = self._safe_access_in(
            scope, NameKey, before_global_transform, after_global_transform
        )
        transform_name = self._visit_terminal_value(
            scope=name_scope, before_value=old_name, after_value=new_name
        )

        parameters_scope, (old_parameters, new_parameters) = self._safe_access_in(
            scope, ParametersKey, before_global_transform, after_global_transform
        )
        transform_parameters = self._visit_value(
            scope=parameters_scope, before_value=old_parameters, after_value=new_parameters
        )

        return NodeGlobalTransform(
            scope=scope, name=transform_name, parameters=transform_parameters
        )
1✔
1436

1437
    @staticmethod
    def _normalise_transformer_value(value: Maybe[str | list[Any]]) -> Maybe[list[Any]]:
        """Reduce a transform declaration to a list of transform definitions.

        Accepted shapes:
        - ``Nothing``: returned unchanged;
        - a string: wrapped as a single definition with no parameters;
        - a list: string items are wrapped like a lone string, any other item is
          kept as it is;
        - a dict: must contain ``"Name"``; ``"Parameters"`` is optional.

        :raises RuntimeError: if a dict has no ``"Name"`` entry, or the value has
            none of the shapes above.
        """
        # To simplify downstream logic, every accepted shape becomes an array of transformations.
        # TODO: add further validation logic
        # TODO: should we sort to avoid detecting user-side ordering changes as template changes?
        if isinstance(value, NothingType):
            return value

        if isinstance(value, str):
            return [NormalisedGlobalTransformDefinition(Name=value, Parameters=Nothing)]

        if isinstance(value, list):
            return [
                NormalisedGlobalTransformDefinition(Name=entry, Parameters=Nothing)
                if isinstance(entry, str)
                else entry
                for entry in value
            ]

        if isinstance(value, dict):
            if "Name" not in value:
                raise RuntimeError(f"Missing 'Name' field in Transform definition '{value}'")
            return [
                NormalisedGlobalTransformDefinition(
                    Name=value["Name"], Parameters=value.get("Parameters", Nothing)
                )
            ]

        raise RuntimeError(f"Invalid Transform definition: '{value}'")
1✔
1465

1466
    def _visit_transform(
        self, scope: Scope, before_transform: Maybe[Any], after_transform: Maybe[Any]
    ) -> NodeTransform:
        """Build the transform node from the before and after transform declarations.

        Both declarations are normalised first, then paired position by position;
        when one side is shorter, the missing entries are filled with ``Nothing``.
        Each pair is visited under an index scope opened from ``scope``.

        :param scope: scope of the transform section.
        :param before_transform: raw declaration from the before template, if any.
        :param after_transform: raw declaration from the after template, if any.
        :return: a ``NodeTransform`` with one ``NodeGlobalTransform`` per position.
        """
        normalised_before = self._normalise_transformer_value(before_transform)
        normalised_after = self._normalise_transformer_value(after_transform)
        paired = zip_longest(normalised_before, normalised_after, fillvalue=Nothing)
        nodes = [
            self._visit_global_transform(
                scope=scope.open_index(index=position),
                before_global_transform=before_item,
                after_global_transform=after_item,
            )
            for position, (before_item, after_item) in enumerate(paired)
        ]
        return NodeTransform(scope=scope, global_transforms=nodes)
1✔
1483

1484
    def _model(self, before_template: Maybe[dict], after_template: Maybe[dict]) -> NodeTemplate:
        """Build the template node by visiting each supported top-level section.

        Sections are visited in a fixed order: transform, mappings, parameters,
        conditions, resources, outputs. Each is read from both templates through
        ``_safe_access_in`` under a fresh root scope.

        :param before_template: the before template, if any.
        :param after_template: the after template, if any.
        :return: the ``NodeTemplate`` aggregating all visited sections.
        """
        root = Scope()
        templates = (before_template, after_template)
        # TODO: visit other child types

        scope_transform, (old_transform, new_transform) = self._safe_access_in(
            root, TransformKey, *templates
        )
        transform_node = self._visit_transform(
            scope=scope_transform,
            before_transform=old_transform,
            after_transform=new_transform,
        )

        scope_mappings, (old_mappings, new_mappings) = self._safe_access_in(
            root, MappingsKey, *templates
        )
        mappings_node = self._visit_mappings(
            scope=scope_mappings,
            before_mappings=old_mappings,
            after_mappings=new_mappings,
        )

        scope_parameters, (old_parameters, new_parameters) = self._safe_access_in(
            root, ParametersKey, *templates
        )
        parameters_node = self._visit_parameters(
            scope=scope_parameters,
            before_parameters=old_parameters,
            after_parameters=new_parameters,
        )

        scope_conditions, (old_conditions, new_conditions) = self._safe_access_in(
            root, ConditionsKey, *templates
        )
        conditions_node = self._visit_conditions(
            scope=scope_conditions,
            before_conditions=old_conditions,
            after_conditions=new_conditions,
        )

        scope_resources, (old_resources, new_resources) = self._safe_access_in(
            root, ResourcesKey, *templates
        )
        resources_node = self._visit_resources(
            scope=scope_resources,
            before_resources=old_resources,
            after_resources=new_resources,
        )

        scope_outputs, (old_outputs, new_outputs) = self._safe_access_in(
            root, OutputsKey, *templates
        )
        outputs_node = self._visit_outputs(
            scope=scope_outputs,
            before_outputs=old_outputs,
            after_outputs=new_outputs,
        )

        return NodeTemplate(
            scope=root,
            transform=transform_node,
            mappings=mappings_node,
            parameters=parameters_node,
            conditions=conditions_node,
            resources=resources_node,
            outputs=outputs_node,
        )
1547

1548
    def _retrieve_condition_if_exists(self, condition_name: str) -> Maybe[NodeCondition]:
        """Return the node for the named condition, or ``Nothing`` if it is not declared.

        The name is looked up in the conditions section of both the stored before
        and after templates; it is enough for it to appear in either one.

        :param condition_name: name of the condition to look up.
        :return: the ``NodeCondition`` produced by ``_visit_condition``, or
            ``Nothing`` when neither template declares the condition.
        """
        conditions_scope, (before_conditions, after_conditions) = self._safe_access_in(
            Scope(), ConditionsKey, self._before_template, self._after_template
        )
        # A missing (falsy) section is replaced by an empty dict for the lookups below.
        before_conditions = before_conditions or {}
        after_conditions = after_conditions or {}
        if condition_name not in before_conditions and condition_name not in after_conditions:
            return Nothing

        condition_scope, (before_condition, after_condition) = self._safe_access_in(
            conditions_scope, condition_name, before_conditions, after_conditions
        )
        # Visit under the condition's own scope, not the enclosing section scope,
        # in the same way _retrieve_parameter_if_exists passes the per-parameter scope.
        return self._visit_condition(
            condition_scope,
            condition_name,
            before_condition=before_condition,
            after_condition=after_condition,
        )
1✔
1566

1567
    def _retrieve_parameter_if_exists(self, parameter_name: str) -> Maybe[NodeParameter]:
        """Return the node for the named parameter, or ``Nothing`` if it is not declared.

        The name is looked up in the parameters section of both the stored before
        and after templates; it is enough for it to appear in either one.

        :param parameter_name: name of the parameter to look up.
        :return: the ``NodeParameter`` produced by ``_visit_parameter``, or
            ``Nothing`` when neither template declares the parameter.
        """
        section_scope, (old_section, new_section) = self._safe_access_in(
            Scope(), ParametersKey, self._before_template, self._after_template
        )
        if parameter_name not in old_section and parameter_name not in new_section:
            return Nothing

        entry_scope, (old_entry, new_entry) = self._safe_access_in(
            section_scope, parameter_name, old_section, new_section
        )
        return self._visit_parameter(
            entry_scope,
            parameter_name,
            before_parameter=old_entry,
            after_parameter=new_entry,
        )
1✔
1583

1584
    def _retrieve_mapping(self, mapping_name: str) -> NodeMapping:
        """Return the node for the named mapping.

        The name is looked up in the mappings section of both the stored before
        and after templates; it is enough for it to appear in either one.

        :param mapping_name: name of the mapping to look up.
        :return: the ``NodeMapping`` produced by ``_visit_mapping``.
        :raises RuntimeError: if neither template declares the mapping.
        """
        # TODO: add caching mechanism.
        scope_mappings, (before_mappings, after_mappings) = self._safe_access_in(
            Scope(), MappingsKey, self._before_template, self._after_template
        )
        if mapping_name not in before_mappings and mapping_name not in after_mappings:
            # Name the missing mapping so the failure can be traced back to the template.
            raise RuntimeError(f"Undefined Mapping '{mapping_name}'")

        scope_mapping, (before_mapping, after_mapping) = self._safe_access_in(
            scope_mappings, mapping_name, before_mappings, after_mappings
        )
        return self._visit_mapping(scope_mapping, mapping_name, before_mapping, after_mapping)
×
1598

1599
    def _retrieve_or_visit_resource(self, resource_name: str) -> NodeResource:
        """Visit the named resource using the stored before and after templates.

        No existence check is made here: whatever ``_safe_access_in`` yields for
        the resource in each template is handed to ``_visit_resource``.

        :param resource_name: logical name of the resource.
        :return: the ``NodeResource`` produced by ``_visit_resource``.
        """
        section_scope, (old_section, new_section) = self._safe_access_in(
            Scope(), ResourcesKey, self._before_template, self._after_template
        )
        entry_scope, (old_entry, new_entry) = self._safe_access_in(
            section_scope, resource_name, old_section, new_section
        )
        node = self._visit_resource(
            scope=entry_scope,
            resource_name=resource_name,
            before_resource=old_entry,
            after_resource=new_entry,
        )
        return node
1615

1616
    @staticmethod
    def _is_intrinsic_function_name(function_name: str) -> bool:
        """Tell whether ``function_name`` is a member of ``INTRINSIC_FUNCTIONS``."""
        # TODO: are intrinsic functions soft keywords?
        is_known = function_name in INTRINSIC_FUNCTIONS
        return is_known
1✔
1620

1621
    @staticmethod
    def _safe_access_in(scope: Scope, key: str, *objects: Maybe[dict]) -> tuple[Scope, Maybe[Any]]:
        """Look ``key`` up in every given container and open a child scope for it.

        A dict contributes ``dict.get(key, Nothing)``; a ``Nothing`` container
        contributes itself.

        :param scope: parent scope from which the child scope named ``key`` is opened.
        :param key: key to look up in each container.
        :param objects: containers to read from, each a dict or ``Nothing``.
        :return: the child scope together with the looked-up value when exactly one
            container was given, otherwise with a tuple holding one value per container.
        :raises RuntimeError: if a container is neither a dict nor ``Nothing``.
        """
        looked_up: list[Any] = []
        for obj in objects:
            if isinstance(obj, NothingType):
                looked_up.append(obj)
            elif isinstance(obj, dict):
                looked_up.append(obj.get(key, Nothing))
            else:
                raise RuntimeError(f"Invalid definition type at '{obj}'")

        child_scope = scope.open_scope(name=key)
        if len(objects) == 1:
            return child_scope, looked_up[0]
        return child_scope, tuple(looked_up)
1✔
1633

1634
    @staticmethod
1✔
1635
    def _safe_keys_of(*objects: Maybe[dict]) -> list[str]:
1✔
1636
        key_set: set[str] = set()
1✔
1637
        for obj in objects:
1✔
1638
            # TODO: raise errors if not dict
1639
            if isinstance(obj, dict):
1✔
1640
                key_set.update(obj.keys())
1✔
1641
        # The keys list is sorted to increase reproducibility of the
1642
        # update graph build process or downstream logics.
1643
        keys = sorted(key_set)
1✔
1644
        return keys
1✔
1645

1646
    @staticmethod
1✔
1647
    def _name_if_intrinsic_function(value: Maybe[Any]) -> str | None:
1✔
1648
        if isinstance(value, dict):
1✔
1649
            keys = ChangeSetModel._safe_keys_of(value)
1✔
1650
            if len(keys) == 1:
1✔
1651
                key_name = keys[0]
1✔
1652
                if ChangeSetModel._is_intrinsic_function_name(key_name):
1✔
1653
                    return key_name
1✔
1654
        return None
1✔
1655

1656
    @staticmethod
1✔
1657
    def _type_name_of(value: Maybe[Any]) -> str:
1✔
1658
        maybe_intrinsic_function_name = ChangeSetModel._name_if_intrinsic_function(value)
1✔
1659
        if maybe_intrinsic_function_name is not None:
1✔
1660
            return maybe_intrinsic_function_name
1✔
1661
        return type(value).__name__
1✔
1662

1663
    @staticmethod
1✔
1664
    def _is_terminal(value: Any) -> bool:
1✔
1665
        return type(value) in {int, float, bool, str, None, NothingType}
1✔
1666

1667
    @staticmethod
1✔
1668
    def _is_object(value: Any) -> bool:
1✔
1669
        return isinstance(value, dict) and ChangeSetModel._name_if_intrinsic_function(value) is None
1✔
1670

1671
    @staticmethod
1✔
1672
    def _is_array(value: Any) -> bool:
1✔
1673
        return isinstance(value, list)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc