• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21891233725

10 Feb 2026 10:59PM UTC coverage: 86.883% (+0.01%) from 86.871%
21891233725

push

github

web-flow
SFN: Fix Local mock iterations for states without retry (#13693)

Co-authored-by: Hernan <hernaner28@gmail.com>

The Step Functions Local mock configuration was using RetryCount to index numbered mock responses. In case of successful invocations or states without retry configuration this caused all invocations to return the same response ("0") instead of iterating through the sequence ("0", "1", "2", etc.).

RetryCount only increments on actual retry attempts (failures), not on successful invocations. This made the mock iteration feature unusable for testing state transition scenarios with multiple invocations of the same state.

- Added next_local_mock_invocation_number dict to execution environment to track mock invocations.
- Modified get_current_local_mocked_response() to use next_local_mock_invocation_number instead of retry_count.
- Shared next_local_mock_invocation_number between parent and child frames to maintain consistent counting across execution context

An existing `aws.services.stepfunctions.v2.local_mocking.test_base_scenarios.TestBaseScenarios.test_map_state_lambda test` was giving a false positive. It used a mock response with only one mocked invocation and should have failed when 2 invocations were done. Effectively, it started to fail after the fix and mocked response has been adjusted.

Also, adds `test_numbered_mock_responses_multiple_success_invocations` that tests multiple success invocations in a regular top-level state, outside of map configuration.

7 of 7 new or added lines in 1 file covered. (100.0%)

255 existing lines in 12 files now uncovered.

69977 of 80542 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.74
/localstack-core/localstack/services/cloudformation/engine/v2/change_set_model_executor.py
1
import copy
1✔
2
import logging
1✔
3
import os
1✔
4
import re
1✔
5
import uuid
1✔
6
from collections.abc import Callable
1✔
7
from dataclasses import dataclass
1✔
8
from datetime import UTC, datetime
1✔
9
from typing import Final, Protocol, TypeVar
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api.cloudformation import (
1✔
13
    ChangeAction,
14
    Output,
15
    ResourceStatus,
16
    StackStatus,
17
)
18
from localstack.constants import INTERNAL_AWS_SECRET_ACCESS_KEY
1✔
19
from localstack.services.cloudformation.analytics import (
1✔
20
    emit_stack_failure,
21
    track_resource_operation,
22
)
23
from localstack.services.cloudformation.deployment_utils import log_not_available_message
1✔
24
from localstack.services.cloudformation.engine.v2.change_set_model import (
1✔
25
    NodeDependsOn,
26
    NodeOutput,
27
    NodeResource,
28
    is_nothing,
29
)
30
from localstack.services.cloudformation.engine.v2.change_set_model_preproc import (
1✔
31
    _AWS_URL_SUFFIX,
32
    MOCKED_REFERENCE,
33
    ChangeSetModelPreproc,
34
    DeletionPolicy,
35
    PreprocEntityDelta,
36
    PreprocOutput,
37
    PreprocProperties,
38
    PreprocResource,
39
    UpdateReplacePolicy,
40
)
41
from localstack.services.cloudformation.engine.v2.unsupported_resource import (
1✔
42
    should_ignore_unsupported_resource_type,
43
)
44
from localstack.services.cloudformation.resource_provider import (
1✔
45
    Credentials,
46
    OperationStatus,
47
    ProgressEvent,
48
    ResourceProviderExecutor,
49
    ResourceProviderPayload,
50
)
51
from localstack.services.cloudformation.v2.entities import ChangeSet, ResolvedResource
1✔
52

53
LOG = logging.getLogger(__name__)
1✔
54

55
EventOperationFromAction = {"Add": "CREATE", "Modify": "UPDATE", "Remove": "DELETE"}
1✔
56

57
REGEX_OUTPUT_APIGATEWAY = re.compile(
1✔
58
    rf"^(https?://.+\.execute-api\.)(?:[^-]+-){{2,3}}\d\.(amazonaws\.com|{_AWS_URL_SUFFIX})/?(.*)$"
59
)
60

61
_T = TypeVar("_T")
1✔
62

63

64
@dataclass
1✔
65
class ChangeSetModelExecutorResult:
1✔
66
    resources: dict[str, ResolvedResource]
1✔
67
    outputs: list[Output]
1✔
68
    failure_message: str | None = None
1✔
69

70

71
class DeferredAction(Protocol):
1✔
72
    def __call__(self) -> None: ...
1✔
73

74

75
@dataclass
1✔
76
class Deferred:
1✔
77
    name: str
1✔
78
    action: DeferredAction
1✔
79

80

81
class TriggerRollback(Exception):
1✔
82
    """
83
    Sentinel exception to signal that the deployment should be stopped for a reason
84
    """
85

86
    def __init__(self, logical_resource_id: str, reason: str | None):
1✔
87
        self.logical_resource_id = logical_resource_id
1✔
88
        self.reason = reason
1✔
89

90

91
class ChangeSetModelExecutor(ChangeSetModelPreproc):
1✔
92
    # TODO: add typing for resolved resources and parameters.
93
    resources: Final[dict[str, ResolvedResource]]
1✔
94
    outputs: Final[list[Output]]
1✔
95
    _deferred_actions: list[Deferred]
1✔
96

97
    def __init__(self, change_set: ChangeSet):
1✔
98
        super().__init__(change_set=change_set)
1✔
99
        self.resources = {}
1✔
100
        self.outputs = []
1✔
101
        self._deferred_actions = []
1✔
102
        self.resource_provider_executor = ResourceProviderExecutor(
1✔
103
            stack_name=change_set.stack.stack_name,
104
            stack_id=change_set.stack.stack_id,
105
        )
106

107
    def execute(self) -> ChangeSetModelExecutorResult:
1✔
108
        # constructive process
109
        failure_message = None
1✔
110
        try:
1✔
111
            self.process()
1✔
112
        except TriggerRollback as e:
1✔
113
            failure_message = e.reason
1✔
114
        except Exception as e:
1✔
115
            failure_message = str(e)
1✔
116

117
        is_deletion = self._change_set.stack.status == StackStatus.DELETE_IN_PROGRESS
1✔
118
        if self._deferred_actions:
1✔
119
            if not is_deletion:
1✔
120
                # TODO: correct status
121
                # TODO: differentiate between update and create
122
                self._change_set.stack.set_stack_status(
1✔
123
                    StackStatus.ROLLBACK_IN_PROGRESS
124
                    if failure_message
125
                    else StackStatus.UPDATE_COMPLETE_CLEANUP_IN_PROGRESS
126
                )
127

128
            # perform all deferred actions such as deletions. These must happen in reverse from their
129
            # defined order so that resource dependencies are honoured
130
            # TODO: errors will stop all rollbacks; get parity on this behaviour
131
            for deferred in self._deferred_actions[::-1]:
1✔
132
                LOG.debug("executing deferred action: '%s'", deferred.name)
1✔
133
                deferred.action()
1✔
134

135
        if failure_message and not is_deletion:
1✔
136
            # TODO: differentiate between update and create
137
            self._change_set.stack.set_stack_status(StackStatus.ROLLBACK_COMPLETE)
1✔
138

139
        return ChangeSetModelExecutorResult(
1✔
140
            resources=self.resources, outputs=self.outputs, failure_message=failure_message
141
        )
142

143
    def _defer_action(self, name: str, action: DeferredAction):
1✔
144
        self._deferred_actions.append(Deferred(name=name, action=action))
1✔
145

146
    def _get_physical_id(self, logical_resource_id, strict: bool = True) -> str | None:
1✔
147
        physical_resource_id = None
1✔
148
        try:
1✔
149
            physical_resource_id = self._after_resource_physical_id(logical_resource_id)
1✔
UNCOV
150
        except RuntimeError:
×
151
            # The physical id is missing or is set to None, which is invalid.
UNCOV
152
            pass
×
153
        if physical_resource_id is None:
1✔
154
            # The physical resource id is None after an update that didn't rewrite the resource, the previous
155
            # resource id is therefore the current physical id of this resource.
156

157
            try:
1✔
158
                physical_resource_id = self._before_resource_physical_id(logical_resource_id)
1✔
UNCOV
159
            except RuntimeError as e:
×
UNCOV
160
                if strict:
×
UNCOV
161
                    raise e
×
162
        return physical_resource_id
1✔
163

164
    def _process_event(
1✔
165
        self,
166
        *,
167
        action: ChangeAction,
168
        logical_resource_id,
169
        event_status: OperationStatus,
170
        resource_type: str,
171
        special_action: str = None,
172
        reason: str = None,
173
        custom_status: ResourceStatus | str | None = None,
174
    ):
175
        status_from_action = special_action or EventOperationFromAction[action.value]
1✔
176

177
        status: ResourceStatus
178
        if event_status == OperationStatus.SUCCESS:
1✔
179
            status = ResourceStatus(f"{status_from_action}_COMPLETE")
1✔
180
        else:
181
            status = ResourceStatus(f"{status_from_action}_{event_status.name}")
1✔
182

183
        if custom_status:
1✔
184
            status = ResourceStatus(custom_status)
1✔
185

186
        physical_resource_id = self._get_physical_id(logical_resource_id, False)
1✔
187
        self._change_set.stack.set_resource_status(
1✔
188
            logical_resource_id=logical_resource_id,
189
            physical_resource_id=physical_resource_id,
190
            resource_type=resource_type,
191
            status=status,
192
            resource_status_reason=reason,
193
        )
194

195
        if event_status == OperationStatus.FAILED:
1✔
196
            self._change_set.stack.set_stack_status(StackStatus(status))
1✔
197

198
    def _after_deployed_property_value_of(
1✔
199
        self, resource_logical_id: str, property_name: str
200
    ) -> str:
201
        after_resolved_resources = self.resources
1✔
202
        return self._deployed_property_value_of(
1✔
203
            resource_logical_id=resource_logical_id,
204
            property_name=property_name,
205
            resolved_resources=after_resolved_resources,
206
        )
207

208
    def _after_resource_physical_id(self, resource_logical_id: str) -> str:
1✔
209
        after_resolved_resources = self.resources
1✔
210
        return self._resource_physical_resource_id_from(
1✔
211
            logical_resource_id=resource_logical_id, resolved_resources=after_resolved_resources
212
        )
213

214
    def visit_node_depends_on(self, node_depends_on: NodeDependsOn) -> PreprocEntityDelta:
1✔
215
        array_identifiers_delta = super().visit_node_depends_on(node_depends_on=node_depends_on)
1✔
216

217
        # Visit depends_on resources before returning.
218
        depends_on_resource_logical_ids: set[str] = set()
1✔
219
        if array_identifiers_delta.before:
1✔
220
            depends_on_resource_logical_ids.update(array_identifiers_delta.before)
1✔
221
        if array_identifiers_delta.after:
1✔
222
            depends_on_resource_logical_ids.update(array_identifiers_delta.after)
1✔
223
        for depends_on_resource_logical_id in depends_on_resource_logical_ids:
1✔
224
            node_resource = self._get_node_resource_for(
1✔
225
                resource_name=depends_on_resource_logical_id,
226
                node_template=self._change_set.update_model.node_template,
227
            )
228
            self.visit(node_resource)
1✔
229

230
        return array_identifiers_delta
1✔
231

232
    def visit_node_resource(
1✔
233
        self, node_resource: NodeResource
234
    ) -> PreprocEntityDelta[PreprocResource, PreprocResource]:
235
        """
236
        Overrides the default preprocessing for NodeResource objects by annotating the
237
        `after` delta with the physical resource ID, if side effects resulted in an update.
238
        """
239
        try:
1✔
240
            delta = super().visit_node_resource(node_resource=node_resource)
1✔
UNCOV
241
        except Exception as e:
×
UNCOV
242
            LOG.debug(
×
243
                "preprocessing resource '%s' failed: %s",
244
                node_resource.name,
245
                e,
246
                exc_info=LOG.isEnabledFor(logging.DEBUG) and config.CFN_VERBOSE_ERRORS,
247
            )
UNCOV
248
            self._process_event(
×
249
                action=node_resource.change_type.to_change_action(),
250
                logical_resource_id=node_resource.name,
251
                event_status=OperationStatus.FAILED,
252
                resource_type=node_resource.type_.value,
253
                reason=str(e),
254
            )
UNCOV
255
            raise e
×
256

257
        before = delta.before
1✔
258
        after = delta.after
1✔
259

260
        if before != after:
1✔
261
            # There are changes for this resource.
262
            self._execute_resource_change(name=node_resource.name, before=before, after=after)
1✔
263
        else:
264
            # There are no updates for this resource; iff the resource was previously
265
            # deployed, then the resolved details are copied in the current state for
266
            # references or other downstream operations.
267
            if not is_nothing(before):
1✔
268
                before_logical_id = delta.before.logical_id
1✔
269
                before_resource = self._before_resolved_resources.get(before_logical_id, {})
1✔
270
                self.resources[before_logical_id] = before_resource
1✔
271

272
        # Update the latest version of this resource for downstream references.
273
        if not is_nothing(after):
1✔
274
            after_logical_id = after.logical_id
1✔
275
            resource = self.resources[after_logical_id]
1✔
276
            resource_failed_to_deploy = resource["ResourceStatus"] in {
1✔
277
                ResourceStatus.CREATE_FAILED,
278
                ResourceStatus.UPDATE_FAILED,
279
            }
280
            if not resource_failed_to_deploy:
1✔
281
                after_physical_id: str = self._after_resource_physical_id(
1✔
282
                    resource_logical_id=after_logical_id
283
                )
284
                after.physical_resource_id = after_physical_id
1✔
285
            after.status = resource["ResourceStatus"]
1✔
286

287
            # terminate the deployment process
288
            if resource_failed_to_deploy:
1✔
289
                raise TriggerRollback(
1✔
290
                    logical_resource_id=after_logical_id,
291
                    reason=resource.get("ResourceStatusReason"),
292
                )
293
        return delta
1✔
294

295
    def visit_node_output(
1✔
296
        self, node_output: NodeOutput
297
    ) -> PreprocEntityDelta[PreprocOutput, PreprocOutput]:
298
        delta = super().visit_node_output(node_output=node_output)
1✔
299
        after = delta.after
1✔
300
        if is_nothing(after) or (isinstance(after, PreprocOutput) and after.condition is False):
1✔
301
            return delta
1✔
302

303
        output = Output(
1✔
304
            OutputKey=delta.after.name,
305
            OutputValue=delta.after.value,
306
            # TODO
307
            # Description=delta.after.description
308
        )
309
        if after.export:
1✔
310
            output["ExportName"] = after.export["Name"]
1✔
311
        self.outputs.append(output)
1✔
312
        return delta
1✔
313

314
    def _execute_resource_change(
1✔
315
        self, name: str, before: PreprocResource | None, after: PreprocResource | None
316
    ) -> None:
317
        # Changes are to be made about this resource.
318
        # TODO: this logic is a POC and should be revised.
319
        if not is_nothing(before) and not is_nothing(after):
1✔
320
            # Case: change on same type.
321
            if before.resource_type == after.resource_type:
1✔
322
                # Register a Modified if changed.
323
                # XXX hacky, stick the previous resources' properties into the payload
324
                before_properties = self._merge_before_properties(name, before)
1✔
325

326
                self._process_event(
1✔
327
                    action=ChangeAction.Modify,
328
                    logical_resource_id=name,
329
                    event_status=OperationStatus.IN_PROGRESS,
330
                    resource_type=before.resource_type,
331
                )
332
                if after.requires_replacement:
1✔
333
                    event = self._execute_resource_action(
1✔
334
                        action=ChangeAction.Add,
335
                        logical_resource_id=name,
336
                        resource_type=before.resource_type,
337
                        before_properties=None,
338
                        after_properties=after.properties,
339
                    )
340
                    self._process_event(
1✔
341
                        action=ChangeAction.Modify,
342
                        logical_resource_id=name,
343
                        event_status=event.status,
344
                        resource_type=before.resource_type,
345
                        reason=event.message,
346
                    )
347

348
                    def cleanup():
1✔
349
                        # TODO: handle other update replace policy values
350
                        if after.update_replace_policy != UpdateReplacePolicy.Retain:
1✔
351
                            self._process_event(
1✔
352
                                action=ChangeAction.Remove,
353
                                logical_resource_id=name,
354
                                event_status=OperationStatus.IN_PROGRESS,
355
                                resource_type=before.resource_type,
356
                            )
357
                            event = self._execute_resource_action(
1✔
358
                                action=ChangeAction.Remove,
359
                                logical_resource_id=name,
360
                                resource_type=before.resource_type,
361
                                before_properties=before_properties,
362
                                after_properties=None,
363
                                part_of_replacement=True,
364
                            )
365
                            self._process_event(
1✔
366
                                action=ChangeAction.Remove,
367
                                logical_resource_id=name,
368
                                event_status=event.status,
369
                                resource_type=before.resource_type,
370
                                reason=event.message,
371
                            )
372
                        else:
373
                            self._process_event(
1✔
374
                                action=ChangeAction.Remove,
375
                                logical_resource_id=name,
376
                                event_status=OperationStatus.SUCCESS,
377
                                resource_type=before.resource_type,
378
                                custom_status=ResourceStatus.DELETE_SKIPPED,
379
                            )
380

381
                    self._defer_action(f"cleanup-from-replacement-{name}", cleanup)
1✔
382
                else:
383
                    event = self._execute_resource_action(
1✔
384
                        action=ChangeAction.Modify,
385
                        logical_resource_id=name,
386
                        resource_type=before.resource_type,
387
                        before_properties=before_properties,
388
                        after_properties=after.properties,
389
                    )
390
                    self._process_event(
1✔
391
                        action=ChangeAction.Modify,
392
                        logical_resource_id=name,
393
                        event_status=event.status,
394
                        resource_type=before.resource_type,
395
                        reason=event.message,
396
                    )
397
            # Case: type migration.
398
            # TODO: Add test to assert that on type change the resources are replaced.
399
            else:
400
                # XXX hacky, stick the previous resources' properties into the payload
401
                before_properties = self._merge_before_properties(name, before)
×
402
                # Register a Removed for the previous type.
403

UNCOV
404
                def perform_deletion():
×
UNCOV
405
                    event = self._execute_resource_action(
×
406
                        action=ChangeAction.Remove,
407
                        logical_resource_id=name,
408
                        resource_type=before.resource_type,
409
                        before_properties=before_properties,
410
                        after_properties=None,
411
                    )
UNCOV
412
                    self._process_event(
×
413
                        action=ChangeAction.Modify,
414
                        logical_resource_id=name,
415
                        event_status=event.status,
416
                        resource_type=before.resource_type,
417
                        reason=event.message,
418
                    )
419

UNCOV
420
                self._defer_action(f"type-migration-{name}", perform_deletion)
×
421

UNCOV
422
                event = self._execute_resource_action(
×
423
                    action=ChangeAction.Add,
424
                    logical_resource_id=name,
425
                    resource_type=after.resource_type,
426
                    before_properties=None,
427
                    after_properties=after.properties,
428
                )
UNCOV
429
                self._process_event(
×
430
                    action=ChangeAction.Modify,
431
                    logical_resource_id=name,
432
                    event_status=event.status,
433
                    resource_type=before.resource_type,
434
                    reason=event.message,
435
                )
436
        elif not is_nothing(before):
1✔
437
            # Case: removal
438
            # XXX hacky, stick the previous resources' properties into the payload
439
            # XXX hacky, stick the previous resources' properties into the payload
440
            before_properties = self._merge_before_properties(name, before)
1✔
441

442
            def perform_deletion():
1✔
443
                # TODO: other deletion policies
444
                if before.deletion_policy != DeletionPolicy.Retain:
1✔
445
                    self._process_event(
1✔
446
                        action=ChangeAction.Remove,
447
                        logical_resource_id=name,
448
                        resource_type=before.resource_type,
449
                        event_status=OperationStatus.IN_PROGRESS,
450
                    )
451
                    event = self._execute_resource_action(
1✔
452
                        action=ChangeAction.Remove,
453
                        logical_resource_id=name,
454
                        resource_type=before.resource_type,
455
                        before_properties=before_properties,
456
                        after_properties=None,
457
                    )
458

459
                    self._process_event(
1✔
460
                        action=ChangeAction.Remove,
461
                        logical_resource_id=name,
462
                        event_status=event.status,
463
                        resource_type=before.resource_type,
464
                        reason=event.message,
465
                    )
466
                else:
467
                    self._process_event(
1✔
468
                        action=ChangeAction.Remove,
469
                        logical_resource_id=name,
470
                        event_status=OperationStatus.SUCCESS,
471
                        resource_type=before.resource_type,
472
                        custom_status=ResourceStatus.DELETE_SKIPPED,
473
                    )
474

475
            self._defer_action(f"remove-{name}", perform_deletion)
1✔
476
        elif not is_nothing(after):
1✔
477
            # Case: addition
478
            self._process_event(
1✔
479
                action=ChangeAction.Add,
480
                logical_resource_id=name,
481
                event_status=OperationStatus.IN_PROGRESS,
482
                resource_type=after.resource_type,
483
            )
484
            event = self._execute_resource_action(
1✔
485
                action=ChangeAction.Add,
486
                logical_resource_id=name,
487
                resource_type=after.resource_type,
488
                before_properties=None,
489
                after_properties=after.properties,
490
            )
491
            self._process_event(
1✔
492
                action=ChangeAction.Add,
493
                logical_resource_id=name,
494
                event_status=event.status,
495
                resource_type=after.resource_type,
496
                reason=event.message,
497
            )
498

499
    def _merge_before_properties(
1✔
500
        self, name: str, preproc_resource: PreprocResource
501
    ) -> PreprocProperties:
502
        if previous_resource_properties := self._change_set.stack.resolved_resources.get(
1✔
503
            name, {}
504
        ).get("Properties"):
505
            return PreprocProperties(properties=previous_resource_properties)
1✔
506

507
        # XXX fall back to returning the input value
508
        return copy.deepcopy(preproc_resource.properties)
1✔
509

510
    def _execute_resource_action(
1✔
511
        self,
512
        action: ChangeAction,
513
        logical_resource_id: str,
514
        resource_type: str,
515
        before_properties: PreprocProperties | None,
516
        after_properties: PreprocProperties | None,
517
        part_of_replacement: bool = False,
518
    ) -> ProgressEvent:
519
        LOG.debug("Executing resource action: %s for resource '%s'", action, logical_resource_id)
1✔
520
        payload = self.create_resource_provider_payload(
1✔
521
            action=action,
522
            logical_resource_id=logical_resource_id,
523
            resource_type=resource_type,
524
            before_properties=before_properties,
525
            after_properties=after_properties,
526
        )
527
        resource_provider = self.resource_provider_executor.try_load_resource_provider(
1✔
528
            resource_type
529
        )
530
        track_resource_operation(action, resource_type, missing=resource_provider is not None)
1✔
531

532
        extra_resource_properties = {}
1✔
533
        if resource_provider is not None:
1✔
534
            try:
1✔
535
                event = self.resource_provider_executor.deploy_loop(
1✔
536
                    resource_provider, extra_resource_properties, payload
537
                )
538
            except Exception as e:
1✔
539
                reason = str(e)
1✔
540
                LOG.warning(
1✔
541
                    "Resource provider operation failed: '%s'",
542
                    reason,
543
                    exc_info=LOG.isEnabledFor(logging.DEBUG) and config.CFN_VERBOSE_ERRORS,
544
                )
545
                event = ProgressEvent(
1✔
546
                    OperationStatus.FAILED,
547
                    resource_model={},
548
                    message=f"Resource provider operation failed: {reason}",
549
                    custom_context={"exception": e},
550
                )
551
        elif should_ignore_unsupported_resource_type(
1✔
552
            resource_type=resource_type, change_set_type=self._change_set.change_set_type
553
        ):
554
            log_not_available_message(
1✔
555
                resource_type,
556
                f'No resource provider found for "{resource_type}"',
557
            )
558
            if "CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES" not in os.environ:
1✔
559
                LOG.warning(
1✔
560
                    "Deployment of resource type %s succeeded, but will fail in upcoming LocalStack releases unless CFN_IGNORE_UNSUPPORTED_RESOURCE_TYPES is explicitly enabled.",
561
                    resource_type,
562
                )
563
            event = ProgressEvent(
1✔
564
                OperationStatus.SUCCESS,
565
                resource_model={},
566
                message=f"Resource type {resource_type} is not supported but was deployed as a fallback",
567
            )
568
        else:
569
            log_not_available_message(
1✔
570
                resource_type,
571
                f'No resource provider found for "{resource_type}"',
572
            )
573
            event = ProgressEvent(
1✔
574
                OperationStatus.FAILED,
575
                resource_model={},
576
                message=f"Resource type {resource_type} not supported",
577
            )
578

579
        if part_of_replacement and action == ChangeAction.Remove:
1✔
580
            # Early return as we don't want to update internal state of the executor if this is a
581
            # cleanup of an old resource. The new resource has already been created and the state
582
            # updated
583
            return event
1✔
584

585
        status_from_action = EventOperationFromAction[action.value]
1✔
586
        resolved_resource = ResolvedResource(
1✔
587
            Properties=event.resource_model,
588
            LogicalResourceId=logical_resource_id,
589
            Type=resource_type,
590
            LastUpdatedTimestamp=datetime.now(UTC),
591
        )
592
        match event.status:
1✔
593
            case OperationStatus.SUCCESS:
1✔
594
                # merge the resources state with the external state
595
                # TODO: this is likely a duplicate of updating from extra_resource_properties
596

597
                # TODO: add typing
598
                # TODO: avoid the use of string literals for sampling from the object, use typed classes instead
599
                # TODO: avoid sampling from resources and use tmp var reference
600
                # TODO: add utils functions to abstract this logic away (resource.update(..))
601
                # TODO: avoid the use of setdefault (debuggability/readability)
602
                # TODO: review the use of merge
603

604
                # Don't update the resolved resources if we have deleted that resource
605
                if action != ChangeAction.Remove:
1✔
606
                    physical_resource_id = (
1✔
607
                        extra_resource_properties["PhysicalResourceId"]
608
                        if resource_provider
609
                        else MOCKED_REFERENCE
610
                    )
611
                    resolved_resource["PhysicalResourceId"] = physical_resource_id
1✔
612
                    resolved_resource["ResourceStatus"] = ResourceStatus(
1✔
613
                        f"{status_from_action}_COMPLETE"
614
                    )
615
                    # TODO: do we actually need this line?
616
                    resolved_resource.update(extra_resource_properties)
1✔
617
            case OperationStatus.FAILED:
1✔
618
                reason = event.message
1✔
619
                LOG.warning(
1✔
620
                    "Resource provider operation failed: '%s'",
621
                    reason,
622
                )
623
                resolved_resource["ResourceStatus"] = ResourceStatus(f"{status_from_action}_FAILED")
1✔
624
                resolved_resource["ResourceStatusReason"] = reason
1✔
625

626
                exception = event.custom_context.get("exception")
1✔
627
                emit_stack_failure(reason, exception=exception)
1✔
628

UNCOV
629
            case other:
×
630
                raise NotImplementedError(f"Event status '{other}' not handled")
631

632
        self.resources[logical_resource_id] = resolved_resource
1✔
633
        return event
1✔
634

635
    def create_resource_provider_payload(
1✔
636
        self,
637
        action: ChangeAction,
638
        logical_resource_id: str,
639
        resource_type: str,
640
        before_properties: PreprocProperties | None,
641
        after_properties: PreprocProperties | None,
642
    ) -> ResourceProviderPayload | None:
643
        # FIXME: use proper credentials
644
        creds: Credentials = {
1✔
645
            "accessKeyId": self._change_set.stack.account_id,
646
            "secretAccessKey": INTERNAL_AWS_SECRET_ACCESS_KEY,
647
            "sessionToken": "",
648
        }
649
        before_properties_value = before_properties.properties if before_properties else None
1✔
650
        after_properties_value = after_properties.properties if after_properties else None
1✔
651

652
        match action:
1✔
653
            case ChangeAction.Add:
1✔
654
                resource_properties = after_properties_value or {}
1✔
655
                previous_resource_properties = None
1✔
656
            case ChangeAction.Modify | ChangeAction.Dynamic:
1✔
657
                resource_properties = after_properties_value or {}
1✔
658
                previous_resource_properties = before_properties_value or {}
1✔
659
            case ChangeAction.Remove:
1✔
660
                resource_properties = before_properties_value or {}
1✔
661
                # previous_resource_properties = None
662
                # HACK: our providers use a mix of `desired_state` and `previous_state` so ensure the payload is present for both
663
                previous_resource_properties = resource_properties
1✔
UNCOV
664
            case _:
×
665
                raise NotImplementedError(f"Action '{action}' not handled")
666

667
        resource_provider_payload: ResourceProviderPayload = {
1✔
668
            "awsAccountId": self._change_set.stack.account_id,
669
            "callbackContext": {},
670
            "stackId": self._change_set.stack.stack_name,
671
            "resourceType": resource_type,
672
            "resourceTypeVersion": "000000",
673
            # TODO: not actually a UUID
674
            "bearerToken": str(uuid.uuid4()),
675
            "region": self._change_set.stack.region_name,
676
            "action": str(action),
677
            "requestData": {
678
                "logicalResourceId": logical_resource_id,
679
                "resourceProperties": resource_properties,
680
                "previousResourceProperties": previous_resource_properties,
681
                "callerCredentials": creds,
682
                "providerCredentials": creds,
683
                "systemTags": {},
684
                "previousSystemTags": {},
685
                "stackTags": {},
686
                "previousStackTags": {},
687
            },
688
        }
689
        return resource_provider_payload
1✔
690

691
    def _maybe_perform_on_delta(
1✔
692
        self, delta: PreprocEntityDelta, f: Callable[[_T], _T]
693
    ) -> PreprocEntityDelta:
694
        # we only care about the after state
695
        if isinstance(delta.after, str):
1✔
696
            delta.after = f(delta.after)
1✔
697
        return delta
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc