• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22291461973

21 Feb 2026 09:33AM UTC coverage: 86.973% (+0.009%) from 86.964%
22291461973

push

github

web-flow
fix typo in Dockerfile (#13812)

69789 of 80242 relevant lines covered (86.97%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.0
/localstack-core/localstack/services/lambda_/invocation/version_manager.py
1
import concurrent.futures
1✔
2
import logging
1✔
3
import threading
1✔
4
import time
1✔
5
from concurrent.futures import Future
1✔
6
from concurrent.futures._base import ALL_COMPLETED, CancelledError
1✔
7

8
from localstack import config
1✔
9
from localstack.aws.api.lambda_ import (
1✔
10
    ProvisionedConcurrencyStatusEnum,
11
    ServiceException,
12
    State,
13
    StateReasonCode,
14
)
15
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
16
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
17
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
18
from localstack.services.lambda_.invocation.execution_environment import ExecutionEnvironment
1✔
19
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
20
from localstack.services.lambda_.invocation.lambda_models import (
1✔
21
    Function,
22
    FunctionVersion,
23
    Invocation,
24
    InvocationResult,
25
    ProvisionedConcurrencyState,
26
    VersionState,
27
)
28
from localstack.services.lambda_.invocation.logs import LogHandler, LogItem
1✔
29
from localstack.services.lambda_.invocation.metrics import (
1✔
30
    record_cw_metric_error,
31
    record_cw_metric_invocation,
32
)
33
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
34
from localstack.services.lambda_.ldm import LDMProvisioner
1✔
35
from localstack.utils.strings import long_uid, to_bytes, truncate
1✔
36
from localstack.utils.threads import FuncThread, start_thread
1✔
37

38
LOG = logging.getLogger(__name__)
1✔
39

40

41
class LambdaVersionManager:
    """Manage the lifecycle and synchronous invocations of a single Lambda function version.

    The manager prepares the version through the runtime executor (``start``), tears it down
    (``stop``), scales provisioned concurrency in a background thread
    (``update_provisioned_concurrency_config``) and dispatches synchronous invocations to an
    execution environment (``invoke``).

    Each instance carries a unique ``id`` (generated with ``long_uid``) which is the key passed to
    the assignment service when requesting, scaling, or stopping environments for this version.
    """

    # arn this Lambda Version manager manages
    function_arn: str
    function_version: FunctionVersion
    # None is accepted for Lambda@Edge; only used in invoke for get_invocation_lease
    function: Function | None

    # Scale provisioned concurrency up and down
    provisioning_thread: FuncThread | None
    # Additional guard to prevent scheduling invocation on version during shutdown
    shutdown_event: threading.Event

    state: VersionState | None
    provisioned_state: ProvisionedConcurrencyState | None  # TODO: remove?
    log_handler: LogHandler
    counting_service: CountingService
    assignment_service: AssignmentService

    ldm_provisioner: LDMProvisioner | None

    def __init__(
        self,
        function_arn: str,
        function_version: FunctionVersion,
        # HACK allowing None for Lambda@Edge; only used in invoke for get_invocation_lease
        function: Function | None,
        counting_service: CountingService,
        assignment_service: AssignmentService,
    ):
        """
        Create the manager in ``State.Pending``; call ``start`` to prepare the version.

        :param function_arn: arn of the function version managed by this instance
        :param function_version: the function version to prepare and invoke
        :param function: parent function, passed to the counting service when acquiring an
            invocation lease; may be None (Lambda@Edge)
        :param counting_service: service handing out invocation leases
        :param assignment_service: service providing and scaling execution environments
        """
        self.id = long_uid()
        self.function_arn = function_arn
        self.function_version = function_version
        self.function = function
        self.counting_service = counting_service
        self.assignment_service = assignment_service
        self.log_handler = LogHandler(function_version.config.role, function_version.id.region)

        # async
        self.provisioning_thread = None
        self.shutdown_event = threading.Event()

        # async state
        self.provisioned_state: ProvisionedConcurrencyState | None = None
        # Guards reads and writes of provisioned_state between callers and the provisioning thread
        self.provisioned_state_lock = threading.RLock()
        # https://aws.amazon.com/blogs/compute/coming-soon-expansion-of-aws-lambda-states-to-all-functions/
        self.state: VersionState = VersionState(state=State.Pending)

        # Stays None unless the hook below injects a provisioner into this instance
        self.ldm_provisioner = None
        lambda_hooks.inject_ldm_provisioner.run(self)

    def start(self) -> VersionState:
        """
        Start the log subscriber, prepare the version via the runtime executor and update the state.

        Any exception raised during preparation is caught and recorded as ``State.Failed`` with
        ``StateReasonCode.InternalError`` instead of being propagated.

        :return: the resulting version state (Active, ActiveNonInvocable, or Failed)
        """
        try:
            self.log_handler.start_subscriber()
            time_before = time.perf_counter()
            get_runtime_executor().prepare_version(self.function_version)  # TODO: make pluggable?
            LOG.debug(
                "Version preparation of function %s took %0.2fms",
                self.function_version.qualified_arn,
                (time.perf_counter() - time_before) * 1000,
            )

            # code and reason not set for success scenario because only failed states provide this field:
            # https://docs.aws.amazon.com/lambda/latest/dg/API_GetFunctionConfiguration.html#SSS-GetFunctionConfiguration-response-LastUpdateStatusReasonCode
            new_state = State.Active
            if (
                self.function_version.config.capacity_provider_config
                and self.function_version.id.qualifier == "$LATEST"
            ):
                new_state = State.ActiveNonInvocable
                # HACK: trying to match the AWS timing behavior of Lambda Managed Instances for the operation
                # update_function_configuration followed by get_function because transitioning LastUpdateStatus from
                # InProgress to Successful happens too fast on LocalStack (thanks to caching in prepare_version).
                # Without this hack, test_latest_published_update_config fails at get_function_response_postupdate_latest
                # TODO: this sleep has side-effects and we should be looking into alternatives
                # Increasing this sleep too much (e.g., 3s) could cause the side effect that a created function is not
                # ready for updates (i.e., rejected with a ResourceConflictException) and failing other tests
                # time.sleep(0.1)
            self.state = VersionState(state=new_state)
            LOG.debug(
                "Changing Lambda %s (id %s) to %s",
                self.function_arn,
                self.function_version.config.internal_revision,
                new_state,
            )
        except Exception as e:
            self.state = VersionState(
                state=State.Failed,
                code=StateReasonCode.InternalError,
                reason=f"Error while creating lambda: {e}",
            )
            LOG.debug(
                "Changing Lambda %s (id %s) to Failed. Reason: %s",
                self.function_arn,
                self.function_version.config.internal_revision,
                e,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
        return self.state

    def stop(self) -> None:
        """
        Shut the version down.

        Marks the version as Inactive, sets the shutdown event so that ``invoke`` rejects new
        invocations, stops the log handler, stops all environments of this version and lets the
        runtime executor clean up the version.
        """
        LOG.debug("Stopping lambda version '%s'", self.function_arn)
        self.state = VersionState(
            state=State.Inactive, code=StateReasonCode.Idle, reason="Shutting down"
        )
        self.shutdown_event.set()
        self.log_handler.stop()
        self.assignment_service.stop_environments_for_version(self.id)
        get_runtime_executor().cleanup_version(self.function_version)  # TODO: make pluggable?

    def update_provisioned_concurrency_config(
        self, provisioned_concurrent_executions: int
    ) -> Future[None]:
        """
        Scale the provisioned environments of this version in a background thread.

        TODO: implement update while in progress (see test_provisioned_concurrency test)
        TODO: loop until diff == 0 and retry to remove/add diff environments
        TODO: alias routing & allocated (i.e., the status while updating provisioned concurrency)

        :param provisioned_concurrent_executions: set to 0 to stop all provisioned environments
        :return: the result future of the thread performing the scaling
        :raises ServiceException: if another provisioned concurrency update is still IN_PROGRESS
        """
        with self.provisioned_state_lock:
            # LocalStack limitation: cannot update provisioned concurrency while another update is in progress
            if (
                self.provisioned_state
                and self.provisioned_state.status == ProvisionedConcurrencyStatusEnum.IN_PROGRESS
            ):
                raise ServiceException(
                    "Updating provisioned concurrency configuration while IN_PROGRESS is not supported yet."
                )

            if not self.provisioned_state:
                self.provisioned_state = ProvisionedConcurrencyState()

        def scale_environments(*args, **kwargs) -> None:
            futures = self.assignment_service.scale_provisioned_concurrency(
                self.id, self.function_version, provisioned_concurrent_executions
            )
            # Wait for all provisioning/de-provisioning tasks to finish using a timeout longer than max Lambda execution
            concurrent.futures.wait(futures, timeout=20 * 60, return_when=ALL_COMPLETED)

            success_count = 0
            start_error = None
            for future in futures:
                try:
                    # The wait above already spent the time budget. Do not block a second time:
                    # a task that is still unfinished raises a timeout error here and is counted
                    # as a failure instead of keeping the state IN_PROGRESS indefinitely.
                    future.result(timeout=0)
                    success_count += 1
                except Exception as e:
                    # Only the most recent error is kept; it is reported in the warning below
                    start_error = e

            with self.provisioned_state_lock:
                if provisioned_concurrent_executions == 0:
                    self.provisioned_state = None
                else:
                    # TODO: check whether available changes with active invokes while updating
                    self.provisioned_state.available = success_count
                    self.provisioned_state.allocated = success_count
                    if start_error or success_count < provisioned_concurrent_executions:
                        self.provisioned_state.status = ProvisionedConcurrencyStatusEnum.FAILED
                        self.provisioned_state.status_reason = "FUNCTION_ERROR_INIT_FAILURE"
                        LOG.warning(
                            "Failed to provision %d/%s environments for function %s. Error: %s",
                            provisioned_concurrent_executions - success_count,
                            provisioned_concurrent_executions,
                            self.function_arn,
                            start_error,
                        )
                    else:
                        self.provisioned_state.status = ProvisionedConcurrencyStatusEnum.READY

        self.provisioning_thread = start_thread(scale_environments)
        return self.provisioning_thread.result_future

    # Extract environment handling

    def invoke(self, *, invocation: Invocation) -> InvocationResult:
        """
        synchronous invoke entrypoint

        0. check counter, get lease
        1. try to get an inactive (no active invoke) environment
        2.(allgood) send invoke to environment
        3. wait for invocation result
        4. return invocation result & release lease

        2.(nogood) fail fast fail hard

        :param invocation: the invocation to execute
        :return: the invocation result; a ``StatusErrorException`` raised by the environment is
            converted into an error result carrying the exception payload
        :raises ServiceException: if the version is shutting down
        """
        LOG.debug(
            "Got an invocation for function %s with request_id %s",
            self.function_arn,
            invocation.request_id,
        )
        if self.shutdown_event.is_set():
            message = f"Got an invocation with request_id {invocation.request_id} for a version shutting down"
            LOG.warning(message)
            raise ServiceException(message)

        # If the environment has debugging enabled, route the invocation there;
        # debug environments bypass Lambda service quotas.
        if self.ldm_provisioner and (
            ldm_execution_environment := self.ldm_provisioner.get_execution_environment(
                qualified_lambda_arn=self.function_version.qualified_arn,
                user_agent=invocation.user_agent,
            )
        ):
            try:
                invocation_result = ldm_execution_environment.invoke(invocation)
                invocation_result.executed_version = self.function_version.id.qualifier
                self.store_logs(
                    invocation_result=invocation_result, execution_env=ldm_execution_environment
                )
            except CancelledError as e:
                # Timeouts for invocation futures are managed by LDM, a cancelled error here is
                # aligned with the debug container terminating whilst debugging/invocation.
                LOG.debug("LDM invocation future encountered a cancelled error: '%s'", e)
                invocation_result = InvocationResult(
                    request_id="",
                    payload=to_bytes(
                        "The invocation was canceled because the debug configuration "
                        "was removed or the operation timed out"
                    ),
                    is_error=True,
                    logs="",
                    executed_version=self.function_version.id.qualifier,
                )
            except StatusErrorException as e:
                invocation_result = InvocationResult(
                    request_id="",
                    payload=e.payload,
                    is_error=True,
                    logs="",
                    executed_version=self.function_version.id.qualifier,
                )
            finally:
                # Always hand the debug environment back, also when the invoke raised
                ldm_execution_environment.release()
            # The debug path returns here: no lease is taken and no CloudWatch metric is recorded
            return invocation_result

        with self.counting_service.get_invocation_lease(
            self.function, self.function_version, self.provisioned_state
        ) as provisioning_type:
            # TODO: potential race condition when changing provisioned concurrency after getting the lease but before
            #   getting an environment
            try:
                # Blocks and potentially creates a new execution environment for this invocation
                with self.assignment_service.get_environment(
                    self.id, self.function_version, provisioning_type
                ) as execution_env:
                    invocation_result = execution_env.invoke(invocation)
                    invocation_result.executed_version = self.function_version.id.qualifier
                    self.store_logs(
                        invocation_result=invocation_result, execution_env=execution_env
                    )
            except StatusErrorException as e:
                invocation_result = InvocationResult(
                    request_id="",
                    payload=e.payload,
                    is_error=True,
                    logs="",
                    executed_version=self.function_version.id.qualifier,
                )

        function_id = self.function_version.id
        # Record CloudWatch metrics in separate threads
        # MAYBE reuse threads rather than starting new threads upon every invocation
        if invocation_result.is_error:
            start_thread(
                lambda *args, **kwargs: record_cw_metric_error(
                    function_name=function_id.function_name,
                    account_id=function_id.account,
                    region_name=function_id.region,
                ),
                name=f"record-cloudwatch-metric-error-{function_id.function_name}:{function_id.qualifier}",
            )
        else:
            start_thread(
                lambda *args, **kwargs: record_cw_metric_invocation(
                    function_name=function_id.function_name,
                    account_id=function_id.account,
                    region_name=function_id.region,
                ),
                name=f"record-cloudwatch-metric-{function_id.function_name}:{function_id.qualifier}",
            )
        # TODO: consider using the same prefix logging as in error case for execution environment.
        #   possibly as separate named logger.
        if invocation_result.logs is not None:
            LOG.debug("Got logs for invocation '%s'", invocation.request_id)
            for log_line in invocation_result.logs.splitlines():
                LOG.debug(
                    "[%s-%s] %s",
                    function_id.function_name,
                    invocation.request_id,
                    truncate(log_line, config.LAMBDA_TRUNCATE_STDOUT),
                )
        else:
            LOG.warning(
                "[%s] Error while printing logs for function '%s': Received no logs from environment.",
                invocation.request_id,
                function_id.function_name,
            )
        return invocation_result

    def store_logs(
        self, invocation_result: InvocationResult, execution_env: ExecutionEnvironment
    ) -> None:
        """
        Hand the logs of an invocation to the log handler.

        If the result carries no logs (None or empty), nothing is stored and a warning including the
        prefixed logs of the execution environment is emitted instead.

        :param invocation_result: result whose ``logs`` should be stored
        :param execution_env: environment that ran the invocation; provides the log group and
            log stream names
        """
        if invocation_result.logs:
            log_item = LogItem(
                execution_env.get_log_group_name(),
                execution_env.get_log_stream_name(),
                invocation_result.logs,
            )
            self.log_handler.add_logs(log_item)
        else:
            LOG.warning(
                "Received no logs from invocation with id %s for lambda %s. Execution environment logs: \n%s",
                invocation_result.request_id,
                self.function_arn,
                execution_env.get_prefixed_logs(),
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc