• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16981563750

14 Aug 2025 10:49PM UTC coverage: 86.896% (+0.04%) from 86.852%
16981563750

push

github

web-flow
add support for Fn::Tranform in CFnV2 (#12966)

Co-authored-by: Simon Walker <simon.walker@localstack.cloud>

181 of 195 new or added lines in 6 files covered. (92.82%)

348 existing lines in 22 files now uncovered.

66915 of 77006 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.63
/localstack-core/localstack/services/lambda_/invocation/lambda_service.py
1
import base64
1✔
2
import concurrent.futures
1✔
3
import dataclasses
1✔
4
import io
1✔
5
import logging
1✔
6
import os.path
1✔
7
import random
1✔
8
import uuid
1✔
9
from concurrent.futures import Executor, Future, ThreadPoolExecutor
1✔
10
from datetime import datetime
1✔
11
from hashlib import sha256
1✔
12
from pathlib import PurePosixPath, PureWindowsPath
1✔
13
from threading import RLock
1✔
14
from typing import TYPE_CHECKING
1✔
15

16
from localstack import config
1✔
17
from localstack.aws.api.lambda_ import (
1✔
18
    InvalidParameterValueException,
19
    InvalidRequestContentException,
20
    InvocationType,
21
    LastUpdateStatus,
22
    ResourceConflictException,
23
    ResourceNotFoundException,
24
    State,
25
)
26
from localstack.aws.connect import connect_to
1✔
27
from localstack.constants import AWS_REGION_US_EAST_1
1✔
28
from localstack.services.lambda_ import hooks as lambda_hooks
1✔
29
from localstack.services.lambda_.analytics import (
1✔
30
    FunctionOperation,
31
    FunctionStatus,
32
    function_counter,
33
    hotreload_counter,
34
)
35
from localstack.services.lambda_.api_utils import (
1✔
36
    lambda_arn,
37
    qualified_lambda_arn,
38
    qualifier_is_alias,
39
)
40
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
41
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
42
from localstack.services.lambda_.invocation.event_manager import LambdaEventManager
1✔
43
from localstack.services.lambda_.invocation.lambda_models import (
1✔
44
    ArchiveCode,
45
    Function,
46
    FunctionVersion,
47
    HotReloadingCode,
48
    ImageCode,
49
    Invocation,
50
    InvocationResult,
51
    S3Code,
52
    UpdateStatus,
53
    VersionAlias,
54
    VersionState,
55
)
56
from localstack.services.lambda_.invocation.models import lambda_stores
1✔
57
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
1✔
58
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
59
from localstack.utils.archives import get_unzipped_size, is_zip_file
1✔
60
from localstack.utils.container_utils.container_client import ContainerException
1✔
61
from localstack.utils.docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
1✔
62
from localstack.utils.strings import short_uid, to_str
1✔
63

64
if TYPE_CHECKING:
1✔
65
    from mypy_boto3_s3 import S3Client
×
66

67
LOG = logging.getLogger(__name__)
1✔
68

69
LAMBDA_DEFAULT_TIMEOUT_SECONDS = 3
1✔
70
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
71

72

73
class LambdaService:
1✔
74
    # mapping from qualified ARN to version manager
75
    lambda_running_versions: dict[str, LambdaVersionManager]
1✔
76
    lambda_starting_versions: dict[str, LambdaVersionManager]
1✔
77
    # mapping from qualified ARN to event manager
78
    event_managers = dict[str, LambdaEventManager]
1✔
79
    lambda_version_manager_lock: RLock
1✔
80
    task_executor: Executor
1✔
81

82
    assignment_service: AssignmentService
1✔
83
    counting_service: CountingService
1✔
84

85
    def __init__(self) -> None:
1✔
86
        self.lambda_running_versions = {}
1✔
87
        self.lambda_starting_versions = {}
1✔
88
        self.event_managers = {}
1✔
89
        self.lambda_version_manager_lock = RLock()
1✔
90
        self.task_executor = ThreadPoolExecutor(thread_name_prefix="lambda-service-task")
1✔
91
        self.assignment_service = AssignmentService()
1✔
92
        self.counting_service = CountingService()
1✔
93

94
    def stop(self) -> None:
1✔
95
        """
96
        Stop the whole lambda service
97
        """
98
        shutdown_futures = []
1✔
99
        for event_manager in self.event_managers.values():
1✔
100
            shutdown_futures.append(self.task_executor.submit(event_manager.stop))
1✔
101
        # TODO: switch shutdown order? yes, shutdown starting versions before the running versions would make more sense
102
        for version_manager in self.lambda_running_versions.values():
1✔
103
            shutdown_futures.append(self.task_executor.submit(version_manager.stop))
1✔
104
        for version_manager in self.lambda_starting_versions.values():
1✔
105
            shutdown_futures.append(self.task_executor.submit(version_manager.stop))
×
106
            shutdown_futures.append(
×
107
                self.task_executor.submit(
108
                    version_manager.function_version.config.code.destroy_cached
109
                )
110
            )
111
        _, not_done = concurrent.futures.wait(shutdown_futures, timeout=5)
1✔
112
        if not_done:
1✔
113
            LOG.debug("Shutdown not complete, missing threads: %s", not_done)
×
114
        self.task_executor.shutdown(cancel_futures=True)
1✔
115
        self.assignment_service.stop()
1✔
116

117
    def stop_version(self, qualified_arn: str) -> None:
1✔
118
        """
119
        Stops a specific lambda service version
120
        :param qualified_arn: Qualified arn for the version to stop
121
        """
122
        LOG.debug("Stopping version %s", qualified_arn)
1✔
123
        event_manager = self.event_managers.pop(qualified_arn, None)
1✔
124
        if not event_manager:
1✔
125
            LOG.debug("Could not find event manager to stop for function %s...", qualified_arn)
×
126
        else:
127
            self.task_executor.submit(event_manager.stop)
1✔
128
        version_manager = self.lambda_running_versions.pop(
1✔
129
            qualified_arn, self.lambda_starting_versions.pop(qualified_arn, None)
130
        )
131
        if not version_manager:
1✔
132
            raise ValueError(f"Unable to find version manager for {qualified_arn}")
×
133
        self.task_executor.submit(version_manager.stop)
1✔
134
        lambda_hooks.delete_function_version.run(qualified_arn)
1✔
135

136
    def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:
1✔
137
        """
138
        Get the lambda version for the given arn
139
        :param function_arn: qualified arn for the lambda version
140
        :return: LambdaVersionManager for the arn
141
        """
142
        version_manager = self.lambda_running_versions.get(function_arn)
1✔
143
        if not version_manager:
1✔
144
            raise ValueError(f"Could not find version '{function_arn}'. Is it created?")
×
145

146
        return version_manager
1✔
147

148
    def get_lambda_event_manager(self, function_arn: str) -> LambdaEventManager:
1✔
149
        """
150
        Get the lambda event manager for the given arn
151
        :param function_arn: qualified arn for the lambda version
152
        :return: LambdaEventManager for the arn
153
        """
154
        event_manager = self.event_managers.get(function_arn)
1✔
155
        if not event_manager:
1✔
156
            raise ValueError(f"Could not find event manager '{function_arn}'. Is it created?")
×
157

158
        return event_manager
1✔
159

160
    def _start_lambda_version(self, version_manager: LambdaVersionManager) -> None:
1✔
161
        new_state = version_manager.start()
1✔
162
        self.update_version_state(
1✔
163
            function_version=version_manager.function_version, new_state=new_state
164
        )
165

166
    def create_function_version(self, function_version: FunctionVersion) -> Future[None]:
1✔
167
        """
168
        Creates a new function version (manager), and puts it in the startup dict
169

170
        :param function_version: Function Version to create
171
        """
172
        with self.lambda_version_manager_lock:
1✔
173
            qualified_arn = function_version.id.qualified_arn()
1✔
174
            version_manager = self.lambda_starting_versions.get(qualified_arn)
1✔
175
            if version_manager:
1✔
176
                raise ResourceConflictException(
1✔
177
                    f"The operation cannot be performed at this time. An update is in progress for resource: {function_version.id.unqualified_arn()}",
178
                    Type="User",
179
                )
180
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
181
            fn = state.functions.get(function_version.id.function_name)
1✔
182
            version_manager = LambdaVersionManager(
1✔
183
                function_arn=qualified_arn,
184
                function_version=function_version,
185
                function=fn,
186
                counting_service=self.counting_service,
187
                assignment_service=self.assignment_service,
188
            )
189
            self.lambda_starting_versions[qualified_arn] = version_manager
1✔
190
            lambda_hooks.create_function_version.run(function_version.qualified_arn)
1✔
191
        return self.task_executor.submit(self._start_lambda_version, version_manager)
1✔
192

193
    def publish_version(self, function_version: FunctionVersion):
1✔
194
        """
195
        Synchronously create a function version (manager)
196
        Should only be called on publishing new versions, which basically clone an existing one.
197
        The new version needs to be added to the lambda store before invoking this.
198
        After successful completion of this method, the lambda version stored will be modified to be active, with a new revision id.
199
        It will then be active for execution, and should be retrieved again from the store before returning the data over the API.
200

201
        :param function_version: Function Version to create
202
        """
203
        with self.lambda_version_manager_lock:
1✔
204
            qualified_arn = function_version.id.qualified_arn()
1✔
205
            version_manager = self.lambda_starting_versions.get(qualified_arn)
1✔
206
            if version_manager:
1✔
207
                raise Exception(
×
208
                    "Version '%s' already starting up and in state %s",
209
                    qualified_arn,
210
                    version_manager.state,
211
                )
212
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
213
            fn = state.functions.get(function_version.id.function_name)
1✔
214
            version_manager = LambdaVersionManager(
1✔
215
                function_arn=qualified_arn,
216
                function_version=function_version,
217
                function=fn,
218
                counting_service=self.counting_service,
219
                assignment_service=self.assignment_service,
220
            )
221
            self.lambda_starting_versions[qualified_arn] = version_manager
1✔
222
        self._start_lambda_version(version_manager)
1✔
223

224
    # Commands
225
    def invoke(
1✔
226
        self,
227
        function_name: str,
228
        qualifier: str,
229
        region: str,
230
        account_id: str,
231
        invocation_type: InvocationType | None,
232
        client_context: str | None,
233
        request_id: str,
234
        payload: bytes | None,
235
        trace_context: dict | None = None,
236
        user_agent: str | None = None,
237
    ) -> InvocationResult | None:
238
        """
239
        Invokes a specific version of a lambda
240

241
        :param request_id: context request ID
242
        :param function_name: Function name
243
        :param qualifier: Function version qualifier
244
        :param region: Region of the function
245
        :param account_id: Account id of the function
246
        :param invocation_type: Invocation Type
247
        :param client_context: Client Context, if applicable
248
        :param trace_context: tracing information such as X-Ray header
249
        :param payload: Invocation payload
250
        :return: The invocation result
251
        """
252
        # NOTE: consider making the trace_context mandatory once we update all usages (should be easier after v4.0)
253
        trace_context = trace_context or {}
1✔
254
        # Invoked arn (for lambda context) does not have qualifier if not supplied
255
        invoked_arn = lambda_arn(
1✔
256
            function_name=function_name,
257
            qualifier=qualifier,
258
            account=account_id,
259
            region=region,
260
        )
261
        qualifier = qualifier or "$LATEST"
1✔
262
        state = lambda_stores[account_id][region]
1✔
263
        function = state.functions.get(function_name)
1✔
264

265
        if function is None:
1✔
266
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
1✔
267

268
        if qualifier_is_alias(qualifier):
1✔
269
            alias = function.aliases.get(qualifier)
1✔
270
            if not alias:
1✔
271
                raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
1✔
272
            version_qualifier = alias.function_version
1✔
273
            if alias.routing_configuration:
1✔
274
                version, probability = next(
1✔
275
                    iter(alias.routing_configuration.version_weights.items())
276
                )
277
                if random.random() < probability:
1✔
278
                    version_qualifier = version
1✔
279
        else:
280
            version_qualifier = qualifier
1✔
281

282
        # Need the qualified arn to exactly get the target lambda
283
        qualified_arn = qualified_lambda_arn(function_name, version_qualifier, account_id, region)
1✔
284
        version = function.versions.get(version_qualifier)
1✔
285
        runtime = version.config.runtime or "n/a"
1✔
286
        package_type = version.config.package_type
1✔
287
        try:
1✔
288
            version_manager = self.get_lambda_version_manager(qualified_arn)
1✔
289
            event_manager = self.get_lambda_event_manager(qualified_arn)
1✔
290
        except ValueError as e:
×
291
            state = version and version.config.state.state
×
292
            if state == State.Failed:
×
293
                status = FunctionStatus.failed_state_error
×
294
                HINT_LOG.error(
×
295
                    f"Failed to create the runtime executor for the function {function_name}. "
296
                    "Please ensure that Docker is available in the LocalStack container by adding the volume mount "
297
                    '"/var/run/docker.sock:/var/run/docker.sock" to your LocalStack startup. '
298
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#docker-not-available"
299
                )
300
            elif state == State.Pending:
×
301
                status = FunctionStatus.pending_state_error
×
302
                HINT_LOG.warning(
×
303
                    "Lambda functions are created and updated asynchronously in the new lambda provider like in AWS. "
304
                    f"Before invoking {function_name}, please wait until the function transitioned from the state "
305
                    "Pending to Active using: "
306
                    f'"awslocal lambda wait function-active-v2 --function-name {function_name}" '
307
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#function-in-pending-state"
308
                )
309
            else:
310
                status = FunctionStatus.unhandled_state_error
×
311
                LOG.error("Unexpected state %s for Lambda function %s", state, function_name)
×
312
            function_counter.labels(
×
313
                operation=FunctionOperation.invoke,
314
                runtime=runtime,
315
                status=status,
316
                invocation_type=invocation_type,
317
                package_type=package_type,
318
            ).increment()
319
            raise ResourceConflictException(
×
320
                f"The operation cannot be performed at this time. The function is currently in the following state: {state}"
321
            ) from e
322
        # empty payloads have to work as well
323
        if payload is None:
1✔
324
            payload = b"{}"
1✔
325
        else:
326
            # detect invalid payloads early before creating an execution environment
327
            try:
1✔
328
                to_str(payload)
1✔
329
            except Exception as e:
1✔
330
                function_counter.labels(
1✔
331
                    operation=FunctionOperation.invoke,
332
                    runtime=runtime,
333
                    status=FunctionStatus.invalid_payload_error,
334
                    invocation_type=invocation_type,
335
                    package_type=package_type,
336
                ).increment()
337
                # MAYBE: improve parity of detailed exception message (quite cumbersome)
338
                raise InvalidRequestContentException(
1✔
339
                    f"Could not parse request body into json: Could not parse payload into json: {e}",
340
                    Type="User",
341
                )
342
        if invocation_type is None:
1✔
343
            invocation_type = InvocationType.RequestResponse
1✔
344
        if invocation_type == InvocationType.DryRun:
1✔
345
            return None
1✔
346
        # TODO payload verification  An error occurred (InvalidRequestContentException) when calling the Invoke operation: Could not parse request body into json: Could not parse payload into json: Unexpected character (''' (code 39)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
347
        #  at [Source: (byte[])"'test'"; line: 1, column: 2]
348
        #
349
        if invocation_type == InvocationType.Event:
1✔
350
            return event_manager.enqueue_event(
1✔
351
                invocation=Invocation(
352
                    payload=payload,
353
                    invoked_arn=invoked_arn,
354
                    client_context=client_context,
355
                    invocation_type=invocation_type,
356
                    invoke_time=datetime.now(),
357
                    request_id=request_id,
358
                    trace_context=trace_context,
359
                    user_agent=user_agent,
360
                )
361
            )
362

363
        invocation_result = version_manager.invoke(
1✔
364
            invocation=Invocation(
365
                payload=payload,
366
                invoked_arn=invoked_arn,
367
                client_context=client_context,
368
                invocation_type=invocation_type,
369
                invoke_time=datetime.now(),
370
                request_id=request_id,
371
                trace_context=trace_context,
372
                user_agent=user_agent,
373
            )
374
        )
375
        status = (
1✔
376
            FunctionStatus.invocation_error
377
            if invocation_result.is_error
378
            else FunctionStatus.success
379
        )
380
        function_counter.labels(
1✔
381
            operation=FunctionOperation.invoke,
382
            runtime=runtime,
383
            status=status,
384
            invocation_type=invocation_type,
385
            package_type=package_type,
386
        ).increment()
387
        return invocation_result
1✔
388

389
    def update_version(self, new_version: FunctionVersion) -> Future[None]:
1✔
390
        """
391
        Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready
392
        to be invoked
393

394
        :param new_version: New version (with the same qualifier as an older one)
395
        """
396
        if new_version.qualified_arn not in self.lambda_running_versions:
1✔
397
            raise ValueError(
×
398
                f"Version {new_version.qualified_arn} cannot be updated if an old one is not running"
399
            )
400

401
        return self.create_function_version(function_version=new_version)
1✔
402

403
    def update_version_state(
1✔
404
        self, function_version: FunctionVersion, new_state: VersionState
405
    ) -> None:
406
        """
407
        Update the version state for the given function version.
408

409
        This will perform a rollover to the given function if the new state is active and there is a previously
410
        running version registered. The old version will be shutdown and its code deleted.
411

412
        If the new state is failed, it will abort the update and mark it as failed.
413
        If an older version is still running, it will keep running.
414

415
        :param function_version: Version reporting the state
416
        :param new_state: New state
417
        """
418
        function_arn = function_version.qualified_arn
1✔
419
        try:
1✔
420
            old_version = None
1✔
421
            old_event_manager = None
1✔
422
            with self.lambda_version_manager_lock:
1✔
423
                new_version_manager = self.lambda_starting_versions.pop(function_arn)
1✔
424
                if not new_version_manager:
1✔
425
                    raise ValueError(
×
426
                        f"Version {function_arn} reporting state {new_state.state} does exist in the starting versions."
427
                    )
428
                if new_state.state == State.Active:
1✔
429
                    old_version = self.lambda_running_versions.get(function_arn, None)
1✔
430
                    old_event_manager = self.event_managers.get(function_arn, None)
1✔
431
                    self.lambda_running_versions[function_arn] = new_version_manager
1✔
432
                    self.event_managers[function_arn] = LambdaEventManager(
1✔
433
                        version_manager=new_version_manager
434
                    )
435
                    self.event_managers[function_arn].start()
1✔
436
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
1✔
437
                elif new_state.state == State.Failed:
×
438
                    update_status = UpdateStatus(status=LastUpdateStatus.Failed)
×
439
                    self.task_executor.submit(new_version_manager.stop)
×
440
                else:
441
                    # TODO what to do if state pending or inactive is supported?
442
                    self.task_executor.submit(new_version_manager.stop)
×
443
                    LOG.error(
×
444
                        "State %s for version %s should not have been reported. New version will be stopped.",
445
                        new_state,
446
                        function_arn,
447
                    )
448
                    return
×
449

450
            # TODO is it necessary to get the version again? Should be locked for modification anyway
451
            # Without updating the new state, the function would not change to active, last_update would be missing, and
452
            # the revision id would not be updated.
453
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
454
            # FIXME this will fail if the function is deleted during this code lines here
455
            function = state.functions.get(function_version.id.function_name)
1✔
456
            if old_event_manager:
1✔
457
                self.task_executor.submit(old_event_manager.stop_for_update)
1✔
458
            if old_version:
1✔
459
                # if there is an old version, we assume it is an update, and stop the old one
460
                self.task_executor.submit(old_version.stop)
1✔
461
                if function:
1✔
462
                    self.task_executor.submit(
1✔
463
                        destroy_code_if_not_used, old_version.function_version.config.code, function
464
                    )
465
            if not function:
1✔
466
                LOG.debug("Function %s was deleted during status update", function_arn)
×
467
                return
×
468
            current_version = function.versions[function_version.id.qualifier]
1✔
469
            new_version_manager.state = new_state
1✔
470
            new_version_state = dataclasses.replace(
1✔
471
                current_version,
472
                config=dataclasses.replace(
473
                    current_version.config, state=new_state, last_update=update_status
474
                ),
475
            )
476
            state.functions[function_version.id.function_name].versions[
1✔
477
                function_version.id.qualifier
478
            ] = new_version_state
479

480
        except Exception:
×
481
            LOG.error(
×
482
                "Failed to update function version for arn %s",
483
                function_arn,
484
                exc_info=LOG.isEnabledFor(logging.DEBUG),
485
            )
486

487
    def update_alias(self, old_alias: VersionAlias, new_alias: VersionAlias, function: Function):
1✔
488
        # if pointer changed, need to restart provisioned
489
        provisioned_concurrency_config = function.provisioned_concurrency_configs.get(
1✔
490
            old_alias.name
491
        )
492
        if (
1✔
493
            old_alias.function_version != new_alias.function_version
494
            and provisioned_concurrency_config is not None
495
        ):
496
            LOG.warning("Deprovisioning")
×
UNCOV
497
            fn_version_old = function.versions.get(old_alias.function_version)
×
UNCOV
498
            vm_old = self.get_lambda_version_manager(function_arn=fn_version_old.qualified_arn)
×
UNCOV
499
            fn_version_new = function.versions.get(new_alias.function_version)
×
500
            vm_new = self.get_lambda_version_manager(function_arn=fn_version_new.qualified_arn)
×
501

502
            # TODO: we might need to pull provisioned concurrency state a bit more out of the version manager for get_provisioned_concurrency_config
503
            # TODO: make this fully async
UNCOV
504
            vm_old.update_provisioned_concurrency_config(0).result(timeout=4)  # sync
×
UNCOV
505
            vm_new.update_provisioned_concurrency_config(
×
506
                provisioned_concurrency_config.provisioned_concurrent_executions
507
            )  # async again
508

509
    def can_assume_role(self, role_arn: str, region: str) -> bool:
1✔
510
        """
511
        Checks whether lambda can assume the given role.
512
        This _should_ only fail if IAM enforcement is enabled.
513

514
        :param role_arn: Role to assume
515
        :return: True if the role can be assumed by lambda, false otherwise
516
        """
517
        sts_client = connect_to(region_name=region).sts.request_metadata(service_principal="lambda")
1✔
518
        try:
1✔
519
            sts_client.assume_role(
1✔
520
                RoleArn=role_arn,
521
                RoleSessionName=f"test-assume-{short_uid()}",
522
                DurationSeconds=900,
523
            )
524
            return True
1✔
UNCOV
525
        except Exception as e:
×
UNCOV
526
            LOG.debug("Cannot assume role %s: %s", role_arn, e)
×
UNCOV
527
            return False
×
528

529

530
# TODO: Move helper functions out of lambda_service into a separate module
531

532

533
def is_code_used(code: S3Code, function: Function) -> bool:
1✔
534
    """
535
    Check if given code is still used in some version of the function
536

537
    :param code: Code object
538
    :param function: function to check
539
    :return: bool whether code is used in another version of the function
540
    """
541
    with function.lock:
1✔
542
        return any(code == version.config.code for version in function.versions.values())
1✔
543

544

545
def destroy_code_if_not_used(code: S3Code, function: Function) -> None:
1✔
546
    """
547
    Destroy the given code if it is not used in some version of the function
548
    Do nothing otherwise
549

550
    :param code: Code object
551
    :param function: Function the code belongs too
552
    """
553
    with function.lock:
1✔
554
        if not is_code_used(code, function):
1✔
555
            code.destroy()
1✔
556

557

558
def store_lambda_archive(
1✔
559
    archive_file: bytes, function_name: str, region_name: str, account_id: str
560
) -> S3Code:
561
    """
562
    Stores the given lambda archive in an internal s3 bucket.
563
    Also checks if zipfile matches the specifications
564

565
    :param archive_file: Archive file to store
566
    :param function_name: function name the archive should be stored for
567
    :param region_name: region name the archive should be stored for
568
    :param account_id: account id the archive should be stored for
569
    :return: S3 Code object representing the archive stored in S3
570
    """
571
    # check if zip file
572
    if not is_zip_file(archive_file):
1✔
573
        raise InvalidParameterValueException(
1✔
574
            "Could not unzip uploaded file. Please check your file, then try to upload again.",
575
            Type="User",
576
        )
577
    # check unzipped size
578
    unzipped_size = get_unzipped_size(zip_file=io.BytesIO(archive_file))
1✔
579
    if unzipped_size >= config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED:
1✔
580
        raise InvalidParameterValueException(
1✔
581
            f"Unzipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED} bytes",
582
            Type="User",
583
        )
584
    # store all buckets in us-east-1 for now
585
    s3_client = connect_to(
1✔
586
        region_name=AWS_REGION_US_EAST_1, aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT
587
    ).s3
588
    bucket_name = f"awslambda-{region_name}-tasks"
1✔
589
    # s3 create bucket is idempotent in us-east-1
590
    s3_client.create_bucket(Bucket=bucket_name)
1✔
591
    code_id = f"{function_name}-{uuid.uuid4()}"
1✔
592
    key = f"snapshots/{account_id}/{code_id}"
1✔
593
    s3_client.upload_fileobj(Fileobj=io.BytesIO(archive_file), Bucket=bucket_name, Key=key)
1✔
594
    code_sha256 = to_str(base64.b64encode(sha256(archive_file).digest()))
1✔
595
    return S3Code(
1✔
596
        id=code_id,
597
        account_id=account_id,
598
        s3_bucket=bucket_name,
599
        s3_key=key,
600
        s3_object_version=None,
601
        code_sha256=code_sha256,
602
        code_size=len(archive_file),
603
    )
604

605

606
def assert_hot_reloading_path_absolute(path: str) -> None:
1✔
607
    """
608
    Check whether a given path, after environment variable substitution, is an absolute path.
609
    Accepts either posix or windows paths, with environment placeholders.
610
    Example placeholders: $ENV_VAR, ${ENV_VAR}
611

612
    :param path: Posix or windows path, potentially containing environment variable placeholders.
613
        Example: `$ENV_VAR/lambda/src` with `ENV_VAR=/home/user/test-repo` set.
614
    """
615
    # expand variables in path before checking for an absolute path
616
    expanded_path = os.path.expandvars(path)
1✔
617
    if (
1✔
618
        not PurePosixPath(expanded_path).is_absolute()
619
        and not PureWindowsPath(expanded_path).is_absolute()
620
    ):
621
        raise InvalidParameterValueException(
1✔
622
            f"When using hot reloading, the archive key has to be an absolute path! Your archive key: {path}",
623
        )
624

625

626
def create_hot_reloading_code(path: str) -> HotReloadingCode:
1✔
627
    assert_hot_reloading_path_absolute(path)
1✔
628
    return HotReloadingCode(host_path=path)
1✔
629

630

631
def store_s3_bucket_archive(
1✔
632
    archive_bucket: str,
633
    archive_key: str,
634
    archive_version: str | None,
635
    function_name: str,
636
    region_name: str,
637
    account_id: str,
638
) -> ArchiveCode:
639
    """
640
    Takes the lambda archive stored in the given bucket and stores it in an internal s3 bucket
641

642
    :param archive_bucket: Bucket the archive is stored in
643
    :param archive_key: Key the archive is stored under
644
    :param archive_version: Version of the archive object in the bucket
645
    :param function_name: function name the archive should be stored for
646
    :param region_name: region name the archive should be stored for
647
    :param account_id: account id the archive should be stored for
648
    :return: S3 Code object representing the archive stored in S3
649
    """
650
    if archive_bucket == config.BUCKET_MARKER_LOCAL:
1✔
651
        hotreload_counter.labels(operation="create").increment()
1✔
652
        return create_hot_reloading_code(path=archive_key)
1✔
653
    s3_client: S3Client = connect_to().s3
1✔
654
    kwargs = {"VersionId": archive_version} if archive_version else {}
1✔
655
    try:
1✔
656
        archive_file = s3_client.get_object(Bucket=archive_bucket, Key=archive_key, **kwargs)[
1✔
657
            "Body"
658
        ].read()
659
    except s3_client.exceptions.ClientError as e:
1✔
660
        raise InvalidParameterValueException(
1✔
661
            f"Error occurred while GetObject. S3 Error Code: {e.response['Error']['Code']}. S3 Error Message: {e.response['Error']['Message']}",
662
            Type="User",
663
        )
664
    return store_lambda_archive(
1✔
665
        archive_file, function_name=function_name, region_name=region_name, account_id=account_id
666
    )
667

668

669
def create_image_code(image_uri: str) -> ImageCode:
1✔
670
    """
671
    Creates an image code by inspecting the provided image
672

673
    :param image_uri: Image URI of the image to inspect
674
    :return: Image code object
675
    """
676
    code_sha256 = "<cannot-get-image-hash>"
1✔
677
    if CONTAINER_CLIENT.has_docker():
1✔
678
        try:
1✔
679
            CONTAINER_CLIENT.pull_image(docker_image=image_uri)
1✔
680
        except ContainerException:
1✔
681
            LOG.debug("Cannot pull image %s. Maybe only available locally?", image_uri)
1✔
682
        try:
1✔
683
            code_sha256 = CONTAINER_CLIENT.inspect_image(image_name=image_uri)["RepoDigests"][
1✔
684
                0
685
            ].rpartition(":")[2]
UNCOV
686
        except Exception as e:
×
687
            LOG.debug(
×
688
                "Cannot inspect image %s. Is this image and/or docker available: %s", image_uri, e
689
            )
690
    else:
UNCOV
691
        LOG.warning(
×
692
            "Unable to get image hash for image %s - no docker socket available."
693
            "Image hash returned by Lambda will not be correct.",
694
            image_uri,
695
        )
696
    return ImageCode(image_uri=image_uri, code_sha256=code_sha256, repository_type="ECR")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc