• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16665047018

31 Jul 2025 06:34PM UTC coverage: 86.897% (+0.1%) from 86.781%
16665047018

push

github

web-flow
Apigw/enable vpce routing (#12937)

5 of 5 new or added lines in 1 file covered. (100.0%)

314 existing lines in 13 files now uncovered.

66469 of 76492 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.63
/localstack-core/localstack/services/lambda_/invocation/lambda_service.py
1
import base64
1✔
2
import concurrent.futures
1✔
3
import dataclasses
1✔
4
import io
1✔
5
import logging
1✔
6
import os.path
1✔
7
import random
1✔
8
import uuid
1✔
9
from concurrent.futures import Executor, Future, ThreadPoolExecutor
1✔
10
from datetime import datetime
1✔
11
from hashlib import sha256
1✔
12
from pathlib import PurePosixPath, PureWindowsPath
1✔
13
from threading import RLock
1✔
14
from typing import TYPE_CHECKING, Optional
1✔
15

16
from localstack import config
1✔
17
from localstack.aws.api.lambda_ import (
1✔
18
    InvalidParameterValueException,
19
    InvalidRequestContentException,
20
    InvocationType,
21
    LastUpdateStatus,
22
    ResourceConflictException,
23
    ResourceNotFoundException,
24
    State,
25
)
26
from localstack.aws.connect import connect_to
1✔
27
from localstack.constants import AWS_REGION_US_EAST_1
1✔
28
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
29
from localstack.services.lambda_.analytics import (
1✔
30
    FunctionOperation,
31
    FunctionStatus,
32
    function_counter,
33
    hotreload_counter,
34
)
35
from localstack.services.lambda_.api_utils import (
1✔
36
    lambda_arn,
37
    qualified_lambda_arn,
38
    qualifier_is_alias,
39
)
40
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
41
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
42
from localstack.services.lambda_.invocation.event_manager import LambdaEventManager
1✔
43
from localstack.services.lambda_.invocation.lambda_models import (
1✔
44
    ArchiveCode,
45
    Function,
46
    FunctionVersion,
47
    HotReloadingCode,
48
    ImageCode,
49
    Invocation,
50
    InvocationResult,
51
    S3Code,
52
    UpdateStatus,
53
    VersionAlias,
54
    VersionState,
55
)
56
from localstack.services.lambda_.invocation.models import lambda_stores
1✔
57
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
1✔
58
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
59
from localstack.utils.archives import get_unzipped_size, is_zip_file
1✔
60
from localstack.utils.container_utils.container_client import ContainerException
1✔
61
from localstack.utils.docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
1✔
62
from localstack.utils.strings import short_uid, to_str
1✔
63

64
if TYPE_CHECKING:
1✔
UNCOV
65
    from mypy_boto3_s3 import S3Client
×
66

67
LOG = logging.getLogger(__name__)
1✔
68

69
LAMBDA_DEFAULT_TIMEOUT_SECONDS = 3
1✔
70
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
71

72

73
class LambdaService:
    """
    Service object keeping track of lambda version managers and event managers, keyed by qualified ARN.
    """

    # mapping from qualified ARN to version manager
    lambda_running_versions: dict[str, LambdaVersionManager]
    lambda_starting_versions: dict[str, LambdaVersionManager]
    # mapping from qualified ARN to event manager
    # NOTE: this must be an annotation (":"), not an assignment ("="); an assignment would bind the
    # generic alias `dict[str, LambdaEventManager]` itself as a class attribute.
    event_managers: dict[str, LambdaEventManager]
    # lock held while the starting/running version registries are modified
    lambda_version_manager_lock: RLock
    # executor used to run start/stop tasks in the background
    task_executor: Executor

    assignment_service: AssignmentService
    counting_service: CountingService
1✔
84

85
    def __init__(self) -> None:
        """
        Create the service with empty registries, a fresh lock, a task executor and the helper services.
        """
        # registries keyed by qualified ARN, all empty on startup
        self.lambda_running_versions = {}
        self.lambda_starting_versions = {}
        self.event_managers = {}
        # re-entrant lock used when the version registries are modified
        self.lambda_version_manager_lock = RLock()
        # thread pool running start/stop tasks in the background
        self.task_executor = ThreadPoolExecutor(thread_name_prefix="lambda-service-task")
        self.assignment_service = AssignmentService()
        self.counting_service = CountingService()
1✔
93

94
    def stop(self) -> None:
        """
        Stop the whole lambda service

        Submits the stop of all event managers and version managers to the task executor, waits up to
        5 seconds for these tasks, then shuts down the executor and the assignment service.
        """
        submit = self.task_executor.submit
        pending = [submit(event_manager.stop) for event_manager in self.event_managers.values()]
        # TODO: switch shutdown order? yes, shutdown starting versions before the running versions would make more sense
        pending.extend(
            submit(running_manager.stop)
            for running_manager in self.lambda_running_versions.values()
        )
        for starting_manager in self.lambda_starting_versions.values():
            pending.append(submit(starting_manager.stop))
            # versions which are still starting additionally get their cached code destroyed
            pending.append(submit(starting_manager.function_version.config.code.destroy_cached))
        _, unfinished = concurrent.futures.wait(pending, timeout=5)
        if unfinished:
            LOG.debug("Shutdown not complete, missing threads: %s", unfinished)
        self.task_executor.shutdown(cancel_futures=True)
        self.assignment_service.stop()
1✔
116

117
    def stop_version(self, qualified_arn: str) -> None:
        """
        Stops a specific lambda service version

        The event manager (if any) and every version manager registered for the ARN - running and/or
        still starting - are removed from the registries and stopped via the task executor.

        :param qualified_arn: Qualified arn for the version to stop
        :raises ValueError: if neither a running nor a starting version manager is registered for the arn
        """
        LOG.debug("Stopping version %s", qualified_arn)
        event_manager = self.event_managers.pop(qualified_arn, None)
        if not event_manager:
            LOG.debug("Could not find event manager to stop for function %s...", qualified_arn)
        else:
            self.task_executor.submit(event_manager.stop)
        # Pop both registries explicitly. Passing the second pop as the default of the first one would
        # evaluate it eagerly: with a running version present, a starting version would be removed from
        # the registry but never stopped.
        running_version = self.lambda_running_versions.pop(qualified_arn, None)
        starting_version = self.lambda_starting_versions.pop(qualified_arn, None)
        if not running_version and not starting_version:
            raise ValueError(f"Unable to find version manager for {qualified_arn}")
        for version_manager in (running_version, starting_version):
            if version_manager:
                self.task_executor.submit(version_manager.stop)
        lambda_hooks.delete_function_version.run(qualified_arn)
1✔
135

136
    def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:
        """
        Look up the running version manager registered for an arn

        :param function_arn: qualified arn for the lambda version
        :return: LambdaVersionManager for the arn
        :raises ValueError: if no running version manager is registered for the arn
        """
        if manager := self.lambda_running_versions.get(function_arn):
            return manager
        raise ValueError(f"Could not find version '{function_arn}'. Is it created?")
1✔
147

148
    def get_lambda_event_manager(self, function_arn: str) -> LambdaEventManager:
        """
        Look up the event manager registered for an arn

        :param function_arn: qualified arn for the lambda version
        :return: LambdaEventManager for the arn
        :raises ValueError: if no event manager is registered for the arn
        """
        if manager := self.event_managers.get(function_arn):
            return manager
        raise ValueError(f"Could not find event manager '{function_arn}'. Is it created?")
1✔
159

160
    def _start_lambda_version(self, version_manager: LambdaVersionManager) -> None:
        """
        Start the given version manager and record the state it reports.

        :param version_manager: version manager to start
        """
        reported_state = version_manager.start()
        started_version = version_manager.function_version
        self.update_version_state(function_version=started_version, new_state=reported_state)
165

166
    def create_function_version(self, function_version: FunctionVersion) -> Future[None]:
        """
        Register a new version manager in the startup dict and start it in the background

        :param function_version: Function Version to create
        :return: Future of the submitted startup task
        :raises ResourceConflictException: if a version with the same qualified arn is already starting
        """
        with self.lambda_version_manager_lock:
            version_id = function_version.id
            qualified_arn = version_id.qualified_arn()
            # only one startup per qualified arn may be in flight
            if self.lambda_starting_versions.get(qualified_arn):
                raise ResourceConflictException(
                    f"The operation cannot be performed at this time. An update is in progress for resource: {function_version.id.unqualified_arn()}",
                    Type="User",
                )
            store = lambda_stores[function_version.id.account][function_version.id.region]
            owning_function = store.functions.get(function_version.id.function_name)
            new_manager = LambdaVersionManager(
                function_arn=qualified_arn,
                function_version=function_version,
                function=owning_function,
                counting_service=self.counting_service,
                assignment_service=self.assignment_service,
            )
            self.lambda_starting_versions[qualified_arn] = new_manager
            lambda_hooks.create_function_version.run(function_version.qualified_arn)
        # the actual startup happens outside of the lock, on the task executor
        return self.task_executor.submit(self._start_lambda_version, new_manager)
1✔
192

193
    def publish_version(self, function_version: FunctionVersion):
        """
        Synchronously create a function version (manager)
        Should only be called on publishing new versions, which basically clone an existing one.
        The new version needs to be added to the lambda store before invoking this.
        After successful completion of this method, the lambda version stored will be modified to be active, with a new revision id.
        It will then be active for execution, and should be retrieved again from the store before returning the data over the API.

        :param function_version: Function Version to create
        :raises Exception: if a version with the same qualified arn is already starting up
        """
        with self.lambda_version_manager_lock:
            qualified_arn = function_version.id.qualified_arn()
            version_manager = self.lambda_starting_versions.get(qualified_arn)
            if version_manager:
                # Exception does not interpolate logging-style "%s" arguments, so the message is
                # formatted explicitly.
                raise Exception(
                    f"Version '{qualified_arn}' already starting up and in state {version_manager.state}"
                )
            state = lambda_stores[function_version.id.account][function_version.id.region]
            fn = state.functions.get(function_version.id.function_name)
            version_manager = LambdaVersionManager(
                function_arn=qualified_arn,
                function_version=function_version,
                function=fn,
                counting_service=self.counting_service,
                assignment_service=self.assignment_service,
            )
            self.lambda_starting_versions[qualified_arn] = version_manager
        # started in the calling thread (outside of the lock), which makes this method synchronous
        self._start_lambda_version(version_manager)
1✔
223

224
    # Commands
225
    def invoke(
        self,
        function_name: str,
        qualifier: str,
        region: str,
        account_id: str,
        invocation_type: InvocationType | None,
        client_context: str | None,
        request_id: str,
        payload: bytes | None,
        trace_context: dict | None = None,
        user_agent: Optional[str] = None,
    ) -> InvocationResult | None:
        """
        Invokes a specific version of a lambda

        :param request_id: context request ID
        :param function_name: Function name
        :param qualifier: Function version qualifier
        :param region: Region of the function
        :param account_id: Account id of the function
        :param invocation_type: Invocation Type
        :param client_context: Client Context, if applicable
        :param trace_context: tracing information such as X-Ray header
        :param payload: Invocation payload
        :param user_agent: user agent, passed through to the invocation
        :return: The invocation result
        :raises ResourceNotFoundException: if the function, alias or version does not exist
        :raises ResourceConflictException: if no version/event manager is registered for the version
        :raises InvalidRequestContentException: if the payload cannot be converted to a string
        """
        # NOTE: consider making the trace_context mandatory once we update all usages (should be easier after v4.0)
        trace_context = trace_context or {}
        # Invoked arn (for lambda context) does not have qualifier if not supplied
        invoked_arn = lambda_arn(
            function_name=function_name,
            qualifier=qualifier,
            account=account_id,
            region=region,
        )
        qualifier = qualifier or "$LATEST"
        state = lambda_stores[account_id][region]
        function = state.functions.get(function_name)

        if function is None:
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")

        if qualifier_is_alias(qualifier):
            alias = function.aliases.get(qualifier)
            if not alias:
                raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
            version_qualifier = alias.function_version
            if alias.routing_configuration:
                # only the first entry of the version weights is considered
                routed_version, probability = next(
                    iter(alias.routing_configuration.version_weights.items())
                )
                if random.random() < probability:
                    version_qualifier = routed_version
        else:
            version_qualifier = qualifier

        # Need the qualified arn to exactly get the target lambda
        qualified_arn = qualified_lambda_arn(function_name, version_qualifier, account_id, region)
        version = function.versions.get(version_qualifier)
        if version is None:
            # an unknown version would otherwise fail with an AttributeError below
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
        runtime = version.config.runtime or "n/a"
        package_type = version.config.package_type
        try:
            version_manager = self.get_lambda_version_manager(qualified_arn)
            event_manager = self.get_lambda_event_manager(qualified_arn)
        except ValueError as e:
            version_state = version.config.state.state
            if version_state == State.Failed:
                status = FunctionStatus.failed_state_error
                HINT_LOG.error(
                    f"Failed to create the runtime executor for the function {function_name}. "
                    "Please ensure that Docker is available in the LocalStack container by adding the volume mount "
                    '"/var/run/docker.sock:/var/run/docker.sock" to your LocalStack startup. '
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#docker-not-available"
                )
            elif version_state == State.Pending:
                status = FunctionStatus.pending_state_error
                HINT_LOG.warning(
                    "Lambda functions are created and updated asynchronously in the new lambda provider like in AWS. "
                    f"Before invoking {function_name}, please wait until the function transitioned from the state "
                    "Pending to Active using: "
                    f'"awslocal lambda wait function-active-v2 --function-name {function_name}" '
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#function-in-pending-state"
                )
            else:
                status = FunctionStatus.unhandled_state_error
                LOG.error(
                    "Unexpected state %s for Lambda function %s", version_state, function_name
                )
            function_counter.labels(
                operation=FunctionOperation.invoke,
                runtime=runtime,
                status=status,
                invocation_type=invocation_type,
                package_type=package_type,
            ).increment()
            raise ResourceConflictException(
                f"The operation cannot be performed at this time. The function is currently in the following state: {version_state}"
            ) from e
        # empty payloads have to work as well
        if payload is None:
            payload = b"{}"
        else:
            # detect invalid payloads early before creating an execution environment
            try:
                to_str(payload)
            except Exception as e:
                function_counter.labels(
                    operation=FunctionOperation.invoke,
                    runtime=runtime,
                    status=FunctionStatus.invalid_payload_error,
                    invocation_type=invocation_type,
                    package_type=package_type,
                ).increment()
                # MAYBE: improve parity of detailed exception message (quite cumbersome)
                raise InvalidRequestContentException(
                    f"Could not parse request body into json: Could not parse payload into json: {e}",
                    Type="User",
                ) from e
        if invocation_type is None:
            invocation_type = InvocationType.RequestResponse
        if invocation_type == InvocationType.DryRun:
            return None
        # TODO payload verification  An error occurred (InvalidRequestContentException) when calling the Invoke operation: Could not parse request body into json: Could not parse payload into json: Unexpected character (''' (code 39)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
        #  at [Source: (byte[])"'test'"; line: 1, column: 2]
        #
        invocation = Invocation(
            payload=payload,
            invoked_arn=invoked_arn,
            client_context=client_context,
            invocation_type=invocation_type,
            invoke_time=datetime.now(),
            request_id=request_id,
            trace_context=trace_context,
            user_agent=user_agent,
        )
        if invocation_type == InvocationType.Event:
            # event invocations are handed to the event manager; no metric is counted here
            return event_manager.enqueue_event(invocation=invocation)

        invocation_result = version_manager.invoke(invocation=invocation)
        status = (
            FunctionStatus.invocation_error
            if invocation_result.is_error
            else FunctionStatus.success
        )
        function_counter.labels(
            operation=FunctionOperation.invoke,
            runtime=runtime,
            status=status,
            invocation_type=invocation_type,
            package_type=package_type,
        ).increment()
        return invocation_result
1✔
388

389
    def update_version(self, new_version: FunctionVersion) -> Future[None]:
        """
        Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready
        to be invoked

        :param new_version: New version (with the same qualifier as an older one)
        :return: Future of the startup task of the new version
        :raises ValueError: if no version with the same qualified arn is currently running
        """
        if new_version.qualified_arn in self.lambda_running_versions:
            return self.create_function_version(function_version=new_version)
        raise ValueError(
            f"Version {new_version.qualified_arn} cannot be updated if an old one is not running"
        )
1✔
402

403
    def update_version_state(
        self, function_version: FunctionVersion, new_state: VersionState
    ) -> None:
        """
        Update the version state for the given function version.

        This will perform a rollover to the given function if the new state is active and there is a previously
        running version registered. The old version will be shutdown and its code deleted.

        If the new state is failed, it will abort the update and mark it as failed.
        If an older version is still running, it will keep running.

        Any exception raised while updating is logged and not propagated to the caller.

        :param function_version: Version reporting the state
        :param new_state: New state
        """
        function_arn = function_version.qualified_arn
        try:
            old_version = None
            old_event_manager = None
            with self.lambda_version_manager_lock:
                # pop with a default: a plain pop() raises KeyError for unknown arns, which would make
                # the check below unreachable
                new_version_manager = self.lambda_starting_versions.pop(function_arn, None)
                if not new_version_manager:
                    raise ValueError(
                        f"Version {function_arn} reporting state {new_state.state} does not exist in the starting versions."
                    )
                if new_state.state == State.Active:
                    old_version = self.lambda_running_versions.get(function_arn, None)
                    old_event_manager = self.event_managers.get(function_arn, None)
                    self.lambda_running_versions[function_arn] = new_version_manager
                    self.event_managers[function_arn] = LambdaEventManager(
                        version_manager=new_version_manager
                    )
                    self.event_managers[function_arn].start()
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
                elif new_state.state == State.Failed:
                    update_status = UpdateStatus(status=LastUpdateStatus.Failed)
                    self.task_executor.submit(new_version_manager.stop)
                else:
                    # TODO what to do if state pending or inactive is supported?
                    self.task_executor.submit(new_version_manager.stop)
                    LOG.error(
                        "State %s for version %s should not have been reported. New version will be stopped.",
                        new_state,
                        function_arn,
                    )
                    return

            # TODO is it necessary to get the version again? Should be locked for modification anyway
            # Without updating the new state, the function would not change to active, last_update would be missing, and
            # the revision id would not be updated.
            state = lambda_stores[function_version.id.account][function_version.id.region]
            # FIXME this will fail if the function is deleted during this code lines here
            function = state.functions.get(function_version.id.function_name)
            if old_event_manager:
                self.task_executor.submit(old_event_manager.stop_for_update)
            if old_version:
                # if there is an old version, we assume it is an update, and stop the old one
                self.task_executor.submit(old_version.stop)
                if function:
                    self.task_executor.submit(
                        destroy_code_if_not_used, old_version.function_version.config.code, function
                    )
            if not function:
                LOG.debug("Function %s was deleted during status update", function_arn)
                return
            current_version = function.versions[function_version.id.qualifier]
            new_version_manager.state = new_state
            # store a copy of the current version carrying the new state and update status
            new_version_state = dataclasses.replace(
                current_version,
                config=dataclasses.replace(
                    current_version.config, state=new_state, last_update=update_status
                ),
            )
            state.functions[function_version.id.function_name].versions[
                function_version.id.qualifier
            ] = new_version_state

        except Exception:
            LOG.exception("Failed to update function version for arn %s", function_arn)
×
482

483
    def update_alias(self, old_alias: VersionAlias, new_alias: VersionAlias, function: Function):
        """
        Move the provisioned concurrency of an alias over to the version the alias now points to.

        Nothing happens if the alias still points to the same version, or if no provisioned concurrency
        config is stored under the old alias name.

        :param old_alias: alias before the update
        :param new_alias: alias after the update
        :param function: function the alias belongs to
        """
        # if pointer changed, need to restart provisioned
        concurrency_config = function.provisioned_concurrency_configs.get(old_alias.name)
        pointer_unchanged = old_alias.function_version == new_alias.function_version
        if pointer_unchanged or concurrency_config is None:
            return

        LOG.warning("Deprovisioning")
        previous_version = function.versions.get(old_alias.function_version)
        previous_manager = self.get_lambda_version_manager(
            function_arn=previous_version.qualified_arn
        )
        target_version = function.versions.get(new_alias.function_version)
        target_manager = self.get_lambda_version_manager(function_arn=target_version.qualified_arn)

        # TODO: we might need to pull provisioned concurrency state a bit more out of the version manager for get_provisioned_concurrency_config
        # TODO: make this fully async
        previous_manager.update_provisioned_concurrency_config(0).result(timeout=4)  # sync
        target_manager.update_provisioned_concurrency_config(
            concurrency_config.provisioned_concurrent_executions
        )  # async again
504

505
    def can_assume_role(self, role_arn: str, region: str) -> bool:
        """
        Checks whether lambda can assume the given role.
        This _should_ only fail if IAM enforcement is enabled.

        :param role_arn: Role to assume
        :param region: Region used to create the STS client
        :return: True if the role can be assumed by lambda, false otherwise
        """
        lambda_sts = connect_to(region_name=region).sts.request_metadata(
            service_principal="lambda"
        )
        try:
            lambda_sts.assume_role(
                RoleArn=role_arn,
                RoleSessionName=f"test-assume-{short_uid()}",
                DurationSeconds=900,
            )
        except Exception as e:
            # any failure of the call is treated as "cannot assume"
            LOG.debug("Cannot assume role %s: %s", role_arn, e)
            return False
        else:
            return True
×
524

525

526
# TODO: Move helper functions out of lambda_service into a separate module
527

528

529
def is_code_used(code: S3Code, function: Function) -> bool:
    """
    Tell whether any version of the function references the given code

    :param code: Code object
    :param function: function to check
    :return: bool whether code is used in some version of the function
    """
    with function.lock:
        for candidate in function.versions.values():
            if code == candidate.config.code:
                return True
        return False
1✔
539

540

541
def destroy_code_if_not_used(code: S3Code, function: Function) -> None:
    """
    Destroy the given code unless some version of the function still uses it

    :param code: Code object
    :param function: Function the code belongs to
    """
    with function.lock:
        still_referenced = is_code_used(code, function)
        if still_referenced:
            return
        code.destroy()
1✔
552

553

554
def store_lambda_archive(
    archive_file: bytes, function_name: str, region_name: str, account_id: str
) -> S3Code:
    """
    Validates the given lambda archive and uploads it to an internal s3 bucket.

    :param archive_file: Archive file to store
    :param function_name: function name the archive should be stored for
    :param region_name: region name the archive should be stored for
    :param account_id: account id the archive should be stored for
    :return: S3 Code object representing the archive stored in S3
    :raises InvalidParameterValueException: if the archive is not a zip file or its unzipped size
        reaches the configured limit
    """
    # reject anything that is not a zip file
    if not is_zip_file(archive_file):
        raise InvalidParameterValueException(
            "Could not unzip uploaded file. Please check your file, then try to upload again.",
            Type="User",
        )
    # reject archives whose unzipped size reaches the configured limit
    size_when_unzipped = get_unzipped_size(zip_file=io.BytesIO(archive_file))
    if size_when_unzipped >= config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED:
        raise InvalidParameterValueException(
            f"Unzipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED} bytes",
            Type="User",
        )
    # store all buckets in us-east-1 for now
    internal_s3 = connect_to(
        region_name=AWS_REGION_US_EAST_1, aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT
    ).s3
    target_bucket = f"awslambda-{region_name}-tasks"
    # s3 create bucket is idempotent in us-east-1
    internal_s3.create_bucket(Bucket=target_bucket)
    archive_id = f"{function_name}-{uuid.uuid4()}"
    object_key = f"snapshots/{account_id}/{archive_id}"
    internal_s3.upload_fileobj(
        Fileobj=io.BytesIO(archive_file), Bucket=target_bucket, Key=object_key
    )
    # the code hash is the base64 encoded sha256 digest of the archive
    archive_digest = sha256(archive_file).digest()
    return S3Code(
        id=archive_id,
        account_id=account_id,
        s3_bucket=target_bucket,
        s3_key=object_key,
        s3_object_version=None,
        code_sha256=to_str(base64.b64encode(archive_digest)),
        code_size=len(archive_file),
    )
600

601

602
def assert_hot_reloading_path_absolute(path: str) -> None:
    """
    Ensure that a path is absolute once its environment variable placeholders are expanded.
    Both posix and windows paths are accepted.
    Example placeholders: $ENV_VAR, ${ENV_VAR}

    :param path: Posix or windows path, potentially containing environment variable placeholders.
        Example: `$ENV_VAR/lambda/src` with `ENV_VAR=/home/user/test-repo` set.
    :raises InvalidParameterValueException: if the expanded path is neither an absolute posix nor an
        absolute windows path
    """
    # expand variables in path before checking for an absolute path
    resolved = os.path.expandvars(path)
    is_absolute = (
        PurePosixPath(resolved).is_absolute() or PureWindowsPath(resolved).is_absolute()
    )
    if is_absolute:
        return
    raise InvalidParameterValueException(
        f"When using hot reloading, the archive key has to be an absolute path! Your archive key: {path}",
    )
620

621

622
def create_hot_reloading_code(path: str) -> HotReloadingCode:
    """
    Create a hot reloading code object for the given host path

    :param path: host path of the code, has to be absolute after environment variable expansion
    :return: HotReloadingCode pointing to the (unexpanded) path
    """
    assert_hot_reloading_path_absolute(path)
    hot_reloading_code = HotReloadingCode(host_path=path)
    return hot_reloading_code
1✔
625

626

627
def store_s3_bucket_archive(
    archive_bucket: str,
    archive_key: str,
    archive_version: Optional[str],
    function_name: str,
    region_name: str,
    account_id: str,
) -> ArchiveCode:
    """
    Takes the lambda archive stored in the given bucket and stores it in an internal s3 bucket

    If the bucket is the configured local bucket marker, no archive is downloaded; a hot reloading
    code object using the archive key as path is returned instead.

    :param archive_bucket: Bucket the archive is stored in
    :param archive_key: Key the archive is stored under
    :param archive_version: Version of the archive object in the bucket
    :param function_name: function name the archive should be stored for
    :param region_name: region name the archive should be stored for
    :param account_id: account id the archive should be stored for
    :return: S3 Code object representing the archive stored in S3
    :raises InvalidParameterValueException: if the archive cannot be retrieved from the bucket
    """
    if archive_bucket == config.BUCKET_MARKER_LOCAL:
        hotreload_counter.labels(operation="create").increment()
        return create_hot_reloading_code(path=archive_key)
    s3_client: "S3Client" = connect_to().s3
    # only pass the version id if a specific object version was requested
    kwargs = {"VersionId": archive_version} if archive_version else {}
    try:
        archive_file = s3_client.get_object(Bucket=archive_bucket, Key=archive_key, **kwargs)[
            "Body"
        ].read()
    except s3_client.exceptions.ClientError as e:
        # chain the original S3 error so it is not lost when debugging
        raise InvalidParameterValueException(
            f"Error occurred while GetObject. S3 Error Code: {e.response['Error']['Code']}. S3 Error Message: {e.response['Error']['Message']}",
            Type="User",
        ) from e
    return store_lambda_archive(
        archive_file, function_name=function_name, region_name=region_name, account_id=account_id
    )
663

664

665
def create_image_code(image_uri: str) -> ImageCode:
    """
    Creates an image code by inspecting the provided image

    If docker is not available, or the image cannot be inspected, a placeholder is used as code hash.

    :param image_uri: Image URI of the image to inspect
    :return: Image code object
    """
    code_sha256 = "<cannot-get-image-hash>"
    if CONTAINER_CLIENT.has_docker():
        try:
            CONTAINER_CLIENT.pull_image(docker_image=image_uri)
        except ContainerException:
            # a failed pull is not fatal, the inspect below is attempted regardless
            LOG.debug("Cannot pull image %s. Maybe only available locally?", image_uri)
        try:
            # the hash is the part after the last ":" of the first repo digest
            code_sha256 = CONTAINER_CLIENT.inspect_image(image_name=image_uri)["RepoDigests"][
                0
            ].rpartition(":")[2]
        except Exception as e:
            LOG.debug(
                "Cannot inspect image %s. Is this image and/or docker available: %s", image_uri, e
            )
    else:
        # the two literals are concatenated implicitly, hence the explicit space between the sentences
        LOG.warning(
            "Unable to get image hash for image %s - no docker socket available. "
            "Image hash returned by Lambda will not be correct.",
            image_uri,
        )
    return ImageCode(image_uri=image_uri, code_sha256=code_sha256, repository_type="ECR")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc