• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19821277742

01 Dec 2025 08:16AM UTC coverage: 86.821% (-0.04%) from 86.863%
19821277742

push

github

web-flow
Add Lambda Managed Instances (#13440)

Co-authored-by: Joel Scheuner <joel.scheuner.dev@gmail.com>
Co-authored-by: Anisa Oshafi <anisaoshafi@gmail.com>
Co-authored-by: Cristopher Pinzón <cristopher.pinzon@gmail.com>
Co-authored-by: Alexander Rashed <alexander.rashed@localstack.cloud>
Co-authored-by: Dominik Schubert <dominik.schubert91@gmail.com>
Co-authored-by: Mathieu Cloutier <79954947+cloutierMat@users.noreply.github.com>
Co-authored-by: Simon Walker <simon.walker@localstack.cloud>

127 of 181 new or added lines in 11 files covered. (70.17%)

17 existing lines in 5 files now uncovered.

69556 of 80114 relevant lines covered (86.82%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.33
/localstack-core/localstack/services/lambda_/invocation/lambda_service.py
1
import base64
1✔
2
import concurrent.futures
1✔
3
import dataclasses
1✔
4
import io
1✔
5
import logging
1✔
6
import os.path
1✔
7
import random
1✔
8
import time
1✔
9
import uuid
1✔
10
from concurrent.futures import Executor, Future, ThreadPoolExecutor
1✔
11
from datetime import datetime
1✔
12
from hashlib import sha256
1✔
13
from pathlib import PurePosixPath, PureWindowsPath
1✔
14
from threading import RLock
1✔
15
from typing import TYPE_CHECKING
1✔
16

17
from localstack import config
1✔
18
from localstack.aws.api.lambda_ import (
1✔
19
    InvalidParameterValueException,
20
    InvalidRequestContentException,
21
    InvocationType,
22
    LastUpdateStatus,
23
    NoPublishedVersionException,
24
    ResourceConflictException,
25
    ResourceNotFoundException,
26
    State,
27
)
28
from localstack.aws.connect import connect_to
1✔
29
from localstack.constants import AWS_REGION_US_EAST_1
1✔
30
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
31
from localstack.services.lambda_.analytics import (
1✔
32
    FunctionOperation,
33
    FunctionStatus,
34
    function_counter,
35
    hotreload_counter,
36
)
37
from localstack.services.lambda_.api_utils import (
1✔
38
    lambda_arn,
39
    qualified_lambda_arn,
40
    qualifier_is_alias,
41
)
42
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
43
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
44
from localstack.services.lambda_.invocation.event_manager import LambdaEventManager
1✔
45
from localstack.services.lambda_.invocation.lambda_models import (
1✔
46
    ArchiveCode,
47
    Function,
48
    FunctionVersion,
49
    HotReloadingCode,
50
    ImageCode,
51
    Invocation,
52
    InvocationResult,
53
    S3Code,
54
    UpdateStatus,
55
    VersionAlias,
56
    VersionState,
57
)
58
from localstack.services.lambda_.invocation.models import lambda_stores
1✔
59
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
1✔
60
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
61
from localstack.utils.archives import get_unzipped_size, is_zip_file
1✔
62
from localstack.utils.container_utils.container_client import ContainerException
1✔
63
from localstack.utils.docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
1✔
64
from localstack.utils.strings import short_uid, to_str
1✔
65

66
if TYPE_CHECKING:
1✔
67
    from mypy_boto3_s3 import S3Client
×
68

69
LOG = logging.getLogger(__name__)
1✔
70

71
LAMBDA_DEFAULT_TIMEOUT_SECONDS = 3
1✔
72
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
73

74

75
class LambdaService:
1✔
76
    # mapping from qualified ARN to version manager
77
    lambda_running_versions: dict[str, LambdaVersionManager]
1✔
78
    lambda_starting_versions: dict[str, LambdaVersionManager]
1✔
79
    # mapping from qualified ARN to event manager
80
    event_managers = dict[str, LambdaEventManager]
1✔
81
    lambda_version_manager_lock: RLock
1✔
82
    task_executor: Executor
1✔
83

84
    assignment_service: AssignmentService
1✔
85
    counting_service: CountingService
1✔
86

87
    def __init__(self) -> None:
1✔
88
        self.lambda_running_versions = {}
1✔
89
        self.lambda_starting_versions = {}
1✔
90
        self.event_managers = {}
1✔
91
        self.lambda_version_manager_lock = RLock()
1✔
92
        self.task_executor = ThreadPoolExecutor(thread_name_prefix="lambda-service-task")
1✔
93
        self.assignment_service = AssignmentService()
1✔
94
        self.counting_service = CountingService()
1✔
95

96
    def stop(self) -> None:
1✔
97
        """
98
        Stop the whole lambda service
99
        """
100
        shutdown_futures = []
1✔
101
        for event_manager in self.event_managers.values():
1✔
102
            shutdown_futures.append(self.task_executor.submit(event_manager.stop))
1✔
103
        # TODO: switch shutdown order? yes, shutdown starting versions before the running versions would make more sense
104
        for version_manager in self.lambda_running_versions.values():
1✔
105
            shutdown_futures.append(self.task_executor.submit(version_manager.stop))
1✔
106
        for version_manager in self.lambda_starting_versions.values():
1✔
107
            shutdown_futures.append(self.task_executor.submit(version_manager.stop))
×
108
            shutdown_futures.append(
×
109
                self.task_executor.submit(
110
                    version_manager.function_version.config.code.destroy_cached
111
                )
112
            )
113
        _, not_done = concurrent.futures.wait(shutdown_futures, timeout=5)
1✔
114
        if not_done:
1✔
115
            LOG.debug("Shutdown not complete, missing threads: %s", not_done)
×
116
        self.task_executor.shutdown(cancel_futures=True)
1✔
117
        self.assignment_service.stop()
1✔
118

119
    def stop_version(self, qualified_arn: str) -> None:
1✔
120
        """
121
        Stops a specific lambda service version
122
        :param qualified_arn: Qualified arn for the version to stop
123
        """
124
        LOG.debug("Stopping version %s", qualified_arn)
1✔
125
        event_manager = self.event_managers.pop(qualified_arn, None)
1✔
126
        if not event_manager:
1✔
127
            LOG.debug("Could not find event manager to stop for function %s...", qualified_arn)
×
128
        else:
129
            self.task_executor.submit(event_manager.stop)
1✔
130
        version_manager = self.lambda_running_versions.pop(
1✔
131
            qualified_arn, self.lambda_starting_versions.pop(qualified_arn, None)
132
        )
133
        if not version_manager:
1✔
134
            raise ValueError(f"Unable to find version manager for {qualified_arn}")
×
135
        self.task_executor.submit(version_manager.stop)
1✔
136
        lambda_hooks.delete_function_version.run(qualified_arn)
1✔
137

138
    def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:
1✔
139
        """
140
        Get the lambda version for the given arn
141
        :param function_arn: qualified arn for the lambda version
142
        :return: LambdaVersionManager for the arn
143
        """
144
        version_manager = self.lambda_running_versions.get(function_arn)
1✔
145
        if not version_manager:
1✔
146
            raise ValueError(f"Could not find version '{function_arn}'. Is it created?")
×
147

148
        return version_manager
1✔
149

150
    def get_lambda_event_manager(self, function_arn: str) -> LambdaEventManager:
1✔
151
        """
152
        Get the lambda event manager for the given arn
153
        :param function_arn: qualified arn for the lambda version
154
        :return: LambdaEventManager for the arn
155
        """
156
        event_manager = self.event_managers.get(function_arn)
1✔
157
        if not event_manager:
1✔
158
            raise ValueError(f"Could not find event manager '{function_arn}'. Is it created?")
×
159

160
        return event_manager
1✔
161

162
    def _start_lambda_version(self, version_manager: LambdaVersionManager) -> None:
1✔
163
        new_state = version_manager.start()
1✔
164
        self.update_version_state(
1✔
165
            function_version=version_manager.function_version, new_state=new_state
166
        )
167

168
    def create_function_version(self, function_version: FunctionVersion) -> Future[None]:
1✔
169
        """
170
        Creates a new function version (manager), and puts it in the startup dict
171

172
        :param function_version: Function Version to create
173
        """
174
        with self.lambda_version_manager_lock:
1✔
175
            qualified_arn = function_version.id.qualified_arn()
1✔
176
            version_manager = self.lambda_starting_versions.get(qualified_arn)
1✔
177
            if version_manager:
1✔
178
                raise ResourceConflictException(
1✔
179
                    f"The operation cannot be performed at this time. An update is in progress for resource: {function_version.id.unqualified_arn()}",
180
                    Type="User",
181
                )
182
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
183
            fn = state.functions.get(function_version.id.function_name)
1✔
184
            version_manager = LambdaVersionManager(
1✔
185
                function_arn=qualified_arn,
186
                function_version=function_version,
187
                function=fn,
188
                counting_service=self.counting_service,
189
                assignment_service=self.assignment_service,
190
            )
191
            self.lambda_starting_versions[qualified_arn] = version_manager
1✔
192
            lambda_hooks.create_function_version.run(function_version.qualified_arn)
1✔
193
        return self.task_executor.submit(self._start_lambda_version, version_manager)
1✔
194

195
    def publish_version_async(self, function_version: FunctionVersion):
1✔
NEW
196
        self.task_executor.submit(self.publish_version, function_version)
×
197

198
    def publish_version(self, function_version: FunctionVersion):
1✔
199
        """
200
        Synchronously create a function version (manager)
201
        Should only be called on publishing new versions, which basically clone an existing one.
202
        The new version needs to be added to the lambda store before invoking this.
203
        After successful completion of this method, the lambda version stored will be modified to be active, with a new revision id.
204
        It will then be active for execution, and should be retrieved again from the store before returning the data over the API.
205

206
        :param function_version: Function Version to create
207
        """
208
        # HACK: trying to match the AWS timing behavior of Lambda Managed Instances for the operation
209
        # publish_version followed by get_function because transitioning LastUpdateStatus from InProgress to
210
        # Successful happens too fast on LocalStack (thanks to caching in prepare_version).
211
        # Without this hack, test_latest_published_update_config fails at get_function_response_postpublish
212
        # and test_lifecycle_invoke is flaky, sometimes not triggering the ResourceConflictException
213
        # Increasing this sleep too much (e.g., 10s) shouldn't cause any side effects apart from slow responsiveness
214
        if function_version.config.CapacityProviderConfig:
1✔
NEW
215
            time.sleep(0.1)
×
216
        with self.lambda_version_manager_lock:
1✔
217
            qualified_arn = function_version.id.qualified_arn()
1✔
218
            version_manager = self.lambda_starting_versions.get(qualified_arn)
1✔
219
            if version_manager:
1✔
220
                raise Exception(
×
221
                    "Version '%s' already starting up and in state %s",
222
                    qualified_arn,
223
                    version_manager.state,
224
                )
225
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
226
            fn = state.functions.get(function_version.id.function_name)
1✔
227
            version_manager = LambdaVersionManager(
1✔
228
                function_arn=qualified_arn,
229
                function_version=function_version,
230
                function=fn,
231
                counting_service=self.counting_service,
232
                assignment_service=self.assignment_service,
233
            )
234
            self.lambda_starting_versions[qualified_arn] = version_manager
1✔
235
        self._start_lambda_version(version_manager)
1✔
236

237
    # Commands
238
    def invoke(
1✔
239
        self,
240
        function_name: str,
241
        qualifier: str | None,
242
        region: str,
243
        account_id: str,
244
        invocation_type: InvocationType | None,
245
        client_context: str | None,
246
        request_id: str,
247
        payload: bytes | None,
248
        trace_context: dict | None = None,
249
        user_agent: str | None = None,
250
    ) -> InvocationResult | None:
251
        """
252
        Invokes a specific version of a lambda
253

254
        :param request_id: context request ID
255
        :param function_name: Function name
256
        :param qualifier: Function version qualifier
257
        :param region: Region of the function
258
        :param account_id: Account id of the function
259
        :param invocation_type: Invocation Type
260
        :param client_context: Client Context, if applicable
261
        :param trace_context: tracing information such as X-Ray header
262
        :param payload: Invocation payload
263
        :return: The invocation result
264
        """
265
        # NOTE: consider making the trace_context mandatory once we update all usages (should be easier after v4.0)
266
        trace_context = trace_context or {}
1✔
267
        # Invoked arn (for lambda context) does not have qualifier if not supplied
268
        invoked_arn = lambda_arn(
1✔
269
            function_name=function_name,
270
            qualifier=qualifier,
271
            account=account_id,
272
            region=region,
273
        )
274
        state = lambda_stores[account_id][region]
1✔
275
        function = state.functions.get(function_name)
1✔
276

277
        if function is None:
1✔
278
            if not qualifier:
1✔
279
                invoked_arn += ":$LATEST"
1✔
280
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
1✔
281

282
        # A provided qualifier always takes precedence, but the default depends on whether $LATEST.PUBLISHED exists
283
        version_latest_published = function.versions.get("$LATEST.PUBLISHED")
1✔
284
        if version_latest_published:
1✔
NEW
285
            qualifier = qualifier or "$LATEST.PUBLISHED"
×
NEW
286
            invoked_arn = lambda_arn(
×
287
                function_name=function_name,
288
                qualifier=qualifier,
289
                account=account_id,
290
                region=region,
291
            )
292
        else:
293
            qualifier = qualifier or "$LATEST"
1✔
294

295
        if qualifier_is_alias(qualifier):
1✔
296
            alias = function.aliases.get(qualifier)
1✔
297
            if not alias:
1✔
298
                raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
1✔
299
            version_qualifier = alias.function_version
1✔
300
            if alias.routing_configuration:
1✔
301
                version, probability = next(
1✔
302
                    iter(alias.routing_configuration.version_weights.items())
303
                )
304
                if random.random() < probability:
1✔
305
                    version_qualifier = version
1✔
306
        else:
307
            version_qualifier = qualifier
1✔
308

309
        # Need the qualified arn to exactly get the target lambda
310
        qualified_arn = qualified_lambda_arn(function_name, version_qualifier, account_id, region)
1✔
311
        version = function.versions.get(version_qualifier)
1✔
312
        if version is None:
1✔
NEW
313
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
×
314
        runtime = version.config.runtime or "n/a"
1✔
315
        package_type = version.config.package_type
1✔
316
        if version.config.CapacityProviderConfig and qualifier == "$LATEST":
1✔
NEW
317
            if function.versions.get("$LATEST.PUBLISHED"):
×
NEW
318
                raise InvalidParameterValueException(
×
319
                    "Functions configured with capacity provider configuration can't be invoked with $LATEST qualifier. To invoke this function, specify a published version qualifier or $LATEST.PUBLISHED.",
320
                    Type="User",
321
                )
322
            else:
NEW
323
                raise NoPublishedVersionException(
×
324
                    "The function can't be invoked because no published version exists. For functions with capacity provider configuration, either publish a version to $LATEST.PUBLISHED, or specify a published version qualifier.",
325
                    Type="User",
326
                )
327
        try:
1✔
328
            version_manager = self.get_lambda_version_manager(qualified_arn)
1✔
329
            event_manager = self.get_lambda_event_manager(qualified_arn)
1✔
330
        except ValueError as e:
×
331
            state = version and version.config.state.state
×
332
            if state == State.Failed:
×
333
                status = FunctionStatus.failed_state_error
×
334
                HINT_LOG.error(
×
335
                    f"Failed to create the runtime executor for the function {function_name}. "
336
                    "Please ensure that Docker is available in the LocalStack container by adding the volume mount "
337
                    '"/var/run/docker.sock:/var/run/docker.sock" to your LocalStack startup. '
338
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#docker-not-available"
339
                )
340
            elif state == State.Pending:
×
341
                status = FunctionStatus.pending_state_error
×
342
                HINT_LOG.warning(
×
343
                    "Lambda functions are created and updated asynchronously in the new lambda provider like in AWS. "
344
                    f"Before invoking {function_name}, please wait until the function transitioned from the state "
345
                    "Pending to Active using: "
346
                    f'"awslocal lambda wait function-active-v2 --function-name {function_name}" '
347
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#function-in-pending-state"
348
                )
349
            else:
350
                status = FunctionStatus.unhandled_state_error
×
351
                LOG.error("Unexpected state %s for Lambda function %s", state, function_name)
×
352
            function_counter.labels(
×
353
                operation=FunctionOperation.invoke,
354
                runtime=runtime,
355
                status=status,
356
                invocation_type=invocation_type,
357
                package_type=package_type,
358
            ).increment()
359
            raise ResourceConflictException(
×
360
                f"The operation cannot be performed at this time. The function is currently in the following state: {state}"
361
            ) from e
362
        # empty payloads have to work as well
363
        if payload is None:
1✔
364
            payload = b"{}"
1✔
365
        else:
366
            # detect invalid payloads early before creating an execution environment
367
            try:
1✔
368
                to_str(payload)
1✔
369
            except Exception as e:
1✔
370
                function_counter.labels(
1✔
371
                    operation=FunctionOperation.invoke,
372
                    runtime=runtime,
373
                    status=FunctionStatus.invalid_payload_error,
374
                    invocation_type=invocation_type,
375
                    package_type=package_type,
376
                ).increment()
377
                # MAYBE: improve parity of detailed exception message (quite cumbersome)
378
                raise InvalidRequestContentException(
1✔
379
                    f"Could not parse request body into json: Could not parse payload into json: {e}",
380
                    Type="User",
381
                )
382
        if invocation_type is None:
1✔
383
            invocation_type = InvocationType.RequestResponse
1✔
384
        if invocation_type == InvocationType.DryRun:
1✔
385
            return None
1✔
386
        # TODO payload verification  An error occurred (InvalidRequestContentException) when calling the Invoke operation: Could not parse request body into json: Could not parse payload into json: Unexpected character (''' (code 39)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
387
        #  at [Source: (byte[])"'test'"; line: 1, column: 2]
388
        #
389
        if invocation_type == InvocationType.Event:
1✔
390
            return event_manager.enqueue_event(
1✔
391
                invocation=Invocation(
392
                    payload=payload,
393
                    invoked_arn=invoked_arn,
394
                    client_context=client_context,
395
                    invocation_type=invocation_type,
396
                    invoke_time=datetime.now(),
397
                    request_id=request_id,
398
                    trace_context=trace_context,
399
                    user_agent=user_agent,
400
                )
401
            )
402

403
        invocation_result = version_manager.invoke(
1✔
404
            invocation=Invocation(
405
                payload=payload,
406
                invoked_arn=invoked_arn,
407
                client_context=client_context,
408
                invocation_type=invocation_type,
409
                invoke_time=datetime.now(),
410
                request_id=request_id,
411
                trace_context=trace_context,
412
                user_agent=user_agent,
413
            )
414
        )
415
        status = (
1✔
416
            FunctionStatus.invocation_error
417
            if invocation_result.is_error
418
            else FunctionStatus.success
419
        )
420
        function_counter.labels(
1✔
421
            operation=FunctionOperation.invoke,
422
            runtime=runtime,
423
            status=status,
424
            invocation_type=invocation_type,
425
            package_type=package_type,
426
        ).increment()
427
        return invocation_result
1✔
428

429
    def update_version(self, new_version: FunctionVersion) -> Future[None]:
1✔
430
        """
431
        Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready
432
        to be invoked
433

434
        :param new_version: New version (with the same qualifier as an older one)
435
        """
436
        if (
1✔
437
            new_version.qualified_arn not in self.lambda_running_versions
438
            and not new_version.config.CapacityProviderConfig
439
        ):
UNCOV
440
            raise ValueError(
×
441
                f"Version {new_version.qualified_arn} cannot be updated if an old one is not running"
442
            )
443

444
        return self.create_function_version(function_version=new_version)
1✔
445

446
    def update_version_state(
1✔
447
        self, function_version: FunctionVersion, new_state: VersionState
448
    ) -> None:
449
        """
450
        Update the version state for the given function version.
451

452
        This will perform a rollover to the given function if the new state is active and there is a previously
453
        running version registered. The old version will be shutdown and its code deleted.
454

455
        If the new state is failed, it will abort the update and mark it as failed.
456
        If an older version is still running, it will keep running.
457

458
        :param function_version: Version reporting the state
459
        :param new_state: New state
460
        """
461
        function_arn = function_version.qualified_arn
1✔
462
        try:
1✔
463
            old_version = None
1✔
464
            old_event_manager = None
1✔
465
            with self.lambda_version_manager_lock:
1✔
466
                new_version_manager = self.lambda_starting_versions.pop(function_arn)
1✔
467
                if not new_version_manager:
1✔
468
                    raise ValueError(
×
469
                        f"Version {function_arn} reporting state {new_state.state} does exist in the starting versions."
470
                    )
471
                if new_state.state == State.Active:
1✔
472
                    old_version = self.lambda_running_versions.get(function_arn, None)
1✔
473
                    old_event_manager = self.event_managers.get(function_arn, None)
1✔
474
                    self.lambda_running_versions[function_arn] = new_version_manager
1✔
475
                    self.event_managers[function_arn] = LambdaEventManager(
1✔
476
                        version_manager=new_version_manager
477
                    )
478
                    self.event_managers[function_arn].start()
1✔
479
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
1✔
480
                elif new_state.state == State.Failed:
×
481
                    update_status = UpdateStatus(status=LastUpdateStatus.Failed)
×
482
                    self.task_executor.submit(new_version_manager.stop)
×
NEW
483
                elif (
×
484
                    new_state.state == State.ActiveNonInvocable
485
                    and function_version.config.CapacityProviderConfig
486
                ):
NEW
487
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
×
488
                else:
489
                    # TODO what to do if state pending or inactive is supported?
490
                    self.task_executor.submit(new_version_manager.stop)
×
491
                    LOG.error(
×
492
                        "State %s for version %s should not have been reported. New version will be stopped.",
493
                        new_state,
494
                        function_arn,
495
                    )
496
                    return
×
497

498
            # TODO is it necessary to get the version again? Should be locked for modification anyway
499
            # Without updating the new state, the function would not change to active, last_update would be missing, and
500
            # the revision id would not be updated.
501
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
502
            # FIXME this will fail if the function is deleted during this code lines here
503
            function = state.functions.get(function_version.id.function_name)
1✔
504
            if old_event_manager:
1✔
505
                self.task_executor.submit(old_event_manager.stop_for_update)
1✔
506
            if old_version:
1✔
507
                # if there is an old version, we assume it is an update, and stop the old one
508
                self.task_executor.submit(old_version.stop)
1✔
509
                if function:
1✔
510
                    self.task_executor.submit(
1✔
511
                        destroy_code_if_not_used, old_version.function_version.config.code, function
512
                    )
513
            if not function:
1✔
514
                LOG.debug("Function %s was deleted during status update", function_arn)
×
515
                return
×
516
            current_version = function.versions[function_version.id.qualifier]
1✔
517
            new_version_manager.state = new_state
1✔
518
            new_version_state = dataclasses.replace(
1✔
519
                current_version,
520
                config=dataclasses.replace(
521
                    current_version.config, state=new_state, last_update=update_status
522
                ),
523
            )
524
            state.functions[function_version.id.function_name].versions[
1✔
525
                function_version.id.qualifier
526
            ] = new_version_state
527

528
        except Exception:
×
529
            LOG.error(
×
530
                "Failed to update function version for arn %s",
531
                function_arn,
532
                exc_info=LOG.isEnabledFor(logging.DEBUG),
533
            )
534

535
    def update_alias(self, old_alias: VersionAlias, new_alias: VersionAlias, function: Function):
1✔
536
        # if pointer changed, need to restart provisioned
537
        provisioned_concurrency_config = function.provisioned_concurrency_configs.get(
1✔
538
            old_alias.name
539
        )
540
        if (
1✔
541
            old_alias.function_version != new_alias.function_version
542
            and provisioned_concurrency_config is not None
543
        ):
544
            LOG.warning("Deprovisioning")
×
545
            fn_version_old = function.versions.get(old_alias.function_version)
×
546
            vm_old = self.get_lambda_version_manager(function_arn=fn_version_old.qualified_arn)
×
547
            fn_version_new = function.versions.get(new_alias.function_version)
×
548
            vm_new = self.get_lambda_version_manager(function_arn=fn_version_new.qualified_arn)
×
549

550
            # TODO: we might need to pull provisioned concurrency state a bit more out of the version manager for get_provisioned_concurrency_config
551
            # TODO: make this fully async
552
            vm_old.update_provisioned_concurrency_config(0).result(timeout=4)  # sync
×
553
            vm_new.update_provisioned_concurrency_config(
×
554
                provisioned_concurrency_config.provisioned_concurrent_executions
555
            )  # async again
556

557
    def can_assume_role(self, role_arn: str, region: str) -> bool:
1✔
558
        """
559
        Checks whether lambda can assume the given role.
560
        This _should_ only fail if IAM enforcement is enabled.
561

562
        :param role_arn: Role to assume
563
        :return: True if the role can be assumed by lambda, false otherwise
564
        """
565
        sts_client = connect_to(region_name=region).sts.request_metadata(service_principal="lambda")
1✔
566
        try:
1✔
567
            sts_client.assume_role(
1✔
568
                RoleArn=role_arn,
569
                RoleSessionName=f"test-assume-{short_uid()}",
570
                DurationSeconds=900,
571
            )
572
            return True
1✔
573
        except Exception as e:
×
574
            LOG.debug("Cannot assume role %s: %s", role_arn, e)
×
575
            return False
×
576

577

578
# TODO: Move helper functions out of lambda_service into a separate module
579

580

581
def is_code_used(code: S3Code, function: Function) -> bool:
1✔
582
    """
583
    Check if given code is still used in some version of the function
584

585
    :param code: Code object
586
    :param function: function to check
587
    :return: bool whether code is used in another version of the function
588
    """
589
    with function.lock:
1✔
590
        return any(code == version.config.code for version in function.versions.values())
1✔
591

592

593
def destroy_code_if_not_used(code: S3Code, function: Function) -> None:
1✔
594
    """
595
    Destroy the given code if it is not used in some version of the function
596
    Do nothing otherwise
597

598
    :param code: Code object
599
    :param function: Function the code belongs too
600
    """
601
    with function.lock:
1✔
602
        if not is_code_used(code, function):
1✔
603
            code.destroy()
1✔
604

605

606
def store_lambda_archive(
1✔
607
    archive_file: bytes, function_name: str, region_name: str, account_id: str
608
) -> S3Code:
609
    """
610
    Stores the given lambda archive in an internal s3 bucket.
611
    Also checks if zipfile matches the specifications
612

613
    :param archive_file: Archive file to store
614
    :param function_name: function name the archive should be stored for
615
    :param region_name: region name the archive should be stored for
616
    :param account_id: account id the archive should be stored for
617
    :return: S3 Code object representing the archive stored in S3
618
    """
619
    # check if zip file
620
    if not is_zip_file(archive_file):
1✔
621
        raise InvalidParameterValueException(
1✔
622
            "Could not unzip uploaded file. Please check your file, then try to upload again.",
623
            Type="User",
624
        )
625
    # check unzipped size
626
    unzipped_size = get_unzipped_size(zip_file=io.BytesIO(archive_file))
1✔
627
    if unzipped_size >= config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED:
1✔
628
        raise InvalidParameterValueException(
1✔
629
            f"Unzipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED} bytes",
630
            Type="User",
631
        )
632
    # store all buckets in us-east-1 for now
633
    s3_client = connect_to(
1✔
634
        region_name=AWS_REGION_US_EAST_1, aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT
635
    ).s3
636
    bucket_name = f"awslambda-{region_name}-tasks"
1✔
637
    # s3 create bucket is idempotent in us-east-1
638
    s3_client.create_bucket(Bucket=bucket_name)
1✔
639
    code_id = f"{function_name}-{uuid.uuid4()}"
1✔
640
    key = f"snapshots/{account_id}/{code_id}"
1✔
641
    s3_client.upload_fileobj(Fileobj=io.BytesIO(archive_file), Bucket=bucket_name, Key=key)
1✔
642
    code_sha256 = to_str(base64.b64encode(sha256(archive_file).digest()))
1✔
643
    return S3Code(
1✔
644
        id=code_id,
645
        account_id=account_id,
646
        s3_bucket=bucket_name,
647
        s3_key=key,
648
        s3_object_version=None,
649
        code_sha256=code_sha256,
650
        code_size=len(archive_file),
651
    )
652

653

654
def assert_hot_reloading_path_absolute(path: str) -> None:
1✔
655
    """
656
    Check whether a given path, after environment variable substitution, is an absolute path.
657
    Accepts either posix or windows paths, with environment placeholders.
658
    Example placeholders: $ENV_VAR, ${ENV_VAR}
659

660
    :param path: Posix or windows path, potentially containing environment variable placeholders.
661
        Example: `$ENV_VAR/lambda/src` with `ENV_VAR=/home/user/test-repo` set.
662
    """
663
    # expand variables in path before checking for an absolute path
664
    expanded_path = os.path.expandvars(path)
1✔
665
    if (
1✔
666
        not PurePosixPath(expanded_path).is_absolute()
667
        and not PureWindowsPath(expanded_path).is_absolute()
668
    ):
669
        raise InvalidParameterValueException(
1✔
670
            f"When using hot reloading, the archive key has to be an absolute path! Your archive key: {path}",
671
        )
672

673

674
def create_hot_reloading_code(path: str) -> HotReloadingCode:
1✔
675
    assert_hot_reloading_path_absolute(path)
1✔
676
    return HotReloadingCode(host_path=path)
1✔
677

678

679
def store_s3_bucket_archive(
1✔
680
    archive_bucket: str,
681
    archive_key: str,
682
    archive_version: str | None,
683
    function_name: str,
684
    region_name: str,
685
    account_id: str,
686
) -> ArchiveCode:
687
    """
688
    Takes the lambda archive stored in the given bucket and stores it in an internal s3 bucket
689

690
    :param archive_bucket: Bucket the archive is stored in
691
    :param archive_key: Key the archive is stored under
692
    :param archive_version: Version of the archive object in the bucket
693
    :param function_name: function name the archive should be stored for
694
    :param region_name: region name the archive should be stored for
695
    :param account_id: account id the archive should be stored for
696
    :return: S3 Code object representing the archive stored in S3
697
    """
698
    if archive_bucket == config.BUCKET_MARKER_LOCAL:
1✔
699
        hotreload_counter.labels(operation="create").increment()
1✔
700
        return create_hot_reloading_code(path=archive_key)
1✔
701
    s3_client: S3Client = connect_to().s3
1✔
702
    kwargs = {"VersionId": archive_version} if archive_version else {}
1✔
703
    try:
1✔
704
        archive_file = s3_client.get_object(Bucket=archive_bucket, Key=archive_key, **kwargs)[
1✔
705
            "Body"
706
        ].read()
707
    except s3_client.exceptions.ClientError as e:
1✔
708
        raise InvalidParameterValueException(
1✔
709
            f"Error occurred while GetObject. S3 Error Code: {e.response['Error']['Code']}. S3 Error Message: {e.response['Error']['Message']}",
710
            Type="User",
711
        )
712
    return store_lambda_archive(
1✔
713
        archive_file, function_name=function_name, region_name=region_name, account_id=account_id
714
    )
715

716

717
def create_image_code(image_uri: str) -> ImageCode:
1✔
718
    """
719
    Creates an image code by inspecting the provided image
720

721
    :param image_uri: Image URI of the image to inspect
722
    :return: Image code object
723
    """
724
    code_sha256 = "<cannot-get-image-hash>"
1✔
725
    if CONTAINER_CLIENT.has_docker():
1✔
726
        try:
1✔
727
            CONTAINER_CLIENT.pull_image(docker_image=image_uri)
1✔
728
        except ContainerException:
1✔
729
            LOG.debug("Cannot pull image %s. Maybe only available locally?", image_uri)
1✔
730
        try:
1✔
731
            code_sha256 = CONTAINER_CLIENT.inspect_image(image_name=image_uri)["RepoDigests"][
1✔
732
                0
733
            ].rpartition(":")[2]
734
        except Exception as e:
×
735
            LOG.debug(
×
736
                "Cannot inspect image %s. Is this image and/or docker available: %s", image_uri, e
737
            )
738
    else:
739
        LOG.warning(
×
740
            "Unable to get image hash for image %s - no docker socket available."
741
            "Image hash returned by Lambda will not be correct.",
742
            image_uri,
743
        )
744
    return ImageCode(image_uri=image_uri, code_sha256=code_sha256, repository_type="ECR")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc