• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 858585c6-6e27-4bca-a3b8-78d6d978f737

25 Mar 2025 06:43PM UTC coverage: 86.88% (+0.02%) from 86.865%
858585c6-6e27-4bca-a3b8-78d6d978f737

push

circleci

web-flow
[Utils] Add a batch policy utility (#12430)

53 of 56 new or added lines in 1 file covered. (94.64%)

227 existing lines in 12 files now uncovered.

63251 of 72803 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.27
/localstack-core/localstack/services/lambda_/invocation/lambda_service.py
1
import base64
1✔
2
import concurrent.futures
1✔
3
import dataclasses
1✔
4
import io
1✔
5
import logging
1✔
6
import os.path
1✔
7
import random
1✔
8
import uuid
1✔
9
from concurrent.futures import Executor, Future, ThreadPoolExecutor
1✔
10
from datetime import datetime
1✔
11
from hashlib import sha256
1✔
12
from pathlib import PurePosixPath, PureWindowsPath
1✔
13
from threading import RLock
1✔
14
from typing import TYPE_CHECKING, Optional
1✔
15

16
from localstack import config
1✔
17
from localstack.aws.api.lambda_ import (
1✔
18
    InvalidParameterValueException,
19
    InvalidRequestContentException,
20
    InvocationType,
21
    LastUpdateStatus,
22
    ResourceConflictException,
23
    ResourceNotFoundException,
24
    State,
25
)
26
from localstack.aws.connect import connect_to
1✔
27
from localstack.constants import AWS_REGION_US_EAST_1
1✔
28
from localstack.services.lambda_.analytics import (
1✔
29
    FunctionOperation,
30
    FunctionStatus,
31
    function_counter,
32
    hotreload_counter,
33
)
34
from localstack.services.lambda_.api_utils import (
1✔
35
    lambda_arn,
36
    qualified_lambda_arn,
37
    qualifier_is_alias,
38
)
39
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
40
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
41
from localstack.services.lambda_.invocation.event_manager import LambdaEventManager
1✔
42
from localstack.services.lambda_.invocation.lambda_models import (
1✔
43
    ArchiveCode,
44
    Function,
45
    FunctionVersion,
46
    HotReloadingCode,
47
    ImageCode,
48
    Invocation,
49
    InvocationResult,
50
    S3Code,
51
    UpdateStatus,
52
    VersionAlias,
53
    VersionState,
54
)
55
from localstack.services.lambda_.invocation.models import lambda_stores
1✔
56
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
1✔
57
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
58
from localstack.utils.archives import get_unzipped_size, is_zip_file
1✔
59
from localstack.utils.container_utils.container_client import ContainerException
1✔
60
from localstack.utils.docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
1✔
61
from localstack.utils.strings import short_uid, to_str
1✔
62

63
if TYPE_CHECKING:
1✔
UNCOV
64
    from mypy_boto3_s3 import S3Client
×
65

66
LOG = logging.getLogger(__name__)
1✔
67

68
LAMBDA_DEFAULT_TIMEOUT_SECONDS = 3
1✔
69
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
70

71

72
class LambdaService:
1✔
73
    # mapping from qualified ARN to version manager
74
    lambda_running_versions: dict[str, LambdaVersionManager]
1✔
75
    lambda_starting_versions: dict[str, LambdaVersionManager]
1✔
76
    # mapping from qualified ARN to event manager
77
    event_managers = dict[str, LambdaEventManager]
1✔
78
    lambda_version_manager_lock: RLock
1✔
79
    task_executor: Executor
1✔
80

81
    assignment_service: AssignmentService
1✔
82
    counting_service: CountingService
1✔
83

84
    def __init__(self) -> None:
1✔
85
        self.lambda_running_versions = {}
1✔
86
        self.lambda_starting_versions = {}
1✔
87
        self.event_managers = {}
1✔
88
        self.lambda_version_manager_lock = RLock()
1✔
89
        self.task_executor = ThreadPoolExecutor(thread_name_prefix="lambda-service-task")
1✔
90
        self.assignment_service = AssignmentService()
1✔
91
        self.counting_service = CountingService()
1✔
92

93
    def stop(self) -> None:
1✔
94
        """
95
        Stop the whole lambda service
96
        """
97
        shutdown_futures = []
1✔
98
        for event_manager in self.event_managers.values():
1✔
99
            shutdown_futures.append(self.task_executor.submit(event_manager.stop))
1✔
100
        # TODO: switch shutdown order? yes, shutdown starting versions before the running versions would make more sense
101
        for version_manager in self.lambda_running_versions.values():
1✔
102
            shutdown_futures.append(self.task_executor.submit(version_manager.stop))
1✔
103
        for version_manager in self.lambda_starting_versions.values():
1✔
UNCOV
104
            shutdown_futures.append(self.task_executor.submit(version_manager.stop))
×
UNCOV
105
            shutdown_futures.append(
×
106
                self.task_executor.submit(
107
                    version_manager.function_version.config.code.destroy_cached
108
                )
109
            )
110
        _, not_done = concurrent.futures.wait(shutdown_futures, timeout=5)
1✔
111
        if not_done:
1✔
UNCOV
112
            LOG.debug("Shutdown not complete, missing threads: %s", not_done)
×
113
        self.task_executor.shutdown(cancel_futures=True)
1✔
114
        self.assignment_service.stop()
1✔
115

116
    def stop_version(self, qualified_arn: str) -> None:
1✔
117
        """
118
        Stops a specific lambda service version
119
        :param qualified_arn: Qualified arn for the version to stop
120
        """
121
        LOG.debug("Stopping version %s", qualified_arn)
1✔
122
        event_manager = self.event_managers.pop(qualified_arn, None)
1✔
123
        if not event_manager:
1✔
UNCOV
124
            LOG.debug("Could not find event manager to stop for function %s...", qualified_arn)
×
125
        else:
126
            self.task_executor.submit(event_manager.stop)
1✔
127
        version_manager = self.lambda_running_versions.pop(
1✔
128
            qualified_arn, self.lambda_starting_versions.pop(qualified_arn, None)
129
        )
130
        if not version_manager:
1✔
UNCOV
131
            raise ValueError(f"Unable to find version manager for {qualified_arn}")
×
132
        self.task_executor.submit(version_manager.stop)
1✔
133

134
    def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:
1✔
135
        """
136
        Get the lambda version for the given arn
137
        :param function_arn: qualified arn for the lambda version
138
        :return: LambdaVersionManager for the arn
139
        """
140
        version_manager = self.lambda_running_versions.get(function_arn)
1✔
141
        if not version_manager:
1✔
UNCOV
142
            raise ValueError(f"Could not find version '{function_arn}'. Is it created?")
×
143

144
        return version_manager
1✔
145

146
    def get_lambda_event_manager(self, function_arn: str) -> LambdaEventManager:
1✔
147
        """
148
        Get the lambda event manager for the given arn
149
        :param function_arn: qualified arn for the lambda version
150
        :return: LambdaEventManager for the arn
151
        """
152
        event_manager = self.event_managers.get(function_arn)
1✔
153
        if not event_manager:
1✔
UNCOV
154
            raise ValueError(f"Could not find event manager '{function_arn}'. Is it created?")
×
155

156
        return event_manager
1✔
157

158
    def _start_lambda_version(self, version_manager: LambdaVersionManager) -> None:
1✔
159
        new_state = version_manager.start()
1✔
160
        self.update_version_state(
1✔
161
            function_version=version_manager.function_version, new_state=new_state
162
        )
163

164
    def create_function_version(self, function_version: FunctionVersion) -> Future[None]:
1✔
165
        """
166
        Creates a new function version (manager), and puts it in the startup dict
167

168
        :param function_version: Function Version to create
169
        """
170
        with self.lambda_version_manager_lock:
1✔
171
            qualified_arn = function_version.id.qualified_arn()
1✔
172
            version_manager = self.lambda_starting_versions.get(qualified_arn)
1✔
173
            if version_manager:
1✔
174
                raise ResourceConflictException(
1✔
175
                    f"The operation cannot be performed at this time. An update is in progress for resource: {function_version.id.unqualified_arn()}",
176
                    Type="User",
177
                )
178
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
179
            fn = state.functions.get(function_version.id.function_name)
1✔
180
            version_manager = LambdaVersionManager(
1✔
181
                function_arn=qualified_arn,
182
                function_version=function_version,
183
                function=fn,
184
                counting_service=self.counting_service,
185
                assignment_service=self.assignment_service,
186
            )
187
            self.lambda_starting_versions[qualified_arn] = version_manager
1✔
188
        return self.task_executor.submit(self._start_lambda_version, version_manager)
1✔
189

190
    def publish_version(self, function_version: FunctionVersion):
1✔
191
        """
192
        Synchronously create a function version (manager)
193
        Should only be called on publishing new versions, which basically clone an existing one.
194
        The new version needs to be added to the lambda store before invoking this.
195
        After successful completion of this method, the lambda version stored will be modified to be active, with a new revision id.
196
        It will then be active for execution, and should be retrieved again from the store before returning the data over the API.
197

198
        :param function_version: Function Version to create
199
        """
200
        with self.lambda_version_manager_lock:
1✔
201
            qualified_arn = function_version.id.qualified_arn()
1✔
202
            version_manager = self.lambda_starting_versions.get(qualified_arn)
1✔
203
            if version_manager:
1✔
UNCOV
204
                raise Exception(
×
205
                    "Version '%s' already starting up and in state %s",
206
                    qualified_arn,
207
                    version_manager.state,
208
                )
209
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
210
            fn = state.functions.get(function_version.id.function_name)
1✔
211
            version_manager = LambdaVersionManager(
1✔
212
                function_arn=qualified_arn,
213
                function_version=function_version,
214
                function=fn,
215
                counting_service=self.counting_service,
216
                assignment_service=self.assignment_service,
217
            )
218
            self.lambda_starting_versions[qualified_arn] = version_manager
1✔
219
        self._start_lambda_version(version_manager)
1✔
220

221
    # Commands
222
    def invoke(
1✔
223
        self,
224
        function_name: str,
225
        qualifier: str,
226
        region: str,
227
        account_id: str,
228
        invocation_type: InvocationType | None,
229
        client_context: str | None,
230
        request_id: str,
231
        payload: bytes | None,
232
        trace_context: dict | None = None,
233
    ) -> InvocationResult | None:
234
        """
235
        Invokes a specific version of a lambda
236

237
        :param request_id: context request ID
238
        :param function_name: Function name
239
        :param qualifier: Function version qualifier
240
        :param region: Region of the function
241
        :param account_id: Account id of the function
242
        :param invocation_type: Invocation Type
243
        :param client_context: Client Context, if applicable
244
        :param trace_context: tracing information such as X-Ray header
245
        :param payload: Invocation payload
246
        :return: The invocation result
247
        """
248
        # NOTE: consider making the trace_context mandatory once we update all usages (should be easier after v4.0)
249
        trace_context = trace_context or {}
1✔
250
        # Invoked arn (for lambda context) does not have qualifier if not supplied
251
        invoked_arn = lambda_arn(
1✔
252
            function_name=function_name,
253
            qualifier=qualifier,
254
            account=account_id,
255
            region=region,
256
        )
257
        qualifier = qualifier or "$LATEST"
1✔
258
        state = lambda_stores[account_id][region]
1✔
259
        function = state.functions.get(function_name)
1✔
260

261
        if function is None:
1✔
262
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
1✔
263

264
        if qualifier_is_alias(qualifier):
1✔
265
            alias = function.aliases.get(qualifier)
1✔
266
            if not alias:
1✔
267
                raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
1✔
268
            version_qualifier = alias.function_version
1✔
269
            if alias.routing_configuration:
1✔
270
                version, probability = next(
1✔
271
                    iter(alias.routing_configuration.version_weights.items())
272
                )
273
                if random.random() < probability:
1✔
274
                    version_qualifier = version
1✔
275
        else:
276
            version_qualifier = qualifier
1✔
277

278
        # Need the qualified arn to exactly get the target lambda
279
        qualified_arn = qualified_lambda_arn(function_name, version_qualifier, account_id, region)
1✔
280
        version = function.versions.get(version_qualifier)
1✔
281
        runtime = version.config.runtime or "n/a"
1✔
282
        package_type = version.config.package_type
1✔
283
        try:
1✔
284
            version_manager = self.get_lambda_version_manager(qualified_arn)
1✔
285
            event_manager = self.get_lambda_event_manager(qualified_arn)
1✔
UNCOV
286
        except ValueError as e:
×
UNCOV
287
            state = version and version.config.state.state
×
UNCOV
288
            if state == State.Failed:
×
UNCOV
289
                status = FunctionStatus.failed_state_error
×
290
                HINT_LOG.error(
×
291
                    f"Failed to create the runtime executor for the function {function_name}. "
292
                    "Please ensure that Docker is available in the LocalStack container by adding the volume mount "
293
                    '"/var/run/docker.sock:/var/run/docker.sock" to your LocalStack startup. '
294
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#docker-not-available"
295
                )
UNCOV
296
            elif state == State.Pending:
×
UNCOV
297
                status = FunctionStatus.pending_state_error
×
298
                HINT_LOG.warning(
×
299
                    "Lambda functions are created and updated asynchronously in the new lambda provider like in AWS. "
300
                    f"Before invoking {function_name}, please wait until the function transitioned from the state "
301
                    "Pending to Active using: "
302
                    f'"awslocal lambda wait function-active-v2 --function-name {function_name}" '
303
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#function-in-pending-state"
304
                )
305
            else:
UNCOV
306
                status = FunctionStatus.unhandled_state_error
×
UNCOV
307
                LOG.error("Unexpected state %s for Lambda function %s", state, function_name)
×
UNCOV
308
            function_counter.labels(
×
309
                operation=FunctionOperation.invoke,
310
                runtime=runtime,
311
                status=status,
312
                invocation_type=invocation_type,
313
                package_type=package_type,
314
            ).increment()
UNCOV
315
            raise ResourceConflictException(
×
316
                f"The operation cannot be performed at this time. The function is currently in the following state: {state}"
317
            ) from e
318
        # empty payloads have to work as well
319
        if payload is None:
1✔
320
            payload = b"{}"
1✔
321
        else:
322
            # detect invalid payloads early before creating an execution environment
323
            try:
1✔
324
                to_str(payload)
1✔
325
            except Exception as e:
1✔
326
                function_counter.labels(
1✔
327
                    operation=FunctionOperation.invoke,
328
                    runtime=runtime,
329
                    status=FunctionStatus.invalid_payload_error,
330
                    invocation_type=invocation_type,
331
                    package_type=package_type,
332
                ).increment()
333
                # MAYBE: improve parity of detailed exception message (quite cumbersome)
334
                raise InvalidRequestContentException(
1✔
335
                    f"Could not parse request body into json: Could not parse payload into json: {e}",
336
                    Type="User",
337
                )
338
        if invocation_type is None:
1✔
339
            invocation_type = InvocationType.RequestResponse
1✔
340
        if invocation_type == InvocationType.DryRun:
1✔
341
            return None
1✔
342
        # TODO payload verification  An error occurred (InvalidRequestContentException) when calling the Invoke operation: Could not parse request body into json: Could not parse payload into json: Unexpected character (''' (code 39)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
343
        #  at [Source: (byte[])"'test'"; line: 1, column: 2]
344
        #
345
        if invocation_type == InvocationType.Event:
1✔
346
            return event_manager.enqueue_event(
1✔
347
                invocation=Invocation(
348
                    payload=payload,
349
                    invoked_arn=invoked_arn,
350
                    client_context=client_context,
351
                    invocation_type=invocation_type,
352
                    invoke_time=datetime.now(),
353
                    request_id=request_id,
354
                    trace_context=trace_context,
355
                )
356
            )
357

358
        invocation_result = version_manager.invoke(
1✔
359
            invocation=Invocation(
360
                payload=payload,
361
                invoked_arn=invoked_arn,
362
                client_context=client_context,
363
                invocation_type=invocation_type,
364
                invoke_time=datetime.now(),
365
                request_id=request_id,
366
                trace_context=trace_context,
367
            )
368
        )
369
        status = (
1✔
370
            FunctionStatus.invocation_error
371
            if invocation_result.is_error
372
            else FunctionStatus.success
373
        )
374
        function_counter.labels(
1✔
375
            operation=FunctionOperation.invoke,
376
            runtime=runtime,
377
            status=status,
378
            invocation_type=invocation_type,
379
            package_type=package_type,
380
        ).increment()
381
        return invocation_result
1✔
382

383
    def update_version(self, new_version: FunctionVersion) -> Future[None]:
1✔
384
        """
385
        Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready
386
        to be invoked
387

388
        :param new_version: New version (with the same qualifier as an older one)
389
        """
390
        if new_version.qualified_arn not in self.lambda_running_versions:
1✔
UNCOV
391
            raise ValueError(
×
392
                f"Version {new_version.qualified_arn} cannot be updated if an old one is not running"
393
            )
394

395
        return self.create_function_version(function_version=new_version)
1✔
396

397
    def update_version_state(
1✔
398
        self, function_version: FunctionVersion, new_state: VersionState
399
    ) -> None:
400
        """
401
        Update the version state for the given function version.
402

403
        This will perform a rollover to the given function if the new state is active and there is a previously
404
        running version registered. The old version will be shutdown and its code deleted.
405

406
        If the new state is failed, it will abort the update and mark it as failed.
407
        If an older version is still running, it will keep running.
408

409
        :param function_version: Version reporting the state
410
        :param new_state: New state
411
        """
412
        function_arn = function_version.qualified_arn
1✔
413
        try:
1✔
414
            old_version = None
1✔
415
            old_event_manager = None
1✔
416
            with self.lambda_version_manager_lock:
1✔
417
                new_version_manager = self.lambda_starting_versions.pop(function_arn)
1✔
418
                if not new_version_manager:
1✔
UNCOV
419
                    raise ValueError(
×
420
                        f"Version {function_arn} reporting state {new_state.state} does exist in the starting versions."
421
                    )
422
                if new_state.state == State.Active:
1✔
423
                    old_version = self.lambda_running_versions.get(function_arn, None)
1✔
424
                    old_event_manager = self.event_managers.get(function_arn, None)
1✔
425
                    self.lambda_running_versions[function_arn] = new_version_manager
1✔
426
                    self.event_managers[function_arn] = LambdaEventManager(
1✔
427
                        version_manager=new_version_manager
428
                    )
429
                    self.event_managers[function_arn].start()
1✔
430
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
1✔
UNCOV
431
                elif new_state.state == State.Failed:
×
UNCOV
432
                    update_status = UpdateStatus(status=LastUpdateStatus.Failed)
×
UNCOV
433
                    self.task_executor.submit(new_version_manager.stop)
×
434
                else:
435
                    # TODO what to do if state pending or inactive is supported?
UNCOV
436
                    self.task_executor.submit(new_version_manager.stop)
×
437
                    LOG.error(
×
438
                        "State %s for version %s should not have been reported. New version will be stopped.",
439
                        new_state,
440
                        function_arn,
441
                    )
UNCOV
442
                    return
×
443

444
            # TODO is it necessary to get the version again? Should be locked for modification anyway
445
            # Without updating the new state, the function would not change to active, last_update would be missing, and
446
            # the revision id would not be updated.
447
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
448
            # FIXME this will fail if the function is deleted during this code lines here
449
            function = state.functions.get(function_version.id.function_name)
1✔
450
            if old_event_manager:
1✔
451
                self.task_executor.submit(old_event_manager.stop_for_update)
1✔
452
            if old_version:
1✔
453
                # if there is an old version, we assume it is an update, and stop the old one
454
                self.task_executor.submit(old_version.stop)
1✔
455
                if function:
1✔
456
                    self.task_executor.submit(
1✔
457
                        destroy_code_if_not_used, old_version.function_version.config.code, function
458
                    )
459
            if not function:
1✔
UNCOV
460
                LOG.debug("Function %s was deleted during status update", function_arn)
×
UNCOV
461
                return
×
462
            current_version = function.versions[function_version.id.qualifier]
1✔
463
            new_version_manager.state = new_state
1✔
464
            new_version_state = dataclasses.replace(
1✔
465
                current_version,
466
                config=dataclasses.replace(
467
                    current_version.config, state=new_state, last_update=update_status
468
                ),
469
            )
470
            state.functions[function_version.id.function_name].versions[
1✔
471
                function_version.id.qualifier
472
            ] = new_version_state
473

UNCOV
474
        except Exception:
×
UNCOV
475
            LOG.exception("Failed to update function version for arn %s", function_arn)
×
476

477
    def update_alias(self, old_alias: VersionAlias, new_alias: VersionAlias, function: Function):
1✔
478
        # if pointer changed, need to restart provisioned
479
        provisioned_concurrency_config = function.provisioned_concurrency_configs.get(
1✔
480
            old_alias.name
481
        )
482
        if (
1✔
483
            old_alias.function_version != new_alias.function_version
484
            and provisioned_concurrency_config is not None
485
        ):
UNCOV
486
            LOG.warning("Deprovisioning")
×
UNCOV
487
            fn_version_old = function.versions.get(old_alias.function_version)
×
UNCOV
488
            vm_old = self.get_lambda_version_manager(function_arn=fn_version_old.qualified_arn)
×
UNCOV
489
            fn_version_new = function.versions.get(new_alias.function_version)
×
UNCOV
490
            vm_new = self.get_lambda_version_manager(function_arn=fn_version_new.qualified_arn)
×
491

492
            # TODO: we might need to pull provisioned concurrency state a bit more out of the version manager for get_provisioned_concurrency_config
493
            # TODO: make this fully async
UNCOV
494
            vm_old.update_provisioned_concurrency_config(0).result(timeout=4)  # sync
×
UNCOV
495
            vm_new.update_provisioned_concurrency_config(
×
496
                provisioned_concurrency_config.provisioned_concurrent_executions
497
            )  # async again
498

499
    def can_assume_role(self, role_arn: str, region: str) -> bool:
1✔
500
        """
501
        Checks whether lambda can assume the given role.
502
        This _should_ only fail if IAM enforcement is enabled.
503

504
        :param role_arn: Role to assume
505
        :return: True if the role can be assumed by lambda, false otherwise
506
        """
507
        sts_client = connect_to(region_name=region).sts.request_metadata(service_principal="lambda")
1✔
508
        try:
1✔
509
            sts_client.assume_role(
1✔
510
                RoleArn=role_arn,
511
                RoleSessionName=f"test-assume-{short_uid()}",
512
                DurationSeconds=900,
513
            )
514
            return True
1✔
UNCOV
515
        except Exception as e:
×
UNCOV
516
            LOG.debug("Cannot assume role %s: %s", role_arn, e)
×
UNCOV
517
            return False
×
518

519

520
# TODO: Move helper functions out of lambda_service into a separate module
521

522

523
def is_code_used(code: S3Code, function: Function) -> bool:
1✔
524
    """
525
    Check if given code is still used in some version of the function
526

527
    :param code: Code object
528
    :param function: function to check
529
    :return: bool whether code is used in another version of the function
530
    """
531
    with function.lock:
1✔
532
        return any(code == version.config.code for version in function.versions.values())
1✔
533

534

535
def destroy_code_if_not_used(code: S3Code, function: Function) -> None:
1✔
536
    """
537
    Destroy the given code if it is not used in some version of the function
538
    Do nothing otherwise
539

540
    :param code: Code object
541
    :param function: Function the code belongs too
542
    """
543
    with function.lock:
1✔
544
        if not is_code_used(code, function):
1✔
545
            code.destroy()
1✔
546

547

548
def store_lambda_archive(
1✔
549
    archive_file: bytes, function_name: str, region_name: str, account_id: str
550
) -> S3Code:
551
    """
552
    Stores the given lambda archive in an internal s3 bucket.
553
    Also checks if zipfile matches the specifications
554

555
    :param archive_file: Archive file to store
556
    :param function_name: function name the archive should be stored for
557
    :param region_name: region name the archive should be stored for
558
    :param account_id: account id the archive should be stored for
559
    :return: S3 Code object representing the archive stored in S3
560
    """
561
    # check if zip file
562
    if not is_zip_file(archive_file):
1✔
563
        raise InvalidParameterValueException(
1✔
564
            "Could not unzip uploaded file. Please check your file, then try to upload again.",
565
            Type="User",
566
        )
567
    # check unzipped size
568
    unzipped_size = get_unzipped_size(zip_file=io.BytesIO(archive_file))
1✔
569
    if unzipped_size >= config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED:
1✔
570
        raise InvalidParameterValueException(
1✔
571
            f"Unzipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED} bytes",
572
            Type="User",
573
        )
574
    # store all buckets in us-east-1 for now
575
    s3_client = connect_to(
1✔
576
        region_name=AWS_REGION_US_EAST_1, aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT
577
    ).s3
578
    bucket_name = f"awslambda-{region_name}-tasks"
1✔
579
    # s3 create bucket is idempotent in us-east-1
580
    s3_client.create_bucket(Bucket=bucket_name)
1✔
581
    code_id = f"{function_name}-{uuid.uuid4()}"
1✔
582
    key = f"snapshots/{account_id}/{code_id}"
1✔
583
    s3_client.upload_fileobj(Fileobj=io.BytesIO(archive_file), Bucket=bucket_name, Key=key)
1✔
584
    code_sha256 = to_str(base64.b64encode(sha256(archive_file).digest()))
1✔
585
    return S3Code(
1✔
586
        id=code_id,
587
        account_id=account_id,
588
        s3_bucket=bucket_name,
589
        s3_key=key,
590
        s3_object_version=None,
591
        code_sha256=code_sha256,
592
        code_size=len(archive_file),
593
    )
594

595

596
def assert_hot_reloading_path_absolute(path: str) -> None:
1✔
597
    """
598
    Check whether a given path, after environment variable substitution, is an absolute path.
599
    Accepts either posix or windows paths, with environment placeholders.
600
    Example placeholders: $ENV_VAR, ${ENV_VAR}
601

602
    :param path: Posix or windows path, potentially containing environment variable placeholders.
603
        Example: `$ENV_VAR/lambda/src` with `ENV_VAR=/home/user/test-repo` set.
604
    """
605
    # expand variables in path before checking for an absolute path
606
    expanded_path = os.path.expandvars(path)
1✔
607
    if (
1✔
608
        not PurePosixPath(expanded_path).is_absolute()
609
        and not PureWindowsPath(expanded_path).is_absolute()
610
    ):
611
        raise InvalidParameterValueException(
1✔
612
            f"When using hot reloading, the archive key has to be an absolute path! Your archive key: {path}",
613
        )
614

615

616
def create_hot_reloading_code(path: str) -> HotReloadingCode:
1✔
617
    assert_hot_reloading_path_absolute(path)
1✔
618
    return HotReloadingCode(host_path=path)
1✔
619

620

621
def store_s3_bucket_archive(
1✔
622
    archive_bucket: str,
623
    archive_key: str,
624
    archive_version: Optional[str],
625
    function_name: str,
626
    region_name: str,
627
    account_id: str,
628
) -> ArchiveCode:
629
    """
630
    Takes the lambda archive stored in the given bucket and stores it in an internal s3 bucket
631

632
    :param archive_bucket: Bucket the archive is stored in
633
    :param archive_key: Key the archive is stored under
634
    :param archive_version: Version of the archive object in the bucket
635
    :param function_name: function name the archive should be stored for
636
    :param region_name: region name the archive should be stored for
637
    :param account_id: account id the archive should be stored for
638
    :return: S3 Code object representing the archive stored in S3
639
    """
640
    if archive_bucket == config.BUCKET_MARKER_LOCAL:
1✔
641
        hotreload_counter.labels(operation="create").increment()
1✔
642
        return create_hot_reloading_code(path=archive_key)
1✔
643
    s3_client: "S3Client" = connect_to().s3
1✔
644
    kwargs = {"VersionId": archive_version} if archive_version else {}
1✔
645
    archive_file = s3_client.get_object(Bucket=archive_bucket, Key=archive_key, **kwargs)[
1✔
646
        "Body"
647
    ].read()
648
    return store_lambda_archive(
1✔
649
        archive_file, function_name=function_name, region_name=region_name, account_id=account_id
650
    )
651

652

653
def create_image_code(image_uri: str) -> ImageCode:
1✔
654
    """
655
    Creates an image code by inspecting the provided image
656

657
    :param image_uri: Image URI of the image to inspect
658
    :return: Image code object
659
    """
660
    code_sha256 = "<cannot-get-image-hash>"
1✔
661
    if CONTAINER_CLIENT.has_docker():
1✔
662
        try:
1✔
663
            CONTAINER_CLIENT.pull_image(docker_image=image_uri)
1✔
664
        except ContainerException:
1✔
665
            LOG.debug("Cannot pull image %s. Maybe only available locally?", image_uri)
1✔
666
        try:
1✔
667
            code_sha256 = CONTAINER_CLIENT.inspect_image(image_name=image_uri)["RepoDigests"][
1✔
668
                0
669
            ].rpartition(":")[2]
UNCOV
670
        except Exception as e:
×
UNCOV
671
            LOG.debug(
×
672
                "Cannot inspect image %s. Is this image and/or docker available: %s", image_uri, e
673
            )
674
    else:
UNCOV
675
        LOG.warning(
×
676
            "Unable to get image hash for image %s - no docker socket available."
677
            "Image hash returned by Lambda will not be correct.",
678
            image_uri,
679
        )
680
    return ImageCode(image_uri=image_uri, code_sha256=code_sha256, repository_type="ECR")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc