• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20942662173

12 Jan 2026 04:45PM UTC coverage: 86.905% (-0.03%) from 86.936%
20942662173

push

github

web-flow
Allow authenticated pull and push of docker images (#13569)

34 of 51 new or added lines in 4 files covered. (66.67%)

247 existing lines in 15 files now uncovered.

70218 of 80799 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.3
/localstack-core/localstack/services/lambda_/invocation/lambda_models.py
1
"""Lambda models for internal use and persistence.
2
The LambdaProviderPro in localstack-pro imports this model and configures persistence.
3
The actual function code is stored in S3 (see S3Code).
4
"""
5

6
import dataclasses
1✔
7
import logging
1✔
8
import os.path
1✔
9
import shutil
1✔
10
import tempfile
1✔
11
import threading
1✔
12
from abc import ABCMeta, abstractmethod
1✔
13
from datetime import datetime
1✔
14
from enum import StrEnum
1✔
15
from pathlib import Path
1✔
16
from typing import IO, Optional, TypedDict
1✔
17

18
import boto3
1✔
19
from botocore.exceptions import ClientError
1✔
20

21
from localstack import config
1✔
22
from localstack.aws.api import CommonServiceException
1✔
23
from localstack.aws.api.lambda_ import (
1✔
24
    AllowedPublishers,
25
    Architecture,
26
    CapacityProviderArn,
27
    CapacityProviderConfig,
28
    CapacityProviderPermissionsConfig,
29
    CapacityProviderScalingConfig,
30
    CapacityProviderVpcConfig,
31
    CodeSigningPolicies,
32
    Cors,
33
    DestinationConfig,
34
    FunctionScalingConfig,
35
    FunctionUrlAuthType,
36
    InstanceRequirements,
37
    InvocationType,
38
    InvokeMode,
39
    KMSKeyArn,
40
    LastUpdateStatus,
41
    LoggingConfig,
42
    PackageType,
43
    ProvisionedConcurrencyStatusEnum,
44
    RecursiveLoop,
45
    Runtime,
46
    RuntimeVersionConfig,
47
    SnapStartResponse,
48
    State,
49
    StateReasonCode,
50
    Timestamp,
51
    TracingMode,
52
)
53
from localstack.aws.connect import connect_to
1✔
54
from localstack.constants import AWS_REGION_US_EAST_1, INTERNAL_AWS_SECRET_ACCESS_KEY
1✔
55
from localstack.services.lambda_.api_utils import qualified_lambda_arn, unqualified_lambda_arn
1✔
56
from localstack.utils.archives import unzip
1✔
57
from localstack.utils.files import chmod_r
1✔
58
from localstack.utils.strings import long_uid, short_uid
1✔
59

60
LOG = logging.getLogger(__name__)
1✔
61

62

63
# TODO: maybe we should make this more "transient" by always initializing to Pending and *not* persisting it?
64
@dataclasses.dataclass(frozen=True)
class VersionState:
    """Immutable state record: a ``State`` plus an optional reason code and reason text."""

    state: State
    code: StateReasonCode | None = None
    reason: str | None = None
1✔
69

70

71
@dataclasses.dataclass
class Invocation:
    """Input data describing a single invocation; only ``user_agent`` has a default."""

    payload: bytes
    invoked_arn: str
    client_context: str | None
    invocation_type: InvocationType
    invoke_time: datetime
    # = invocation_id
    request_id: str
    trace_context: dict
    user_agent: str | None = None
1✔
82

83

84
class InitializationType(StrEnum):
    """String enum of initialization types; member values are the hyphenated names."""

    on_demand = "on-demand"
    provisioned_concurrency = "provisioned-concurrency"
    lambda_managed_instances = "lambda-managed-instances"
1✔
88

89

90
class ArchiveCode(metaclass=ABCMeta):
    """Abstract interface for a code archive.

    Every method is abstract, so only subclasses that implement all six of them can be instantiated.
    """

    @abstractmethod
    def generate_presigned_url(self, endpoint_url: str):
        """Return a presigned URL that points to the code archive."""

    @abstractmethod
    def is_hot_reloading(self):
        """Tell whether this code archive is used for hot reloading.

        Hot reloading means the location is mounted from the host and the runtimes are
        instructed to listen for changes.

        :return: True if this object represents hot reloading, False otherwise
        """

    @abstractmethod
    def get_unzipped_code_location(self):
        """Return the on-disk location of the unzipped archive."""

    @abstractmethod
    def prepare_for_execution(self):
        """Unzip the code archive to its destination on disk unless it is already present."""

    @abstractmethod
    def destroy_cached(self):
        """Remove the on-disk copy of the code, if one was saved before."""

    @abstractmethod
    def destroy(self):
        """Delete the code object from S3 as well as the unzipped version on disk."""
×
135

136

137
@dataclasses.dataclass(frozen=True)
class S3Code(ArchiveCode):
    """
    Objects representing a code archive stored in an internal S3 bucket.

    S3 Store:
      Code archives represented by this class are stored in a bucket awslambda-{region_name}-tasks,
      (e.g. awslambda-us-east-1-tasks), when correctly created using create_lambda_archive.
      The "awslambda" prefix matches the behavior at real AWS.

      This class will then provide different properties / methods to be operated on the stored code,
      like the ability to create presigned-urls, checking the code hash etc.

      A call to destroy() of this class will delete the code object from both the S3 store and the local cache
    Unzipped Cache:
      After a call to prepare_for_execution, an unzipped version of the represented code will be stored on disk,
      ready to mount/copy.

      It will be present at the location returned by get_unzipped_code_location,
      namely {tempdir}/lambda/{s3_bucket}/{id}/code (tempdir as returned by tempfile.gettempdir())

      The cache on disk will be deleted after a call to destroy_cached (or destroy)
    """

    id: str
    account_id: str
    s3_bucket: str
    s3_key: str
    s3_object_version: str | None
    code_sha256: str
    code_size: int
    # Guards the check-then-unzip sequence in prepare_for_execution.
    _disk_lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)

    def _download_archive_to_file(self, target_file: IO) -> None:
        """
        Download the code archive into a given file

        :param target_file: File the code archive should be downloaded into (IO object)
        """
        s3_client = connect_to(
            region_name=AWS_REGION_US_EAST_1,
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
        ).s3
        # Only pin a version when one was recorded for this archive.
        extra_args = {"VersionId": self.s3_object_version} if self.s3_object_version else {}
        s3_client.download_fileobj(
            Bucket=self.s3_bucket, Key=self.s3_key, Fileobj=target_file, ExtraArgs=extra_args
        )
        # Flush so that readers opening the file by name see the full content.
        target_file.flush()

    def generate_presigned_url(self, endpoint_url: str) -> str:
        """
        Generates a presigned url pointing to the code archive

        :param endpoint_url: Endpoint URL the S3 client (and thus the presigned URL) is built against
        :return: Presigned ``get_object`` URL for the archive (including the version, if one is set)
        """
        s3_client = boto3.client(
            "s3",
            region_name=AWS_REGION_US_EAST_1,
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
            aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
            endpoint_url=endpoint_url,
        )
        params = {"Bucket": self.s3_bucket, "Key": self.s3_key}
        if self.s3_object_version:
            params["VersionId"] = self.s3_object_version
        return s3_client.generate_presigned_url("get_object", Params=params)

    def is_hot_reloading(self) -> bool:
        """
        Whether this code archive is hot reloading

        :return: True if it represents hot reloading, False otherwise (always False for S3 code)
        """
        return False

    def get_unzipped_code_location(self) -> Path:
        """
        Get the location of the unzipped archive on disk
        """
        return Path(f"{tempfile.gettempdir()}/lambda/{self.s3_bucket}/{self.id}/code")

    def prepare_for_execution(self) -> None:
        """
        Unzips the code archive to the proper destination on disk, if not already present

        If downloading or unzipping fails, the partially populated target directory is removed
        again before the error is re-raised. Otherwise the mere existence of the directory would
        make subsequent calls return early and treat an empty/incomplete directory as a valid cache.
        """
        target_path = self.get_unzipped_code_location()
        with self._disk_lock:
            if target_path.exists():
                return
            LOG.debug("Saving code %s to disk", self.id)
            target_path.mkdir(parents=True, exist_ok=True)
            try:
                with tempfile.NamedTemporaryFile() as file:
                    self._download_archive_to_file(file)
                    unzip(file.name, str(target_path))
                    chmod_r(str(target_path), 0o755)
            except BaseException:
                # BaseException on purpose: an interrupt mid-unzip must not leave a partial cache.
                shutil.rmtree(target_path, ignore_errors=True)
                raise

    def destroy_cached(self) -> None:
        """
        Destroys the code object on disk, if it was saved on disk before
        """
        # delete parent folder to delete the whole code location
        code_path = self.get_unzipped_code_location().parent
        if not code_path.exists():
            return
        try:
            shutil.rmtree(code_path)
        except OSError as e:
            # Best effort: a failed cleanup is only logged, never raised.
            LOG.debug(
                "Could not cleanup function code path %s due to error %s while deleting file %s",
                code_path,
                e.strerror,
                e.filename,
            )

    def destroy(self) -> None:
        """
        Deletes the code object from S3 and the unzipped version from disk
        """
        LOG.debug("Final code destruction for %s", self.id)
        self.destroy_cached()
        s3_client = connect_to(
            region_name=AWS_REGION_US_EAST_1,
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
        ).s3
        kwargs = {"VersionId": self.s3_object_version} if self.s3_object_version else {}
        try:
            s3_client.delete_object(Bucket=self.s3_bucket, Key=self.s3_key, **kwargs)
        except ClientError as e:
            # Best effort: a failed S3 deletion is only logged, never raised.
            LOG.debug(
                "Cannot delete lambda archive %s in bucket %s: %s", self.s3_key, self.s3_bucket, e
            )
266

267

268
@dataclasses.dataclass(frozen=True)
class HotReloadingCode(ArchiveCode):
    """
    Code that is mounted from a directory on the host (hot reloading) instead of an archive.

    ``code_sha256`` and ``code_size`` default to placeholder values.
    """

    host_path: str
    code_sha256: str = "hot-reloading-hash-not-available"
    code_size: int = 0

    def generate_presigned_url(self, endpoint_url: str) -> str:
        """Return a ``file://`` URL for the host path; ``endpoint_url`` is not used."""
        return "file://{}".format(self.host_path)

    def get_unzipped_code_location(self) -> Path:
        """Return the host path, with environment variables expanded, as a ``Path``."""
        return Path(os.path.expandvars(self.host_path))

    def is_hot_reloading(self) -> bool:
        """
        Whether this code archive is for hot reloading.
        This means it should mount the location from the host, and should instruct the runtimes to listen for changes

        :return: always True for this class
        """
        return True

    def prepare_for_execution(self) -> None:
        """No-op: this class does nothing to prepare the code."""
        return None

    def destroy_cached(self) -> None:
        """No-op: this class does not remove anything from disk."""
        return None

    def destroy(self) -> None:
        """No-op: this class deletes nothing from S3 or from disk."""
        return None
1✔
308

309

310
@dataclasses.dataclass(frozen=True)
class ImageCode:
    """Immutable reference to a container image: URI, repository type and content digest."""

    image_uri: str
    repository_type: str
    code_sha256: str

    @property
    def resolved_image_uri(self) -> str:
        """Return the image URI pinned to ``code_sha256`` (``<repository>@sha256:<digest>``).

        A tag and/or an already present digest is stripped from ``image_uri`` first.
        A ``:`` counts as a tag separator only when it occurs in the last path component,
        so a registry port (``localhost:4510/repo``) is kept, and an untagged URI no longer
        resolves to an empty repository name.
        """
        # Drop an existing "@sha256:..." (or other digest) suffix, if any.
        name = self.image_uri.partition("@")[0]
        repository, separator, tag = name.rpartition(":")
        if not separator or "/" in tag:
            # No tag present: either there is no ':' at all, or the last ':' belongs to host:port.
            repository = name
        return f"{repository}@sha256:{self.code_sha256}"
1✔
319

320

321
@dataclasses.dataclass
class DeadLetterConfig:
    """Dead letter configuration holding the ARN of the target."""

    target_arn: str
1✔
324

325

326
@dataclasses.dataclass
class FileSystemConfig:
    """File system configuration: an ARN together with a local mount path."""

    arn: str
    local_mount_path: str
1✔
330

331

332
@dataclasses.dataclass(frozen=True)
class ImageConfig:
    """Immutable image configuration; ``command`` and ``entrypoint`` default to empty lists."""

    working_directory: str
    command: list[str] = dataclasses.field(default_factory=list)
    entrypoint: list[str] = dataclasses.field(default_factory=list)
1✔
337

338

339
@dataclasses.dataclass
class VpcConfig:
    """VPC configuration: a VPC id plus security group and subnet ids (both default to empty lists)."""

    vpc_id: str
    security_group_ids: list[str] = dataclasses.field(default_factory=list)
    subnet_ids: list[str] = dataclasses.field(default_factory=list)
1✔
344

345

346
@dataclasses.dataclass(frozen=True)
class UpdateStatus:
    """Immutable update status: a ``LastUpdateStatus`` (or None) plus optional code and reason."""

    status: LastUpdateStatus | None
    code: str | None = None
    reason: str | None = None
1✔
351

352

353
@dataclasses.dataclass
class LambdaEphemeralStorage:
    """Ephemeral storage setting, stored as a single integer size."""

    size: int  # NOTE(review): unit is not visible here (presumably MB) — confirm
1✔
356

357

358
@dataclasses.dataclass
class FunctionUrlConfig:
    """
    Configuration of a function URL.

    * HTTP(s)
    * You can apply function URLs to any function alias, or to the $LATEST unpublished function version. You can't add a function URL to any other function version.
    * Once you create a function URL, its URL endpoint never changes
    """

    function_arn: str  # fully qualified ARN
    function_name: str  # resolved name
    cors: Cors
    url_id: str  # Custom URL (via tag), or generated unique subdomain id  e.g. pfn5bdb2dl5mzkbn6eb2oi3xfe0nthdn
    url: str  # full URL (e.g. "https://pfn5bdb2dl5mzkbn6eb2oi3xfe0nthdn.lambda-url.eu-west-3.on.aws/")
    auth_type: FunctionUrlAuthType
    creation_time: str  # time
    last_modified_time: str | None = (
        None  # TODO: check if this is the creation time when initially creating
    )
    function_qualifier: str | None = "$LATEST"  # only $LATEST or alias name
    invoke_mode: InvokeMode | None = None
1✔
378

379

380
@dataclasses.dataclass
class ProvisionedConcurrencyConfiguration:
    """Provisioned concurrency setting: the configured number of executions and its last-modified date."""

    provisioned_concurrent_executions: int
    last_modified: str  # date
1✔
384

385

386
@dataclasses.dataclass
class ProvisionedConcurrencyState:
    """Transient provisioned concurrency state.

    A new instance starts with zero allocated/available and status ``IN_PROGRESS``.
    """

    allocated: int = 0
    available: int = 0
    status: ProvisionedConcurrencyStatusEnum = dataclasses.field(
        default=ProvisionedConcurrencyStatusEnum.IN_PROGRESS
    )
    status_reason: str | None = None
1✔
396

397

398
@dataclasses.dataclass
class AliasRoutingConfig:
    """Routing configuration of an alias, stored as a mapping of string keys to float weights."""

    version_weights: dict[str, float]  # presumably version => weight; verify against callers
1✔
401

402

403
@dataclasses.dataclass(frozen=True)
class VersionIdentifier:
    """Immutable (hashable) identifier: function name, qualifier, region and account."""

    function_name: str
    qualifier: str
    region: str
    account: str

    def qualified_arn(self):
        """Build the ARN including the qualifier via ``qualified_lambda_arn``."""
        arn_parts = {
            "function_name": self.function_name,
            "qualifier": self.qualifier,
            "region": self.region,
            "account": self.account,
        }
        return qualified_lambda_arn(**arn_parts)

    def unqualified_arn(self):
        """Build the ARN without the qualifier via ``unqualified_lambda_arn``."""
        arn_parts = {
            "function_name": self.function_name,
            "region": self.region,
            "account": self.account,
        }
        return unqualified_lambda_arn(**arn_parts)
424

425

426
@dataclasses.dataclass(frozen=True)
class VersionAlias:
    """Immutable alias record: a name, the function version it refers to, and optional routing.

    ``revision_id`` cannot be passed to the constructor; it is generated with ``long_uid``
    for every new instance.
    """

    function_version: str
    name: str
    description: str | None
    routing_configuration: AliasRoutingConfig | None = None
    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
1✔
433

434

435
@dataclasses.dataclass
class ResourcePolicy:
    """Resource policy document with capitalized field names: version, id and a list of statement dicts."""

    Version: str
    Id: str
    Statement: list[dict]
1✔
440

441

442
@dataclasses.dataclass
class FunctionResourcePolicy:
    """Wrapper holding a single ``ResourcePolicy``."""

    policy: ResourcePolicy
1✔
445

446

447
@dataclasses.dataclass
class EventInvokeConfig:
    """Event invoke configuration for a function name/qualifier pair.

    ``last_modified`` is a required field but is excluded from equality comparisons.
    """

    function_name: str
    qualifier: str

    last_modified: str | None = dataclasses.field(compare=False)
    destination_config: DestinationConfig | None = None
    maximum_retry_attempts: int | None = None
    maximum_event_age_in_seconds: int | None = None
1✔
456

457

458
# Result Models
459
@dataclasses.dataclass
class InvocationResult:
    """Result of an invocation: request id, optional payload, error flag, optional logs and executed version."""

    request_id: str
    payload: bytes | None
    is_error: bool
    logs: str | None
    executed_version: str | None = None
1✔
466

467

468
@dataclasses.dataclass
class InvocationLogs:
    """Logs associated with a request id."""

    request_id: str
    logs: str
1✔
472

473

474
class Credentials(TypedDict):
    """Typed dict of credentials: access key id, secret access key, session token and expiration time."""

    AccessKeyId: str
    SecretAccessKey: str
    SessionToken: str
    Expiration: datetime
1✔
479

480

481
class OtherServiceEndpoint:
    """Base endpoint for status reports; both handlers raise ``NotImplementedError`` here."""

    def status_ready(self, executor_id: str) -> None:
        """
        Handle a "status ready" report sent by RAPID

        :param executor_id: ID of the executor the ready report refers to
        """
        raise NotImplementedError

    def status_error(self, executor_id: str) -> None:
        """
        Handle a "status error" report sent by RAPID

        :param executor_id: ID of the executor the error report refers to
        """
        raise NotImplementedError
495

496

497
@dataclasses.dataclass(frozen=True)
class CodeSigningConfig:
    """Immutable code signing configuration: id, ARN, allowed publishers, policies and metadata."""

    csc_id: str
    arn: str

    allowed_publishers: AllowedPublishers
    policies: CodeSigningPolicies
    last_modified: str
    description: str | None = None
1✔
506

507

508
@dataclasses.dataclass
class LayerPolicyStatement:
    """One statement of a layer policy: statement id, action, principal and optional organization id."""

    sid: str
    action: str
    principal: str
    organization_id: str | None
1✔
514

515

516
@dataclasses.dataclass
class LayerPolicy:
    """Layer policy with fixed default id/version and its statements keyed by statement ID.

    ``revision_id`` cannot be passed to the constructor; it is generated with ``long_uid``
    for every new instance.
    """

    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
    id: str = "default"  # static
    version: str = "2012-10-17"  # static
    statements: dict[str, LayerPolicyStatement] = dataclasses.field(
        default_factory=dict
    )  # statement ID => statement
524

525

526
@dataclasses.dataclass
class LayerVersion:
    """A single version of a layer: its ARNs, version number, code and compatibility metadata."""

    layer_version_arn: str
    layer_arn: str

    version: int
    code: ArchiveCode
    license_info: str
    compatible_runtimes: list[Runtime]
    compatible_architectures: list[Architecture]
    created: str  # date
    description: str = ""

    # No policy is attached by default, hence the optional annotation.
    policy: LayerPolicy | None = None
1✔
540

541

542
@dataclasses.dataclass
class Layer:
    """A layer: its ARN, a version counter starting at 1 with an associated lock, and its versions."""

    arn: str
    next_version: int = 1
    # Each instance gets its own re-entrant lock; presumably guards next_version — verify against callers
    next_version_lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)
    layer_versions: dict[str, LayerVersion] = dataclasses.field(default_factory=dict)
1✔
548

549

550
@dataclasses.dataclass(frozen=True)
class VersionFunctionConfiguration:
    """Immutable configuration of a function version.

    ``revision_id`` cannot be passed to the constructor; it is generated with ``long_uid``
    for every new instance.
    """

    # fields
    description: str
    role: str
    timeout: int
    runtime: Runtime
    memory_size: int
    handler: str
    package_type: PackageType
    environment: dict[str, str]
    architectures: list[Architecture]
    # internal revision is updated when runtime restart is necessary
    internal_revision: str
    ephemeral_storage: LambdaEphemeralStorage
    snap_start: SnapStartResponse

    tracing_config_mode: TracingMode
    code: ArchiveCode
    last_modified: str  # ISO string
    state: VersionState

    image: ImageCode | None = None
    image_config: ImageConfig | None = None
    runtime_version_config: RuntimeVersionConfig | None = None
    last_update: UpdateStatus | None = None
    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
    layers: list[LayerVersion] = dataclasses.field(default_factory=list)

    dead_letter_arn: str | None = None

    # kms_key_arn: str
    # file_system_configs: FileSystemConfig
    vpc_config: VpcConfig | None = None

    logging_config: LoggingConfig = dataclasses.field(default_factory=dict)
    # NOTE: this field name shadows the imported `CapacityProviderConfig` type. An annotated
    #  assignment with a right-hand side binds the name *before* its annotation is evaluated,
    #  so inside this class body the annotation sees the default value `None` instead of the
    #  imported type. That is why `CapacityProviderConfig | None = None` fails with
    #  TypeError: unsupported operand type(s) for |: 'NoneType' and 'NoneType'
    #  NOTE(review): for the same reason `Optional[...]` below presumably evaluates to
    #  `Optional[None]` rather than the intended type — confirm before relying on the annotation.
    CapacityProviderConfig: Optional[CapacityProviderConfig] = None  # noqa
1✔
589

590

591
@dataclasses.dataclass(frozen=True)
class FunctionVersion:
    """Immutable pairing of a version identifier with its configuration."""

    id: VersionIdentifier
    config: VersionFunctionConfiguration

    @property
    def qualified_arn(self) -> str:
        """Qualified ARN as computed by the version identifier."""
        identifier = self.id
        return identifier.qualified_arn()
1✔
599

600

601
@dataclasses.dataclass
class CapacityProvider:
    """Capacity provider model; field names are capitalized and several match the name of their type."""

    CapacityProviderArn: CapacityProviderArn
    # State is determined dynamically
    VpcConfig: CapacityProviderVpcConfig
    PermissionsConfig: CapacityProviderPermissionsConfig
    InstanceRequirements: InstanceRequirements
    CapacityProviderScalingConfig: CapacityProviderScalingConfig
    LastModified: Timestamp
    KmsKeyArn: KMSKeyArn | None = None
1✔
611

612

613
@dataclasses.dataclass
class Function:
    """A function together with its aliases, versions and per-qualifier configurations.

    In addition to the declared fields, every instance carries a volatile ``instance_id``
    attribute that is left out when pickling and regenerated when unpickling.
    """

    function_name: str
    code_signing_config_arn: str | None = None
    aliases: dict[str, VersionAlias] = dataclasses.field(default_factory=dict)
    versions: dict[str, FunctionVersion] = dataclasses.field(default_factory=dict)
    function_url_configs: dict[str, FunctionUrlConfig] = dataclasses.field(
        default_factory=dict
    )  # key is $LATEST, version, or alias
    permissions: dict[str, FunctionResourcePolicy] = dataclasses.field(
        default_factory=dict
    )  # key is $LATEST, version or alias
    event_invoke_configs: dict[str, EventInvokeConfig] = dataclasses.field(
        default_factory=dict
    )  # key is $LATEST(?), version or alias
    reserved_concurrent_executions: int | None = None
    recursive_loop: RecursiveLoop = RecursiveLoop.Terminate
    provisioned_concurrency_configs: dict[str, ProvisionedConcurrencyConfiguration] = (
        dataclasses.field(default_factory=dict)
    )
    function_scaling_configs: dict[str, FunctionScalingConfig] = dataclasses.field(
        default_factory=dict
    )

    lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)
    next_version: int = 1

    def latest(self) -> FunctionVersion:
        """Return the version stored under the ``$LATEST`` key (KeyError if there is none)."""
        return self.versions["$LATEST"]

    # HACK to model a volatile variable that should be ignored for persistence
    def __post_init__(self):
        # The instance id is unique to this function and this LocalStack instance: a restart
        # or a persistence load is meant to yield a fresh one. It is used to retain invoke
        # queues across version updates for $LATEST while keeping unrelated instances apart.
        self.instance_id = short_uid()

    def __getstate__(self):
        """Return the state to pickle, without the volatile ``instance_id`` entry.

        # https://docs.python.org/3/library/pickle.html#handling-stateful-objects
        """
        # Work on a shallow copy so the live instance keeps its attributes.
        persisted = dict(self.__dict__)
        # Drop the volatile entry (raises KeyError if it is missing).
        persisted.pop("instance_id")
        return persisted

    def __setstate__(self, state):
        """Restore the persisted attributes and assign a new ``instance_id``."""
        vars(self).update(state)
        self.__post_init__()
×
667

668

669
class ValidationException(CommonServiceException):
    """Service exception with code ``ValidationException`` and HTTP status 400."""

    def __init__(self, message: str):
        super().__init__(message=message, code="ValidationException", status_code=400)
1✔
672

673

674
class RequestEntityTooLargeException(CommonServiceException):
    """Service exception with code ``RequestEntityTooLargeException`` and HTTP status 413."""

    def __init__(self, message: str):
        super().__init__(message=message, code="RequestEntityTooLargeException", status_code=413)
1✔
677

678

679
# note: we might at some point want to generalize these limits across all services and fetch them from there
680

681

682
@dataclasses.dataclass
class AccountSettings:
    """Account-level limits.

    Each default is the matching ``config.LAMBDA_LIMITS_*`` value, read once when this
    class is defined.
    """

    total_code_size: int = config.LAMBDA_LIMITS_TOTAL_CODE_SIZE
    code_size_zipped: int = config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED
    code_size_unzipped: int = config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED
    concurrent_executions: int = config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc