• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 533d4262-4f08-49b7-b7ba-18c46e51ac1a

02 Jun 2025 06:43PM UTC coverage: 86.752% (+0.1%) from 86.654%
533d4262-4f08-49b7-b7ba-18c46e51ac1a

push

circleci

web-flow
APIGW: implement Canary Deployments CRUD logic (#12694)

142 of 147 new or added lines in 3 files covered. (96.6%)

187 existing lines in 16 files now uncovered.

64937 of 74854 relevant lines covered (86.75%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.08
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_executor.py
1
import copy
1✔
2
import logging
1✔
3
import uuid
1✔
4
from dataclasses import dataclass
1✔
5
from typing import Final, Optional
1✔
6

7
from localstack.aws.api.cloudformation import ChangeAction, StackStatus
1✔
8
from localstack.constants import INTERNAL_AWS_SECRET_ACCESS_KEY
1✔
9
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
10
    NodeDependsOn,
11
    NodeOutput,
12
    NodeParameter,
13
    NodeResource,
14
    is_nothing,
15
)
16
from localstack.services.cloudformation.engine.v2.change_set_model_preproc import (
1✔
17
    ChangeSetModelPreproc,
18
    PreprocEntityDelta,
19
    PreprocOutput,
20
    PreprocProperties,
21
    PreprocResource,
22
)
23
from localstack.services.cloudformation.resource_provider import (
1✔
24
    Credentials,
25
    OperationStatus,
26
    ProgressEvent,
27
    ResourceProviderExecutor,
28
    ResourceProviderPayload,
29
)
30
from localstack.services.cloudformation.v2.entities import ChangeSet
1✔
31

32
LOG = logging.getLogger(__name__)
1✔
33

34

35
@dataclass
class ChangeSetModelExecutorResult:
    """Aggregate state produced by executing a change set with ``ChangeSetModelExecutor``."""

    # Resolved resource entries collected by the executor, keyed by logical resource id.
    resources: dict
    # Resolved parameter values collected by the executor, keyed by parameter name.
    parameters: dict
    # Resolved output values collected by the executor, keyed by output name.
    outputs: dict
1✔
40

41

42
class ChangeSetModelExecutor(ChangeSetModelPreproc):
1✔
43
    # TODO: add typing for resolved resources and parameters.
44
    resources: Final[dict]
1✔
45
    outputs: Final[dict]
1✔
46
    resolved_parameters: Final[dict]
1✔
47

48
    def __init__(self, change_set: ChangeSet):
1✔
49
        super().__init__(change_set=change_set)
1✔
50
        self.resources = dict()
1✔
51
        self.outputs = dict()
1✔
52
        self.resolved_parameters = dict()
1✔
53

54
    # TODO: use a structured type for the return value
55
    def execute(self) -> ChangeSetModelExecutorResult:
1✔
56
        self.process()
1✔
57
        return ChangeSetModelExecutorResult(
1✔
58
            resources=self.resources, parameters=self.resolved_parameters, outputs=self.outputs
59
        )
60

61
    def visit_node_parameter(self, node_parameter: NodeParameter) -> PreprocEntityDelta:
1✔
62
        delta = super().visit_node_parameter(node_parameter=node_parameter)
1✔
63
        self.resolved_parameters[node_parameter.name] = delta.after
1✔
64
        return delta
1✔
65

66
    def _after_deployed_property_value_of(
1✔
67
        self, resource_logical_id: str, property_name: str
68
    ) -> str:
69
        after_resolved_resources = self.resources
1✔
70
        return self._deployed_property_value_of(
1✔
71
            resource_logical_id=resource_logical_id,
72
            property_name=property_name,
73
            resolved_resources=after_resolved_resources,
74
        )
75

76
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
77
        after_resolved_resources = self.resources
1✔
78
        return self._resource_physical_resource_id_from(
1✔
79
            logical_resource_id=resource_logical_id, resolved_resources=after_resolved_resources
80
        )
81

82
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
1✔
83
        array_identifiers_delta = super().visit_node_depends_on(node_depends_on=node_depends_on)
1✔
84

85
        # Visit depends_on resources before returning.
86
        depends_on_resource_logical_ids: set[str] = set()
1✔
87
        if array_identifiers_delta.before:
1✔
88
            depends_on_resource_logical_ids.update(array_identifiers_delta.before)
1✔
89
        if array_identifiers_delta.after:
1✔
90
            depends_on_resource_logical_ids.update(array_identifiers_delta.after)
1✔
91
        for depends_on_resource_logical_id in depends_on_resource_logical_ids:
1✔
92
            node_resource = self._get_node_resource_for(
1✔
93
                resource_name=depends_on_resource_logical_id, node_template=self._node_template
94
            )
95
            self.visit_node_resource(node_resource)
1✔
96

97
        return array_identifiers_delta
1✔
98

99
    def visit_node_resource(
1✔
100
        self, node_resource: NodeResource
101
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
102
        """
103
        Overrides the default preprocessing for NodeResource objects by annotating the
104
        `after` delta with the physical resource ID, if side effects resulted in an update.
105
        """
106
        delta = super().visit_node_resource(node_resource=node_resource)
1✔
107
        before = delta.before
1✔
108
        after = delta.after
1✔
109

110
        if before != after:
1✔
111
            # There are changes for this resource.
112
            self._execute_resource_change(name=node_resource.name, before=before, after=after)
1✔
113
        else:
114
            # There are no updates for this resource; iff the resource was previously
115
            # deployed, then the resolved details are copied in the current state for
116
            # references or other downstream operations.
117
            if not is_nothing(before):
1✔
118
                before_logical_id = delta.before.logical_id
1✔
119
                before_resource = self._before_resolved_resources.get(before_logical_id, dict())
1✔
120
                self.resources[before_logical_id] = before_resource
1✔
121

122
        # Update the latest version of this resource for downstream references.
123
        if not is_nothing(after):
1✔
124
            after_logical_id = after.logical_id
1✔
125
            after_physical_id: str = self._after_resource_physical_id(
1✔
126
                resource_logical_id=after_logical_id
127
            )
128
            after.physical_resource_id = after_physical_id
1✔
129
        return delta
1✔
130

131
    def visit_node_output(
1✔
132
        self, node_output: NodeOutput
133
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
134
        delta = super().visit_node_output(node_output=node_output)
1✔
135
        after = delta.after
1✔
136
        if is_nothing(after) or (isinstance(after, PreprocOutput) and after.condition is False):
1✔
137
            return delta
1✔
138
        self.outputs[delta.after.name] = delta.after.value
1✔
139
        return delta
1✔
140

141
    def _execute_resource_change(
1✔
142
        self, name: str, before: Optional[PreprocResource], after: Optional[PreprocResource]
143
    ) -> None:
144
        # Changes are to be made about this resource.
145
        # TODO: this logic is a POC and should be revised.
146
        if not is_nothing(before) and not is_nothing(after):
1✔
147
            # Case: change on same type.
148
            if before.resource_type == after.resource_type:
1✔
149
                # Register a Modified if changed.
150
                # XXX hacky, stick the previous resources' properties into the payload
151
                before_properties = self._merge_before_properties(name, before)
1✔
152

153
                self._execute_resource_action(
1✔
154
                    action=ChangeAction.Modify,
155
                    logical_resource_id=name,
156
                    resource_type=before.resource_type,
157
                    before_properties=before_properties,
158
                    after_properties=after.properties,
159
                )
160
            # Case: type migration.
161
            # TODO: Add test to assert that on type change the resources are replaced.
162
            else:
163
                # XXX hacky, stick the previous resources' properties into the payload
UNCOV
164
                before_properties = self._merge_before_properties(name, before)
×
165
                # Register a Removed for the previous type.
UNCOV
166
                self._execute_resource_action(
×
167
                    action=ChangeAction.Remove,
168
                    logical_resource_id=name,
169
                    resource_type=before.resource_type,
170
                    before_properties=before_properties,
171
                    after_properties=None,
172
                )
173
                # Register a Create for the next type.
UNCOV
174
                self._execute_resource_action(
×
175
                    action=ChangeAction.Add,
176
                    logical_resource_id=name,
177
                    resource_type=after.resource_type,
178
                    before_properties=None,
179
                    after_properties=after.properties,
180
                )
181
        elif not is_nothing(before):
1✔
182
            # Case: removal
183
            # XXX hacky, stick the previous resources' properties into the payload
184
            # XXX hacky, stick the previous resources' properties into the payload
185
            before_properties = self._merge_before_properties(name, before)
1✔
186

187
            self._execute_resource_action(
1✔
188
                action=ChangeAction.Remove,
189
                logical_resource_id=name,
190
                resource_type=before.resource_type,
191
                before_properties=before_properties,
192
                after_properties=None,
193
            )
194
        elif not is_nothing(after):
1✔
195
            # Case: addition
196
            self._execute_resource_action(
1✔
197
                action=ChangeAction.Add,
198
                logical_resource_id=name,
199
                resource_type=after.resource_type,
200
                before_properties=None,
201
                after_properties=after.properties,
202
            )
203

204
    def _merge_before_properties(
1✔
205
        self, name: str, preproc_resource: PreprocResource
206
    ) -> PreprocProperties:
207
        if previous_resource_properties := self._change_set.stack.resolved_resources.get(
1✔
208
            name, {}
209
        ).get("Properties"):
210
            return PreprocProperties(properties=previous_resource_properties)
1✔
211

212
        # XXX fall back to returning the input value
UNCOV
213
        return copy.deepcopy(preproc_resource.properties)
×
214

215
    def _execute_resource_action(
1✔
216
        self,
217
        action: ChangeAction,
218
        logical_resource_id: str,
219
        resource_type: str,
220
        before_properties: Optional[PreprocProperties],
221
        after_properties: Optional[PreprocProperties],
222
    ) -> None:
223
        LOG.debug("Executing resource action: %s for resource '%s'", action, logical_resource_id)
1✔
224
        resource_provider_executor = ResourceProviderExecutor(
1✔
225
            stack_name=self._change_set.stack.stack_name, stack_id=self._change_set.stack.stack_id
226
        )
227
        payload = self.create_resource_provider_payload(
1✔
228
            action=action,
229
            logical_resource_id=logical_resource_id,
230
            resource_type=resource_type,
231
            before_properties=before_properties,
232
            after_properties=after_properties,
233
        )
234
        resource_provider = resource_provider_executor.try_load_resource_provider(resource_type)
1✔
235

236
        extra_resource_properties = {}
1✔
237
        if resource_provider is not None:
1✔
238
            # TODO: stack events
239
            try:
1✔
240
                event = resource_provider_executor.deploy_loop(
1✔
241
                    resource_provider, extra_resource_properties, payload
242
                )
243
            except Exception as e:
1✔
244
                reason = str(e)
1✔
245
                LOG.warning(
1✔
246
                    "Resource provider operation failed: '%s'",
247
                    reason,
248
                    exc_info=LOG.isEnabledFor(logging.DEBUG),
249
                )
250
                stack = self._change_set.stack
1✔
251
                stack_status = stack.status
1✔
252
                if stack_status == StackStatus.CREATE_IN_PROGRESS:
1✔
253
                    stack.set_stack_status(StackStatus.CREATE_FAILED, reason=reason)
1✔
UNCOV
254
                elif stack_status == StackStatus.UPDATE_IN_PROGRESS:
×
UNCOV
255
                    stack.set_stack_status(StackStatus.UPDATE_FAILED, reason=reason)
×
256
                return
1✔
257
        else:
UNCOV
258
            event = ProgressEvent(OperationStatus.SUCCESS, resource_model={})
×
259

260
        self.resources.setdefault(logical_resource_id, {"Properties": {}})
1✔
261
        match event.status:
1✔
262
            case OperationStatus.SUCCESS:
1✔
263
                # merge the resources state with the external state
264
                # TODO: this is likely a duplicate of updating from extra_resource_properties
265

266
                # TODO: add typing
267
                # TODO: avoid the use of string literals for sampling from the object, use typed classes instead
268
                # TODO: avoid sampling from resources and use tmp var reference
269
                # TODO: add utils functions to abstract this logic away (resource.update(..))
270
                # TODO: avoid the use of setdefault (debuggability/readability)
271
                # TODO: review the use of merge
272

273
                self.resources[logical_resource_id]["Properties"].update(event.resource_model)
1✔
274
                self.resources[logical_resource_id].update(extra_resource_properties)
1✔
275
                # XXX for legacy delete_stack compatibility
276
                self.resources[logical_resource_id]["LogicalResourceId"] = logical_resource_id
1✔
277
                self.resources[logical_resource_id]["Type"] = resource_type
1✔
278

279
                # TODO: review why the physical id is returned as None during updates
280
                # TODO: abstract this in member function of resource classes instead
281
                physical_resource_id = None
1✔
282
                try:
1✔
283
                    physical_resource_id = self._after_resource_physical_id(logical_resource_id)
1✔
284
                except RuntimeError:
1✔
285
                    # The physical id is missing or is set to None, which is invalid.
286
                    pass
1✔
287
                if physical_resource_id is None:
1✔
288
                    # The physical resource id is None after an update that didn't rewrite the resource, the previous
289
                    # resource id is therefore the current physical id of this resource.
290
                    physical_resource_id = self._before_resource_physical_id(logical_resource_id)
1✔
291
                    self.resources[logical_resource_id]["PhysicalResourceId"] = physical_resource_id
1✔
292

UNCOV
293
            case OperationStatus.FAILED:
×
UNCOV
294
                reason = event.message
×
UNCOV
295
                LOG.warning(
×
296
                    "Resource provider operation failed: '%s'",
297
                    reason,
298
                )
299
                # TODO: duplication
UNCOV
300
                stack = self._change_set.stack
×
UNCOV
301
                stack_status = stack.status
×
UNCOV
302
                if stack_status == StackStatus.CREATE_IN_PROGRESS:
×
UNCOV
303
                    stack.set_stack_status(StackStatus.CREATE_FAILED, reason=reason)
×
UNCOV
304
                elif stack_status == StackStatus.UPDATE_IN_PROGRESS:
×
UNCOV
305
                    stack.set_stack_status(StackStatus.UPDATE_FAILED, reason=reason)
×
306
                else:
307
                    raise NotImplementedError(f"Unhandled stack status: '{stack.status}'")
UNCOV
308
            case any:
×
309
                raise NotImplementedError(f"Event status '{any}' not handled")
310

311
    def create_resource_provider_payload(
1✔
312
        self,
313
        action: ChangeAction,
314
        logical_resource_id: str,
315
        resource_type: str,
316
        before_properties: Optional[PreprocProperties],
317
        after_properties: Optional[PreprocProperties],
318
    ) -> Optional[ResourceProviderPayload]:
319
        # FIXME: use proper credentials
320
        creds: Credentials = {
1✔
321
            "accessKeyId": self._change_set.stack.account_id,
322
            "secretAccessKey": INTERNAL_AWS_SECRET_ACCESS_KEY,
323
            "sessionToken": "",
324
        }
325
        before_properties_value = before_properties.properties if before_properties else None
1✔
326
        after_properties_value = after_properties.properties if after_properties else None
1✔
327

328
        match action:
1✔
329
            case ChangeAction.Add:
1✔
330
                resource_properties = after_properties_value or {}
1✔
331
                previous_resource_properties = None
1✔
332
            case ChangeAction.Modify | ChangeAction.Dynamic:
1✔
333
                resource_properties = after_properties_value or {}
1✔
334
                previous_resource_properties = before_properties_value or {}
1✔
335
            case ChangeAction.Remove:
1✔
336
                resource_properties = before_properties_value or {}
1✔
337
                previous_resource_properties = None
1✔
UNCOV
338
            case _:
×
339
                raise NotImplementedError(f"Action '{action}' not handled")
340

341
        resource_provider_payload: ResourceProviderPayload = {
1✔
342
            "awsAccountId": self._change_set.stack.account_id,
343
            "callbackContext": {},
344
            "stackId": self._change_set.stack.stack_name,
345
            "resourceType": resource_type,
346
            "resourceTypeVersion": "000000",
347
            # TODO: not actually a UUID
348
            "bearerToken": str(uuid.uuid4()),
349
            "region": self._change_set.stack.region_name,
350
            "action": str(action),
351
            "requestData": {
352
                "logicalResourceId": logical_resource_id,
353
                "resourceProperties": resource_properties,
354
                "previousResourceProperties": previous_resource_properties,
355
                "callerCredentials": creds,
356
                "providerCredentials": creds,
357
                "systemTags": {},
358
                "previousSystemTags": {},
359
                "stackTags": {},
360
                "previousStackTags": {},
361
            },
362
        }
363
        return resource_provider_payload
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc