• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
22334798432

push

github

web-flow
S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.31
/localstack-core/localstack/services/lambda_/invocation/lambda_service.py
1
import base64
1✔
2
import concurrent.futures
1✔
3
import dataclasses
1✔
4
import io
1✔
5
import logging
1✔
6
import os.path
1✔
7
import random
1✔
8
import time
1✔
9
import uuid
1✔
10
from concurrent.futures import Executor, Future, ThreadPoolExecutor
1✔
11
from datetime import datetime
1✔
12
from hashlib import sha256
1✔
13
from pathlib import PurePosixPath, PureWindowsPath
1✔
14
from threading import RLock
1✔
15
from typing import TYPE_CHECKING
1✔
16

17
from localstack import config
1✔
18
from localstack.aws.api.lambda_ import (
1✔
19
    InvalidParameterValueException,
20
    InvalidRequestContentException,
21
    InvocationType,
22
    LastUpdateStatus,
23
    NoPublishedVersionException,
24
    ResourceConflictException,
25
    ResourceNotFoundException,
26
    State,
27
)
28
from localstack.aws.connect import connect_to
1✔
29
from localstack.constants import AWS_REGION_US_EAST_1
1✔
30
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
31
from localstack.services.lambda_.analytics import (
1✔
32
    FunctionInitializationType,
33
    FunctionOperation,
34
    FunctionStatus,
35
    function_counter,
36
    hotreload_counter,
37
)
38
from localstack.services.lambda_.api_utils import (
1✔
39
    lambda_arn,
40
    qualified_lambda_arn,
41
    qualifier_is_alias,
42
)
43
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
44
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
45
from localstack.services.lambda_.invocation.event_manager import LambdaEventManager
1✔
46
from localstack.services.lambda_.invocation.lambda_models import (
1✔
47
    ArchiveCode,
48
    Function,
49
    FunctionVersion,
50
    HotReloadingCode,
51
    ImageCode,
52
    Invocation,
53
    InvocationResult,
54
    S3Code,
55
    UpdateStatus,
56
    VersionAlias,
57
    VersionState,
58
)
59
from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores
1✔
60
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
1✔
61
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
62
from localstack.utils.archives import get_unzipped_size, is_zip_file
1✔
63
from localstack.utils.container_utils.container_client import ContainerException
1✔
64
from localstack.utils.docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
1✔
65
from localstack.utils.strings import short_uid, to_str
1✔
66

67
if TYPE_CHECKING:
1✔
68
    from mypy_boto3_s3 import S3Client
×
69

70
LOG = logging.getLogger(__name__)
1✔
71

72
LAMBDA_DEFAULT_TIMEOUT_SECONDS = 3
1✔
73
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
74

75

76
class LambdaService:
    """
    Keeps track of the version managers and event managers of Lambda function versions,
    keyed by qualified ARN, and runs their lifecycle tasks on a shared task executor.
    """

    # mapping from qualified ARN to version manager (versions that are registered as running)
    lambda_running_versions: dict[str, LambdaVersionManager]
    # mapping from qualified ARN to version manager (versions that are still starting up)
    lambda_starting_versions: dict[str, LambdaVersionManager]
    # mapping from qualified ARN to event manager
    # NOTE: declared as an annotation; it used to be an assignment of the generic alias itself
    event_managers: dict[str, LambdaEventManager]
    # guards modifications of the starting/running version registries
    lambda_version_manager_lock: RLock
    # executor used for background start/stop/cleanup tasks
    task_executor: Executor

    assignment_service: AssignmentService
    counting_service: CountingService
87

88
    def __init__(self) -> None:
        """Create empty version/event manager registries, the lock, the task executor and the services."""
        # all three registries are keyed by qualified ARN and start out empty
        (
            self.lambda_running_versions,
            self.lambda_starting_versions,
            self.event_managers,
        ) = ({}, {}, {})
        self.lambda_version_manager_lock = RLock()
        # background tasks (start/stop/cleanup) run on this pool
        self.task_executor = ThreadPoolExecutor(thread_name_prefix="lambda-service-task")
        # services passed on to every version manager created by this service
        self.assignment_service = AssignmentService()
        self.counting_service = CountingService()
96

97
    def stop(self) -> None:
        """
        Stop the whole lambda service

        Submits the stop of every event manager and version manager to the task executor, waits up to
        5 seconds for these tasks, then shuts down the executor and the assignment service.
        """
        submit = self.task_executor.submit
        pending = [submit(manager.stop) for manager in self.event_managers.values()]
        # TODO: switch shutdown order? yes, shutdown starting versions before the running versions would make more sense
        pending += [submit(running.stop) for running in self.lambda_running_versions.values()]
        for starting in self.lambda_starting_versions.values():
            # versions that never finished starting additionally get their cached code removed
            pending.extend(
                (
                    submit(starting.stop),
                    submit(starting.function_version.config.code.destroy_cached),
                )
            )
        unfinished = concurrent.futures.wait(pending, timeout=5).not_done
        if unfinished:
            LOG.debug("Shutdown not complete, missing threads: %s", unfinished)
        # anything still queued is cancelled rather than awaited
        self.task_executor.shutdown(cancel_futures=True)
        self.assignment_service.stop()
119

120
    def stop_version(self, qualified_arn: str) -> None:
        """
        Stops a specific lambda service version

        Removes the event manager and the version manager(s) registered for the arn and submits
        their stop to the task executor, then runs the delete_function_version hook.

        :param qualified_arn: Qualified arn for the version to stop
        :raises ValueError: if neither a running nor a starting version manager is registered for the arn
        """
        LOG.debug("Stopping version %s", qualified_arn)
        event_manager = self.event_managers.pop(qualified_arn, None)
        if event_manager:
            self.task_executor.submit(event_manager.stop)
        else:
            LOG.debug("Could not find event manager to stop for function %s...", qualified_arn)

        # Both registries are always emptied for this arn. Previously a starting manager was silently
        # dropped (removed but never stopped) whenever a running manager existed as well.
        running_manager = self.lambda_running_versions.pop(qualified_arn, None)
        starting_manager = self.lambda_starting_versions.pop(qualified_arn, None)
        managers_to_stop = [
            manager for manager in (running_manager, starting_manager) if manager
        ]
        if not managers_to_stop:
            raise ValueError(f"Unable to find version manager for {qualified_arn}")
        for manager in managers_to_stop:
            self.task_executor.submit(manager.stop)
        lambda_hooks.delete_function_version.run(qualified_arn)
138

139
    def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:
1✔
140
        """
141
        Get the lambda version for the given arn
142
        :param function_arn: qualified arn for the lambda version
143
        :return: LambdaVersionManager for the arn
144
        """
145
        version_manager = self.lambda_running_versions.get(function_arn)
1✔
146
        if not version_manager:
1✔
147
            raise ValueError(f"Could not find version '{function_arn}'. Is it created?")
×
148

149
        return version_manager
1✔
150

151
    def get_lambda_event_manager(self, function_arn: str) -> LambdaEventManager:
1✔
152
        """
153
        Get the lambda event manager for the given arn
154
        :param function_arn: qualified arn for the lambda version
155
        :return: LambdaEventManager for the arn
156
        """
157
        event_manager = self.event_managers.get(function_arn)
1✔
158
        if not event_manager:
1✔
159
            raise ValueError(f"Could not find event manager '{function_arn}'. Is it created?")
×
160

161
        return event_manager
1✔
162

163
    def _start_lambda_version(self, version_manager: LambdaVersionManager) -> None:
1✔
164
        new_state = version_manager.start()
1✔
165
        self.update_version_state(
1✔
166
            function_version=version_manager.function_version, new_state=new_state
167
        )
168

169
    def create_function_version(self, function_version: FunctionVersion) -> Future[None]:
        """
        Register a new version manager for the function version in the starting versions and
        start it in the background

        :param function_version: Function Version to create
        :return: future of the submitted start task
        :raises ResourceConflictException: if a version manager for the same qualified arn is already starting
        """
        version_id = function_version.id
        with self.lambda_version_manager_lock:
            qualified_arn = version_id.qualified_arn()
            # only one start per qualified arn may be in flight
            if self.lambda_starting_versions.get(qualified_arn):
                raise ResourceConflictException(
                    f"The operation cannot be performed at this time. An update is in progress for resource: {version_id.unqualified_arn()}",
                    Type="User",
                )
            store = lambda_stores[version_id.account][version_id.region]
            starting_manager = LambdaVersionManager(
                function_arn=qualified_arn,
                function_version=function_version,
                function=store.functions.get(version_id.function_name),
                counting_service=self.counting_service,
                assignment_service=self.assignment_service,
            )
            self.lambda_starting_versions[qualified_arn] = starting_manager
            lambda_hooks.create_function_version.run(function_version.qualified_arn)
        # the actual start happens outside of the lock on the task executor
        return self.task_executor.submit(self._start_lambda_version, starting_manager)
195

196
    def publish_version_async(self, function_version: FunctionVersion):
1✔
197
        self.task_executor.submit(self.publish_version, function_version)
×
198

199
    def delete_function_version_async(
        self, function: Function, version: FunctionVersion, qualifier: str
    ):
        """
        Simulates async cleanup of a single function version: the version is first switched to the
        Deleting state and only removed from the function after a small delay, so its details can
        still be retrieved right after the deletion call.

        :param function: function the version belongs to
        :param version: version to mark as deleting
        :param qualifier: key under which the version is removed from the function after the delay
        """
        deleting_config = dataclasses.replace(
            version.config,
            state=VersionState(state=State.Deleting),
            last_update=UpdateStatus(status=LastUpdateStatus.InProgress),
        )
        function.versions[version.id.qualifier] = dataclasses.replace(
            version, config=deleting_config
        )
        destroy_code_if_not_used(code=version.config.code, function=function)

        def _remove_after_delay():
            time.sleep(0.5)
            function.versions.pop(qualifier, None)

        self.task_executor.submit(_remove_after_delay)
223

224
    def delete_function_async(self, store: LambdaStore, function_name: str):
        """
        Simulates async function cleanup after the function deletion API is called: every version
        of the function is first switched to the Deleting state and the function is only removed
        from the store after a small delay, so its details can still be retrieved after deletion.

        :param store: store containing the function
        :param function_name: name of the function to remove
        """

        def _cleanup():
            time.sleep(0.5)
            # default avoids a KeyError in the worker thread if the function is already gone
            store.functions.pop(function_name, None)

        # set each version of the function to deleting state first, to allow for getting the function version details after deletion
        function = store.functions.get(function_name)
        if function:
            # iterate over a snapshot, since entries of function.versions are reassigned in the loop
            for version in list(function.versions.values()):
                deleting_version = dataclasses.replace(
                    version,
                    config=dataclasses.replace(
                        version.config,
                        state=VersionState(state=State.Deleting),
                        last_update=UpdateStatus(status=LastUpdateStatus.InProgress),
                    ),
                )
                # Seems the revision id doesn't change when deleting a function right after it has been created (even though state has changed)
                # reassign revision id to avoid dataclass replace removing it, since it's init=False
                object.__setattr__(
                    deleting_version.config,
                    "revision_id",
                    version.config.revision_id,
                )
                function.versions[version.id.qualifier] = deleting_version

        self.task_executor.submit(_cleanup)
258

259
    def publish_version(self, function_version: FunctionVersion):
        """
        Synchronously create a function version (manager)
        Should only be called on publishing new versions, which basically clone an existing one.
        The new version needs to be added to the lambda store before invoking this.
        After successful completion of this method, the lambda version stored will be modified to be active, with a new revision id.
        It will then be active for execution, and should be retrieved again from the store before returning the data over the API.

        :param function_version: Function Version to create
        :raises Exception: if a version manager for the same qualified arn is already starting up
        """
        # HACK: trying to match the AWS timing behavior of Lambda Managed Instances for the operation
        # publish_version followed by get_function because transitioning LastUpdateStatus from InProgress to
        # Successful happens too fast on LocalStack (thanks to caching in prepare_version).
        # Without this hack, test_latest_published_update_config fails at get_function_response_postpublish
        # and test_lifecycle_invoke is flaky, sometimes not triggering the ResourceConflictException
        # Increasing this sleep too much (e.g., 10s) shouldn't cause any side effects apart from slow responsiveness
        if function_version.config.capacity_provider_config:
            time.sleep(0.1)
        with self.lambda_version_manager_lock:
            qualified_arn = function_version.id.qualified_arn()
            version_manager = self.lambda_starting_versions.get(qualified_arn)
            if version_manager:
                # Exception() does not %-format its arguments, so the message is built explicitly
                raise Exception(
                    f"Version '{qualified_arn}' already starting up and in state {version_manager.state}"
                )
            state = lambda_stores[function_version.id.account][function_version.id.region]
            fn = state.functions.get(function_version.id.function_name)
            version_manager = LambdaVersionManager(
                function_arn=qualified_arn,
                function_version=function_version,
                function=fn,
                counting_service=self.counting_service,
                assignment_service=self.assignment_service,
            )
            self.lambda_starting_versions[qualified_arn] = version_manager
        # started synchronously (outside of the lock), in contrast to create_function_version
        self._start_lambda_version(version_manager)
297

298
    # Commands
299
    def invoke(
        self,
        function_name: str,
        qualifier: str | None,
        region: str,
        account_id: str,
        invocation_type: InvocationType | None,
        client_context: str | None,
        request_id: str,
        payload: bytes | None,
        trace_context: dict | None = None,
        user_agent: str | None = None,
    ) -> InvocationResult | None:
        """
        Invokes a specific version of a lambda

        :param request_id: context request ID
        :param function_name: Function name
        :param qualifier: Function version qualifier (version or alias). If not given, $LATEST.PUBLISHED
            is used when that version exists, $LATEST otherwise
        :param region: Region of the function
        :param account_id: Account id of the function
        :param invocation_type: Invocation Type, defaults to RequestResponse
        :param client_context: Client Context, if applicable
        :param trace_context: tracing information such as X-Ray header
        :param payload: Invocation payload, defaults to an empty JSON object
        :param user_agent: user agent passed on to the invocation
        :return: The invocation result. None for DryRun; for Event invocations the result of enqueuing the event
        :raises ResourceNotFoundException: if the function, alias or version does not exist
        :raises InvalidParameterValueException: if a function with capacity provider config is invoked
            with $LATEST although $LATEST.PUBLISHED exists
        :raises NoPublishedVersionException: if a function with capacity provider config is invoked
            with $LATEST and no $LATEST.PUBLISHED exists
        :raises ResourceConflictException: if no running version manager or event manager is registered
        :raises InvalidRequestContentException: if the payload cannot be converted to a string
        """
        # NOTE: consider making the trace_context mandatory once we update all usages (should be easier after v4.0)
        trace_context = trace_context or {}
        # Invoked arn (for lambda context) does not have qualifier if not supplied
        invoked_arn = lambda_arn(
            function_name=function_name,
            qualifier=qualifier,
            account=account_id,
            region=region,
        )
        store = lambda_stores[account_id][region]
        function = store.functions.get(function_name)

        if function is None:
            if not qualifier:
                invoked_arn += ":$LATEST"
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")

        # A provided qualifier always takes precedence, but the default depends on whether $LATEST.PUBLISHED exists
        version_latest_published = function.versions.get("$LATEST.PUBLISHED")
        if version_latest_published:
            qualifier = qualifier or "$LATEST.PUBLISHED"
            invoked_arn = lambda_arn(
                function_name=function_name,
                qualifier=qualifier,
                account=account_id,
                region=region,
            )
        else:
            qualifier = qualifier or "$LATEST"

        if qualifier_is_alias(qualifier):
            alias = function.aliases.get(qualifier)
            if not alias:
                raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
            version_qualifier = alias.function_version
            if alias.routing_configuration:
                # only the first weighted version of the routing configuration is considered
                weighted_version, probability = next(
                    iter(alias.routing_configuration.version_weights.items())
                )
                if random.random() < probability:
                    version_qualifier = weighted_version
        else:
            version_qualifier = qualifier

        # Need the qualified arn to exactly get the target lambda
        qualified_arn = qualified_lambda_arn(function_name, version_qualifier, account_id, region)
        version = function.versions.get(version_qualifier)
        if version is None:
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
        runtime = version.config.runtime or "n/a"
        package_type = version.config.package_type
        # Not considering provisioned concurrency for such early errors
        initialization_type = (
            FunctionInitializationType.lambda_managed_instances
            if version.config.capacity_provider_config
            else FunctionInitializationType.on_demand
        )
        if version.config.capacity_provider_config and qualifier == "$LATEST":
            if version_latest_published:
                raise InvalidParameterValueException(
                    "Functions configured with capacity provider configuration can't be invoked with $LATEST qualifier. To invoke this function, specify a published version qualifier or $LATEST.PUBLISHED.",
                    Type="User",
                )
            else:
                raise NoPublishedVersionException(
                    "The function can't be invoked because no published version exists. For functions with capacity provider configuration, either publish a version to $LATEST.PUBLISHED, or specify a published version qualifier.",
                    Type="User",
                )
        try:
            version_manager = self.get_lambda_version_manager(qualified_arn)
            event_manager = self.get_lambda_event_manager(qualified_arn)
        except ValueError as e:
            # no running managers registered: report why, based on the state stored for the version
            version_state = version.config.state.state
            if version_state == State.Failed:
                status = FunctionStatus.failed_state_error
                HINT_LOG.error(
                    f"Failed to create the runtime executor for the function {function_name}. "
                    "Please ensure that Docker is available in the LocalStack container by adding the volume mount "
                    '"/var/run/docker.sock:/var/run/docker.sock" to your LocalStack startup. '
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#docker-not-available"
                )
            elif version_state == State.Pending:
                status = FunctionStatus.pending_state_error
                HINT_LOG.warning(
                    "Lambda functions are created and updated asynchronously in the new lambda provider like in AWS. "
                    f"Before invoking {function_name}, please wait until the function transitioned from the state "
                    "Pending to Active using: "
                    f'"awslocal lambda wait function-active-v2 --function-name {function_name}" '
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#function-in-pending-state"
                )
            else:
                status = FunctionStatus.unhandled_state_error
                LOG.error(
                    "Unexpected state %s for Lambda function %s", version_state, function_name
                )
            function_counter.labels(
                operation=FunctionOperation.invoke,
                runtime=runtime,
                status=status,
                invocation_type=invocation_type,
                package_type=package_type,
                initialization_type=initialization_type,
            ).increment()
            raise ResourceConflictException(
                f"The operation cannot be performed at this time. The function is currently in the following state: {version_state}"
            ) from e
        # empty payloads have to work as well
        if payload is None:
            payload = b"{}"
        else:
            # detect invalid payloads early before creating an execution environment
            try:
                to_str(payload)
            except Exception as e:
                function_counter.labels(
                    operation=FunctionOperation.invoke,
                    runtime=runtime,
                    status=FunctionStatus.invalid_payload_error,
                    invocation_type=invocation_type,
                    package_type=package_type,
                    initialization_type=initialization_type,
                ).increment()
                # MAYBE: improve parity of detailed exception message (quite cumbersome)
                raise InvalidRequestContentException(
                    f"Could not parse request body into json: Could not parse payload into json: {e}",
                    Type="User",
                ) from e
        if invocation_type is None:
            invocation_type = InvocationType.RequestResponse
        if invocation_type == InvocationType.DryRun:
            return None
        # TODO payload verification  An error occurred (InvalidRequestContentException) when calling the Invoke operation: Could not parse request body into json: Could not parse payload into json: Unexpected character (''' (code 39)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
        #  at [Source: (byte[])"'test'"; line: 1, column: 2]
        #
        # the same invocation object is used for both the event and the synchronous path
        invocation = Invocation(
            payload=payload,
            invoked_arn=invoked_arn,
            client_context=client_context,
            invocation_type=invocation_type,
            invoke_time=datetime.now(),
            request_id=request_id,
            trace_context=trace_context,
            user_agent=user_agent,
        )
        if invocation_type == InvocationType.Event:
            return event_manager.enqueue_event(invocation=invocation)

        invocation_result = version_manager.invoke(invocation=invocation)
        status = (
            FunctionStatus.invocation_error
            if invocation_result.is_error
            else FunctionStatus.success
        )
        # TODO: handle initialization_type provisioned-concurrency, requires enriching invocation_result
        function_counter.labels(
            operation=FunctionOperation.invoke,
            runtime=runtime,
            status=status,
            invocation_type=invocation_type,
            package_type=package_type,
            initialization_type=initialization_type,
        ).increment()
        return invocation_result
499

500
    def update_version(self, new_version: FunctionVersion) -> Future[None]:
1✔
501
        """
502
        Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready
503
        to be invoked
504

505
        :param new_version: New version (with the same qualifier as an older one)
506
        """
507
        if new_version.config.capacity_provider_config:
1✔
508
            # simulate AWS behavior with a slight delay after update_function_configuration,
509
            # so we can observe LastUpdateStatus transitioning to InProgress before it becomes Successful
510
            time.sleep(0.5)
×
511

512
        if (
1✔
513
            new_version.qualified_arn not in self.lambda_running_versions
514
            and not new_version.config.capacity_provider_config
515
        ):
516
            raise ValueError(
×
517
                f"Version {new_version.qualified_arn} cannot be updated if an old one is not running"
518
            )
519

520
        return self.create_function_version(function_version=new_version)
1✔
521

522
    def update_version_state(
        self, function_version: FunctionVersion, new_state: VersionState
    ) -> None:
        """
        Update the version state for the given function version.

        This will perform a rollover to the given function if the new state is active and there is a previously
        running version registered. The old version will be shutdown and its code deleted.

        If the new state is failed, it will abort the update and mark it as failed.
        If an older version is still running, it will keep running.

        Any exception raised while updating is logged and not propagated to the caller.

        :param function_version: Version reporting the state
        :param new_state: New state
        """
        function_arn = function_version.qualified_arn
        try:
            old_version = None
            old_event_manager = None
            with self.lambda_version_manager_lock:
                # default of None, so that a missing entry reaches the explicit error below
                # instead of surfacing as a bare KeyError
                new_version_manager = self.lambda_starting_versions.pop(function_arn, None)
                if not new_version_manager:
                    raise ValueError(
                        f"Version {function_arn} reporting state {new_state.state} does not exist in the starting versions."
                    )
                if new_state.state == State.Active:
                    # rollover: remember the previous managers, they are stopped outside of the lock
                    old_version = self.lambda_running_versions.get(function_arn, None)
                    old_event_manager = self.event_managers.get(function_arn, None)
                    self.lambda_running_versions[function_arn] = new_version_manager
                    self.event_managers[function_arn] = LambdaEventManager(
                        version_manager=new_version_manager
                    )
                    self.event_managers[function_arn].start()
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
                elif new_state.state == State.Failed:
                    update_status = UpdateStatus(status=LastUpdateStatus.Failed)
                    self.task_executor.submit(new_version_manager.stop)
                elif (
                    new_state.state == State.ActiveNonInvocable
                    and function_version.config.capacity_provider_config
                ):
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
                else:
                    # TODO what to do if state pending or inactive is supported?
                    self.task_executor.submit(new_version_manager.stop)
                    LOG.error(
                        "State %s for version %s should not have been reported. New version will be stopped.",
                        new_state,
                        function_arn,
                    )
                    return

            # TODO is it necessary to get the version again? Should be locked for modification anyway
            # Without updating the new state, the function would not change to active, last_update would be missing, and
            # the revision id would not be updated.
            state = lambda_stores[function_version.id.account][function_version.id.region]
            # FIXME this will fail if the function is deleted during this code lines here
            function = state.functions.get(function_version.id.function_name)
            if old_event_manager:
                self.task_executor.submit(old_event_manager.stop_for_update)
            if old_version:
                # if there is an old version, we assume it is an update, and stop the old one
                self.task_executor.submit(old_version.stop)
                if function:
                    self.task_executor.submit(
                        destroy_code_if_not_used, old_version.function_version.config.code, function
                    )
            if not function:
                LOG.debug("Function %s was deleted during status update", function_arn)
                return
            current_version = function.versions[function_version.id.qualifier]
            new_version_manager.state = new_state
            new_version_state = dataclasses.replace(
                current_version,
                config=dataclasses.replace(
                    current_version.config, state=new_state, last_update=update_status
                ),
            )
            state.functions[function_version.id.function_name].versions[
                function_version.id.qualifier
            ] = new_version_state

        except Exception:
            # best effort: failures are only logged (with traceback on debug level)
            LOG.error(
                "Failed to update function version for arn %s",
                function_arn,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
610

611
    def update_alias(self, old_alias: VersionAlias, new_alias: VersionAlias, function: Function):
        """
        Move the provisioned concurrency of an alias to the version it newly points to

        Nothing happens if the alias still points to the same version or if no provisioned
        concurrency config is stored for the (old) alias name.

        :param old_alias: alias before the update
        :param new_alias: alias after the update
        :param function: function the alias belongs to
        """
        provisioned_concurrency_config = function.provisioned_concurrency_configs.get(
            old_alias.name
        )
        # if pointer changed, need to restart provisioned
        pointer_changed = old_alias.function_version != new_alias.function_version
        if not pointer_changed or provisioned_concurrency_config is None:
            return

        LOG.warning("Deprovisioning")
        previous_version = function.versions.get(old_alias.function_version)
        previous_manager = self.get_lambda_version_manager(
            function_arn=previous_version.qualified_arn
        )
        target_version = function.versions.get(new_alias.function_version)
        target_manager = self.get_lambda_version_manager(function_arn=target_version.qualified_arn)

        # TODO: we might need to pull provisioned concurrency state a bit more out of the version manager for get_provisioned_concurrency_config
        # TODO: make this fully async
        # sync: wait (up to 4 seconds) until the old version is deprovisioned
        previous_manager.update_provisioned_concurrency_config(0).result(timeout=4)
        # async again: provisioning of the new version is not awaited
        target_manager.update_provisioned_concurrency_config(
            provisioned_concurrency_config.provisioned_concurrent_executions
        )
632

633
    def can_assume_role(self, role_arn: str, region: str) -> bool:
        """
        Checks whether lambda can assume the given role.
        This _should_ only fail if IAM enforcement is enabled.

        :param role_arn: Role to assume
        :param region: Region the STS client is created for
        :return: True if the role can be assumed by lambda, false otherwise
        """
        sts_client = connect_to(region_name=region).sts.request_metadata(service_principal="lambda")
        try:
            sts_client.assume_role(
                RoleArn=role_arn,
                RoleSessionName=f"test-assume-{short_uid()}",
                DurationSeconds=900,
            )
        except Exception as error:
            # any failure is treated as "cannot assume"
            LOG.debug("Cannot assume role %s: %s", role_arn, error)
            return False
        else:
            return True
652

653

654
# TODO: Move helper functions out of lambda_service into a separate module
655

656

657
def is_code_used(code: S3Code, function: Function) -> bool:
    """
    Tell whether at least one version of the function still references the given code.
    The function lock is held while the versions are scanned.

    :param code: Code object to look for
    :param function: Function whose versions are inspected
    :return: True if some version's config uses the code, False otherwise
    """
    with function.lock:
        for fn_version in function.versions.values():
            if code == fn_version.config.code:
                return True
        return False
1✔
667

668

669
def destroy_code_if_not_used(code: S3Code, function: Function) -> None:
    """
    Destroy the given code unless some version of the function still uses it.
    The usage check and the destruction both happen while holding the function lock.

    :param code: Code object
    :param function: Function the code belongs to
    """
    with function.lock:
        if is_code_used(code, function):
            # still referenced by a version - keep it
            return
        code.destroy()
1✔
680

681

682
def store_lambda_archive(
    archive_file: bytes, function_name: str, region_name: str, account_id: str
) -> S3Code:
    """
    Validates the given lambda archive and uploads it to an internal s3 bucket.
    The archive has to be a zip file whose unzipped size is below
    config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED.

    :param archive_file: Archive file to store
    :param function_name: function name the archive should be stored for
    :param region_name: region name the archive should be stored for
    :param account_id: account id the archive should be stored for
    :return: S3 Code object representing the archive stored in S3
    :raises InvalidParameterValueException: if the archive is not a zip file or its
        unzipped size reaches the configured limit
    """
    # the payload has to be a zip archive
    if not is_zip_file(archive_file):
        raise InvalidParameterValueException(
            "Could not unzip uploaded file. Please check your file, then try to upload again.",
            Type="User",
        )

    # the extracted content has to stay below the configured limit
    extracted_bytes = get_unzipped_size(zip_file=io.BytesIO(archive_file))
    if extracted_bytes >= config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED:
        raise InvalidParameterValueException(
            f"Unzipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED} bytes",
            Type="User",
        )

    # store all buckets in us-east-1 for now
    internal_s3 = connect_to(
        region_name=AWS_REGION_US_EAST_1, aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT
    ).s3
    tasks_bucket = f"awslambda-{region_name}-tasks"
    # s3 create bucket is idempotent in us-east-1
    internal_s3.create_bucket(Bucket=tasks_bucket)

    # every stored archive gets a unique id, which is also part of the object key
    archive_id = f"{function_name}-{uuid.uuid4()}"
    object_key = f"snapshots/{account_id}/{archive_id}"
    internal_s3.upload_fileobj(
        Fileobj=io.BytesIO(archive_file),
        Bucket=tasks_bucket,
        Key=object_key,
    )

    # base64 encoded sha256 digest of the raw archive bytes
    archive_digest = sha256(archive_file).digest()
    return S3Code(
        id=archive_id,
        account_id=account_id,
        s3_bucket=tasks_bucket,
        s3_key=object_key,
        s3_object_version=None,
        code_sha256=to_str(base64.b64encode(archive_digest)),
        code_size=len(archive_file),
    )
728

729

730
def assert_hot_reloading_path_absolute(path: str) -> None:
    """
    Ensure that a hot reloading path is absolute once environment variables are substituted.
    Both posix and windows style paths are accepted, and the path may contain
    environment placeholders such as $ENV_VAR or ${ENV_VAR}.

    :param path: Posix or windows path, potentially containing environment variable placeholders.
        Example: `$ENV_VAR/lambda/src` with `ENV_VAR=/home/user/test-repo` set.
    :raises InvalidParameterValueException: if the expanded path is absolute in neither flavour
    """
    # placeholders have to be substituted first, they may provide the absolute prefix
    candidate = os.path.expandvars(path)
    if PurePosixPath(candidate).is_absolute() or PureWindowsPath(candidate).is_absolute():
        return
    # the message reports the path as given by the user, not the expanded one
    raise InvalidParameterValueException(
        f"When using hot reloading, the archive key has to be an absolute path! Your archive key: {path}",
    )
748

749

750
def create_hot_reloading_code(path: str) -> HotReloadingCode:
    """
    Build a hot reloading code object for the given host path.
    The path is checked with assert_hot_reloading_path_absolute first, which raises
    if the path is not absolute.

    :param path: Host path of the code, potentially containing environment variable placeholders
    :return: HotReloadingCode with host_path set to the given (unexpanded) path
    """
    assert_hot_reloading_path_absolute(path)
    hot_reloading_code = HotReloadingCode(host_path=path)
    return hot_reloading_code
1✔
753

754

755
def store_s3_bucket_archive(
    archive_bucket: str,
    archive_key: str,
    archive_version: str | None,
    function_name: str,
    region_name: str,
    account_id: str,
) -> ArchiveCode:
    """
    Takes the lambda archive stored in the given bucket and stores it in an internal s3 bucket.
    If the bucket equals config.BUCKET_MARKER_LOCAL, nothing is downloaded: the archive key is
    treated as a host path and a hot reloading code object is returned instead.

    :param archive_bucket: Bucket the archive is stored in
    :param archive_key: Key the archive is stored under (host path in the hot reloading case)
    :param archive_version: Version of the archive object in the bucket
    :param function_name: function name the archive should be stored for
    :param region_name: region name the archive should be stored for
    :param account_id: account id the archive should be stored for
    :return: S3 Code object representing the archive stored in S3, or a hot reloading code
        object if the bucket is the hot reloading marker
    :raises InvalidParameterValueException: if GetObject fails with a client error (the
        original error is attached as cause), or if the archive is rejected while storing it
    """
    if archive_bucket == config.BUCKET_MARKER_LOCAL:
        hotreload_counter.labels(operation="create").increment()
        return create_hot_reloading_code(path=archive_key)
    s3_client: S3Client = connect_to().s3
    # only pin the object version if one was requested
    kwargs = {"VersionId": archive_version} if archive_version else {}
    try:
        archive_file = s3_client.get_object(Bucket=archive_bucket, Key=archive_key, **kwargs)[
            "Body"
        ].read()
    except s3_client.exceptions.ClientError as e:
        # chain explicitly so the underlying S3 error stays attached as __cause__
        raise InvalidParameterValueException(
            f"Error occurred while GetObject. S3 Error Code: {e.response['Error']['Code']}. S3 Error Message: {e.response['Error']['Message']}",
            Type="User",
        ) from e
    return store_lambda_archive(
        archive_file, function_name=function_name, region_name=region_name, account_id=account_id
    )
791

792

793
def create_image_code(image_uri: str) -> ImageCode:
    """
    Creates an image code by inspecting the provided image.
    If docker is available, a pull of the image is attempted (a failing pull is only logged),
    and the hash is taken from the part after the last ":" of the first entry in the image's
    "RepoDigests". If docker is not available or the inspection fails, the placeholder
    "<cannot-get-image-hash>" is used as hash.

    :param image_uri: Image URI of the image to inspect
    :return: Image code object
    """
    code_sha256 = "<cannot-get-image-hash>"
    if CONTAINER_CLIENT.has_docker():
        try:
            CONTAINER_CLIENT.pull_image(docker_image=image_uri)
        except ContainerException:
            # best effort: inspection is still attempted after a failed pull
            LOG.debug("Cannot pull image %s. Maybe only available locally?", image_uri)
        try:
            code_sha256 = CONTAINER_CLIENT.inspect_image(image_name=image_uri)["RepoDigests"][
                0
            ].rpartition(":")[2]
        except Exception as e:
            # deliberately broad: any failure here keeps the placeholder hash
            LOG.debug(
                "Cannot inspect image %s. Is this image and/or docker available: %s", image_uri, e
            )
    else:
        # the adjacent literals are concatenated, so the first one needs a trailing space
        LOG.warning(
            "Unable to get image hash for image %s - no docker socket available. "
            "Image hash returned by Lambda will not be correct.",
            image_uri,
        )
    return ImageCode(image_uri=image_uri, code_sha256=code_sha256, repository_type="ECR")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc