• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / a8850804-5a35-4cf0-98c0-1c624f795040

10 Mar 2025 11:34PM UTC coverage: 86.929% (+0.05%) from 86.877%
a8850804-5a35-4cf0-98c0-1c624f795040

push

circleci

web-flow
Update CODEOWNERS (#12359)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>

62138 of 71481 relevant lines covered (86.93%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.47
/localstack-core/localstack/services/stepfunctions/backend/execution.py
1
from __future__ import annotations
1✔
2

3
import datetime
1✔
4
import json
1✔
5
import logging
1✔
6
from typing import Final, Optional
1✔
7

8
from localstack.aws.api.events import PutEventsRequestEntry
1✔
9
from localstack.aws.api.stepfunctions import (
1✔
10
    Arn,
11
    CloudWatchEventsExecutionDataDetails,
12
    DescribeExecutionOutput,
13
    DescribeStateMachineForExecutionOutput,
14
    ExecutionListItem,
15
    ExecutionStatus,
16
    GetExecutionHistoryOutput,
17
    HistoryEventList,
18
    InvalidName,
19
    SensitiveCause,
20
    SensitiveError,
21
    StartExecutionOutput,
22
    StartSyncExecutionOutput,
23
    StateMachineType,
24
    SyncExecutionStatus,
25
    Timestamp,
26
    TraceHeader,
27
    VariableReferences,
28
)
29
from localstack.aws.connect import connect_to
1✔
30
from localstack.services.stepfunctions.asl.eval.evaluation_details import (
1✔
31
    AWSExecutionDetails,
32
    EvaluationDetails,
33
    ExecutionDetails,
34
    StateMachineDetails,
35
)
36
from localstack.services.stepfunctions.asl.eval.event.logging import (
1✔
37
    CloudWatchLoggingSession,
38
)
39
from localstack.services.stepfunctions.asl.eval.program_state import (
1✔
40
    ProgramEnded,
41
    ProgramError,
42
    ProgramState,
43
    ProgramStopped,
44
    ProgramTimedOut,
45
)
46
from localstack.services.stepfunctions.asl.static_analyser.variable_references_static_analyser import (
1✔
47
    VariableReferencesStaticAnalyser,
48
)
49
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
50
from localstack.services.stepfunctions.backend.activity import Activity
1✔
51
from localstack.services.stepfunctions.backend.execution_worker import (
1✔
52
    ExecutionWorker,
53
    SyncExecutionWorker,
54
)
55
from localstack.services.stepfunctions.backend.execution_worker_comm import (
1✔
56
    ExecutionWorkerCommunication,
57
)
58
from localstack.services.stepfunctions.backend.state_machine import (
1✔
59
    StateMachineInstance,
60
    StateMachineVersion,
61
)
62

63
LOG = logging.getLogger(__name__)
1✔
64

65

66
class BaseExecutionWorkerCommunication(ExecutionWorkerCommunication):
1✔
67
    execution: Final[Execution]
1✔
68

69
    def __init__(self, execution: Execution):
1✔
70
        self.execution = execution
1✔
71

72
    def _reflect_execution_status(self):
1✔
73
        exit_program_state: ProgramState = self.execution.exec_worker.env.program_state()
1✔
74
        self.execution.stop_date = datetime.datetime.now(tz=datetime.timezone.utc)
1✔
75
        if isinstance(exit_program_state, ProgramEnded):
1✔
76
            self.execution.exec_status = ExecutionStatus.SUCCEEDED
1✔
77
            self.execution.output = self.execution.exec_worker.env.states.get_input()
1✔
78
        elif isinstance(exit_program_state, ProgramStopped):
1✔
79
            self.execution.exec_status = ExecutionStatus.ABORTED
1✔
80
        elif isinstance(exit_program_state, ProgramError):
1✔
81
            self.execution.exec_status = ExecutionStatus.FAILED
1✔
82
            self.execution.error = exit_program_state.error.get("error")
1✔
83
            self.execution.cause = exit_program_state.error.get("cause")
1✔
84
        elif isinstance(exit_program_state, ProgramTimedOut):
1✔
85
            self.execution.exec_status = ExecutionStatus.TIMED_OUT
1✔
86
        else:
87
            raise RuntimeWarning(
×
88
                f"Execution ended with unsupported ProgramState type '{type(exit_program_state)}'."
89
            )
90

91
    def terminated(self) -> None:
1✔
92
        self._reflect_execution_status()
1✔
93
        self.execution.publish_execution_status_change_event()
1✔
94

95

96
class Execution:
1✔
97
    name: Final[str]
1✔
98
    sm_type: Final[StateMachineType]
1✔
99
    role_arn: Final[Arn]
1✔
100
    exec_arn: Final[Arn]
1✔
101

102
    account_id: str
1✔
103
    region_name: str
1✔
104

105
    state_machine: Final[StateMachineInstance]
1✔
106
    state_machine_arn: Final[Arn]
1✔
107
    state_machine_version_arn: Final[Optional[Arn]]
1✔
108
    state_machine_alias_arn: Final[Optional[Arn]]
1✔
109

110
    start_date: Final[Timestamp]
1✔
111
    input_data: Final[Optional[json]]
1✔
112
    input_details: Final[Optional[CloudWatchEventsExecutionDataDetails]]
1✔
113
    trace_header: Final[Optional[TraceHeader]]
1✔
114
    _cloud_watch_logging_session: Final[Optional[CloudWatchLoggingSession]]
1✔
115

116
    exec_status: Optional[ExecutionStatus]
1✔
117
    stop_date: Optional[Timestamp]
1✔
118

119
    output: Optional[json]
1✔
120
    output_details: Optional[CloudWatchEventsExecutionDataDetails]
1✔
121

122
    error: Optional[SensitiveError]
1✔
123
    cause: Optional[SensitiveCause]
1✔
124

125
    exec_worker: Optional[ExecutionWorker]
1✔
126

127
    _activity_store: dict[Arn, Activity]
1✔
128

129
    def __init__(
1✔
130
        self,
131
        name: str,
132
        sm_type: StateMachineType,
133
        role_arn: Arn,
134
        exec_arn: Arn,
135
        account_id: str,
136
        region_name: str,
137
        state_machine: StateMachineInstance,
138
        start_date: Timestamp,
139
        cloud_watch_logging_session: Optional[CloudWatchLoggingSession],
140
        activity_store: dict[Arn, Activity],
141
        input_data: Optional[json] = None,
142
        trace_header: Optional[TraceHeader] = None,
143
        state_machine_alias_arn: Optional[Arn] = None,
144
    ):
145
        self.name = name
1✔
146
        self.sm_type = sm_type
1✔
147
        self.role_arn = role_arn
1✔
148
        self.exec_arn = exec_arn
1✔
149
        self.account_id = account_id
1✔
150
        self.region_name = region_name
1✔
151
        self.state_machine = state_machine
1✔
152
        if isinstance(state_machine, StateMachineVersion):
1✔
153
            self.state_machine_arn = state_machine.source_arn
1✔
154
            self.state_machine_version_arn = state_machine.arn
1✔
155
        else:
156
            self.state_machine_arn = state_machine.arn
1✔
157
            self.state_machine_version_arn = None
1✔
158
        self.state_machine_alias_arn = state_machine_alias_arn
1✔
159
        self.start_date = start_date
1✔
160
        self._cloud_watch_logging_session = cloud_watch_logging_session
1✔
161
        self.input_data = input_data
1✔
162
        self.input_details = CloudWatchEventsExecutionDataDetails(included=True)
1✔
163
        self.trace_header = trace_header
1✔
164
        self.exec_status = None
1✔
165
        self.stop_date = None
1✔
166
        self.output = None
1✔
167
        self.output_details = CloudWatchEventsExecutionDataDetails(included=True)
1✔
168
        self.exec_worker = None
1✔
169
        self.error = None
1✔
170
        self.cause = None
1✔
171
        self._activity_store = activity_store
1✔
172

173
    def _get_events_client(self):
1✔
174
        return connect_to(aws_access_key_id=self.account_id, region_name=self.region_name).events
1✔
175

176
    def to_start_output(self) -> StartExecutionOutput:
1✔
177
        return StartExecutionOutput(executionArn=self.exec_arn, startDate=self.start_date)
1✔
178

179
    def to_describe_output(self) -> DescribeExecutionOutput:
1✔
180
        describe_output = DescribeExecutionOutput(
1✔
181
            executionArn=self.exec_arn,
182
            stateMachineArn=self.state_machine_arn,
183
            name=self.name,
184
            status=self.exec_status,
185
            startDate=self.start_date,
186
            stopDate=self.stop_date,
187
            input=to_json_str(self.input_data, separators=(",", ":")),
188
            inputDetails=self.input_details,
189
            traceHeader=self.trace_header,
190
        )
191
        if describe_output["status"] == ExecutionStatus.SUCCEEDED:
1✔
192
            describe_output["output"] = to_json_str(self.output, separators=(",", ":"))
1✔
193
            describe_output["outputDetails"] = self.output_details
1✔
194
        if self.error is not None:
1✔
195
            describe_output["error"] = self.error
1✔
196
        if self.cause is not None:
1✔
197
            describe_output["cause"] = self.cause
1✔
198
        if self.state_machine_version_arn is not None:
1✔
199
            describe_output["stateMachineVersionArn"] = self.state_machine_version_arn
1✔
200
        if self.state_machine_alias_arn is not None:
1✔
201
            describe_output["stateMachineAliasArn"] = self.state_machine_alias_arn
1✔
202
        return describe_output
1✔
203

204
    def to_describe_state_machine_for_execution_output(
1✔
205
        self,
206
    ) -> DescribeStateMachineForExecutionOutput:
207
        state_machine: StateMachineInstance = self.state_machine
1✔
208
        state_machine_arn = (
1✔
209
            state_machine.source_arn
210
            if isinstance(state_machine, StateMachineVersion)
211
            else state_machine.arn
212
        )
213
        out = DescribeStateMachineForExecutionOutput(
1✔
214
            stateMachineArn=state_machine_arn,
215
            name=state_machine.name,
216
            definition=state_machine.definition,
217
            roleArn=self.role_arn,
218
            # The date and time the state machine associated with an execution was updated.
219
            updateDate=state_machine.create_date,
220
            loggingConfiguration=state_machine.logging_config,
221
        )
222
        revision_id = self.state_machine.revision_id
1✔
223
        if self.state_machine.revision_id:
1✔
224
            out["revisionId"] = revision_id
1✔
225
        variable_references: VariableReferences = VariableReferencesStaticAnalyser.process_and_get(
1✔
226
            definition=self.state_machine.definition
227
        )
228
        if variable_references:
1✔
229
            out["variableReferences"] = variable_references
1✔
230
        return out
1✔
231

232
    def to_execution_list_item(self) -> ExecutionListItem:
1✔
233
        if isinstance(self.state_machine, StateMachineVersion):
1✔
234
            state_machine_arn = self.state_machine.source_arn
1✔
235
            state_machine_version_arn = self.state_machine.arn
1✔
236
        else:
237
            state_machine_arn = self.state_machine.arn
1✔
238
            state_machine_version_arn = None
1✔
239

240
        item = ExecutionListItem(
1✔
241
            executionArn=self.exec_arn,
242
            stateMachineArn=state_machine_arn,
243
            name=self.name,
244
            status=self.exec_status,
245
            startDate=self.start_date,
246
            stopDate=self.stop_date,
247
        )
248
        if state_machine_version_arn is not None:
1✔
249
            item["stateMachineVersionArn"] = state_machine_version_arn
1✔
250
        if self.state_machine_alias_arn is not None:
1✔
251
            item["stateMachineAliasArn"] = self.state_machine_alias_arn
1✔
252
        return item
1✔
253

254
    def to_history_output(self) -> GetExecutionHistoryOutput:
1✔
255
        env = self.exec_worker.env
1✔
256
        event_history: HistoryEventList = list()
1✔
257
        if env is not None:
1✔
258
            # The execution has not started yet.
259
            event_history: HistoryEventList = env.event_manager.get_event_history()
1✔
260
        return GetExecutionHistoryOutput(events=event_history)
1✔
261

262
    @staticmethod
1✔
263
    def _to_serialized_date(timestamp: datetime.datetime) -> str:
1✔
264
        """See test in tests.aws.services.stepfunctions.v2.base.test_base.TestSnfBase.test_execution_dateformat"""
265
        return (
1✔
266
            f"{timestamp.astimezone(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]}Z"
267
        )
268

269
    def _get_start_execution_worker_comm(self) -> BaseExecutionWorkerCommunication:
1✔
270
        return BaseExecutionWorkerCommunication(self)
1✔
271

272
    def _get_start_aws_execution_details(self) -> AWSExecutionDetails:
1✔
273
        return AWSExecutionDetails(
1✔
274
            account=self.account_id, region=self.region_name, role_arn=self.role_arn
275
        )
276

277
    def get_start_execution_details(self) -> ExecutionDetails:
1✔
278
        return ExecutionDetails(
1✔
279
            arn=self.exec_arn,
280
            name=self.name,
281
            role_arn=self.role_arn,
282
            inpt=self.input_data,
283
            start_time=self._to_serialized_date(self.start_date),
284
        )
285

286
    def get_start_state_machine_details(self) -> StateMachineDetails:
1✔
287
        return StateMachineDetails(
1✔
288
            arn=self.state_machine.arn,
289
            name=self.state_machine.name,
290
            typ=self.state_machine.sm_type,
291
            definition=self.state_machine.definition,
292
        )
293

294
    def _get_start_execution_worker(self) -> ExecutionWorker:
1✔
295
        return ExecutionWorker(
1✔
296
            evaluation_details=EvaluationDetails(
297
                aws_execution_details=self._get_start_aws_execution_details(),
298
                execution_details=self.get_start_execution_details(),
299
                state_machine_details=self.get_start_state_machine_details(),
300
            ),
301
            exec_comm=self._get_start_execution_worker_comm(),
302
            cloud_watch_logging_session=self._cloud_watch_logging_session,
303
            activity_store=self._activity_store,
304
        )
305

306
    def start(self) -> None:
1✔
307
        # TODO: checks exec_worker does not exists already?
308
        if self.exec_worker:
1✔
309
            raise InvalidName()  # TODO.
×
310
        self.exec_worker = self._get_start_execution_worker()
1✔
311
        self.exec_status = ExecutionStatus.RUNNING
1✔
312
        self.publish_execution_status_change_event()
1✔
313
        self.exec_worker.start()
1✔
314

315
    def stop(self, stop_date: datetime.datetime, error: Optional[str], cause: Optional[str]):
1✔
316
        exec_worker: Optional[ExecutionWorker] = self.exec_worker
1✔
317
        if exec_worker:
1✔
318
            exec_worker.stop(stop_date=stop_date, cause=cause, error=error)
1✔
319

320
    def publish_execution_status_change_event(self):
1✔
321
        input_value = (
1✔
322
            dict() if not self.input_data else to_json_str(self.input_data, separators=(",", ":"))
323
        )
324
        output_value = (
1✔
325
            None if self.output is None else to_json_str(self.output, separators=(",", ":"))
326
        )
327
        output_details = None if output_value is None else self.output_details
1✔
328
        entry = PutEventsRequestEntry(
1✔
329
            Source="aws.states",
330
            Resources=[self.exec_arn],
331
            DetailType="Step Functions Execution Status Change",
332
            Detail=to_json_str(
333
                # Note: this operation carries significant changes from a describe_execution request.
334
                DescribeExecutionOutput(
335
                    executionArn=self.exec_arn,
336
                    stateMachineArn=self.state_machine.arn,
337
                    stateMachineAliasArn=None,
338
                    stateMachineVersionArn=None,
339
                    name=self.name,
340
                    status=self.exec_status,
341
                    startDate=self.start_date,
342
                    stopDate=self.stop_date,
343
                    input=input_value,
344
                    inputDetails=self.input_details,
345
                    output=output_value,
346
                    outputDetails=output_details,
347
                    error=self.error,
348
                    cause=self.cause,
349
                )
350
            ),
351
        )
352
        try:
1✔
353
            self._get_events_client().put_events(Entries=[entry])
1✔
354
        except Exception:
×
355
            LOG.exception(
×
356
                "Unable to send notification of Entry='%s' for Step Function execution with Arn='%s' to EventBridge.",
357
                entry,
358
                self.exec_arn,
359
            )
360

361

362
class SyncExecutionWorkerCommunication(BaseExecutionWorkerCommunication):
1✔
363
    execution: Final[SyncExecution]
1✔
364

365
    def _reflect_execution_status(self) -> None:
1✔
366
        super()._reflect_execution_status()
1✔
367
        exit_status: ExecutionStatus = self.execution.exec_status
1✔
368
        if exit_status == ExecutionStatus.SUCCEEDED:
1✔
369
            self.execution.sync_execution_status = SyncExecutionStatus.SUCCEEDED
1✔
370
        elif exit_status == ExecutionStatus.TIMED_OUT:
1✔
371
            self.execution.sync_execution_status = SyncExecutionStatus.TIMED_OUT
×
372
        else:
373
            self.execution.sync_execution_status = SyncExecutionStatus.FAILED
1✔
374

375

376
class SyncExecution(Execution):
1✔
377
    sync_execution_status: Optional[SyncExecutionStatus] = None
1✔
378

379
    def _get_start_execution_worker(self) -> SyncExecutionWorker:
1✔
380
        return SyncExecutionWorker(
1✔
381
            evaluation_details=EvaluationDetails(
382
                aws_execution_details=self._get_start_aws_execution_details(),
383
                execution_details=self.get_start_execution_details(),
384
                state_machine_details=self.get_start_state_machine_details(),
385
            ),
386
            exec_comm=self._get_start_execution_worker_comm(),
387
            cloud_watch_logging_session=self._cloud_watch_logging_session,
388
            activity_store=self._activity_store,
389
        )
390

391
    def _get_start_execution_worker_comm(self) -> BaseExecutionWorkerCommunication:
1✔
392
        return SyncExecutionWorkerCommunication(self)
1✔
393

394
    def to_start_sync_execution_output(self) -> StartSyncExecutionOutput:
1✔
395
        start_output = StartSyncExecutionOutput(
1✔
396
            executionArn=self.exec_arn,
397
            stateMachineArn=self.state_machine.arn,
398
            name=self.name,
399
            status=self.sync_execution_status,
400
            startDate=self.start_date,
401
            stopDate=self.stop_date,
402
            input=to_json_str(self.input_data, separators=(",", ":")),
403
            inputDetails=self.input_details,
404
            traceHeader=self.trace_header,
405
        )
406
        if self.sync_execution_status == SyncExecutionStatus.SUCCEEDED:
1✔
407
            start_output["output"] = to_json_str(self.output, separators=(",", ":"))
1✔
408
        if self.output_details:
1✔
409
            start_output["outputDetails"] = self.output_details
1✔
410
        if self.error is not None:
1✔
411
            start_output["error"] = self.error
1✔
412
        if self.cause is not None:
1✔
413
            start_output["cause"] = self.cause
1✔
414
        return start_output
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc