• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / d8dc9956-71ea-40c6-95cb-26b2b584943a

08 May 2025 05:15PM UTC coverage: 86.66% (+0.1%) from 86.535%
d8dc9956-71ea-40c6-95cb-26b2b584943a

push

circleci

web-flow
CFn v2: Skip media type assertion (#12597)

64346 of 74251 relevant lines covered (86.66%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.63
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_executor.py
1
import copy
1✔
2
import logging
1✔
3
import uuid
1✔
4
from dataclasses import dataclass
1✔
5
from typing import Final, Optional
1✔
6

7
from localstack.aws.api.cloudformation import ChangeAction, StackStatus
1✔
8
from localstack.constants import INTERNAL_AWS_SECRET_ACCESS_KEY
1✔
9
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
10
    NodeOutput,
11
    NodeParameter,
12
    NodeResource,
13
)
14
from localstack.services.cloudformation.engine.v2.change_set_model_preproc import (
1✔
15
    ChangeSetModelPreproc,
16
    PreprocEntityDelta,
17
    PreprocOutput,
18
    PreprocProperties,
19
    PreprocResource,
20
)
21
from localstack.services.cloudformation.resource_provider import (
1✔
22
    Credentials,
23
    OperationStatus,
24
    ProgressEvent,
25
    ResourceProviderExecutor,
26
    ResourceProviderPayload,
27
)
28
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
29

30
# Module-level logger, named after this module so its records can be filtered per module.
LOG = logging.getLogger(__name__)
1✔
31

32

33
@dataclass
class ChangeSetModelExecutorResult:
    """
    Bundle of the state collected while executing a change set.

    Attributes:
        resources: state of the deployed resources, keyed by logical resource id.
        parameters: resolved parameter values, keyed by parameter name.
        outputs: output values, keyed by output name.
    """

    # NOTE: the field order is part of the positional constructor signature.
    resources: dict
    parameters: dict
    outputs: dict
1✔
38

39

40
class ChangeSetModelExecutor(ChangeSetModelPreproc):
1✔
41
    _change_set: Final[ChangeSet]
1✔
42
    # TODO: add typing for resolved resources and parameters.
43
    resources: Final[dict]
1✔
44
    outputs: Final[dict]
1✔
45
    resolved_parameters: Final[dict]
1✔
46

47
    def __init__(self, change_set: ChangeSet):
1✔
48
        super().__init__(
1✔
49
            node_template=change_set.update_graph,
50
            before_resolved_resources=change_set.stack.resolved_resources,
51
        )
52
        self._change_set = change_set
1✔
53
        self.resources = dict()
1✔
54
        self.outputs = dict()
1✔
55
        self.resolved_parameters = dict()
1✔
56

57
    # TODO: use a structured type for the return value
58
    def execute(self) -> ChangeSetModelExecutorResult:
1✔
59
        self.process()
1✔
60
        return ChangeSetModelExecutorResult(
1✔
61
            resources=self.resources, parameters=self.resolved_parameters, outputs=self.outputs
62
        )
63

64
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
1✔
65
        delta = super().visit_node_parameter(node_parameter=node_parameter)
1✔
66
        self.resolved_parameters[node_parameter.name] = delta.after
1✔
67
        return delta
1✔
68

69
    def _after_deployed_property_value_of(
1✔
70
        self, resource_logical_id: str, property_name: str
71
    ) -> str:
72
        after_resolved_resources = self.resources
1✔
73
        return self._deployed_property_value_of(
1✔
74
            resource_logical_id=resource_logical_id,
75
            property_name=property_name,
76
            resolved_resources=after_resolved_resources,
77
        )
78

79
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
80
        after_resolved_resources = self.resources
1✔
81
        return self._resource_physical_resource_id_from(
1✔
82
            logical_resource_id=resource_logical_id, resolved_resources=after_resolved_resources
83
        )
84

85
    def visit_node_resource(
1✔
86
        self, node_resource: NodeResource
87
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
88
        """
89
        Overrides the default preprocessing for NodeResource objects by annotating the
90
        `after` delta with the physical resource ID, if side effects resulted in an update.
91
        """
92
        delta = super().visit_node_resource(node_resource=node_resource)
1✔
93
        self._execute_on_resource_change(
1✔
94
            name=node_resource.name, before=delta.before, after=delta.after
95
        )
96
        after_resource = delta.after
1✔
97
        if after_resource is not None and delta.before != delta.after:
1✔
98
            after_logical_id = after_resource.logical_id
1✔
99
            after_physical_id: Optional[str] = self._after_resource_physical_id(
1✔
100
                resource_logical_id=after_logical_id
101
            )
102
            if after_physical_id is None:
1✔
103
                raise RuntimeError(
×
104
                    f"No PhysicalResourceId was found for resource '{after_physical_id}' post-update."
105
                )
106
            after_resource.physical_resource_id = after_physical_id
1✔
107
        return delta
1✔
108

109
    def visit_node_output(
1✔
110
        self, node_output: NodeOutput
111
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
112
        delta = super().visit_node_output(node_output=node_output)
1✔
113
        if delta.after is None:
1✔
114
            # handling deletion so the output does not really matter
115
            # TODO: are there other situations?
116
            return delta
×
117

118
        self.outputs[delta.after.name] = delta.after.value
1✔
119
        return delta
1✔
120

121
    def _execute_on_resource_change(
1✔
122
        self, name: str, before: Optional[PreprocResource], after: Optional[PreprocResource]
123
    ) -> None:
124
        if before == after:
1✔
125
            # unchanged: nothing to do.
126
            return
1✔
127
        # TODO: this logic is a POC and should be revised.
128
        if before is not None and after is not None:
1✔
129
            # Case: change on same type.
130
            if before.resource_type == after.resource_type:
1✔
131
                # Register a Modified if changed.
132
                # XXX hacky, stick the previous resources' properties into the payload
133
                before_properties = self._merge_before_properties(name, before)
1✔
134

135
                self._execute_resource_action(
1✔
136
                    action=ChangeAction.Modify,
137
                    logical_resource_id=name,
138
                    resource_type=before.resource_type,
139
                    before_properties=before_properties,
140
                    after_properties=after.properties,
141
                )
142
            # Case: type migration.
143
            # TODO: Add test to assert that on type change the resources are replaced.
144
            else:
145
                # XXX hacky, stick the previous resources' properties into the payload
146
                before_properties = self._merge_before_properties(name, before)
×
147
                # Register a Removed for the previous type.
148
                self._execute_resource_action(
×
149
                    action=ChangeAction.Remove,
150
                    logical_resource_id=name,
151
                    resource_type=before.resource_type,
152
                    before_properties=before_properties,
153
                    after_properties=None,
154
                )
155
                # Register a Create for the next type.
156
                self._execute_resource_action(
×
157
                    action=ChangeAction.Add,
158
                    logical_resource_id=name,
159
                    resource_type=after.resource_type,
160
                    before_properties=None,
161
                    after_properties=after.properties,
162
                )
163
        elif before is not None:
1✔
164
            # Case: removal
165
            # XXX hacky, stick the previous resources' properties into the payload
166
            # XXX hacky, stick the previous resources' properties into the payload
167
            before_properties = self._merge_before_properties(name, before)
1✔
168

169
            self._execute_resource_action(
1✔
170
                action=ChangeAction.Remove,
171
                logical_resource_id=name,
172
                resource_type=before.resource_type,
173
                before_properties=before_properties,
174
                after_properties=None,
175
            )
176
        elif after is not None:
1✔
177
            # Case: addition
178
            self._execute_resource_action(
1✔
179
                action=ChangeAction.Add,
180
                logical_resource_id=name,
181
                resource_type=after.resource_type,
182
                before_properties=None,
183
                after_properties=after.properties,
184
            )
185

186
    def _merge_before_properties(
1✔
187
        self, name: str, preproc_resource: PreprocResource
188
    ) -> PreprocProperties:
189
        if previous_resource_properties := self._change_set.stack.resolved_resources.get(
1✔
190
            name, {}
191
        ).get("Properties"):
192
            return PreprocProperties(properties=previous_resource_properties)
1✔
193

194
        # XXX fall back to returning the input value
195
        return copy.deepcopy(preproc_resource.properties)
×
196

197
    def _execute_resource_action(
1✔
198
        self,
199
        action: ChangeAction,
200
        logical_resource_id: str,
201
        resource_type: str,
202
        before_properties: Optional[PreprocProperties],
203
        after_properties: Optional[PreprocProperties],
204
    ) -> None:
205
        LOG.debug("Executing resource action: %s for resource '%s'", action, logical_resource_id)
1✔
206
        resource_provider_executor = ResourceProviderExecutor(
1✔
207
            stack_name=self._change_set.stack.stack_name, stack_id=self._change_set.stack.stack_id
208
        )
209
        payload = self.create_resource_provider_payload(
1✔
210
            action=action,
211
            logical_resource_id=logical_resource_id,
212
            resource_type=resource_type,
213
            before_properties=before_properties,
214
            after_properties=after_properties,
215
        )
216
        resource_provider = resource_provider_executor.try_load_resource_provider(resource_type)
1✔
217

218
        extra_resource_properties = {}
1✔
219
        if resource_provider is not None:
1✔
220
            # TODO: stack events
221
            try:
1✔
222
                event = resource_provider_executor.deploy_loop(
1✔
223
                    resource_provider, extra_resource_properties, payload
224
                )
225
            except Exception as e:
×
226
                reason = str(e)
×
227
                LOG.warning(
×
228
                    "Resource provider operation failed: '%s'",
229
                    reason,
230
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
231
                )
232
                stack = self._change_set.stack
×
233
                stack_status = stack.status
×
234
                if stack_status == StackStatus.CREATE_IN_PROGRESS:
×
235
                    stack.set_stack_status(StackStatus.CREATE_FAILED, reason=reason)
×
236
                elif stack_status == StackStatus.UPDATE_IN_PROGRESS:
×
237
                    stack.set_stack_status(StackStatus.UPDATE_FAILED, reason=reason)
×
238
                return
×
239
        else:
240
            event = ProgressEvent(OperationStatus.SUCCESS, resource_model={})
×
241

242
        self.resources.setdefault(logical_resource_id, {"Properties": {}})
1✔
243
        match event.status:
1✔
244
            case OperationStatus.SUCCESS:
1✔
245
                # merge the resources state with the external state
246
                # TODO: this is likely a duplicate of updating from extra_resource_properties
247
                self.resources[logical_resource_id]["Properties"].update(event.resource_model)
1✔
248
                self.resources[logical_resource_id].update(extra_resource_properties)
1✔
249
                # XXX for legacy delete_stack compatibility
250
                self.resources[logical_resource_id]["LogicalResourceId"] = logical_resource_id
1✔
251
                self.resources[logical_resource_id]["Type"] = resource_type
1✔
252
            case OperationStatus.FAILED:
×
253
                reason = event.message
×
254
                LOG.warning(
×
255
                    "Resource provider operation failed: '%s'",
256
                    reason,
257
                )
258
                # TODO: duplication
259
                stack = self._change_set.stack
×
260
                stack_status = stack.status
×
261
                if stack_status == StackStatus.CREATE_IN_PROGRESS:
×
262
                    stack.set_stack_status(StackStatus.CREATE_FAILED, reason=reason)
×
263
                elif stack_status == StackStatus.UPDATE_IN_PROGRESS:
×
264
                    stack.set_stack_status(StackStatus.UPDATE_FAILED, reason=reason)
×
265
                else:
266
                    raise NotImplementedError(f"Unhandled stack status: '{stack.status}'")
267
            case any:
×
268
                raise NotImplementedError(f"Event status '{any}' not handled")
269

270
    def create_resource_provider_payload(
1✔
271
        self,
272
        action: ChangeAction,
273
        logical_resource_id: str,
274
        resource_type: str,
275
        before_properties: Optional[PreprocProperties],
276
        after_properties: Optional[PreprocProperties],
277
    ) -> Optional[ResourceProviderPayload]:
278
        # FIXME: use proper credentials
279
        creds: Credentials = {
1✔
280
            "accessKeyId": self._change_set.stack.account_id,
281
            "secretAccessKey": INTERNAL_AWS_SECRET_ACCESS_KEY,
282
            "sessionToken": "",
283
        }
284
        before_properties_value = before_properties.properties if before_properties else None
1✔
285
        after_properties_value = after_properties.properties if after_properties else None
1✔
286

287
        match action:
1✔
288
            case ChangeAction.Add:
1✔
289
                resource_properties = after_properties_value or {}
1✔
290
                previous_resource_properties = None
1✔
291
            case ChangeAction.Modify | ChangeAction.Dynamic:
1✔
292
                resource_properties = after_properties_value or {}
1✔
293
                previous_resource_properties = before_properties_value or {}
1✔
294
            case ChangeAction.Remove:
1✔
295
                resource_properties = before_properties_value or {}
1✔
296
                previous_resource_properties = None
1✔
297
            case _:
×
298
                raise NotImplementedError(f"Action '{action}' not handled")
299

300
        resource_provider_payload: ResourceProviderPayload = {
1✔
301
            "awsAccountId": self._change_set.stack.account_id,
302
            "callbackContext": {},
303
            "stackId": self._change_set.stack.stack_name,
304
            "resourceType": resource_type,
305
            "resourceTypeVersion": "000000",
306
            # TODO: not actually a UUID
307
            "bearerToken": str(uuid.uuid4()),
308
            "region": self._change_set.stack.region_name,
309
            "action": str(action),
310
            "requestData": {
311
                "logicalResourceId": logical_resource_id,
312
                "resourceProperties": resource_properties,
313
                "previousResourceProperties": previous_resource_properties,
314
                "callerCredentials": creds,
315
                "providerCredentials": creds,
316
                "systemTags": {},
317
                "previousSystemTags": {},
318
                "stackTags": {},
319
                "previousStackTags": {},
320
            },
321
        }
322
        return resource_provider_payload
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc