• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / cfeb9bc5-3a9b-4b5a-b396-a83129ef6e11

14 Mar 2025 12:28PM UTC coverage: 86.93% (-0.03%) from 86.958%
cfeb9bc5-3a9b-4b5a-b396-a83129ef6e11

push

circleci

web-flow
Docker Utils: Expose the build logs (#12376)

6 of 7 new or added lines in 2 files covered. (85.71%)

82 existing lines in 12 files now uncovered.

62300 of 71667 relevant lines covered (86.93%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.15
/localstack-core/localstack/services/lambda_/invocation/lambda_service.py
1
import base64
1✔
2
import concurrent.futures
1✔
3
import dataclasses
1✔
4
import io
1✔
5
import logging
1✔
6
import os.path
1✔
7
import random
1✔
8
import uuid
1✔
9
from concurrent.futures import Executor, Future, ThreadPoolExecutor
1✔
10
from datetime import datetime
1✔
11
from hashlib import sha256
1✔
12
from pathlib import PurePosixPath, PureWindowsPath
1✔
13
from threading import RLock
1✔
14
from typing import TYPE_CHECKING, Optional
1✔
15

16
from localstack import config
1✔
17
from localstack.aws.api.lambda_ import (
1✔
18
    InvalidParameterValueException,
19
    InvalidRequestContentException,
20
    InvocationType,
21
    LastUpdateStatus,
22
    ResourceConflictException,
23
    ResourceNotFoundException,
24
    State,
25
)
26
from localstack.aws.connect import connect_to
1✔
27
from localstack.constants import AWS_REGION_US_EAST_1
1✔
28
from localstack.services.lambda_ import usage
1✔
29
from localstack.services.lambda_.api_utils import (
1✔
30
    lambda_arn,
31
    qualified_lambda_arn,
32
    qualifier_is_alias,
33
)
34
from localstack.services.lambda_.invocation.assignment import AssignmentService
1✔
35
from localstack.services.lambda_.invocation.counting_service import CountingService
1✔
36
from localstack.services.lambda_.invocation.event_manager import LambdaEventManager
1✔
37
from localstack.services.lambda_.invocation.lambda_models import (
1✔
38
    ArchiveCode,
39
    Function,
40
    FunctionVersion,
41
    HotReloadingCode,
42
    ImageCode,
43
    Invocation,
44
    InvocationResult,
45
    S3Code,
46
    UpdateStatus,
47
    VersionAlias,
48
    VersionState,
49
)
50
from localstack.services.lambda_.invocation.models import lambda_stores
1✔
51
from localstack.services.lambda_.invocation.version_manager import LambdaVersionManager
1✔
52
from localstack.services.lambda_.lambda_utils import HINT_LOG
1✔
53
from localstack.utils.archives import get_unzipped_size, is_zip_file
1✔
54
from localstack.utils.container_utils.container_client import ContainerException
1✔
55
from localstack.utils.docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
1✔
56
from localstack.utils.strings import short_uid, to_str
1✔
57

58
if TYPE_CHECKING:
1✔
59
    from mypy_boto3_s3 import S3Client
×
60

61
LOG = logging.getLogger(__name__)
1✔
62

63
LAMBDA_DEFAULT_TIMEOUT_SECONDS = 3
1✔
64
LAMBDA_DEFAULT_MEMORY_SIZE = 128
1✔
65

66

67
class LambdaService:
1✔
68
    # mapping from qualified ARN to version manager
69
    lambda_running_versions: dict[str, LambdaVersionManager]
1✔
70
    lambda_starting_versions: dict[str, LambdaVersionManager]
1✔
71
    # mapping from qualified ARN to event manager
72
    event_managers = dict[str, LambdaEventManager]
1✔
73
    lambda_version_manager_lock: RLock
1✔
74
    task_executor: Executor
1✔
75

76
    assignment_service: AssignmentService
1✔
77
    counting_service: CountingService
1✔
78

79
    def __init__(self) -> None:
1✔
80
        self.lambda_running_versions = {}
1✔
81
        self.lambda_starting_versions = {}
1✔
82
        self.event_managers = {}
1✔
83
        self.lambda_version_manager_lock = RLock()
1✔
84
        self.task_executor = ThreadPoolExecutor(thread_name_prefix="lambda-service-task")
1✔
85
        self.assignment_service = AssignmentService()
1✔
86
        self.counting_service = CountingService()
1✔
87

88
    def stop(self) -> None:
1✔
89
        """
90
        Stop the whole lambda service
91
        """
92
        shutdown_futures = []
1✔
93
        for event_manager in self.event_managers.values():
1✔
94
            shutdown_futures.append(self.task_executor.submit(event_manager.stop))
1✔
95
        # TODO: switch shutdown order? yes, shutdown starting versions before the running versions would make more sense
96
        for version_manager in self.lambda_running_versions.values():
1✔
97
            shutdown_futures.append(self.task_executor.submit(version_manager.stop))
1✔
98
        for version_manager in self.lambda_starting_versions.values():
1✔
99
            shutdown_futures.append(self.task_executor.submit(version_manager.stop))
×
100
            shutdown_futures.append(
×
101
                self.task_executor.submit(
102
                    version_manager.function_version.config.code.destroy_cached
103
                )
104
            )
105
        _, not_done = concurrent.futures.wait(shutdown_futures, timeout=5)
1✔
106
        if not_done:
1✔
107
            LOG.debug("Shutdown not complete, missing threads: %s", not_done)
×
108
        self.task_executor.shutdown(cancel_futures=True)
1✔
109
        self.assignment_service.stop()
1✔
110

111
    def stop_version(self, qualified_arn: str) -> None:
1✔
112
        """
113
        Stops a specific lambda service version
114
        :param qualified_arn: Qualified arn for the version to stop
115
        """
116
        LOG.debug("Stopping version %s", qualified_arn)
1✔
117
        event_manager = self.event_managers.pop(qualified_arn, None)
1✔
118
        if not event_manager:
1✔
119
            LOG.debug("Could not find event manager to stop for function %s...", qualified_arn)
×
120
        else:
121
            self.task_executor.submit(event_manager.stop)
1✔
122
        version_manager = self.lambda_running_versions.pop(
1✔
123
            qualified_arn, self.lambda_starting_versions.pop(qualified_arn, None)
124
        )
125
        if not version_manager:
1✔
126
            raise ValueError(f"Unable to find version manager for {qualified_arn}")
×
127
        self.task_executor.submit(version_manager.stop)
1✔
128

129
    def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:
1✔
130
        """
131
        Get the lambda version for the given arn
132
        :param function_arn: qualified arn for the lambda version
133
        :return: LambdaVersionManager for the arn
134
        """
135
        version_manager = self.lambda_running_versions.get(function_arn)
1✔
136
        if not version_manager:
1✔
137
            raise ValueError(f"Could not find version '{function_arn}'. Is it created?")
×
138

139
        return version_manager
1✔
140

141
    def get_lambda_event_manager(self, function_arn: str) -> LambdaEventManager:
1✔
142
        """
143
        Get the lambda event manager for the given arn
144
        :param function_arn: qualified arn for the lambda version
145
        :return: LambdaEventManager for the arn
146
        """
147
        event_manager = self.event_managers.get(function_arn)
1✔
148
        if not event_manager:
1✔
149
            raise ValueError(f"Could not find event manager '{function_arn}'. Is it created?")
×
150

151
        return event_manager
1✔
152

153
    def _start_lambda_version(self, version_manager: LambdaVersionManager) -> None:
1✔
154
        new_state = version_manager.start()
1✔
155
        self.update_version_state(
1✔
156
            function_version=version_manager.function_version, new_state=new_state
157
        )
158

159
    def create_function_version(self, function_version: FunctionVersion) -> Future[None]:
1✔
160
        """
161
        Creates a new function version (manager), and puts it in the startup dict
162

163
        :param function_version: Function Version to create
164
        """
165
        with self.lambda_version_manager_lock:
1✔
166
            qualified_arn = function_version.id.qualified_arn()
1✔
167
            version_manager = self.lambda_starting_versions.get(qualified_arn)
1✔
168
            if version_manager:
1✔
169
                raise ResourceConflictException(
1✔
170
                    f"The operation cannot be performed at this time. An update is in progress for resource: {function_version.id.unqualified_arn()}",
171
                    Type="User",
172
                )
173
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
174
            fn = state.functions.get(function_version.id.function_name)
1✔
175
            version_manager = LambdaVersionManager(
1✔
176
                function_arn=qualified_arn,
177
                function_version=function_version,
178
                function=fn,
179
                counting_service=self.counting_service,
180
                assignment_service=self.assignment_service,
181
            )
182
            self.lambda_starting_versions[qualified_arn] = version_manager
1✔
183
        return self.task_executor.submit(self._start_lambda_version, version_manager)
1✔
184

185
    def publish_version(self, function_version: FunctionVersion):
1✔
186
        """
187
        Synchronously create a function version (manager)
188
        Should only be called on publishing new versions, which basically clone an existing one.
189
        The new version needs to be added to the lambda store before invoking this.
190
        After successful completion of this method, the lambda version stored will be modified to be active, with a new revision id.
191
        It will then be active for execution, and should be retrieved again from the store before returning the data over the API.
192

193
        :param function_version: Function Version to create
194
        """
195
        with self.lambda_version_manager_lock:
1✔
196
            qualified_arn = function_version.id.qualified_arn()
1✔
197
            version_manager = self.lambda_starting_versions.get(qualified_arn)
1✔
198
            if version_manager:
1✔
199
                raise Exception(
×
200
                    "Version '%s' already starting up and in state %s",
201
                    qualified_arn,
202
                    version_manager.state,
203
                )
204
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
205
            fn = state.functions.get(function_version.id.function_name)
1✔
206
            version_manager = LambdaVersionManager(
1✔
207
                function_arn=qualified_arn,
208
                function_version=function_version,
209
                function=fn,
210
                counting_service=self.counting_service,
211
                assignment_service=self.assignment_service,
212
            )
213
            self.lambda_starting_versions[qualified_arn] = version_manager
1✔
214
        self._start_lambda_version(version_manager)
1✔
215

216
    # Commands
217
    def invoke(
1✔
218
        self,
219
        function_name: str,
220
        qualifier: str,
221
        region: str,
222
        account_id: str,
223
        invocation_type: InvocationType | None,
224
        client_context: str | None,
225
        request_id: str,
226
        payload: bytes | None,
227
        trace_context: dict | None = None,
228
    ) -> InvocationResult | None:
229
        """
230
        Invokes a specific version of a lambda
231

232
        :param request_id: context request ID
233
        :param function_name: Function name
234
        :param qualifier: Function version qualifier
235
        :param region: Region of the function
236
        :param account_id: Account id of the function
237
        :param invocation_type: Invocation Type
238
        :param client_context: Client Context, if applicable
239
        :param trace_context: tracing information such as X-Ray header
240
        :param payload: Invocation payload
241
        :return: The invocation result
242
        """
243
        # NOTE: consider making the trace_context mandatory once we update all usages (should be easier after v4.0)
244
        trace_context = trace_context or {}
1✔
245
        # Invoked arn (for lambda context) does not have qualifier if not supplied
246
        invoked_arn = lambda_arn(
1✔
247
            function_name=function_name,
248
            qualifier=qualifier,
249
            account=account_id,
250
            region=region,
251
        )
252
        qualifier = qualifier or "$LATEST"
1✔
253
        state = lambda_stores[account_id][region]
1✔
254
        function = state.functions.get(function_name)
1✔
255

256
        if function is None:
1✔
257
            raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
1✔
258

259
        if qualifier_is_alias(qualifier):
1✔
260
            alias = function.aliases.get(qualifier)
1✔
261
            if not alias:
1✔
262
                raise ResourceNotFoundException(f"Function not found: {invoked_arn}", Type="User")
1✔
263
            version_qualifier = alias.function_version
1✔
264
            if alias.routing_configuration:
1✔
265
                version, probability = next(
1✔
266
                    iter(alias.routing_configuration.version_weights.items())
267
                )
268
                if random.random() < probability:
1✔
269
                    version_qualifier = version
1✔
270
        else:
271
            version_qualifier = qualifier
1✔
272

273
        # Need the qualified arn to exactly get the target lambda
274
        qualified_arn = qualified_lambda_arn(function_name, version_qualifier, account_id, region)
1✔
275
        try:
1✔
276
            version_manager = self.get_lambda_version_manager(qualified_arn)
1✔
277
            event_manager = self.get_lambda_event_manager(qualified_arn)
1✔
278
            usage.runtime.record(version_manager.function_version.config.runtime)
1✔
279
        except ValueError as e:
×
280
            version = function.versions.get(version_qualifier)
×
281
            state = version and version.config.state.state
×
282
            # TODO: make such developer hints optional or remove after initial v2 transition period
283
            if state == State.Failed:
×
284
                HINT_LOG.error(
×
285
                    f"Failed to create the runtime executor for the function {function_name}. "
286
                    "Please ensure that Docker is available in the LocalStack container by adding the volume mount "
287
                    '"/var/run/docker.sock:/var/run/docker.sock" to your LocalStack startup. '
288
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#docker-not-available"
289
                )
290
            elif state == State.Pending:
×
291
                HINT_LOG.warning(
×
292
                    "Lambda functions are created and updated asynchronously in the new lambda provider like in AWS. "
293
                    f"Before invoking {function_name}, please wait until the function transitioned from the state "
294
                    "Pending to Active using: "
295
                    f'"awslocal lambda wait function-active-v2 --function-name {function_name}" '
296
                    "Check out https://docs.localstack.cloud/user-guide/aws/lambda/#function-in-pending-state"
297
                )
298
            raise ResourceConflictException(
×
299
                f"The operation cannot be performed at this time. The function is currently in the following state: {state}"
300
            ) from e
301
        # empty payloads have to work as well
302
        if payload is None:
1✔
303
            payload = b"{}"
1✔
304
        else:
305
            # detect invalid payloads early before creating an execution environment
306
            try:
1✔
307
                to_str(payload)
1✔
308
            except Exception as e:
1✔
309
                # MAYBE: improve parity of detailed exception message (quite cumbersome)
310
                raise InvalidRequestContentException(
1✔
311
                    f"Could not parse request body into json: Could not parse payload into json: {e}",
312
                    Type="User",
313
                )
314
        if invocation_type is None:
1✔
315
            invocation_type = InvocationType.RequestResponse
1✔
316
        if invocation_type == InvocationType.DryRun:
1✔
317
            return None
1✔
318
        # TODO payload verification  An error occurred (InvalidRequestContentException) when calling the Invoke operation: Could not parse request body into json: Could not parse payload into json: Unexpected character (''' (code 39)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
319
        #  at [Source: (byte[])"'test'"; line: 1, column: 2]
320
        #
321
        if invocation_type == InvocationType.Event:
1✔
322
            return event_manager.enqueue_event(
1✔
323
                invocation=Invocation(
324
                    payload=payload,
325
                    invoked_arn=invoked_arn,
326
                    client_context=client_context,
327
                    invocation_type=invocation_type,
328
                    invoke_time=datetime.now(),
329
                    request_id=request_id,
330
                    trace_context=trace_context,
331
                )
332
            )
333

334
        return version_manager.invoke(
1✔
335
            invocation=Invocation(
336
                payload=payload,
337
                invoked_arn=invoked_arn,
338
                client_context=client_context,
339
                invocation_type=invocation_type,
340
                invoke_time=datetime.now(),
341
                request_id=request_id,
342
                trace_context=trace_context,
343
            )
344
        )
345

346
    def update_version(self, new_version: FunctionVersion) -> Future[None]:
1✔
347
        """
348
        Updates a given version. Will perform a rollover, so the old version will be active until the new one is ready
349
        to be invoked
350

351
        :param new_version: New version (with the same qualifier as an older one)
352
        """
353
        if new_version.qualified_arn not in self.lambda_running_versions:
1✔
354
            raise ValueError(
×
355
                f"Version {new_version.qualified_arn} cannot be updated if an old one is not running"
356
            )
357

358
        return self.create_function_version(function_version=new_version)
1✔
359

360
    def update_version_state(
1✔
361
        self, function_version: FunctionVersion, new_state: VersionState
362
    ) -> None:
363
        """
364
        Update the version state for the given function version.
365

366
        This will perform a rollover to the given function if the new state is active and there is a previously
367
        running version registered. The old version will be shutdown and its code deleted.
368

369
        If the new state is failed, it will abort the update and mark it as failed.
370
        If an older version is still running, it will keep running.
371

372
        :param function_version: Version reporting the state
373
        :param new_state: New state
374
        """
375
        function_arn = function_version.qualified_arn
1✔
376
        try:
1✔
377
            old_version = None
1✔
378
            old_event_manager = None
1✔
379
            with self.lambda_version_manager_lock:
1✔
380
                new_version_manager = self.lambda_starting_versions.pop(function_arn)
1✔
381
                if not new_version_manager:
1✔
382
                    raise ValueError(
×
383
                        f"Version {function_arn} reporting state {new_state.state} does exist in the starting versions."
384
                    )
385
                if new_state.state == State.Active:
1✔
386
                    old_version = self.lambda_running_versions.get(function_arn, None)
1✔
387
                    old_event_manager = self.event_managers.get(function_arn, None)
1✔
388
                    self.lambda_running_versions[function_arn] = new_version_manager
1✔
389
                    self.event_managers[function_arn] = LambdaEventManager(
1✔
390
                        version_manager=new_version_manager
391
                    )
392
                    self.event_managers[function_arn].start()
1✔
393
                    update_status = UpdateStatus(status=LastUpdateStatus.Successful)
1✔
394
                elif new_state.state == State.Failed:
×
395
                    update_status = UpdateStatus(status=LastUpdateStatus.Failed)
×
396
                    self.task_executor.submit(new_version_manager.stop)
×
397
                else:
398
                    # TODO what to do if state pending or inactive is supported?
399
                    self.task_executor.submit(new_version_manager.stop)
×
400
                    LOG.error(
×
401
                        "State %s for version %s should not have been reported. New version will be stopped.",
402
                        new_state,
403
                        function_arn,
404
                    )
405
                    return
×
406

407
            # TODO is it necessary to get the version again? Should be locked for modification anyway
408
            # Without updating the new state, the function would not change to active, last_update would be missing, and
409
            # the revision id would not be updated.
410
            state = lambda_stores[function_version.id.account][function_version.id.region]
1✔
411
            # FIXME this will fail if the function is deleted during this code lines here
412
            function = state.functions.get(function_version.id.function_name)
1✔
413
            if old_event_manager:
1✔
414
                self.task_executor.submit(old_event_manager.stop_for_update)
1✔
415
            if old_version:
1✔
416
                # if there is an old version, we assume it is an update, and stop the old one
417
                self.task_executor.submit(old_version.stop)
1✔
418
                if function:
1✔
419
                    self.task_executor.submit(
1✔
420
                        destroy_code_if_not_used, old_version.function_version.config.code, function
421
                    )
422
            if not function:
1✔
423
                LOG.debug("Function %s was deleted during status update", function_arn)
×
424
                return
×
425
            current_version = function.versions[function_version.id.qualifier]
1✔
426
            new_version_manager.state = new_state
1✔
427
            new_version_state = dataclasses.replace(
1✔
428
                current_version,
429
                config=dataclasses.replace(
430
                    current_version.config, state=new_state, last_update=update_status
431
                ),
432
            )
433
            state.functions[function_version.id.function_name].versions[
1✔
434
                function_version.id.qualifier
435
            ] = new_version_state
436

UNCOV
437
        except Exception:
×
UNCOV
438
            LOG.exception("Failed to update function version for arn %s", function_arn)
×
439

440
    def update_alias(self, old_alias: VersionAlias, new_alias: VersionAlias, function: Function):
1✔
441
        # if pointer changed, need to restart provisioned
442
        provisioned_concurrency_config = function.provisioned_concurrency_configs.get(
1✔
443
            old_alias.name
444
        )
445
        if (
1✔
446
            old_alias.function_version != new_alias.function_version
447
            and provisioned_concurrency_config is not None
448
        ):
449
            LOG.warning("Deprovisioning")
×
450
            fn_version_old = function.versions.get(old_alias.function_version)
×
451
            vm_old = self.get_lambda_version_manager(function_arn=fn_version_old.qualified_arn)
×
452
            fn_version_new = function.versions.get(new_alias.function_version)
×
453
            vm_new = self.get_lambda_version_manager(function_arn=fn_version_new.qualified_arn)
×
454

455
            # TODO: we might need to pull provisioned concurrency state a bit more out of the version manager for get_provisioned_concurrency_config
456
            # TODO: make this fully async
457
            vm_old.update_provisioned_concurrency_config(0).result(timeout=4)  # sync
×
458
            vm_new.update_provisioned_concurrency_config(
×
459
                provisioned_concurrency_config.provisioned_concurrent_executions
460
            )  # async again
461

462
    def can_assume_role(self, role_arn: str, region: str) -> bool:
1✔
463
        """
464
        Checks whether lambda can assume the given role.
465
        This _should_ only fail if IAM enforcement is enabled.
466

467
        :param role_arn: Role to assume
468
        :return: True if the role can be assumed by lambda, false otherwise
469
        """
470
        sts_client = connect_to(region_name=region).sts.request_metadata(service_principal="lambda")
1✔
471
        try:
1✔
472
            sts_client.assume_role(
1✔
473
                RoleArn=role_arn,
474
                RoleSessionName=f"test-assume-{short_uid()}",
475
                DurationSeconds=900,
476
            )
477
            return True
1✔
478
        except Exception as e:
×
479
            LOG.debug("Cannot assume role %s: %s", role_arn, e)
×
480
            return False
×
481

482

483
# TODO: Move helper functions out of lambda_service into a separate module
484

485

486
def is_code_used(code: S3Code, function: Function) -> bool:
1✔
487
    """
488
    Check if given code is still used in some version of the function
489

490
    :param code: Code object
491
    :param function: function to check
492
    :return: bool whether code is used in another version of the function
493
    """
494
    with function.lock:
1✔
495
        return any(code == version.config.code for version in function.versions.values())
1✔
496

497

498
def destroy_code_if_not_used(code: S3Code, function: Function) -> None:
1✔
499
    """
500
    Destroy the given code if it is not used in some version of the function
501
    Do nothing otherwise
502

503
    :param code: Code object
504
    :param function: Function the code belongs too
505
    """
506
    with function.lock:
1✔
507
        if not is_code_used(code, function):
1✔
508
            code.destroy()
1✔
509

510

511
def store_lambda_archive(
1✔
512
    archive_file: bytes, function_name: str, region_name: str, account_id: str
513
) -> S3Code:
514
    """
515
    Stores the given lambda archive in an internal s3 bucket.
516
    Also checks if zipfile matches the specifications
517

518
    :param archive_file: Archive file to store
519
    :param function_name: function name the archive should be stored for
520
    :param region_name: region name the archive should be stored for
521
    :param account_id: account id the archive should be stored for
522
    :return: S3 Code object representing the archive stored in S3
523
    """
524
    # check if zip file
525
    if not is_zip_file(archive_file):
1✔
526
        raise InvalidParameterValueException(
1✔
527
            "Could not unzip uploaded file. Please check your file, then try to upload again.",
528
            Type="User",
529
        )
530
    # check unzipped size
531
    unzipped_size = get_unzipped_size(zip_file=io.BytesIO(archive_file))
1✔
532
    if unzipped_size >= config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED:
1✔
533
        raise InvalidParameterValueException(
1✔
534
            f"Unzipped size must be smaller than {config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED} bytes",
535
            Type="User",
536
        )
537
    # store all buckets in us-east-1 for now
538
    s3_client = connect_to(
1✔
539
        region_name=AWS_REGION_US_EAST_1, aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT
540
    ).s3
541
    bucket_name = f"awslambda-{region_name}-tasks"
1✔
542
    # s3 create bucket is idempotent in us-east-1
543
    s3_client.create_bucket(Bucket=bucket_name)
1✔
544
    code_id = f"{function_name}-{uuid.uuid4()}"
1✔
545
    key = f"snapshots/{account_id}/{code_id}"
1✔
546
    s3_client.upload_fileobj(Fileobj=io.BytesIO(archive_file), Bucket=bucket_name, Key=key)
1✔
547
    code_sha256 = to_str(base64.b64encode(sha256(archive_file).digest()))
1✔
548
    return S3Code(
1✔
549
        id=code_id,
550
        account_id=account_id,
551
        s3_bucket=bucket_name,
552
        s3_key=key,
553
        s3_object_version=None,
554
        code_sha256=code_sha256,
555
        code_size=len(archive_file),
556
    )
557

558

559
def assert_hot_reloading_path_absolute(path: str) -> None:
1✔
560
    """
561
    Check whether a given path, after environment variable substitution, is an absolute path.
562
    Accepts either posix or windows paths, with environment placeholders.
563
    Example placeholders: $ENV_VAR, ${ENV_VAR}
564

565
    :param path: Posix or windows path, potentially containing environment variable placeholders.
566
        Example: `$ENV_VAR/lambda/src` with `ENV_VAR=/home/user/test-repo` set.
567
    """
568
    # expand variables in path before checking for an absolute path
569
    expanded_path = os.path.expandvars(path)
1✔
570
    if (
1✔
571
        not PurePosixPath(expanded_path).is_absolute()
572
        and not PureWindowsPath(expanded_path).is_absolute()
573
    ):
574
        raise InvalidParameterValueException(
1✔
575
            f"When using hot reloading, the archive key has to be an absolute path! Your archive key: {path}",
576
        )
577

578

579
def create_hot_reloading_code(path: str) -> HotReloadingCode:
1✔
580
    assert_hot_reloading_path_absolute(path)
1✔
581
    return HotReloadingCode(host_path=path)
1✔
582

583

584
def store_s3_bucket_archive(
1✔
585
    archive_bucket: str,
586
    archive_key: str,
587
    archive_version: Optional[str],
588
    function_name: str,
589
    region_name: str,
590
    account_id: str,
591
) -> ArchiveCode:
592
    """
593
    Takes the lambda archive stored in the given bucket and stores it in an internal s3 bucket
594

595
    :param archive_bucket: Bucket the archive is stored in
596
    :param archive_key: Key the archive is stored under
597
    :param archive_version: Version of the archive object in the bucket
598
    :param function_name: function name the archive should be stored for
599
    :param region_name: region name the archive should be stored for
600
    :param account_id: account id the archive should be stored for
601
    :return: S3 Code object representing the archive stored in S3
602
    """
603
    if archive_bucket == config.BUCKET_MARKER_LOCAL:
1✔
604
        usage.hotreload.increment()
1✔
605
        return create_hot_reloading_code(path=archive_key)
1✔
606
    s3_client: "S3Client" = connect_to().s3
1✔
607
    kwargs = {"VersionId": archive_version} if archive_version else {}
1✔
608
    archive_file = s3_client.get_object(Bucket=archive_bucket, Key=archive_key, **kwargs)[
1✔
609
        "Body"
610
    ].read()
611
    return store_lambda_archive(
1✔
612
        archive_file, function_name=function_name, region_name=region_name, account_id=account_id
613
    )
614

615

616
def create_image_code(image_uri: str) -> ImageCode:
1✔
617
    """
618
    Creates an image code by inspecting the provided image
619

620
    :param image_uri: Image URI of the image to inspect
621
    :return: Image code object
622
    """
623
    code_sha256 = "<cannot-get-image-hash>"
1✔
624
    if CONTAINER_CLIENT.has_docker():
1✔
625
        try:
1✔
626
            CONTAINER_CLIENT.pull_image(docker_image=image_uri)
1✔
627
        except ContainerException:
1✔
628
            LOG.debug("Cannot pull image %s. Maybe only available locally?", image_uri)
1✔
629
        try:
1✔
630
            code_sha256 = CONTAINER_CLIENT.inspect_image(image_name=image_uri)["RepoDigests"][
1✔
631
                0
632
            ].rpartition(":")[2]
633
        except Exception as e:
×
634
            LOG.debug(
×
635
                "Cannot inspect image %s. Is this image and/or docker available: %s", image_uri, e
636
            )
637
    else:
638
        LOG.warning(
×
639
            "Unable to get image hash for image %s - no docker socket available."
640
            "Image hash returned by Lambda will not be correct.",
641
            image_uri,
642
        )
643
    return ImageCode(image_uri=image_uri, code_sha256=code_sha256, repository_type="ECR")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc