• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17601295436

10 Sep 2025 12:27AM UTC coverage: 86.847% (+0.01%) from 86.837%
17601295436

push

github

web-flow
CFn: improve parity of describing failed change sets (#13123)

12 of 12 new or added lines in 1 file covered. (100.0%)

231 existing lines in 7 files now uncovered.

67122 of 77288 relevant lines covered (86.85%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.07
/localstack-core/localstack/services/lambda_/invocation/lambda_models.py
1
"""Lambda models for internal use and persistence.
2
The LambdaProviderPro in localstack-pro imports this model and configures persistence.
3
The actual function code is stored in S3 (see S3Code).
4
"""
5

6
import dataclasses
1✔
7
import logging
1✔
8
import os.path
1✔
9
import shutil
1✔
10
import tempfile
1✔
11
import threading
1✔
12
from abc import ABCMeta, abstractmethod
1✔
13
from datetime import datetime
1✔
14
from pathlib import Path
1✔
15
from typing import IO, Literal, TypedDict
1✔
16

17
import boto3
1✔
18
from botocore.exceptions import ClientError
1✔
19

20
from localstack import config
1✔
21
from localstack.aws.api import CommonServiceException
1✔
22
from localstack.aws.api.lambda_ import (
1✔
23
    AllowedPublishers,
24
    Architecture,
25
    CodeSigningPolicies,
26
    Cors,
27
    DestinationConfig,
28
    FunctionUrlAuthType,
29
    InvocationType,
30
    InvokeMode,
31
    LastUpdateStatus,
32
    LoggingConfig,
33
    PackageType,
34
    ProvisionedConcurrencyStatusEnum,
35
    RecursiveLoop,
36
    Runtime,
37
    RuntimeVersionConfig,
38
    SnapStartResponse,
39
    State,
40
    StateReasonCode,
41
    TracingMode,
42
)
43
from localstack.aws.connect import connect_to
1✔
44
from localstack.constants import AWS_REGION_US_EAST_1, INTERNAL_AWS_SECRET_ACCESS_KEY
1✔
45
from localstack.services.lambda_.api_utils import qualified_lambda_arn, unqualified_lambda_arn
1✔
46
from localstack.utils.archives import unzip
1✔
47
from localstack.utils.strings import long_uid, short_uid
1✔
48

49
# Module-level logger, named after this module via __name__.
LOG = logging.getLogger(__name__)
1✔
50

51

52
# TODO: maybe we should make this more "transient" by always initializing to Pending and *not* persisting it?
53
@dataclasses.dataclass(frozen=True)
class VersionState:
    """Immutable record of the state of a function version.

    Only ``state`` is required; ``code`` and ``reason`` default to ``None``.
    """

    # the state value itself
    state: State
    # optional reason code accompanying the state
    code: StateReasonCode | None = None
    # optional free-text reason accompanying the state
    reason: str | None = None
58

59

60
@dataclasses.dataclass
class Invocation:
    """Mutable record bundling the data of a single invocation.

    All fields except ``user_agent`` are required and must be given in the
    declared order when passed positionally.
    """

    # raw payload bytes
    payload: bytes
    # ARN that was used to invoke
    invoked_arn: str
    # optional client context string
    client_context: str | None
    invocation_type: InvocationType
    # point in time of the invoke
    invoke_time: datetime
    # = invocation_id
    request_id: str
    trace_context: dict
    # optional user agent string, absent by default
    user_agent: str | None = None
71

72

73
# Literal type restricting values to the two initialization type names below.
InitializationType = Literal["on-demand", "provisioned-concurrency"]
1✔
74

75

76
class ArchiveCode(metaclass=ABCMeta):
    """Abstract interface every code archive representation has to implement.

    All methods are abstract; the base implementations do nothing and return ``None``.
    """

    @abstractmethod
    def generate_presigned_url(self, endpoint_url: str):
        """
        Create a presigned url that points to the code archive.

        :param endpoint_url: endpoint url handed to the implementation
        """

    @abstractmethod
    def is_hot_reloading(self):
        """
        Tell whether this code archive is used for hot reloading.
        If so, the location should be mounted from the host and the runtimes should be
        instructed to listen for changes.

        :return: True if this object represents hot reloading, False otherwise
        """

    @abstractmethod
    def get_unzipped_code_location(self):
        """
        Return where the unzipped archive is located on disk.
        """

    @abstractmethod
    def prepare_for_execution(self):
        """
        Unzip the code archive to its destination on disk, unless it is already present.
        """

    @abstractmethod
    def destroy_cached(self):
        """
        Remove the code object from disk, if it was saved on disk before.
        """

    @abstractmethod
    def destroy(self):
        """
        Delete the code object from S3 as well as the unzipped version from disk.
        """
121

122

123
@dataclasses.dataclass(frozen=True)
class S3Code(ArchiveCode):
    """
    Objects representing a code archive stored in an internal S3 bucket.

    S3 Store:
      Code archives represented by this class are stored in a bucket awslambda-{region_name}-tasks,
      (e.g. awslambda-us-east-1-tasks), when correctly created using create_lambda_archive.
      The "awslambda" prefix matches the behavior at real AWS.

      This class will then provide different properties / methods to be operated on the stored code,
      like the ability to create presigned-urls, checking the code hash etc.

      A call to destroy() of this class will delete the code object from both the S3 store and the local cache
    Unzipped Cache:
      After a call to prepare_for_execution, an unzipped version of the represented code will be stored on disk,
      ready to mount/copy.

      It will be present at the location returned by get_unzipped_code_location,
      namely {tempdir}/lambda/{bucket_name}/{id}/code (tempdir as returned by tempfile.gettempdir())

      The cache on disk will be deleted after a call to destroy_cached (or destroy)
    """

    id: str
    account_id: str
    s3_bucket: str
    s3_key: str
    s3_object_version: str | None
    code_sha256: str
    code_size: int
    # held while the unzipped cache is checked/created in prepare_for_execution
    _disk_lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)

    def _download_archive_to_file(self, target_file: IO) -> None:
        """
        Download the code archive into a given file

        :param target_file: File the code archive should be downloaded into (IO object)
        """
        s3_client = connect_to(
            region_name=AWS_REGION_US_EAST_1,
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
        ).s3
        # only pin a specific object version if one was recorded
        extra_args = {"VersionId": self.s3_object_version} if self.s3_object_version else {}
        s3_client.download_fileobj(
            Bucket=self.s3_bucket, Key=self.s3_key, Fileobj=target_file, ExtraArgs=extra_args
        )
        # make sure the content is visible to readers opening the file by name
        target_file.flush()

    def generate_presigned_url(self, endpoint_url: str) -> str:
        """
        Generates a presigned url pointing to the code archive

        :param endpoint_url: endpoint url the S3 client (and thereby the url) is created for
        :return: presigned ``get_object`` url for the archive
        """
        s3_client = boto3.client(
            "s3",
            region_name=AWS_REGION_US_EAST_1,
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
            aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,
            endpoint_url=endpoint_url,
        )
        params = {"Bucket": self.s3_bucket, "Key": self.s3_key}
        if self.s3_object_version:
            params["VersionId"] = self.s3_object_version
        return s3_client.generate_presigned_url("get_object", Params=params)

    def is_hot_reloading(self) -> bool:
        """
        Whether this code archive is hot reloading

        :return: True if it represents hot reloading, False otherwise (always False for S3Code)
        """
        return False

    def get_unzipped_code_location(self) -> Path:
        """
        Get the location of the unzipped archive on disk
        """
        return Path(f"{tempfile.gettempdir()}/lambda/{self.s3_bucket}/{self.id}/code")

    def prepare_for_execution(self) -> None:
        """
        Unzips the code archive to the proper destination on disk, if not already present

        If downloading or unzipping fails, the (partially filled) target directory is removed
        again before the error is re-raised. Otherwise the mere existence of the directory
        would make every later call return early and leave an empty/incomplete code location.
        """
        target_path = self.get_unzipped_code_location()
        with self._disk_lock:
            if target_path.exists():
                return
            LOG.debug("Saving code %s to disk", self.id)
            target_path.mkdir(parents=True, exist_ok=True)
            try:
                with tempfile.NamedTemporaryFile() as file:
                    self._download_archive_to_file(file)
                    unzip(file.name, str(target_path))
            except Exception:
                # do not leave a broken cache entry behind that would pass the exists() check
                shutil.rmtree(target_path, ignore_errors=True)
                raise

    def destroy_cached(self) -> None:
        """
        Destroys the code object on disk, if it was saved on disk before
        """
        # delete parent folder to delete the whole code location
        code_path = self.get_unzipped_code_location().parent
        if not code_path.exists():
            return
        try:
            shutil.rmtree(code_path)
        except OSError as e:
            # best effort: a failed cleanup is only logged
            LOG.debug(
                "Could not cleanup function code path %s due to error %s while deleting file %s",
                code_path,
                e.strerror,
                e.filename,
            )

    def destroy(self) -> None:
        """
        Deletes the code object from S3 and the unzipped version from disk
        """
        LOG.debug("Final code destruction for %s", self.id)
        self.destroy_cached()
        s3_client = connect_to(
            region_name=AWS_REGION_US_EAST_1,
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
        ).s3
        kwargs = {"VersionId": self.s3_object_version} if self.s3_object_version else {}
        try:
            s3_client.delete_object(Bucket=self.s3_bucket, Key=self.s3_key, **kwargs)
        except ClientError as e:
            # best effort: a failed S3 deletion is only logged
            LOG.debug(
                "Cannot delete lambda archive %s in bucket %s: %s", self.s3_key, self.s3_bucket, e
            )
251

252

253
@dataclasses.dataclass(frozen=True)
class HotReloadingCode(ArchiveCode):
    """
    Code that is mounted from a directory on the host in order to support hot reloading.

    Nothing is downloaded, cached or deleted by this class; the preparation and
    destruction hooks are no-ops.
    """

    host_path: str
    # placeholder values, no real hash/size is provided for hot reloading code
    code_sha256: str = "hot-reloading-hash-not-available"
    code_size: int = 0

    def generate_presigned_url(self, endpoint_url: str) -> str:
        """
        Return a ``file://`` url for the host path; ``endpoint_url`` is not used.
        """
        return f"file://{self.host_path}"

    def get_unzipped_code_location(self) -> Path:
        """
        Return the host path as a Path, with environment variables expanded.
        """
        return Path(os.path.expandvars(self.host_path))

    def is_hot_reloading(self) -> bool:
        """
        Tell whether this code archive is used for hot reloading.
        If so, the location should be mounted from the host and the runtimes should be
        instructed to listen for changes.

        :return: always True for this class
        """
        return True

    def prepare_for_execution(self) -> None:
        """
        No-op.
        """
        pass

    def destroy_cached(self) -> None:
        """
        No-op.
        """
        pass

    def destroy(self) -> None:
        """
        No-op.
        """
        pass
293

294

295
@dataclasses.dataclass(frozen=True)
class ImageCode:
    """Immutable description of code that is provided as a container image."""

    # image reference, optionally carrying a ":tag" and/or an "@digest" suffix
    image_uri: str
    repository_type: str
    # sha256 hex digest used to build the resolved (digest-pinned) uri
    code_sha256: str

    @property
    def resolved_image_uri(self) -> str:
        """
        Image uri pinned to ``code_sha256``, i.e. ``<name>@sha256:<code_sha256>``.

        An existing digest and an existing tag are stripped from ``image_uri`` first.
        A colon only counts as tag separator if it is located in the last path segment,
        such that a registry port (``host:5000/repo``) is preserved, and an uri without
        any tag keeps its full name.

        :return: the digest-pinned image uri
        """
        # drop an already present "@<digest>" suffix
        name = self.image_uri.partition("@")[0]
        tag_separator = name.rfind(":")
        # a colon before the last "/" belongs to the registry host:port, not to a tag
        if tag_separator > name.rfind("/"):
            name = name[:tag_separator]
        return f"{name}@sha256:{self.code_sha256}"
304

305

306
@dataclasses.dataclass
class DeadLetterConfig:
    """Dead letter configuration, consisting of a single target ARN."""

    # ARN of the dead letter target
    target_arn: str
309

310

311
@dataclasses.dataclass
class FileSystemConfig:
    """File system configuration: an ARN together with a local mount path."""

    # ARN referenced by this file system configuration
    arn: str
    # path the file system is mounted at locally
    local_mount_path: str
315

316

317
@dataclasses.dataclass(frozen=True)
class ImageConfig:
    """Immutable image configuration.

    ``command`` and ``entrypoint`` default to a fresh empty list per instance.
    """

    working_directory: str
    # list of command parts, empty unless given
    command: list[str] = dataclasses.field(default_factory=list)
    # list of entrypoint parts, empty unless given
    entrypoint: list[str] = dataclasses.field(default_factory=list)
322

323

324
@dataclasses.dataclass
class VpcConfig:
    """VPC configuration: a VPC id plus lists of security group and subnet ids.

    Both lists default to a fresh empty list per instance.
    """

    vpc_id: str
    # ids of the security groups, empty unless given
    security_group_ids: list[str] = dataclasses.field(default_factory=list)
    # ids of the subnets, empty unless given
    subnet_ids: list[str] = dataclasses.field(default_factory=list)
329

330

331
@dataclasses.dataclass(frozen=True)
class UpdateStatus:
    """Immutable record of an update status with optional code and reason."""

    # status value, may be None
    status: LastUpdateStatus | None
    # TODO: probably not a string
    code: str | None = None
    # optional free-text reason
    reason: str | None = None
336

337

338
@dataclasses.dataclass
class LambdaEphemeralStorage:
    """Ephemeral storage setting, consisting of a single integer size."""

    # size of the ephemeral storage (unit not specified here)
    size: int
341

342

343
@dataclasses.dataclass
class FunctionUrlConfig:
    """
    Configuration of a function URL.

    * HTTP(s)
    * You can apply function URLs to any function alias, or to the $LATEST unpublished function version. You can't add a function URL to any other function version.
    * Once you create a function URL, its URL endpoint never changes
    """

    # fully qualified ARN
    function_arn: str
    # resolved name
    function_name: str
    cors: Cors
    # Custom URL (via tag), or generated unique subdomain id  e.g. pfn5bdb2dl5mzkbn6eb2oi3xfe0nthdn
    url_id: str
    # full URL (e.g. "https://pfn5bdb2dl5mzkbn6eb2oi3xfe0nthdn.lambda-url.eu-west-3.on.aws/")
    url: str
    auth_type: FunctionUrlAuthType
    # time
    creation_time: str
    # TODO: check if this is the creation time when initially creating
    last_modified_time: str | None = None
    # only $LATEST or alias name
    function_qualifier: str | None = "$LATEST"
    invoke_mode: InvokeMode | None = None
363

364

365
@dataclasses.dataclass
class ProvisionedConcurrencyConfiguration:
    """Provisioned concurrency configuration: an execution count and a modification date."""

    # configured number of provisioned concurrent executions
    provisioned_concurrent_executions: int
    # date (stored as string)
    last_modified: str
369

370

371
@dataclasses.dataclass
class ProvisionedConcurrencyState:
    """transient items

    Every field has a default: the counters start at 0, the status at IN_PROGRESS
    and no status reason is set.
    """

    allocated: int = 0
    available: int = 0
    status: ProvisionedConcurrencyStatusEnum = ProvisionedConcurrencyStatusEnum.IN_PROGRESS
    # optional free-text reason for the status
    status_reason: str | None = None
381

382

383
@dataclasses.dataclass
class AliasRoutingConfig:
    """Routing configuration of an alias, given as a mapping of string keys to float weights."""

    # weight per version (keys presumably are version identifiers - confirm with callers)
    version_weights: dict[str, float]
386

387

388
@dataclasses.dataclass(frozen=True)
class VersionIdentifier:
    """Immutable identifier made up of function name, qualifier, region and account."""

    function_name: str
    qualifier: str
    region: str
    account: str

    def qualified_arn(self):
        """
        Build the qualified ARN by passing all four fields to ``qualified_lambda_arn``.
        """
        arn = qualified_lambda_arn(
            function_name=self.function_name,
            qualifier=self.qualifier,
            region=self.region,
            account=self.account,
        )
        return arn

    def unqualified_arn(self):
        """
        Build the unqualified ARN by passing name, region and account to ``unqualified_lambda_arn``.
        """
        arn = unqualified_lambda_arn(
            function_name=self.function_name,
            region=self.region,
            account=self.account,
        )
        return arn
409

410

411
@dataclasses.dataclass(frozen=True)
class VersionAlias:
    """Immutable alias record.

    ``revision_id`` cannot be passed to the constructor; it is generated with
    ``long_uid`` for every new instance.
    """

    # version the alias refers to
    function_version: str
    # name of the alias
    name: str
    description: str | None
    # optional routing configuration, absent by default
    routing_configuration: AliasRoutingConfig | None = None
    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
418

419

420
@dataclasses.dataclass
class ResourcePolicy:
    """Resource policy document with capitalized field names ``Version``, ``Id`` and ``Statement``."""

    Version: str
    Id: str
    # list of statement dictionaries
    Statement: list[dict]
425

426

427
@dataclasses.dataclass
class FunctionResourcePolicy:
    """Wrapper holding a single ``ResourcePolicy``."""

    # the wrapped policy document
    policy: ResourcePolicy
430

431

432
@dataclasses.dataclass
class EventInvokeConfig:
    """Event invoke configuration for a function name / qualifier pair.

    ``last_modified`` is required but excluded from equality comparisons.
    """

    function_name: str
    qualifier: str

    # not taken into account when comparing two configs
    last_modified: str | None = dataclasses.field(compare=False)
    destination_config: DestinationConfig | None = None
    maximum_retry_attempts: int | None = None
    maximum_event_age_in_seconds: int | None = None
441

442

443
# Result Models
444
@dataclasses.dataclass
1✔
445
class InvocationResult:
1✔
446
    request_id: str
1✔
447
    payload: bytes | None
1✔
448
    is_error: bool
1✔
449
    logs: str | None
1✔
450
    executed_version: str | None = None
1✔
451

452

453
@dataclasses.dataclass
class InvocationLogs:
    """Log output belonging to a request id."""

    request_id: str
    # the log output as a single string
    logs: str
457

458

459
class Credentials(TypedDict):
    """Typed dictionary describing a set of credentials; all four keys are required."""

    AccessKeyId: str
    SecretAccessKey: str
    SessionToken: str
    # expiration as datetime object
    Expiration: datetime
464

465

466
class OtherServiceEndpoint:
    """Interface for receiving status reports; both methods have to be overridden."""

    def status_ready(self, executor_id: str) -> None:
        """
        Handle a status ready report sent by RAPID

        :param executor_id: ID of the executor the ready report belongs to
        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()

    def status_error(self, executor_id: str) -> None:
        """
        Handle a status error report sent by RAPID

        :param executor_id: ID of the executor the error report belongs to
        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()
480

481

482
@dataclasses.dataclass(frozen=True)
class CodeSigningConfig:
    """Immutable code signing configuration."""

    # id of the code signing config
    csc_id: str
    arn: str

    allowed_publishers: AllowedPublishers
    policies: CodeSigningPolicies
    last_modified: str
    # optional description, absent by default
    description: str | None = None
491

492

493
@dataclasses.dataclass
1✔
494
class LayerPolicyStatement:
1✔
495
    sid: str
1✔
496
    action: str
1✔
497
    principal: str
1✔
498
    organization_id: str | None
1✔
499

500

501
@dataclasses.dataclass
class LayerPolicy:
    """Policy of a layer.

    ``revision_id`` cannot be passed to the constructor; it is generated with
    ``long_uid`` for every new instance.
    """

    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
    # static
    id: str = "default"
    # static
    version: str = "2012-10-17"
    # statement ID => statement
    statements: dict[str, LayerPolicyStatement] = dataclasses.field(default_factory=dict)
509

510

511
@dataclasses.dataclass
class LayerVersion:
    """Single version of a layer, including its code and optional policy."""

    layer_version_arn: str
    layer_arn: str

    version: int
    # code archive of this layer version
    code: ArchiveCode
    license_info: str
    compatible_runtimes: list[Runtime]
    compatible_architectures: list[Architecture]
    # date (stored as string)
    created: str
    description: str = ""

    # no policy is attached by default, hence the annotation has to allow None
    policy: LayerPolicy | None = None
525

526

527
@dataclasses.dataclass
class Layer:
    """Layer with its versions and a counter for the next version number.

    Each instance gets its own reentrant lock and its own (initially empty) version mapping.
    """

    arn: str
    # counter for the next version, starts at 1
    next_version: int = 1
    # reentrant lock, presumably meant to guard next_version - confirm with callers
    next_version_lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)
    # mapping of string keys to the layer versions
    layer_versions: dict[str, LayerVersion] = dataclasses.field(default_factory=dict)
533

534

535
@dataclasses.dataclass(frozen=True)
class VersionFunctionConfiguration:
    """Immutable configuration of a function version.

    ``revision_id`` cannot be passed to the constructor; it is generated with
    ``long_uid`` for every new instance.
    """

    # required fields
    description: str
    role: str
    timeout: int
    runtime: Runtime
    memory_size: int
    handler: str
    package_type: PackageType
    environment: dict[str, str]
    architectures: list[Architecture]
    # internal revision is updated when runtime restart is necessary
    internal_revision: str
    ephemeral_storage: LambdaEphemeralStorage
    snap_start: SnapStartResponse

    tracing_config_mode: TracingMode
    code: ArchiveCode
    # ISO string
    last_modified: str
    state: VersionState

    # optional fields, absent unless given
    image: ImageCode | None = None
    image_config: ImageConfig | None = None
    runtime_version_config: RuntimeVersionConfig | None = None
    last_update: UpdateStatus | None = None
    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
    layers: list[LayerVersion] = dataclasses.field(default_factory=list)

    dead_letter_arn: str | None = None

    # kms_key_arn: str
    # file_system_configs: FileSystemConfig
    vpc_config: VpcConfig | None = None

    # defaults to a fresh empty dict per instance
    logging_config: LoggingConfig = dataclasses.field(default_factory=dict)
571

572

573
@dataclasses.dataclass(frozen=True)
class FunctionVersion:
    """Immutable pairing of a version identifier with its configuration."""

    id: VersionIdentifier
    config: VersionFunctionConfiguration

    @property
    def qualified_arn(self) -> str:
        """Qualified ARN as computed by the identifier's ``qualified_arn()`` method."""
        identifier = self.id
        return identifier.qualified_arn()
581

582

583
@dataclasses.dataclass
class Function:
    """Function together with its versions, aliases and per-qualifier configurations.

    In addition to the declared fields, every instance carries a volatile
    ``instance_id`` attribute that is left out when pickling and regenerated
    when unpickling.
    """

    function_name: str
    code_signing_config_arn: str | None = None
    aliases: dict[str, VersionAlias] = dataclasses.field(default_factory=dict)
    versions: dict[str, FunctionVersion] = dataclasses.field(default_factory=dict)
    # key is $LATEST, version, or alias
    function_url_configs: dict[str, FunctionUrlConfig] = dataclasses.field(default_factory=dict)
    # key is $LATEST, version or alias
    permissions: dict[str, FunctionResourcePolicy] = dataclasses.field(default_factory=dict)
    # key is $LATEST(?), version or alias
    event_invoke_configs: dict[str, EventInvokeConfig] = dataclasses.field(default_factory=dict)
    reserved_concurrent_executions: int | None = None
    recursive_loop: RecursiveLoop = RecursiveLoop.Terminate
    provisioned_concurrency_configs: dict[str, ProvisionedConcurrencyConfiguration] = (
        dataclasses.field(default_factory=dict)
    )

    # one reentrant lock per instance
    lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)
    next_version: int = 1

    def latest(self) -> FunctionVersion:
        """Return the version stored under the ``$LATEST`` key (KeyError if absent)."""
        return self.versions["$LATEST"]

    # HACK to model a volatile variable that should be ignored for persistence
    def __post_init__(self):
        # Identifier unique to this function and LocalStack instance.
        # A LocalStack restart or persistence load should create a new instance id.
        # Used for retaining invoke queues across version updates for $LATEST, but separate unrelated instances.
        self.instance_id = short_uid()

    def __getstate__(self):
        """Ignore certain volatile fields for pickling.
        # https://docs.python.org/3/library/pickle.html#handling-stateful-objects
        """
        # Work on a shallow copy of the instance attributes,
        # such that the live object keeps its volatile entries.
        persisted = dict(self.__dict__)
        # Drop the volatile entries (raises KeyError if the entry is missing).
        persisted.pop("instance_id")
        return persisted

    def __setstate__(self, state):
        """Restore the persisted attributes and assign a fresh instance id."""
        # Inject persistent state
        self.__dict__.update(state)
        # Create new instance id
        self.__post_init__()
634

635

636
class ValidationException(CommonServiceException):
    """Service exception with code ``ValidationException`` and HTTP status 400."""

    def __init__(self, message: str):
        """
        :param message: message passed on to the base exception
        """
        super().__init__(
            message=message,
            code="ValidationException",
            status_code=400,
        )
639

640

641
class RequestEntityTooLargeException(CommonServiceException):
    """Service exception with code ``RequestEntityTooLargeException`` and HTTP status 413."""

    def __init__(self, message: str):
        """
        :param message: message passed on to the base exception
        """
        super().__init__(
            message=message,
            code="RequestEntityTooLargeException",
            status_code=413,
        )
644

645

646
# note: we might at some point want to generalize these limits across all services and fetch them from there
647

648

649
@dataclasses.dataclass
class AccountSettings:
    """Account-level limits.

    The defaults are read from ``config`` once, when this class is defined.
    """

    # default taken from config.LAMBDA_LIMITS_TOTAL_CODE_SIZE
    total_code_size: int = config.LAMBDA_LIMITS_TOTAL_CODE_SIZE
    # default taken from config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED
    code_size_zipped: int = config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED
    # default taken from config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED
    code_size_unzipped: int = config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED
    # default taken from config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
    concurrent_executions: int = config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc