• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19809586398

28 Nov 2025 05:40PM UTC coverage: 86.863% (-0.02%) from 86.879%
19809586398

push

github

web-flow
[SFN] Add new TestState API capabilities (#13418)

New capabilities have recently been added to TestState API. This commit adds the following support for the new capabilities:

- Add mocking support – Mock state outputs and errors without invoking downstream services
- Add support for Map (inline and distributed) states
- Add support to test specific states within a full state machine definition using the new stateName parameter.
- Add support for Catch and Retry fields
- Add new inspection data
- Rename `mocking` package to l`ocal_mocking`: clearly mark mocking functionality related to Step Functions Local. This helps to distinguish between Local mocks and TestState mocks.



Co-authored-by: Greg Furman <gregfurman99@gmail.com>
Co-authored-by: Greg Furman <31275503+gregfurman@users.noreply.github.com>

618 of 728 new or added lines in 25 files covered. (84.89%)

99 existing lines in 8 files now uncovered.

69469 of 79975 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.36
/localstack-core/localstack/services/stepfunctions/asl/parse/test_state/preprocessor.py
1
import enum
1✔
2
from typing import Final
1✔
3

4
from antlr4.tree.Tree import ParseTree
1✔
5

6
from localstack.services.stepfunctions.asl.antlr.runtime.ASLParser import ASLParser
1✔
7
from localstack.services.stepfunctions.asl.antlt4utils.antlr4utils import (
1✔
8
    is_production,
9
)
10
from localstack.services.stepfunctions.asl.component.common.parargs import (
1✔
11
    ArgumentsJSONataTemplateValueObject,
12
    ArgumentsStringJSONata,
13
    Parameters,
14
)
15
from localstack.services.stepfunctions.asl.component.common.path.input_path import InputPath
1✔
16
from localstack.services.stepfunctions.asl.component.common.path.items_path import ItemsPath
1✔
17
from localstack.services.stepfunctions.asl.component.common.path.result_path import ResultPath
1✔
18
from localstack.services.stepfunctions.asl.component.common.query_language import QueryLanguage
1✔
19
from localstack.services.stepfunctions.asl.component.common.result_selector import ResultSelector
1✔
20
from localstack.services.stepfunctions.asl.component.state.state import CommonStateField
1✔
21
from localstack.services.stepfunctions.asl.component.state.state_choice.state_choice import (
1✔
22
    StateChoice,
23
)
24
from localstack.services.stepfunctions.asl.component.state.state_execution.state_map.max_concurrency import (
1✔
25
    MaxConcurrency,
26
    MaxConcurrencyJSONata,
27
    MaxConcurrencyPath,
28
)
29
from localstack.services.stepfunctions.asl.component.state.state_execution.state_map.state_map import (
1✔
30
    StateMap,
31
)
32
from localstack.services.stepfunctions.asl.component.state.state_execution.state_map.tolerated_failure import (
1✔
33
    ToleratedFailureCountInt,
34
    ToleratedFailureCountPath,
35
    ToleratedFailureCountStringJSONata,
36
    ToleratedFailurePercentage,
37
    ToleratedFailurePercentagePath,
38
    ToleratedFailurePercentageStringJSONata,
39
)
40
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.state_task import (
1✔
41
    StateTask,
42
)
43
from localstack.services.stepfunctions.asl.component.state.state_fail.state_fail import StateFail
1✔
44
from localstack.services.stepfunctions.asl.component.state.state_pass.result import Result
1✔
45
from localstack.services.stepfunctions.asl.component.state.state_pass.state_pass import StatePass
1✔
46
from localstack.services.stepfunctions.asl.component.state.state_succeed.state_succeed import (
1✔
47
    StateSucceed,
48
)
49
from localstack.services.stepfunctions.asl.component.test_state.program.test_state_program import (
1✔
50
    TestStateProgram,
51
)
52
from localstack.services.stepfunctions.asl.component.test_state.state.common import (
1✔
53
    MockedCommonState,
54
)
55
from localstack.services.stepfunctions.asl.component.test_state.state.map import (
1✔
56
    MockedStateMap,
57
)
58
from localstack.services.stepfunctions.asl.component.test_state.state.task import (
1✔
59
    MockedStateTask,
60
)
61
from localstack.services.stepfunctions.asl.component.test_state.state.test_state_state_props import (
1✔
62
    TestStateStateProps,
63
)
64
from localstack.services.stepfunctions.asl.eval.test_state.environment import TestStateEnvironment
1✔
65
from localstack.services.stepfunctions.asl.parse.preprocessor import Preprocessor
1✔
66
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
67

68

69
class InspectionDataKey(enum.Enum):
1✔
70
    INPUT = "input"
1✔
71
    AFTER_INPUT_PATH = "afterInputPath"
1✔
72
    AFTER_PARAMETERS = "afterParameters"
1✔
73
    AFTER_ARGUMENTS = "afterArguments"
1✔
74
    RESULT = "result"
1✔
75
    AFTER_RESULT_SELECTOR = "afterResultSelector"
1✔
76
    AFTER_RESULT_PATH = "afterResultPath"
1✔
77
    AFTER_ITEMS_PATH = "afterItemsPath"
1✔
78
    REQUEST = "request"
1✔
79
    RESPONSE = "response"
1✔
80

81
    MAX_CONCURRENCY = "maxConcurrency"
1✔
82
    TOLERATED_FAILURE_COUNT = "toleratedFailureCount"
1✔
83
    TOLERATED_FAILURE_PERCENTAGE = "toleratedFailurePercentage"
1✔
84

85

86
def _decorated_updates_inspection_data(method, inspection_data_key: InspectionDataKey):
1✔
87
    def wrapper(env: TestStateEnvironment, *args, **kwargs):
1✔
88
        method(env, *args, **kwargs)
1✔
89
        result = env.stack[-1]
1✔
90
        if not isinstance(result, (int, float)):
1✔
91
            result = to_json_str(result)
1✔
92
        # We know that the enum value used here corresponds to a supported inspection data field by design.
93
        env.inspection_data[inspection_data_key.value] = result  # noqa
1✔
94

95
    return wrapper
1✔
96

97

98
def _decorate_state_field(state_field: CommonStateField, is_single_state: bool = False) -> None:
1✔
99
    if isinstance(state_field, StateMap):
1✔
100
        MockedStateMap.wrap(state_field, is_single_state)
1✔
101
    elif isinstance(state_field, StateTask):
1✔
102
        MockedStateTask.wrap(state_field, is_single_state)
1✔
103
    elif isinstance(state_field, (StateChoice, StatePass, StateFail, StateSucceed)):
1✔
104
        MockedCommonState.wrap(state_field, is_single_state)
1✔
105

106

107
def find_state(state_name: str, states: dict[str, CommonStateField]) -> CommonStateField | None:
1✔
108
    if state_name in states:
1✔
109
        return states[state_name]
1✔
110

111
    for state in states.values():
1✔
112
        if isinstance(state, StateMap):
1✔
113
            found_state = find_state(state_name, state.iteration_component._states.states)
1✔
114
            if found_state:
1✔
115
                return found_state
1✔
116

117

118
class TestStatePreprocessor(Preprocessor):
1✔
119
    STATE_NAME: Final[str] = "StateName"
1✔
120
    _state_name_stack: list[str] = []
1✔
121

122
    def to_test_state_program(
1✔
123
        self, tree: ParseTree, state_name: str | None = None
124
    ) -> TestStateProgram:
125
        if is_production(tree, ASLParser.RULE_state_machine):
1✔
126
            # full definition passed in
127
            program = self.visitState_machine(ctx=tree)
1✔
128
            state_field = find_state(state_name, program.states.states)
1✔
129
            _decorate_state_field(state_field, False)
1✔
130
            return TestStateProgram(state_field)
1✔
131

132
        if is_production(tree, ASLParser.RULE_state_decl_body):
1✔
133
            # single state case
134
            state_props = self.visitState_decl_body(ctx=tree)
1✔
135
            state_field = self._common_state_field_of(state_props=state_props)
1✔
136
            _decorate_state_field(state_field, True)
1✔
137
            return TestStateProgram(state_field)
1✔
138

NEW
139
        return super().visit(tree)
×
140

141
    def visitState_decl(self, ctx: ASLParser.State_declContext) -> CommonStateField:
1✔
142
        # if we are parsing a full state machine, we need to record the state_name prior to stepping
143
        # into the state body definition.
144
        state_name = self._inner_string_of(parser_rule_context=ctx.string_literal())
1✔
145
        self._state_name_stack.append(state_name)
1✔
146
        state_props: TestStateStateProps = self.visit(ctx.state_decl_body())
1✔
147
        state_field = self._common_state_field_of(state_props=state_props)
1✔
148
        return state_field
1✔
149

150
    def visitState_decl_body(self, ctx: ASLParser.State_decl_bodyContext) -> TestStateStateProps:
1✔
151
        self._open_query_language_scope(ctx)
1✔
152
        state_props = TestStateStateProps()
1✔
153
        state_props.name = (
1✔
154
            self._state_name_stack.pop(-1) if self._state_name_stack else self.STATE_NAME
155
        )
156
        for child in ctx.children:
1✔
157
            cmp = self.visit(child)
1✔
158
            state_props.add(cmp)
1✔
159
        if state_props.get(QueryLanguage) is None:
1✔
160
            state_props.add(self._get_current_query_language())
1✔
161
        self._close_query_language_scope()
1✔
162
        return state_props
1✔
163

164
    def visitInput_path_decl(self, ctx: ASLParser.Input_path_declContext) -> InputPath:
1✔
165
        input_path: InputPath = super().visitInput_path_decl(ctx=ctx)
1✔
166
        input_path._eval_body = _decorated_updates_inspection_data(
1✔
167
            method=input_path._eval_body,  # noqa
168
            inspection_data_key=InspectionDataKey.AFTER_INPUT_PATH,
169
        )
170
        return input_path
1✔
171

172
    def visitParameters_decl(self, ctx: ASLParser.Parameters_declContext) -> Parameters:
1✔
173
        parameters: Parameters = super().visitParameters_decl(ctx=ctx)
1✔
174
        parameters._eval_body = _decorated_updates_inspection_data(
1✔
175
            method=parameters._eval_body,  # noqa
176
            inspection_data_key=InspectionDataKey.AFTER_PARAMETERS,
177
        )
178
        return parameters
1✔
179

180
    def visitResult_selector_decl(
1✔
181
        self, ctx: ASLParser.Result_selector_declContext
182
    ) -> ResultSelector:
183
        result_selector: ResultSelector = super().visitResult_selector_decl(ctx=ctx)
1✔
184
        result_selector._eval_body = _decorated_updates_inspection_data(
1✔
185
            method=result_selector._eval_body,  # noqa
186
            inspection_data_key=InspectionDataKey.AFTER_RESULT_SELECTOR,
187
        )
188
        return result_selector
1✔
189

190
    def visitResult_path_decl(self, ctx: ASLParser.Result_path_declContext) -> ResultPath:
1✔
191
        result_path: ResultPath = super().visitResult_path_decl(ctx=ctx)
1✔
192
        result_path._eval_body = _decorated_updates_inspection_data(
1✔
193
            method=result_path._eval_body,  # noqa
194
            inspection_data_key=InspectionDataKey.AFTER_RESULT_PATH,
195
        )
196
        return result_path
1✔
197

198
    def visitResult_decl(self, ctx: ASLParser.Result_declContext) -> Result:
1✔
199
        result: Result = super().visitResult_decl(ctx=ctx)
1✔
200
        result._eval_body = _decorated_updates_inspection_data(
1✔
201
            method=result._eval_body,
202
            inspection_data_key=InspectionDataKey.RESULT,  # noqa
203
        )
204
        return result
1✔
205

206
    def visitMax_concurrency_int(self, ctx: ASLParser.Max_concurrency_intContext) -> MaxConcurrency:
1✔
207
        max_concurrency: MaxConcurrency = super().visitMax_concurrency_int(ctx)
1✔
208
        max_concurrency._eval_body = _decorated_updates_inspection_data(
1✔
209
            method=max_concurrency._eval_body,
210
            inspection_data_key=InspectionDataKey.MAX_CONCURRENCY,  # noqa
211
        )
212
        return max_concurrency
1✔
213

214
    def visitMax_concurrency_jsonata(
1✔
215
        self, ctx: ASLParser.Max_concurrency_jsonataContext
216
    ) -> MaxConcurrencyJSONata:
217
        max_concurrency_jsonata: MaxConcurrencyJSONata = super().visitMax_concurrency_jsonata(ctx)
1✔
218
        max_concurrency_jsonata._eval_body = _decorated_updates_inspection_data(
1✔
219
            method=max_concurrency_jsonata._eval_body,
220
            inspection_data_key=InspectionDataKey.MAX_CONCURRENCY,  # noqa
221
        )
222
        return max_concurrency_jsonata
1✔
223

224
    def visitMax_concurrency_path(
1✔
225
        self, ctx: ASLParser.Max_concurrency_declContext
226
    ) -> MaxConcurrencyPath:
NEW
227
        max_concurrency_path: MaxConcurrencyPath = super().visitMax_concurrency_path(ctx)
×
NEW
228
        max_concurrency_path._eval_body = _decorated_updates_inspection_data(
×
229
            method=max_concurrency_path._eval_body,
230
            inspection_data_key=InspectionDataKey.MAX_CONCURRENCY,  # noqa
231
        )
NEW
232
        return max_concurrency_path
×
233

234
    def visitTolerated_failure_count_int(self, ctx) -> ToleratedFailureCountInt:
1✔
235
        tolerated_failure_count: ToleratedFailureCountInt = (
1✔
236
            super().visitTolerated_failure_count_int(ctx)
237
        )
238
        tolerated_failure_count._eval_body = _decorated_updates_inspection_data(
1✔
239
            method=tolerated_failure_count._eval_body,
240
            inspection_data_key=InspectionDataKey.TOLERATED_FAILURE_COUNT,
241
        )
242
        return tolerated_failure_count
1✔
243

244
    def visitTolerated_failure_count_path(self, ctx) -> ToleratedFailureCountPath:
1✔
NEW
245
        tolerated_failure_count_path: ToleratedFailureCountPath = (
×
246
            super().visitTolerated_failure_count_path(ctx)
247
        )
NEW
248
        tolerated_failure_count_path._eval_body = _decorated_updates_inspection_data(
×
249
            method=tolerated_failure_count_path._eval_body,
250
            inspection_data_key=InspectionDataKey.TOLERATED_FAILURE_COUNT,
251
        )
NEW
252
        return tolerated_failure_count_path
×
253

254
    def visitTolerated_failure_count_string_jsonata(
1✔
255
        self, ctx
256
    ) -> ToleratedFailureCountStringJSONata:
257
        tolerated_failure_count_jsonata: ToleratedFailureCountStringJSONata = (
1✔
258
            super().visitTolerated_failure_count_string_jsonata(ctx)
259
        )
260
        tolerated_failure_count_jsonata._eval_body = _decorated_updates_inspection_data(
1✔
261
            method=tolerated_failure_count_jsonata._eval_body,
262
            inspection_data_key=InspectionDataKey.TOLERATED_FAILURE_COUNT,
263
        )
264
        return tolerated_failure_count_jsonata
1✔
265

266
    def visitTolerated_failure_percentage_number(self, ctx) -> ToleratedFailurePercentage:
1✔
NEW
267
        tolerated_failure_percentage: ToleratedFailurePercentage = (
×
268
            super().visitTolerated_failure_percentage_number(ctx)
269
        )
NEW
270
        tolerated_failure_percentage._eval_body = _decorated_updates_inspection_data(
×
271
            method=tolerated_failure_percentage._eval_body,
272
            inspection_data_key=InspectionDataKey.TOLERATED_FAILURE_PERCENTAGE,
273
        )
NEW
274
        return tolerated_failure_percentage
×
275

276
    def visitTolerated_failure_percentage_path(self, ctx) -> ToleratedFailurePercentagePath:
1✔
NEW
277
        tolerated_failure_percentage_path: ToleratedFailurePercentagePath = (
×
278
            super().visitTolerated_failure_percentage_path(ctx)
279
        )
NEW
280
        tolerated_failure_percentage_path._eval_body = _decorated_updates_inspection_data(
×
281
            method=tolerated_failure_percentage_path._eval_body,
282
            inspection_data_key=InspectionDataKey.TOLERATED_FAILURE_PERCENTAGE,
283
        )
NEW
284
        return tolerated_failure_percentage_path
×
285

286
    def visitTolerated_failure_percentage_string_jsonata(
1✔
287
        self, ctx
288
    ) -> ToleratedFailurePercentageStringJSONata:
289
        tolerated_failure_percentage_jsonata: ToleratedFailurePercentageStringJSONata = (
1✔
290
            super().visitTolerated_failure_percentage_string_jsonata(ctx)
291
        )
292
        tolerated_failure_percentage_jsonata._eval_body = _decorated_updates_inspection_data(
1✔
293
            method=tolerated_failure_percentage_jsonata._eval_body,
294
            inspection_data_key=InspectionDataKey.TOLERATED_FAILURE_PERCENTAGE,
295
        )
296
        return tolerated_failure_percentage_jsonata
1✔
297

298
    def visitItems_path_decl(self, ctx) -> ItemsPath:
1✔
299
        items_path: ItemsPath = super().visitItems_path_decl(ctx)
1✔
300
        items_path._eval_body = _decorated_updates_inspection_data(
1✔
301
            method=items_path._eval_body,
302
            inspection_data_key=InspectionDataKey.AFTER_ITEMS_PATH,
303
        )
304
        return items_path
1✔
305

306
    def visitArguments_string_jsonata(self, ctx):
1✔
NEW
307
        arguments: ArgumentsStringJSONata = super().visitArguments_string_jsonata(ctx)
×
NEW
308
        arguments._eval_body = _decorated_updates_inspection_data(
×
309
            method=arguments._eval_body,
310
            inspection_data_key=InspectionDataKey.AFTER_ARGUMENTS,
311
        )
NEW
312
        return arguments
×
313

314
    def visitArguments_jsonata_template_value_object(self, ctx):
1✔
315
        arguments: ArgumentsJSONataTemplateValueObject = (
1✔
316
            super().visitArguments_jsonata_template_value_object(ctx)
317
        )
318
        arguments._eval_body = _decorated_updates_inspection_data(
1✔
319
            method=arguments._eval_body,
320
            inspection_data_key=InspectionDataKey.AFTER_ARGUMENTS,
321
        )
322
        return arguments
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc