# Source file: localstack-core/localstack/services/s3/models.py
import base64
import hashlib
import logging
from collections import defaultdict
from datetime import datetime
from secrets import token_urlsafe
from typing import Literal, NamedTuple, Union
from zoneinfo import ZoneInfo

from localstack.aws.api import CommonServiceException
from localstack.aws.api.s3 import (
    AccessControlPolicy,
    AccountId,
    AnalyticsConfiguration,
    AnalyticsId,
    BadDigest,
    BucketAccelerateStatus,
    BucketKeyEnabled,
    BucketLocationConstraint,
    BucketName,
    BucketRegion,
    BucketVersioningStatus,
    ChecksumAlgorithm,
    ChecksumType,
    CompletedPartList,
    CORSConfiguration,
    DefaultRetention,
    EntityTooSmall,
    ETag,
    Expiration,
    IntelligentTieringConfiguration,
    IntelligentTieringId,
    InvalidArgument,
    InvalidPart,
    InventoryConfiguration,
    InventoryId,
    LifecycleRules,
    LoggingEnabled,
    Metadata,
    MethodNotAllowed,
    MetricsConfiguration,
    MetricsId,
    MultipartUploadId,
    NoSuchKey,
    NoSuchVersion,
    NotificationConfiguration,
    ObjectKey,
    ObjectLockLegalHoldStatus,
    ObjectLockMode,
    ObjectLockRetainUntilDate,
    ObjectLockRetentionMode,
    ObjectOwnership,
    ObjectStorageClass,
    ObjectVersionId,
    Owner,
    Part,
    PartNumber,
    Payer,
    Policy,
    PublicAccessBlockConfiguration,
    ReplicationConfiguration,
    Restore,
    ServerSideEncryption,
    ServerSideEncryptionRule,
    Size,
    SSECustomerKeyMD5,
    SSEKMSKeyId,
    StorageClass,
    TransitionDefaultMinimumObjectSize,
    WebsiteConfiguration,
    WebsiteRedirectLocation,
)
from localstack.constants import AWS_REGION_US_EAST_1
from localstack.services.s3.constants import (
    DEFAULT_BUCKET_ENCRYPTION,
    DEFAULT_PUBLIC_BLOCK_ACCESS,
    S3_UPLOAD_PART_MIN_SIZE,
)
from localstack.services.s3.exceptions import InvalidRequest
from localstack.services.s3.utils import (
    CombinedCrcHash,
    get_s3_checksum,
    rfc_1123_datetime,
)
from localstack.services.stores import (
    AccountRegionBundle,
    BaseStore,
    CrossAccountAttribute,
    CrossRegionAttribute,
    LocalAttribute,
)
from localstack.utils.aws import arns
from localstack.utils.tagging import TaggingService

LOG = logging.getLogger(__name__)

_gmt_zone_info = ZoneInfo("GMT")


class InternalObjectPart(Part):
    _position: int


# note: not really a need to use a dataclass here, as it has a lot of fields, but only a few are set at creation
class S3Bucket:
    name: BucketName
    bucket_account_id: AccountId
    bucket_region: BucketRegion
    location_constraint: BucketLocationConstraint | Literal[""]
    creation_date: datetime
    multiparts: dict[MultipartUploadId, "S3Multipart"]
    objects: Union["KeyStore", "VersionedKeyStore"]
    versioning_status: BucketVersioningStatus | None
    lifecycle_rules: LifecycleRules | None
    transition_default_minimum_object_size: TransitionDefaultMinimumObjectSize | None
    policy: Policy | None
    website_configuration: WebsiteConfiguration | None
    acl: AccessControlPolicy
    cors_rules: CORSConfiguration | None
    logging: LoggingEnabled
    notification_configuration: NotificationConfiguration
    payer: Payer
    encryption_rule: ServerSideEncryptionRule | None
    public_access_block: PublicAccessBlockConfiguration | None
    accelerate_status: BucketAccelerateStatus | None
    object_lock_enabled: bool
    object_ownership: ObjectOwnership
    intelligent_tiering_configurations: dict[IntelligentTieringId, IntelligentTieringConfiguration]
    analytics_configurations: dict[AnalyticsId, AnalyticsConfiguration]
    inventory_configurations: dict[InventoryId, InventoryConfiguration]
    metric_configurations: dict[MetricsId, MetricsConfiguration]
    object_lock_default_retention: DefaultRetention | None
    replication: ReplicationConfiguration
    owner: Owner

    # set all bucket parameters here
    def __init__(
        self,
        name: BucketName,
        account_id: AccountId,
        bucket_region: BucketRegion,
        owner: Owner,
        acl: AccessControlPolicy = None,
        object_ownership: ObjectOwnership = None,
        object_lock_enabled_for_bucket: bool = None,
        location_constraint: BucketLocationConstraint | Literal[""] = "",
    ):
        self.name = name
        self.bucket_account_id = account_id
        self.bucket_region = bucket_region
        self.location_constraint = location_constraint
        # if Object Lock is enabled, it forces the bucket to be versioned as well
        self.versioning_status = None if not object_lock_enabled_for_bucket else "Enabled"
        self.objects = KeyStore() if not object_lock_enabled_for_bucket else VersionedKeyStore()
        self.object_ownership = object_ownership or ObjectOwnership.BucketOwnerEnforced
        self.object_lock_enabled = object_lock_enabled_for_bucket
        self.encryption_rule = DEFAULT_BUCKET_ENCRYPTION
        self.creation_date = datetime.now(tz=_gmt_zone_info)
        self.payer = Payer.BucketOwner
        self.public_access_block = DEFAULT_PUBLIC_BLOCK_ACCESS
        self.multiparts = {}
        self.notification_configuration = {}
        self.logging = {}
        self.cors_rules = None
        self.lifecycle_rules = None
        self.transition_default_minimum_object_size = None
        self.website_configuration = None
        self.policy = None
        self.accelerate_status = None
        self.intelligent_tiering_configurations = {}
        self.analytics_configurations = {}
        self.inventory_configurations = {}
        self.metric_configurations = {}
        self.object_lock_default_retention = {}
        self.replication = None
        self.acl = acl
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
        self.owner = owner
        self.bucket_arn = arns.s3_bucket_arn(self.name, region=bucket_region)

    def get_object(
        self,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        http_method: Literal["GET", "PUT", "HEAD", "DELETE"] = "GET",
    ) -> "S3Object":
        """
        :param key: the object key
        :param version_id: optional, the versionId of the object
        :param http_method: the HTTP method of the original call. This is needed to pick the right exception when
          the bucket is versioned or suspended.
          See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/DeleteMarker.html
        :return: the S3Object from the bucket
        :raises NoSuchKey: if the object key does not exist at all, or if the object is a DeleteMarker
        :raises MethodNotAllowed: if the object is a DeleteMarker and the operation is not allowed against it
        """
        if self.versioning_status is None:
            if version_id and version_id != "null":
                raise InvalidArgument(
                    "Invalid version id specified",
                    ArgumentName="versionId",
                    ArgumentValue=version_id,
                )

            s3_object = self.objects.get(key)

            if not s3_object:
                raise NoSuchKey("The specified key does not exist.", Key=key)

        else:
            self.objects: VersionedKeyStore
            if version_id:
                s3_object_version = self.objects.get(key, version_id)
                if not s3_object_version:
                    raise NoSuchVersion(
                        "The specified version does not exist.",
                        Key=key,
                        VersionId=version_id,
                    )
                elif isinstance(s3_object_version, S3DeleteMarker):
                    if http_method == "HEAD":
                        raise CommonServiceException(
                            code="405",
                            message="Method Not Allowed",
                            status_code=405,
                        )

                    raise MethodNotAllowed(
                        "The specified method is not allowed against this resource.",
                        Method=http_method,
                        ResourceType="DeleteMarker",
                        DeleteMarker=True,
                        Allow="DELETE",
                        VersionId=s3_object_version.version_id,
                    )
                return s3_object_version

            s3_object = self.objects.get(key)

            if not s3_object:
                raise NoSuchKey("The specified key does not exist.", Key=key)

            elif isinstance(s3_object, S3DeleteMarker):
                if http_method not in ("HEAD", "GET"):
                    raise MethodNotAllowed(
                        "The specified method is not allowed against this resource.",
                        Method=http_method,
                        ResourceType="DeleteMarker",
                        DeleteMarker=True,
                        Allow="DELETE",
                        VersionId=s3_object.version_id,
                    )

                raise NoSuchKey(
                    "The specified key does not exist.",
                    Key=key,
                    DeleteMarker=True,
                    VersionId=s3_object.version_id,
                )

        return s3_object

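
# Illustrative sketch (hypothetical helper, not used by the provider) of the
# delete-marker semantics of S3Bucket.get_object above. It assumes a versioned
# bucket, e.g. one created with object_lock_enabled_for_bucket=True, which forces
# versioning; the key and version id values are made up.
def _sketch_delete_marker_lookup(bucket: S3Bucket) -> None:
    key = "some-key"
    # a versioned delete stacks a delete marker on top of the version stack
    bucket.objects.set(key, S3DeleteMarker(key=key, version_id="v1"))

    try:
        bucket.get_object(key)  # GET: the key is reported as absent
    except NoSuchKey:
        pass

    try:
        bucket.get_object(key, http_method="PUT")  # other methods are rejected
    except MethodNotAllowed:
        pass
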
class S3Object:
    key: ObjectKey
    version_id: ObjectVersionId | None
    bucket: BucketName
    owner: Owner | None
    size: Size | None
    etag: ETag | None
    user_metadata: Metadata
    system_metadata: Metadata
    last_modified: datetime
    expires: datetime | None
    expiration: Expiration | None  # right now, this is stored in the provider cache
    storage_class: StorageClass | ObjectStorageClass
    encryption: ServerSideEncryption | None  # inherit bucket
    kms_key_id: SSEKMSKeyId | None  # inherit bucket
    bucket_key_enabled: bool | None  # inherit bucket
    sse_key_hash: SSECustomerKeyMD5 | None
    checksum_algorithm: ChecksumAlgorithm
    checksum_value: str
    checksum_type: ChecksumType
    lock_mode: ObjectLockMode | ObjectLockRetentionMode | None
    lock_legal_status: ObjectLockLegalHoldStatus | None
    lock_until: datetime | None
    website_redirect_location: WebsiteRedirectLocation | None
    acl: AccessControlPolicy | None
    is_current: bool
    parts: dict[int, InternalObjectPart] | None
    restore: Restore | None
    internal_last_modified: int

    def __init__(
        self,
        key: ObjectKey,
        etag: ETag | None = None,
        size: int | None = None,
        version_id: ObjectVersionId | None = None,
        user_metadata: Metadata | None = None,
        system_metadata: Metadata | None = None,
        storage_class: StorageClass = StorageClass.STANDARD,
        expires: datetime | None = None,
        expiration: Expiration | None = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        checksum_value: str | None = None,
        checksum_type: ChecksumType | None = ChecksumType.FULL_OBJECT,
        encryption: ServerSideEncryption | None = None,
        kms_key_id: SSEKMSKeyId | None = None,
        sse_key_hash: SSECustomerKeyMD5 | None = None,
        bucket_key_enabled: bool = False,
        lock_mode: ObjectLockMode | ObjectLockRetentionMode | None = None,
        lock_legal_status: ObjectLockLegalHoldStatus | None = None,
        lock_until: datetime | None = None,
        website_redirect_location: WebsiteRedirectLocation | None = None,
        acl: AccessControlPolicy | None = None,  # TODO
        owner: Owner | None = None,
    ):
        self.key = key
        self.user_metadata = (
            {k.lower(): v for k, v in user_metadata.items()} if user_metadata else {}
        )
        self.system_metadata = system_metadata or {}
        self.version_id = version_id
        self.storage_class = storage_class or StorageClass.STANDARD
        self.etag = etag
        self.size = size
        self.expires = expires
        self.checksum_algorithm = checksum_algorithm or ChecksumAlgorithm.CRC64NVME
        self.checksum_value = checksum_value
        self.checksum_type = checksum_type
        self.encryption = encryption
        self.kms_key_id = kms_key_id
        self.bucket_key_enabled = bucket_key_enabled
        self.sse_key_hash = sse_key_hash
        self.lock_mode = lock_mode
        self.lock_legal_status = lock_legal_status
        self.lock_until = lock_until
        self.acl = acl
        self.expiration = expiration
        self.website_redirect_location = website_redirect_location
        self.is_current = True
        self.last_modified = datetime.now(tz=_gmt_zone_info)
        self.parts = {}
        self.restore = None
        self.owner = owner
        self.internal_last_modified = 0

    def get_system_metadata_fields(self) -> dict:
        headers = {
            "LastModified": self.last_modified_rfc1123,
            "ContentLength": str(self.size),
            "ETag": self.quoted_etag,
        }
        if self.expires:
            headers["Expires"] = self.expires_rfc1123

        for metadata_key, metadata_value in self.system_metadata.items():
            headers[metadata_key] = metadata_value

        if self.storage_class != StorageClass.STANDARD:
            headers["StorageClass"] = self.storage_class

        return headers

    @property
    def last_modified_rfc1123(self) -> str:
        # TODO: verify if we need them with proper snapshot testing, for now it's copied from moto
        # Different datetime formats depending on how the key is obtained
        # https://github.com/boto/boto/issues/466
        return rfc_1123_datetime(self.last_modified)

    @property
    def expires_rfc1123(self) -> str:
        return rfc_1123_datetime(self.expires)

    @property
    def quoted_etag(self) -> str:
        return f'"{self.etag}"'

    def is_locked(self, bypass_governance: bool = False) -> bool:
        if self.lock_legal_status == "ON":
            return True

        if bypass_governance and self.lock_mode == ObjectLockMode.GOVERNANCE:
            return False

        if self.lock_until:
            return self.lock_until > datetime.now(tz=_gmt_zone_info)

        return False

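
# Illustrative sketch (hypothetical helper, not used by the provider) of the
# retention logic in S3Object.is_locked above: a legal hold always locks,
# GOVERNANCE retention can be bypassed, COMPLIANCE retention cannot. All values
# below are made up.
def _sketch_object_lock_semantics() -> None:
    from datetime import timedelta

    until = datetime.now(tz=_gmt_zone_info) + timedelta(days=1)
    obj = S3Object(key="locked-key", lock_mode=ObjectLockMode.GOVERNANCE, lock_until=until)

    assert obj.is_locked()
    assert not obj.is_locked(bypass_governance=True)

    obj.lock_mode = ObjectLockMode.COMPLIANCE
    assert obj.is_locked(bypass_governance=True)

    obj.lock_legal_status = ObjectLockLegalHoldStatus.ON
    assert obj.is_locked()
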
# TODO: could use dataclass, validate after models are set
class S3DeleteMarker:
    key: ObjectKey
    version_id: str
    last_modified: datetime
    is_current: bool

    def __init__(self, key: ObjectKey, version_id: ObjectVersionId):
        self.key = key
        self.version_id = version_id
        self.last_modified = datetime.now(tz=_gmt_zone_info)
        self.is_current = True

    @staticmethod
    def is_locked(*args, **kwargs) -> bool:
        # an S3DeleteMarker cannot be lock protected
        return False


# TODO: could use dataclass, validate after models are set
class S3Part:
    part_number: PartNumber
    etag: ETag | None
    last_modified: datetime
    size: int | None
    checksum_algorithm: ChecksumAlgorithm | None
    checksum_value: str | None

    def __init__(
        self,
        part_number: PartNumber,
        size: int = None,
        etag: ETag = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        checksum_value: str | None = None,
    ):
        self.last_modified = datetime.now(tz=_gmt_zone_info)
        self.part_number = part_number
        self.size = size
        self.etag = etag
        self.checksum_algorithm = checksum_algorithm
        self.checksum_value = checksum_value

    @property
    def quoted_etag(self) -> str:
        return f'"{self.etag}"'


class S3Multipart:
    parts: dict[PartNumber, S3Part]
    object: S3Object
    upload_id: MultipartUploadId
    checksum_value: str | None
    checksum_type: ChecksumType | None
    checksum_algorithm: ChecksumAlgorithm
    initiated: datetime
    precondition: bool

    def __init__(
        self,
        key: ObjectKey,
        storage_class: StorageClass | ObjectStorageClass = StorageClass.STANDARD,
        expires: datetime | None = None,
        expiration: datetime | None = None,  # comes from lifecycle
        checksum_algorithm: ChecksumAlgorithm | None = None,
        checksum_type: ChecksumType | None = None,
        encryption: ServerSideEncryption | None = None,  # inherit bucket
        kms_key_id: SSEKMSKeyId | None = None,  # inherit bucket
        bucket_key_enabled: bool = False,  # inherit bucket
        sse_key_hash: SSECustomerKeyMD5 | None = None,
        lock_mode: ObjectLockMode | None = None,
        lock_legal_status: ObjectLockLegalHoldStatus | None = None,
        lock_until: datetime | None = None,
        website_redirect_location: WebsiteRedirectLocation | None = None,
        acl: AccessControlPolicy | None = None,  # TODO
        user_metadata: Metadata | None = None,
        system_metadata: Metadata | None = None,
        initiator: Owner | None = None,
        tagging: dict[str, str] | None = None,
        owner: Owner | None = None,
        precondition: bool | None = None,
    ):
        self.id = token_urlsafe(96)  # a MultipartUploadId is 128 characters long
        self.initiated = datetime.now(tz=_gmt_zone_info)
        self.parts = {}
        self.initiator = initiator
        self.tagging = tagging
        self.checksum_value = None
        self.checksum_type = checksum_type
        self.checksum_algorithm = checksum_algorithm
        self.precondition = precondition
        self.object = S3Object(
            key=key,
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            storage_class=storage_class or StorageClass.STANDARD,
            expires=expires,
            expiration=expiration,
            checksum_algorithm=checksum_algorithm,
            checksum_type=checksum_type,
            encryption=encryption,
            kms_key_id=kms_key_id,
            bucket_key_enabled=bucket_key_enabled,
            sse_key_hash=sse_key_hash,
            lock_mode=lock_mode,
            lock_legal_status=lock_legal_status,
            lock_until=lock_until,
            website_redirect_location=website_redirect_location,
            acl=acl,
            owner=owner,
        )

    def complete_multipart(
        self, parts: CompletedPartList, mpu_size: int = None, validation_checksum: str = None
    ):
        last_part_index = len(parts) - 1
        object_etag = hashlib.md5(usedforsecurity=False)
        has_checksum = self.checksum_algorithm is not None
        checksum_hash = None
        checksum_key = None
        if has_checksum:
            checksum_key = f"Checksum{self.checksum_algorithm.upper()}"
            if self.checksum_type == ChecksumType.COMPOSITE:
                checksum_hash = get_s3_checksum(self.checksum_algorithm)
            else:
                checksum_hash = CombinedCrcHash(self.checksum_algorithm)

        pos = 0
        parts_map: dict[int, InternalObjectPart] = {}
        for index, part in enumerate(parts):
            part_number = part["PartNumber"]
            part_etag = part["ETag"].strip('"')

            s3_part = self.parts.get(part_number)
            if (
                not s3_part
                or s3_part.etag != part_etag
                or (not has_checksum and any(k.startswith("Checksum") for k in part))
            ):
                raise InvalidPart(
                    "One or more of the specified parts could not be found.  "
                    "The part may not have been uploaded, "
                    "or the specified entity tag may not match the part's entity tag.",
                    ETag=part_etag,
                    PartNumber=part_number,
                    UploadId=self.id,
                )

            if has_checksum:
                if not (part_checksum := part.get(checksum_key)):
                    if self.checksum_type == ChecksumType.COMPOSITE:
                        # edge case: the client still tries to validate a different checksum type than the multipart's
                        for field in part:
                            if field.startswith("Checksum"):
                                algo = field.removeprefix("Checksum").lower()
                                raise BadDigest(
                                    f"The {algo} you specified for part {part_number} did not match what we received."
                                )

                        raise InvalidRequest(
                            f"The upload was created using a {self.checksum_algorithm.lower()} checksum. "
                            f"The complete request must include the checksum for each part. "
                            f"It was missing for part {part_number} in the request."
                        )
                elif part_checksum != s3_part.checksum_value:
                    raise InvalidPart(
                        "One or more of the specified parts could not be found.  The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
                        ETag=part_etag,
                        PartNumber=part_number,
                        UploadId=self.id,
                    )

                part_checksum_value = base64.b64decode(s3_part.checksum_value)
                if self.checksum_type == ChecksumType.COMPOSITE:
                    checksum_hash.update(part_checksum_value)
                else:
                    checksum_hash.combine(part_checksum_value, s3_part.size)

            elif any(k.startswith("Checksum") for k in part):
                raise InvalidPart(
                    "One or more of the specified parts could not be found.  The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
                    ETag=part_etag,
                    PartNumber=part_number,
                    UploadId=self.id,
                )

            if index != last_part_index and s3_part.size < S3_UPLOAD_PART_MIN_SIZE:
                raise EntityTooSmall(
                    "Your proposed upload is smaller than the minimum allowed size",
                    ETag=part_etag,
                    PartNumber=part_number,
                    MinSizeAllowed=S3_UPLOAD_PART_MIN_SIZE,
                    ProposedSize=s3_part.size,
                )

            object_etag.update(bytes.fromhex(s3_part.etag))
            # keep track of the part sizes, as they can be queried afterward on the object as a Range
            internal_part = InternalObjectPart(
                _position=pos,
                Size=s3_part.size,
                ETag=s3_part.etag,
                PartNumber=s3_part.part_number,
            )
            if has_checksum and self.checksum_type == ChecksumType.COMPOSITE:
                internal_part[checksum_key] = s3_part.checksum_value

            parts_map[part_number] = internal_part
            pos += s3_part.size

        if mpu_size and mpu_size != pos:
            raise InvalidRequest(
                f"The provided 'x-amz-mp-object-size' header value {mpu_size} "
                f"does not match what was computed: {pos}"
            )

        if has_checksum:
            checksum_value = base64.b64encode(checksum_hash.digest()).decode()
            if self.checksum_type == ChecksumType.COMPOSITE:
                checksum_value = f"{checksum_value}-{len(parts)}"

            elif self.checksum_type == ChecksumType.FULL_OBJECT:
                if validation_checksum and validation_checksum != checksum_value:
                    raise BadDigest(
                        f"The {self.object.checksum_algorithm.lower()} you specified did not match the calculated checksum."
                    )

            self.checksum_value = checksum_value
            self.object.checksum_value = checksum_value

        multipart_etag = f"{object_etag.hexdigest()}-{len(parts)}"
        self.object.etag = multipart_etag
        self.object.parts = parts_map

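
# Illustrative sketch (hypothetical helper, not used by the provider) of the
# multipart ETag scheme applied in S3Multipart.complete_multipart above: the
# final ETag is the MD5 of the concatenated binary MD5 digests of all parts,
# suffixed with the number of parts.
def _sketch_multipart_etag(part_payloads: list[bytes]) -> str:
    part_etags = [
        hashlib.md5(payload, usedforsecurity=False).hexdigest() for payload in part_payloads
    ]
    combined = hashlib.md5(usedforsecurity=False)
    for part_etag in part_etags:
        combined.update(bytes.fromhex(part_etag))
    # e.g. an ETag ending in "-2" for a two-part upload
    return f"{combined.hexdigest()}-{len(part_etags)}"
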
class KeyStore:
    """
    Object representing an S3 un-versioned bucket's key store. An object is mapped by a key, and you can simply
    retrieve the object from that key.
    """

    def __init__(self):
        self._store = {}

    def get(self, object_key: ObjectKey) -> S3Object | None:
        return self._store.get(object_key)

    def set(self, object_key: ObjectKey, s3_object: S3Object):
        self._store[object_key] = s3_object

    def pop(self, object_key: ObjectKey, default=None) -> S3Object | None:
        return self._store.pop(object_key, default)

    def values(self, *_, **__) -> list[S3Object | S3DeleteMarker]:
        # we create a shallow copy with dict() to avoid the size changing during iteration
        return list(dict(self._store).values())

    def is_empty(self) -> bool:
        return not self._store

    def __contains__(self, item):
        return item in self._store


class VersionedKeyStore:
    """
    Object representing an S3 versioned bucket's key store. An object is mapped by a key, and adding an object to the
    same key will create a new version of it. When deleting the object, an S3DeleteMarker is created and put on top
    of the version stack, to signal that the object has been "deleted".
    This object allows easy retrieval and saving of new object versions.
    See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/versioning-workflows.html
    """

    def __init__(self):
        self._store = defaultdict(dict)

    @classmethod
    def from_key_store(cls, keystore: KeyStore) -> "VersionedKeyStore":
        new_versioned_keystore = cls()
        for s3_object in keystore.values():
            # TODO: maybe do the object mutation inside the provider instead? but we would need to iterate twice,
            #  or do this whole operation inside the provider, when actually working on versioning
            s3_object.version_id = "null"
            new_versioned_keystore.set(object_key=s3_object.key, s3_object=s3_object)

        return new_versioned_keystore

    def get(
        self, object_key: ObjectKey, version_id: ObjectVersionId = None
    ) -> S3Object | S3DeleteMarker | None:
        """
        :param object_key: the key of the object to retrieve
        :param version_id: optional; if not specified, return the current version (the last one inserted)
        :return: an S3Object or S3DeleteMarker
        """
        if not version_id and (versions := self._store.get(object_key)):
            for version_id in reversed(versions):
                return versions.get(version_id)

        return self._store.get(object_key, {}).get(version_id)

    def set(self, object_key: ObjectKey, s3_object: S3Object | S3DeleteMarker):
        """
        Set an S3 object, using its already-set VersionId.
        If the bucket versioning is `Enabled`, we are just inserting a new version.
        If the bucket versioning is `Suspended`, the current object version will be set to `null`, so when setting a
        new object at the same key, we override it at the `null` versionId entry.
        :param object_key: the key of the object we are setting
        :param s3_object: the S3Object or S3DeleteMarker to set
        :return: None
        """
        existing_s3_object = self.get(object_key)
        if existing_s3_object:
            existing_s3_object.is_current = False

        self._store[object_key][s3_object.version_id] = s3_object

    def pop(
        self, object_key: ObjectKey, version_id: ObjectVersionId = None, default=None
    ) -> S3Object | S3DeleteMarker | None:
        versions = self._store.get(object_key)
        if not versions:
            return None

        object_version = versions.pop(version_id, default)
        if not versions:
            self._store.pop(object_key)
        else:
            existing_s3_object = self.get(object_key)
            existing_s3_object.is_current = True

        return object_version

    def values(self, with_versions: bool = False) -> list[S3Object | S3DeleteMarker]:
        if with_versions:
            # we create a shallow copy with dict() to avoid the size changing during iteration
            return [
                object_version
                for values in dict(self._store).values()
                for object_version in dict(values).values()
            ]

        # if `with_versions` is False, return only the current version of each key, unless it is a DeleteMarker
        objects = []
        for object_key, versions in dict(self._store).items():
            # the current version is the last one set in the versions dictionary
            for version_id in reversed(versions):
                current_object = versions[version_id]
                if isinstance(current_object, S3DeleteMarker):
                    break

                objects.append(versions[version_id])
                break

        return objects

    def is_empty(self) -> bool:
        return not self._store

    def __contains__(self, item):
        return item in self._store

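
# Illustrative sketch (hypothetical helper, not used by the provider) of version
# stacking in VersionedKeyStore above: setting a second version demotes the
# previous one, get() without a version id returns the last inserted entry, and
# values() hides non-current versions by default.
def _sketch_version_stacking() -> None:
    store = VersionedKeyStore()
    v1 = S3Object(key="k", version_id="v1")
    v2 = S3Object(key="k", version_id="v2")
    store.set("k", v1)
    store.set("k", v2)

    assert not v1.is_current and v2.is_current
    assert store.get("k") is v2  # current version
    assert store.get("k", "v1") is v1  # explicit version lookup
    assert store.values() == [v2]
    assert store.values(with_versions=True) == [v1, v2]
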
class S3Store(BaseStore):
    buckets: dict[BucketName, S3Bucket] = CrossRegionAttribute(default=dict)
    global_bucket_map: dict[BucketName, AccountId] = CrossAccountAttribute(default=dict)
    aws_managed_kms_key_id: SSEKMSKeyId = LocalAttribute(default=str)

    # static tagging service instance
    TAGS: TaggingService = CrossAccountAttribute(default=TaggingService)


class BucketCorsIndex:
    def __init__(self):
        self._cors_index_cache = None
        self._bucket_index_cache = None

    @property
    def cors(self) -> dict[str, CORSConfiguration]:
        if self._cors_index_cache is None:
            self._bucket_index_cache, self._cors_index_cache = self._build_index()
        return self._cors_index_cache

    @property
    def buckets(self) -> set[str]:
        if self._bucket_index_cache is None:
            self._bucket_index_cache, self._cors_index_cache = self._build_index()
        return self._bucket_index_cache

    def invalidate(self):
        self._cors_index_cache = None
        self._bucket_index_cache = None

    @staticmethod
    def _build_index() -> tuple[set[BucketName], dict[BucketName, CORSConfiguration]]:
        buckets = set()
        cors_index = {}
        # we create a shallow copy with dict() to avoid the size changing during iteration, as the store could get a
        # new account or region created by any other request
        for account_id, regions in dict(s3_stores).items():
            for bucket_name, bucket in dict(regions[AWS_REGION_US_EAST_1].buckets).items():
                bucket: S3Bucket
                buckets.add(bucket_name)
                if bucket.cors_rules is not None:
                    cors_index[bucket_name] = bucket.cors_rules

        return buckets, cors_index

class EncryptionParameters(NamedTuple):
    encryption: ServerSideEncryption
    kms_key_id: SSEKMSKeyId
    bucket_key_enabled: BucketKeyEnabled


class ObjectLockParameters(NamedTuple):
    lock_until: ObjectLockRetainUntilDate
    lock_legal_status: ObjectLockLegalHoldStatus
    lock_mode: ObjectLockMode | ObjectLockRetentionMode


s3_stores = AccountRegionBundle[S3Store]("s3", S3Store)
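
# Illustrative sketch (hypothetical helper, not used by the provider) of store
# access, assuming the AccountRegionBundle indexing used across LocalStack
# (account id first, then region). The account ids and regions below are made up.
def _sketch_store_access() -> None:
    store_us = s3_stores["000000000000"]["us-east-1"]
    store_eu = s3_stores["000000000000"]["eu-west-1"]
    # `buckets` is a CrossRegionAttribute, so all regions of one account share it
    assert store_us.buckets is store_eu.buckets
    # `global_bucket_map` is a CrossAccountAttribute, shared across all accounts
    assert store_us.global_bucket_map is s3_stores["111111111111"]["us-east-1"].global_bucket_map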