• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19809586398

28 Nov 2025 05:40PM UTC coverage: 86.863% (-0.02%) from 86.879%
19809586398

push

github

web-flow
[SFN] Add new TestState API capabilities (#13418)

New capabilities have recently been added to TestState API. This commit adds the following support for the new capabilities:

- Add mocking support – Mock state outputs and errors without invoking downstream services
- Add support for Map (inline and distributed) states
- Add support to test specific states within a full state machine definition using the new stateName parameter.
- Add support for Catch and Retry fields
- Add new inspection data
- Rename the `mocking` package to `local_mocking` to clearly mark mocking functionality related to Step Functions Local. This helps to distinguish between Local mocks and TestState mocks.



Co-authored-by: Greg Furman <gregfurman99@gmail.com>
Co-authored-by: Greg Furman <31275503+gregfurman@users.noreply.github.com>

618 of 728 new or added lines in 25 files covered. (84.89%)

99 existing lines in 8 files now uncovered.

69469 of 79975 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.62
/localstack-core/localstack/services/lambda_/invocation/execution_environment.py
1
import logging
1✔
2
import random
1✔
3
import string
1✔
4
import time
1✔
5
from collections.abc import Callable
1✔
6
from datetime import date, datetime
1✔
7
from enum import Enum, auto
1✔
8
from threading import RLock, Timer
1✔
9

10
from localstack import config
1✔
11
from localstack.aws.connect import connect_to
1✔
12
from localstack.services.lambda_.invocation.lambda_models import (
1✔
13
    Credentials,
14
    FunctionVersion,
15
    InitializationType,
16
    Invocation,
17
    InvocationResult,
18
)
19
from localstack.services.lambda_.invocation.runtime_executor import (
1✔
20
    RuntimeExecutor,
21
    get_runtime_executor,
22
)
23
from localstack.utils.strings import to_str
1✔
24
from localstack.utils.xray.trace_header import TraceHeader
1✔
25

26
# Startup timeout of an execution environment, taken from LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT.
STARTUP_TIMEOUT_SEC = config.LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT
# The sixteen lowercase hexadecimal digits, each as a single-character string.
HEX_CHARS = list("0123456789abcdef")

# Module-level logger
LOG = logging.getLogger(__name__)
30

31

32
class RuntimeStatus(Enum):
    """Lifecycle status of an execution environment.

    Values are assigned by ``auto()`` in declaration order, so the order of the
    members below must not be changed.
    """

    # Initial status assigned in ExecutionEnvironment.__init__
    INACTIVE = auto()
    # Set by start() while the runtime executor is being started
    STARTING = auto()
    # Set after a successful start() and by release()
    READY = auto()
    # Set by reserve(); required by invoke()
    INVOKING = auto()
    # Set by errored() when the startup fails
    STARTUP_FAILED = auto()
    # Set by timed_out() when the startup timer fires
    STARTUP_TIMED_OUT = auto()
    # Set by stop()
    STOPPED = auto()
    # Set by keepalive_passed() right before it stops the environment
    TIMING_OUT = auto()
41

42

43
class InvalidStatusException(Exception):
    """Raised when an operation is attempted in a runtime status that does not allow it."""

    def __init__(self, message: str):
        """
        :param message: Description of the rejected status transition
        """
        super().__init__(message)
46

47

48
class EnvironmentStartupTimeoutException(Exception):
    """Raised when an execution environment times out while it is starting."""

    def __init__(self, message: str):
        """
        :param message: Description of the startup timeout
        """
        super().__init__(message)
51

52

53
def generate_runtime_id() -> str:
    """
    Generates a random identifier for an execution environment
    :return: String of 32 lowercase hexadecimal characters
    """
    lowercase_hex_digits = "0123456789abcdef"
    picked = random.choices(lowercase_hex_digits, k=32)
    return "".join(picked)
55

56

57
# TODO: add status callback
58
class ExecutionEnvironment:
    """A single runtime environment of a Lambda function version including its lifecycle status.

    The environment wraps a ``RuntimeExecutor`` and tracks a ``RuntimeStatus``, which is only
    written while holding ``status_lock``. Status transitions performed by this class:

    * ``start()``: INACTIVE -> STARTING -> READY
    * ``timed_out()``: STARTING -> STARTUP_TIMED_OUT
    * ``errored()``: STARTING -> STARTUP_FAILED
    * ``reserve()``: READY -> INVOKING
    * ``release()``: INVOKING -> READY
    * ``keepalive_passed()``: READY -> TIMING_OUT, followed by ``stop()``
    * ``stop()``: any status except INACTIVE and STOPPED -> STOPPED
    """

    # Executor that starts, stops, and invokes the underlying runtime
    runtime_executor: RuntimeExecutor
    # Reentrant lock guarding updates of `status`
    status_lock: RLock
    # Current lifecycle status
    status: RuntimeStatus
    # Initialization type; keepalive is only scheduled for "on-demand" (see release())
    initialization_type: InitializationType
    # Timestamp of the most recent release() call; datetime.min until the first release
    last_returned: datetime
    # Timer that fires timed_out() during start(); None outside of a startup
    startup_timer: Timer | None
    # Timer that fires keepalive_passed(); a never-started placeholder until the first release()
    keepalive_timer: Timer | None
    # Callback invoked with (version_manager_id, environment id) after a keepalive stop
    on_timeout: Callable[[str, str], None]

    def __init__(
        self,
        function_version: FunctionVersion,
        initialization_type: InitializationType,
        on_timeout: Callable[[str, str], None],
        version_manager_id: str,
    ):
        """
        :param function_version: Function version this environment runs
        :param initialization_type: Initialization type exposed to the runtime and used for keepalive handling
        :param on_timeout: Callback invoked with (version_manager_id, environment id) once the keepalive
            timer stopped this environment
        :param version_manager_id: Identifier passed back as first argument of ``on_timeout``
        """
        self.id = generate_runtime_id()
        self.status = RuntimeStatus.INACTIVE
        # Lock for updating the runtime status
        self.status_lock = RLock()
        self.function_version = function_version
        self.initialization_type = initialization_type
        self.runtime_executor = get_runtime_executor()(self.id, function_version)
        self.last_returned = datetime.min
        self.startup_timer = None
        # Placeholder that is never started, so that cancel() can always be called safely
        self.keepalive_timer = Timer(0, lambda *args, **kwargs: None)
        self.on_timeout = on_timeout
        self.version_manager_id = version_manager_id

    def get_log_group_name(self) -> str:
        """
        :return: Log group name derived from the function name
        """
        return f"/aws/lambda/{self.function_version.id.function_name}"

    def get_log_stream_name(self) -> str:
        """
        :return: Log stream name built from today's date, the function qualifier, and the environment id
        """
        return f"{date.today():%Y/%m/%d}/[{self.function_version.id.qualifier}]{self.id}"

    def get_environment_variables(self) -> dict[str, str]:
        """
        Returns the environment variable set for the runtime container.
        Variables from the function configuration are applied on top of the defaults, but before
        the LocalStack-specific overrides at the end (e.g., LOCALSTACK_USER).
        NOTE(review): some values (memory size, timeout, post-invoke wait, max payload size) are added
        without converting them to str, so the dict may contain non-str values despite the annotation.
        :return: Dict of environment variables
        """
        credentials = self.get_credentials()
        env_vars = {
            # 1) Public AWS defined runtime environment variables (in same order):
            # https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html
            # a) Reserved environment variables
            # _HANDLER conditionally added below
            # TODO: _X_AMZN_TRACE_ID
            "AWS_DEFAULT_REGION": self.function_version.id.region,
            "AWS_REGION": self.function_version.id.region,
            # AWS_EXECUTION_ENV conditionally added below
            "AWS_LAMBDA_FUNCTION_NAME": self.function_version.id.function_name,
            "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": self.function_version.config.memory_size,
            "AWS_LAMBDA_FUNCTION_VERSION": self.function_version.id.qualifier,
            "AWS_LAMBDA_INITIALIZATION_TYPE": self.initialization_type,
            "AWS_LAMBDA_LOG_GROUP_NAME": self.get_log_group_name(),
            "AWS_LAMBDA_LOG_STREAM_NAME": self.get_log_stream_name(),
            # Access IDs for role
            "AWS_ACCESS_KEY_ID": credentials["AccessKeyId"],
            "AWS_SECRET_ACCESS_KEY": credentials["SecretAccessKey"],
            "AWS_SESSION_TOKEN": credentials["SessionToken"],
            # AWS_LAMBDA_RUNTIME_API is set in the runtime interface emulator (RIE)
            "LAMBDA_TASK_ROOT": "/var/task",
            "LAMBDA_RUNTIME_DIR": "/var/runtime",
            # b) Unreserved environment variables
            # LANG
            # LD_LIBRARY_PATH
            # NODE_PATH
            # PYTHONPATH
            # GEM_PATH
            "AWS_XRAY_CONTEXT_MISSING": "LOG_ERROR",
            # TODO: allow configuration of xray address
            "AWS_XRAY_DAEMON_ADDRESS": "127.0.0.1:2000",
            # not 100% sure who sets these two
            # extensions are not supposed to have them in their envs => TODO: test if init removes them
            "_AWS_XRAY_DAEMON_PORT": "2000",
            "_AWS_XRAY_DAEMON_ADDRESS": "127.0.0.1",
            # AWS_LAMBDA_DOTNET_PREJIT
            "TZ": ":UTC",
            # 2) Public AWS RIE interface: https://github.com/aws/aws-lambda-runtime-interface-emulator
            "AWS_LAMBDA_FUNCTION_TIMEOUT": self.function_version.config.timeout,
            # 3) Public LocalStack endpoint
            "LOCALSTACK_HOSTNAME": self.runtime_executor.get_endpoint_from_executor(),
            "EDGE_PORT": str(config.GATEWAY_LISTEN[0].port),
            # AWS_ENDPOINT_URL conditionally added below
            # 4) Internal LocalStack runtime API
            "LOCALSTACK_RUNTIME_ID": self.id,
            "LOCALSTACK_RUNTIME_ENDPOINT": self.runtime_executor.get_runtime_endpoint(),
            # 5) Account of the function (necessary for extensions API)
            "LOCALSTACK_FUNCTION_ACCOUNT_ID": self.function_version.id.account,
            # used by the init to spawn the x-ray daemon
            # LOCALSTACK_USER conditionally added below
        }
        # Conditionally added environment variables
        if not config.LAMBDA_DISABLE_AWS_ENDPOINT_URL:
            env_vars["AWS_ENDPOINT_URL"] = (
                f"http://{self.runtime_executor.get_endpoint_from_executor()}:{config.GATEWAY_LISTEN[0].port}"
            )
        # config.handler is None for image lambdas and will be populated at runtime (e.g., by RIE)
        if self.function_version.config.handler:
            env_vars["_HANDLER"] = self.function_version.config.handler
        # Will be overridden by the runtime itself unless it is a provided runtime
        if self.function_version.config.runtime:
            env_vars["AWS_EXECUTION_ENV"] = "AWS_Lambda_rapid"
        # User-defined variables override everything set so far
        if self.function_version.config.environment:
            env_vars.update(self.function_version.config.environment)
        if config.LAMBDA_INIT_DEBUG:
            # Disable dropping privileges because it breaks debugging
            env_vars["LOCALSTACK_USER"] = "root"
        # Forcefully overwrite the user might break debugging!
        if config.LAMBDA_INIT_USER is not None:
            env_vars["LOCALSTACK_USER"] = config.LAMBDA_INIT_USER
        if config.LS_LOG in config.TRACE_LOG_LEVELS:
            env_vars["LOCALSTACK_INIT_LOG_LEVEL"] = "info"
        if config.LAMBDA_INIT_POST_INVOKE_WAIT_MS:
            env_vars["LOCALSTACK_POST_INVOKE_WAIT_MS"] = int(config.LAMBDA_INIT_POST_INVOKE_WAIT_MS)
        if config.LAMBDA_LIMITS_MAX_FUNCTION_PAYLOAD_SIZE_BYTES:
            env_vars["LOCALSTACK_MAX_PAYLOAD_SIZE"] = int(
                config.LAMBDA_LIMITS_MAX_FUNCTION_PAYLOAD_SIZE_BYTES
            )
        return env_vars

    # Lifecycle methods
    def start(self) -> None:
        """
        Starting the runtime environment.
        A startup timer invokes timed_out() from a separate thread if the start takes too long.
        :raises InvalidStatusException: if the environment is not inactive
        :raises EnvironmentStartupTimeoutException: if the start failed after the startup timer fired
        :raises Exception: any other startup error is re-raised after errored() was called
        """
        with self.status_lock:
            if self.status != RuntimeStatus.INACTIVE:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only be started when inactive. Current status: {self.status}"
                )
            self.status = RuntimeStatus.STARTING

        startup_time_seconds: int = self._get_startup_timeout_seconds()
        self.startup_timer = Timer(startup_time_seconds, self.timed_out)
        self.startup_timer.start()

        try:
            time_before = time.perf_counter()
            self.runtime_executor.start(self.get_environment_variables())
            LOG.debug(
                "Start of execution environment %s for function %s took %0.2fms",
                self.id,
                self.function_version.qualified_arn,
                (time.perf_counter() - time_before) * 1000,
            )

            with self.status_lock:
                self.status = RuntimeStatus.READY
        # TODO: Distinguish between expected errors (e.g., timeout, cancellation due to deletion update) and
        #  other unexpected exceptions. Improve control flow after implementing error reporting in Go init.
        except Exception as e:
            if self.status == RuntimeStatus.STARTUP_TIMED_OUT:
                raise EnvironmentStartupTimeoutException(
                    "Execution environment timed out during startup."
                ) from e
            else:
                LOG.warning(
                    "Failed to start execution environment %s: %s",
                    self.id,
                    e,
                )
                self.errored()
            # Only reached for non-timeout errors: re-raise the original startup error
            raise
        finally:
            # The startup timer must not fire after the startup finished (successfully or not)
            if self.startup_timer:
                self.startup_timer.cancel()
                self.startup_timer = None

    def stop(self) -> None:
        """
        Stopping the runtime environment
        :raises InvalidStatusException: if the environment is inactive or already stopped
        """
        with self.status_lock:
            if self.status in [RuntimeStatus.INACTIVE, RuntimeStatus.STOPPED]:
                raise InvalidStatusException(
                    f"Execution environment {self.id} cannot be stopped when inactive or already stopped."
                    f" Current status: {self.status}"
                )
            self.status = RuntimeStatus.STOPPED
        try:
            self.runtime_executor.stop()
        finally:
            # Always disarm the keepalive timer, even if stopping the executor raised.
            # Otherwise, a pending timer thread would outlive the stopped environment.
            self.keepalive_timer.cancel()

    # Status methods
    def release(self) -> None:
        """
        Marks the environment as ready again after an invocation and, for on-demand environments,
        schedules keepalive_passed() after LAMBDA_KEEPALIVE_MS.
        :raises InvalidStatusException: if the environment is not invoking
        """
        self.last_returned = datetime.now()
        with self.status_lock:
            if self.status != RuntimeStatus.INVOKING:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only be set to status ready while running."
                    f" Current status: {self.status}"
                )
            self.status = RuntimeStatus.READY

        if self.initialization_type == "on-demand":
            self.keepalive_timer = Timer(config.LAMBDA_KEEPALIVE_MS / 1000, self.keepalive_passed)
            self.keepalive_timer.start()

    def reserve(self) -> None:
        """
        Reserves the environment for an invocation and cancels the pending keepalive timer.
        :raises InvalidStatusException: if the environment is not ready
        """
        with self.status_lock:
            if self.status != RuntimeStatus.READY:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only be reserved if ready. "
                    f" Current status: {self.status}"
                )
            self.status = RuntimeStatus.INVOKING

        self.keepalive_timer.cancel()

    def keepalive_passed(self) -> None:
        """
        Stops the environment once the keepalive timer fired and notifies via on_timeout.
        Does nothing if the environment is not ready anymore (e.g., it was reserved in the meantime).
        """
        LOG.debug(
            "Execution environment %s for function %s has not received any invocations in a while. Stopping.",
            self.id,
            self.function_version.qualified_arn,
        )
        # The stop() method allows to interrupt invocations (on purpose), which might cancel running invocations
        # which we should not do when the keepalive timer passed.
        # The new TIMING_OUT state prevents this race condition
        with self.status_lock:
            if self.status != RuntimeStatus.READY:
                LOG.debug(
                    "Keepalive timer passed, but current runtime status is %s. Aborting keepalive stop.",
                    self.status,
                )
                return
            self.status = RuntimeStatus.TIMING_OUT
        self.stop()
        # Notify assignment service via callback to remove from environments list
        self.on_timeout(self.version_manager_id, self.id)

    def timed_out(self) -> None:
        """Handle status updates if the startup of an execution environment times out.
        Invoked asynchronously by the startup timer in a separate thread.
        Errors while stopping the runtime executor are only logged.
        :raises InvalidStatusException: if the environment is not starting
        """
        # TODO: De-emphasize the error part after fixing control flow and tests for test_lambda_runtime_exit
        LOG.warning(
            "Execution environment %s for function %s timed out during startup."
            " Check for errors during the startup of your Lambda function and"
            " consider increasing the startup timeout via LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT.",
            self.id,
            self.function_version.qualified_arn,
        )
        # Only fetch the logs from the executor if they are actually emitted
        if LOG.isEnabledFor(logging.DEBUG):
            LOG.debug(
                "Logs from the execution environment %s after startup timeout:\n%s",
                self.id,
                self.get_prefixed_logs(),
            )
        with self.status_lock:
            if self.status != RuntimeStatus.STARTING:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only time out while starting. Current status: {self.status}"
                )
            self.status = RuntimeStatus.STARTUP_TIMED_OUT
        try:
            self.runtime_executor.stop()
        except Exception as e:
            LOG.debug("Unable to shutdown execution environment %s after timeout: %s", self.id, e)

    def errored(self) -> None:
        """Handle status updates if the startup of an execution environment fails.
        Invoked synchronously when an unexpected error occurs during startup.
        Errors while stopping the runtime executor are only logged.
        :raises InvalidStatusException: if the environment is not starting
        """
        LOG.warning(
            "Execution environment %s for function %s failed during startup."
            " Check for errors during the startup of your Lambda function.",
            self.id,
            self.function_version.qualified_arn,
        )
        # Only fetch the logs from the executor if they are actually emitted
        if LOG.isEnabledFor(logging.DEBUG):
            LOG.debug(
                "Logs from the execution environment %s after startup error:\n%s",
                self.id,
                self.get_prefixed_logs(),
            )
        with self.status_lock:
            if self.status != RuntimeStatus.STARTING:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only error while starting. Current status: {self.status}"
                )
            self.status = RuntimeStatus.STARTUP_FAILED
        try:
            self.runtime_executor.stop()
        except Exception as e:
            LOG.debug("Unable to shutdown execution environment %s after error: %s", self.id, e)

    def get_prefixed_logs(self) -> str:
        """Returns prefixed lambda containers logs
        :return: Logs of the runtime executor with every line prefixed by "[lambda <environment id>] "
        """
        logs = self.runtime_executor.get_logs()
        prefix = f"[lambda {self.id}] "
        prefixed_logs = logs.replace("\n", f"\n{prefix}")
        return f"{prefix}{prefixed_logs}"

    def invoke(self, invocation: Invocation) -> InvocationResult:
        """
        Invokes the function within this environment, which must have been reserved before.
        :param invocation: Invocation providing request id, invoked ARN, payload, and trace context
        :return: Result returned by the runtime executor
        """
        assert self.status == RuntimeStatus.INVOKING
        # Async/event invokes might miss an aws_trace_header, then we need to create a new root trace id.
        aws_trace_header = (
            invocation.trace_context.get("aws_trace_header") or TraceHeader().ensure_root_exists()
        )
        # The Lambda RIE requires a full tracing header including Root, Parent, and Samples. Otherwise, tracing fails
        # with the warning "Subsegment ## handler discarded due to Lambda worker still initializing"
        aws_trace_header.ensure_sampled_exists()
        # TODO: replace this random parent id with actual parent segment created within the Lambda provider using X-Ray
        aws_trace_header.ensure_parent_exists()
        # TODO: test and implement Active and PassThrough tracing and sampling decisions.
        # TODO: implement Lambda lineage: https://docs.aws.amazon.com/lambda/latest/dg/invocation-recursion.html
        invoke_payload = {
            "invoke-id": invocation.request_id,  # TODO: rename to request-id (requires change in lambda-init)
            "invoked-function-arn": invocation.invoked_arn,
            "payload": to_str(invocation.payload),
            "trace-id": aws_trace_header.to_header_str(),
        }
        return self.runtime_executor.invoke(payload=invoke_payload)

    def get_credentials(self) -> Credentials:
        """
        Assumes the role of the function via STS, using the function name as role session name.
        :return: The "Credentials" part of the assume role response
        """
        sts_client = connect_to(region_name=self.function_version.id.region).sts.request_metadata(
            service_principal="lambda"
        )
        role_session_name = self.function_version.id.function_name

        # To handle single character function names #9016
        if len(role_session_name) == 1:
            role_session_name += "@lambda_function"
        # TODO we should probably set a maximum alive duration for environments, due to the session expiration
        return sts_client.assume_role(
            RoleArn=self.function_version.config.role,
            RoleSessionName=role_session_name,
            DurationSeconds=43200,
        )["Credentials"]

    def _get_startup_timeout_seconds(self) -> int:
        """
        :return: Number of seconds after which the startup timer fires (STARTUP_TIMEOUT_SEC)
        """
        return STARTUP_TIMEOUT_SEC
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc