• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 858585c6-6e27-4bca-a3b8-78d6d978f737

25 Mar 2025 06:43PM UTC coverage: 86.88% (+0.02%) from 86.865%
858585c6-6e27-4bca-a3b8-78d6d978f737

push

circleci

web-flow
[Utils] Add a batch policy utility (#12430)

53 of 56 new or added lines in 1 file covered. (94.64%)

227 existing lines in 12 files now uncovered.

63251 of 72803 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.38
/localstack-core/localstack/services/lambda_/invocation/version_manager.py
1
import concurrent.futures
1✔
2
import logging
1✔
3
import threading
1✔
4
import time
1✔
5
from concurrent.futures import Future
1✔
6

7
from localstack import config
1✔
8
from localstack.aws.api.lambda_ import (
1✔
9
    ProvisionedConcurrencyStatusEnum,
10
    ServiceException,
11
    State,
12
    StateReasonCode,
13
)
14
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
15
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
16
from localstack.services.lambda_.invocation.execution_environment import ExecutionEnvironment
1✔
17
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
18
from localstack.services.lambda_.invocation.lambda_models import (
1✔
19
    Function,
20
    FunctionVersion,
21
    Invocation,
22
    InvocationResult,
23
    ProvisionedConcurrencyState,
24
    VersionState,
25
)
26
from localstack.services.lambda_.invocation.logs import LogHandler, LogItem
1✔
27
from localstack.services.lambda_.invocation.metrics import (
1✔
28
    record_cw_metric_error,
29
    record_cw_metric_invocation,
30
)
31
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
32
from localstack.utils.strings import long_uid, truncate
1✔
33
from localstack.utils.threads import FuncThread, start_thread
1✔
34

35
LOG = logging.getLogger(__name__)
1✔
36

37

38
class LambdaVersionManager:
1✔
39
    # arn this Lambda Version manager manages
40
    function_arn: str
1✔
41
    function_version: FunctionVersion
1✔
42
    function: Function
1✔
43

44
    # Scale provisioned concurrency up and down
45
    provisioning_thread: FuncThread | None
1✔
46
    # Additional guard to prevent scheduling invocation on version during shutdown
47
    shutdown_event: threading.Event
1✔
48

49
    state: VersionState | None
1✔
50
    provisioned_state: ProvisionedConcurrencyState | None  # TODO: remove?
1✔
51
    log_handler: LogHandler
1✔
52
    counting_service: CountingService
1✔
53
    assignment_service: AssignmentService
1✔
54

55
    def __init__(
1✔
56
        self,
57
        function_arn: str,
58
        function_version: FunctionVersion,
59
        # HACK allowing None for Lambda@Edge; only used in invoke for get_invocation_lease
60
        function: Function | None,
61
        counting_service: CountingService,
62
        assignment_service: AssignmentService,
63
    ):
64
        self.id = long_uid()
1✔
65
        self.function_arn = function_arn
1✔
66
        self.function_version = function_version
1✔
67
        self.function = function
1✔
68
        self.counting_service = counting_service
1✔
69
        self.assignment_service = assignment_service
1✔
70
        self.log_handler = LogHandler(function_version.config.role, function_version.id.region)
1✔
71

72
        # async
73
        self.provisioning_thread = None
1✔
74
        self.shutdown_event = threading.Event()
1✔
75

76
        # async state
77
        self.provisioned_state = None
1✔
78
        self.provisioned_state_lock = threading.RLock()
1✔
79
        # https://aws.amazon.com/blogs/compute/coming-soon-expansion-of-aws-lambda-states-to-all-functions/
80
        self.state = VersionState(state=State.Pending)
1✔
81

82
    def start(self) -> VersionState:
1✔
83
        try:
1✔
84
            self.log_handler.start_subscriber()
1✔
85
            time_before = time.perf_counter()
1✔
86
            get_runtime_executor().prepare_version(self.function_version)  # TODO: make pluggable?
1✔
87
            LOG.debug(
1✔
88
                "Version preparation of function %s took %0.2fms",
89
                self.function_version.qualified_arn,
90
                (time.perf_counter() - time_before) * 1000,
91
            )
92

93
            # code and reason not set for success scenario because only failed states provide this field:
94
            # https://docs.aws.amazon.com/lambda/latest/dg/API_GetFunctionConfiguration.html#SSS-GetFunctionConfiguration-response-LastUpdateStatusReasonCode
95
            self.state = VersionState(state=State.Active)
1✔
96
            LOG.debug(
1✔
97
                "Changing Lambda %s (id %s) to active",
98
                self.function_arn,
99
                self.function_version.config.internal_revision,
100
            )
UNCOV
101
        except Exception as e:
×
UNCOV
102
            self.state = VersionState(
×
103
                state=State.Failed,
104
                code=StateReasonCode.InternalError,
105
                reason=f"Error while creating lambda: {e}",
106
            )
UNCOV
107
            LOG.debug(
×
108
                "Changing Lambda %s (id %s) to failed. Reason: %s",
109
                self.function_arn,
110
                self.function_version.config.internal_revision,
111
                e,
112
                exc_info=True,
113
            )
114
        return self.state
1✔
115

116
    def stop(self) -> None:
1✔
117
        LOG.debug("Stopping lambda version '%s'", self.function_arn)
1✔
118
        self.state = VersionState(
1✔
119
            state=State.Inactive, code=StateReasonCode.Idle, reason="Shutting down"
120
        )
121
        self.shutdown_event.set()
1✔
122
        self.log_handler.stop()
1✔
123
        self.assignment_service.stop_environments_for_version(self.id)
1✔
124
        get_runtime_executor().cleanup_version(self.function_version)  # TODO: make pluggable?
1✔
125

126
    def update_provisioned_concurrency_config(
1✔
127
        self, provisioned_concurrent_executions: int
128
    ) -> Future[None]:
129
        """
130
        TODO: implement update while in progress (see test_provisioned_concurrency test)
131
        TODO: loop until diff == 0 and retry to remove/add diff environments
132
        TODO: alias routing & allocated (i.e., the status while updating provisioned concurrency)
133
        TODO: ProvisionedConcurrencyStatusEnum.FAILED
134
        TODO: status reason
135

136
        :param provisioned_concurrent_executions: set to 0 to stop all provisioned environments
137
        """
138
        with self.provisioned_state_lock:
1✔
139
            # LocalStack limitation: cannot update provisioned concurrency while another update is in progress
140
            if (
1✔
141
                self.provisioned_state
142
                and self.provisioned_state.status == ProvisionedConcurrencyStatusEnum.IN_PROGRESS
143
            ):
UNCOV
144
                raise ServiceException(
×
145
                    "Updating provisioned concurrency configuration while IN_PROGRESS is not supported yet."
146
                )
147

148
            if not self.provisioned_state:
1✔
149
                self.provisioned_state = ProvisionedConcurrencyState()
1✔
150

151
        def scale_environments(*args, **kwargs) -> None:
1✔
152
            futures = self.assignment_service.scale_provisioned_concurrency(
1✔
153
                self.id, self.function_version, provisioned_concurrent_executions
154
            )
155

156
            concurrent.futures.wait(futures)
1✔
157

158
            with self.provisioned_state_lock:
1✔
159
                if provisioned_concurrent_executions == 0:
1✔
160
                    self.provisioned_state = None
1✔
161
                else:
162
                    self.provisioned_state.available = provisioned_concurrent_executions
1✔
163
                    self.provisioned_state.allocated = provisioned_concurrent_executions
1✔
164
                    self.provisioned_state.status = ProvisionedConcurrencyStatusEnum.READY
1✔
165

166
        self.provisioning_thread = start_thread(scale_environments)
1✔
167
        return self.provisioning_thread.result_future
1✔
168

169
    # Extract environment handling
170

171
    def invoke(self, *, invocation: Invocation) -> InvocationResult:
1✔
172
        """
173
        synchronous invoke entrypoint
174

175
        0. check counter, get lease
176
        1. try to get an inactive (no active invoke) environment
177
        2.(allgood) send invoke to environment
178
        3. wait for invocation result
179
        4. return invocation result & release lease
180

181
        2.(nogood) fail fast fail hard
182

183
        """
184
        LOG.debug(
1✔
185
            "Got an invocation for function %s with request_id %s",
186
            self.function_arn,
187
            invocation.request_id,
188
        )
189
        if self.shutdown_event.is_set():
1✔
190
            message = f"Got an invocation with request_id {invocation.request_id} for a version shutting down"
×
UNCOV
191
            LOG.warning(message)
×
UNCOV
192
            raise ServiceException(message)
×
193

194
        with self.counting_service.get_invocation_lease(
1✔
195
            self.function, self.function_version
196
        ) as provisioning_type:
197
            # TODO: potential race condition when changing provisioned concurrency after getting the lease but before
198
            #   getting an environment
199
            try:
1✔
200
                # Blocks and potentially creates a new execution environment for this invocation
201
                with self.assignment_service.get_environment(
1✔
202
                    self.id, self.function_version, provisioning_type
203
                ) as execution_env:
204
                    invocation_result = execution_env.invoke(invocation)
1✔
205
                    invocation_result.executed_version = self.function_version.id.qualifier
1✔
206
                    self.store_logs(
1✔
207
                        invocation_result=invocation_result, execution_env=execution_env
208
                    )
209
            except StatusErrorException as e:
1✔
210
                invocation_result = InvocationResult(
1✔
211
                    request_id="",
212
                    payload=e.payload,
213
                    is_error=True,
214
                    logs="",
215
                    executed_version=self.function_version.id.qualifier,
216
                )
217

218
        function_id = self.function_version.id
1✔
219
        # Record CloudWatch metrics in separate threads
220
        # MAYBE reuse threads rather than starting new threads upon every invocation
221
        if invocation_result.is_error:
1✔
222
            start_thread(
1✔
223
                lambda *args, **kwargs: record_cw_metric_error(
224
                    function_name=function_id.function_name,
225
                    account_id=function_id.account,
226
                    region_name=function_id.region,
227
                ),
228
                name=f"record-cloudwatch-metric-error-{function_id.function_name}:{function_id.qualifier}",
229
            )
230
        else:
231
            start_thread(
1✔
232
                lambda *args, **kwargs: record_cw_metric_invocation(
233
                    function_name=function_id.function_name,
234
                    account_id=function_id.account,
235
                    region_name=function_id.region,
236
                ),
237
                name=f"record-cloudwatch-metric-{function_id.function_name}:{function_id.qualifier}",
238
            )
239
        # TODO: consider using the same prefix logging as in error case for execution environment.
240
        #   possibly as separate named logger.
241
        LOG.debug("Got logs for invocation '%s'", invocation.request_id)
1✔
242
        for log_line in invocation_result.logs.splitlines():
1✔
243
            LOG.debug(
1✔
244
                "[%s-%s] %s",
245
                function_id.function_name,
246
                invocation.request_id,
247
                truncate(log_line, config.LAMBDA_TRUNCATE_STDOUT),
248
            )
249
        return invocation_result
1✔
250

251
    def store_logs(
1✔
252
        self, invocation_result: InvocationResult, execution_env: ExecutionEnvironment
253
    ) -> None:
254
        if invocation_result.logs:
1✔
255
            log_item = LogItem(
1✔
256
                execution_env.get_log_group_name(),
257
                execution_env.get_log_stream_name(),
258
                invocation_result.logs,
259
            )
260
            self.log_handler.add_logs(log_item)
1✔
261
        else:
UNCOV
262
            LOG.warning(
×
263
                "Received no logs from invocation with id %s for lambda %s",
264
                invocation_result.request_id,
265
                self.function_arn,
266
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc