• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19809586398

28 Nov 2025 05:40PM UTC coverage: 86.863% (-0.02%) from 86.879%
19809586398

push

github

web-flow
[SFN] Add new TestState API capabilities (#13418)

New capabilities have recently been added to TestState API. This commit adds the following support for the new capabilities:

- Add mocking support – Mock state outputs and errors without invoking downstream services
- Add support for Map (inline and distributed) states
- Add support to test specific states within a full state machine definition using the new stateName parameter.
- Add support for Catch and Retry fields
- Add new inspection data
- Rename the `mocking` package to `local_mocking` to clearly mark mocking functionality related to Step Functions Local. This helps to distinguish between Local mocks and TestState mocks.



Co-authored-by: Greg Furman <gregfurman99@gmail.com>
Co-authored-by: Greg Furman <31275503+gregfurman@users.noreply.github.com>

618 of 728 new or added lines in 25 files covered. (84.89%)

99 existing lines in 8 files now uncovered.

69469 of 79975 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.75
/localstack-core/localstack/services/stepfunctions/asl/component/test_state/state/execution.py
1
from collections.abc import Callable
1✔
2
from functools import partial
1✔
3

4
from localstack.services.stepfunctions.asl.component.common.catch.catcher_outcome import (
1✔
5
    CatcherOutcomeCaught,
6
)
7
from localstack.services.stepfunctions.asl.component.common.error_name.failure_event import (
1✔
8
    FailureEvent,
9
)
10
from localstack.services.stepfunctions.asl.component.common.query_language import (
1✔
11
    QueryLanguageMode,
12
)
13
from localstack.services.stepfunctions.asl.component.common.retry.retrier_decl import RetrierDecl
1✔
14
from localstack.services.stepfunctions.asl.component.common.retry.retrier_outcome import (
1✔
15
    RetrierOutcome,
16
)
17
from localstack.services.stepfunctions.asl.component.common.retry.retry_outcome import RetryOutcome
1✔
18
from localstack.services.stepfunctions.asl.component.state.state_execution.execute_state import (
1✔
19
    ExecutionState,
20
)
21
from localstack.services.stepfunctions.asl.component.test_state.state.base_mock import (
1✔
22
    MockedBaseState,
23
)
24
from localstack.services.stepfunctions.asl.eval.test_state.environment import TestStateEnvironment
1✔
25
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
1✔
26

27

28
class MockedStateExecution(MockedBaseState[ExecutionState]):
    """Instrument a wrapped ``ExecutionState`` so TestState inspection data is recorded.

    The wrapped state's private hooks (``_eval_execution``, ``_handle_catch``,
    ``_handle_retry`` and the ``_eval_body`` of each catcher/retrier) are
    replaced with thin wrappers that delegate to the original implementation
    and then write values into ``env.inspection_data`` or onto the environment.
    """

    def add_inspection_data(self, env: TestStateEnvironment):
        """Record ``afterResultSelector`` and patch the state's catchers and retriers.

        Registered by ``_apply_patches`` as the ``post_return_fn`` of the wrapped
        ``_eval_execution``.

        :param env: the TestState environment whose ``inspection_data`` is updated.
        """
        if self._wrapped.query_language.query_language_mode == QueryLanguageMode.JSONPath:
            if "afterResultSelector" not in env.inspection_data:
                # HACK: A DistributedItemProcessorEvalInput is added to the stack and never popped off
                # during an error case. So we need to check the inspected value is correct before
                # adding it to our inspectionData.
                # The truthiness check keeps an empty stack from raising IndexError here.
                if env.stack and isinstance(env.stack[-1], (dict, str, int, float)):
                    env.inspection_data["afterResultSelector"] = to_json_str(env.stack[-1])

        if catch := self._wrapped.catch:
            for ind, catcher in enumerate(catch.catchers):
                catcher._eval_body = self.with_catch_state_id(catcher._eval_body, ind)

        if retry := self._wrapped.retry:
            for ind, retrier in enumerate(retry.retriers):
                # with_retry_state_id captures the retrier's current _eval_body itself.
                retrier._eval_body = self.with_retry_state_id(retrier, ind)

    def _apply_patches(self):
        """Install the inspection wrappers on the wrapped ``ExecutionState``.

        :raises ValueError: if the wrapped state is not an ``ExecutionState``.
        """
        state = self._wrapped
        if not isinstance(state, ExecutionState):
            raise ValueError("Can only apply MockedStateExecution patches to an ExecutionState")

        # InputPath and ResultPath are only inspected in JSONPath mode.
        if state.query_language.query_language_mode == QueryLanguageMode.JSONPath:
            self._eval_with_inspect(state.input_path, "afterInputPath")
            self._eval_with_inspect(state.result_path, "afterResultPath")

        self._eval_with_inspect(state.result_selector, "afterResultSelector")
        original_eval_execution = state._eval_execution

        if state.catch:
            state._handle_catch = partial(self._handle_catch, state._handle_catch)

        if state.retry:
            state._handle_retry = partial(self._handle_retry, state._handle_retry)

        state._eval_execution = self.wrap_with_post_return(
            method=original_eval_execution,
            post_return_fn=self.add_inspection_data,
        )

    @staticmethod
    def with_catch_state_id(
        original_eval_body: Callable[[TestStateEnvironment], None], state_id: int
    ) -> Callable[[TestStateEnvironment], None]:
        """Wrap a catcher's ``_eval_body`` so a successful catch records its index.

        :param original_eval_body: the catcher's original evaluation function.
        :param state_id: index of the catcher within the state's catcher list.
        :return: a function that runs ``original_eval_body`` and, when the top of
            the stack is a ``CatcherOutcomeCaught``, stores ``state_id`` under
            ``errorDetails.catchIndex`` in ``env.inspection_data``.
        """

        def _wrapped(env: TestStateEnvironment):
            original_eval_body(env)

            if isinstance(env.stack[-1], CatcherOutcomeCaught):
                # Reuse an existing (non-empty) errorDetails mapping, otherwise start a new one.
                if not (error_details := env.inspection_data.get("errorDetails")):
                    error_details = env.inspection_data["errorDetails"] = {}

                error_details["catchIndex"] = state_id

        return _wrapped

    @staticmethod
    def with_retry_state_id(
        retrier: RetrierDecl, state_id: int
    ) -> Callable[[TestStateEnvironment], None]:
        """Wrap a retrier's ``_eval_body`` so retry details are recorded.

        :param retrier: the retrier declaration whose ``_eval_body`` is wrapped.
        :param state_id: index of the retrier within the state's retrier list.
        :return: a function that seeds the attempt number from the mocked
            ``retrierRetryCount``, runs the original body, stores ``state_id``
            under ``errorDetails.retryIndex`` and, when the outcome on top of the
            stack is ``RetrierOutcome.Executed``, also stores
            ``errorDetails.retryBackoffIntervalSeconds``.
        """
        original_retrier_eval_body = retrier._eval_body

        def _wrapped(env: TestStateEnvironment):
            retry_count = env.mock._state_configuration.get("retrierRetryCount", 0)
            if retry_count > 0:
                retrier.max_attempts._store_attempt_number(env, retry_count - 1)

            original_retrier_eval_body(env)

            # Reuse an existing (non-empty) errorDetails mapping, otherwise start a new one.
            if not (error_details := env.inspection_data.get("errorDetails")):
                error_details = env.inspection_data["errorDetails"] = {}

            error_details["retryIndex"] = state_id
            if env.stack[-1] == RetrierOutcome.Executed:
                # TODO(gregfurman): Ideally, retryBackoffIntervalSeconds should be written to inspectionData
                # within the retrier.backoff_rate declaration (perhaps at _access_next_multiplier).
                rate = retrier.backoff_rate.rate
                interval = retrier.interval_seconds.seconds
                error_details["retryBackoffIntervalSeconds"] = int(interval * (rate**retry_count))

        return _wrapped

    @staticmethod
    def _error_and_cause(failure_event: FailureEvent) -> tuple:
        """Return the ``(Error, Cause)`` pair of the error output built from ``failure_event``.

        Either element is ``None`` when the corresponding key is absent.
        """
        spec: dict[str, str] = ExecutionState._construct_error_output_value(failure_event)
        return spec.get("Error"), spec.get("Cause")

    @staticmethod
    def _handle_catch(
        original_handle_catch: Callable[[TestStateEnvironment, FailureEvent], None],
        env: TestStateEnvironment,
        failure_event: FailureEvent,
    ) -> None:
        """Run the original catch handler, then report the caught error on ``env``.

        :param original_handle_catch: the state's unpatched ``_handle_catch``.
        :param env: the TestState environment.
        :param failure_event: the failure being handled.
        """
        original_handle_catch(env, failure_event)

        error, cause = MockedStateExecution._error_and_cause(failure_event)

        # next_state_name is read after the original handler has run.
        env.set_caught_error(env.next_state_name, error, cause)

    @staticmethod
    def _handle_retry(
        original_handle_retry: Callable[[TestStateEnvironment, FailureEvent], RetryOutcome],
        env: TestStateEnvironment,
        failure_event: FailureEvent,
    ) -> RetryOutcome:
        """Run the original retry handler and flag the error as retriable when applicable.

        :param original_handle_retry: the state's unpatched ``_handle_retry``.
        :param env: the TestState environment.
        :param failure_event: the failure being handled.
        :return: the outcome returned by ``original_handle_retry``, unchanged.
        """
        res = original_handle_retry(env, failure_event)

        error, cause = MockedStateExecution._error_and_cause(failure_event)

        if res == RetryOutcome.CanRetry:
            env.set_retriable_error(error, cause)
        return res
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc