• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22209548116

19 Feb 2026 02:08PM UTC coverage: 86.964% (-0.04%) from 87.003%
22209548116

push

github

web-flow
Logs: fix snapshot region from tests (#13792)

69755 of 80211 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.47
/localstack-core/localstack/services/lambda_/invocation/lambda_service.py
1
import base64
1✔
2
import concurrent.futures
1✔
3
import dataclasses
1✔
4
import io
1✔
5
import logging
1✔
6
import os.path
1✔
7
import random
1✔
8
import time
1✔
9
import uuid
1✔
10
from concurrent.futures import Executor, Future, ThreadPoolExecutor
1✔
11
from datetime import datetime
1✔
12
from hashlib import sha256
1✔
13
from pathlib import PurePosixPath, PureWindowsPath
1✔
14
from threading import RLock
1✔
15
from typing import TYPE_CHECKING
1✔
16

17
from localstack import config
1✔
18
from localstack.aws.api.lambda_ import (
1✔
19
    InvalidParameterValueException,
20
    InvalidRequestContentException,
21
    InvocationType,
22
    LastUpdateStatus,
23
    NoPublishedVersionException,
24
    ResourceConflictException,
25
    ResourceNotFoundException,
26
    State,
27
)
28
from localstack.aws.connect import connect_to
1✔
29
from localstack.constants import AWS_REGION_US_EAST_1
1✔
30
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
31
from localstack.services.lambda_.analytics import (
1✔
32
    FunctionInitializationType,
33
    FunctionOperation,
34
    FunctionStatus,
35
    function_counter,
36
    hotreload_counter,
37
)
38
from localstack.services.lambda_.api_utils import (
1✔
39
    lambda_arn,
40
    qualified_lambda_arn,
41
    qualifier_is_alias,
42
)
43
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
44
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
45
from localstack.services.lambda_.invocation.event_manager import LambdaEventManager
1✔
46
from localstack.services.lambda_.invocation.lambda_models import (
1✔
47
    ArchiveCode,
48
    Function,
49
    FunctionVersion,
50
    HotReloadingCode,
51
    ImageCode,
52
    Invocation,
53
    InvocationResult,
54
    S3Code,
55
    UpdateStatus,
56
    VersionAlias,
57
    VersionState,
58
)
59
from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores
1✔
60
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
1✔
61
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
62
from localstack.utils.archives import get_unzipped_size, is_zip_file
1✔
63
from localstack.utils.container_utils.container_client import ContainerException
1✔
64
from localstack.utils.docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
1✔
65
from localstack.utils.strings import short_uid, to_str
1✔
66

67
if TYPE_CHECKING:
1✔
68
    from mypy_boto3_s3 import S3Client
×
69

70
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
1✔
71

72
# NOTE(review): presumably the default function timeout in seconds; not used in the visible part of the file - confirm.
LAMBDA_DEFAULT_TIMEOUT_SECONDS = 3
1✔
73
# NOTE(review): presumably the default function memory size (unit not shown here); not used in the visible part of the file - confirm.
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
74

75

76
class LambdaService:
    """
    Registry and lifecycle coordinator for Lambda function versions.

    Tracks one version manager and one event manager per qualified ARN and runs start/stop work
    on a shared task executor.
    """

    # mapping from qualified ARN to version manager (versions that reported an active state)
    lambda_running_versions: dict[str, LambdaVersionManager]
    # mapping from qualified ARN to version manager (versions whose startup is still in progress)
    lambda_starting_versions: dict[str, LambdaVersionManager]
    # mapping from qualified ARN to event manager
    # NOTE: this is an annotation like its siblings; it previously was an assignment (`=`), which
    # bound the generic alias itself as a class attribute.
    event_managers: dict[str, LambdaEventManager]
    # lock taken while registering starting versions and while promoting them to running
    lambda_version_manager_lock: RLock
    # executor used for asynchronous start/stop/cleanup tasks
    task_executor: Executor

    assignment_service: AssignmentService
    counting_service: CountingService
1✔
87

88
    def __init__(self) -> None:
        """
        Create an empty service: no starting or running versions, no event managers,
        a fresh lock, a thread pool for background tasks, and new assignment/counting services.
        """
        # registries, all keyed by qualified ARN
        self.lambda_running_versions = dict()
        self.lambda_starting_versions = dict()
        self.event_managers = dict()
        # taken while registering starting versions and while promoting them to running
        self.lambda_version_manager_lock = RLock()
        # background executor for start/stop/cleanup work
        self.task_executor = ThreadPoolExecutor(thread_name_prefix="lambda-service-task")
        # services handed to every LambdaVersionManager created by this service
        self.assignment_service = AssignmentService()
        self.counting_service = CountingService()
1✔
96

97
    def stop(self) -> None:
        """
        Stop the whole lambda service.

        Submits the stop of every event manager and every running/starting version manager to the
        task executor, waits up to 5 seconds for them, then shuts the executor down (cancelling
        anything still queued) and stops the assignment service.
        """
        submit = self.task_executor.submit
        pending = [submit(event_manager.stop) for event_manager in self.event_managers.values()]
        # TODO: switch shutdown order? yes, shutdown starting versions before the running versions would make more sense
        pending.extend(submit(running.stop) for running in self.lambda_running_versions.values())
        for starting in self.lambda_starting_versions.values():
            pending.append(submit(starting.stop))
            # versions that never finished starting also get their cached code removed
            pending.append(submit(starting.function_version.config.code.destroy_cached))
        _, unfinished = concurrent.futures.wait(pending, timeout=5)
        if unfinished:
            LOG.debug("Shutdown not complete, missing threads: %s", unfinished)
        self.task_executor.shutdown(cancel_futures=True)
        self.assignment_service.stop()
1✔
119

120
    def stop_version(self, qualified_arn: str) -> None:
        """
        Stops a specific lambda service version

        Removes the event manager and the version manager(s) registered for the ARN and submits
        their stop to the task executor, then runs the delete_function_version hook.

        :param qualified_arn: Qualified arn for the version to stop
        :raises ValueError: if neither a running nor a starting version manager is registered for the arn
        """
        LOG.debug("Stopping version %s", qualified_arn)
        event_manager = self.event_managers.pop(qualified_arn, None)
        if not event_manager:
            LOG.debug("Could not find event manager to stop for function %s...", qualified_arn)
        else:
            self.task_executor.submit(event_manager.stop)

        # Both registries are always cleared for this ARN. Previously the starting manager was
        # popped as the eagerly-evaluated default of the running pop, so when both existed the
        # starting manager was removed but never stopped.
        running_manager = self.lambda_running_versions.pop(qualified_arn, None)
        starting_manager = self.lambda_starting_versions.pop(qualified_arn, None)
        managers_to_stop = [
            manager for manager in (running_manager, starting_manager) if manager
        ]
        if not managers_to_stop:
            raise ValueError(f"Unable to find version manager for {qualified_arn}")
        for manager in managers_to_stop:
            self.task_executor.submit(manager.stop)
        lambda_hooks.delete_function_version.run(qualified_arn)
1✔
138

139
    def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:
        """
        Look up the running version manager registered for an arn.

        :param function_arn: qualified arn for the lambda version
        :return: LambdaVersionManager for the arn
        :raises ValueError: if no running version manager is registered for the arn
        """
        if manager := self.lambda_running_versions.get(function_arn):
            return manager
        raise ValueError(f"Could not find version '{function_arn}'. Is it created?")
1✔
150

151
    def get_lambda_event_manager(self, function_arn: str) -> LambdaEventManager:
        """
        Look up the event manager registered for an arn.

        :param function_arn: qualified arn for the lambda version
        :return: LambdaEventManager for the arn
        :raises ValueError: if no event manager is registered for the arn
        """
        if manager := self.event_managers.get(function_arn):
            return manager
        raise ValueError(f"Could not find event manager '{function_arn}'. Is it created?")
1✔
162

163
    def _start_lambda_version(self, version_manager: LambdaVersionManager) -> None:
        """
        Start the given version manager and report the state it returns via update_version_state.

        :param version_manager: manager of the version to start
        """
        reported_state = version_manager.start()
        started_version = version_manager.function_version
        self.update_version_state(function_version=started_version, new_state=reported_state)
168

169
    def create_function_version(self, function_version: FunctionVersion) -> Future[None]:
        """
        Register a new version manager for the function version in the starting versions and
        start it asynchronously on the task executor.

        :param function_version: Function Version to create
        :return: future of the asynchronous startup task
        :raises ResourceConflictException: if a version with the same qualified arn is already starting
        """
        version_id = function_version.id
        with self.lambda_version_manager_lock:
            qualified_arn = version_id.qualified_arn()
            # only one startup per qualified arn may be in flight
            if self.lambda_starting_versions.get(qualified_arn):
                raise ResourceConflictException(
                    f"The operation cannot be performed at this time. An update is in progress for resource: {function_version.id.unqualified_arn()}",
                    Type="User",
                )
            store = lambda_stores[version_id.account][version_id.region]
            manager = LambdaVersionManager(
                function_arn=qualified_arn,
                function_version=function_version,
                function=store.functions.get(version_id.function_name),
                counting_service=self.counting_service,
                assignment_service=self.assignment_service,
            )
            self.lambda_starting_versions[qualified_arn] = manager
            lambda_hooks.create_function_version.run(function_version.qualified_arn)
        # the actual start happens outside the lock, in the background
        return self.task_executor.submit(self._start_lambda_version, manager)
1✔
195

196
    def publish_version_async(self, function_version: FunctionVersion):
        """
        Run publish_version for the given function version on the task executor.

        The resulting future is not returned.

        :param function_version: Function Version to publish
        """
        executor = self.task_executor
        executor.submit(self.publish_version, function_version)
×
198

199
    def delete_function_version_async(
        self, function: Function, version: FunctionVersion, qualifier: str
    ):
        """
        Simulates async function version cleanup after the deletion API is called:
        the version is first stored in state Deleting (last update InProgress) and only removed
        from the function after a small delay, to allow for getting the version details after deletion.

        :param function: function the version belongs to
        :param version: version to delete
        :param qualifier: key under which the version is removed from function.versions
        """

        def _cleanup():
            time.sleep(0.5)
            function.versions.pop(qualifier, None)

        deleting_config = dataclasses.replace(
            version.config,
            state=VersionState(state=State.Deleting),
            last_update=UpdateStatus(status=LastUpdateStatus.InProgress),
        )
        function.versions[version.id.qualifier] = dataclasses.replace(
            version, config=deleting_config
        )
        # the code is destroyed right away unless another version still references it
        destroy_code_if_not_used(code=version.config.code, function=function)

        self.task_executor.submit(_cleanup)
×
223

224
    def delete_function_async(self, store: LambdaStore, function_name: str):
        """
        Simulates async function cleanup after function deletion API is called
        by introducing a small delay before actually removing the function from the store
        to allow for getting the function details after deletion.

        Every version of the function is first stored in state Deleting (last update InProgress)
        while keeping its revision id.

        :param store: store holding the function
        :param function_name: name of the function to remove
        """

        def _cleanup():
            time.sleep(0.5)
            # default of None: the function may be unknown or already removed by the time this runs
            store.functions.pop(function_name, None)

        # set each version of the function to deleting state first, to allow for getting the function version details after deletion
        function = store.functions.get(function_name)
        if function:
            # iterate over a snapshot because the entries are replaced inside the loop
            for version in list(function.versions.values()):
                previous_revision_id = version.config.revision_id
                deleting_config = dataclasses.replace(
                    version.config,
                    state=VersionState(state=State.Deleting),
                    last_update=UpdateStatus(status=LastUpdateStatus.InProgress),
                )
                # Seems the revision id doesn't change when deleting a function right after it has been created (even though state has changed)
                # reassign revision id to avoid dataclass replace removing it, since it's init=False
                object.__setattr__(deleting_config, "revision_id", previous_revision_id)
                function.versions[version.id.qualifier] = dataclasses.replace(
                    version, config=deleting_config
                )

        self.task_executor.submit(_cleanup)
×
258

259
    def publish_version(self, function_version: FunctionVersion):
        """
        Synchronously create a function version (manager)
        Should only be called on publishing new versions, which basically clone an existing one.
        The new version needs to be added to the lambda store before invoking this.
        After successful completion of this method, the lambda version stored will be modified to be active, with a new revision id.
        It will then be active for execution, and should be retrieved again from the store before returning the data over the API.

        :param function_version: Function Version to create
        :raises Exception: if a version with the same qualified arn is already starting up
        """
        # HACK: trying to match the AWS timing behavior of Lambda Managed Instances for the operation
        # publish_version followed by get_function because transitioning LastUpdateStatus from InProgress to
        # Successful happens too fast on LocalStack (thanks to caching in prepare_version).
        # Without this hack, test_latest_published_update_config fails at get_function_response_postpublish
        # and test_lifecycle_invoke is flaky, sometimes not triggering the ResourceConflictException
        # Increasing this sleep too much (e.g., 10s) shouldn't cause any side effects apart from slow responsiveness
        if function_version.config.capacity_provider_config:
            time.sleep(0.1)
        with self.lambda_version_manager_lock:
            qualified_arn = function_version.id.qualified_arn()
            version_manager = self.lambda_starting_versions.get(qualified_arn)
            if version_manager:
                # Exception() does not interpolate logging-style arguments, so format the message here
                raise Exception(
                    f"Version '{qualified_arn}' already starting up and in state {version_manager.state}"
                )
            state = lambda_stores[function_version.id.account][function_version.id.region]
            fn = state.functions.get(function_version.id.function_name)
            version_manager = LambdaVersionManager(
                function_arn=qualified_arn,
                function_version=function_version,
                function=fn,
                counting_service=self.counting_service,
                assignment_service=self.assignment_service,
            )
            self.lambda_starting_versions[qualified_arn] = version_manager
        # started synchronously, outside the lock
        self._start_lambda_version(version_manager)
1✔
297

298
    # Commands
299
    def invoke(
        self,
        function_name: str,
        qualifier: str | None,
        region: str,
        account_id: str,
        invocation_type: InvocationType | None,
        client_context: str | None,
        request_id: str,
        payload: bytes | None,
        trace_context: dict | None = None,
        user_agent: str | None = None,
    ) -> InvocationResult | None:
        """
        Invokes a specific version of a lambda

        :param request_id: context request ID
        :param function_name: Function name
        :param qualifier: Function version qualifier
        :param region: Region of the function
        :param account_id: Account id of the function
        :param invocation_type: Invocation Type, defaults to RequestResponse
        :param client_context: Client Context, if applicable
        :param trace_context: tracing information such as X-Ray header
        :param payload: Invocation payload, defaults to an empty JSON object
        :param user_agent: user agent, passed on unchanged as part of the invocation
        :return: The invocation result; None for DryRun; for Event invocations whatever the event
            manager returns from enqueue_event
        :raises ResourceNotFoundException: if the function, alias or version does not exist
        :raises InvalidParameterValueException: if a function with capacity provider config is
            invoked with $LATEST although $LATEST.PUBLISHED exists
        :raises NoPublishedVersionException: if a function with capacity provider config is
            invoked with $LATEST and no $LATEST.PUBLISHED exists
        :raises ResourceConflictException: if no running version/event manager is registered for the version
        :raises InvalidRequestContentException: if the payload cannot be converted to a string
        """
        # NOTE: consider making the trace_context mandatory once we update all usages (should be easier after v4.0)
        trace_context = trace_context or {}
        # Invoked arn (for lambda context) does not have qualifier if not supplied
        invoked_arn = lambda_arn(
            function_name=function_name,
            qualifier=qualifier,
            account=account_id,
            region=region,
        )
        store = lambda_stores[account_id][region]
        function = store.functions.get(function_name)

        if function is None:
            if not qualifier:
                invoked_arn += ":$LATEST"
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")

        # A provided qualifier always takes precedence, but the default depends on whether $LATEST.PUBLISHED exists
        version_latest_published = function.versions.get("$LATEST.PUBLISHED")
        if version_latest_published:
            qualifier = qualifier or "$LATEST.PUBLISHED"
            invoked_arn = lambda_arn(
                function_name=function_name,
                qualifier=qualifier,
                account=account_id,
                region=region,
            )
        else:
            qualifier = qualifier or "$LATEST"

        if qualifier_is_alias(qualifier):
            alias = function.aliases.get(qualifier)
            if not alias:
                raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
            version_qualifier = alias.function_version
            if alias.routing_configuration:
                # only the first entry of the version weights is considered for routing
                routed_qualifier, probability = next(
                    iter(alias.routing_configuration.version_weights.items())
                )
                if random.random() < probability:
                    version_qualifier = routed_qualifier
        else:
            version_qualifier = qualifier

        # Need the qualified arn to exactly get the target lambda
        qualified_arn = qualified_lambda_arn(function_name, version_qualifier, account_id, region)
        version = function.versions.get(version_qualifier)
        if version is None:
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
        runtime = version.config.runtime or "n/a"
        package_type = version.config.package_type
        # Not considering provisioned concurrency for such early errors
        initialization_type = (
            FunctionInitializationType.lambda_managed_instances
            if version.config.capacity_provider_config
            else FunctionInitializationType.on_demand
        )
        if version.config.capacity_provider_config and qualifier == "$LATEST":
            if version_latest_published:
                raise InvalidParameterValueException(
                    "Functions configured with capacity provider configuration can't be invoked with $LATEST qualifier. To invoke this function, specify a published version qualifier or $LATEST.PUBLISHED.",
                    Type="User",
                )
            else:
                raise NoPublishedVersionException(
                    "The function can't be invoked because no published version exists. For functions with capacity provider configuration, either publish a version to $LATEST.PUBLISHED, or specify a published version qualifier.",
                    Type="User",
                )
        try:
            version_manager = self.get_lambda_version_manager(qualified_arn)
            event_manager = self.get_lambda_event_manager(qualified_arn)
        except ValueError as e:
            # no running manager registered: derive the metric status and user hint from the stored version state
            version_state = version.config.state.state
            if version_state == State.Failed:
                status = FunctionStatus.failed_state_error
                HINT_LOG.error(
                    f"Failed to create the runtime executor for the function {function_name}. "
                    "Please ensure that Docker is available in the LocalStack container by adding the volume mount "
                    '"/var/run/docker.sock:/var/run/docker.sock" to your LocalStack startup. '
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#docker-not-available"
                )
            elif version_state == State.Pending:
                status = FunctionStatus.pending_state_error
                HINT_LOG.warning(
                    "Lambda functions are created and updated asynchronously in the new lambda provider like in AWS. "
                    f"Before invoking {function_name}, please wait until the function transitioned from the state "
                    "Pending to Active using: "
                    f'"awslocal lambda wait function-active-v2 --function-name {function_name}" '
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#function-in-pending-state"
                )
            else:
                status = FunctionStatus.unhandled_state_error
                LOG.error(
                    "Unexpected state %s for Lambda function %s", version_state, function_name
                )
            function_counter.labels(
                operation=FunctionOperation.invoke,
                runtime=runtime,
                status=status,
                invocation_type=invocation_type,
                package_type=package_type,
                initialization_type=initialization_type,
            ).increment()
            raise ResourceConflictException(
                f"The operation cannot be performed at this time. The function is currently in the following state: {version_state}"
            ) from e
        # empty payloads have to work as well
        if payload is None:
            payload = b"{}"
        else:
            # detect invalid payloads early before creating an execution environment
            try:
                to_str(payload)
            except Exception as e:
                function_counter.labels(
                    operation=FunctionOperation.invoke,
                    runtime=runtime,
                    status=FunctionStatus.invalid_payload_error,
                    invocation_type=invocation_type,
                    package_type=package_type,
                    initialization_type=initialization_type,
                ).increment()
                # MAYBE: improve parity of detailed exception message (quite cumbersome)
                raise InvalidRequestContentException(
                    f"Could not parse request body into json: Could not parse payload into json: {e}",
                    Type="User",
                ) from e
        if invocation_type is None:
            invocation_type = InvocationType.RequestResponse
        if invocation_type == InvocationType.DryRun:
            return None
        # TODO payload verification  An error occurred (InvalidRequestContentException) when calling the Invoke operation: Could not parse request body into json: Could not parse payload into json: Unexpected character (''' (code 39)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
        #  at [Source: (byte[])"'test'"; line: 1, column: 2]
        #
        invocation = Invocation(
            payload=payload,
            invoked_arn=invoked_arn,
            client_context=client_context,
            invocation_type=invocation_type,
            invoke_time=datetime.now(),
            request_id=request_id,
            trace_context=trace_context,
            user_agent=user_agent,
        )
        if invocation_type == InvocationType.Event:
            # asynchronous invocations are only enqueued; no invoke metric is recorded here
            return event_manager.enqueue_event(invocation=invocation)

        invocation_result = version_manager.invoke(invocation=invocation)
        status = (
            FunctionStatus.invocation_error
            if invocation_result.is_error
            else FunctionStatus.success
        )
        # TODO: handle initialization_type provisioned-concurrency, requires enriching invocation_result
        function_counter.labels(
            operation=FunctionOperation.invoke,
            runtime=runtime,
            status=status,
            invocation_type=invocation_type,
            package_type=package_type,
            initialization_type=initialization_type,
        ).increment()
        return invocation_result
1✔
499

500
    def update_version(self, new_version: FunctionVersion) -> Future[None]:
        """
        Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready
        to be invoked

        :param new_version: New version (with the same qualifier as an older one)
        :return: future of the asynchronous startup of the new version
        :raises ValueError: if no version with this arn is running and the new version has no
            capacity provider config
        """
        has_running_predecessor = new_version.qualified_arn in self.lambda_running_versions
        if not has_running_predecessor and not new_version.config.capacity_provider_config:
            raise ValueError(
                f"Version {new_version.qualified_arn} cannot be updated if an old one is not running"
            )

        return self.create_function_version(function_version=new_version)
1✔
516

517
    def update_version_state(
        self, function_version: FunctionVersion, new_state: VersionState
    ) -> None:
        """
        Update the version state for the given function version.

        This will perform a rollover to the given function if the new state is active and there is a previously
        running version registered. The old version will be shutdown and its code deleted.

        If the new state is failed, it will abort the update and mark it as failed.
        If an older version is still running, it will keep running.

        Any exception raised while updating is logged and not propagated to the caller.

        :param function_version: Version reporting the state
        :param new_state: New state
        """
        function_arn = function_version.qualified_arn
        try:
            old_version = None
            old_event_manager = None
            with self.lambda_version_manager_lock:
                # default of None so the explicit check below is reachable; a bare pop would raise KeyError instead
                new_version_manager = self.lambda_starting_versions.pop(function_arn, None)
                if not new_version_manager:
                    raise ValueError(
                        f"Version {function_arn} reporting state {new_state.state} does not exist in the starting versions."
                    )
                if new_state.state == State.Active:
                    # remember the replaced managers so they can be stopped outside the lock
                    old_version = self.lambda_running_versions.get(function_arn, None)
                    old_event_manager = self.event_managers.get(function_arn, None)
                    self.lambda_running_versions[function_arn] = new_version_manager
                    self.event_managers[function_arn] = LambdaEventManager(
                        version_manager=new_version_manager
                    )
                    self.event_managers[function_arn].start()
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
                elif new_state.state == State.Failed:
                    update_status = UpdateStatus(status=LastUpdateStatus.Failed)
                    self.task_executor.submit(new_version_manager.stop)
                elif (
                    new_state.state == State.ActiveNonInvocable
                    and function_version.config.capacity_provider_config
                ):
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
                else:
                    # TODO what to do if state pending or inactive is supported?
                    self.task_executor.submit(new_version_manager.stop)
                    LOG.error(
                        "State %s for version %s should not have been reported. New version will be stopped.",
                        new_state,
                        function_arn,
                    )
                    return

            # TODO is it necessary to get the version again? Should be locked for modification anyway
            # Without updating the new state, the function would not change to active, last_update would be missing, and
            # the revision id would not be updated.
            state = lambda_stores[function_version.id.account][function_version.id.region]
            # FIXME this will fail if the function is deleted during this code lines here
            function = state.functions.get(function_version.id.function_name)
            if old_event_manager:
                self.task_executor.submit(old_event_manager.stop_for_update)
            if old_version:
                # if there is an old version, we assume it is an update, and stop the old one
                self.task_executor.submit(old_version.stop)
                if function:
                    self.task_executor.submit(
                        destroy_code_if_not_used, old_version.function_version.config.code, function
                    )
            if not function:
                LOG.debug("Function %s was deleted during status update", function_arn)
                return
            current_version = function.versions[function_version.id.qualifier]
            new_version_manager.state = new_state
            new_version_state = dataclasses.replace(
                current_version,
                config=dataclasses.replace(
                    current_version.config, state=new_state, last_update=update_status
                ),
            )
            state.functions[function_version.id.function_name].versions[
                function_version.id.qualifier
            ] = new_version_state

        except Exception:
            # best effort: failures are logged (with traceback only on debug level) and swallowed
            LOG.error(
                "Failed to update function version for arn %s",
                function_arn,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
605

606
    def update_alias(self, old_alias: VersionAlias, new_alias: VersionAlias, function: Function):
        """
        Move provisioned concurrency to the new target version when an alias is re-pointed.

        Does nothing unless the alias target changed and a provisioned concurrency config is
        registered under the old alias name.

        :param old_alias: alias before the update
        :param new_alias: alias after the update
        :param function: function the alias belongs to
        """
        # if pointer changed, need to restart provisioned
        provisioned_config = function.provisioned_concurrency_configs.get(old_alias.name)
        if old_alias.function_version == new_alias.function_version or provisioned_config is None:
            return

        LOG.warning("Deprovisioning")
        previous_version = function.versions.get(old_alias.function_version)
        previous_manager = self.get_lambda_version_manager(
            function_arn=previous_version.qualified_arn
        )
        target_version = function.versions.get(new_alias.function_version)
        target_manager = self.get_lambda_version_manager(function_arn=target_version.qualified_arn)

        # TODO: we might need to pull provisioned concurrency state a bit more out of the version manager for get_provisioned_concurrency_config
        # TODO: make this fully async
        # sync: wait (up to 4 seconds) until the old version is deprovisioned
        previous_manager.update_provisioned_concurrency_config(0).result(timeout=4)
        # async again: the result for the new version is not awaited
        target_manager.update_provisioned_concurrency_config(
            provisioned_config.provisioned_concurrent_executions
        )
627

628
    def can_assume_role(self, role_arn: str, region: str) -> bool:
        """
        Checks whether lambda can assume the given role.
        This _should_ only fail if IAM enforcement is enabled.

        :param role_arn: Role to assume
        :param region: Region used for the STS client
        :return: True if the role can be assumed by lambda, false otherwise
        """
        sts_client = connect_to(region_name=region).sts.request_metadata(service_principal="lambda")
        try:
            sts_client.assume_role(
                RoleArn=role_arn,
                RoleSessionName=f"test-assume-{short_uid()}",
                DurationSeconds=900,
            )
        except Exception as e:
            # any failure of the attempt counts as "cannot assume"
            LOG.debug("Cannot assume role %s: %s", role_arn, e)
            return False
        else:
            return True
×
647

648

649
# TODO: Move helper functions out of lambda_service into a separate module
650

651

652
def is_code_used(code: S3Code, function: Function) -> bool:
    """
    Tell whether any version of the function still references the given code.

    The function's lock is held while its versions are inspected.

    :param code: Code object to look for
    :param function: Function whose versions are checked
    :return: True if the configured code of at least one version equals the given code,
        False otherwise
    """
    with function.lock:
        for fn_version in function.versions.values():
            if code == fn_version.config.code:
                return True
        return False
1✔
662

663

664
def destroy_code_if_not_used(code: S3Code, function: Function) -> None:
    """
    Destroy the given code unless some version of the function still uses it.
    Does nothing if the code is still in use.

    :param code: Code object
    :param function: Function the code belongs to
    """
    # NOTE(review): is_code_used acquires function.lock again while it is held here, so the lock
    # is presumably reentrant - confirm against the Function model
    with function.lock:
        still_referenced = is_code_used(code, function)
        if still_referenced:
            return
        code.destroy()
1✔
675

676

677
def store_lambda_archive(
    archive_file: bytes, function_name: str, region_name: str, account_id: str
) -> S3Code:
    """
    Validates the given lambda archive and uploads it to an internal s3 bucket.

    The archive has to be a zip file whose unzipped size is below
    ``config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED``. It is uploaded to the bucket
    ``awslambda-<region_name>-tasks`` under the key ``snapshots/<account_id>/<code id>``, where
    the code id is the function name followed by a random UUID.

    :param archive_file: Archive file to store
    :param function_name: function name the archive should be stored for
    :param region_name: region name the archive should be stored for
    :param account_id: account id the archive should be stored for
    :return: S3 Code object representing the archive stored in S3; its ``code_sha256`` is the
        base64 encoded SHA-256 digest of the archive and ``code_size`` its length in bytes
    :raises InvalidParameterValueException: if the archive is not a zip file or its unzipped
        size reaches the configured limit
    """
    # only zip archives are accepted
    if not is_zip_file(archive_file):
        raise InvalidParameterValueException(
            "Could not unzip uploaded file. Please check your file, then try to upload again.",
            Type="User",
        )

    # the extracted content has to stay below the configured limit
    extracted_size = get_unzipped_size(zip_file=io.BytesIO(archive_file))
    if extracted_size >= config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED:
        raise InvalidParameterValueException(
            f"Unzipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED} bytes",
            Type="User",
        )

    # store all buckets in us-east-1 for now
    internal_clients = connect_to(
        region_name=AWS_REGION_US_EAST_1, aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT
    )
    s3_client = internal_clients.s3
    target_bucket = f"awslambda-{region_name}-tasks"
    # s3 create bucket is idempotent in us-east-1
    s3_client.create_bucket(Bucket=target_bucket)

    code_id = f"{function_name}-{uuid.uuid4()}"
    object_key = f"snapshots/{account_id}/{code_id}"
    archive_stream = io.BytesIO(archive_file)
    s3_client.upload_fileobj(Fileobj=archive_stream, Bucket=target_bucket, Key=object_key)

    archive_digest = sha256(archive_file).digest()
    encoded_digest = to_str(base64.b64encode(archive_digest))
    return S3Code(
        id=code_id,
        account_id=account_id,
        s3_bucket=target_bucket,
        s3_key=object_key,
        s3_object_version=None,
        code_sha256=encoded_digest,
        code_size=len(archive_file),
    )
723

724

725
def assert_hot_reloading_path_absolute(path: str) -> None:
    """
    Ensure that a hot reloading path is absolute once environment variables are substituted.
    Both posix and windows paths are accepted, and either may contain environment placeholders.
    Example placeholders: $ENV_VAR, ${ENV_VAR}

    :param path: Posix or windows path, potentially containing environment variable placeholders.
        Example: `$ENV_VAR/lambda/src` with `ENV_VAR=/home/user/test-repo` set.
    :raises InvalidParameterValueException: if the expanded path is absolute neither as a posix
        nor as a windows path
    """
    # placeholders have to be substituted first, they may provide the absolute prefix
    resolved_path = os.path.expandvars(path)
    path_flavours = (PurePosixPath, PureWindowsPath)
    if any(flavour(resolved_path).is_absolute() for flavour in path_flavours):
        return
    # the message reports the path as passed in, not the expanded one
    raise InvalidParameterValueException(
        f"When using hot reloading, the archive key has to be an absolute path! Your archive key: {path}",
    )
743

744

745
def create_hot_reloading_code(path: str) -> HotReloadingCode:
    """
    Creates a hot reloading code object for the given host path.

    The path is validated by assert_hot_reloading_path_absolute first, which raises an
    InvalidParameterValueException for paths that are not absolute.

    :param path: Posix or windows path, potentially containing environment variable placeholders
    :return: Hot reloading code object with the unmodified path as its host path
    """
    assert_hot_reloading_path_absolute(path)
    hot_reloading_code = HotReloadingCode(host_path=path)
    return hot_reloading_code
1✔
748

749

750
def store_s3_bucket_archive(
    archive_bucket: str,
    archive_key: str,
    archive_version: str | None,
    function_name: str,
    region_name: str,
    account_id: str,
) -> ArchiveCode:
    """
    Takes the lambda archive stored in the given bucket and stores it in an internal s3 bucket

    If the bucket equals ``config.BUCKET_MARKER_LOCAL``, nothing is downloaded: the hot reload
    counter is incremented and the archive key is used as the host path of a hot reloading code
    object instead.

    :param archive_bucket: Bucket the archive is stored in
    :param archive_key: Key the archive is stored under
    :param archive_version: Version of the archive object in the bucket
    :param function_name: function name the archive should be stored for
    :param region_name: region name the archive should be stored for
    :param account_id: account id the archive should be stored for
    :return: Hot reloading code object for the hot reloading marker bucket, otherwise the
        S3 Code object representing the archive stored in S3
    :raises InvalidParameterValueException: if fetching the archive fails with an S3 client error
        (the original client error is attached as the cause)
    """
    if archive_bucket == config.BUCKET_MARKER_LOCAL:
        hotreload_counter.labels(operation="create").increment()
        return create_hot_reloading_code(path=archive_key)

    s3_client: S3Client = connect_to().s3
    # only request a specific object version if one was given
    kwargs = {"VersionId": archive_version} if archive_version else {}
    try:
        archive_file = s3_client.get_object(Bucket=archive_bucket, Key=archive_key, **kwargs)[
            "Body"
        ].read()
    except s3_client.exceptions.ClientError as e:
        # chain explicitly so the S3 error is kept as the direct cause of the user-facing error
        raise InvalidParameterValueException(
            f"Error occurred while GetObject. S3 Error Code: {e.response['Error']['Code']}. S3 Error Message: {e.response['Error']['Message']}",
            Type="User",
        ) from e
    return store_lambda_archive(
        archive_file, function_name=function_name, region_name=region_name, account_id=account_id
    )
786

787

788
def create_image_code(image_uri: str) -> ImageCode:
    """
    Creates an image code by inspecting the provided image

    If docker is available, the image is pulled (a failing pull is only logged) and the hash is
    taken from the part after the last ``:`` of the first ``RepoDigests`` entry of the inspected
    image. If docker is not available or the inspection fails, the hash stays at the placeholder
    ``<cannot-get-image-hash>``.

    :param image_uri: Image URI of the image to inspect
    :return: Image code object
    """
    code_sha256 = "<cannot-get-image-hash>"
    if CONTAINER_CLIENT.has_docker():
        try:
            CONTAINER_CLIENT.pull_image(docker_image=image_uri)
        except ContainerException:
            LOG.debug("Cannot pull image %s. Maybe only available locally?", image_uri)
        try:
            code_sha256 = CONTAINER_CLIENT.inspect_image(image_name=image_uri)["RepoDigests"][
                0
            ].rpartition(":")[2]
        except Exception as e:
            # best effort: keep the placeholder hash if the image cannot be inspected
            LOG.debug(
                "Cannot inspect image %s. Is this image and/or docker available: %s", image_uri, e
            )
    else:
        # the two literals are concatenated, so the first one has to end with a space
        LOG.warning(
            "Unable to get image hash for image %s - no docker socket available. "
            "Image hash returned by Lambda will not be correct.",
            image_uri,
        )
    return ImageCode(image_uri=image_uri, code_sha256=code_sha256, repository_type="ECR")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc