• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 6eb2aab0-9781-40f0-8cc1-9e7c437858cb

22 Apr 2025 04:28PM UTC coverage: 86.272% (+0.007%) from 86.265%
6eb2aab0-9781-40f0-8cc1-9e7c437858cb

push

circleci

web-flow
APIGW: validate REST API custom id tag (#12539)

7 of 7 new or added lines in 1 file covered. (100.0%)

82 existing lines in 10 files now uncovered.

63897 of 74065 relevant lines covered (86.27%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.8
/localstack-core/localstack/services/stepfunctions/asl/eval/environment.py
1
from __future__ import annotations
1✔
2

3
import copy
1✔
4
import logging
1✔
5
import threading
1✔
6
from typing import Any, Final, Optional
1✔
7

8
from localstack.aws.api.stepfunctions import (
1✔
9
    Arn,
10
    ExecutionFailedEventDetails,
11
    StateMachineType,
12
    Timestamp,
13
)
14
from localstack.services.stepfunctions.asl.component.state.state_execution.state_map.iteration.itemprocessor.map_run_record import (
1✔
15
    MapRunRecordPoolManager,
16
)
17
from localstack.services.stepfunctions.asl.eval.callback.callback import CallbackPoolManager
1✔
18
from localstack.services.stepfunctions.asl.eval.evaluation_details import AWSExecutionDetails
1✔
19
from localstack.services.stepfunctions.asl.eval.event.event_manager import (
1✔
20
    EventHistoryContext,
21
    EventManager,
22
)
23
from localstack.services.stepfunctions.asl.eval.event.logging import (
1✔
24
    CloudWatchLoggingSession,
25
)
26
from localstack.services.stepfunctions.asl.eval.program_state import (
1✔
27
    ProgramEnded,
28
    ProgramError,
29
    ProgramRunning,
30
    ProgramState,
31
    ProgramStopped,
32
    ProgramTimedOut,
33
)
34
from localstack.services.stepfunctions.asl.eval.states import ContextObjectData, States
1✔
35
from localstack.services.stepfunctions.asl.eval.variable_store import VariableStore
1✔
36
from localstack.services.stepfunctions.backend.activity import Activity
1✔
37
from localstack.services.stepfunctions.mocking.mock_config import MockedResponse, MockTestCase
1✔
38

39
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
40

41

42
class Environment:
    """Evaluation environment of a state machine execution, or of one frame of it.

    An instance bundles the collaborators used during evaluation (event manager, callback
    and map-run-record pool managers, activity store, optional mock test case) with the
    evaluation data (heap, stack, ``States`` and variable store) and the current program
    state. Child frames are created through ``open_frame`` / ``open_inner_frame`` and are
    tracked in ``_frames`` so that program-state transitions are forwarded to them.
    """

    # Held while mutating ``_frames`` and while applying the ``set_*`` state transitions.
    _state_mutex: Final[threading.RLock]
    # ``None`` until a next state name is assigned for the first time.
    _program_state: Optional[ProgramState]
    # Pulsed (set, then cleared) by the ``set_*`` state transitions.
    program_state_event: Final[threading.Event]

    event_manager: EventManager
    event_history_context: Final[EventHistoryContext]
    cloud_watch_logging_session: Final[Optional[CloudWatchLoggingSession]]
    aws_execution_details: Final[AWSExecutionDetails]
    execution_type: Final[StateMachineType]
    callback_pool_manager: CallbackPoolManager
    map_run_record_pool_manager: MapRunRecordPoolManager
    activity_store: Final[dict[Arn, Activity]]
    mock_test_case: Optional[MockTestCase] = None

    # Frames opened from this environment that have not been closed or deleted yet.
    _frames: Final[list[Environment]]
    _is_frame: bool = False

    # Declared without class-level defaults so that instances can never share one mutable
    # container; ``__init__`` binds a fresh dict/list on every instance.
    heap: dict[str, Any]
    stack: list[Any]
    states: Final[States]
    variable_store: Final[VariableStore]

    def __init__(
        self,
        aws_execution_details: AWSExecutionDetails,
        execution_type: StateMachineType,
        context: ContextObjectData,
        event_history_context: EventHistoryContext,
        cloud_watch_logging_session: Optional[CloudWatchLoggingSession],
        activity_store: dict[Arn, Activity],
        variable_store: Optional[VariableStore] = None,
        mock_test_case: Optional[MockTestCase] = None,
    ):
        """Create an environment with no program state, no frames, and empty heap and stack.

        :param aws_execution_details: stored as-is on the environment.
        :param execution_type: the state machine type; see ``is_standard_workflow``.
        :param context: context object data used to build this environment's ``States``.
        :param event_history_context: event history context owned by this environment.
        :param cloud_watch_logging_session: optional session, stored and also handed to the
            new ``EventManager``.
        :param activity_store: stored and also handed to the new ``CallbackPoolManager``.
        :param variable_store: store to use; a new ``VariableStore`` is created when the
            argument is falsy.
        :param mock_test_case: optional mock test case; when set, the environment reports
            mocked mode.
        """
        super().__init__()
        self._state_mutex = threading.RLock()
        self._program_state = None
        self.program_state_event = threading.Event()

        self.cloud_watch_logging_session = cloud_watch_logging_session
        self.event_manager = EventManager(cloud_watch_logging_session=cloud_watch_logging_session)
        self.event_history_context = event_history_context

        self.aws_execution_details = aws_execution_details
        self.execution_type = execution_type
        self.callback_pool_manager = CallbackPoolManager(activity_store=activity_store)
        self.map_run_record_pool_manager = MapRunRecordPoolManager()

        self.activity_store = activity_store

        self.mock_test_case = mock_test_case

        self._frames = list()
        self._is_frame = False

        self.heap = dict()
        self.stack = list()
        self.states = States(context=context)
        self.variable_store = variable_store or VariableStore()

    @classmethod
    def as_frame_of(
        cls, env: Environment, event_history_frame_cache: Optional[EventHistoryContext] = None
    ) -> Environment:
        """Build a frame of ``env`` that uses the very same variable store as ``env``.

        :param env: the environment the frame is derived from.
        :param event_history_frame_cache: optional event history context for the frame; see
            ``as_inner_frame_of`` for the default.
        :return: the new frame. It is NOT registered in ``env``'s frame list; ``open_frame``
            does that.
        """
        # Dispatch through ``cls`` (not the hard-coded class) so subclasses build frames of
        # their own type, consistently with ``as_inner_frame_of`` which instantiates ``cls``.
        return cls.as_inner_frame_of(
            env=env,
            variable_store=env.variable_store,
            event_history_frame_cache=event_history_frame_cache,
        )

    @classmethod
    def as_inner_frame_of(
        cls,
        env: Environment,
        variable_store: VariableStore,
        event_history_frame_cache: Optional[EventHistoryContext] = None,
    ) -> Environment:
        """Build a frame of ``env`` that evaluates against the given variable store.

        The frame shares ``env``'s event manager, callback pool manager and map-run-record
        pool manager, and receives deep copies of ``env``'s ``State`` context entry (when
        present) and of its program state.

        :param env: the environment the frame is derived from.
        :param variable_store: the variable store the frame will use.
        :param event_history_frame_cache: optional event history context for the frame; when
            ``None`` a new one is created whose previous event id is ``env``'s source event id.
        :return: the new frame, flagged as a frame. It is NOT registered in ``env``'s frame
            list; ``open_inner_frame`` does that.
        """
        source_data = env.states.context_object.context_object_data

        # Construct the frame's context object data.
        context = ContextObjectData(
            Execution=source_data["Execution"],
            StateMachine=source_data["StateMachine"],
        )
        if "Task" in source_data:
            context["Task"] = source_data["Task"]

        # The default logic provisions for child frame to extend the source frame event id.
        if event_history_frame_cache is None:
            event_history_frame_cache = EventHistoryContext(
                previous_event_id=env.event_history_context.source_event_id
            )

        frame = cls(
            aws_execution_details=env.aws_execution_details,
            execution_type=env.execution_type,
            context=context,
            event_history_context=event_history_frame_cache,
            cloud_watch_logging_session=env.cloud_watch_logging_session,
            activity_store=env.activity_store,
            variable_store=variable_store,
            mock_test_case=env.mock_test_case,
        )
        frame._is_frame = True
        # Replace the collaborators created by the constructor with the ones owned by ``env``.
        frame.event_manager = env.event_manager
        if "State" in source_data:
            # Deep copy: the frame gets its own ``State`` entry rather than a shared reference.
            frame.states.context_object.context_object_data["State"] = copy.deepcopy(
                source_data["State"]
            )
        frame.callback_pool_manager = env.callback_pool_manager
        frame.map_run_record_pool_manager = env.map_run_record_pool_manager
        frame.heap = dict()
        frame._program_state = copy.deepcopy(env._program_state)
        return frame

    @property
    def next_state_name(self) -> Optional[str]:
        """Name of the next state, or ``None`` when the program state is not ``ProgramRunning``."""
        program_state = self._program_state
        if isinstance(program_state, ProgramRunning):
            return program_state.next_state_name
        return None

    @next_state_name.setter
    def next_state_name(self, next_state_name: str) -> None:
        """Set the next state name, creating a ``ProgramRunning`` state if none exists yet.

        :raises RuntimeError: if the program state exists and is not ``ProgramRunning``.
        """
        if self._program_state is None:
            self._program_state = ProgramRunning()

        if not isinstance(self._program_state, ProgramRunning):
            raise RuntimeError(
                f"Could not set NextState value when in state '{type(self._program_state)}'."
            )
        self._program_state.next_state_name = next_state_name

    @property
    def next_field_name(self) -> Optional[str]:
        """Name of the next field, or ``None`` when the program state is not ``ProgramRunning``."""
        program_state = self._program_state
        if isinstance(program_state, ProgramRunning):
            return program_state.next_field_name
        return None

    @next_field_name.setter
    def next_field_name(self, next_field_name: str) -> None:
        """Set the next field name.

        Unlike the ``next_state_name`` setter, this does not create a ``ProgramRunning``
        state when none exists.

        :raises RuntimeError: if the program state is not ``ProgramRunning`` (including ``None``).
        """
        if not isinstance(self._program_state, ProgramRunning):
            raise RuntimeError(
                f"Could not set NextField value when in state '{type(self._program_state)}'."
            )
        self._program_state.next_field_name = next_field_name

    def program_state(self) -> Optional[ProgramState]:
        """Return a deep copy of the current program state (``None`` if none was set yet)."""
        return copy.deepcopy(self._program_state)

    def is_running(self) -> bool:
        """Return whether the current program state is ``ProgramRunning``."""
        return isinstance(self._program_state, ProgramRunning)

    def _pulse_program_state_event(self) -> None:
        """Set and immediately clear ``program_state_event``.

        The event is left cleared afterwards, so it does not latch.
        """
        self.program_state_event.set()
        self.program_state_event.clear()

    def set_ended(self) -> None:
        """Move a running program, and its open frames, to ``ProgramEnded``.

        The state only changes when the program is currently running; the program state
        event is pulsed in either case.
        """
        with self._state_mutex:
            if isinstance(self._program_state, ProgramRunning):
                self._program_state = ProgramEnded()
                for frame in self._frames:
                    frame.set_ended()
            self._pulse_program_state_event()

    def set_error(self, error: ExecutionFailedEventDetails) -> None:
        """Move the program, and its open frames, to ``ProgramError`` regardless of the current state.

        :param error: failure details stored in the new ``ProgramError`` state.
        """
        with self._state_mutex:
            self._program_state = ProgramError(error=error)
            for frame in self._frames:
                frame.set_error(error=error)
            self._pulse_program_state_event()

    def set_timed_out(self) -> None:
        """Move the program, and its open frames, to ``ProgramTimedOut`` regardless of the current state."""
        with self._state_mutex:
            self._program_state = ProgramTimedOut()
            for frame in self._frames:
                frame.set_timed_out()
            self._pulse_program_state_event()

    def set_stop(self, stop_date: Timestamp, cause: Optional[str], error: Optional[str]) -> None:
        """Move a running program, and its open frames, to ``ProgramStopped``.

        Nothing happens, and the program state event is not pulsed, unless the program is
        currently running.

        :param stop_date: stop timestamp stored in the new ``ProgramStopped`` state.
        :param cause: optional cause stored in the new state.
        :param error: optional error stored in the new state.
        """
        with self._state_mutex:
            if isinstance(self._program_state, ProgramRunning):
                self._program_state = ProgramStopped(stop_date=stop_date, cause=cause, error=error)
                for frame in self._frames:
                    frame.set_stop(stop_date=stop_date, cause=cause, error=error)
                self._pulse_program_state_event()

    def open_frame(
        self, event_history_context: Optional[EventHistoryContext] = None
    ) -> Environment:
        """Create a frame sharing this environment's variable store and register it.

        :param event_history_context: optional event history context for the frame.
        :return: the newly registered frame.
        """
        with self._state_mutex:
            frame = self.as_frame_of(env=self, event_history_frame_cache=event_history_context)
            self._frames.append(frame)
            return frame

    def open_inner_frame(
        self, event_history_context: Optional[EventHistoryContext] = None
    ) -> Environment:
        """Create a frame with an inner-scope variable store and register it.

        The frame's variable store is obtained from ``VariableStore.as_inner_scope_of`` with
        this environment's store as the outer one.

        :param event_history_context: optional event history context for the frame.
        :return: the newly registered frame.
        """
        with self._state_mutex:
            variable_store = VariableStore.as_inner_scope_of(
                outer_variable_store=self.variable_store
            )
            frame = self.as_inner_frame_of(
                env=self,
                variable_store=variable_store,
                event_history_frame_cache=event_history_context,
            )
            self._frames.append(frame)
            return frame

    def close_frame(self, frame: Environment) -> None:
        """Unregister ``frame`` and integrate its event history context into this one.

        Frames that are not currently registered are ignored.
        """
        with self._state_mutex:
            if frame in self._frames:
                self._frames.remove(frame)
                self.event_history_context.integrate(frame.event_history_context)

    def delete_frame(self, frame: Environment) -> None:
        """Unregister ``frame`` without integrating its event history context.

        Frames that are not currently registered are ignored.
        """
        with self._state_mutex:
            if frame in self._frames:
                self._frames.remove(frame)

    def is_frame(self) -> bool:
        """Return whether this environment was created as a frame of another environment."""
        return self._is_frame

    def is_standard_workflow(self) -> bool:
        """Return whether the execution type is ``StateMachineType.STANDARD``."""
        return self.execution_type == StateMachineType.STANDARD

    def is_mocked_mode(self) -> bool:
        """Return whether a mock test case is attached to this environment."""
        return self.mock_test_case is not None

    def get_current_mocked_response(self) -> MockedResponse:
        """Return the mocked response for the next state at the current retry count.

        The state is looked up by ``next_state_name`` in the mock test case, and the response
        is selected by the ``RetryCount`` value of the ``State`` context object entry.

        :raises RuntimeError: if the environment is not in mocked mode, if the mock test case
            has no entry for the state, or if the entry has no response for the retry count.
        """
        if not self.is_mocked_mode():
            raise RuntimeError(
                "Cannot retrieve mocked response: execution is not operating in mocked mode"
            )
        state_name = self.next_state_name
        state_mocked_responses = self.mock_test_case.state_mocked_responses.get(state_name)
        if state_mocked_responses is None:
            raise RuntimeError(f"No mocked response definition for state '{state_name}'")
        retry_count = self.states.context_object.context_object_data["State"]["RetryCount"]
        if len(state_mocked_responses.mocked_responses) <= retry_count:
            raise RuntimeError(
                f"No mocked response definition for state '{state_name}' "
                f"and retry number '{retry_count}'"
            )
        return state_mocked_responses.mocked_responses[retry_count]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc