• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20014894797

06 Dec 2025 01:32PM UTC coverage: 86.869% (-0.04%) from 86.904%
20014894797

push

github

web-flow
CFn: handle updates with empty resource properties (#13471)

2 of 2 new or added lines in 1 file covered. (100.0%)

419 existing lines in 20 files now uncovered.

69891 of 80456 relevant lines covered (86.87%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.5
/localstack-core/localstack/services/stepfunctions/backend/execution.py
1
from __future__ import annotations
1✔
2

3
import datetime
1✔
4
import logging
1✔
5
from typing import Any, Final
1✔
6

7
from localstack.aws.api.events import PutEventsRequestEntry
1✔
8
from localstack.aws.api.stepfunctions import (
1✔
9
    Arn,
10
    CloudWatchEventsExecutionDataDetails,
11
    DescribeExecutionOutput,
12
    DescribeStateMachineForExecutionOutput,
13
    ExecutionListItem,
14
    ExecutionStatus,
15
    GetExecutionHistoryOutput,
16
    HistoryEventList,
17
    InvalidName,
18
    SensitiveCause,
19
    SensitiveError,
20
    StartExecutionOutput,
21
    StartSyncExecutionOutput,
22
    StateMachineType,
23
    SyncExecutionStatus,
24
    Timestamp,
25
    TraceHeader,
26
    VariableReferences,
27
)
28
from localstack.aws.connect import connect_to
1✔
29
from localstack.services.stepfunctions.asl.eval.evaluation_details import (
1✔
30
    AWSExecutionDetails,
31
    EvaluationDetails,
32
    ExecutionDetails,
33
    StateMachineDetails,
34
)
35
from localstack.services.stepfunctions.asl.eval.event.logging import (
1✔
36
    CloudWatchLoggingSession,
37
)
38
from localstack.services.stepfunctions.asl.eval.program_state import (
1✔
39
    ProgramEnded,
40
    ProgramError,
41
    ProgramState,
42
    ProgramStopped,
43
    ProgramTimedOut,
44
)
45
from localstack.services.stepfunctions.asl.static_analyser.variable_references_static_analyser import (
1✔
46
    VariableReferencesStaticAnalyser,
47
)
48
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
49
from localstack.services.stepfunctions.backend.activity import Activity
1✔
50
from localstack.services.stepfunctions.backend.execution_worker import (
1✔
51
    ExecutionWorker,
52
    SyncExecutionWorker,
53
)
54
from localstack.services.stepfunctions.backend.execution_worker_comm import (
1✔
55
    ExecutionWorkerCommunication,
56
)
57
from localstack.services.stepfunctions.backend.state_machine import (
1✔
58
    StateMachineInstance,
59
    StateMachineVersion,
60
)
61
from localstack.services.stepfunctions.local_mocking.mock_config import LocalMockTestCase
1✔
62

63
LOG = logging.getLogger(__name__)
1✔
64

65

66
class BaseExecutionWorkerCommunication(ExecutionWorkerCommunication):
1✔
67
    execution: Final[Execution]
1✔
68

69
    def __init__(self, execution: Execution):
1✔
70
        self.execution = execution
1✔
71

72
    def _reflect_execution_status(self):
1✔
73
        exit_program_state: ProgramState = self.execution.exec_worker.env.program_state()
1✔
74
        self.execution.stop_date = datetime.datetime.now(tz=datetime.UTC)
1✔
75
        if isinstance(exit_program_state, ProgramEnded):
1✔
76
            self.execution.exec_status = ExecutionStatus.SUCCEEDED
1✔
77
            self.execution.output = self.execution.exec_worker.env.states.get_input()
1✔
78
        elif isinstance(exit_program_state, ProgramStopped):
1✔
79
            self.execution.exec_status = ExecutionStatus.ABORTED
1✔
80
        elif isinstance(exit_program_state, ProgramError):
1✔
81
            self.execution.exec_status = ExecutionStatus.FAILED
1✔
82
            self.execution.error = exit_program_state.error.get("error")
1✔
83
            self.execution.cause = exit_program_state.error.get("cause")
1✔
84
        elif isinstance(exit_program_state, ProgramTimedOut):
1✔
85
            self.execution.exec_status = ExecutionStatus.TIMED_OUT
1✔
86
        else:
UNCOV
87
            raise RuntimeWarning(
×
88
                f"Execution ended with unsupported ProgramState type '{type(exit_program_state)}'."
89
            )
90

91
    def terminated(self) -> None:
1✔
92
        self._reflect_execution_status()
1✔
93
        self.execution.publish_execution_status_change_event()
1✔
94

95

96
class Execution:
1✔
97
    name: Final[str]
1✔
98
    sm_type: Final[StateMachineType]
1✔
99
    role_arn: Final[Arn]
1✔
100
    exec_arn: Final[Arn]
1✔
101

102
    account_id: str
1✔
103
    region_name: str
1✔
104

105
    state_machine: Final[StateMachineInstance]
1✔
106
    state_machine_arn: Final[Arn]
1✔
107
    state_machine_version_arn: Final[Arn | None]
1✔
108
    state_machine_alias_arn: Final[Arn | None]
1✔
109

110
    local_mock_test_case: Final[LocalMockTestCase | None]
1✔
111

112
    start_date: Final[Timestamp]
1✔
113
    input_data: Final[dict[str, Any] | None]
1✔
114
    input_details: Final[CloudWatchEventsExecutionDataDetails | None]
1✔
115
    trace_header: Final[TraceHeader | None]
1✔
116
    _cloud_watch_logging_session: Final[CloudWatchLoggingSession | None]
1✔
117

118
    exec_status: ExecutionStatus | None
1✔
119
    stop_date: Timestamp | None
1✔
120

121
    output: dict[str, Any] | None
1✔
122
    output_details: CloudWatchEventsExecutionDataDetails | None
1✔
123

124
    error: SensitiveError | None
1✔
125
    cause: SensitiveCause | None
1✔
126

127
    exec_worker: ExecutionWorker | None
1✔
128

129
    _activity_store: dict[Arn, Activity]
1✔
130

131
    def __init__(
1✔
132
        self,
133
        name: str,
134
        sm_type: StateMachineType,
135
        role_arn: Arn,
136
        exec_arn: Arn,
137
        account_id: str,
138
        region_name: str,
139
        state_machine: StateMachineInstance,
140
        start_date: Timestamp,
141
        cloud_watch_logging_session: CloudWatchLoggingSession | None,
142
        activity_store: dict[Arn, Activity],
143
        input_data: dict[str, Any] | None = None,
144
        trace_header: TraceHeader | None = None,
145
        state_machine_alias_arn: Arn | None = None,
146
        local_mock_test_case: LocalMockTestCase | None = None,
147
    ):
148
        self.name = name
1✔
149
        self.sm_type = sm_type
1✔
150
        self.role_arn = role_arn
1✔
151
        self.exec_arn = exec_arn
1✔
152
        self.account_id = account_id
1✔
153
        self.region_name = region_name
1✔
154
        self.state_machine = state_machine
1✔
155
        if isinstance(state_machine, StateMachineVersion):
1✔
156
            self.state_machine_arn = state_machine.source_arn
1✔
157
            self.state_machine_version_arn = state_machine.arn
1✔
158
        else:
159
            self.state_machine_arn = state_machine.arn
1✔
160
            self.state_machine_version_arn = None
1✔
161
        self.state_machine_alias_arn = state_machine_alias_arn
1✔
162
        self.start_date = start_date
1✔
163
        self._cloud_watch_logging_session = cloud_watch_logging_session
1✔
164
        self.input_data = input_data
1✔
165
        self.input_details = CloudWatchEventsExecutionDataDetails(included=True)
1✔
166
        self.trace_header = trace_header
1✔
167
        self.exec_status = None
1✔
168
        self.stop_date = None
1✔
169
        self.output = None
1✔
170
        self.output_details = CloudWatchEventsExecutionDataDetails(included=True)
1✔
171
        self.exec_worker = None
1✔
172
        self.error = None
1✔
173
        self.cause = None
1✔
174
        self._activity_store = activity_store
1✔
175
        self.local_mock_test_case = local_mock_test_case
1✔
176

177
    def _get_events_client(self):
1✔
178
        return connect_to(aws_access_key_id=self.account_id, region_name=self.region_name).events
1✔
179

180
    def to_start_output(self) -> StartExecutionOutput:
1✔
181
        return StartExecutionOutput(executionArn=self.exec_arn, startDate=self.start_date)
1✔
182

183
    def to_describe_output(self) -> DescribeExecutionOutput:
1✔
184
        describe_output = DescribeExecutionOutput(
1✔
185
            executionArn=self.exec_arn,
186
            stateMachineArn=self.state_machine_arn,
187
            name=self.name,
188
            status=self.exec_status,
189
            startDate=self.start_date,
190
            stopDate=self.stop_date,
191
            input=to_json_str(self.input_data, separators=(",", ":")),
192
            inputDetails=self.input_details,
193
            traceHeader=self.trace_header,
194
        )
195
        if describe_output["status"] == ExecutionStatus.SUCCEEDED:
1✔
196
            describe_output["output"] = to_json_str(self.output, separators=(",", ":"))
1✔
197
            describe_output["outputDetails"] = self.output_details
1✔
198
        if self.error is not None:
1✔
199
            describe_output["error"] = self.error
1✔
200
        if self.cause is not None:
1✔
201
            describe_output["cause"] = self.cause
1✔
202
        if self.state_machine_version_arn is not None:
1✔
203
            describe_output["stateMachineVersionArn"] = self.state_machine_version_arn
1✔
204
        if self.state_machine_alias_arn is not None:
1✔
205
            describe_output["stateMachineAliasArn"] = self.state_machine_alias_arn
1✔
206
        return describe_output
1✔
207

208
    def to_describe_state_machine_for_execution_output(
1✔
209
        self,
210
    ) -> DescribeStateMachineForExecutionOutput:
211
        state_machine: StateMachineInstance = self.state_machine
1✔
212
        state_machine_arn = (
1✔
213
            state_machine.source_arn
214
            if isinstance(state_machine, StateMachineVersion)
215
            else state_machine.arn
216
        )
217
        out = DescribeStateMachineForExecutionOutput(
1✔
218
            stateMachineArn=state_machine_arn,
219
            name=state_machine.name,
220
            definition=state_machine.definition,
221
            roleArn=self.role_arn,
222
            # The date and time the state machine associated with an execution was updated.
223
            updateDate=state_machine.create_date,
224
            loggingConfiguration=state_machine.logging_config,
225
        )
226
        revision_id = self.state_machine.revision_id
1✔
227
        if self.state_machine.revision_id:
1✔
228
            out["revisionId"] = revision_id
1✔
229
        variable_references: VariableReferences = VariableReferencesStaticAnalyser.process_and_get(
1✔
230
            definition=self.state_machine.definition
231
        )
232
        if variable_references:
1✔
233
            out["variableReferences"] = variable_references
1✔
234
        return out
1✔
235

236
    def to_execution_list_item(self) -> ExecutionListItem:
1✔
237
        if isinstance(self.state_machine, StateMachineVersion):
1✔
238
            state_machine_arn = self.state_machine.source_arn
1✔
239
            state_machine_version_arn = self.state_machine.arn
1✔
240
        else:
241
            state_machine_arn = self.state_machine.arn
1✔
242
            state_machine_version_arn = None
1✔
243

244
        item = ExecutionListItem(
1✔
245
            executionArn=self.exec_arn,
246
            stateMachineArn=state_machine_arn,
247
            name=self.name,
248
            status=self.exec_status,
249
            startDate=self.start_date,
250
            stopDate=self.stop_date,
251
        )
252
        if state_machine_version_arn is not None:
1✔
253
            item["stateMachineVersionArn"] = state_machine_version_arn
1✔
254
        if self.state_machine_alias_arn is not None:
1✔
255
            item["stateMachineAliasArn"] = self.state_machine_alias_arn
1✔
256
        return item
1✔
257

258
    def to_history_output(self) -> GetExecutionHistoryOutput:
1✔
259
        env = self.exec_worker.env
1✔
260
        event_history: HistoryEventList = []
1✔
261
        if env is not None:
1✔
262
            # The execution has not started yet.
263
            event_history: HistoryEventList = env.event_manager.get_event_history()
1✔
264
        return GetExecutionHistoryOutput(events=event_history)
1✔
265

266
    @staticmethod
1✔
267
    def _to_serialized_date(timestamp: datetime.datetime) -> str:
1✔
268
        """See test in tests.aws.services.stepfunctions.v2.base.test_base.TestSnfBase.test_execution_dateformat"""
269
        return f"{timestamp.astimezone(datetime.UTC).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]}Z"
1✔
270

271
    def _get_start_execution_worker_comm(self) -> BaseExecutionWorkerCommunication:
1✔
272
        return BaseExecutionWorkerCommunication(self)
1✔
273

274
    def _get_start_aws_execution_details(self) -> AWSExecutionDetails:
1✔
275
        return AWSExecutionDetails(
1✔
276
            account=self.account_id, region=self.region_name, role_arn=self.role_arn
277
        )
278

279
    def get_start_execution_details(self) -> ExecutionDetails:
1✔
280
        return ExecutionDetails(
1✔
281
            arn=self.exec_arn,
282
            name=self.name,
283
            role_arn=self.role_arn,
284
            inpt=self.input_data,
285
            start_time=self._to_serialized_date(self.start_date),
286
        )
287

288
    def get_start_state_machine_details(self) -> StateMachineDetails:
1✔
289
        return StateMachineDetails(
1✔
290
            arn=self.state_machine.arn,
291
            name=self.state_machine.name,
292
            typ=self.state_machine.sm_type,
293
            definition=self.state_machine.definition,
294
        )
295

296
    def _get_start_execution_worker(self) -> ExecutionWorker:
1✔
297
        return ExecutionWorker(
1✔
298
            evaluation_details=EvaluationDetails(
299
                aws_execution_details=self._get_start_aws_execution_details(),
300
                execution_details=self.get_start_execution_details(),
301
                state_machine_details=self.get_start_state_machine_details(),
302
            ),
303
            exec_comm=self._get_start_execution_worker_comm(),
304
            cloud_watch_logging_session=self._cloud_watch_logging_session,
305
            activity_store=self._activity_store,
306
            local_mock_test_case=self.local_mock_test_case,
307
        )
308

309
    def start(self) -> None:
1✔
310
        # TODO: checks exec_worker does not exists already?
311
        if self.exec_worker:
1✔
UNCOV
312
            raise InvalidName()  # TODO.
×
313
        self.exec_worker = self._get_start_execution_worker()
1✔
314
        self.exec_status = ExecutionStatus.RUNNING
1✔
315
        self.publish_execution_status_change_event()
1✔
316
        self.exec_worker.start()
1✔
317

318
    def stop(self, stop_date: datetime.datetime, error: str | None, cause: str | None):
1✔
319
        exec_worker: ExecutionWorker | None = self.exec_worker
1✔
320
        if exec_worker:
1✔
321
            exec_worker.stop(stop_date=stop_date, cause=cause, error=error)
1✔
322

323
    def publish_execution_status_change_event(self):
1✔
324
        input_value = (
1✔
325
            {} if not self.input_data else to_json_str(self.input_data, separators=(",", ":"))
326
        )
327
        output_value = (
1✔
328
            None if self.output is None else to_json_str(self.output, separators=(",", ":"))
329
        )
330
        output_details = None if output_value is None else self.output_details
1✔
331
        entry = PutEventsRequestEntry(
1✔
332
            Source="aws.states",
333
            Resources=[self.exec_arn],
334
            DetailType="Step Functions Execution Status Change",
335
            Detail=to_json_str(
336
                # Note: this operation carries significant changes from a describe_execution request.
337
                DescribeExecutionOutput(
338
                    executionArn=self.exec_arn,
339
                    stateMachineArn=self.state_machine.arn,
340
                    stateMachineAliasArn=None,
341
                    stateMachineVersionArn=None,
342
                    name=self.name,
343
                    status=self.exec_status,
344
                    startDate=self.start_date,
345
                    stopDate=self.stop_date,
346
                    input=input_value,
347
                    inputDetails=self.input_details,
348
                    output=output_value,
349
                    outputDetails=output_details,
350
                    error=self.error,
351
                    cause=self.cause,
352
                )
353
            ),
354
        )
355
        try:
1✔
356
            self._get_events_client().put_events(Entries=[entry])
1✔
UNCOV
357
        except Exception:
×
358
            LOG.error(
×
359
                "Unable to send notification of Entry='%s' for Step Function execution with Arn='%s' to EventBridge.",
360
                entry,
361
                self.exec_arn,
362
                exc_info=LOG.isEnabledFor(logging.DEBUG),
363
            )
364

365

366
class SyncExecutionWorkerCommunication(BaseExecutionWorkerCommunication):
1✔
367
    execution: Final[SyncExecution]
1✔
368

369
    def _reflect_execution_status(self) -> None:
1✔
370
        super()._reflect_execution_status()
1✔
371
        exit_status: ExecutionStatus = self.execution.exec_status
1✔
372
        if exit_status == ExecutionStatus.SUCCEEDED:
1✔
373
            self.execution.sync_execution_status = SyncExecutionStatus.SUCCEEDED
1✔
374
        elif exit_status == ExecutionStatus.TIMED_OUT:
1✔
UNCOV
375
            self.execution.sync_execution_status = SyncExecutionStatus.TIMED_OUT
×
376
        else:
377
            self.execution.sync_execution_status = SyncExecutionStatus.FAILED
1✔
378

379

380
class SyncExecution(Execution):
1✔
381
    sync_execution_status: SyncExecutionStatus | None = None
1✔
382

383
    def _get_start_execution_worker(self) -> SyncExecutionWorker:
1✔
384
        return SyncExecutionWorker(
1✔
385
            evaluation_details=EvaluationDetails(
386
                aws_execution_details=self._get_start_aws_execution_details(),
387
                execution_details=self.get_start_execution_details(),
388
                state_machine_details=self.get_start_state_machine_details(),
389
            ),
390
            exec_comm=self._get_start_execution_worker_comm(),
391
            cloud_watch_logging_session=self._cloud_watch_logging_session,
392
            activity_store=self._activity_store,
393
            local_mock_test_case=self.local_mock_test_case,
394
        )
395

396
    def _get_start_execution_worker_comm(self) -> BaseExecutionWorkerCommunication:
1✔
397
        return SyncExecutionWorkerCommunication(self)
1✔
398

399
    def to_start_sync_execution_output(self) -> StartSyncExecutionOutput:
1✔
400
        start_output = StartSyncExecutionOutput(
1✔
401
            executionArn=self.exec_arn,
402
            stateMachineArn=self.state_machine.arn,
403
            name=self.name,
404
            status=self.sync_execution_status,
405
            startDate=self.start_date,
406
            stopDate=self.stop_date,
407
            input=to_json_str(self.input_data, separators=(",", ":")),
408
            inputDetails=self.input_details,
409
            traceHeader=self.trace_header,
410
        )
411
        if self.sync_execution_status == SyncExecutionStatus.SUCCEEDED:
1✔
412
            start_output["output"] = to_json_str(self.output, separators=(",", ":"))
1✔
413
        if self.output_details:
1✔
414
            start_output["outputDetails"] = self.output_details
1✔
415
        if self.error is not None:
1✔
416
            start_output["error"] = self.error
1✔
417
        if self.cause is not None:
1✔
418
            start_output["cause"] = self.cause
1✔
419
        return start_output
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc