• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.51
/localstack-core/localstack/services/stepfunctions/asl/component/program/program.py
1
import logging
1✔
2
import threading
1✔
3
from typing import Final
1✔
4

5
from localstack.aws.api.stepfunctions import (
1✔
6
    ExecutionAbortedEventDetails,
7
    ExecutionFailedEventDetails,
8
    ExecutionSucceededEventDetails,
9
    ExecutionTimedOutEventDetails,
10
    HistoryEventExecutionDataDetails,
11
    HistoryEventType,
12
)
13
from localstack.services.stepfunctions.asl.component.common.comment import Comment
1✔
14
from localstack.services.stepfunctions.asl.component.common.error_name.failure_event import (
1✔
15
    FailureEventException,
16
)
17
from localstack.services.stepfunctions.asl.component.common.error_name.states_error_name import (
1✔
18
    StatesErrorName,
19
)
20
from localstack.services.stepfunctions.asl.component.common.error_name.states_error_name_type import (
1✔
21
    StatesErrorNameType,
22
)
23
from localstack.services.stepfunctions.asl.component.common.flow.start_at import StartAt
1✔
24
from localstack.services.stepfunctions.asl.component.common.query_language import QueryLanguage
1✔
25
from localstack.services.stepfunctions.asl.component.common.timeouts.timeout import TimeoutSeconds
1✔
26
from localstack.services.stepfunctions.asl.component.eval_component import EvalComponent
1✔
27
from localstack.services.stepfunctions.asl.component.program.states import States
1✔
28
from localstack.services.stepfunctions.asl.component.program.version import Version
1✔
29
from localstack.services.stepfunctions.asl.component.state.state import CommonStateField
1✔
30
from localstack.services.stepfunctions.asl.eval.environment import Environment
1✔
31
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails
1✔
32
from localstack.services.stepfunctions.asl.eval.program_state import (
1✔
33
    ProgramEnded,
34
    ProgramError,
35
    ProgramState,
36
    ProgramStopped,
37
    ProgramTimedOut,
38
)
39
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
40
from localstack.utils.collections import select_from_typed_dict
1✔
41
from localstack.utils.threads import TMP_THREADS
1✔
42

43
LOG = logging.getLogger(__name__)
1✔
44

45

46
class Program(EvalComponent):
1✔
47
    query_language: Final[QueryLanguage]
1✔
48
    start_at: Final[StartAt]
1✔
49
    states: Final[States]
1✔
50
    timeout_seconds: Final[TimeoutSeconds | None]
1✔
51
    comment: Final[Comment | None]
1✔
52
    version: Final[Version | None]
1✔
53

54
    def __init__(
1✔
55
        self,
56
        query_language: QueryLanguage,
57
        start_at: StartAt,
58
        states: States,
59
        timeout_seconds: TimeoutSeconds | None,
60
        comment: Comment | None = None,
61
        version: Version | None = None,
62
    ):
63
        self.query_language = query_language
1✔
64
        self.start_at = start_at
1✔
65
        self.states = states
1✔
66
        self.timeout_seconds = timeout_seconds
1✔
67
        self.comment = comment
1✔
68
        self.version = version
1✔
69

70
    def _get_state(self, state_name: str) -> CommonStateField:
1✔
71
        state: CommonStateField | None = self.states.states.get(state_name, None)
1✔
72
        if state is None:
1✔
73
            raise ValueError(f"No such state {state}.")
×
74
        return state
1✔
75

76
    def eval(self, env: Environment) -> None:
1✔
77
        timeout = self.timeout_seconds.timeout_seconds if self.timeout_seconds else None
1✔
78
        env.next_state_name = self.start_at.start_at_name
1✔
79
        worker_thread = threading.Thread(target=super().eval, args=(env,), daemon=True)
1✔
80
        TMP_THREADS.append(worker_thread)
1✔
81
        worker_thread.start()
1✔
82
        worker_thread.join(timeout=timeout)
1✔
83
        is_timeout = worker_thread.is_alive()
1✔
84
        if is_timeout:
1✔
85
            env.set_timed_out()
1✔
86

87
    def _eval_body(self, env: Environment) -> None:
1✔
88
        try:
1✔
89
            while env.is_running():
1✔
90
                next_state: CommonStateField = self._get_state(env.next_state_name)
1✔
91
                next_state.eval(env)
1✔
92
                # Garbage collect hanging values added by this last state.
93
                env.stack.clear()
1✔
94
                env.heap.clear()
1✔
95
        except FailureEventException as ex:
1✔
96
            env.set_error(error=ex.get_execution_failed_event_details())
1✔
97
        except Exception as ex:
×
98
            cause = f"{type(ex).__name__}({str(ex)})"
×
99
            LOG.error("Stepfunctions computation ended with exception '%s'.", cause)
×
100
            env.set_error(
×
101
                ExecutionFailedEventDetails(
102
                    error=StatesErrorName(typ=StatesErrorNameType.StatesRuntime).error_name,
103
                    cause=cause,
104
                )
105
            )
106

107
        # If the program is evaluating within a frames then these are not allowed to produce program termination states.
108
        if env.is_frame():
1✔
109
            return
1✔
110

111
        program_state: ProgramState = env.program_state()
1✔
112
        if isinstance(program_state, ProgramError):
1✔
113
            exec_failed_event_details = select_from_typed_dict(
1✔
114
                typed_dict=ExecutionFailedEventDetails, obj=program_state.error or {}
115
            )
116
            env.event_manager.add_event(
1✔
117
                context=env.event_history_context,
118
                event_type=HistoryEventType.ExecutionFailed,
119
                event_details=EventDetails(executionFailedEventDetails=exec_failed_event_details),
120
            )
121
        elif isinstance(program_state, ProgramStopped):
1✔
122
            env.event_history_context.source_event_id = 0
1✔
123
            env.event_manager.add_event(
1✔
124
                context=env.event_history_context,
125
                event_type=HistoryEventType.ExecutionAborted,
126
                event_details=EventDetails(
127
                    executionAbortedEventDetails=ExecutionAbortedEventDetails(
128
                        error=program_state.error, cause=program_state.cause
129
                    )
130
                ),
131
            )
132
        elif isinstance(program_state, ProgramTimedOut):
1✔
133
            env.event_manager.add_event(
1✔
134
                context=env.event_history_context,
135
                event_type=HistoryEventType.ExecutionTimedOut,
136
                event_details=EventDetails(
137
                    executionTimedOutEventDetails=ExecutionTimedOutEventDetails()
138
                ),
139
            )
140
        elif isinstance(program_state, ProgramEnded):
1✔
141
            env.event_manager.add_event(
1✔
142
                context=env.event_history_context,
143
                event_type=HistoryEventType.ExecutionSucceeded,
144
                event_details=EventDetails(
145
                    executionSucceededEventDetails=ExecutionSucceededEventDetails(
146
                        output=to_json_str(env.states.get_input(), separators=(",", ":")),
147
                        outputDetails=HistoryEventExecutionDataDetails(
148
                            truncated=False  # Always False for api calls.
149
                        ),
150
                    )
151
                ),
152
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc