• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19821277742

01 Dec 2025 08:16AM UTC coverage: 86.821% (-0.04%) from 86.863%
19821277742

push

github

web-flow
Add Lambda Managed Instances (#13440)

Co-authored-by: Joel Scheuner <joel.scheuner.dev@gmail.com>
Co-authored-by: Anisa Oshafi <anisaoshafi@gmail.com>
Co-authored-by: Cristopher Pinzón <cristopher.pinzon@gmail.com>
Co-authored-by: Alexander Rashed <alexander.rashed@localstack.cloud>
Co-authored-by: Dominik Schubert <dominik.schubert91@gmail.com>
Co-authored-by: Mathieu Cloutier <79954947+cloutierMat@users.noreply.github.com>
Co-authored-by: Simon Walker <simon.walker@localstack.cloud>

127 of 181 new or added lines in 11 files covered. (70.17%)

17 existing lines in 5 files now uncovered.

69556 of 80114 relevant lines covered (86.82%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.59
/localstack-core/localstack/services/lambda_/invocation/version_manager.py
1
import concurrent.futures
1✔
2
import logging
1✔
3
import threading
1✔
4
import time
1✔
5
from concurrent.futures import Future
1✔
6
from concurrent.futures._base import CancelledError
1✔
7

8
from localstack import config
1✔
9
from localstack.aws.api.lambda_ import (
1✔
10
    ProvisionedConcurrencyStatusEnum,
11
    ServiceException,
12
    State,
13
    StateReasonCode,
14
)
15
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
16
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
17
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
18
from localstack.services.lambda_.invocation.execution_environment import ExecutionEnvironment
1✔
19
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
20
from localstack.services.lambda_.invocation.lambda_models import (
1✔
21
    Function,
22
    FunctionVersion,
23
    Invocation,
24
    InvocationResult,
25
    ProvisionedConcurrencyState,
26
    VersionState,
27
)
28
from localstack.services.lambda_.invocation.logs import LogHandler, LogItem
1✔
29
from localstack.services.lambda_.invocation.metrics import (
1✔
30
    record_cw_metric_error,
31
    record_cw_metric_invocation,
32
)
33
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
34
from localstack.services.lambda_.ldm import LDMProvisioner
1✔
35
from localstack.utils.strings import long_uid, to_bytes, truncate
1✔
36
from localstack.utils.threads import FuncThread, start_thread
1✔
37

38
LOG = logging.getLogger(__name__)
1✔
39

40

41
class LambdaVersionManager:
1✔
42
    # arn this Lambda Version manager manages
43
    function_arn: str
1✔
44
    function_version: FunctionVersion
1✔
45
    function: Function
1✔
46

47
    # Scale provisioned concurrency up and down
48
    provisioning_thread: FuncThread | None
1✔
49
    # Additional guard to prevent scheduling invocation on version during shutdown
50
    shutdown_event: threading.Event
1✔
51

52
    state: VersionState | None
1✔
53
    provisioned_state: ProvisionedConcurrencyState | None  # TODO: remove?
1✔
54
    log_handler: LogHandler
1✔
55
    counting_service: CountingService
1✔
56
    assignment_service: AssignmentService
1✔
57

58
    ldm_provisioner: LDMProvisioner | None
1✔
59

60
    def __init__(
1✔
61
        self,
62
        function_arn: str,
63
        function_version: FunctionVersion,
64
        # HACK allowing None for Lambda@Edge; only used in invoke for get_invocation_lease
65
        function: Function | None,
66
        counting_service: CountingService,
67
        assignment_service: AssignmentService,
68
    ):
69
        self.id = long_uid()
1✔
70
        self.function_arn = function_arn
1✔
71
        self.function_version = function_version
1✔
72
        self.function = function
1✔
73
        self.counting_service = counting_service
1✔
74
        self.assignment_service = assignment_service
1✔
75
        self.log_handler = LogHandler(function_version.config.role, function_version.id.region)
1✔
76

77
        # async
78
        self.provisioning_thread = None
1✔
79
        self.shutdown_event = threading.Event()
1✔
80

81
        # async state
82
        self.provisioned_state = None
1✔
83
        self.provisioned_state_lock = threading.RLock()
1✔
84
        # https://aws.amazon.com/blogs/compute/coming-soon-expansion-of-aws-lambda-states-to-all-functions/
85
        self.state = VersionState(state=State.Pending)
1✔
86

87
        self.ldm_provisioner = None
1✔
88
        lambda_hooks.inject_ldm_provisioner.run(self)
1✔
89

90
    def start(self) -> VersionState:
1✔
91
        try:
1✔
92
            self.log_handler.start_subscriber()
1✔
93
            time_before = time.perf_counter()
1✔
94
            get_runtime_executor().prepare_version(self.function_version)  # TODO: make pluggable?
1✔
95
            LOG.debug(
1✔
96
                "Version preparation of function %s took %0.2fms",
97
                self.function_version.qualified_arn,
98
                (time.perf_counter() - time_before) * 1000,
99
            )
100

101
            # code and reason not set for success scenario because only failed states provide this field:
102
            # https://docs.aws.amazon.com/lambda/latest/dg/API_GetFunctionConfiguration.html#SSS-GetFunctionConfiguration-response-LastUpdateStatusReasonCode
103
            new_state = State.Active
1✔
104
            if (
1✔
105
                self.function_version.config.CapacityProviderConfig
106
                and self.function_version.id.qualifier == "$LATEST"
107
            ):
NEW
108
                new_state = State.ActiveNonInvocable
×
109
                # HACK: trying to match the AWS timing behavior of Lambda Managed Instances for the operation
110
                # update_function_configuration followed by get_function because transitioning LastUpdateStatus from
111
                # InProgress to Successful happens too fast on LocalStack (thanks to caching in prepare_version).
112
                # Without this hack, test_latest_published_update_config fails at get_function_response_postupdate_latest
113
                # TODO: this sleep has side-effects and we should be looking into alternatives
114
                # Increasing this sleep too much (e.g., 3s) could cause the side effect that a created function is not
115
                # ready for updates (i.e., rejected with a ResourceConflictException) and failing other tests
116
                # time.sleep(0.1)
117
            self.state = VersionState(state=new_state)
1✔
118
            LOG.debug(
1✔
119
                "Changing Lambda %s (id %s) to %s",
120
                self.function_arn,
121
                self.function_version.config.internal_revision,
122
                new_state,
123
            )
124
        except Exception as e:
×
125
            self.state = VersionState(
×
126
                state=State.Failed,
127
                code=StateReasonCode.InternalError,
128
                reason=f"Error while creating lambda: {e}",
129
            )
130
            LOG.debug(
×
131
                "Changing Lambda %s (id %s) to Failed. Reason: %s",
132
                self.function_arn,
133
                self.function_version.config.internal_revision,
134
                e,
135
                exc_info=LOG.isEnabledFor(logging.DEBUG),
136
            )
137
        return self.state
1✔
138

139
    def stop(self) -> None:
1✔
140
        LOG.debug("Stopping lambda version '%s'", self.function_arn)
1✔
141
        self.state = VersionState(
1✔
142
            state=State.Inactive, code=StateReasonCode.Idle, reason="Shutting down"
143
        )
144
        self.shutdown_event.set()
1✔
145
        self.log_handler.stop()
1✔
146
        self.assignment_service.stop_environments_for_version(self.id)
1✔
147
        get_runtime_executor().cleanup_version(self.function_version)  # TODO: make pluggable?
1✔
148

149
    def update_provisioned_concurrency_config(
1✔
150
        self, provisioned_concurrent_executions: int
151
    ) -> Future[None]:
152
        """
153
        TODO: implement update while in progress (see test_provisioned_concurrency test)
154
        TODO: loop until diff == 0 and retry to remove/add diff environments
155
        TODO: alias routing & allocated (i.e., the status while updating provisioned concurrency)
156
        TODO: ProvisionedConcurrencyStatusEnum.FAILED
157
        TODO: status reason
158

159
        :param provisioned_concurrent_executions: set to 0 to stop all provisioned environments
160
        """
161
        with self.provisioned_state_lock:
1✔
162
            # LocalStack limitation: cannot update provisioned concurrency while another update is in progress
163
            if (
1✔
164
                self.provisioned_state
165
                and self.provisioned_state.status == ProvisionedConcurrencyStatusEnum.IN_PROGRESS
166
            ):
167
                raise ServiceException(
×
168
                    "Updating provisioned concurrency configuration while IN_PROGRESS is not supported yet."
169
                )
170

171
            if not self.provisioned_state:
1✔
172
                self.provisioned_state = ProvisionedConcurrencyState()
1✔
173

174
        def scale_environments(*args, **kwargs) -> None:
1✔
175
            futures = self.assignment_service.scale_provisioned_concurrency(
1✔
176
                self.id, self.function_version, provisioned_concurrent_executions
177
            )
178

179
            concurrent.futures.wait(futures)
1✔
180

181
            with self.provisioned_state_lock:
1✔
182
                if provisioned_concurrent_executions == 0:
1✔
183
                    self.provisioned_state = None
1✔
184
                else:
185
                    self.provisioned_state.available = provisioned_concurrent_executions
1✔
186
                    self.provisioned_state.allocated = provisioned_concurrent_executions
1✔
187
                    self.provisioned_state.status = ProvisionedConcurrencyStatusEnum.READY
1✔
188

189
        self.provisioning_thread = start_thread(scale_environments)
1✔
190
        return self.provisioning_thread.result_future
1✔
191

192
    # Extract environment handling
193

194
    def invoke(self, *, invocation: Invocation) -> InvocationResult:
1✔
195
        """
196
        synchronous invoke entrypoint
197

198
        0. check counter, get lease
199
        1. try to get an inactive (no active invoke) environment
200
        2.(allgood) send invoke to environment
201
        3. wait for invocation result
202
        4. return invocation result & release lease
203

204
        2.(nogood) fail fast fail hard
205

206
        """
207
        LOG.debug(
1✔
208
            "Got an invocation for function %s with request_id %s",
209
            self.function_arn,
210
            invocation.request_id,
211
        )
212
        if self.shutdown_event.is_set():
1✔
213
            message = f"Got an invocation with request_id {invocation.request_id} for a version shutting down"
×
214
            LOG.warning(message)
×
215
            raise ServiceException(message)
×
216

217
        # If the environment has debugging enabled, route the invocation there;
218
        # debug environments bypass Lambda service quotas.
219
        if self.ldm_provisioner and (
1✔
220
            ldm_execution_environment := self.ldm_provisioner.get_execution_environment(
221
                qualified_lambda_arn=self.function_version.qualified_arn,
222
                user_agent=invocation.user_agent,
223
            )
224
        ):
225
            try:
×
226
                invocation_result = ldm_execution_environment.invoke(invocation)
×
227
                invocation_result.executed_version = self.function_version.id.qualifier
×
228
                self.store_logs(
×
229
                    invocation_result=invocation_result, execution_env=ldm_execution_environment
230
                )
231
            except CancelledError as e:
×
232
                # Timeouts for invocation futures are managed by LDM, a cancelled error here is
233
                # aligned with the debug container terminating whilst debugging/invocation.
234
                LOG.debug("LDM invocation future encountered a cancelled error: '%s'", e)
×
235
                invocation_result = InvocationResult(
×
236
                    request_id="",
237
                    payload=to_bytes(
238
                        "The invocation was canceled because the debug configuration "
239
                        "was removed or the operation timed out"
240
                    ),
241
                    is_error=True,
242
                    logs="",
243
                    executed_version=self.function_version.id.qualifier,
244
                )
245
            except StatusErrorException as e:
×
246
                invocation_result = InvocationResult(
×
247
                    request_id="",
248
                    payload=e.payload,
249
                    is_error=True,
250
                    logs="",
251
                    executed_version=self.function_version.id.qualifier,
252
                )
253
            finally:
254
                ldm_execution_environment.release()
×
255
            return invocation_result
×
256

257
        with self.counting_service.get_invocation_lease(
1✔
258
            self.function, self.function_version
259
        ) as provisioning_type:
260
            # TODO: potential race condition when changing provisioned concurrency after getting the lease but before
261
            #   getting an environment
262
            try:
1✔
263
                # Blocks and potentially creates a new execution environment for this invocation
264
                with self.assignment_service.get_environment(
1✔
265
                    self.id, self.function_version, provisioning_type
266
                ) as execution_env:
267
                    invocation_result = execution_env.invoke(invocation)
1✔
268
                    invocation_result.executed_version = self.function_version.id.qualifier
1✔
269
                    self.store_logs(
1✔
270
                        invocation_result=invocation_result, execution_env=execution_env
271
                    )
272
            except StatusErrorException as e:
1✔
273
                invocation_result = InvocationResult(
1✔
274
                    request_id="",
275
                    payload=e.payload,
276
                    is_error=True,
277
                    logs="",
278
                    executed_version=self.function_version.id.qualifier,
279
                )
280

281
        function_id = self.function_version.id
1✔
282
        # Record CloudWatch metrics in separate threads
283
        # MAYBE reuse threads rather than starting new threads upon every invocation
284
        if invocation_result.is_error:
1✔
285
            start_thread(
1✔
286
                lambda *args, **kwargs: record_cw_metric_error(
287
                    function_name=function_id.function_name,
288
                    account_id=function_id.account,
289
                    region_name=function_id.region,
290
                ),
291
                name=f"record-cloudwatch-metric-error-{function_id.function_name}:{function_id.qualifier}",
292
            )
293
        else:
294
            start_thread(
1✔
295
                lambda *args, **kwargs: record_cw_metric_invocation(
296
                    function_name=function_id.function_name,
297
                    account_id=function_id.account,
298
                    region_name=function_id.region,
299
                ),
300
                name=f"record-cloudwatch-metric-{function_id.function_name}:{function_id.qualifier}",
301
            )
302
        # TODO: consider using the same prefix logging as in error case for execution environment.
303
        #   possibly as separate named logger.
304
        if invocation_result.logs is not None:
1✔
305
            LOG.debug("Got logs for invocation '%s'", invocation.request_id)
1✔
306
            for log_line in invocation_result.logs.splitlines():
1✔
307
                LOG.debug(
1✔
308
                    "[%s-%s] %s",
309
                    function_id.function_name,
310
                    invocation.request_id,
311
                    truncate(log_line, config.LAMBDA_TRUNCATE_STDOUT),
312
                )
313
        else:
314
            LOG.warning(
×
315
                "[%s] Error while printing logs for function '%s': Received no logs from environment.",
316
                invocation.request_id,
317
                function_id.function_name,
318
            )
319
        return invocation_result
1✔
320

321
    def store_logs(
1✔
322
        self, invocation_result: InvocationResult, execution_env: ExecutionEnvironment
323
    ) -> None:
324
        if invocation_result.logs:
1✔
325
            log_item = LogItem(
1✔
326
                execution_env.get_log_group_name(),
327
                execution_env.get_log_stream_name(),
328
                invocation_result.logs,
329
            )
330
            self.log_handler.add_logs(log_item)
1✔
331
        else:
332
            LOG.warning(
×
333
                "Received no logs from invocation with id %s for lambda %s. Execution environment logs: \n%s",
334
                invocation_result.request_id,
335
                self.function_arn,
336
                execution_env.get_prefixed_logs(),
337
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc