• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20736160175

05 Jan 2026 04:14PM UTC coverage: 86.921% (-0.009%) from 86.93%
20736160175

push

github

web-flow
CloudFormation: Store resource scaffolding in 'generated' directory. (#13534)

28 of 28 new or added lines in 3 files covered. (100.0%)

12 existing lines in 6 files now uncovered.

70081 of 80626 relevant lines covered (86.92%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.89
/localstack-core/localstack/services/stepfunctions/asl/eval/environment.py
1
from __future__ import annotations
1✔
2

3
import copy
1✔
4
import logging
1✔
5
import threading
1✔
6
from typing import Any, Final, Optional, Self
1✔
7

8
from localstack.aws.api.stepfunctions import (
1✔
9
    Arn,
10
    ExecutionFailedEventDetails,
11
    StateMachineType,
12
    Timestamp,
13
)
14
from localstack.services.stepfunctions.asl.component.state.state_execution.state_map.iteration.itemprocessor.map_run_record import (
1✔
15
    MapRunRecordPoolManager,
16
)
17
from localstack.services.stepfunctions.asl.eval.callback.callback import CallbackPoolManager
1✔
18
from localstack.services.stepfunctions.asl.eval.evaluation_details import AWSExecutionDetails
1✔
19
from localstack.services.stepfunctions.asl.eval.event.event_manager import (
1✔
20
    EventHistoryContext,
21
    EventManager,
22
)
23
from localstack.services.stepfunctions.asl.eval.event.logging import (
1✔
24
    CloudWatchLoggingSession,
25
)
26
from localstack.services.stepfunctions.asl.eval.program_state import (
1✔
27
    ProgramEnded,
28
    ProgramError,
29
    ProgramRunning,
30
    ProgramState,
31
    ProgramStopped,
32
    ProgramTimedOut,
33
)
34
from localstack.services.stepfunctions.asl.eval.states import ContextObjectData, States
1✔
35
from localstack.services.stepfunctions.asl.eval.variable_store import VariableStore
1✔
36
from localstack.services.stepfunctions.backend.activity import Activity
1✔
37
from localstack.services.stepfunctions.local_mocking.mock_config import (
1✔
38
    LocalMockedResponse,
39
    LocalMockTestCase,
40
)
41

42
# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)
1✔
43

44

45
class Environment:
1✔
46
    _state_mutex: Final[threading.RLock()]
1✔
47
    _program_state: ProgramState | None
1✔
48
    program_state_event: Final[threading.Event()]
1✔
49

50
    event_manager: EventManager
1✔
51
    event_history_context: Final[EventHistoryContext]
1✔
52
    cloud_watch_logging_session: Final[CloudWatchLoggingSession | None]
1✔
53
    aws_execution_details: Final[AWSExecutionDetails]
1✔
54
    execution_type: Final[StateMachineType]
1✔
55
    callback_pool_manager: CallbackPoolManager
1✔
56
    map_run_record_pool_manager: MapRunRecordPoolManager
1✔
57
    activity_store: Final[dict[Arn, Activity]]
1✔
58
    local_mock_test_case: LocalMockTestCase | None = None
1✔
59

60
    _frames: Final[list[Environment]]
1✔
61
    _is_frame: bool = False
1✔
62

63
    heap: dict[str, Any] = {}
1✔
64
    stack: list[Any] = []
1✔
65
    states: Final[States]
1✔
66
    variable_store: Final[VariableStore]
1✔
67

68
    def __init__(
1✔
69
        self,
70
        aws_execution_details: AWSExecutionDetails,
71
        execution_type: StateMachineType,
72
        context: ContextObjectData,
73
        event_history_context: EventHistoryContext,
74
        cloud_watch_logging_session: CloudWatchLoggingSession | None,
75
        activity_store: dict[Arn, Activity],
76
        variable_store: VariableStore | None = None,
77
        local_mock_test_case: LocalMockTestCase | None = None,
78
    ):
79
        super().__init__()
1✔
80
        self._state_mutex = threading.RLock()
1✔
81
        self._program_state = None
1✔
82
        self.program_state_event = threading.Event()
1✔
83

84
        self.cloud_watch_logging_session = cloud_watch_logging_session
1✔
85
        self.event_manager = EventManager(cloud_watch_logging_session=cloud_watch_logging_session)
1✔
86
        self.event_history_context = event_history_context
1✔
87

88
        self.aws_execution_details = aws_execution_details
1✔
89
        self.execution_type = execution_type
1✔
90
        self.callback_pool_manager = CallbackPoolManager(activity_store=activity_store)
1✔
91
        self.map_run_record_pool_manager = MapRunRecordPoolManager()
1✔
92

93
        self.activity_store = activity_store
1✔
94

95
        self.local_mock_test_case = local_mock_test_case
1✔
96

97
        self._frames = []
1✔
98
        self._is_frame = False
1✔
99

100
        self.heap = {}
1✔
101
        self.stack = []
1✔
102
        self.states = States(context=context)
1✔
103
        self.variable_store = variable_store or VariableStore()
1✔
104

105
    @classmethod
1✔
106
    def as_frame_of(
1✔
107
        cls, env: Self, event_history_frame_cache: EventHistoryContext | None = None
108
    ) -> Self:
109
        return cls.as_inner_frame_of(
1✔
110
            env=env,
111
            variable_store=env.variable_store,
112
            event_history_frame_cache=event_history_frame_cache,
113
        )
114

115
    @classmethod
1✔
116
    def as_inner_frame_of(
1✔
117
        cls,
118
        env: Self,
119
        variable_store: VariableStore,
120
        event_history_frame_cache: EventHistoryContext | None = None,
121
    ) -> Self:
122
        # Construct the frame's context object data.
123
        context = ContextObjectData(
1✔
124
            Execution=env.states.context_object.context_object_data["Execution"],
125
            StateMachine=env.states.context_object.context_object_data["StateMachine"],
126
        )
127
        if "Task" in env.states.context_object.context_object_data:
1✔
128
            context["Task"] = env.states.context_object.context_object_data["Task"]
1✔
129

130
        # The default logic provisions for child frame to extend the source frame event id.
131
        if event_history_frame_cache is None:
1✔
132
            event_history_frame_cache = EventHistoryContext(
1✔
133
                previous_event_id=env.event_history_context.source_event_id
134
            )
135

136
        frame = cls(
1✔
137
            aws_execution_details=env.aws_execution_details,
138
            execution_type=env.execution_type,
139
            context=context,
140
            event_history_context=event_history_frame_cache,
141
            cloud_watch_logging_session=env.cloud_watch_logging_session,
142
            activity_store=env.activity_store,
143
            variable_store=variable_store,
144
        )
145
        frame.local_mock_test_case = env.local_mock_test_case
1✔
146
        frame._is_frame = True
1✔
147
        frame.event_manager = env.event_manager
1✔
148
        if "State" in env.states.context_object.context_object_data:
1✔
149
            frame.states.context_object.context_object_data["State"] = copy.deepcopy(
1✔
150
                env.states.context_object.context_object_data["State"]
151
            )
152
        frame.callback_pool_manager = env.callback_pool_manager
1✔
153
        frame.map_run_record_pool_manager = env.map_run_record_pool_manager
1✔
154
        frame.heap = {}
1✔
155
        frame._program_state = copy.deepcopy(env._program_state)
1✔
156
        return frame
1✔
157

158
    @property
1✔
159
    def next_state_name(self) -> str | None:
1✔
160
        next_state_name: str | None = None
1✔
161
        program_state = self._program_state
1✔
162
        if isinstance(program_state, ProgramRunning):
1✔
163
            next_state_name = program_state.next_state_name
1✔
164
        return next_state_name
1✔
165

166
    @next_state_name.setter
1✔
167
    def next_state_name(self, next_state_name: str) -> None:
1✔
168
        if self._program_state is None:
1✔
169
            self._program_state = ProgramRunning()
1✔
170

171
        if isinstance(self._program_state, ProgramRunning):
1✔
172
            self._program_state.next_state_name = next_state_name
1✔
173
        else:
174
            raise RuntimeError(
×
175
                f"Could not set NextState value when in state '{type(self._program_state)}'."
176
            )
177

178
    @property
1✔
179
    def next_field_name(self) -> str | None:
1✔
180
        next_field_name: str | None = None
1✔
181
        program_state = self._program_state
1✔
182
        if isinstance(program_state, ProgramRunning):
1✔
183
            next_field_name = program_state.next_field_name
1✔
184
        return next_field_name
1✔
185

186
    @next_field_name.setter
1✔
187
    def next_field_name(self, next_field_name: str) -> None:
1✔
188
        if isinstance(self._program_state, ProgramRunning):
1✔
189
            self._program_state.next_field_name = next_field_name
1✔
190
        else:
191
            raise RuntimeError(
×
192
                f"Could not set NextField value when in state '{type(self._program_state)}'."
193
            )
194

195
    def program_state(self) -> ProgramState:
1✔
196
        return copy.deepcopy(self._program_state)
1✔
197

198
    def is_running(self) -> bool:
1✔
199
        return isinstance(self._program_state, ProgramRunning)
1✔
200

201
    def set_ended(self) -> None:
1✔
202
        with self._state_mutex:
1✔
203
            if isinstance(self._program_state, ProgramRunning):
1✔
204
                self._program_state = ProgramEnded()
1✔
205
                for frame in self._frames:
1✔
UNCOV
206
                    frame.set_ended()
×
207
            self.program_state_event.set()
1✔
208
            self.program_state_event.clear()
1✔
209

210
    def set_error(self, error: ExecutionFailedEventDetails) -> None:
1✔
211
        with self._state_mutex:
1✔
212
            self._program_state = ProgramError(error=error)
1✔
213
            for frame in self._frames:
1✔
214
                frame.set_error(error=error)
×
215
            self.program_state_event.set()
1✔
216
            self.program_state_event.clear()
1✔
217

218
    def set_timed_out(self) -> None:
1✔
219
        with self._state_mutex:
1✔
220
            self._program_state = ProgramTimedOut()
1✔
221
            for frame in self._frames:
1✔
222
                frame.set_timed_out()
×
223
            self.program_state_event.set()
1✔
224
            self.program_state_event.clear()
1✔
225

226
    def set_stop(self, stop_date: Timestamp, cause: str | None, error: str | None) -> None:
1✔
227
        with self._state_mutex:
1✔
228
            if isinstance(self._program_state, ProgramRunning):
1✔
229
                self._program_state = ProgramStopped(stop_date=stop_date, cause=cause, error=error)
1✔
230
                for frame in self._frames:
1✔
231
                    frame.set_stop(stop_date=stop_date, cause=cause, error=error)
1✔
232
                self.program_state_event.set()
1✔
233
                self.program_state_event.clear()
1✔
234

235
    def open_frame(self, event_history_context: EventHistoryContext | None = None) -> Environment:
1✔
236
        with self._state_mutex:
1✔
237
            frame = self.as_frame_of(env=self, event_history_frame_cache=event_history_context)
1✔
238
            self._frames.append(frame)
1✔
239
            return frame
1✔
240

241
    def open_inner_frame(
1✔
242
        self, event_history_context: EventHistoryContext | None = None
243
    ) -> Environment:
244
        with self._state_mutex:
1✔
245
            variable_store = VariableStore.as_inner_scope_of(
1✔
246
                outer_variable_store=self.variable_store
247
            )
248
            frame = self.as_inner_frame_of(
1✔
249
                env=self,
250
                variable_store=variable_store,
251
                event_history_frame_cache=event_history_context,
252
            )
253
            self._frames.append(frame)
1✔
254
            return frame
1✔
255

256
    def close_frame(self, frame: Environment) -> None:
1✔
257
        with self._state_mutex:
1✔
258
            if frame in self._frames:
1✔
259
                self._frames.remove(frame)
1✔
260
                self.event_history_context.integrate(frame.event_history_context)
1✔
261

262
    def delete_frame(self, frame: Environment) -> None:
1✔
263
        with self._state_mutex:
1✔
264
            if frame in self._frames:
1✔
265
                self._frames.remove(frame)
1✔
266

267
    def is_frame(self) -> bool:
1✔
268
        return self._is_frame
1✔
269

270
    def is_standard_workflow(self) -> bool:
1✔
271
        return self.execution_type == StateMachineType.STANDARD
×
272

273
    def is_test_state_mocked_mode(self) -> bool:
1✔
274
        return False
1✔
275

276
    def is_local_mocked_mode(self) -> bool:
1✔
277
        """
278
        Returns True if:
279
        - the state machine is running in Step Functions Local mode
280
        - the current state has a defined Local mock configuration in the target environment or frame
281

282
        Otherwise, returns False.
283
        """
284
        return (
1✔
285
            self.local_mock_test_case is not None
286
            and self.next_state_name in self.local_mock_test_case.state_mocked_responses
287
        )
288

289
    def get_current_local_mocked_response(self) -> LocalMockedResponse:
1✔
290
        if not self.is_local_mocked_mode():
1✔
291
            raise RuntimeError(
×
292
                "Cannot retrieve mocked response: execution is not operating in mocked mode"
293
            )
294
        state_name = self.next_state_name
1✔
295
        state_mocked_responses: Optional = self.local_mock_test_case.state_mocked_responses.get(
1✔
296
            state_name
297
        )
298
        if state_mocked_responses is None:
1✔
299
            raise RuntimeError(f"No Local mocked response definition for state '{state_name}'")
×
300
        retry_count = self.states.context_object.context_object_data["State"]["RetryCount"]
1✔
301
        if len(state_mocked_responses.mocked_responses) <= retry_count:
1✔
302
            raise RuntimeError(
×
303
                f"No Local mocked response definition for state '{state_name}' "
304
                f"and retry number '{retry_count}'"
305
            )
306
        return state_mocked_responses.mocked_responses[retry_count]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc