• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20085422227

09 Dec 2025 10:17PM UTC coverage: 86.887% (+0.01%) from 86.875%
20085422227

push

github

web-flow
ecs/add service principal (#13474)

2 of 2 new or added lines in 1 file covered. (100.0%)

224 existing lines in 8 files now uncovered.

69922 of 80475 relevant lines covered (86.89%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.4
/localstack-core/localstack/services/lambda_/invocation/lambda_service.py
1
import base64
1✔
2
import concurrent.futures
1✔
3
import dataclasses
1✔
4
import io
1✔
5
import logging
1✔
6
import os.path
1✔
7
import random
1✔
8
import time
1✔
9
import uuid
1✔
10
from concurrent.futures import Executor, Future, ThreadPoolExecutor
1✔
11
from datetime import datetime
1✔
12
from hashlib import sha256
1✔
13
from pathlib import PurePosixPath, PureWindowsPath
1✔
14
from threading import RLock
1✔
15
from typing import TYPE_CHECKING
1✔
16

17
from localstack import config
1✔
18
from localstack.aws.api.lambda_ import (
1✔
19
    InvalidParameterValueException,
20
    InvalidRequestContentException,
21
    InvocationType,
22
    LastUpdateStatus,
23
    NoPublishedVersionException,
24
    ResourceConflictException,
25
    ResourceNotFoundException,
26
    State,
27
)
28
from localstack.aws.connect import connect_to
1✔
29
from localstack.constants import AWS_REGION_US_EAST_1
1✔
30
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
31
from localstack.services.lambda_.analytics import (
1✔
32
    FunctionInitializationType,
33
    FunctionOperation,
34
    FunctionStatus,
35
    function_counter,
36
    hotreload_counter,
37
)
38
from localstack.services.lambda_.api_utils import (
1✔
39
    lambda_arn,
40
    qualified_lambda_arn,
41
    qualifier_is_alias,
42
)
43
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
44
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
45
from localstack.services.lambda_.invocation.event_manager import LambdaEventManager
1✔
46
from localstack.services.lambda_.invocation.lambda_models import (
1✔
47
    ArchiveCode,
48
    Function,
49
    FunctionVersion,
50
    HotReloadingCode,
51
    ImageCode,
52
    Invocation,
53
    InvocationResult,
54
    S3Code,
55
    UpdateStatus,
56
    VersionAlias,
57
    VersionState,
58
)
59
from localstack.services.lambda_.invocation.models import lambda_stores
1✔
60
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
1✔
61
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
62
from localstack.utils.archives import get_unzipped_size, is_zip_file
1✔
63
from localstack.utils.container_utils.container_client import ContainerException
1✔
64
from localstack.utils.docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
1✔
65
from localstack.utils.strings import short_uid, to_str
1✔
66

67
if TYPE_CHECKING:
1✔
UNCOV
68
    from mypy_boto3_s3 import S3Client
×
69

70
LOG = logging.getLogger(__name__)
1✔
71

72
LAMBDA_DEFAULT_TIMEOUT_SECONDS = 3
1✔
73
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
74

75

76
class LambdaService:
1✔
77
    # mapping from qualified ARN to version manager
78
    lambda_running_versions: dict[str, LambdaVersionManager]
1✔
79
    lambda_starting_versions: dict[str, LambdaVersionManager]
1✔
80
    # mapping from qualified ARN to event manager
81
    event_managers = dict[str, LambdaEventManager]
1✔
82
    lambda_version_manager_lock: RLock
1✔
83
    task_executor: Executor
1✔
84

85
    assignment_service: AssignmentService
1✔
86
    counting_service: CountingService
1✔
87

88
    def __init__(self) -> None:
1✔
89
        self.lambda_running_versions = {}
1✔
90
        self.lambda_starting_versions = {}
1✔
91
        self.event_managers = {}
1✔
92
        self.lambda_version_manager_lock = RLock()
1✔
93
        self.task_executor = ThreadPoolExecutor(thread_name_prefix="lambda-service-task")
1✔
94
        self.assignment_service = AssignmentService()
1✔
95
        self.counting_service = CountingService()
1✔
96

97
    def stop(self) -> None:
1✔
98
        """
99
        Stop the whole lambda service
100
        """
101
        shutdown_futures = []
1✔
102
        for event_manager in self.event_managers.values():
1✔
103
            shutdown_futures.append(self.task_executor.submit(event_manager.stop))
1✔
104
        # TODO: switch shutdown order? yes, shutdown starting versions before the running versions would make more sense
105
        for version_manager in self.lambda_running_versions.values():
1✔
106
            shutdown_futures.append(self.task_executor.submit(version_manager.stop))
1✔
107
        for version_manager in self.lambda_starting_versions.values():
1✔
108
            shutdown_futures.append(self.task_executor.submit(version_manager.stop))
×
UNCOV
109
            shutdown_futures.append(
×
110
                self.task_executor.submit(
111
                    version_manager.function_version.config.code.destroy_cached
112
                )
113
            )
114
        _, not_done = concurrent.futures.wait(shutdown_futures, timeout=5)
1✔
115
        if not_done:
1✔
UNCOV
116
            LOG.debug("Shutdown not complete, missing threads: %s", not_done)
×
117
        self.task_executor.shutdown(cancel_futures=True)
1✔
118
        self.assignment_service.stop()
1✔
119

120
    def stop_version(self, qualified_arn: str) -> None:
1✔
121
        """
122
        Stops a specific lambda service version
123
        :param qualified_arn: Qualified arn for the version to stop
124
        """
125
        LOG.debug("Stopping version %s", qualified_arn)
1✔
126
        event_manager = self.event_managers.pop(qualified_arn, None)
1✔
127
        if not event_manager:
1✔
UNCOV
128
            LOG.debug("Could not find event manager to stop for function %s...", qualified_arn)
×
129
        else:
130
            self.task_executor.submit(event_manager.stop)
1✔
131
        version_manager = self.lambda_running_versions.pop(
1✔
132
            qualified_arn, self.lambda_starting_versions.pop(qualified_arn, None)
133
        )
134
        if not version_manager:
1✔
UNCOV
135
            raise ValueError(f"Unable to find version manager for {qualified_arn}")
×
136
        self.task_executor.submit(version_manager.stop)
1✔
137
        lambda_hooks.delete_function_version.run(qualified_arn)
1✔
138

139
    def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:
1✔
140
        """
141
        Get the lambda version for the given arn
142
        :param function_arn: qualified arn for the lambda version
143
        :return: LambdaVersionManager for the arn
144
        """
145
        version_manager = self.lambda_running_versions.get(function_arn)
1✔
146
        if not version_manager:
1✔
UNCOV
147
            raise ValueError(f"Could not find version '{function_arn}'. Is it created?")
×
148

149
        return version_manager
1✔
150

151
    def get_lambda_event_manager(self, function_arn: str) -> LambdaEventManager:
1✔
152
        """
153
        Get the lambda event manager for the given arn
154
        :param function_arn: qualified arn for the lambda version
155
        :return: LambdaEventManager for the arn
156
        """
157
        event_manager = self.event_managers.get(function_arn)
1✔
158
        if not event_manager:
1✔
UNCOV
159
            raise ValueError(f"Could not find event manager '{function_arn}'. Is it created?")
×
160

161
        return event_manager
1✔
162

163
    def _start_lambda_version(self, version_manager: LambdaVersionManager) -> None:
1✔
164
        new_state = version_manager.start()
1✔
165
        self.update_version_state(
1✔
166
            function_version=version_manager.function_version, new_state=new_state
167
        )
168

169
    def create_function_version(self, function_version: FunctionVersion) -> Future[None]:
1✔
170
        """
171
        Creates a new function version (manager), and puts it in the startup dict
172

173
        :param function_version: Function Version to create
174
        """
175
        with self.lambda_version_manager_lock:
1✔
176
            qualified_arn = function_version.id.qualified_arn()
1✔
177
            version_manager = self.lambda_starting_versions.get(qualified_arn)
1✔
178
            if version_manager:
1✔
179
                raise ResourceConflictException(
1✔
180
                    f"The operation cannot be performed at this time. An update is in progress for resource: {function_version.id.unqualified_arn()}",
181
                    Type="User",
182
                )
183
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
184
            fn = state.functions.get(function_version.id.function_name)
1✔
185
            version_manager = LambdaVersionManager(
1✔
186
                function_arn=qualified_arn,
187
                function_version=function_version,
188
                function=fn,
189
                counting_service=self.counting_service,
190
                assignment_service=self.assignment_service,
191
            )
192
            self.lambda_starting_versions[qualified_arn] = version_manager
1✔
193
            lambda_hooks.create_function_version.run(function_version.qualified_arn)
1✔
194
        return self.task_executor.submit(self._start_lambda_version, version_manager)
1✔
195

196
    def publish_version_async(self, function_version: FunctionVersion):
1✔
UNCOV
197
        self.task_executor.submit(self.publish_version, function_version)
×
198

199
    def publish_version(self, function_version: FunctionVersion):
1✔
200
        """
201
        Synchronously create a function version (manager)
202
        Should only be called on publishing new versions, which basically clone an existing one.
203
        The new version needs to be added to the lambda store before invoking this.
204
        After successful completion of this method, the lambda version stored will be modified to be active, with a new revision id.
205
        It will then be active for execution, and should be retrieved again from the store before returning the data over the API.
206

207
        :param function_version: Function Version to create
208
        """
209
        # HACK: trying to match the AWS timing behavior of Lambda Managed Instances for the operation
210
        # publish_version followed by get_function because transitioning LastUpdateStatus from InProgress to
211
        # Successful happens too fast on LocalStack (thanks to caching in prepare_version).
212
        # Without this hack, test_latest_published_update_config fails at get_function_response_postpublish
213
        # and test_lifecycle_invoke is flaky, sometimes not triggering the ResourceConflictException
214
        # Increasing this sleep too much (e.g., 10s) shouldn't cause any side effects apart from slow responsiveness
215
        if function_version.config.CapacityProviderConfig:
1✔
UNCOV
216
            time.sleep(0.1)
×
217
        with self.lambda_version_manager_lock:
1✔
218
            qualified_arn = function_version.id.qualified_arn()
1✔
219
            version_manager = self.lambda_starting_versions.get(qualified_arn)
1✔
220
            if version_manager:
1✔
UNCOV
221
                raise Exception(
×
222
                    "Version '%s' already starting up and in state %s",
223
                    qualified_arn,
224
                    version_manager.state,
225
                )
226
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
227
            fn = state.functions.get(function_version.id.function_name)
1✔
228
            version_manager = LambdaVersionManager(
1✔
229
                function_arn=qualified_arn,
230
                function_version=function_version,
231
                function=fn,
232
                counting_service=self.counting_service,
233
                assignment_service=self.assignment_service,
234
            )
235
            self.lambda_starting_versions[qualified_arn] = version_manager
1✔
236
        self._start_lambda_version(version_manager)
1✔
237

238
    # Commands
239
    def invoke(
1✔
240
        self,
241
        function_name: str,
242
        qualifier: str | None,
243
        region: str,
244
        account_id: str,
245
        invocation_type: InvocationType | None,
246
        client_context: str | None,
247
        request_id: str,
248
        payload: bytes | None,
249
        trace_context: dict | None = None,
250
        user_agent: str | None = None,
251
    ) -> InvocationResult | None:
252
        """
253
        Invokes a specific version of a lambda
254

255
        :param request_id: context request ID
256
        :param function_name: Function name
257
        :param qualifier: Function version qualifier
258
        :param region: Region of the function
259
        :param account_id: Account id of the function
260
        :param invocation_type: Invocation Type
261
        :param client_context: Client Context, if applicable
262
        :param trace_context: tracing information such as X-Ray header
263
        :param payload: Invocation payload
264
        :return: The invocation result
265
        """
266
        # NOTE: consider making the trace_context mandatory once we update all usages (should be easier after v4.0)
267
        trace_context = trace_context or {}
1✔
268
        # Invoked arn (for lambda context) does not have qualifier if not supplied
269
        invoked_arn = lambda_arn(
1✔
270
            function_name=function_name,
271
            qualifier=qualifier,
272
            account=account_id,
273
            region=region,
274
        )
275
        state = lambda_stores[account_id][region]
1✔
276
        function = state.functions.get(function_name)
1✔
277

278
        if function is None:
1✔
279
            if not qualifier:
1✔
280
                invoked_arn += ":$LATEST"
1✔
281
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
1✔
282

283
        # A provided qualifier always takes precedence, but the default depends on whether $LATEST.PUBLISHED exists
284
        version_latest_published = function.versions.get("$LATEST.PUBLISHED")
1✔
285
        if version_latest_published:
1✔
286
            qualifier = qualifier or "$LATEST.PUBLISHED"
×
UNCOV
287
            invoked_arn = lambda_arn(
×
288
                function_name=function_name,
289
                qualifier=qualifier,
290
                account=account_id,
291
                region=region,
292
            )
293
        else:
294
            qualifier = qualifier or "$LATEST"
1✔
295

296
        if qualifier_is_alias(qualifier):
1✔
297
            alias = function.aliases.get(qualifier)
1✔
298
            if not alias:
1✔
299
                raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
1✔
300
            version_qualifier = alias.function_version
1✔
301
            if alias.routing_configuration:
1✔
302
                version, probability = next(
1✔
303
                    iter(alias.routing_configuration.version_weights.items())
304
                )
305
                if random.random() < probability:
1✔
306
                    version_qualifier = version
1✔
307
        else:
308
            version_qualifier = qualifier
1✔
309

310
        # Need the qualified arn to exactly get the target lambda
311
        qualified_arn = qualified_lambda_arn(function_name, version_qualifier, account_id, region)
1✔
312
        version = function.versions.get(version_qualifier)
1✔
313
        if version is None:
1✔
UNCOV
314
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
×
315
        runtime = version.config.runtime or "n/a"
1✔
316
        package_type = version.config.package_type
1✔
317
        # Not considering provisioned concurrency for such early errors
318
        initialization_type = (
1✔
319
            FunctionInitializationType.lambda_managed_instances
320
            if version.config.CapacityProviderConfig
321
            else FunctionInitializationType.on_demand
322
        )
323
        if version.config.CapacityProviderConfig and qualifier == "$LATEST":
1✔
UNCOV
324
            if function.versions.get("$LATEST.PUBLISHED"):
×
UNCOV
325
                raise InvalidParameterValueException(
×
326
                    "Functions configured with capacity provider configuration can't be invoked with $LATEST qualifier. To invoke this function, specify a published version qualifier or $LATEST.PUBLISHED.",
327
                    Type="User",
328
                )
329
            else:
330
                raise NoPublishedVersionException(
×
331
                    "The function can't be invoked because no published version exists. For functions with capacity provider configuration, either publish a version to $LATEST.PUBLISHED, or specify a published version qualifier.",
332
                    Type="User",
333
                )
334
        try:
1✔
335
            version_manager = self.get_lambda_version_manager(qualified_arn)
1✔
336
            event_manager = self.get_lambda_event_manager(qualified_arn)
1✔
UNCOV
337
        except ValueError as e:
×
UNCOV
338
            state = version and version.config.state.state
×
UNCOV
339
            if state == State.Failed:
×
340
                status = FunctionStatus.failed_state_error
×
341
                HINT_LOG.error(
×
342
                    f"Failed to create the runtime executor for the function {function_name}. "
343
                    "Please ensure that Docker is available in the LocalStack container by adding the volume mount "
344
                    '"/var/run/docker.sock:/var/run/docker.sock" to your LocalStack startup. '
345
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#docker-not-available"
346
                )
UNCOV
347
            elif state == State.Pending:
×
UNCOV
348
                status = FunctionStatus.pending_state_error
×
UNCOV
349
                HINT_LOG.warning(
×
350
                    "Lambda functions are created and updated asynchronously in the new lambda provider like in AWS. "
351
                    f"Before invoking {function_name}, please wait until the function transitioned from the state "
352
                    "Pending to Active using: "
353
                    f'"awslocal lambda wait function-active-v2 --function-name {function_name}" '
354
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#function-in-pending-state"
355
                )
356
            else:
UNCOV
357
                status = FunctionStatus.unhandled_state_error
×
UNCOV
358
                LOG.error("Unexpected state %s for Lambda function %s", state, function_name)
×
359
            function_counter.labels(
×
360
                operation=FunctionOperation.invoke,
361
                runtime=runtime,
362
                status=status,
363
                invocation_type=invocation_type,
364
                package_type=package_type,
365
                initialization_type=initialization_type,
366
            ).increment()
UNCOV
367
            raise ResourceConflictException(
×
368
                f"The operation cannot be performed at this time. The function is currently in the following state: {state}"
369
            ) from e
370
        # empty payloads have to work as well
371
        if payload is None:
1✔
372
            payload = b"{}"
1✔
373
        else:
374
            # detect invalid payloads early before creating an execution environment
375
            try:
1✔
376
                to_str(payload)
1✔
377
            except Exception as e:
1✔
378
                function_counter.labels(
1✔
379
                    operation=FunctionOperation.invoke,
380
                    runtime=runtime,
381
                    status=FunctionStatus.invalid_payload_error,
382
                    invocation_type=invocation_type,
383
                    package_type=package_type,
384
                    initialization_type=initialization_type,
385
                ).increment()
386
                # MAYBE: improve parity of detailed exception message (quite cumbersome)
387
                raise InvalidRequestContentException(
1✔
388
                    f"Could not parse request body into json: Could not parse payload into json: {e}",
389
                    Type="User",
390
                )
391
        if invocation_type is None:
1✔
392
            invocation_type = InvocationType.RequestResponse
1✔
393
        if invocation_type == InvocationType.DryRun:
1✔
394
            return None
1✔
395
        # TODO payload verification  An error occurred (InvalidRequestContentException) when calling the Invoke operation: Could not parse request body into json: Could not parse payload into json: Unexpected character (''' (code 39)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
396
        #  at [Source: (byte[])"'test'"; line: 1, column: 2]
397
        #
398
        if invocation_type == InvocationType.Event:
1✔
399
            return event_manager.enqueue_event(
1✔
400
                invocation=Invocation(
401
                    payload=payload,
402
                    invoked_arn=invoked_arn,
403
                    client_context=client_context,
404
                    invocation_type=invocation_type,
405
                    invoke_time=datetime.now(),
406
                    request_id=request_id,
407
                    trace_context=trace_context,
408
                    user_agent=user_agent,
409
                )
410
            )
411

412
        invocation_result = version_manager.invoke(
1✔
413
            invocation=Invocation(
414
                payload=payload,
415
                invoked_arn=invoked_arn,
416
                client_context=client_context,
417
                invocation_type=invocation_type,
418
                invoke_time=datetime.now(),
419
                request_id=request_id,
420
                trace_context=trace_context,
421
                user_agent=user_agent,
422
            )
423
        )
424
        status = (
1✔
425
            FunctionStatus.invocation_error
426
            if invocation_result.is_error
427
            else FunctionStatus.success
428
        )
429
        # TODO: handle initialization_type provisioned-concurrency, requires enriching invocation_result
430
        function_counter.labels(
1✔
431
            operation=FunctionOperation.invoke,
432
            runtime=runtime,
433
            status=status,
434
            invocation_type=invocation_type,
435
            package_type=package_type,
436
            initialization_type=initialization_type,
437
        ).increment()
438
        return invocation_result
1✔
439

440
    def update_version(self, new_version: FunctionVersion) -> Future[None]:
1✔
441
        """
442
        Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready
443
        to be invoked
444

445
        :param new_version: New version (with the same qualifier as an older one)
446
        """
447
        if (
1✔
448
            new_version.qualified_arn not in self.lambda_running_versions
449
            and not new_version.config.CapacityProviderConfig
450
        ):
UNCOV
451
            raise ValueError(
×
452
                f"Version {new_version.qualified_arn} cannot be updated if an old one is not running"
453
            )
454

455
        return self.create_function_version(function_version=new_version)
1✔
456

457
    def update_version_state(
1✔
458
        self, function_version: FunctionVersion, new_state: VersionState
459
    ) -> None:
460
        """
461
        Update the version state for the given function version.
462

463
        This will perform a rollover to the given function if the new state is active and there is a previously
464
        running version registered. The old version will be shutdown and its code deleted.
465

466
        If the new state is failed, it will abort the update and mark it as failed.
467
        If an older version is still running, it will keep running.
468

469
        :param function_version: Version reporting the state
470
        :param new_state: New state
471
        """
472
        function_arn = function_version.qualified_arn
1✔
473
        try:
1✔
474
            old_version = None
1✔
475
            old_event_manager = None
1✔
476
            with self.lambda_version_manager_lock:
1✔
477
                new_version_manager = self.lambda_starting_versions.pop(function_arn)
1✔
478
                if not new_version_manager:
1✔
UNCOV
479
                    raise ValueError(
×
480
                        f"Version {function_arn} reporting state {new_state.state} does exist in the starting versions."
481
                    )
482
                if new_state.state == State.Active:
1✔
483
                    old_version = self.lambda_running_versions.get(function_arn, None)
1✔
484
                    old_event_manager = self.event_managers.get(function_arn, None)
1✔
485
                    self.lambda_running_versions[function_arn] = new_version_manager
1✔
486
                    self.event_managers[function_arn] = LambdaEventManager(
1✔
487
                        version_manager=new_version_manager
488
                    )
489
                    self.event_managers[function_arn].start()
1✔
490
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
1✔
491
                elif new_state.state == State.Failed:
×
UNCOV
492
                    update_status = UpdateStatus(status=LastUpdateStatus.Failed)
×
UNCOV
493
                    self.task_executor.submit(new_version_manager.stop)
×
UNCOV
494
                elif (
×
495
                    new_state.state == State.ActiveNonInvocable
496
                    and function_version.config.CapacityProviderConfig
497
                ):
UNCOV
498
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
×
499
                else:
500
                    # TODO what to do if state pending or inactive is supported?
UNCOV
501
                    self.task_executor.submit(new_version_manager.stop)
×
UNCOV
502
                    LOG.error(
×
503
                        "State %s for version %s should not have been reported. New version will be stopped.",
504
                        new_state,
505
                        function_arn,
506
                    )
UNCOV
507
                    return
×
508

509
            # TODO is it necessary to get the version again? Should be locked for modification anyway
510
            # Without updating the new state, the function would not change to active, last_update would be missing, and
511
            # the revision id would not be updated.
512
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
513
            # FIXME this will fail if the function is deleted during this code lines here
514
            function = state.functions.get(function_version.id.function_name)
1✔
515
            if old_event_manager:
1✔
516
                self.task_executor.submit(old_event_manager.stop_for_update)
1✔
517
            if old_version:
1✔
518
                # if there is an old version, we assume it is an update, and stop the old one
519
                self.task_executor.submit(old_version.stop)
1✔
520
                if function:
1✔
521
                    self.task_executor.submit(
1✔
522
                        destroy_code_if_not_used, old_version.function_version.config.code, function
523
                    )
524
            if not function:
1✔
UNCOV
525
                LOG.debug("Function %s was deleted during status update", function_arn)
×
UNCOV
526
                return
×
527
            current_version = function.versions[function_version.id.qualifier]
1✔
528
            new_version_manager.state = new_state
1✔
529
            new_version_state = dataclasses.replace(
1✔
530
                current_version,
531
                config=dataclasses.replace(
532
                    current_version.config, state=new_state, last_update=update_status
533
                ),
534
            )
535
            state.functions[function_version.id.function_name].versions[
1✔
536
                function_version.id.qualifier
537
            ] = new_version_state
538

UNCOV
539
        except Exception:
×
UNCOV
540
            LOG.error(
×
541
                "Failed to update function version for arn %s",
542
                function_arn,
543
                exc_info=LOG.isEnabledFor(logging.DEBUG),
544
            )
545

546
    def update_alias(self, old_alias: VersionAlias, new_alias: VersionAlias, function: Function):
1✔
547
        # if pointer changed, need to restart provisioned
548
        provisioned_concurrency_config = function.provisioned_concurrency_configs.get(
1✔
549
            old_alias.name
550
        )
551
        if (
1✔
552
            old_alias.function_version != new_alias.function_version
553
            and provisioned_concurrency_config is not None
554
        ):
UNCOV
555
            LOG.warning("Deprovisioning")
×
UNCOV
556
            fn_version_old = function.versions.get(old_alias.function_version)
×
UNCOV
557
            vm_old = self.get_lambda_version_manager(function_arn=fn_version_old.qualified_arn)
×
UNCOV
558
            fn_version_new = function.versions.get(new_alias.function_version)
×
UNCOV
559
            vm_new = self.get_lambda_version_manager(function_arn=fn_version_new.qualified_arn)
×
560

561
            # TODO: we might need to pull provisioned concurrency state a bit more out of the version manager for get_provisioned_concurrency_config
562
            # TODO: make this fully async
UNCOV
563
            vm_old.update_provisioned_concurrency_config(0).result(timeout=4)  # sync
×
UNCOV
564
            vm_new.update_provisioned_concurrency_config(
×
565
                provisioned_concurrency_config.provisioned_concurrent_executions
566
            )  # async again
567

568
    def can_assume_role(self, role_arn: str, region: str) -> bool:
1✔
569
        """
570
        Checks whether lambda can assume the given role.
571
        This _should_ only fail if IAM enforcement is enabled.
572

573
        :param role_arn: Role to assume
574
        :return: True if the role can be assumed by lambda, false otherwise
575
        """
576
        sts_client = connect_to(region_name=region).sts.request_metadata(service_principal="lambda")
1✔
577
        try:
1✔
578
            sts_client.assume_role(
1✔
579
                RoleArn=role_arn,
580
                RoleSessionName=f"test-assume-{short_uid()}",
581
                DurationSeconds=900,
582
            )
583
            return True
1✔
UNCOV
584
        except Exception as e:
×
UNCOV
585
            LOG.debug("Cannot assume role %s: %s", role_arn, e)
×
UNCOV
586
            return False
×
587

588

589
# TODO: Move helper functions out of lambda_service into a separate module
590

591

592
def is_code_used(code: S3Code, function: Function) -> bool:
1✔
593
    """
594
    Check if given code is still used in some version of the function
595

596
    :param code: Code object
597
    :param function: function to check
598
    :return: bool whether code is used in another version of the function
599
    """
600
    with function.lock:
1✔
601
        return any(code == version.config.code for version in function.versions.values())
1✔
602

603

604
def destroy_code_if_not_used(code: S3Code, function: Function) -> None:
1✔
605
    """
606
    Destroy the given code if it is not used in some version of the function
607
    Do nothing otherwise
608

609
    :param code: Code object
610
    :param function: Function the code belongs too
611
    """
612
    with function.lock:
1✔
613
        if not is_code_used(code, function):
1✔
614
            code.destroy()
1✔
615

616

617
def store_lambda_archive(
1✔
618
    archive_file: bytes, function_name: str, region_name: str, account_id: str
619
) -> S3Code:
620
    """
621
    Stores the given lambda archive in an internal s3 bucket.
622
    Also checks if zipfile matches the specifications
623

624
    :param archive_file: Archive file to store
625
    :param function_name: function name the archive should be stored for
626
    :param region_name: region name the archive should be stored for
627
    :param account_id: account id the archive should be stored for
628
    :return: S3 Code object representing the archive stored in S3
629
    """
630
    # check if zip file
631
    if not is_zip_file(archive_file):
1✔
632
        raise InvalidParameterValueException(
1✔
633
            "Could not unzip uploaded file. Please check your file, then try to upload again.",
634
            Type="User",
635
        )
636
    # check unzipped size
637
    unzipped_size = get_unzipped_size(zip_file=io.BytesIO(archive_file))
1✔
638
    if unzipped_size >= config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED:
1✔
639
        raise InvalidParameterValueException(
1✔
640
            f"Unzipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED} bytes",
641
            Type="User",
642
        )
643
    # store all buckets in us-east-1 for now
644
    s3_client = connect_to(
1✔
645
        region_name=AWS_REGION_US_EAST_1, aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT
646
    ).s3
647
    bucket_name = f"awslambda-{region_name}-tasks"
1✔
648
    # s3 create bucket is idempotent in us-east-1
649
    s3_client.create_bucket(Bucket=bucket_name)
1✔
650
    code_id = f"{function_name}-{uuid.uuid4()}"
1✔
651
    key = f"snapshots/{account_id}/{code_id}"
1✔
652
    s3_client.upload_fileobj(Fileobj=io.BytesIO(archive_file), Bucket=bucket_name, Key=key)
1✔
653
    code_sha256 = to_str(base64.b64encode(sha256(archive_file).digest()))
1✔
654
    return S3Code(
1✔
655
        id=code_id,
656
        account_id=account_id,
657
        s3_bucket=bucket_name,
658
        s3_key=key,
659
        s3_object_version=None,
660
        code_sha256=code_sha256,
661
        code_size=len(archive_file),
662
    )
663

664

665
def assert_hot_reloading_path_absolute(path: str) -> None:
1✔
666
    """
667
    Check whether a given path, after environment variable substitution, is an absolute path.
668
    Accepts either posix or windows paths, with environment placeholders.
669
    Example placeholders: $ENV_VAR, ${ENV_VAR}
670

671
    :param path: Posix or windows path, potentially containing environment variable placeholders.
672
        Example: `$ENV_VAR/lambda/src` with `ENV_VAR=/home/user/test-repo` set.
673
    """
674
    # expand variables in path before checking for an absolute path
675
    expanded_path = os.path.expandvars(path)
1✔
676
    if (
1✔
677
        not PurePosixPath(expanded_path).is_absolute()
678
        and not PureWindowsPath(expanded_path).is_absolute()
679
    ):
680
        raise InvalidParameterValueException(
1✔
681
            f"When using hot reloading, the archive key has to be an absolute path! Your archive key: {path}",
682
        )
683

684

685
def create_hot_reloading_code(path: str) -> HotReloadingCode:
1✔
686
    assert_hot_reloading_path_absolute(path)
1✔
687
    return HotReloadingCode(host_path=path)
1✔
688

689

690
def store_s3_bucket_archive(
1✔
691
    archive_bucket: str,
692
    archive_key: str,
693
    archive_version: str | None,
694
    function_name: str,
695
    region_name: str,
696
    account_id: str,
697
) -> ArchiveCode:
698
    """
699
    Takes the lambda archive stored in the given bucket and stores it in an internal s3 bucket
700

701
    :param archive_bucket: Bucket the archive is stored in
702
    :param archive_key: Key the archive is stored under
703
    :param archive_version: Version of the archive object in the bucket
704
    :param function_name: function name the archive should be stored for
705
    :param region_name: region name the archive should be stored for
706
    :param account_id: account id the archive should be stored for
707
    :return: S3 Code object representing the archive stored in S3
708
    """
709
    if archive_bucket == config.BUCKET_MARKER_LOCAL:
1✔
710
        hotreload_counter.labels(operation="create").increment()
1✔
711
        return create_hot_reloading_code(path=archive_key)
1✔
712
    s3_client: S3Client = connect_to().s3
1✔
713
    kwargs = {"VersionId": archive_version} if archive_version else {}
1✔
714
    try:
1✔
715
        archive_file = s3_client.get_object(Bucket=archive_bucket, Key=archive_key, **kwargs)[
1✔
716
            "Body"
717
        ].read()
718
    except s3_client.exceptions.ClientError as e:
1✔
719
        raise InvalidParameterValueException(
1✔
720
            f"Error occurred while GetObject. S3 Error Code: {e.response['Error']['Code']}. S3 Error Message: {e.response['Error']['Message']}",
721
            Type="User",
722
        )
723
    return store_lambda_archive(
1✔
724
        archive_file, function_name=function_name, region_name=region_name, account_id=account_id
725
    )
726

727

728
def create_image_code(image_uri: str) -> ImageCode:
1✔
729
    """
730
    Creates an image code by inspecting the provided image
731

732
    :param image_uri: Image URI of the image to inspect
733
    :return: Image code object
734
    """
735
    code_sha256 = "<cannot-get-image-hash>"
1✔
736
    if CONTAINER_CLIENT.has_docker():
1✔
737
        try:
1✔
738
            CONTAINER_CLIENT.pull_image(docker_image=image_uri)
1✔
739
        except ContainerException:
1✔
740
            LOG.debug("Cannot pull image %s. Maybe only available locally?", image_uri)
1✔
741
        try:
1✔
742
            code_sha256 = CONTAINER_CLIENT.inspect_image(image_name=image_uri)["RepoDigests"][
1✔
743
                0
744
            ].rpartition(":")[2]
UNCOV
745
        except Exception as e:
×
UNCOV
746
            LOG.debug(
×
747
                "Cannot inspect image %s. Is this image and/or docker available: %s", image_uri, e
748
            )
749
    else:
UNCOV
750
        LOG.warning(
×
751
            "Unable to get image hash for image %s - no docker socket available."
752
            "Image hash returned by Lambda will not be correct.",
753
            image_uri,
754
        )
755
    return ImageCode(image_uri=image_uri, code_sha256=code_sha256, repository_type="ECR")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc