• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 4d722ec8-e76d-4bbc-9fcc-2ce2c2dda7f7

15 Apr 2025 06:51PM UTC coverage: 86.419% (-0.05%) from 86.472%
4d722ec8-e76d-4bbc-9fcc-2ce2c2dda7f7

push

circleci

web-flow
CloudFormation Engine v2: Base Mappings and Conditions tests for Update Graph and PreProc (#12527)

2 of 2 new or added lines in 1 file covered. (100.0%)

44 existing lines in 20 files now uncovered.

63626 of 73625 relevant lines covered (86.42%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.04
/localstack-core/localstack/services/lambda_/invocation/lambda_models.py
1
"""Lambda models for internal use and persistence.
2
The LambdaProviderPro in localstack-pro imports this model and configures persistence.
3
The actual function code is stored in S3 (see S3Code).
4
"""
5

6
import dataclasses
1✔
7
import logging
1✔
8
import os.path
1✔
9
import shutil
1✔
10
import tempfile
1✔
11
import threading
1✔
12
from abc import ABCMeta, abstractmethod
1✔
13
from datetime import datetime
1✔
14
from pathlib import Path
1✔
15
from typing import IO, Dict, Literal, Optional, TypedDict
1✔
16

17
from botocore.exceptions import ClientError
1✔
18

19
from localstack import config
1✔
20
from localstack.aws.api import CommonServiceException
1✔
21
from localstack.aws.api.lambda_ import (
1✔
22
    AllowedPublishers,
23
    Architecture,
24
    CodeSigningPolicies,
25
    Cors,
26
    DestinationConfig,
27
    FunctionUrlAuthType,
28
    InvocationType,
29
    InvokeMode,
30
    LastUpdateStatus,
31
    LoggingConfig,
32
    PackageType,
33
    ProvisionedConcurrencyStatusEnum,
34
    RecursiveLoop,
35
    Runtime,
36
    RuntimeVersionConfig,
37
    SnapStartResponse,
38
    State,
39
    StateReasonCode,
40
    TracingMode,
41
)
42
from localstack.aws.connect import connect_to
1✔
43
from localstack.constants import AWS_REGION_US_EAST_1
1✔
44
from localstack.services.lambda_.api_utils import qualified_lambda_arn, unqualified_lambda_arn
1✔
45
from localstack.utils.archives import unzip
1✔
46
from localstack.utils.strings import long_uid, short_uid
1✔
47

48
LOG = logging.getLogger(__name__)
1✔
49

50

51
# TODO: maybe we should make this more "transient" by always initializing to Pending and *not* persisting it?
52
@dataclasses.dataclass(frozen=True)
1✔
53
class VersionState:
1✔
54
    state: State
1✔
55
    code: Optional[StateReasonCode] = None
1✔
56
    reason: Optional[str] = None
1✔
57

58

59
@dataclasses.dataclass
1✔
60
class Invocation:
1✔
61
    payload: bytes
1✔
62
    invoked_arn: str
1✔
63
    client_context: str | None
1✔
64
    invocation_type: InvocationType
1✔
65
    invoke_time: datetime
1✔
66
    # = invocation_id
67
    request_id: str
1✔
68
    trace_context: dict
1✔
69

70

71
InitializationType = Literal["on-demand", "provisioned-concurrency"]
1✔
72

73

74
class ArchiveCode(metaclass=ABCMeta):
1✔
75
    @abstractmethod
1✔
76
    def generate_presigned_url(self, endpoint_url: str | None = None):
1✔
77
        """
78
        Generates a presigned url pointing to the code archive
79
        """
80
        pass
×
81

82
    @abstractmethod
1✔
83
    def is_hot_reloading(self):
1✔
84
        """
85
        Whether this code archive is for hot reloading.
86
        This means it should mount the location from the host, and should instruct the runtimes to listen for changes
87

88
        :return: True if this object represents hot reloading, False otherwise
89
        """
90
        pass
×
91

92
    @abstractmethod
1✔
93
    def get_unzipped_code_location(self):
1✔
94
        """
95
        Get the location of the unzipped archive on disk
96
        """
97
        pass
×
98

99
    @abstractmethod
1✔
100
    def prepare_for_execution(self):
1✔
101
        """
102
        Unzips the code archive to the proper destination on disk, if not already present
103
        """
104
        pass
×
105

106
    @abstractmethod
1✔
107
    def destroy_cached(self):
1✔
108
        """
109
        Destroys the code object on disk, if it was saved on disk before
110
        """
111
        pass
×
112

113
    @abstractmethod
1✔
114
    def destroy(self):
1✔
115
        """
116
        Deletes the code object from S3 and the unzipped version from disk
117
        """
118
        pass
×
119

120

121
@dataclasses.dataclass(frozen=True)
1✔
122
class S3Code(ArchiveCode):
1✔
123
    """
124
    Objects representing a code archive stored in an internal S3 bucket.
125

126
    S3 Store:
127
      Code archives represented by this method are stored in a bucket awslambda-{region_name}-tasks,
128
      (e.g. awslambda-us-east-1-tasks), when correctly created using create_lambda_archive.
129
      The "awslambda" prefix matches the behavior at real AWS.
130

131
      This class will then provide different properties / methods to be operated on the stored code,
132
      like the ability to create presigned-urls, checking the code hash etc.
133

134
      A call to destroy() of this class will delete the code object from both the S3 store and the local cache
135
    Unzipped Cache:
136
      After a call to prepare_for_execution, an unzipped version of the represented code will be stored on disk,
137
      ready to mount/copy.
138

139
      It will be present at the location returned by get_unzipped_code_location,
140
      namely /tmp/lambda/{bucket_name}/{id}/code
141

142
      The cache on disk will be deleted after a call to destroy_cached (or destroy)
143
    """
144

145
    id: str
1✔
146
    account_id: str
1✔
147
    s3_bucket: str
1✔
148
    s3_key: str
1✔
149
    s3_object_version: str | None
1✔
150
    code_sha256: str
1✔
151
    code_size: int
1✔
152
    _disk_lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)
1✔
153

154
    def _download_archive_to_file(self, target_file: IO) -> None:
1✔
155
        """
156
        Download the code archive into a given file
157

158
        :param target_file: File the code archive should be downloaded into (IO object)
159
        """
160
        s3_client = connect_to(
1✔
161
            region_name=AWS_REGION_US_EAST_1,
162
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
163
        ).s3
164
        extra_args = {"VersionId": self.s3_object_version} if self.s3_object_version else {}
1✔
165
        s3_client.download_fileobj(
1✔
166
            Bucket=self.s3_bucket, Key=self.s3_key, Fileobj=target_file, ExtraArgs=extra_args
167
        )
168
        target_file.flush()
1✔
169

170
    def generate_presigned_url(self, endpoint_url: str | None = None) -> str:
1✔
171
        """
172
        Generates a presigned url pointing to the code archive
173
        """
174
        s3_client = connect_to(
1✔
175
            region_name=AWS_REGION_US_EAST_1,
176
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
177
            endpoint_url=endpoint_url,
178
        ).s3
179
        params = {"Bucket": self.s3_bucket, "Key": self.s3_key}
1✔
180
        if self.s3_object_version:
1✔
181
            params["VersionId"] = self.s3_object_version
×
182
        return s3_client.generate_presigned_url("get_object", Params=params)
1✔
183

184
    def is_hot_reloading(self) -> bool:
1✔
185
        """
186
        Whether this code archive is hot reloading
187

188
        :return: True if it must it represents hot reloading, False otherwise
189
        """
190
        return False
1✔
191

192
    def get_unzipped_code_location(self) -> Path:
1✔
193
        """
194
        Get the location of the unzipped archive on disk
195
        """
196
        return Path(f"{tempfile.gettempdir()}/lambda/{self.s3_bucket}/{self.id}/code")
1✔
197

198
    def prepare_for_execution(self) -> None:
1✔
199
        """
200
        Unzips the code archive to the proper destination on disk, if not already present
201
        """
202
        target_path = self.get_unzipped_code_location()
1✔
203
        with self._disk_lock:
1✔
204
            if target_path.exists():
1✔
205
                return
1✔
206
            LOG.debug("Saving code %s to disk", self.id)
1✔
207
            target_path.mkdir(parents=True, exist_ok=True)
1✔
208
            with tempfile.NamedTemporaryFile() as file:
1✔
209
                self._download_archive_to_file(file)
1✔
210
                unzip(file.name, str(target_path))
1✔
211

212
    def destroy_cached(self) -> None:
1✔
213
        """
214
        Destroys the code object on disk, if it was saved on disk before
215
        """
216
        # delete parent folder to delete the whole code location
217
        code_path = self.get_unzipped_code_location().parent
1✔
218
        if not code_path.exists():
1✔
219
            return
1✔
220
        try:
1✔
221
            shutil.rmtree(code_path)
1✔
UNCOV
222
        except OSError as e:
×
UNCOV
223
            LOG.debug(
×
224
                "Could not cleanup function code path %s due to error %s while deleting file %s",
225
                code_path,
226
                e.strerror,
227
                e.filename,
228
            )
229

230
    def destroy(self) -> None:
1✔
231
        """
232
        Deletes the code object from S3 and the unzipped version from disk
233
        """
234
        LOG.debug("Final code destruction for %s", self.id)
1✔
235
        self.destroy_cached()
1✔
236
        s3_client = connect_to(
1✔
237
            region_name=AWS_REGION_US_EAST_1,
238
            aws_access_key_id=config.INTERNAL_RESOURCE_ACCOUNT,
239
        ).s3
240
        kwargs = {"VersionId": self.s3_object_version} if self.s3_object_version else {}
1✔
241
        try:
1✔
242
            s3_client.delete_object(Bucket=self.s3_bucket, Key=self.s3_key, **kwargs)
1✔
243
        except ClientError as e:
×
244
            LOG.debug(
×
245
                "Cannot delete lambda archive %s in bucket %s: %s", self.s3_key, self.s3_bucket, e
246
            )
247

248

249
@dataclasses.dataclass(frozen=True)
1✔
250
class HotReloadingCode(ArchiveCode):
1✔
251
    """
252
    Objects representing code which is mounted from a given directory from the host, for hot reloading
253
    """
254

255
    host_path: str
1✔
256
    code_sha256: str = "hot-reloading-hash-not-available"
1✔
257
    code_size: int = 0
1✔
258

259
    def generate_presigned_url(self, endpoint_url: str | None = None) -> str:
1✔
260
        return f"Code location: {self.host_path}"
1✔
261

262
    def get_unzipped_code_location(self) -> Path:
1✔
263
        path = os.path.expandvars(self.host_path)
1✔
264
        return Path(path)
1✔
265

266
    def is_hot_reloading(self) -> bool:
1✔
267
        """
268
        Whether this code archive is for hot reloading.
269
        This means it should mount the location from the host, and should instruct the runtimes to listen for changes
270

271
        :return: True if it represents hot reloading, False otherwise
272
        """
273
        return True
1✔
274

275
    def prepare_for_execution(self) -> None:
1✔
276
        pass
1✔
277

278
    def destroy_cached(self) -> None:
1✔
279
        """
280
        Destroys the code object on disk, if it was saved on disk before
281
        """
282
        pass
×
283

284
    def destroy(self) -> None:
1✔
285
        """
286
        Deletes the code object from S3 and the unzipped version from disk
287
        """
288
        pass
1✔
289

290

291
@dataclasses.dataclass(frozen=True)
1✔
292
class ImageCode:
1✔
293
    image_uri: str
1✔
294
    repository_type: str
1✔
295
    code_sha256: str
1✔
296

297
    @property
1✔
298
    def resolved_image_uri(self):
1✔
299
        return f"{self.image_uri.rpartition(':')[0]}@sha256:{self.code_sha256}"
1✔
300

301

302
@dataclasses.dataclass
1✔
303
class DeadLetterConfig:
1✔
304
    target_arn: str
1✔
305

306

307
@dataclasses.dataclass
1✔
308
class FileSystemConfig:
1✔
309
    arn: str
1✔
310
    local_mount_path: str
1✔
311

312

313
@dataclasses.dataclass(frozen=True)
1✔
314
class ImageConfig:
1✔
315
    working_directory: str
1✔
316
    command: list[str] = dataclasses.field(default_factory=list)
1✔
317
    entrypoint: list[str] = dataclasses.field(default_factory=list)
1✔
318

319

320
@dataclasses.dataclass
1✔
321
class VpcConfig:
1✔
322
    vpc_id: str
1✔
323
    security_group_ids: list[str] = dataclasses.field(default_factory=list)
1✔
324
    subnet_ids: list[str] = dataclasses.field(default_factory=list)
1✔
325

326

327
@dataclasses.dataclass(frozen=True)
1✔
328
class UpdateStatus:
1✔
329
    status: LastUpdateStatus | None
1✔
330
    code: str | None = None  # TODO: probably not a string
1✔
331
    reason: str | None = None
1✔
332

333

334
@dataclasses.dataclass
1✔
335
class LambdaEphemeralStorage:
1✔
336
    size: int
1✔
337

338

339
@dataclasses.dataclass
1✔
340
class FunctionUrlConfig:
1✔
341
    """
342
    * HTTP(s)
343
    * You can apply function URLs to any function alias, or to the $LATEST unpublished function version. You can't add a function URL to any other function version.
344
    * Once you create a function URL, its URL endpoint never changes
345
    """
346

347
    function_arn: str  # fully qualified ARN
1✔
348
    function_name: str  # resolved name
1✔
349
    cors: Cors
1✔
350
    url_id: str  # Custom URL (via tag), or generated unique subdomain id  e.g. pfn5bdb2dl5mzkbn6eb2oi3xfe0nthdn
1✔
351
    url: str  # full URL (e.g. "https://pfn5bdb2dl5mzkbn6eb2oi3xfe0nthdn.lambda-url.eu-west-3.on.aws/")
1✔
352
    auth_type: FunctionUrlAuthType
1✔
353
    creation_time: str  # time
1✔
354
    last_modified_time: Optional[str] = (
1✔
355
        None  # TODO: check if this is the creation time when initially creating
356
    )
357
    function_qualifier: Optional[str] = "$LATEST"  # only $LATEST or alias name
1✔
358
    invoke_mode: Optional[InvokeMode] = None
1✔
359

360

361
@dataclasses.dataclass
1✔
362
class ProvisionedConcurrencyConfiguration:
1✔
363
    provisioned_concurrent_executions: int
1✔
364
    last_modified: str  # date
1✔
365

366

367
@dataclasses.dataclass
1✔
368
class ProvisionedConcurrencyState:
1✔
369
    """transient items"""
370

371
    allocated: int = 0
1✔
372
    available: int = 0
1✔
373
    status: ProvisionedConcurrencyStatusEnum = dataclasses.field(
1✔
374
        default=ProvisionedConcurrencyStatusEnum.IN_PROGRESS
375
    )
376
    status_reason: Optional[str] = None
1✔
377

378

379
@dataclasses.dataclass
1✔
380
class AliasRoutingConfig:
1✔
381
    version_weights: Dict[str, float]
1✔
382

383

384
@dataclasses.dataclass(frozen=True)
1✔
385
class VersionIdentifier:
1✔
386
    function_name: str
1✔
387
    qualifier: str
1✔
388
    region: str
1✔
389
    account: str
1✔
390

391
    def qualified_arn(self):
1✔
392
        return qualified_lambda_arn(
1✔
393
            function_name=self.function_name,
394
            qualifier=self.qualifier,
395
            region=self.region,
396
            account=self.account,
397
        )
398

399
    def unqualified_arn(self):
1✔
400
        return unqualified_lambda_arn(
1✔
401
            function_name=self.function_name,
402
            region=self.region,
403
            account=self.account,
404
        )
405

406

407
@dataclasses.dataclass(frozen=True)
1✔
408
class VersionAlias:
1✔
409
    function_version: str
1✔
410
    name: str
1✔
411
    description: str | None
1✔
412
    routing_configuration: AliasRoutingConfig | None = None
1✔
413
    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
1✔
414

415

416
@dataclasses.dataclass
1✔
417
class ResourcePolicy:
1✔
418
    Version: str
1✔
419
    Id: str
1✔
420
    Statement: list[dict]
1✔
421

422

423
@dataclasses.dataclass
1✔
424
class FunctionResourcePolicy:
1✔
425
    policy: ResourcePolicy
1✔
426

427

428
@dataclasses.dataclass
1✔
429
class EventInvokeConfig:
1✔
430
    function_name: str
1✔
431
    qualifier: str
1✔
432

433
    last_modified: Optional[str] = dataclasses.field(compare=False)
1✔
434
    destination_config: Optional[DestinationConfig] = None
1✔
435
    maximum_retry_attempts: Optional[int] = None
1✔
436
    maximum_event_age_in_seconds: Optional[int] = None
1✔
437

438

439
# Result Models
440
@dataclasses.dataclass
1✔
441
class InvocationResult:
1✔
442
    request_id: str
1✔
443
    payload: bytes | None
1✔
444
    is_error: bool
1✔
445
    logs: str | None
1✔
446
    executed_version: str | None = None
1✔
447

448

449
@dataclasses.dataclass
1✔
450
class InvocationLogs:
1✔
451
    request_id: str
1✔
452
    logs: str
1✔
453

454

455
class Credentials(TypedDict):
1✔
456
    AccessKeyId: str
1✔
457
    SecretAccessKey: str
1✔
458
    SessionToken: str
1✔
459
    Expiration: datetime
1✔
460

461

462
class OtherServiceEndpoint:
1✔
463
    def status_ready(self, executor_id: str) -> None:
1✔
464
        """
465
        Processes a status ready report by RAPID
466
        :param executor_id: Executor ID this ready report is for
467
        """
468
        raise NotImplementedError()
469

470
    def status_error(self, executor_id: str) -> None:
1✔
471
        """
472
        Processes a status error report by RAPID
473
        :param executor_id: Executor ID this error report is for
474
        """
475
        raise NotImplementedError()
476

477

478
@dataclasses.dataclass(frozen=True)
1✔
479
class CodeSigningConfig:
1✔
480
    csc_id: str
1✔
481
    arn: str
1✔
482

483
    allowed_publishers: AllowedPublishers
1✔
484
    policies: CodeSigningPolicies
1✔
485
    last_modified: str
1✔
486
    description: Optional[str] = None
1✔
487

488

489
@dataclasses.dataclass
1✔
490
class LayerPolicyStatement:
1✔
491
    sid: str
1✔
492
    action: str
1✔
493
    principal: str
1✔
494
    organization_id: Optional[str]
1✔
495

496

497
@dataclasses.dataclass
1✔
498
class LayerPolicy:
1✔
499
    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
1✔
500
    id: str = "default"  # static
1✔
501
    version: str = "2012-10-17"  # static
1✔
502
    statements: dict[str, LayerPolicyStatement] = dataclasses.field(
1✔
503
        default_factory=dict
504
    )  # statement ID => statement
505

506

507
@dataclasses.dataclass
1✔
508
class LayerVersion:
1✔
509
    layer_version_arn: str
1✔
510
    layer_arn: str
1✔
511

512
    version: int
1✔
513
    code: ArchiveCode
1✔
514
    license_info: str
1✔
515
    compatible_runtimes: list[Runtime]
1✔
516
    compatible_architectures: list[Architecture]
1✔
517
    created: str  # date
1✔
518
    description: str = ""
1✔
519

520
    policy: LayerPolicy = None
1✔
521

522

523
@dataclasses.dataclass
1✔
524
class Layer:
1✔
525
    arn: str
1✔
526
    next_version: int = 1
1✔
527
    next_version_lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)
1✔
528
    layer_versions: dict[str, LayerVersion] = dataclasses.field(default_factory=dict)
1✔
529

530

531
@dataclasses.dataclass(frozen=True)
1✔
532
class VersionFunctionConfiguration:
1✔
533
    # fields
534
    description: str
1✔
535
    role: str
1✔
536
    timeout: int
1✔
537
    runtime: Runtime
1✔
538
    memory_size: int
1✔
539
    handler: str
1✔
540
    package_type: PackageType
1✔
541
    environment: dict[str, str]
1✔
542
    architectures: list[Architecture]
1✔
543
    # internal revision is updated when runtime restart is necessary
544
    internal_revision: str
1✔
545
    ephemeral_storage: LambdaEphemeralStorage
1✔
546
    snap_start: SnapStartResponse
1✔
547

548
    tracing_config_mode: TracingMode
1✔
549
    code: ArchiveCode
1✔
550
    last_modified: str  # ISO string
1✔
551
    state: VersionState
1✔
552

553
    image: Optional[ImageCode] = None
1✔
554
    image_config: Optional[ImageConfig] = None
1✔
555
    runtime_version_config: Optional[RuntimeVersionConfig] = None
1✔
556
    last_update: Optional[UpdateStatus] = None
1✔
557
    revision_id: str = dataclasses.field(init=False, default_factory=long_uid)
1✔
558
    layers: list[LayerVersion] = dataclasses.field(default_factory=list)
1✔
559

560
    dead_letter_arn: Optional[str] = None
1✔
561

562
    # kms_key_arn: str
563
    # file_system_configs: FileSystemConfig
564
    vpc_config: Optional[VpcConfig] = None
1✔
565

566
    logging_config: LoggingConfig = dataclasses.field(default_factory=dict)
1✔
567

568

569
@dataclasses.dataclass(frozen=True)
1✔
570
class FunctionVersion:
1✔
571
    id: VersionIdentifier
1✔
572
    config: VersionFunctionConfiguration
1✔
573

574
    @property
1✔
575
    def qualified_arn(self) -> str:
1✔
576
        return self.id.qualified_arn()
1✔
577

578

579
@dataclasses.dataclass
1✔
580
class Function:
1✔
581
    function_name: str
1✔
582
    code_signing_config_arn: Optional[str] = None
1✔
583
    aliases: dict[str, VersionAlias] = dataclasses.field(default_factory=dict)
1✔
584
    versions: dict[str, FunctionVersion] = dataclasses.field(default_factory=dict)
1✔
585
    function_url_configs: dict[str, FunctionUrlConfig] = dataclasses.field(
1✔
586
        default_factory=dict
587
    )  # key is $LATEST, version, or alias
588
    permissions: dict[str, FunctionResourcePolicy] = dataclasses.field(
1✔
589
        default_factory=dict
590
    )  # key is $LATEST, version or alias
591
    event_invoke_configs: dict[str, EventInvokeConfig] = dataclasses.field(
1✔
592
        default_factory=dict
593
    )  # key is $LATEST(?), version or alias
594
    reserved_concurrent_executions: Optional[int] = None
1✔
595
    recursive_loop: RecursiveLoop = RecursiveLoop.Terminate
1✔
596
    provisioned_concurrency_configs: dict[str, ProvisionedConcurrencyConfiguration] = (
1✔
597
        dataclasses.field(default_factory=dict)
598
    )
599

600
    lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)
1✔
601
    next_version: int = 1
1✔
602

603
    def latest(self) -> FunctionVersion:
1✔
604
        return self.versions["$LATEST"]
1✔
605

606
    # HACK to model a volatile variable that should be ignored for persistence
607
    def __post_init__(self):
1✔
608
        # Identifier unique to this function and LocalStack instance.
609
        # A LocalStack restart or persistence load should create a new instance id.
610
        # Used for retaining invoke queues across version updates for $LATEST, but separate unrelated instances.
611
        self.instance_id = short_uid()
1✔
612

613
    def __getstate__(self):
1✔
614
        """Ignore certain volatile fields for pickling.
615
        # https://docs.python.org/3/library/pickle.html#handling-stateful-objects
616
        """
617
        # Copy the object's state from self.__dict__ which contains
618
        # all our instance attributes. Always use the dict.copy()
619
        # method to avoid modifying the original state.
620
        state = self.__dict__.copy()
×
621
        # Remove the volatile entries.
622
        del state["instance_id"]
×
623
        return state
×
624

625
    def __setstate__(self, state):
1✔
626
        # Inject persistent state
627
        self.__dict__.update(state)
×
628
        # Create new instance id
629
        self.__post_init__()
×
630

631

632
class ValidationException(CommonServiceException):
1✔
633
    def __init__(self, message: str):
1✔
634
        super().__init__(code="ValidationException", status_code=400, message=message)
1✔
635

636

637
class RequestEntityTooLargeException(CommonServiceException):
1✔
638
    def __init__(self, message: str):
1✔
639
        super().__init__(code="RequestEntityTooLargeException", status_code=413, message=message)
1✔
640

641

642
# note: we might at some point want to generalize these limits across all services and fetch them from there
643

644

645
@dataclasses.dataclass
1✔
646
class AccountSettings:
1✔
647
    total_code_size: int = config.LAMBDA_LIMITS_TOTAL_CODE_SIZE
1✔
648
    code_size_zipped: int = config.LAMBDA_LIMITS_CODE_SIZE_ZIPPED
1✔
649
    code_size_unzipped: int = config.LAMBDA_LIMITS_CODE_SIZE_UNZIPPED
1✔
650
    concurrent_executions: int = config.LAMBDA_LIMITS_CONCURRENT_EXECUTIONS
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc