• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 21053780928

15 Jan 2026 06:16PM UTC coverage: 86.964% (+0.004%) from 86.96%
21053780928

push

github

web-flow
CFn: correctly detect createOnlyProperties (#13619)

3 of 3 new or added lines in 1 file covered. (100.0%)

12 existing lines in 3 files now uncovered.

70335 of 80878 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.34
/localstack-core/localstack/services/lambda_/invocation/lambda_models.py
1
"""Lambda models for internal use and persistence.
2
The LambdaProviderPro in localstack-pro imports this model and configures persistence.
3
The actual function code is stored in S3 (see S3Code).
4
"""
5

6
import dataclasses
1✔
7
import logging
1✔
8
import os.path
1✔
9
import shutil
1✔
10
import tempfile
1✔
11
import threading
1✔
12
from abc import ABCMeta, abstractmethod
1✔
13
from datetime import datetime
1✔
14
from enum import StrEnum
1✔
15
from pathlib import Path
1✔
16
from typing import IO, Optional, TypedDict
1✔
17

18
import boto3
1✔
19
from botocore.exceptions import ClientError
1✔
20

21
from localstack import config
1✔
22
from localstack.aws.api import CommonServiceException
1✔
23
from localstack.aws.api.lambda_ import (
1✔
24
    AllowedPublishers,
25
    Architecture,
26
    CapacityProviderArn,
27
    CapacityProviderConfig,
28
    CapacityProviderPermissionsConfig,
29
    CapacityProviderScalingConfig,
30
    CapacityProviderVpcConfig,
31
    CodeSigningPolicies,
32
    Cors,
33
    DestinationConfig,
34
    FunctionScalingConfig,
35
    FunctionUrlAuthType,
36
    InstanceRequirements,
37
    InvocationType,
38
    InvokeMode,
39
    KMSKeyArn,
40
    LastUpdateStatus,
41
    LoggingConfig,
42
    PackageType,
43
    ProvisionedConcurrencyStatusEnum,
44
    RecursiveLoop,
45
    Runtime,
46
    RuntimeVersionConfig,
47
    SnapStartResponse,
48
    State,
49
    StateReasonCode,
50
    Timestamp,
51
    TracingMode,
52
)
53
from localstack.aws.connect import connect_to
1✔
54
from localstack.constants import AWS_REGION_US_EAST_1, INTERNAL_AWS_SECRET_ACCESS_KEY
1✔
55
from localstack.services.lambda_.api_utils import qualified_lambda_arn, unqualified_lambda_arn
1✔
56
from localstack.utils.archives import unzip
1✔
57
from localstack.utils.files import chmod_r
1✔
58
from localstack.utils.strings import long_uid, short_uid
1✔
59

60
LOG = logging.getLogger(__name__)
1✔
61

62

63
# TODO: maybe we should make this more "transient" by always initializing to Pending and *not* persisting it?
64
@dataclasses.dataclass(frozen=True)
class VersionState:
    """Immutable record of a version's state plus an optional explanation.

    Instances are frozen, so a change of state is expressed by creating a new
    ``VersionState`` rather than mutating an existing one.
    """

    # The state value itself (required).
    state: State
    # Optional reason code accompanying the state; None when not set.
    code: StateReasonCode | None = None
    # Optional free-text reason accompanying the state; None when not set.
    reason: str | None = None
69

70

71
@dataclasses.dataclass
class Invocation:
    """Data describing a single invocation request.

    All fields except ``user_agent`` are required and must be supplied in the
    declared order when constructing positionally.
    """

    # Raw request payload.
    payload: bytes
    # ARN the invocation was addressed to.
    invoked_arn: str
    # Client context string, or None.
    client_context: str | None
    invocation_type: InvocationType
    # Timestamp associated with the invoke.
    invoke_time: datetime
    # = invocation_id
    request_id: str
    trace_context: dict
    # User agent string, or None (the default).
    user_agent: str | None = None
82

83

84
class InitializationType(StrEnum):
1✔
85
    on_demand = "on-demand"
1✔
86
    provisioned_concurrency = "provisioned-concurrency"
1✔
87
    lambda_managed_instances = "lambda-managed-instances"
1✔
88

89

90
class ArchiveCode(metaclass=ABCMeta):
    """Abstract interface for a code archive.

    Every method is abstract; the base implementations do nothing and return
    None. Concrete subclasses must implement all six methods before they can
    be instantiated.
    """

    @abstractmethod
    def generate_presigned_url(self, endpoint_url: str) -> str:
        """
        Generates a presigned url pointing to the code archive

        :param endpoint_url: Endpoint URL handed to the implementation
        """

    @abstractmethod
    def is_hot_reloading(self) -> bool:
        """
        Whether this code archive is for hot reloading.
        This means it should mount the location from the host, and should instruct the runtimes to listen for changes

        :return: True if this object represents hot reloading, False otherwise
        """

    @abstractmethod
    def get_unzipped_code_location(self) -> Path:
        """
        Get the location of the unzipped archive on disk
        """

    @abstractmethod
    def prepare_for_execution(self) -> None:
        """
        Unzips the code archive to the proper destination on disk, if not already present
        """

    @abstractmethod
    def destroy_cached(self) -> None:
        """
        Destroys the code object on disk, if it was saved on disk before
        """

    @abstractmethod
    def destroy(self) -> None:
        """
        Deletes the code object from S3 and the unzipped version from disk
        """
135

136

137
@dataclasses.dataclass(frozen=True)
class S3Code(ArchiveCode):
    """
    Objects representing a code archive stored in an internal S3 bucket.

    S3 Store:
      Code archives represented by this class are stored in a bucket awslambda-{region_name}-tasks,
      (e.g. awslambda-us-east-1-tasks), when correctly created using create_lambda_archive.
      The "awslambda" prefix matches the behavior at real AWS.

      This class then provides properties / methods to operate on the stored code,
      like the ability to create presigned urls, checking the code hash etc.

      A call to destroy() of this class will delete the code object from both the S3 store and the local cache
    Unzipped Cache:
      After a call to prepare_for_execution, an unzipped version of the represented code will be stored on disk,
      ready to mount/copy.

      It will be present at the location returned by get_unzipped_code_location,
      namely {tempdir}/lambda/{bucket_name}/{id}/code

      The cache on disk will be deleted after a call to destroy_cached (or destroy)
    """

    id: str
    account_id: str
    s3_bucket: str
    s3_key: str
    # Optional S3 object version; when falsy, no VersionId is sent to S3.
    s3_object_version: str | None
    code_sha256: str
    code_size: int
    # Guards the check-then-unzip sequence in prepare_for_execution.
    _disk_lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)

    def _download_archive_to_file(self, target_file: IO) -> None:
        """
        Download the code archive into a given file and flush it

        :param target_file: File the code archive should be downloaded into (IO object)
        """
        download_options = {}
        if self.s3_object_version:
            download_options["VersionId"] = self.s3_object_version
        internal_s3 = connect_to(
            region_name=AWS_REGION_US_EAST_1,
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
        ).s3
        internal_s3.download_fileobj(
            Bucket=self.s3_bucket,
            Key=self.s3_key,
            Fileobj=target_file,
            ExtraArgs=download_options,
        )
        # make sure the archive is fully written before callers read it by name
        target_file.flush()

    def generate_presigned_url(self, endpoint_url: str) -> str:
        """
        Generates a presigned ``get_object`` url pointing to the code archive

        :param endpoint_url: Endpoint URL the S3 client is created against
        :return: The presigned url
        """
        object_ref = {"Bucket": self.s3_bucket, "Key": self.s3_key}
        if self.s3_object_version:
            object_ref["VersionId"] = self.s3_object_version
        presigning_client = boto3.client(
            "s3",
            region_name=AWS_REGION_US_EAST_1,
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
            aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
            endpoint_url=endpoint_url,
        )
        return presigning_client.generate_presigned_url("get_object", Params=object_ref)

    def is_hot_reloading(self) -> bool:
        """
        Whether this code archive is hot reloading

        :return: Always False; S3-backed code never represents hot reloading
        """
        return False

    def get_unzipped_code_location(self) -> Path:
        """
        Get the location of the unzipped archive on disk
        """
        cache_root = tempfile.gettempdir()
        return Path(f"{cache_root}/lambda/{self.s3_bucket}/{self.id}/code")

    def prepare_for_execution(self) -> None:
        """
        Unzips the code archive to the proper destination on disk, if not already present
        """
        destination = self.get_unzipped_code_location()
        with self._disk_lock:
            if destination.exists():
                # already unzipped earlier, nothing to do
                return
            LOG.debug("Saving code %s to disk", self.id)
            staging_parent = destination.parent
            staging_parent.mkdir(parents=True, exist_ok=True)
            # Unzip into a staging directory first so that a crash or kill during
            # unzip never leaves a partially populated destination behind.
            # The staging directory lives next to the destination so both are on
            # the same filesystem and the final rename is atomic.
            with tempfile.TemporaryDirectory(dir=staging_parent) as staging_dir:
                staging_path = Path(staging_dir)

                with tempfile.NamedTemporaryFile() as archive_file:
                    self._download_archive_to_file(archive_file)
                    unzip(archive_file.name, str(staging_path))
                    chmod_r(str(staging_path), 0o755)

                # Atomic move/rename
                staging_path.rename(destination)

    def destroy_cached(self) -> None:
        """
        Destroys the code object on disk, if it was saved on disk before
        """
        # the parent folder is removed so the whole code location disappears
        cache_dir = self.get_unzipped_code_location().parent
        if cache_dir.exists():
            try:
                shutil.rmtree(cache_dir)
            except OSError as error:
                # best effort: a failed cleanup is only logged
                LOG.debug(
                    "Could not cleanup function code path %s due to error %s while deleting file %s",
                    cache_dir,
                    error.strerror,
                    error.filename,
                )

    def destroy(self) -> None:
        """
        Deletes the code object from S3 and the unzipped version from disk
        """
        LOG.debug("Final code destruction for %s", self.id)
        self.destroy_cached()
        internal_s3 = connect_to(
            region_name=AWS_REGION_US_EAST_1,
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
        ).s3
        version_args = {}
        if self.s3_object_version:
            version_args["VersionId"] = self.s3_object_version
        try:
            internal_s3.delete_object(Bucket=self.s3_bucket, Key=self.s3_key, **version_args)
        except ClientError as error:
            # best effort: a failed S3 deletion is only logged
            LOG.debug(
                "Cannot delete lambda archive %s in bucket %s: %s",
                self.s3_key,
                self.s3_bucket,
                error,
            )
275

276

277
@dataclasses.dataclass(frozen=True)
class HotReloadingCode(ArchiveCode):
    """
    Objects representing code which is mounted from a given directory from the host, for hot reloading
    """

    # Directory on the host; environment variables in it are expanded on lookup.
    host_path: str
    # Placeholder values: no hash or size is computed for hot-reloaded code.
    code_sha256: str = "hot-reloading-hash-not-available"
    code_size: int = 0

    def is_hot_reloading(self) -> bool:
        """
        Whether this code archive is for hot reloading.
        This means it should mount the location from the host, and should instruct the runtimes to listen for changes

        :return: Always True for this class
        """
        return True

    def generate_presigned_url(self, endpoint_url: str) -> str:
        """
        Return a ``file://`` url built from the host path; ``endpoint_url`` is not used
        """
        return "file://{}".format(self.host_path)

    def get_unzipped_code_location(self) -> Path:
        """
        Return the host path, with environment variables expanded, as a Path
        """
        expanded_host_path = os.path.expandvars(self.host_path)
        return Path(expanded_host_path)

    def prepare_for_execution(self) -> None:
        """
        No-op for hot reloading code
        """

    def destroy_cached(self) -> None:
        """
        No-op for hot reloading code
        """

    def destroy(self) -> None:
        """
        No-op for hot reloading code
        """
317

318

319
@dataclasses.dataclass(frozen=True)
class ImageCode:
    """Immutable description of container-image based function code."""

    # Image reference as provided, e.g. ``repo:tag``.
    image_uri: str
    repository_type: str
    # Hex digest used to build the digest-pinned reference.
    code_sha256: str

    @property
    def resolved_image_uri(self) -> str:
        """Return the image reference pinned to ``code_sha256``.

        The tag and any digest already present in ``image_uri`` are dropped and
        replaced by ``@sha256:<code_sha256>``.

        Only a colon in the last path segment is treated as the tag separator.
        This keeps a registry port (``host:5000/repo``) intact and keeps the
        repository when the URI carries no tag at all; a plain
        ``rpartition(':')`` would truncate or empty the repository in those cases.

        :return: ``<repository>@sha256:<code_sha256>``
        """
        # Drop an existing digest so the result never contains two "@sha256" parts.
        repository = self.image_uri.partition("@")[0]
        # A tag separator can only appear after the last "/" of the reference.
        last_segment_start = repository.rfind("/") + 1
        tag_separator = repository.find(":", last_segment_start)
        if tag_separator != -1:
            repository = repository[:tag_separator]
        return f"{repository}@sha256:{self.code_sha256}"
328

329

330
@dataclasses.dataclass
class DeadLetterConfig:
    """Dead letter configuration holding a single target ARN."""

    # ARN of the dead letter target.
    target_arn: str
333

334

335
@dataclasses.dataclass
class FileSystemConfig:
    """File system configuration: an ARN together with a local mount path."""

    # ARN of the file system resource.
    arn: str
    # Path at which the file system is mounted locally.
    local_mount_path: str
339

340

341
@dataclasses.dataclass(frozen=True)
class ImageConfig:
    """Immutable image configuration (working directory, command, entrypoint)."""

    working_directory: str
    # Each instance gets its own fresh empty list when no value is passed.
    command: list[str] = dataclasses.field(default_factory=list)
    entrypoint: list[str] = dataclasses.field(default_factory=list)
346

347

348
@dataclasses.dataclass
class VpcConfig:
    """VPC configuration: a VPC id plus security group and subnet id lists."""

    vpc_id: str
    # Each instance gets its own fresh empty list when no value is passed.
    security_group_ids: list[str] = dataclasses.field(default_factory=list)
    subnet_ids: list[str] = dataclasses.field(default_factory=list)
353

354

355
@dataclasses.dataclass(frozen=True)
class UpdateStatus:
    """Immutable record of an update status with optional code and reason."""

    # Status value; must be passed explicitly but may be None.
    status: LastUpdateStatus | None
    # Optional code accompanying the status.
    code: str | None = None
    # Optional free-text reason accompanying the status.
    reason: str | None = None
360

361

362
@dataclasses.dataclass
class LambdaEphemeralStorage:
    """Ephemeral storage setting consisting of a single integer size."""

    # Storage size; the unit is not defined in this module.
    size: int
365

366

367
@dataclasses.dataclass
class FunctionUrlConfig:
    """Configuration of a function URL.

    * HTTP(s)
    * Function URLs can be applied to any function alias, or to the $LATEST unpublished function version.
      A function URL cannot be added to any other function version.
    * Once a function URL is created, its URL endpoint never changes
    """

    # fully qualified ARN
    function_arn: str
    # resolved name
    function_name: str
    cors: Cors
    # Custom URL (via tag), or generated unique subdomain id  e.g. pfn5bdb2dl5mzkbn6eb2oi3xfe0nthdn
    url_id: str
    # full URL (e.g. "https://pfn5bdb2dl5mzkbn6eb2oi3xfe0nthdn.lambda-url.eu-west-3.on.aws/")
    url: str
    auth_type: FunctionUrlAuthType
    # time
    creation_time: str
    # TODO: check if this is the creation time when initially creating
    last_modified_time: str | None = None
    # only $LATEST or alias name
    function_qualifier: str | None = "$LATEST"
    invoke_mode: InvokeMode | None = None
387

388

389
@dataclasses.dataclass
class ProvisionedConcurrencyConfiguration:
    """Provisioned concurrency setting together with its last-modified date."""

    # Number of provisioned concurrent executions.
    provisioned_concurrent_executions: int
    # date
    last_modified: str
393

394

395
@dataclasses.dataclass
class ProvisionedConcurrencyState:
    """transient items

    Every field has a default, so the class can be created without arguments;
    a fresh instance starts with zero counters and status IN_PROGRESS.
    """

    allocated: int = 0
    available: int = 0
    status: ProvisionedConcurrencyStatusEnum = dataclasses.field(
        default=ProvisionedConcurrencyStatusEnum.IN_PROGRESS
    )
    # Optional explanation for the current status.
    status_reason: str | None = None
405

406

407
@dataclasses.dataclass
class AliasRoutingConfig:
    """Routing configuration of an alias, expressed as per-version weights."""

    # Mapping of string keys to float weights.
    version_weights: dict[str, float]
410

411

412
@dataclasses.dataclass(frozen=True)
class VersionIdentifier:
    """Immutable identifier of a function version.

    The four fields are exactly the inputs handed to the ARN helpers below.
    """

    function_name: str
    qualifier: str
    region: str
    account: str

    def qualified_arn(self):
        """Build the qualified ARN via ``qualified_lambda_arn`` from all four fields."""
        arn = qualified_lambda_arn(
            function_name=self.function_name,
            qualifier=self.qualifier,
            region=self.region,
            account=self.account,
        )
        return arn

    def unqualified_arn(self):
        """Build the unqualified ARN via ``unqualified_lambda_arn``; the qualifier is not passed."""
        arn = unqualified_lambda_arn(
            function_name=self.function_name,
            region=self.region,
            account=self.account,
        )
        return arn
433

434

435
@dataclasses.dataclass(frozen=True)
class VersionAlias:
    """Immutable alias pointing at a function version."""

    # Version the alias refers to.
    function_version: str
    name: str
    # Must be passed explicitly but may be None.
    description: str | None
    routing_configuration: AliasRoutingConfig | None = None
    # Not accepted by __init__; generated by long_uid for every new instance.
    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
442

443

444
@dataclasses.dataclass
class ResourcePolicy:
    """Resource policy document with capitalized field names."""

    Version: str
    Id: str
    # Policy statements, each kept as a plain dict.
    Statement: list[dict]
449

450

451
@dataclasses.dataclass
class FunctionResourcePolicy:
    """Wrapper holding the resource policy attached to a function."""

    # The wrapped policy document.
    policy: ResourcePolicy
454

455

456
@dataclasses.dataclass
class EventInvokeConfig:
    """Event invoke configuration of a function qualifier."""

    function_name: str
    qualifier: str

    # Required (no default), but excluded from equality comparisons.
    last_modified: str | None = dataclasses.field(compare=False)
    destination_config: DestinationConfig | None = None
    maximum_retry_attempts: int | None = None
    maximum_event_age_in_seconds: int | None = None
465

466

467
# Result Models
468
@dataclasses.dataclass
1✔
469
class InvocationResult:
1✔
470
    request_id: str
1✔
471
    payload: bytes | None
1✔
472
    is_error: bool
1✔
473
    logs: str | None
1✔
474
    executed_version: str | None = None
1✔
475

476

477
@dataclasses.dataclass
class InvocationLogs:
    """Log output associated with a request id."""

    request_id: str
    logs: str
481

482

483
class Credentials(TypedDict):
    """Typed dictionary describing a set of credentials.

    All four keys are required.
    """

    AccessKeyId: str
    SecretAccessKey: str
    SessionToken: str
    # Expiry of the credentials as a datetime.
    Expiration: datetime
488

489

490
class OtherServiceEndpoint:
    """Interface for receiving executor status reports.

    Both methods raise NotImplementedError here and are meant to be overridden.
    """

    def status_ready(self, executor_id: str) -> None:
        """
        Processes a status ready report by RAPID

        :param executor_id: Executor ID this ready report is for
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError

    def status_error(self, executor_id: str) -> None:
        """
        Processes a status error report by RAPID

        :param executor_id: Executor ID this error report is for
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError
504

505

506
@dataclasses.dataclass(frozen=True)
class CodeSigningConfig:
    """Immutable code signing configuration."""

    # Identifier of the code signing config.
    csc_id: str
    arn: str

    allowed_publishers: AllowedPublishers
    policies: CodeSigningPolicies
    last_modified: str
    # Optional description; None when not provided.
    description: str | None = None
515

516

517
@dataclasses.dataclass
1✔
518
class LayerPolicyStatement:
1✔
519
    sid: str
1✔
520
    action: str
1✔
521
    principal: str
1✔
522
    organization_id: str | None
1✔
523

524

525
@dataclasses.dataclass
class LayerPolicy:
    """Policy of a layer version, holding its statements keyed by statement id."""

    # Not accepted by __init__; generated by long_uid for every new instance.
    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
    # static
    id: str = "default"
    # static
    version: str = "2012-10-17"
    # statement ID => statement
    statements: dict[str, LayerPolicyStatement] = dataclasses.field(default_factory=dict)
533

534

535
@dataclasses.dataclass
class LayerVersion:
    """A single version of a layer, including its code and optional policy."""

    layer_version_arn: str
    layer_arn: str

    version: int
    # Code archive backing this layer version.
    code: ArchiveCode
    license_info: str
    compatible_runtimes: list[Runtime]
    compatible_architectures: list[Architecture]
    # date
    created: str
    description: str = ""

    # The default is None, so the annotation states the optionality explicitly,
    # matching the ``X | None = None`` convention used throughout this module.
    policy: LayerPolicy | None = None
549

550

551
@dataclasses.dataclass
class Layer:
    """A layer together with its versions and a counter for the next version."""

    arn: str
    # Counter for the next version; starts at 1.
    next_version: int = 1
    # Per-instance reentrant lock, named after the next_version counter.
    next_version_lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)
    # Layer versions stored under string keys.
    layer_versions: dict[str, LayerVersion] = dataclasses.field(default_factory=dict)
557

558

559
@dataclasses.dataclass(frozen=True)
class VersionFunctionConfiguration:
    """Immutable configuration of a single function version.

    The fields up to and including ``state`` are required; everything after
    that has a default.
    """

    # fields
    description: str
    role: str
    timeout: int
    runtime: Runtime
    memory_size: int
    handler: str
    package_type: PackageType
    environment: dict[str, str]
    architectures: list[Architecture]
    # internal revision is updated when runtime restart is necessary
    internal_revision: str
    ephemeral_storage: LambdaEphemeralStorage
    snap_start: SnapStartResponse

    tracing_config_mode: TracingMode
    code: ArchiveCode
    # ISO string
    last_modified: str
    state: VersionState

    image: ImageCode | None = None
    image_config: ImageConfig | None = None
    runtime_version_config: RuntimeVersionConfig | None = None
    last_update: UpdateStatus | None = None
    # Not accepted by __init__; generated by long_uid for every new instance.
    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
    layers: list[LayerVersion] = dataclasses.field(default_factory=list)

    dead_letter_arn: str | None = None

    # kms_key_arn: str
    # file_system_configs: FileSystemConfig
    vpc_config: VpcConfig | None = None

    logging_config: LoggingConfig = dataclasses.field(default_factory=dict)
    # NOTE: the field below has the same name as the imported type. In a class body
    #  the default value is bound to the name before the annotation is evaluated, so
    #  inside the annotation `CapacityProviderConfig` already refers to the default
    #  `None`. `CapacityProviderConfig | None` therefore evaluates `None | None`:
    #  TypeError: unsupported operand type(s) for |: 'NoneType' and 'NoneType'
    #  `Optional[...]` accepts `None`, which is why this spelling is used here.
    CapacityProviderConfig: Optional[CapacityProviderConfig] = None  # noqa
598

599

600
@dataclasses.dataclass(frozen=True)
class FunctionVersion:
    """Immutable pairing of a version identifier with its configuration."""

    id: VersionIdentifier
    config: VersionFunctionConfiguration

    @property
    def qualified_arn(self) -> str:
        """Qualified ARN of this version, delegated to the identifier."""
        identifier = self.id
        return identifier.qualified_arn()
608

609

610
@dataclasses.dataclass
class CapacityProvider:
    """Capacity provider model with capitalized field names.

    Several fields share their name with the imported type used to annotate
    them.
    """

    CapacityProviderArn: CapacityProviderArn
    # State is determined dynamically
    VpcConfig: CapacityProviderVpcConfig
    PermissionsConfig: CapacityProviderPermissionsConfig
    InstanceRequirements: InstanceRequirements
    CapacityProviderScalingConfig: CapacityProviderScalingConfig
    LastModified: Timestamp
    # Optional KMS key ARN; None when not set.
    KmsKeyArn: KMSKeyArn | None = None
620

621

622
@dataclasses.dataclass
class Function:
    """A function with its versions, aliases and per-qualifier settings.

    Besides the declared fields, every instance carries a volatile
    ``instance_id`` attribute. It is left out of the pickled state and
    regenerated whenever an instance is created or unpickled.
    """

    function_name: str
    code_signing_config_arn: str | None = None
    aliases: dict[str, VersionAlias] = dataclasses.field(default_factory=dict)
    versions: dict[str, FunctionVersion] = dataclasses.field(default_factory=dict)
    # key is $LATEST, version, or alias
    function_url_configs: dict[str, FunctionUrlConfig] = dataclasses.field(default_factory=dict)
    # key is $LATEST, version or alias
    permissions: dict[str, FunctionResourcePolicy] = dataclasses.field(default_factory=dict)
    # key is $LATEST(?), version or alias
    event_invoke_configs: dict[str, EventInvokeConfig] = dataclasses.field(default_factory=dict)
    reserved_concurrent_executions: int | None = None
    recursive_loop: RecursiveLoop = RecursiveLoop.Terminate
    provisioned_concurrency_configs: dict[str, ProvisionedConcurrencyConfiguration] = (
        dataclasses.field(default_factory=dict)
    )
    function_scaling_configs: dict[str, FunctionScalingConfig] = dataclasses.field(
        default_factory=dict
    )

    lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)
    next_version: int = 1

    def latest(self) -> FunctionVersion:
        """Return the version stored under the ``$LATEST`` key.

        :raises KeyError: if no ``$LATEST`` entry exists in ``versions``
        """
        return self.versions["$LATEST"]

    # HACK to model a volatile variable that should be ignored for persistence
    def __post_init__(self):
        """Assign a fresh ``instance_id`` to this instance."""
        # Identifier unique to this function and LocalStack instance.
        # A LocalStack restart or persistence load should create a new instance id.
        # Used for retaining invoke queues across version updates for $LATEST, but separate unrelated instances.
        self.instance_id = short_uid()

    def __getstate__(self):
        """Ignore certain volatile fields for pickling.
        # https://docs.python.org/3/library/pickle.html#handling-stateful-objects
        """
        # Work on a shallow copy so the live instance keeps all its attributes.
        persisted = dict(self.__dict__)
        # Remove the volatile entries.
        persisted.pop("instance_id")
        return persisted

    def __setstate__(self, state):
        """Restore the persisted attributes and generate a new ``instance_id``."""
        # Inject persistent state
        self.__dict__.update(state)
        # Create new instance id
        self.__post_init__()
676

677

678
class ValidationException(CommonServiceException):
    """Service exception with code ``ValidationException`` and HTTP status 400."""

    def __init__(self, message: str):
        """
        :param message: Error message passed on to the base exception
        """
        super().__init__(
            code="ValidationException",
            status_code=400,
            message=message,
        )
681

682

683
class RequestEntityTooLargeException(CommonServiceException):
    """Service exception with code ``RequestEntityTooLargeException`` and HTTP status 413."""

    def __init__(self, message: str):
        """
        :param message: Error message passed on to the base exception
        """
        super().__init__(
            code="RequestEntityTooLargeException",
            status_code=413,
            message=message,
        )
686

687

688
# note: we might at some point want to generalize these limits across all services and fetch them from there
689

690

691
@dataclasses.dataclass
class AccountSettings:
    """Account-level limits.

    The defaults are read from ``config`` once, when this class is defined;
    later changes to those config values do not affect the defaults.
    """

    total_code_size: int = config.LAMBDA_LIMITS_TOTAL_CODE_SIZE
    code_size_zipped: int = config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED
    code_size_unzipped: int = config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED
    concurrent_executions: int = config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc