• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 4e27dc30-df7d-47cf-9ddb-0b539d612501

17 Apr 2025 08:11PM UTC coverage: 86.279% (-0.02%) from 86.294%
4e27dc30-df7d-47cf-9ddb-0b539d612501

push

circleci

web-flow
Step Functions: Surface Support for Mocked Responses (#12525)

200 of 245 new or added lines in 9 files covered. (81.63%)

201 existing lines in 15 files now uncovered.

63889 of 74049 relevant lines covered (86.28%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.05
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_executor.py
1
import copy
1✔
2
import logging
1✔
3
import uuid
1✔
4
from typing import Any, Final, Optional
1✔
5

6
from localstack.aws.api.cloudformation import ChangeAction, StackStatus
1✔
7
from localstack.constants import INTERNAL_AWS_SECRET_ACCESS_KEY
1✔
8
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
9
    NodeParameter,
10
    NodeResource,
11
)
12
from localstack.services.cloudformation.engine.v2.change_set_model_preproc import (
1✔
13
    ChangeSetModelPreproc,
14
    PreprocEntityDelta,
15
    PreprocProperties,
16
    PreprocResource,
17
)
18
from localstack.services.cloudformation.resource_provider import (
1✔
19
    Credentials,
20
    OperationStatus,
21
    ProgressEvent,
22
    ResourceProviderExecutor,
23
    ResourceProviderPayload,
24
)
25
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
26

27
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
28

29

30
class ChangeSetModelExecutor(ChangeSetModelPreproc):
1✔
31
    change_set: Final[ChangeSet]
1✔
32
    # TODO: add typing.
33
    resources: Final[dict]
1✔
34
    resolved_parameters: Final[dict]
1✔
35

36
    def __init__(self, change_set: ChangeSet):
1✔
UNCOV
37
        super().__init__(node_template=change_set.update_graph)
×
38
        self.change_set = change_set
×
39
        self.resources = dict()
×
40
        self.resolved_parameters = dict()
×
41

42
    # TODO: use a structured type for the return value
43
    def execute(self) -> tuple[dict, dict]:
1✔
44
        self.process()
×
45
        return self.resources, self.resolved_parameters
×
46

47
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
1✔
UNCOV
48
        delta = super().visit_node_parameter(node_parameter=node_parameter)
×
UNCOV
49
        self.resolved_parameters[node_parameter.name] = delta.after
×
50
        return delta
×
51

52
    def visit_node_resource(
1✔
53
        self, node_resource: NodeResource
54
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
55
        delta = super().visit_node_resource(node_resource=node_resource)
×
56
        self._execute_on_resource_change(
×
57
            name=node_resource.name, before=delta.before, after=delta.after
58
        )
UNCOV
59
        return delta
×
60

61
    def _reduce_intrinsic_function_ref_value(self, preproc_value: Any) -> PreprocEntityDelta:
1✔
62
        if not isinstance(preproc_value, PreprocResource):
×
UNCOV
63
            return super()._reduce_intrinsic_function_ref_value(preproc_value=preproc_value)
×
64

65
        logical_id = preproc_value.name
×
66

UNCOV
67
        def _get_physical_id_of_resolved_resource(resolved_resource: dict) -> str:
×
UNCOV
68
            physical_resource_id = resolved_resource.get("PhysicalResourceId")
×
UNCOV
69
            if not isinstance(physical_resource_id, str):
×
UNCOV
70
                raise RuntimeError(
×
71
                    f"No physical resource id found for resource '{logical_id}' during ChangeSet execution"
72
                )
73
            return physical_resource_id
×
74

75
        before_resolved_resources = self.change_set.stack.resolved_resources
×
UNCOV
76
        after_resolved_resources = self.resources
×
77

78
        before_physical_id = None
×
UNCOV
79
        if logical_id in before_resolved_resources:
×
UNCOV
80
            before_resolved_resource = before_resolved_resources[logical_id]
×
UNCOV
81
            before_physical_id = _get_physical_id_of_resolved_resource(before_resolved_resource)
×
82
        after_physical_id = None
×
UNCOV
83
        if logical_id in after_resolved_resources:
×
UNCOV
84
            after_resolved_resource = after_resolved_resources[logical_id]
×
UNCOV
85
            after_physical_id = _get_physical_id_of_resolved_resource(after_resolved_resource)
×
86

UNCOV
87
        if before_physical_id is None and after_physical_id is None:
×
88
            raise RuntimeError(f"No resource '{logical_id}' found during ChangeSet execution")
×
UNCOV
89
        return PreprocEntityDelta(before=before_physical_id, after=after_physical_id)
×
90

91
    def _execute_on_resource_change(
1✔
92
        self, name: str, before: Optional[PreprocResource], after: Optional[PreprocResource]
93
    ) -> None:
UNCOV
94
        if before == after:
×
95
            # unchanged: nothing to do.
UNCOV
96
            return
×
97
        # TODO: this logic is a POC and should be revised.
UNCOV
98
        if before is not None and after is not None:
×
99
            # Case: change on same type.
UNCOV
100
            if before.resource_type == after.resource_type:
×
101
                # Register a Modified if changed.
102
                # XXX hacky, stick the previous resources' properties into the payload
UNCOV
103
                before_properties = self._merge_before_properties(name, before)
×
104

UNCOV
105
                self._execute_resource_action(
×
106
                    action=ChangeAction.Modify,
107
                    logical_resource_id=name,
108
                    resource_type=before.resource_type,
109
                    before_properties=before_properties,
110
                    after_properties=after.properties,
111
                )
112
            # Case: type migration.
113
            # TODO: Add test to assert that on type change the resources are replaced.
114
            else:
115
                # XXX hacky, stick the previous resources' properties into the payload
116
                before_properties = self._merge_before_properties(name, before)
×
117
                # Register a Removed for the previous type.
UNCOV
118
                self._execute_resource_action(
×
119
                    action=ChangeAction.Remove,
120
                    logical_resource_id=name,
121
                    resource_type=before.resource_type,
122
                    before_properties=before_properties,
123
                    after_properties=None,
124
                )
125
                # Register a Create for the next type.
UNCOV
126
                self._execute_resource_action(
×
127
                    action=ChangeAction.Add,
128
                    logical_resource_id=name,
129
                    resource_type=after.resource_type,
130
                    before_properties=None,
131
                    after_properties=after.properties,
132
                )
UNCOV
133
        elif before is not None:
×
134
            # Case: removal
135
            # XXX hacky, stick the previous resources' properties into the payload
136
            # XXX hacky, stick the previous resources' properties into the payload
UNCOV
137
            before_properties = self._merge_before_properties(name, before)
×
138

UNCOV
139
            self._execute_resource_action(
×
140
                action=ChangeAction.Remove,
141
                logical_resource_id=name,
142
                resource_type=before.resource_type,
143
                before_properties=before_properties,
144
                after_properties=None,
145
            )
UNCOV
146
        elif after is not None:
×
147
            # Case: addition
UNCOV
148
            self._execute_resource_action(
×
149
                action=ChangeAction.Add,
150
                logical_resource_id=name,
151
                resource_type=after.resource_type,
152
                before_properties=None,
153
                after_properties=after.properties,
154
            )
155

156
    def _merge_before_properties(
1✔
157
        self, name: str, preproc_resource: PreprocResource
158
    ) -> PreprocProperties:
UNCOV
159
        if previous_resource_properties := self.change_set.stack.resolved_resources.get(
×
160
            name, {}
161
        ).get("Properties"):
UNCOV
162
            return PreprocProperties(properties=previous_resource_properties)
×
163

164
        # XXX fall back to returning the input value
165
        return copy.deepcopy(preproc_resource.properties)
×
166

167
    def _execute_resource_action(
1✔
168
        self,
169
        action: ChangeAction,
170
        logical_resource_id: str,
171
        resource_type: str,
172
        before_properties: Optional[PreprocProperties],
173
        after_properties: Optional[PreprocProperties],
174
    ) -> None:
UNCOV
175
        LOG.debug("Executing resource action: %s for resource '%s'", action, logical_resource_id)
×
176
        resource_provider_executor = ResourceProviderExecutor(
×
177
            stack_name=self.change_set.stack.stack_name, stack_id=self.change_set.stack.stack_id
178
        )
179
        payload = self.create_resource_provider_payload(
×
180
            action=action,
181
            logical_resource_id=logical_resource_id,
182
            resource_type=resource_type,
183
            before_properties=before_properties,
184
            after_properties=after_properties,
185
        )
186
        resource_provider = resource_provider_executor.try_load_resource_provider(resource_type)
×
187

UNCOV
188
        extra_resource_properties = {}
×
UNCOV
189
        if resource_provider is not None:
×
190
            # TODO: stack events
UNCOV
191
            try:
×
192
                event = resource_provider_executor.deploy_loop(
×
193
                    resource_provider, extra_resource_properties, payload
194
                )
195
            except Exception as e:
×
196
                reason = str(e)
×
UNCOV
197
                LOG.warning(
×
198
                    "Resource provider operation failed: '%s'",
199
                    reason,
200
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
201
                )
202
                stack = self.change_set.stack
×
UNCOV
203
                stack_status = stack.status
×
UNCOV
204
                if stack_status == StackStatus.CREATE_IN_PROGRESS:
×
205
                    stack.set_stack_status(StackStatus.CREATE_FAILED, reason=reason)
×
206
                elif stack_status == StackStatus.UPDATE_IN_PROGRESS:
×
UNCOV
207
                    stack.set_stack_status(StackStatus.UPDATE_FAILED, reason=reason)
×
208
                return
×
209
        else:
210
            event = ProgressEvent(OperationStatus.SUCCESS, resource_model={})
×
211

212
        self.resources.setdefault(logical_resource_id, {"Properties": {}})
×
UNCOV
213
        match event.status:
×
UNCOV
214
            case OperationStatus.SUCCESS:
×
215
                # merge the resources state with the external state
216
                # TODO: this is likely a duplicate of updating from extra_resource_properties
217
                self.resources[logical_resource_id]["Properties"].update(event.resource_model)
×
218
                self.resources[logical_resource_id].update(extra_resource_properties)
×
219
                # XXX for legacy delete_stack compatibility
UNCOV
220
                self.resources[logical_resource_id]["LogicalResourceId"] = logical_resource_id
×
UNCOV
221
                self.resources[logical_resource_id]["Type"] = resource_type
×
222
            case OperationStatus.FAILED:
×
UNCOV
223
                reason = event.message
×
UNCOV
224
                LOG.warning(
×
225
                    "Resource provider operation failed: '%s'",
226
                    reason,
227
                )
228
                # TODO: duplication
UNCOV
229
                stack = self.change_set.stack
×
UNCOV
230
                stack_status = stack.status
×
UNCOV
231
                if stack_status == StackStatus.CREATE_IN_PROGRESS:
×
UNCOV
232
                    stack.set_stack_status(StackStatus.CREATE_FAILED, reason=reason)
×
UNCOV
233
                elif stack_status == StackStatus.UPDATE_IN_PROGRESS:
×
234
                    stack.set_stack_status(StackStatus.UPDATE_FAILED, reason=reason)
×
235
                else:
236
                    raise NotImplementedError(f"Unhandled stack status: '{stack.status}'")
UNCOV
237
            case any:
×
238
                raise NotImplementedError(f"Event status '{any}' not handled")
239

240
    def create_resource_provider_payload(
1✔
241
        self,
242
        action: ChangeAction,
243
        logical_resource_id: str,
244
        resource_type: str,
245
        before_properties: Optional[PreprocProperties],
246
        after_properties: Optional[PreprocProperties],
247
    ) -> Optional[ResourceProviderPayload]:
248
        # FIXME: use proper credentials
249
        creds: Credentials = {
×
250
            "accessKeyId": self.change_set.stack.account_id,
251
            "secretAccessKey": INTERNAL_AWS_SECRET_ACCESS_KEY,
252
            "sessionToken": "",
253
        }
UNCOV
254
        before_properties_value = before_properties.properties if before_properties else None
×
255
        after_properties_value = after_properties.properties if after_properties else None
×
256

UNCOV
257
        match action:
×
UNCOV
258
            case ChangeAction.Add:
×
UNCOV
259
                resource_properties = after_properties_value or {}
×
UNCOV
260
                previous_resource_properties = None
×
UNCOV
261
            case ChangeAction.Modify | ChangeAction.Dynamic:
×
UNCOV
262
                resource_properties = after_properties_value or {}
×
UNCOV
263
                previous_resource_properties = before_properties_value or {}
×
UNCOV
264
            case ChangeAction.Remove:
×
UNCOV
265
                resource_properties = before_properties_value or {}
×
UNCOV
266
                previous_resource_properties = None
×
UNCOV
267
            case _:
×
268
                raise NotImplementedError(f"Action '{action}' not handled")
269

UNCOV
270
        resource_provider_payload: ResourceProviderPayload = {
×
271
            "awsAccountId": self.change_set.stack.account_id,
272
            "callbackContext": {},
273
            "stackId": self.change_set.stack.stack_name,
274
            "resourceType": resource_type,
275
            "resourceTypeVersion": "000000",
276
            # TODO: not actually a UUID
277
            "bearerToken": str(uuid.uuid4()),
278
            "region": self.change_set.stack.region_name,
279
            "action": str(action),
280
            "requestData": {
281
                "logicalResourceId": logical_resource_id,
282
                "resourceProperties": resource_properties,
283
                "previousResourceProperties": previous_resource_properties,
284
                "callerCredentials": creds,
285
                "providerCredentials": creds,
286
                "systemTags": {},
287
                "previousSystemTags": {},
288
                "stackTags": {},
289
                "previousStackTags": {},
290
            },
291
        }
UNCOV
292
        return resource_provider_payload
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc