• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16981563750

14 Aug 2025 10:49PM UTC coverage: 86.896% (+0.04%) from 86.852%
16981563750

push

github

web-flow
add support for Fn::Tranform in CFnV2 (#12966)

Co-authored-by: Simon Walker <simon.walker@localstack.cloud>

181 of 195 new or added lines in 6 files covered. (92.82%)

348 existing lines in 22 files now uncovered.

66915 of 77006 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.0
/localstack-core/localstack/services/lambda_/invocation/version_manager.py
1
import concurrent.futures
1✔
2
import logging
1✔
3
import threading
1✔
4
import time
1✔
5
from concurrent.futures import Future
1✔
6
from concurrent.futures._base import CancelledError
1✔
7

8
from localstack import config
1✔
9
from localstack.aws.api.lambda_ import (
1✔
10
    ProvisionedConcurrencyStatusEnum,
11
    ServiceException,
12
    State,
13
    StateReasonCode,
14
)
15
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
16
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
17
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
18
from localstack.services.lambda_.invocation.execution_environment import ExecutionEnvironment
1✔
19
from localstack.services.lambda_.invocation.executor_endpoint import StatusErrorException
1✔
20
from localstack.services.lambda_.invocation.lambda_models import (
1✔
21
    Function,
22
    FunctionVersion,
23
    Invocation,
24
    InvocationResult,
25
    ProvisionedConcurrencyState,
26
    VersionState,
27
)
28
from localstack.services.lambda_.invocation.logs import LogHandler, LogItem
1✔
29
from localstack.services.lambda_.invocation.metrics import (
1✔
30
    record_cw_metric_error,
31
    record_cw_metric_invocation,
32
)
33
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
1✔
34
from localstack.services.lambda_.ldm import LDMProvisioner
1✔
35
from localstack.utils.strings import long_uid, to_bytes, truncate
1✔
36
from localstack.utils.threads import FuncThread, start_thread
1✔
37

38
LOG = logging.getLogger(__name__)
1✔
39

40

41
class LambdaVersionManager:
1✔
42
    # arn this Lambda Version manager manages
43
    function_arn: str
1✔
44
    function_version: FunctionVersion
1✔
45
    function: Function
1✔
46

47
    # Scale provisioned concurrency up and down
48
    provisioning_thread: FuncThread | None
1✔
49
    # Additional guard to prevent scheduling invocation on version during shutdown
50
    shutdown_event: threading.Event
1✔
51

52
    state: VersionState | None
1✔
53
    provisioned_state: ProvisionedConcurrencyState | None  # TODO: remove?
1✔
54
    log_handler: LogHandler
1✔
55
    counting_service: CountingService
1✔
56
    assignment_service: AssignmentService
1✔
57

58
    ldm_provisioner: LDMProvisioner | None
1✔
59

60
    def __init__(
1✔
61
        self,
62
        function_arn: str,
63
        function_version: FunctionVersion,
64
        # HACK allowing None for Lambda@Edge; only used in invoke for get_invocation_lease
65
        function: Function | None,
66
        counting_service: CountingService,
67
        assignment_service: AssignmentService,
68
    ):
69
        self.id = long_uid()
1✔
70
        self.function_arn = function_arn
1✔
71
        self.function_version = function_version
1✔
72
        self.function = function
1✔
73
        self.counting_service = counting_service
1✔
74
        self.assignment_service = assignment_service
1✔
75
        self.log_handler = LogHandler(function_version.config.role, function_version.id.region)
1✔
76

77
        # async
78
        self.provisioning_thread = None
1✔
79
        self.shutdown_event = threading.Event()
1✔
80

81
        # async state
82
        self.provisioned_state = None
1✔
83
        self.provisioned_state_lock = threading.RLock()
1✔
84
        # https://aws.amazon.com/blogs/compute/coming-soon-expansion-of-aws-lambda-states-to-all-functions/
85
        self.state = VersionState(state=State.Pending)
1✔
86

87
        self.ldm_provisioner = None
1✔
88
        lambda_hooks.inject_ldm_provisioner.run(self)
1✔
89

90
    def start(self) -> VersionState:
1✔
91
        try:
1✔
92
            self.log_handler.start_subscriber()
1✔
93
            time_before = time.perf_counter()
1✔
94
            get_runtime_executor().prepare_version(self.function_version)  # TODO: make pluggable?
1✔
95
            LOG.debug(
1✔
96
                "Version preparation of function %s took %0.2fms",
97
                self.function_version.qualified_arn,
98
                (time.perf_counter() - time_before) * 1000,
99
            )
100

101
            # code and reason not set for success scenario because only failed states provide this field:
102
            # https://docs.aws.amazon.com/lambda/latest/dg/API_GetFunctionConfiguration.html#SSS-GetFunctionConfiguration-response-LastUpdateStatusReasonCode
103
            self.state = VersionState(state=State.Active)
1✔
104
            LOG.debug(
1✔
105
                "Changing Lambda %s (id %s) to active",
106
                self.function_arn,
107
                self.function_version.config.internal_revision,
108
            )
109
        except Exception as e:
×
UNCOV
110
            self.state = VersionState(
×
111
                state=State.Failed,
112
                code=StateReasonCode.InternalError,
113
                reason=f"Error while creating lambda: {e}",
114
            )
UNCOV
115
            LOG.debug(
×
116
                "Changing Lambda %s (id %s) to failed. Reason: %s",
117
                self.function_arn,
118
                self.function_version.config.internal_revision,
119
                e,
120
                exc_info=LOG.isEnabledFor(logging.DEBUG),
121
            )
122
        return self.state
1✔
123

124
    def stop(self) -> None:
1✔
125
        LOG.debug("Stopping lambda version '%s'", self.function_arn)
1✔
126
        self.state = VersionState(
1✔
127
            state=State.Inactive, code=StateReasonCode.Idle, reason="Shutting down"
128
        )
129
        self.shutdown_event.set()
1✔
130
        self.log_handler.stop()
1✔
131
        self.assignment_service.stop_environments_for_version(self.id)
1✔
132
        get_runtime_executor().cleanup_version(self.function_version)  # TODO: make pluggable?
1✔
133

134
    def update_provisioned_concurrency_config(
1✔
135
        self, provisioned_concurrent_executions: int
136
    ) -> Future[None]:
137
        """
138
        TODO: implement update while in progress (see test_provisioned_concurrency test)
139
        TODO: loop until diff == 0 and retry to remove/add diff environments
140
        TODO: alias routing & allocated (i.e., the status while updating provisioned concurrency)
141
        TODO: ProvisionedConcurrencyStatusEnum.FAILED
142
        TODO: status reason
143

144
        :param provisioned_concurrent_executions: set to 0 to stop all provisioned environments
145
        """
146
        with self.provisioned_state_lock:
1✔
147
            # LocalStack limitation: cannot update provisioned concurrency while another update is in progress
148
            if (
1✔
149
                self.provisioned_state
150
                and self.provisioned_state.status == ProvisionedConcurrencyStatusEnum.IN_PROGRESS
151
            ):
UNCOV
152
                raise ServiceException(
×
153
                    "Updating provisioned concurrency configuration while IN_PROGRESS is not supported yet."
154
                )
155

156
            if not self.provisioned_state:
1✔
157
                self.provisioned_state = ProvisionedConcurrencyState()
1✔
158

159
        def scale_environments(*args, **kwargs) -> None:
1✔
160
            futures = self.assignment_service.scale_provisioned_concurrency(
1✔
161
                self.id, self.function_version, provisioned_concurrent_executions
162
            )
163

164
            concurrent.futures.wait(futures)
1✔
165

166
            with self.provisioned_state_lock:
1✔
167
                if provisioned_concurrent_executions == 0:
1✔
168
                    self.provisioned_state = None
1✔
169
                else:
170
                    self.provisioned_state.available = provisioned_concurrent_executions
1✔
171
                    self.provisioned_state.allocated = provisioned_concurrent_executions
1✔
172
                    self.provisioned_state.status = ProvisionedConcurrencyStatusEnum.READY
1✔
173

174
        self.provisioning_thread = start_thread(scale_environments)
1✔
175
        return self.provisioning_thread.result_future
1✔
176

177
    # Extract environment handling
178

179
    def invoke(self, *, invocation: Invocation) -> InvocationResult:
1✔
180
        """
181
        synchronous invoke entrypoint
182

183
        0. check counter, get lease
184
        1. try to get an inactive (no active invoke) environment
185
        2.(allgood) send invoke to environment
186
        3. wait for invocation result
187
        4. return invocation result & release lease
188

189
        2.(nogood) fail fast fail hard
190

191
        """
192
        LOG.debug(
1✔
193
            "Got an invocation for function %s with request_id %s",
194
            self.function_arn,
195
            invocation.request_id,
196
        )
197
        if self.shutdown_event.is_set():
1✔
198
            message = f"Got an invocation with request_id {invocation.request_id} for a version shutting down"
×
199
            LOG.warning(message)
×
UNCOV
200
            raise ServiceException(message)
×
201

202
        # If the environment has debugging enabled, route the invocation there;
203
        # debug environments bypass Lambda service quotas.
204
        if self.ldm_provisioner and (
1✔
205
            ldm_execution_environment := self.ldm_provisioner.get_execution_environment(
206
                qualified_lambda_arn=self.function_version.qualified_arn,
207
                user_agent=invocation.user_agent,
208
            )
209
        ):
210
            try:
×
211
                invocation_result = ldm_execution_environment.invoke(invocation)
×
212
                invocation_result.executed_version = self.function_version.id.qualifier
×
UNCOV
213
                self.store_logs(
×
214
                    invocation_result=invocation_result, execution_env=ldm_execution_environment
215
                )
216
            except CancelledError as e:
×
217
                # Timeouts for invocation futures are managed by LDM, a cancelled error here is
218
                # aligned with the debug container terminating whilst debugging/invocation.
UNCOV
219
                LOG.debug("LDM invocation future encountered a cancelled error: '%s'", e)
×
UNCOV
220
                invocation_result = InvocationResult(
×
221
                    request_id="",
222
                    payload=to_bytes(
223
                        "The invocation was canceled because the debug configuration "
224
                        "was removed or the operation timed out"
225
                    ),
226
                    is_error=True,
227
                    logs="",
228
                    executed_version=self.function_version.id.qualifier,
229
                )
UNCOV
230
            except StatusErrorException as e:
×
UNCOV
231
                invocation_result = InvocationResult(
×
232
                    request_id="",
233
                    payload=e.payload,
234
                    is_error=True,
235
                    logs="",
236
                    executed_version=self.function_version.id.qualifier,
237
                )
238
            finally:
UNCOV
239
                ldm_execution_environment.release()
×
UNCOV
240
            return invocation_result
×
241

242
        with self.counting_service.get_invocation_lease(
1✔
243
            self.function, self.function_version
244
        ) as provisioning_type:
245
            # TODO: potential race condition when changing provisioned concurrency after getting the lease but before
246
            #   getting an environment
247
            try:
1✔
248
                # Blocks and potentially creates a new execution environment for this invocation
249
                with self.assignment_service.get_environment(
1✔
250
                    self.id, self.function_version, provisioning_type
251
                ) as execution_env:
252
                    invocation_result = execution_env.invoke(invocation)
1✔
253
                    invocation_result.executed_version = self.function_version.id.qualifier
1✔
254
                    self.store_logs(
1✔
255
                        invocation_result=invocation_result, execution_env=execution_env
256
                    )
257
            except StatusErrorException as e:
1✔
258
                invocation_result = InvocationResult(
1✔
259
                    request_id="",
260
                    payload=e.payload,
261
                    is_error=True,
262
                    logs="",
263
                    executed_version=self.function_version.id.qualifier,
264
                )
265

266
        function_id = self.function_version.id
1✔
267
        # Record CloudWatch metrics in separate threads
268
        # MAYBE reuse threads rather than starting new threads upon every invocation
269
        if invocation_result.is_error:
1✔
270
            start_thread(
1✔
271
                lambda *args, **kwargs: record_cw_metric_error(
272
                    function_name=function_id.function_name,
273
                    account_id=function_id.account,
274
                    region_name=function_id.region,
275
                ),
276
                name=f"record-cloudwatch-metric-error-{function_id.function_name}:{function_id.qualifier}",
277
            )
278
        else:
279
            start_thread(
1✔
280
                lambda *args, **kwargs: record_cw_metric_invocation(
281
                    function_name=function_id.function_name,
282
                    account_id=function_id.account,
283
                    region_name=function_id.region,
284
                ),
285
                name=f"record-cloudwatch-metric-{function_id.function_name}:{function_id.qualifier}",
286
            )
287
        # TODO: consider using the same prefix logging as in error case for execution environment.
288
        #   possibly as separate named logger.
289
        if invocation_result.logs is not None:
1✔
290
            LOG.debug("Got logs for invocation '%s'", invocation.request_id)
1✔
291
            for log_line in invocation_result.logs.splitlines():
1✔
292
                LOG.debug(
1✔
293
                    "[%s-%s] %s",
294
                    function_id.function_name,
295
                    invocation.request_id,
296
                    truncate(log_line, config.LAMBDA_TRUNCATE_STDOUT),
297
                )
298
        else:
UNCOV
299
            LOG.warning(
×
300
                "[%s] Error while printing logs for function '%s': Received no logs from environment.",
301
                invocation.request_id,
302
                function_id.function_name,
303
            )
304
        return invocation_result
1✔
305

306
    def store_logs(
1✔
307
        self, invocation_result: InvocationResult, execution_env: ExecutionEnvironment
308
    ) -> None:
309
        if invocation_result.logs:
1✔
310
            log_item = LogItem(
1✔
311
                execution_env.get_log_group_name(),
312
                execution_env.get_log_stream_name(),
313
                invocation_result.logs,
314
            )
315
            self.log_handler.add_logs(log_item)
1✔
316
        else:
UNCOV
317
            LOG.warning(
×
318
                "Received no logs from invocation with id %s for lambda %s. Execution environment logs: \n%s",
319
                invocation_result.request_id,
320
                self.function_arn,
321
                execution_env.get_prefixed_logs(),
322
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc