• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.84
/localstack-core/localstack/services/lambda_/invocation/execution_environment.py
1
import logging
1✔
2
import random
1✔
3
import string
1✔
4
import time
1✔
5
from collections.abc import Callable
1✔
6
from datetime import date, datetime
1✔
7
from enum import Enum, auto
1✔
8
from threading import RLock, Timer
1✔
9

10
from localstack import config
1✔
11
from localstack.aws.api.lambda_ import LogFormat
1✔
12
from localstack.aws.connect import connect_to
1✔
13
from localstack.services.lambda_.invocation.lambda_models import (
1✔
14
    Credentials,
15
    FunctionVersion,
16
    InitializationType,
17
    Invocation,
18
    InvocationResult,
19
)
20
from localstack.services.lambda_.invocation.runtime_executor import (
1✔
21
    RuntimeExecutor,
22
    get_runtime_executor,
23
)
24
from localstack.utils.strings import to_str
1✔
25
from localstack.utils.xray.trace_header import TraceHeader
1✔
26

27
# Startup timeout taken from config.LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT; used (in seconds) as the
# interval of the startup timer of an execution environment.
STARTUP_TIMEOUT_SEC = config.LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT
# The 16 lowercase hexadecimal digits ("0"-"9", "a"-"f") as single-character strings.
HEX_CHARS = [str(num) for num in range(10)] + ["a", "b", "c", "d", "e", "f"]

# Module-level logger
LOG = logging.getLogger(__name__)
1✔
31

32

33
class RuntimeStatus(Enum):
    """Lifecycle status of an execution environment."""

    # Initial status assigned in ExecutionEnvironment.__init__
    INACTIVE = auto()
    # Set by start() before the runtime executor is started
    STARTING = auto()
    # Set by start() once the runtime executor has started, and by release() when leaving INVOKING
    READY = auto()
    # Set by reserve() when leaving READY
    INVOKING = auto()
    # Set by errored() when leaving STARTING
    STARTUP_FAILED = auto()
    # Set by timed_out() when leaving STARTING
    STARTUP_TIMED_OUT = auto()
    # Set by stop()
    STOPPED = auto()
    # Set by keepalive_passed() right before it calls stop()
    TIMING_OUT = auto()
1✔
42

43

44
class InvalidStatusException(Exception):
    """Raised when an operation is requested while the runtime status does not allow it."""

    def __init__(self, message: str):
        # Delegate explicitly to the base class; the message ends up in ``args``.
        Exception.__init__(self, message)
1✔
47

48

49
class EnvironmentStartupTimeoutException(Exception):
    """Raised when an execution environment times out during startup."""

    def __init__(self, message: str):
        # Delegate explicitly to the base class; the message ends up in ``args``.
        Exception.__init__(self, message)
1✔
52

53

54
def generate_runtime_id() -> str:
    """
    Generate a random identifier for an execution environment
    :return: String of 32 lowercase hexadecimal characters
    """
    # string.hexdigits is "0123456789abcdefABCDEF"; the first 16 characters are the lowercase digits
    alphabet = string.hexdigits[:16]
    picked = random.choices(alphabet, k=32)
    return "".join(picked).lower()
1✔
56

57

58
# TODO: add status callback
59
class ExecutionEnvironment:
1✔
60
    runtime_executor: RuntimeExecutor
1✔
61
    status_lock: RLock
1✔
62
    status: RuntimeStatus
1✔
63
    initialization_type: InitializationType
1✔
64
    last_returned: datetime
1✔
65
    startup_timer: Timer | None
1✔
66
    keepalive_timer: Timer | None
1✔
67
    on_timeout: Callable[[str, str], None]
1✔
68

69
    def __init__(
        self,
        function_version: FunctionVersion,
        initialization_type: InitializationType,
        on_timeout: Callable[[str, str], None],
        version_manager_id: str,
    ):
        """
        Create an inactive execution environment with a freshly generated id
        :param function_version: Function version this environment belongs to
        :param initialization_type: Initialization type of the environment
        :param on_timeout: Callback invoked with (version_manager_id, environment id) after the keepalive passed
        :param version_manager_id: Identifier passed back as first argument of on_timeout
        """
        # Identity and configuration
        self.id = generate_runtime_id()
        self.function_version = function_version
        self.initialization_type = initialization_type
        self.version_manager_id = version_manager_id
        self.on_timeout = on_timeout

        # Status tracking; the lock is used for updating the runtime status
        self.status_lock = RLock()
        self.status = RuntimeStatus.INACTIVE
        self.last_returned = datetime.min

        # Timers: the keepalive timer starts out as a no-op placeholder such that reserve() and stop()
        # can cancel it unconditionally before release() creates a real one
        self.startup_timer = None
        self.keepalive_timer = Timer(0, lambda *args, **kwargs: None)

        # Executor bound to this environment id and function version
        executor_factory = get_runtime_executor()
        self.runtime_executor = executor_factory(self.id, function_version)
1✔
88

89
    def get_log_group_name(self) -> str:
1✔
90
        return f"/aws/lambda/{self.function_version.id.function_name}"
1✔
91

92
    def get_log_stream_name(self) -> str:
1✔
93
        return f"{date.today():%Y/%m/%d}/[{self.function_version.id.qualifier}]{self.id}"
1✔
94

95
    def get_environment_variables(self) -> dict[str, str]:
        """
        Returns the environment variable set for the runtime container.

        The base set is built first, then conditionally added variables follow, and finally the
        environment configured on the function is applied, so user-defined variables override everything
        else. Calls get_credentials() to obtain the role credentials.

        Note: not every value is converted to str here (e.g., the int(...) results stored for
        LOCALSTACK_POST_INVOKE_WAIT_MS and LOCALSTACK_MAX_PAYLOAD_SIZE).

        :return: Dict of environment variables
        """
        credentials = self.get_credentials()
        env_vars = {
            # 1) Public AWS defined runtime environment variables (in same order):
            # https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html
            # a) Reserved environment variables
            # _HANDLER conditionally added below
            # TODO: _X_AMZN_TRACE_ID
            "AWS_DEFAULT_REGION": self.function_version.id.region,
            "AWS_REGION": self.function_version.id.region,
            # AWS_EXECUTION_ENV conditionally added below
            "AWS_LAMBDA_FUNCTION_NAME": self.function_version.id.function_name,
            "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": self.function_version.config.memory_size,
            "AWS_LAMBDA_FUNCTION_VERSION": self.function_version.id.qualifier,
            "AWS_LAMBDA_INITIALIZATION_TYPE": self.initialization_type,
            "AWS_LAMBDA_LOG_GROUP_NAME": self.get_log_group_name(),
            "AWS_LAMBDA_LOG_STREAM_NAME": self.get_log_stream_name(),
            # Access IDs for role
            "AWS_ACCESS_KEY_ID": credentials["AccessKeyId"],
            "AWS_SECRET_ACCESS_KEY": credentials["SecretAccessKey"],
            "AWS_SESSION_TOKEN": credentials["SessionToken"],
            # AWS_LAMBDA_RUNTIME_API is set in the runtime interface emulator (RIE)
            "LAMBDA_TASK_ROOT": "/var/task",
            "LAMBDA_RUNTIME_DIR": "/var/runtime",
            # b) Unreserved environment variables
            # LANG
            # LD_LIBRARY_PATH
            # NODE_PATH
            # PYTHONPATH
            # GEM_PATH
            "AWS_XRAY_CONTEXT_MISSING": "LOG_ERROR",
            # TODO: allow configuration of xray address
            "AWS_XRAY_DAEMON_ADDRESS": "127.0.0.1:2000",
            # not 100% sure who sets these two
            # extensions are not supposed to have them in their envs => TODO: test if init removes them
            "_AWS_XRAY_DAEMON_PORT": "2000",
            "_AWS_XRAY_DAEMON_ADDRESS": "127.0.0.1",
            # AWS_LAMBDA_DOTNET_PREJIT
            "TZ": ":UTC",
            # 2) Public AWS RIE interface: https://github.com/aws/aws-lambda-runtime-interface-emulator
            "AWS_LAMBDA_FUNCTION_TIMEOUT": self.function_version.config.timeout,
            # 3) Public LocalStack endpoint
            "LOCALSTACK_HOSTNAME": self.runtime_executor.get_endpoint_from_executor(),
            "EDGE_PORT": str(config.GATEWAY_LISTEN[0].port),
            # AWS_ENDPOINT_URL conditionally added below
            # 4) Internal LocalStack runtime API
            "LOCALSTACK_RUNTIME_ID": self.id,
            "LOCALSTACK_RUNTIME_ENDPOINT": self.runtime_executor.get_runtime_endpoint(),
            # 5) Account of the function (necessary for extensions API)
            "LOCALSTACK_FUNCTION_ACCOUNT_ID": self.function_version.id.account,
            # used by the init to spawn the x-ray daemon
            # LOCALSTACK_USER conditionally added below
        }
        # Conditionally added environment variables
        # Lambda advanced logging controls:
        # https://aws.amazon.com/blogs/compute/introducing-advanced-logging-controls-for-aws-lambda-functions/
        logging_config = self.function_version.config.logging_config
        if logging_config.get("LogFormat") == LogFormat.JSON:
            env_vars["AWS_LAMBDA_LOG_FORMAT"] = logging_config["LogFormat"]
            # TODO: test this (currently not implemented in LocalStack)
            # NOTE(review): assumes "ApplicationLogLevel" is always present for the JSON log format -- confirm
            env_vars["AWS_LAMBDA_LOG_LEVEL"] = logging_config["ApplicationLogLevel"].capitalize()
        # Lambda Managed Instances
        if capacity_provider_config := self.function_version.config.CapacityProviderConfig:
            # Disable dropping privileges for parity
            # TODO: implement mixed permissions (maybe in RIE)
            # env_vars["LOCALSTACK_USER"] = "root"
            env_vars["AWS_LAMBDA_MAX_CONCURRENCY"] = capacity_provider_config[
                "LambdaManagedInstancesCapacityProviderConfig"
            ]["PerExecutionEnvironmentMaxConcurrency"]
            # Overrides the ":UTC" default set in the base set above
            env_vars["TZ"] = ":/etc/localtime"
        if not config.LAMBDA_DISABLE_AWS_ENDPOINT_URL:
            env_vars["AWS_ENDPOINT_URL"] = (
                f"http://{self.runtime_executor.get_endpoint_from_executor()}:{config.GATEWAY_LISTEN[0].port}"
            )
        # config.handler is None for image lambdas and will be populated at runtime (e.g., by RIE)
        if self.function_version.config.handler:
            env_vars["_HANDLER"] = self.function_version.config.handler
        # Will be overridden by the runtime itself unless it is a provided runtime
        if self.function_version.config.runtime:
            env_vars["AWS_EXECUTION_ENV"] = "AWS_Lambda_rapid"
        if config.LAMBDA_INIT_DEBUG:
            # Disable dropping privileges because it breaks debugging
            env_vars["LOCALSTACK_USER"] = "root"
        # Forcefully overwriting the user might break debugging!
        # (takes precedence over the "root" user set for LAMBDA_INIT_DEBUG above)
        if config.LAMBDA_INIT_USER is not None:
            env_vars["LOCALSTACK_USER"] = config.LAMBDA_INIT_USER
        if config.LS_LOG in config.TRACE_LOG_LEVELS:
            env_vars["LOCALSTACK_INIT_LOG_LEVEL"] = "info"
        if config.LAMBDA_INIT_POST_INVOKE_WAIT_MS:
            env_vars["LOCALSTACK_POST_INVOKE_WAIT_MS"] = int(config.LAMBDA_INIT_POST_INVOKE_WAIT_MS)
        if config.LAMBDA_LIMITS_MAX_FUNCTION_PAYLOAD_SIZE_BYTES:
            env_vars["LOCALSTACK_MAX_PAYLOAD_SIZE"] = int(
                config.LAMBDA_LIMITS_MAX_FUNCTION_PAYLOAD_SIZE_BYTES
            )

        # Let users overwrite any environment variable at last (if they want so)
        if self.function_version.config.environment:
            env_vars.update(self.function_version.config.environment)
        return env_vars
1✔
198

199
    # Lifecycle methods
200
    def start(self) -> None:
        """
        Starting the runtime environment.

        Moves the status from INACTIVE to STARTING, arms the startup timer (which calls timed_out() when
        it fires), starts the runtime executor with the environment variables of this environment, and
        sets the status to READY on success. The startup timer is always cancelled at the end.

        :raises InvalidStatusException: if the environment is not inactive (errored() may raise it as well
            when the status changed while starting)
        :raises EnvironmentStartupTimeoutException: if starting the executor failed and the status is
            STARTUP_TIMED_OUT
        :raises Exception: any other error raised while starting the executor is re-raised after errored()
        """
        with self.status_lock:
            if self.status != RuntimeStatus.INACTIVE:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only be started when inactive. Current status: {self.status}"
                )
            self.status = RuntimeStatus.STARTING

        startup_time_seconds: int = self._get_startup_timeout_seconds()
        # The timer runs timed_out() in a separate thread once the startup timeout elapsed
        self.startup_timer = Timer(startup_time_seconds, self.timed_out)
        self.startup_timer.start()

        try:
            time_before = time.perf_counter()
            self.runtime_executor.start(self.get_environment_variables())
            LOG.debug(
                "Start of execution environment %s for function %s took %0.2fms",
                self.id,
                self.function_version.qualified_arn,
                (time.perf_counter() - time_before) * 1000,
            )

            with self.status_lock:
                self.status = RuntimeStatus.READY
        # TODO: Distinguish between expected errors (e.g., timeout, cancellation due to deletion update) and
        #  other unexpected exceptions. Improve control flow after implementing error reporting in Go init.
        except Exception as e:
            # NOTE(review): the status is read without holding status_lock here -- confirm this is intended
            if self.status == RuntimeStatus.STARTUP_TIMED_OUT:
                raise EnvironmentStartupTimeoutException(
                    "Execution environment timed out during startup."
                ) from e
            else:
                LOG.warning(
                    "Failed to start execution environment %s: %s",
                    self.id,
                    e,
                )
                self.errored()
            # Re-raise the original startup error
            raise
        finally:
            # Runs on success and on failure such that the timer cannot fire after start() returned
            if self.startup_timer:
                self.startup_timer.cancel()
                self.startup_timer = None
1✔
246

247
    def stop(self) -> None:
        """
        Stopping the runtime environment: sets the status to STOPPED, stops the runtime executor, and
        cancels the keepalive timer.

        :raises InvalidStatusException: if the environment is inactive or already stopped
        """
        with self.status_lock:
            already_down = (
                self.status == RuntimeStatus.INACTIVE or self.status == RuntimeStatus.STOPPED
            )
            if already_down:
                error_message = (
                    f"Execution environment {self.id} cannot be stopped when inactive or already stopped."
                    f" Current status: {self.status}"
                )
                raise InvalidStatusException(error_message)
            self.status = RuntimeStatus.STOPPED
        # The executor is stopped outside the status lock
        self.runtime_executor.stop()
        self.keepalive_timer.cancel()
1✔
259
        self.keepalive_timer.cancel()
1✔
260

261
    # Status methods
262
    def release(self) -> None:
        """
        Return the environment after an invocation: records the return time, moves the status from
        INVOKING to READY, and (for on-demand environments) starts a new keepalive timer.

        :raises InvalidStatusException: if the environment is not in status INVOKING
        """
        self.last_returned = datetime.now()
        with self.status_lock:
            if self.status != RuntimeStatus.INVOKING:
                error_message = (
                    f"Execution environment {self.id} can only be set to status ready while running."
                    f" Current status: {self.status}"
                )
                raise InvalidStatusException(error_message)
            self.status = RuntimeStatus.READY

        # Only on-demand environments get a keepalive timer
        if self.initialization_type != "on-demand":
            return
        keepalive_seconds = config.LAMBDA_KEEPALIVE_MS / 1000
        self.keepalive_timer = Timer(keepalive_seconds, self.keepalive_passed)
        self.keepalive_timer.start()
1✔
275

276
    def reserve(self) -> None:
        """
        Reserve the environment for an invocation: moves the status from READY to INVOKING and cancels
        the keepalive timer.

        :raises InvalidStatusException: if the environment is not in status READY
        """
        with self.status_lock:
            if self.status != RuntimeStatus.READY:
                error_message = (
                    f"Execution environment {self.id} can only be reserved if ready. "
                    f" Current status: {self.status}"
                )
                raise InvalidStatusException(error_message)
            self.status = RuntimeStatus.INVOKING

        # The keepalive timer is cancelled after the status lock has been released
        self.keepalive_timer.cancel()
1✔
286

287
    def keepalive_passed(self) -> None:
        """
        Callback of the keepalive timer: stops the environment if it is still in status READY and
        notifies via the on_timeout callback. Does nothing (besides logging) for any other status.
        """
        LOG.debug(
            "Execution environment %s for function %s has not received any invocations in a while. Stopping.",
            self.id,
            self.function_version.qualified_arn,
        )
        # The stop() method allows to interrupt invocations (on purpose), which might cancel running invocations
        # which we should not do when the keepalive timer passed.
        # The TIMING_OUT state prevents this race condition
        with self.status_lock:
            if self.status == RuntimeStatus.READY:
                self.status = RuntimeStatus.TIMING_OUT
            else:
                LOG.debug(
                    "Keepalive timer passed, but current runtime status is %s. Aborting keepalive stop.",
                    self.status,
                )
                return
        self.stop()
        # Notify assignment service via callback to remove from environments list
        self.on_timeout(self.version_manager_id, self.id)
×
307

308
    def timed_out(self) -> None:
        """Handle status updates if the startup of an execution environment times out.
        Invoked asynchronously by the startup timer in a separate thread.

        Logs a warning (plus the environment logs on debug level), moves the status from STARTING to
        STARTUP_TIMED_OUT, and stops the runtime executor on a best-effort basis.

        :raises InvalidStatusException: if the environment is not in status STARTING
        """
        # TODO: De-emphasize the error part after fixing control flow and tests for test_lambda_runtime_exit
        LOG.warning(
            "Execution environment %s for function %s timed out during startup."
            " Check for errors during the startup of your Lambda function and"
            " consider increasing the startup timeout via LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT.",
            self.id,
            self.function_version.qualified_arn,
        )
        if LOG.isEnabledFor(logging.DEBUG):
            # Fetching the logs is purely diagnostic: a failure here must not prevent the status update
            # and the shutdown of the runtime executor below.
            try:
                LOG.debug(
                    "Logs from the execution environment %s after startup timeout:\n%s",
                    self.id,
                    self.get_prefixed_logs(),
                )
            except Exception as e:
                LOG.debug(
                    "Unable to retrieve logs from the execution environment %s after startup timeout: %s",
                    self.id,
                    e,
                )
        with self.status_lock:
            if self.status != RuntimeStatus.STARTING:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only time out while starting. Current status: {self.status}"
                )
            self.status = RuntimeStatus.STARTUP_TIMED_OUT
        try:
            self.runtime_executor.stop()
        except Exception as e:
            LOG.debug("Unable to shutdown execution environment %s after timeout: %s", self.id, e)
×
335

336
    def errored(self) -> None:
        """Handle status updates if the startup of an execution environment fails.
        Invoked synchronously when an unexpected error occurs during startup.

        Logs a warning (plus the environment logs on debug level), moves the status from STARTING to
        STARTUP_FAILED, and stops the runtime executor on a best-effort basis.

        :raises InvalidStatusException: if the environment is not in status STARTING
        """
        LOG.warning(
            "Execution environment %s for function %s failed during startup."
            " Check for errors during the startup of your Lambda function.",
            self.id,
            self.function_version.qualified_arn,
        )
        if LOG.isEnabledFor(logging.DEBUG):
            # Fetching the logs is purely diagnostic: a failure here must neither mask the original
            # startup error nor prevent the status update and executor shutdown below.
            try:
                LOG.debug(
                    "Logs from the execution environment %s after startup error:\n%s",
                    self.id,
                    self.get_prefixed_logs(),
                )
            except Exception as e:
                LOG.debug(
                    "Unable to retrieve logs from the execution environment %s after startup error: %s",
                    self.id,
                    e,
                )
        with self.status_lock:
            if self.status != RuntimeStatus.STARTING:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only error while starting. Current status: {self.status}"
                )
            self.status = RuntimeStatus.STARTUP_FAILED
        try:
            self.runtime_executor.stop()
        except Exception as e:
            LOG.debug("Unable to shutdown execution environment %s after error: %s", self.id, e)
1✔
361

362
    def get_prefixed_logs(self) -> str:
1✔
363
        """Returns prefixed lambda containers logs"""
364
        logs = self.runtime_executor.get_logs()
1✔
365
        prefix = f"[lambda {self.id}] "
1✔
366
        prefixed_logs = logs.replace("\n", f"\n{prefix}")
1✔
367
        return f"{prefix}{prefixed_logs}"
1✔
368

369
    def invoke(self, invocation: Invocation) -> InvocationResult:
        """
        Invoke the function in this environment via the runtime executor.
        The environment is expected to be in status INVOKING.

        :param invocation: Invocation providing request id, invoked ARN, payload, and trace context
        :return: Result returned by the runtime executor
        """
        assert self.status == RuntimeStatus.INVOKING
        # Async/event invokes might miss an aws_trace_header, then we need to create a new root trace id.
        trace_header = invocation.trace_context.get("aws_trace_header")
        if not trace_header:
            trace_header = TraceHeader().ensure_root_exists()
        # The Lambda RIE requires a full tracing header including Root, Parent, and Samples. Otherwise, tracing fails
        # with the warning "Subsegment ## handler discarded due to Lambda worker still initializing"
        trace_header.ensure_sampled_exists()
        # TODO: replace this random parent id with actual parent segment created within the Lambda provider using X-Ray
        trace_header.ensure_parent_exists()
        # TODO: test and implement Active and PassThrough tracing and sampling decisions.
        # TODO: implement Lambda lineage: https://docs.aws.amazon.com/lambda/latest/dg/invocation-recursion.html
        request = {
            "invoke-id": invocation.request_id,  # TODO: rename to request-id (requires change in lambda-init)
            "invoked-function-arn": invocation.invoked_arn,
            "payload": to_str(invocation.payload),
            "trace-id": trace_header.to_header_str(),
        }
        result = self.runtime_executor.invoke(payload=request)
        return result
1✔
389

390
    def get_credentials(self) -> Credentials:
        """
        Assume the role of the function via STS (as service principal "lambda")
        :return: The "Credentials" part of the assume role response
        """
        function_id = self.function_version.id
        sts_client = connect_to(region_name=function_id.region).sts.request_metadata(
            service_principal="lambda"
        )

        session_name = function_id.function_name
        # To handle single character function names #9016
        if len(session_name) == 1:
            session_name = f"{session_name}@lambda_function"

        # TODO we should probably set a maximum alive duration for environments, due to the session expiration
        assume_role_response = sts_client.assume_role(
            RoleArn=self.function_version.config.role,
            RoleSessionName=session_name,
            DurationSeconds=43200,
        )
        return assume_role_response["Credentials"]
405

406
    def _get_startup_timeout_seconds(self) -> int:
        """
        Returns the startup timeout used by start() as interval of the startup timer
        :return: The module-level STARTUP_TIMEOUT_SEC
        """
        return STARTUP_TIMEOUT_SEC
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc