• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 4e27dc30-df7d-47cf-9ddb-0b539d612501

17 Apr 2025 08:11PM UTC coverage: 86.279% (-0.02%) from 86.294%
4e27dc30-df7d-47cf-9ddb-0b539d612501

push

circleci

web-flow
Step Functions: Surface Support for Mocked Responses (#12525)

200 of 245 new or added lines in 9 files covered. (81.63%)

201 existing lines in 15 files now uncovered.

63889 of 74049 relevant lines covered (86.28%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.59
/localstack-core/localstack/services/lambda_/invocation/version_manager.py
1
import concurrent.futures
1✔
2
import logging
1✔
3
import threading
1✔
4
import time
1✔
5
from concurrent.futures import Future
1✔
6

7
from localstack import config
1✔
8
from localstack.aws.api.lambda_ import (
1✔
9
    ProvisionedConcurrencyStatusEnum,
10
    ServiceException,
11
    State,
12
    StateReasonCode,
13
)
14
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
15
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
16
from localstack.services.lambda_.invocation.execution_environment import ExecutionEnvironment
1✔
17
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
18
from localstack.services.lambda_.invocation.lambda_models import (
1✔
19
    Function,
20
    FunctionVersion,
21
    Invocation,
22
    InvocationResult,
23
    ProvisionedConcurrencyState,
24
    VersionState,
25
)
26
from localstack.services.lambda_.invocation.logs import LogHandler, LogItem
1✔
27
from localstack.services.lambda_.invocation.metrics import (
1✔
28
    record_cw_metric_error,
29
    record_cw_metric_invocation,
30
)
31
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
32
from localstack.utils.strings import long_uid, truncate
1✔
33
from localstack.utils.threads import FuncThread, start_thread
1✔
34

35
LOG = logging.getLogger(__name__)
1✔
36

37

38
class LambdaVersionManager:
    """
    Manages one Lambda function version: preparation, shutdown, provisioned concurrency, invokes.

    ``start`` prepares the version through the runtime executor and moves it to ``Active`` (or
    ``Failed``), ``stop`` marks it ``Inactive`` and releases its environments,
    ``update_provisioned_concurrency_config`` scales provisioned environments in a separate
    thread, and ``invoke`` is the synchronous invocation entrypoint.
    """

    # arn this Lambda Version manager manages
    function_arn: str
    function_version: FunctionVersion
    # NOTE: the constructor accepts None for this attribute (Lambda@Edge hack)
    function: Function

    # Scale provisioned concurrency up and down
    provisioning_thread: FuncThread | None
    # Additional guard to prevent scheduling invocation on version during shutdown
    shutdown_event: threading.Event

    state: VersionState | None
    provisioned_state: ProvisionedConcurrencyState | None  # TODO: remove?
    log_handler: LogHandler
    counting_service: CountingService
    assignment_service: AssignmentService

    def __init__(
        self,
        function_arn: str,
        function_version: FunctionVersion,
        # HACK allowing None for Lambda@Edge; only used in invoke for get_invocation_lease
        function: Function | None,
        counting_service: CountingService,
        assignment_service: AssignmentService,
    ):
        """
        Create a manager in the ``Pending`` state; the version itself is prepared in ``start``.

        :param function_arn: arn of the function version this manager is responsible for
        :param function_version: the version to prepare and invoke
        :param function: owning function, passed to the counting service when acquiring a lease
        :param counting_service: provides the invocation lease used by ``invoke``
        :param assignment_service: provides and scales the execution environments
        """
        # identity and model objects
        self.id = long_uid()
        self.function_arn = function_arn
        self.function_version = function_version
        self.function = function

        # collaborators
        self.counting_service = counting_service
        self.assignment_service = assignment_service
        self.log_handler = LogHandler(function_version.config.role, function_version.id.region)

        # asynchronous machinery
        self.provisioning_thread = None
        self.shutdown_event = threading.Event()

        # asynchronously updated state; provisioned_state is guarded by provisioned_state_lock
        self.provisioned_state = None
        self.provisioned_state_lock = threading.RLock()
        # https://aws.amazon.com/blogs/compute/coming-soon-expansion-of-aws-lambda-states-to-all-functions/
        self.state = VersionState(state=State.Pending)

    def start(self) -> VersionState:
        """
        Start the log subscriber, prepare the version, and record the resulting state.

        Any exception raised along the way is caught and turned into a ``Failed`` state carrying
        the error as reason, so this method reports failure through its return value.

        :return: the new state of this version (``Active`` on success, ``Failed`` otherwise)
        """
        try:
            self.log_handler.start_subscriber()
            preparation_started = time.perf_counter()
            get_runtime_executor().prepare_version(self.function_version)  # TODO: make pluggable?
            LOG.debug(
                "Version preparation of function %s took %0.2fms",
                self.function_version.qualified_arn,
                (time.perf_counter() - preparation_started) * 1000,
            )

            # code and reason not set for success scenario because only failed states provide this field:
            # https://docs.aws.amazon.com/lambda/latest/dg/API_GetFunctionConfiguration.html#SSS-GetFunctionConfiguration-response-LastUpdateStatusReasonCode
            self.state = VersionState(state=State.Active)
            LOG.debug(
                "Changing Lambda %s (id %s) to active",
                self.function_arn,
                self.function_version.config.internal_revision,
            )
        except Exception as e:
            failed_state = VersionState(
                state=State.Failed,
                code=StateReasonCode.InternalError,
                reason=f"Error while creating lambda: {e}",
            )
            self.state = failed_state
            LOG.debug(
                "Changing Lambda %s (id %s) to failed. Reason: %s",
                self.function_arn,
                self.function_version.config.internal_revision,
                e,
                exc_info=True,
            )
        return self.state

    def stop(self) -> None:
        """
        Shut this version down.

        The state is switched to ``Inactive`` and the shutdown event is set first, so that
        ``invoke`` rejects new invocations, before the log handler, the environments of this
        version, and the runtime executor resources are cleaned up.
        """
        LOG.debug("Stopping lambda version '%s'", self.function_arn)
        inactive_state = VersionState(
            state=State.Inactive, code=StateReasonCode.Idle, reason="Shutting down"
        )
        self.state = inactive_state
        self.shutdown_event.set()

        self.log_handler.stop()
        self.assignment_service.stop_environments_for_version(self.id)
        get_runtime_executor().cleanup_version(self.function_version)  # TODO: make pluggable?

    def update_provisioned_concurrency_config(
        self, provisioned_concurrent_executions: int
    ) -> Future[None]:
        """
        Scale the provisioned environments of this version in a separate thread.

        TODO: implement update while in progress (see test_provisioned_concurrency test)
        TODO: loop until diff == 0 and retry to remove/add diff environments
        TODO: alias routing & allocated (i.e., the status while updating provisioned concurrency)
        TODO: ProvisionedConcurrencyStatusEnum.FAILED
        TODO: status reason

        :param provisioned_concurrent_executions: set to 0 to stop all provisioned environments
        :return: the result future of the thread performing the scaling
        :raises ServiceException: if the current provisioned state is still IN_PROGRESS
        """
        with self.provisioned_state_lock:
            current_state = self.provisioned_state
            # LocalStack limitation: cannot update provisioned concurrency while another update is in progress
            if current_state and current_state.status == ProvisionedConcurrencyStatusEnum.IN_PROGRESS:
                raise ServiceException(
                    "Updating provisioned concurrency configuration while IN_PROGRESS is not supported yet."
                )

            # NOTE(review): an already existing state is reused as-is, i.e. its status is not
            #   changed here before scaling starts — confirm this is intended.
            if not current_state:
                self.provisioned_state = ProvisionedConcurrencyState()

        # Keep the function name: the thread helper may derive the thread name from it.
        def scale_environments(*args, **kwargs) -> None:
            scaling_futures = self.assignment_service.scale_provisioned_concurrency(
                self.id, self.function_version, provisioned_concurrent_executions
            )

            # Block until every scaling future is done; their results are not inspected here.
            concurrent.futures.wait(scaling_futures)

            with self.provisioned_state_lock:
                if provisioned_concurrent_executions == 0:
                    # scaling down to zero drops the provisioned state entirely
                    self.provisioned_state = None
                    return

                self.provisioned_state.available = provisioned_concurrent_executions
                self.provisioned_state.allocated = provisioned_concurrent_executions
                self.provisioned_state.status = ProvisionedConcurrencyStatusEnum.READY

        scaling_thread = start_thread(scale_environments)
        self.provisioning_thread = scaling_thread
        return scaling_thread.result_future

    # Extract environment handling

    def invoke(self, *, invocation: Invocation) -> InvocationResult:
        """
        Synchronous invoke entrypoint.

        Flow:
          0. reject the invocation if this version is shutting down
          1. acquire an invocation lease from the counting service
          2. obtain an execution environment from the assignment service
          3. run the invocation in that environment and store its logs
          4. release environment and lease, record a CloudWatch metric, log the function output

        A ``StatusErrorException`` raised in steps 2-3 is converted into an error result; any
        other exception propagates to the caller.

        :param invocation: the invocation to execute
        :return: the result of the invocation
        :raises ServiceException: if the version is shutting down
        """
        LOG.debug(
            "Got an invocation for function %s with request_id %s",
            self.function_arn,
            invocation.request_id,
        )
        if self.shutdown_event.is_set():
            message = f"Got an invocation with request_id {invocation.request_id} for a version shutting down"
            LOG.warning(message)
            raise ServiceException(message)

        with self.counting_service.get_invocation_lease(
            self.function, self.function_version
        ) as provisioning_type:
            # TODO: potential race condition when changing provisioned concurrency after getting the lease but before
            #   getting an environment
            try:
                # Blocks and potentially creates a new execution environment for this invocation
                with self.assignment_service.get_environment(
                    self.id, self.function_version, provisioning_type
                ) as environment:
                    result = environment.invoke(invocation)
                    result.executed_version = self.function_version.id.qualifier
                    self.store_logs(invocation_result=result, execution_env=environment)
            except StatusErrorException as status_error:
                result = InvocationResult(
                    request_id="",
                    payload=status_error.payload,
                    is_error=True,
                    logs="",
                    executed_version=self.function_version.id.qualifier,
                )

        function_id = self.function_version.id
        # Record CloudWatch metrics in separate threads
        # MAYBE reuse threads rather than starting new threads upon every invocation
        if result.is_error:
            record_metric = record_cw_metric_error
            thread_name = (
                f"record-cloudwatch-metric-error-{function_id.function_name}:{function_id.qualifier}"
            )
        else:
            record_metric = record_cw_metric_invocation
            thread_name = (
                f"record-cloudwatch-metric-{function_id.function_name}:{function_id.qualifier}"
            )
        start_thread(
            lambda *args, **kwargs: record_metric(
                function_name=function_id.function_name,
                account_id=function_id.account,
                region_name=function_id.region,
            ),
            name=thread_name,
        )

        # TODO: consider using the same prefix logging as in error case for execution environment.
        #   possibly as separate named logger.
        if result.logs is None:
            LOG.warning(
                "[%s] Error while printing logs for function '%s': Received no logs from environment.",
                invocation.request_id,
                function_id.function_name,
            )
            return result

        LOG.debug("Got logs for invocation '%s'", invocation.request_id)
        for log_line in result.logs.splitlines():
            LOG.debug(
                "[%s-%s] %s",
                function_id.function_name,
                invocation.request_id,
                truncate(log_line, config.LAMBDA_TRUNCATE_STDOUT),
            )
        return result

    def store_logs(
        self, invocation_result: InvocationResult, execution_env: ExecutionEnvironment
    ) -> None:
        """
        Hand the logs of an invocation over to the log handler.

        If the result carries no logs (``None`` or empty), nothing is stored and a warning with
        the prefixed logs of the execution environment is emitted instead.

        :param invocation_result: result whose ``logs`` should be stored
        :param execution_env: environment that ran the invocation; supplies log group and stream
        """
        if not invocation_result.logs:
            LOG.warning(
                "Received no logs from invocation with id %s for lambda %s. Execution environment logs: \n%s",
                invocation_result.request_id,
                self.function_arn,
                execution_env.get_prefixed_logs(),
            )
            return

        self.log_handler.add_logs(
            LogItem(
                execution_env.get_log_group_name(),
                execution_env.get_log_stream_name(),
                invocation_result.logs,
            )
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc