• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16665047018

31 Jul 2025 06:34PM UTC coverage: 86.897% (+0.1%) from 86.781%
16665047018

push

github

web-flow
Apigw/enable vpce routing (#12937)

5 of 5 new or added lines in 1 file covered. (100.0%)

314 existing lines in 13 files now uncovered.

66469 of 76492 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.95
/localstack-core/localstack/services/lambda_/invocation/version_manager.py
1
import concurrent.futures
1✔
2
import logging
1✔
3
import threading
1✔
4
import time
1✔
5
from concurrent.futures import Future
1✔
6

7
from localstack import config
1✔
8
from localstack.aws.api.lambda_ import (
1✔
9
    ProvisionedConcurrencyStatusEnum,
10
    ServiceException,
11
    State,
12
    StateReasonCode,
13
)
14
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
15
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
16
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
17
from localstack.services.lambda_.invocation.execution_environment import ExecutionEnvironment
1✔
18
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
19
from localstack.services.lambda_.invocation.lambda_models import (
1✔
20
    Function,
21
    FunctionVersion,
22
    Invocation,
23
    InvocationResult,
24
    ProvisionedConcurrencyState,
25
    VersionState,
26
)
27
from localstack.services.lambda_.invocation.logs import LogHandler, LogItem
1✔
28
from localstack.services.lambda_.invocation.metrics import (
1✔
29
    record_cw_metric_error,
30
    record_cw_metric_invocation,
31
)
32
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
33
from localstack.services.lambda_.ldm import LDMProvisioner
1✔
34
from localstack.utils.strings import long_uid, truncate
1✔
35
from localstack.utils.threads import FuncThread, start_thread
1✔
36

37
LOG = logging.getLogger(__name__)
1✔
38

39

40
class LambdaVersionManager:
1✔
41
    # arn this Lambda Version manager manages
42
    function_arn: str
1✔
43
    function_version: FunctionVersion
1✔
44
    function: Function
1✔
45

46
    # Scale provisioned concurrency up and down
47
    provisioning_thread: FuncThread | None
1✔
48
    # Additional guard to prevent scheduling invocation on version during shutdown
49
    shutdown_event: threading.Event
1✔
50

51
    state: VersionState | None
1✔
52
    provisioned_state: ProvisionedConcurrencyState | None  # TODO: remove?
1✔
53
    log_handler: LogHandler
1✔
54
    counting_service: CountingService
1✔
55
    assignment_service: AssignmentService
1✔
56

57
    ldm_provisioner: LDMProvisioner | None
1✔
58

59
    def __init__(
1✔
60
        self,
61
        function_arn: str,
62
        function_version: FunctionVersion,
63
        # HACK allowing None for Lambda@Edge; only used in invoke for get_invocation_lease
64
        function: Function | None,
65
        counting_service: CountingService,
66
        assignment_service: AssignmentService,
67
    ):
68
        self.id = long_uid()
1✔
69
        self.function_arn = function_arn
1✔
70
        self.function_version = function_version
1✔
71
        self.function = function
1✔
72
        self.counting_service = counting_service
1✔
73
        self.assignment_service = assignment_service
1✔
74
        self.log_handler = LogHandler(function_version.config.role, function_version.id.region)
1✔
75

76
        # async
77
        self.provisioning_thread = None
1✔
78
        self.shutdown_event = threading.Event()
1✔
79

80
        # async state
81
        self.provisioned_state = None
1✔
82
        self.provisioned_state_lock = threading.RLock()
1✔
83
        # https://aws.amazon.com/blogs/compute/coming-soon-expansion-of-aws-lambda-states-to-all-functions/
84
        self.state = VersionState(state=State.Pending)
1✔
85

86
        self.ldm_provisioner = None
1✔
87
        lambda_hooks.inject_ldm_provisioner.run(self)
1✔
88

89
    def start(self) -> VersionState:
1✔
90
        try:
1✔
91
            self.log_handler.start_subscriber()
1✔
92
            time_before = time.perf_counter()
1✔
93
            get_runtime_executor().prepare_version(self.function_version)  # TODO: make pluggable?
1✔
94
            LOG.debug(
1✔
95
                "Version preparation of function %s took %0.2fms",
96
                self.function_version.qualified_arn,
97
                (time.perf_counter() - time_before) * 1000,
98
            )
99

100
            # code and reason not set for success scenario because only failed states provide this field:
101
            # https://docs.aws.amazon.com/lambda/latest/dg/API_GetFunctionConfiguration.html#SSS-GetFunctionConfiguration-response-LastUpdateStatusReasonCode
102
            self.state = VersionState(state=State.Active)
1✔
103
            LOG.debug(
1✔
104
                "Changing Lambda %s (id %s) to active",
105
                self.function_arn,
106
                self.function_version.config.internal_revision,
107
            )
108
        except Exception as e:
×
UNCOV
109
            self.state = VersionState(
×
110
                state=State.Failed,
111
                code=StateReasonCode.InternalError,
112
                reason=f"Error while creating lambda: {e}",
113
            )
UNCOV
114
            LOG.debug(
×
115
                "Changing Lambda %s (id %s) to failed. Reason: %s",
116
                self.function_arn,
117
                self.function_version.config.internal_revision,
118
                e,
119
                exc_info=LOG.isEnabledFor(logging.DEBUG),
120
            )
121
        return self.state
1✔
122

123
    def stop(self) -> None:
1✔
124
        LOG.debug("Stopping lambda version '%s'", self.function_arn)
1✔
125
        self.state = VersionState(
1✔
126
            state=State.Inactive, code=StateReasonCode.Idle, reason="Shutting down"
127
        )
128
        self.shutdown_event.set()
1✔
129
        self.log_handler.stop()
1✔
130
        self.assignment_service.stop_environments_for_version(self.id)
1✔
131
        get_runtime_executor().cleanup_version(self.function_version)  # TODO: make pluggable?
1✔
132

133
    def update_provisioned_concurrency_config(
1✔
134
        self, provisioned_concurrent_executions: int
135
    ) -> Future[None]:
136
        """
137
        TODO: implement update while in progress (see test_provisioned_concurrency test)
138
        TODO: loop until diff == 0 and retry to remove/add diff environments
139
        TODO: alias routing & allocated (i.e., the status while updating provisioned concurrency)
140
        TODO: ProvisionedConcurrencyStatusEnum.FAILED
141
        TODO: status reason
142

143
        :param provisioned_concurrent_executions: set to 0 to stop all provisioned environments
144
        """
145
        with self.provisioned_state_lock:
1✔
146
            # LocalStack limitation: cannot update provisioned concurrency while another update is in progress
147
            if (
1✔
148
                self.provisioned_state
149
                and self.provisioned_state.status == ProvisionedConcurrencyStatusEnum.IN_PROGRESS
150
            ):
UNCOV
151
                raise ServiceException(
×
152
                    "Updating provisioned concurrency configuration while IN_PROGRESS is not supported yet."
153
                )
154

155
            if not self.provisioned_state:
1✔
156
                self.provisioned_state = ProvisionedConcurrencyState()
1✔
157

158
        def scale_environments(*args, **kwargs) -> None:
1✔
159
            futures = self.assignment_service.scale_provisioned_concurrency(
1✔
160
                self.id, self.function_version, provisioned_concurrent_executions
161
            )
162

163
            concurrent.futures.wait(futures)
1✔
164

165
            with self.provisioned_state_lock:
1✔
166
                if provisioned_concurrent_executions == 0:
1✔
167
                    self.provisioned_state = None
1✔
168
                else:
169
                    self.provisioned_state.available = provisioned_concurrent_executions
1✔
170
                    self.provisioned_state.allocated = provisioned_concurrent_executions
1✔
171
                    self.provisioned_state.status = ProvisionedConcurrencyStatusEnum.READY
1✔
172

173
        self.provisioning_thread = start_thread(scale_environments)
1✔
174
        return self.provisioning_thread.result_future
1✔
175

176
    # Extract environment handling
177

178
    def invoke(self, *, invocation: Invocation) -> InvocationResult:
1✔
179
        """
180
        synchronous invoke entrypoint
181

182
        0. check counter, get lease
183
        1. try to get an inactive (no active invoke) environment
184
        2.(allgood) send invoke to environment
185
        3. wait for invocation result
186
        4. return invocation result & release lease
187

188
        2.(nogood) fail fast fail hard
189

190
        """
191
        LOG.debug(
1✔
192
            "Got an invocation for function %s with request_id %s",
193
            self.function_arn,
194
            invocation.request_id,
195
        )
196
        if self.shutdown_event.is_set():
1✔
UNCOV
197
            message = f"Got an invocation with request_id {invocation.request_id} for a version shutting down"
×
UNCOV
198
            LOG.warning(message)
×
UNCOV
199
            raise ServiceException(message)
×
200

201
        # If the environment has debugging enabled, route the invocation there;
202
        # debug environments bypass Lambda service quotas.
203
        if self.ldm_provisioner and (
1✔
204
            ldm_execution_environment := self.ldm_provisioner.get_execution_environment(
205
                qualified_lambda_arn=self.function_version.qualified_arn,
206
                user_agent=invocation.user_agent,
207
            )
208
        ):
UNCOV
209
            try:
×
UNCOV
210
                invocation_result = ldm_execution_environment.invoke(invocation)
×
UNCOV
211
                invocation_result.executed_version = self.function_version.id.qualifier
×
UNCOV
212
                self.store_logs(
×
213
                    invocation_result=invocation_result, execution_env=ldm_execution_environment
214
                )
UNCOV
215
            except StatusErrorException as e:
×
216
                invocation_result = InvocationResult(
×
217
                    request_id="",
218
                    payload=e.payload,
219
                    is_error=True,
220
                    logs="",
221
                    executed_version=self.function_version.id.qualifier,
222
                )
223
            finally:
UNCOV
224
                ldm_execution_environment.release()
×
UNCOV
225
            return invocation_result
×
226

227
        with self.counting_service.get_invocation_lease(
1✔
228
            self.function, self.function_version
229
        ) as provisioning_type:
230
            # TODO: potential race condition when changing provisioned concurrency after getting the lease but before
231
            #   getting an environment
232
            try:
1✔
233
                # Blocks and potentially creates a new execution environment for this invocation
234
                with self.assignment_service.get_environment(
1✔
235
                    self.id, self.function_version, provisioning_type
236
                ) as execution_env:
237
                    invocation_result = execution_env.invoke(invocation)
1✔
238
                    invocation_result.executed_version = self.function_version.id.qualifier
1✔
239
                    self.store_logs(
1✔
240
                        invocation_result=invocation_result, execution_env=execution_env
241
                    )
242
            except StatusErrorException as e:
1✔
243
                invocation_result = InvocationResult(
1✔
244
                    request_id="",
245
                    payload=e.payload,
246
                    is_error=True,
247
                    logs="",
248
                    executed_version=self.function_version.id.qualifier,
249
                )
250

251
        function_id = self.function_version.id
1✔
252
        # Record CloudWatch metrics in separate threads
253
        # MAYBE reuse threads rather than starting new threads upon every invocation
254
        if invocation_result.is_error:
1✔
255
            start_thread(
1✔
256
                lambda *args, **kwargs: record_cw_metric_error(
257
                    function_name=function_id.function_name,
258
                    account_id=function_id.account,
259
                    region_name=function_id.region,
260
                ),
261
                name=f"record-cloudwatch-metric-error-{function_id.function_name}:{function_id.qualifier}",
262
            )
263
        else:
264
            start_thread(
1✔
265
                lambda *args, **kwargs: record_cw_metric_invocation(
266
                    function_name=function_id.function_name,
267
                    account_id=function_id.account,
268
                    region_name=function_id.region,
269
                ),
270
                name=f"record-cloudwatch-metric-{function_id.function_name}:{function_id.qualifier}",
271
            )
272
        # TODO: consider using the same prefix logging as in error case for execution environment.
273
        #   possibly as separate named logger.
274
        if invocation_result.logs is not None:
1✔
275
            LOG.debug("Got logs for invocation '%s'", invocation.request_id)
1✔
276
            for log_line in invocation_result.logs.splitlines():
1✔
277
                LOG.debug(
1✔
278
                    "[%s-%s] %s",
279
                    function_id.function_name,
280
                    invocation.request_id,
281
                    truncate(log_line, config.LAMBDA_TRUNCATE_STDOUT),
282
                )
283
        else:
UNCOV
284
            LOG.warning(
×
285
                "[%s] Error while printing logs for function '%s': Received no logs from environment.",
286
                invocation.request_id,
287
                function_id.function_name,
288
            )
289
        return invocation_result
1✔
290

291
    def store_logs(
1✔
292
        self, invocation_result: InvocationResult, execution_env: ExecutionEnvironment
293
    ) -> None:
294
        if invocation_result.logs:
1✔
295
            log_item = LogItem(
1✔
296
                execution_env.get_log_group_name(),
297
                execution_env.get_log_stream_name(),
298
                invocation_result.logs,
299
            )
300
            self.log_handler.add_logs(log_item)
1✔
301
        else:
UNCOV
302
            LOG.warning(
×
303
                "Received no logs from invocation with id %s for lambda %s. Execution environment logs: \n%s",
304
                invocation_result.request_id,
305
                self.function_arn,
306
                execution_env.get_prefixed_logs(),
307
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc