• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19809586398

28 Nov 2025 05:40PM UTC coverage: 86.863% (-0.02%) from 86.879%
19809586398

push

github

web-flow
[SFN] Add new TestState API capabilities (#13418)

New capabilities have recently been added to TestState API. This commit adds the following support for the new capabilities:

- Add mocking support – Mock state outputs and errors without invoking downstream services
- Add support for Map (inline and distributed) states
- Add support to test specific states within a full state machine definition using the new stateName parameter.
- Add support for Catch and Retry fields
- Add new inspection data
- Rename `mocking` package to l`ocal_mocking`: clearly mark mocking functionality related to Step Functions Local. This helps to distinguish between Local mocks and TestState mocks.



Co-authored-by: Greg Furman <gregfurman99@gmail.com>
Co-authored-by: Greg Furman <31275503+gregfurman@users.noreply.github.com>

618 of 728 new or added lines in 25 files covered. (84.89%)

99 existing lines in 8 files now uncovered.

69469 of 79975 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.12
/localstack-core/localstack/services/lambda_/invocation/assignment.py
1
import contextlib
1✔
2
import logging
1✔
3
from collections import defaultdict
1✔
4
from concurrent.futures import Future, ThreadPoolExecutor
1✔
5

6
from localstack.services.lambda_.invocation.execution_environment import (
1✔
7
    EnvironmentStartupTimeoutException,
8
    ExecutionEnvironment,
9
    InvalidStatusException,
10
)
11
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
12
from localstack.services.lambda_.invocation.lambda_models import (
1✔
13
    FunctionVersion,
14
    InitializationType,
15
    OtherServiceEndpoint,
16
)
17

18
LOG = logging.getLogger(__name__)
1✔
19

20

21
class AssignmentException(Exception):
1✔
22
    pass
1✔
23

24

25
class AssignmentService(OtherServiceEndpoint):
1✔
26
    """
27
    scope: LocalStack global
28
    """
29

30
    # function_version manager id => runtime_environment_id => runtime_environment
31
    environments: dict[str, dict[str, ExecutionEnvironment]]
1✔
32

33
    # Global pool for spawning and killing provisioned Lambda runtime environments
34
    provisioning_pool: ThreadPoolExecutor
1✔
35

36
    def __init__(self):
1✔
37
        self.environments = defaultdict(dict)
1✔
38
        self.provisioning_pool = ThreadPoolExecutor(thread_name_prefix="lambda-provisioning-pool")
1✔
39

40
    @contextlib.contextmanager
1✔
41
    def get_environment(
1✔
42
        self,
43
        version_manager_id: str,
44
        function_version: FunctionVersion,
45
        provisioning_type: InitializationType,
46
    ) -> contextlib.AbstractContextManager[ExecutionEnvironment]:
47
        applicable_envs = (
1✔
48
            env
49
            for env in self.environments[version_manager_id].values()
50
            if env.initialization_type == provisioning_type
51
        )
52
        execution_environment = None
1✔
53
        for environment in applicable_envs:
1✔
54
            try:
1✔
55
                environment.reserve()
1✔
56
                execution_environment = environment
1✔
57
                break
1✔
58
            except InvalidStatusException:
1✔
59
                pass
1✔
60

61
        if execution_environment is None:
1✔
62
            if provisioning_type == "provisioned-concurrency":
1✔
63
                raise AssignmentException(
×
64
                    "No provisioned concurrency environment available despite lease."
65
                )
66
            elif provisioning_type == "on-demand":
1✔
67
                execution_environment = self.start_environment(version_manager_id, function_version)
1✔
68
                self.environments[version_manager_id][execution_environment.id] = (
1✔
69
                    execution_environment
70
                )
71
                execution_environment.reserve()
1✔
72
            else:
73
                raise ValueError(f"Invalid provisioning type {provisioning_type}")
×
74

75
        try:
1✔
76
            yield execution_environment
1✔
77
            execution_environment.release()
1✔
78
        except InvalidStatusException as invalid_e:
1✔
UNCOV
79
            LOG.error("InvalidStatusException: %s", invalid_e)
×
80
        except Exception as e:
1✔
81
            LOG.error(
1✔
82
                "Failed invocation <%s>: %s", type(e), e, exc_info=LOG.isEnabledFor(logging.DEBUG)
83
            )
84
            self.stop_environment(execution_environment)
1✔
85
            raise e
1✔
86

87
    def start_environment(
1✔
88
        self, version_manager_id: str, function_version: FunctionVersion
89
    ) -> ExecutionEnvironment:
90
        LOG.debug("Starting new environment")
1✔
91
        execution_environment = ExecutionEnvironment(
1✔
92
            function_version=function_version,
93
            initialization_type="on-demand",
94
            on_timeout=self.on_timeout,
95
            version_manager_id=version_manager_id,
96
        )
97
        try:
1✔
98
            execution_environment.start()
1✔
99
        except StatusErrorException:
1✔
100
            raise
1✔
101
        except EnvironmentStartupTimeoutException:
1✔
102
            raise
1✔
103
        except Exception as e:
1✔
104
            message = f"Could not start new environment: {type(e).__name__}:{e}"
1✔
105
            raise AssignmentException(message) from e
1✔
106
        return execution_environment
1✔
107

108
    def on_timeout(self, version_manager_id: str, environment_id: str) -> None:
1✔
109
        """Callback for deleting environment after function times out"""
UNCOV
110
        del self.environments[version_manager_id][environment_id]
×
111

112
    def stop_environment(self, environment: ExecutionEnvironment) -> None:
1✔
113
        version_manager_id = environment.version_manager_id
1✔
114
        try:
1✔
115
            environment.stop()
1✔
116
            self.environments.get(version_manager_id).pop(environment.id)
1✔
117
        except Exception as e:
1✔
118
            LOG.debug(
1✔
119
                "Error while stopping environment for lambda %s, manager id %s, environment: %s, error: %s",
120
                environment.function_version.qualified_arn,
121
                version_manager_id,
122
                environment.id,
123
                e,
124
            )
125

126
    def stop_environments_for_version(self, version_manager_id: str):
1✔
127
        # We have to materialize the list before iterating due to concurrency
128
        environments_to_stop = list(self.environments.get(version_manager_id, {}).values())
1✔
129
        for env in environments_to_stop:
1✔
130
            self.stop_environment(env)
1✔
131

132
    def scale_provisioned_concurrency(
1✔
133
        self,
134
        version_manager_id: str,
135
        function_version: FunctionVersion,
136
        target_provisioned_environments: int,
137
    ) -> list[Future[None]]:
138
        current_provisioned_environments = [
1✔
139
            e
140
            for e in self.environments[version_manager_id].values()
141
            if e.initialization_type == "provisioned-concurrency"
142
        ]
143
        # TODO: refine scaling loop to re-use existing environments instead of re-creating all
144
        # current_provisioned_environments_count = len(current_provisioned_environments)
145
        # diff = target_provisioned_environments - current_provisioned_environments_count
146

147
        # TODO: handle case where no provisioned environment is available during scaling
148
        # Most simple scaling implementation for now:
149
        futures = []
1✔
150
        # 1) Re-create new target
151
        for _ in range(target_provisioned_environments):
1✔
152
            execution_environment = ExecutionEnvironment(
1✔
153
                function_version=function_version,
154
                initialization_type="provisioned-concurrency",
155
                on_timeout=self.on_timeout,
156
                version_manager_id=version_manager_id,
157
            )
158
            self.environments[version_manager_id][execution_environment.id] = execution_environment
1✔
159
            futures.append(self.provisioning_pool.submit(execution_environment.start))
1✔
160
        # 2) Kill all existing
161
        for env in current_provisioned_environments:
1✔
162
            # TODO: think about concurrent updates while deleting a function
163
            futures.append(self.provisioning_pool.submit(self.stop_environment, env))
1✔
164

165
        return futures
1✔
166

167
    def stop(self):
1✔
168
        self.provisioning_pool.shutdown(cancel_futures=True)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc