• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 533d4262-4f08-49b7-b7ba-18c46e51ac1a

02 Jun 2025 06:43PM UTC coverage: 86.752% (+0.1%) from 86.654%
533d4262-4f08-49b7-b7ba-18c46e51ac1a

push

circleci

web-flow
APIGW: implement Canary Deployments CRUD logic (#12694)

142 of 147 new or added lines in 3 files covered. (96.6%)

187 existing lines in 16 files now uncovered.

64937 of 74854 relevant lines covered (86.75%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.09
/localstack-core/localstack/services/lambda_/invocation/execution_environment.py
1
import logging
1✔
2
import random
1✔
3
import string
1✔
4
import time
1✔
5
from datetime import date, datetime
1✔
6
from enum import Enum, auto
1✔
7
from threading import RLock, Timer
1✔
8
from typing import Callable, Dict, Optional
1✔
9

10
from localstack import config
1✔
11
from localstack.aws.connect import connect_to
1✔
12
from localstack.services.lambda_.invocation.lambda_models import (
1✔
13
    Credentials,
14
    FunctionVersion,
15
    InitializationType,
16
    Invocation,
17
    InvocationResult,
18
)
19
from localstack.services.lambda_.invocation.runtime_executor import (
1✔
20
    RuntimeExecutor,
21
    get_runtime_executor,
22
)
23
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
1✔
24
    DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS,
25
    is_lambda_debug_timeout_enabled_for,
26
)
27
from localstack.utils.strings import to_str
1✔
28
from localstack.utils.xray.trace_header import TraceHeader
1✔
29

30
# Number of seconds an execution environment may take to start before the startup timer fires.
STARTUP_TIMEOUT_SEC = config.LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT
# The sixteen lowercase hexadecimal digits, "0" through "f".
HEX_CHARS = list(string.hexdigits[:16])

# Module-level logger.
LOG = logging.getLogger(__name__)
34

35

36
class RuntimeStatus(Enum):
    """Lifecycle states of an execution environment."""

    # Initial status assigned when an execution environment is constructed
    INACTIVE = auto()
    # Set by start() while the runtime executor is being started
    STARTING = auto()
    # Set after a successful start and again after release()
    READY = auto()
    # Set by reserve() before an invocation is sent to the environment
    INVOKING = auto()
    # Set by errored() when the startup raises an unexpected exception
    STARTUP_FAILED = auto()
    # Set by timed_out() when the startup timer fires
    STARTUP_TIMED_OUT = auto()
    # Set by stop()
    STOPPED = auto()
    # Set by keepalive_passed() right before the idle environment is stopped
    TIMING_OUT = auto()
45

46

47
class InvalidStatusException(Exception):
    """Raised when a status transition is requested from a status that does not allow it."""

    def __init__(self, message: str):
        """
        :param message: Human-readable description of the rejected transition
        """
        super().__init__(message)
50

51

52
class EnvironmentStartupTimeoutException(Exception):
    """Raised by the startup routine when the environment timed out while starting."""

    def __init__(self, message: str):
        """
        :param message: Human-readable description of the timeout
        """
        super().__init__(message)
55

56

57
def generate_runtime_id() -> str:
    """
    Generate a random identifier for an execution environment

    :return: String of 32 lowercase hexadecimal characters
    """
    # The first 16 characters of string.hexdigits are "0123456789abcdef"
    alphabet = string.hexdigits[:16]
    picked = random.choices(alphabet, k=32)
    return "".join(picked).lower()
59

60

61
# TODO: add status callback
62
class ExecutionEnvironment:
    """
    A single Lambda execution environment backed by a runtime executor.

    The environment tracks its lifecycle in ``status`` (guarded by ``status_lock``) and moves through
    the ``RuntimeStatus`` values via ``start``, ``reserve``, ``release``, ``stop`` and the timer
    callbacks ``timed_out`` and ``keepalive_passed``.
    """

    runtime_executor: RuntimeExecutor
    status_lock: RLock
    status: RuntimeStatus
    initialization_type: InitializationType
    last_returned: datetime
    startup_timer: Optional[Timer]
    keepalive_timer: Optional[Timer]
    on_timeout: Callable[[str, str], None]

    def __init__(
        self,
        function_version: FunctionVersion,
        initialization_type: InitializationType,
        on_timeout: Callable[[str, str], None],
        version_manager_id: str,
    ):
        """
        :param function_version: Function version this environment executes
        :param initialization_type: Initialization type, exposed as AWS_LAMBDA_INITIALIZATION_TYPE
        :param on_timeout: Callback invoked with (version_manager_id, environment id) after the
            environment was stopped because the keepalive timer passed
        :param version_manager_id: Identifier handed back to ``on_timeout``
        """
        self.id = generate_runtime_id()
        self.function_version = function_version
        self.initialization_type = initialization_type
        self.version_manager_id = version_manager_id
        self.on_timeout = on_timeout

        # Every status change must happen while holding this lock
        self.status_lock = RLock()
        self.status = RuntimeStatus.INACTIVE
        self.last_returned = datetime.min

        executor_factory = get_runtime_executor()
        self.runtime_executor = executor_factory(self.id, function_version)

        self.startup_timer = None
        # Placeholder timer that is never started, so that reserve() and stop() can call cancel()
        # without checking whether release() has created a real keepalive timer yet
        self.keepalive_timer = Timer(0, lambda *args, **kwargs: None)

    def get_log_group_name(self) -> str:
        """
        :return: Log group name derived from the function name
        """
        function_name = self.function_version.id.function_name
        return f"/aws/lambda/{function_name}"

    def get_log_stream_name(self) -> str:
        """
        :return: Log stream name built from today's date, the function qualifier, and the environment id
        """
        day = date.today().strftime("%Y/%m/%d")
        qualifier = self.function_version.id.qualifier
        return f"{day}/[{qualifier}]{self.id}"

    def get_environment_variables(self) -> Dict[str, str]:
        """
        Assemble the environment variables passed to the runtime executor on startup

        :return: Dict of environment variables
        """
        fn_id = self.function_version.id
        fn_config = self.function_version.config
        credentials = self.get_credentials()
        env_vars = {
            # 1) Public runtime environment variables defined by AWS (same order as in the docs):
            # https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html
            # a) Reserved environment variables
            # _HANDLER is added conditionally further down
            # TODO: _X_AMZN_TRACE_ID
            "AWS_DEFAULT_REGION": fn_id.region,
            "AWS_REGION": fn_id.region,
            # AWS_EXECUTION_ENV is added conditionally further down
            "AWS_LAMBDA_FUNCTION_NAME": fn_id.function_name,
            "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": fn_config.memory_size,
            "AWS_LAMBDA_FUNCTION_VERSION": fn_id.qualifier,
            "AWS_LAMBDA_INITIALIZATION_TYPE": self.initialization_type,
            "AWS_LAMBDA_LOG_GROUP_NAME": self.get_log_group_name(),
            "AWS_LAMBDA_LOG_STREAM_NAME": self.get_log_stream_name(),
            # Credentials of the assumed function role
            "AWS_ACCESS_KEY_ID": credentials["AccessKeyId"],
            "AWS_SECRET_ACCESS_KEY": credentials["SecretAccessKey"],
            "AWS_SESSION_TOKEN": credentials["SessionToken"],
            # AWS_LAMBDA_RUNTIME_API is set by the runtime interface emulator (RIE)
            "LAMBDA_TASK_ROOT": "/var/task",
            "LAMBDA_RUNTIME_DIR": "/var/runtime",
            # b) Unreserved environment variables
            # LANG
            # LD_LIBRARY_PATH
            # NODE_PATH
            # PYTHONPATH
            # GEM_PATH
            "AWS_XRAY_CONTEXT_MISSING": "LOG_ERROR",
            # TODO: allow configuration of xray address
            "AWS_XRAY_DAEMON_ADDRESS": "127.0.0.1:2000",
            # It is not entirely clear who sets the following two variables.
            # Extensions are not supposed to see them => TODO: test if init removes them
            "_AWS_XRAY_DAEMON_PORT": "2000",
            "_AWS_XRAY_DAEMON_ADDRESS": "127.0.0.1",
            # AWS_LAMBDA_DOTNET_PREJIT
            "TZ": ":UTC",
            # 2) Public AWS RIE interface: https://github.com/aws/aws-lambda-runtime-interface-emulator
            "AWS_LAMBDA_FUNCTION_TIMEOUT": self._get_execution_timeout_seconds(),
            # 3) Public LocalStack endpoint
            "LOCALSTACK_HOSTNAME": self.runtime_executor.get_endpoint_from_executor(),
            "EDGE_PORT": str(config.GATEWAY_LISTEN[0].port),
            # AWS_ENDPOINT_URL is added conditionally further down
            # 4) Internal LocalStack runtime API
            "LOCALSTACK_RUNTIME_ID": self.id,
            "LOCALSTACK_RUNTIME_ENDPOINT": self.runtime_executor.get_runtime_endpoint(),
            # 5) Account of the function (necessary for extensions API)
            "LOCALSTACK_FUNCTION_ACCOUNT_ID": fn_id.account,
            # used by the init to spawn the x-ray daemon
            # LOCALSTACK_USER is added conditionally further down
        }

        # Conditional entries. The order matters: later assignments override earlier ones.
        if not config.LAMBDA_DISABLE_AWS_ENDPOINT_URL:
            env_vars["AWS_ENDPOINT_URL"] = (
                f"http://{self.runtime_executor.get_endpoint_from_executor()}:{config.GATEWAY_LISTEN[0].port}"
            )
        # The handler is None for image lambdas and gets populated at runtime (e.g., by RIE)
        if fn_config.handler:
            env_vars["_HANDLER"] = fn_config.handler
        # The runtime itself overrides this value unless it is a provided runtime
        if fn_config.runtime:
            env_vars["AWS_EXECUTION_ENV"] = "AWS_Lambda_rapid"
        # Variables configured on the function take precedence over everything set so far
        if fn_config.environment:
            env_vars.update(fn_config.environment)
        if config.LAMBDA_INIT_DEBUG:
            # Dropping privileges breaks debugging, hence run as root
            env_vars["LOCALSTACK_USER"] = "root"
        # Forcefully overwriting the user might break debugging!
        if config.LAMBDA_INIT_USER is not None:
            env_vars["LOCALSTACK_USER"] = config.LAMBDA_INIT_USER
        if config.LS_LOG in config.TRACE_LOG_LEVELS:
            env_vars["LOCALSTACK_INIT_LOG_LEVEL"] = "info"
        if config.LAMBDA_INIT_POST_INVOKE_WAIT_MS:
            env_vars["LOCALSTACK_POST_INVOKE_WAIT_MS"] = int(config.LAMBDA_INIT_POST_INVOKE_WAIT_MS)
        if config.LAMBDA_LIMITS_MAX_FUNCTION_PAYLOAD_SIZE_BYTES:
            env_vars["LOCALSTACK_MAX_PAYLOAD_SIZE"] = int(
                config.LAMBDA_LIMITS_MAX_FUNCTION_PAYLOAD_SIZE_BYTES
            )
        return env_vars

    # Lifecycle methods
    def start(self) -> None:
        """
        Start the runtime environment and mark it as ready

        :raises InvalidStatusException: if the environment is not inactive
        :raises EnvironmentStartupTimeoutException: if the startup failed after the startup timer fired
        """
        with self.status_lock:
            if self.status != RuntimeStatus.INACTIVE:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only be started when inactive. Current status: {self.status}"
                )
            self.status = RuntimeStatus.STARTING

        # The timer invokes timed_out() in a separate thread if the startup takes too long
        timeout_seconds: int = self._get_startup_timeout_seconds()
        self.startup_timer = Timer(timeout_seconds, self.timed_out)
        self.startup_timer.start()

        try:
            started_at = time.perf_counter()
            self.runtime_executor.start(self.get_environment_variables())
            elapsed_ms = (time.perf_counter() - started_at) * 1000
            LOG.debug(
                "Start of execution environment %s for function %s took %0.2fms",
                self.id,
                self.function_version.qualified_arn,
                elapsed_ms,
            )

            with self.status_lock:
                self.status = RuntimeStatus.READY
        # TODO: Distinguish between expected errors (e.g., timeout, cancellation due to deletion update) and
        #  other unexpected exceptions. Improve control flow after implementing error reporting in Go init.
        except Exception as startup_error:
            # timed_out() has already handled the status update and the shutdown in this case
            if self.status == RuntimeStatus.STARTUP_TIMED_OUT:
                raise EnvironmentStartupTimeoutException(
                    "Execution environment timed out during startup."
                ) from startup_error
            LOG.warning(
                "Failed to start execution environment %s: %s",
                self.id,
                startup_error,
            )
            self.errored()
            raise
        finally:
            # Neither a finished nor a failed startup needs the startup timer anymore
            pending_timer = self.startup_timer
            if pending_timer:
                pending_timer.cancel()
                self.startup_timer = None

    def stop(self) -> None:
        """
        Stop the runtime environment

        :raises InvalidStatusException: if the environment is inactive or already stopped
        """
        with self.status_lock:
            if self.status in (RuntimeStatus.INACTIVE, RuntimeStatus.STOPPED):
                raise InvalidStatusException(
                    f"Execution environment {self.id} cannot be stopped when inactive or already stopped."
                    f" Current status: {self.status}"
                )
            self.status = RuntimeStatus.STOPPED
        self.runtime_executor.stop()
        self.keepalive_timer.cancel()

    # Status methods
    def release(self) -> None:
        """
        Mark the environment as ready again after an invocation and, for on-demand environments,
        schedule the keepalive timer

        :raises InvalidStatusException: if the environment is not currently invoking
        """
        self.last_returned = datetime.now()
        with self.status_lock:
            if self.status != RuntimeStatus.INVOKING:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only be set to status ready while running."
                    f" Current status: {self.status}"
                )
            self.status = RuntimeStatus.READY

        # Only on-demand environments get a keepalive timer
        if self.initialization_type != "on-demand":
            return
        keepalive_seconds = config.LAMBDA_KEEPALIVE_MS / 1000
        self.keepalive_timer = Timer(keepalive_seconds, self.keepalive_passed)
        self.keepalive_timer.start()

    def reserve(self) -> None:
        """
        Mark the environment as invoking and cancel the keepalive timer

        :raises InvalidStatusException: if the environment is not ready
        """
        with self.status_lock:
            if self.status != RuntimeStatus.READY:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only be reserved if ready. "
                    f" Current status: {self.status}"
                )
            self.status = RuntimeStatus.INVOKING

        self.keepalive_timer.cancel()

    def keepalive_passed(self) -> None:
        """Stop the environment after the keepalive timer passed, unless it is no longer ready.
        Used as the callback of the keepalive timer created in release()."""
        LOG.debug(
            "Execution environment %s for function %s has not received any invocations in a while. Stopping.",
            self.id,
            self.function_version.qualified_arn,
        )
        # stop() deliberately allows interrupting invocations, which must not happen to running
        # invocations just because the keepalive timer passed.
        # Switching to TIMING_OUT under the lock prevents this race condition.
        with self.status_lock:
            current_status = self.status
            if current_status != RuntimeStatus.READY:
                LOG.debug(
                    "Keepalive timer passed, but current runtime status is %s. Aborting keepalive stop.",
                    current_status,
                )
                return
            self.status = RuntimeStatus.TIMING_OUT
        self.stop()
        # Notify assignment service via callback to remove from environments list
        self.on_timeout(self.version_manager_id, self.id)

    def timed_out(self) -> None:
        """Handle status updates if the startup of an execution environment times out.
        Invoked asynchronously by the startup timer in a separate thread.

        :raises InvalidStatusException: if the environment is not starting
        """
        # TODO: De-emphasize the error part after fixing control flow and tests for test_lambda_runtime_exit
        LOG.warning(
            "Execution environment %s for function %s timed out during startup."
            " Check for errors during the startup of your Lambda function and"
            " consider increasing the startup timeout via LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT.",
            self.id,
            self.function_version.qualified_arn,
        )
        # Only fetch the logs if they are going to be emitted
        if LOG.isEnabledFor(logging.DEBUG):
            LOG.debug(
                "Logs from the execution environment %s after startup timeout:\n%s",
                self.id,
                self.get_prefixed_logs(),
            )
        with self.status_lock:
            if self.status != RuntimeStatus.STARTING:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only time out while starting. Current status: {self.status}"
                )
            self.status = RuntimeStatus.STARTUP_TIMED_OUT
        # Best-effort shutdown: a failure is only logged
        try:
            self.runtime_executor.stop()
        except Exception as stop_error:
            LOG.debug(
                "Unable to shutdown execution environment %s after timeout: %s", self.id, stop_error
            )

    def errored(self) -> None:
        """Handle status updates if the startup of an execution environment fails.
        Invoked synchronously when an unexpected error occurs during startup.

        :raises InvalidStatusException: if the environment is not starting
        """
        LOG.warning(
            "Execution environment %s for function %s failed during startup."
            " Check for errors during the startup of your Lambda function.",
            self.id,
            self.function_version.qualified_arn,
        )
        # Only fetch the logs if they are going to be emitted
        if LOG.isEnabledFor(logging.DEBUG):
            LOG.debug(
                "Logs from the execution environment %s after startup error:\n%s",
                self.id,
                self.get_prefixed_logs(),
            )
        with self.status_lock:
            if self.status != RuntimeStatus.STARTING:
                raise InvalidStatusException(
                    f"Execution environment {self.id} can only error while starting. Current status: {self.status}"
                )
            self.status = RuntimeStatus.STARTUP_FAILED
        # Best-effort shutdown: a failure is only logged
        try:
            self.runtime_executor.stop()
        except Exception as stop_error:
            LOG.debug(
                "Unable to shutdown execution environment %s after error: %s", self.id, stop_error
            )

    def get_prefixed_logs(self) -> str:
        """
        :return: Logs of the runtime executor with every line prefixed by the environment id
        """
        prefix = f"[lambda {self.id}] "
        raw_logs = self.runtime_executor.get_logs()
        return prefix + raw_logs.replace("\n", "\n" + prefix)

    def invoke(self, invocation: Invocation) -> InvocationResult:
        """
        Forward an invocation to the runtime executor. The environment must have been reserved before.

        :param invocation: Invocation to execute
        :return: Result returned by the runtime executor
        """
        assert self.status == RuntimeStatus.INVOKING
        # Async/event invokes might miss an aws_trace_header, then we need to create a new root trace id.
        trace_header = invocation.trace_context.get("aws_trace_header")
        if not trace_header:
            trace_header = TraceHeader().ensure_root_exists()
        # The Lambda RIE requires a full tracing header including Root, Parent, and Samples. Otherwise, tracing fails
        # with the warning "Subsegment ## handler discarded due to Lambda worker still initializing"
        trace_header.ensure_sampled_exists()
        # TODO: replace this random parent id with actual parent segment created within the Lambda provider using X-Ray
        trace_header.ensure_parent_exists()
        # TODO: test and implement Active and PassThrough tracing and sampling decisions.
        # TODO: implement Lambda lineage: https://docs.aws.amazon.com/lambda/latest/dg/invocation-recursion.html
        request = {
            "invoke-id": invocation.request_id,  # TODO: rename to request-id (requires change in lambda-init)
            "invoked-function-arn": invocation.invoked_arn,
            "payload": to_str(invocation.payload),
            "trace-id": trace_header.to_header_str(),
        }
        return self.runtime_executor.invoke(payload=request)

    def get_credentials(self) -> Credentials:
        """
        Assume the function role via STS

        :return: The "Credentials" entry of the assume role response
        """
        function_id = self.function_version.id
        sts_client = connect_to(region_name=function_id.region).sts.request_metadata(
            service_principal="lambda"
        )

        session_name = function_id.function_name
        # To handle single character function names #9016
        if len(session_name) == 1:
            session_name += "@lambda_function"

        # TODO we should probably set a maximum alive duration for environments, due to the session expiration
        response = sts_client.assume_role(
            RoleArn=self.function_version.config.role,
            RoleSessionName=session_name,
            DurationSeconds=43200,
        )
        return response["Credentials"]

    def _get_execution_timeout_seconds(self) -> int:
        """
        :return: Timeout in seconds to enforce while executing the function: the configured value,
            or the debug mode default if the debug timeout is enabled for this function
        """
        debug_enabled = is_lambda_debug_timeout_enabled_for(self.function_version.qualified_arn)
        return (
            DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS
            if debug_enabled
            else self.function_version.config.timeout
        )

    def _get_startup_timeout_seconds(self) -> int:
        """
        :return: Timeout in seconds to enforce while starting the environment: the value defined via
            LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT, or the debug mode default if the debug timeout is
            enabled for this function
        """
        debug_enabled = is_lambda_debug_timeout_enabled_for(self.function_version.qualified_arn)
        return DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS if debug_enabled else STARTUP_TIMEOUT_SEC
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc