• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 836de691-8bbb-4693-9ed3-7ab4d196cd89

20 Mar 2025 04:14PM UTC coverage: 86.818% (+0.009%) from 86.809%
836de691-8bbb-4693-9ed3-7ab4d196cd89

push

circleci

web-flow
S3: fix MA/MR test (#12417)

62787 of 72320 relevant lines covered (86.82%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.78
/localstack-core/localstack/services/lambda_/invocation/execution_environment.py
1
import logging
1✔
2
import random
1✔
3
import string
1✔
4
import time
1✔
5
from datetime import date, datetime
1✔
6
from enum import Enum, auto
1✔
7
from threading import RLock, Timer
1✔
8
from typing import Callable, Dict, Optional
1✔
9

10
from localstack import config
1✔
11
from localstack.aws.connect import connect_to
1✔
12
from localstack.services.lambda_.invocation.lambda_models import (
1✔
13
    Credentials,
14
    FunctionVersion,
15
    InitializationType,
16
    Invocation,
17
    InvocationResult,
18
)
19
from localstack.services.lambda_.invocation.runtime_executor import (
1✔
20
    RuntimeExecutor,
21
    get_runtime_executor,
22
)
23
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
1✔
24
    DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS,
25
    is_lambda_debug_timeout_enabled_for,
26
)
27
from localstack.utils.strings import to_str
1✔
28
from localstack.utils.xray.trace_header import TraceHeader
1✔
29

30
STARTUP_TIMEOUT_SEC = config.LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT
1✔
31
HEX_CHARS = [str(num) for num in range(10)] + ["a", "b", "c", "d", "e", "f"]
1✔
32

33
LOG = logging.getLogger(__name__)
1✔
34

35

36
class RuntimeStatus(Enum):
1✔
37
    INACTIVE = auto()
1✔
38
    STARTING = auto()
1✔
39
    READY = auto()
1✔
40
    RUNNING = auto()
1✔
41
    STARTUP_FAILED = auto()
1✔
42
    STARTUP_TIMED_OUT = auto()
1✔
43
    STOPPED = auto()
1✔
44

45

46
class InvalidStatusException(Exception):
1✔
47
    def __init__(self, message: str):
1✔
48
        super().__init__(message)
1✔
49

50

51
class EnvironmentStartupTimeoutException(Exception):
1✔
52
    def __init__(self, message: str):
1✔
53
        super().__init__(message)
1✔
54

55

56
def generate_runtime_id() -> str:
1✔
57
    return "".join(random.choices(string.hexdigits[:16], k=32)).lower()
1✔
58

59

60
# TODO: add status callback
61
class ExecutionEnvironment:
1✔
62
    runtime_executor: RuntimeExecutor
1✔
63
    status_lock: RLock
1✔
64
    status: RuntimeStatus
1✔
65
    initialization_type: InitializationType
1✔
66
    last_returned: datetime
1✔
67
    startup_timer: Optional[Timer]
1✔
68
    keepalive_timer: Optional[Timer]
1✔
69
    on_timeout: Callable[[str, str], None]
1✔
70

71
    def __init__(
1✔
72
        self,
73
        function_version: FunctionVersion,
74
        initialization_type: InitializationType,
75
        on_timeout: Callable[[str, str], None],
76
        version_manager_id: str,
77
    ):
78
        self.id = generate_runtime_id()
1✔
79
        self.status = RuntimeStatus.INACTIVE
1✔
80
        # Lock for updating the runtime status
81
        self.status_lock = RLock()
1✔
82
        self.function_version = function_version
1✔
83
        self.initialization_type = initialization_type
1✔
84
        self.runtime_executor = get_runtime_executor()(self.id, function_version)
1✔
85
        self.last_returned = datetime.min
1✔
86
        self.startup_timer = None
1✔
87
        self.keepalive_timer = Timer(0, lambda *args, **kwargs: None)
1✔
88
        self.on_timeout = on_timeout
1✔
89
        self.version_manager_id = version_manager_id
1✔
90

91
    def get_log_group_name(self) -> str:
1✔
92
        return f"/aws/lambda/{self.function_version.id.function_name}"
1✔
93

94
    def get_log_stream_name(self) -> str:
1✔
95
        return f"{date.today():%Y/%m/%d}/[{self.function_version.id.qualifier}]{self.id}"
1✔
96

97
    def get_environment_variables(self) -> Dict[str, str]:
1✔
98
        """
99
        Returns the environment variable set for the runtime container
100
        :return: Dict of environment variables
101
        """
102
        credentials = self.get_credentials()
1✔
103
        env_vars = {
1✔
104
            # 1) Public AWS defined runtime environment variables (in same order):
105
            # https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html
106
            # a) Reserved environment variables
107
            # _HANDLER conditionally added below
108
            # TODO: _X_AMZN_TRACE_ID
109
            "AWS_DEFAULT_REGION": self.function_version.id.region,
110
            "AWS_REGION": self.function_version.id.region,
111
            # AWS_EXECUTION_ENV conditionally added below
112
            "AWS_LAMBDA_FUNCTION_NAME": self.function_version.id.function_name,
113
            "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": self.function_version.config.memory_size,
114
            "AWS_LAMBDA_FUNCTION_VERSION": self.function_version.id.qualifier,
115
            "AWS_LAMBDA_INITIALIZATION_TYPE": self.initialization_type,
116
            "AWS_LAMBDA_LOG_GROUP_NAME": self.get_log_group_name(),
117
            "AWS_LAMBDA_LOG_STREAM_NAME": self.get_log_stream_name(),
118
            # Access IDs for role
119
            "AWS_ACCESS_KEY_ID": credentials["AccessKeyId"],
120
            "AWS_SECRET_ACCESS_KEY": credentials["SecretAccessKey"],
121
            "AWS_SESSION_TOKEN": credentials["SessionToken"],
122
            # AWS_LAMBDA_RUNTIME_API is set in the runtime interface emulator (RIE)
123
            "LAMBDA_TASK_ROOT": "/var/task",
124
            "LAMBDA_RUNTIME_DIR": "/var/runtime",
125
            # b) Unreserved environment variables
126
            # LANG
127
            # LD_LIBRARY_PATH
128
            # NODE_PATH
129
            # PYTHONPATH
130
            # GEM_PATH
131
            "AWS_XRAY_CONTEXT_MISSING": "LOG_ERROR",
132
            # TODO: allow configuration of xray address
133
            "AWS_XRAY_DAEMON_ADDRESS": "127.0.0.1:2000",
134
            # not 100% sure who sets these two
135
            # extensions are not supposed to have them in their envs => TODO: test if init removes them
136
            "_AWS_XRAY_DAEMON_PORT": "2000",
137
            "_AWS_XRAY_DAEMON_ADDRESS": "127.0.0.1",
138
            # AWS_LAMBDA_DOTNET_PREJIT
139
            "TZ": ":UTC",
140
            # 2) Public AWS RIE interface: https://github.com/aws/aws-lambda-runtime-interface-emulator
141
            "AWS_LAMBDA_FUNCTION_TIMEOUT": self._get_execution_timeout_seconds(),
142
            # 3) Public LocalStack endpoint
143
            "LOCALSTACK_HOSTNAME": self.runtime_executor.get_endpoint_from_executor(),
144
            "EDGE_PORT": str(config.GATEWAY_LISTEN[0].port),
145
            # AWS_ENDPOINT_URL conditionally added below
146
            # 4) Internal LocalStack runtime API
147
            "LOCALSTACK_RUNTIME_ID": self.id,
148
            "LOCALSTACK_RUNTIME_ENDPOINT": self.runtime_executor.get_runtime_endpoint(),
149
            # 5) Account of the function (necessary for extensions API)
150
            "LOCALSTACK_FUNCTION_ACCOUNT_ID": self.function_version.id.account,
151
            # used by the init to spawn the x-ray daemon
152
            # LOCALSTACK_USER conditionally added below
153
        }
154
        # Conditionally added environment variables
155
        if not config.LAMBDA_DISABLE_AWS_ENDPOINT_URL:
1✔
156
            env_vars["AWS_ENDPOINT_URL"] = (
1✔
157
                f"http://{self.runtime_executor.get_endpoint_from_executor()}:{config.GATEWAY_LISTEN[0].port}"
158
            )
159
        # config.handler is None for image lambdas and will be populated at runtime (e.g., by RIE)
160
        if self.function_version.config.handler:
1✔
161
            env_vars["_HANDLER"] = self.function_version.config.handler
1✔
162
        # Will be overridden by the runtime itself unless it is a provided runtime
163
        if self.function_version.config.runtime:
1✔
164
            env_vars["AWS_EXECUTION_ENV"] = "AWS_Lambda_rapid"
1✔
165
        if self.function_version.config.environment:
1✔
166
            env_vars.update(self.function_version.config.environment)
1✔
167
        if config.LAMBDA_INIT_DEBUG:
1✔
168
            # Disable dropping privileges because it breaks debugging
169
            env_vars["LOCALSTACK_USER"] = "root"
×
170
        # Forcefully overwrite the user might break debugging!
171
        if config.LAMBDA_INIT_USER is not None:
1✔
172
            env_vars["LOCALSTACK_USER"] = config.LAMBDA_INIT_USER
1✔
173
        if config.LS_LOG in config.TRACE_LOG_LEVELS:
1✔
174
            env_vars["LOCALSTACK_INIT_LOG_LEVEL"] = "info"
×
175
        if config.LAMBDA_INIT_POST_INVOKE_WAIT_MS:
1✔
176
            env_vars["LOCALSTACK_POST_INVOKE_WAIT_MS"] = int(config.LAMBDA_INIT_POST_INVOKE_WAIT_MS)
1✔
177
        if config.LAMBDA_LIMITS_MAX_FUNCTION_PAYLOAD_SIZE_BYTES:
1✔
178
            env_vars["LOCALSTACK_MAX_PAYLOAD_SIZE"] = int(
1✔
179
                config.LAMBDA_LIMITS_MAX_FUNCTION_PAYLOAD_SIZE_BYTES
180
            )
181
        return env_vars
1✔
182

183
    # Lifecycle methods
184
    def start(self) -> None:
1✔
185
        """
186
        Starting the runtime environment
187
        """
188
        with self.status_lock:
1✔
189
            if self.status != RuntimeStatus.INACTIVE:
1✔
190
                raise InvalidStatusException(
×
191
                    f"Execution environment {self.id} can only be started when inactive. Current status: {self.status}"
192
                )
193
            self.status = RuntimeStatus.STARTING
1✔
194

195
        startup_time_seconds: int = self._get_startup_timeout_seconds()
1✔
196
        self.startup_timer = Timer(startup_time_seconds, self.timed_out)
1✔
197
        self.startup_timer.start()
1✔
198

199
        try:
1✔
200
            time_before = time.perf_counter()
1✔
201
            self.runtime_executor.start(self.get_environment_variables())
1✔
202
            LOG.debug(
1✔
203
                "Start of execution environment %s for function %s took %0.2fms",
204
                self.id,
205
                self.function_version.qualified_arn,
206
                (time.perf_counter() - time_before) * 1000,
207
            )
208

209
            with self.status_lock:
1✔
210
                self.status = RuntimeStatus.READY
1✔
211
        # TODO: Distinguish between expected errors (e.g., timeout, cancellation due to deletion update) and
212
        #  other unexpected exceptions. Improve control flow after implementing error reporting in Go init.
213
        except Exception as e:
1✔
214
            if self.status == RuntimeStatus.STARTUP_TIMED_OUT:
1✔
215
                raise EnvironmentStartupTimeoutException(
1✔
216
                    "Execution environment timed out during startup."
217
                ) from e
218
            else:
219
                LOG.warning(
1✔
220
                    "Failed to start execution environment %s: %s",
221
                    self.id,
222
                    e,
223
                )
224
                self.errored()
1✔
225
            raise
1✔
226
        finally:
227
            if self.startup_timer:
1✔
228
                self.startup_timer.cancel()
1✔
229
                self.startup_timer = None
1✔
230

231
    def stop(self) -> None:
1✔
232
        """
233
        Stopping the runtime environment
234
        """
235
        with self.status_lock:
1✔
236
            if self.status in [RuntimeStatus.INACTIVE, RuntimeStatus.STOPPED]:
1✔
237
                raise InvalidStatusException(
1✔
238
                    f"Execution environment {self.id} cannot be stopped when inactive or already stopped."
239
                    f" Current status: {self.status}"
240
                )
241
            self.status = RuntimeStatus.STOPPED
1✔
242
        self.runtime_executor.stop()
1✔
243
        self.keepalive_timer.cancel()
1✔
244

245
    # Status methods
246
    def release(self) -> None:
1✔
247
        self.last_returned = datetime.now()
1✔
248
        with self.status_lock:
1✔
249
            if self.status != RuntimeStatus.RUNNING:
1✔
250
                raise InvalidStatusException(
×
251
                    f"Execution environment {self.id} can only be set to status ready while running."
252
                    f" Current status: {self.status}"
253
                )
254
            self.status = RuntimeStatus.READY
1✔
255

256
        if self.initialization_type == "on-demand":
1✔
257
            self.keepalive_timer = Timer(config.LAMBDA_KEEPALIVE_MS / 1000, self.keepalive_passed)
1✔
258
            self.keepalive_timer.start()
1✔
259

260
    def reserve(self) -> None:
1✔
261
        with self.status_lock:
1✔
262
            if self.status != RuntimeStatus.READY:
1✔
263
                raise InvalidStatusException(
×
264
                    f"Execution environment {self.id} can only be reserved if ready. "
265
                    f" Current status: {self.status}"
266
                )
267
            self.status = RuntimeStatus.RUNNING
1✔
268

269
        self.keepalive_timer.cancel()
1✔
270

271
    def keepalive_passed(self) -> None:
1✔
272
        LOG.debug(
×
273
            "Execution environment %s for function %s has not received any invocations in a while. Stopping.",
274
            self.id,
275
            self.function_version.qualified_arn,
276
        )
277
        self.stop()
×
278
        # Notify assignment service via callback to remove from environments list
279
        self.on_timeout(self.version_manager_id, self.id)
×
280

281
    def timed_out(self) -> None:
1✔
282
        """Handle status updates if the startup of an execution environment times out.
283
        Invoked asynchronously by the startup timer in a separate thread."""
284
        # TODO: De-emphasize the error part after fixing control flow and tests for test_lambda_runtime_exit
285
        LOG.warning(
1✔
286
            "Execution environment %s for function %s timed out during startup."
287
            " Check for errors during the startup of your Lambda function and"
288
            " consider increasing the startup timeout via LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT.",
289
            self.id,
290
            self.function_version.qualified_arn,
291
        )
292
        if LOG.isEnabledFor(logging.DEBUG):
1✔
293
            LOG.debug(
1✔
294
                "Logs from the execution environment %s after startup timeout:\n%s",
295
                self.id,
296
                self.get_prefixed_logs(),
297
            )
298
        with self.status_lock:
1✔
299
            if self.status != RuntimeStatus.STARTING:
1✔
300
                raise InvalidStatusException(
×
301
                    f"Execution environment {self.id} can only time out while starting. Current status: {self.status}"
302
                )
303
            self.status = RuntimeStatus.STARTUP_TIMED_OUT
1✔
304
        try:
1✔
305
            self.runtime_executor.stop()
1✔
306
        except Exception as e:
×
307
            LOG.debug("Unable to shutdown execution environment %s after timeout: %s", self.id, e)
×
308

309
    def errored(self) -> None:
1✔
310
        """Handle status updates if the startup of an execution environment fails.
311
        Invoked synchronously when an unexpected error occurs during startup."""
312
        LOG.warning(
1✔
313
            "Execution environment %s for function %s failed during startup."
314
            " Check for errors during the startup of your Lambda function.",
315
            self.id,
316
            self.function_version.qualified_arn,
317
        )
318
        if LOG.isEnabledFor(logging.DEBUG):
1✔
319
            LOG.debug(
1✔
320
                "Logs from the execution environment %s after startup error:\n%s",
321
                self.id,
322
                self.get_prefixed_logs(),
323
            )
324
        with self.status_lock:
1✔
325
            if self.status != RuntimeStatus.STARTING:
1✔
326
                raise InvalidStatusException(
1✔
327
                    f"Execution environment {self.id} can only error while starting. Current status: {self.status}"
328
                )
329
            self.status = RuntimeStatus.STARTUP_FAILED
1✔
330
        try:
1✔
331
            self.runtime_executor.stop()
1✔
332
        except Exception as e:
1✔
333
            LOG.debug("Unable to shutdown execution environment %s after error: %s", self.id, e)
1✔
334

335
    def get_prefixed_logs(self) -> str:
1✔
336
        """Returns prefixed lambda containers logs"""
337
        logs = self.runtime_executor.get_logs()
1✔
338
        prefix = f"[lambda {self.id}] "
1✔
339
        prefixed_logs = logs.replace("\n", f"\n{prefix}")
1✔
340
        return f"{prefix}{prefixed_logs}"
1✔
341

342
    def invoke(self, invocation: Invocation) -> InvocationResult:
1✔
343
        assert self.status == RuntimeStatus.RUNNING
1✔
344
        # Async/event invokes might miss an aws_trace_header, then we need to create a new root trace id.
345
        aws_trace_header = (
1✔
346
            invocation.trace_context.get("aws_trace_header") or TraceHeader().ensure_root_exists()
347
        )
348
        # The Lambda RIE requires a full tracing header including Root, Parent, and Samples. Otherwise, tracing fails
349
        # with the warning "Subsegment ## handler discarded due to Lambda worker still initializing"
350
        aws_trace_header.ensure_sampled_exists()
1✔
351
        # TODO: replace this random parent id with actual parent segment created within the Lambda provider using X-Ray
352
        aws_trace_header.ensure_parent_exists()
1✔
353
        # TODO: test and implement Active and PassThrough tracing and sampling decisions.
354
        # TODO: implement Lambda lineage: https://docs.aws.amazon.com/lambda/latest/dg/invocation-recursion.html
355
        invoke_payload = {
1✔
356
            "invoke-id": invocation.request_id,  # TODO: rename to request-id (requires change in lambda-init)
357
            "invoked-function-arn": invocation.invoked_arn,
358
            "payload": to_str(invocation.payload),
359
            "trace-id": aws_trace_header.to_header_str(),
360
        }
361
        return self.runtime_executor.invoke(payload=invoke_payload)
1✔
362

363
    def get_credentials(self) -> Credentials:
1✔
364
        sts_client = connect_to(region_name=self.function_version.id.region).sts.request_metadata(
1✔
365
            service_principal="lambda"
366
        )
367
        role_session_name = self.function_version.id.function_name
1✔
368

369
        # To handle single character function names #9016
370
        if len(role_session_name) == 1:
1✔
371
            role_session_name += "@lambda_function"
1✔
372
        # TODO we should probably set a maximum alive duration for environments, due to the session expiration
373
        return sts_client.assume_role(
1✔
374
            RoleArn=self.function_version.config.role,
375
            RoleSessionName=role_session_name,
376
            DurationSeconds=43200,
377
        )["Credentials"]
378

379
    def _get_execution_timeout_seconds(self) -> int:
1✔
380
        # Returns the timeout value in seconds to be enforced during the execution of the
381
        # lambda function. This is the configured value or the DEBUG MODE default if this
382
        # is enabled.
383
        if is_lambda_debug_timeout_enabled_for(self.function_version.qualified_arn):
1✔
384
            return DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS
×
385
        return self.function_version.config.timeout
1✔
386

387
    def _get_startup_timeout_seconds(self) -> int:
1✔
388
        # Returns the timeout value in seconds to be enforced during lambda container startups.
389
        # This is the value defined through LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT or the LAMBDA
390
        # DEBUG MODE default if this is enabled.
391
        if is_lambda_debug_timeout_enabled_for(self.function_version.qualified_arn):
1✔
392
            return DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS
×
393
        return STARTUP_TIMEOUT_SEC
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc