• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
22334798432

push

github

web-flow
S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.01
/localstack-core/localstack/services/s3/models.py
1
import base64
1✔
2
import hashlib
1✔
3
import logging
1✔
4
from collections import defaultdict
1✔
5
from datetime import datetime
1✔
6
from secrets import token_urlsafe
1✔
7
from typing import Literal, NamedTuple, Union
1✔
8
from zoneinfo import ZoneInfo
1✔
9

10
from localstack.aws.api import CommonServiceException
1✔
11
from localstack.aws.api.s3 import (
1✔
12
    AccessControlPolicy,
13
    AccountId,
14
    AnalyticsConfiguration,
15
    AnalyticsId,
16
    BadDigest,
17
    BucketAccelerateStatus,
18
    BucketKeyEnabled,
19
    BucketLocationConstraint,
20
    BucketName,
21
    BucketRegion,
22
    BucketVersioningStatus,
23
    ChecksumAlgorithm,
24
    ChecksumType,
25
    CompletedPartList,
26
    CORSConfiguration,
27
    DefaultRetention,
28
    EntityTooSmall,
29
    ETag,
30
    Expiration,
31
    IntelligentTieringConfiguration,
32
    IntelligentTieringId,
33
    InvalidArgument,
34
    InvalidPart,
35
    InventoryConfiguration,
36
    InventoryId,
37
    LifecycleRules,
38
    LoggingEnabled,
39
    Metadata,
40
    MethodNotAllowed,
41
    MetricsConfiguration,
42
    MetricsId,
43
    MultipartUploadId,
44
    NoSuchKey,
45
    NoSuchVersion,
46
    NotificationConfiguration,
47
    ObjectKey,
48
    ObjectLockLegalHoldStatus,
49
    ObjectLockMode,
50
    ObjectLockRetainUntilDate,
51
    ObjectLockRetentionMode,
52
    ObjectOwnership,
53
    ObjectStorageClass,
54
    ObjectVersionId,
55
    Owner,
56
    Part,
57
    PartNumber,
58
    Payer,
59
    Policy,
60
    PublicAccessBlockConfiguration,
61
    ReplicationConfiguration,
62
    Restore,
63
    ServerSideEncryption,
64
    ServerSideEncryptionRule,
65
    Size,
66
    SSECustomerKeyMD5,
67
    SSEKMSKeyId,
68
    StorageClass,
69
    TransitionDefaultMinimumObjectSize,
70
    WebsiteConfiguration,
71
    WebsiteRedirectLocation,
72
)
73
from localstack.constants import AWS_REGION_US_EAST_1
1✔
74
from localstack.services.s3.constants import (
1✔
75
    DEFAULT_BUCKET_ENCRYPTION,
76
    DEFAULT_PUBLIC_BLOCK_ACCESS,
77
    S3_UPLOAD_PART_MIN_SIZE,
78
)
79
from localstack.services.s3.exceptions import InvalidRequest
1✔
80
from localstack.services.s3.headers import replace_non_iso_8859_1_characters
1✔
81
from localstack.services.s3.utils import (
1✔
82
    CombinedCrcHash,
83
    get_s3_checksum,
84
    rfc_1123_datetime,
85
)
86
from localstack.services.stores import (
1✔
87
    AccountRegionBundle,
88
    BaseStore,
89
    CrossAccountAttribute,
90
    CrossRegionAttribute,
91
    LocalAttribute,
92
)
93
from localstack.utils.aws import arns
1✔
94
from localstack.utils.tagging import Tags
1✔
95

96
# Module-level logger for the S3 model classes.
LOG = logging.getLogger(__name__)

# S3 reports timestamps in GMT; create the ZoneInfo once and share it.
_gmt_zone_info = ZoneInfo("GMT")
1✔
99

100

101
class InternalObjectPart(Part):
    # Byte offset of this part inside the fully assembled object. Internal only
    # (leading underscore keeps it out of the public `Part` shape); used to
    # resolve PartNumber-based Range queries against the completed object.
    _position: int
103

104

105
class S3Bucket:
    """
    In-memory representation of an S3 bucket, holding its objects, in-flight
    multipart uploads and every bucket-level configuration (versioning,
    lifecycle, CORS, encryption, object lock, ...).
    """

    name: BucketName
    bucket_account_id: AccountId
    bucket_region: BucketRegion
    bucket_arn: str
    location_constraint: BucketLocationConstraint | Literal[""]
    creation_date: datetime
    # in-progress multipart uploads, keyed by their upload id
    multiparts: dict[MultipartUploadId, "S3Multipart"]
    # KeyStore for un-versioned buckets, VersionedKeyStore once versioning applies
    objects: Union["KeyStore", "VersionedKeyStore"]
    versioning_status: BucketVersioningStatus | None
    lifecycle_rules: LifecycleRules | None
    transition_default_minimum_object_size: TransitionDefaultMinimumObjectSize | None
    policy: Policy | None
    website_configuration: WebsiteConfiguration | None
    acl: AccessControlPolicy
    cors_rules: CORSConfiguration | None
    logging: LoggingEnabled
    notification_configuration: NotificationConfiguration
    payer: Payer
    encryption_rule: ServerSideEncryptionRule | None
    public_access_block: PublicAccessBlockConfiguration | None
    accelerate_status: BucketAccelerateStatus | None
    object_lock_enabled: bool
    object_ownership: ObjectOwnership | None  # can be set to None manually in S3
    intelligent_tiering_configurations: dict[IntelligentTieringId, IntelligentTieringConfiguration]
    analytics_configurations: dict[AnalyticsId, AnalyticsConfiguration]
    inventory_configurations: dict[InventoryId, InventoryConfiguration]
    metric_configurations: dict[MetricsId, MetricsConfiguration]
    object_lock_default_retention: DefaultRetention | None
    replication: ReplicationConfiguration | None
    owner: Owner

    # set all buckets parameters here
    def __init__(
        self,
        name: BucketName,
        account_id: AccountId,
        bucket_region: BucketRegion,
        owner: Owner,
        acl: AccessControlPolicy = None,
        object_ownership: ObjectOwnership = None,
        object_lock_enabled_for_bucket: bool = None,
        location_constraint: BucketLocationConstraint | Literal[""] = "",
    ):
        self.name = name
        self.bucket_account_id = account_id
        self.bucket_region = bucket_region
        self.bucket_arn = arns.s3_bucket_arn(self.name, region=bucket_region)
        self.location_constraint = location_constraint
        # If ObjectLock is enabled, it forces the bucket to be versioned as well
        self.versioning_status = None if not object_lock_enabled_for_bucket else "Enabled"
        self.objects = KeyStore() if not object_lock_enabled_for_bucket else VersionedKeyStore()
        self.object_ownership = object_ownership or ObjectOwnership.BucketOwnerEnforced
        self.object_lock_enabled = object_lock_enabled_for_bucket
        self.encryption_rule = DEFAULT_BUCKET_ENCRYPTION
        self.creation_date = datetime.now(tz=_gmt_zone_info)
        self.payer = Payer.BucketOwner
        self.public_access_block = DEFAULT_PUBLIC_BLOCK_ACCESS
        self.multiparts = {}
        self.notification_configuration = {}
        self.logging = {}
        self.cors_rules = None
        self.lifecycle_rules = None
        self.transition_default_minimum_object_size = None
        self.website_configuration = None
        self.policy = None
        self.accelerate_status = None
        self.intelligent_tiering_configurations = {}
        self.analytics_configurations = {}
        self.inventory_configurations = {}
        self.metric_configurations = {}
        self.object_lock_default_retention = {}
        self.replication = None
        self.acl = acl
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
        self.owner = owner

    def get_object(
        self,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        http_method: Literal["GET", "PUT", "HEAD", "DELETE"] = "GET",
    ) -> "S3Object":
        """
        :param key: the Object Key
        :param version_id: optional, the versionId of the object
        :param http_method: the HTTP method of the original call. This is necessary for the exception if the bucket is
        versioned or suspended
        see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/DeleteMarker.html
        :return: the S3Object from the bucket
        :raises NoSuchKey if the object key does not exist at all, or if the object is a DeleteMarker
        :raises MethodNotAllowed if the object is a DeleteMarker and the operation is not allowed against it
        """

        if self.versioning_status is None:
            # un-versioned bucket: only the implicit "null" version id is valid
            if version_id and version_id != "null":
                raise InvalidArgument(
                    "Invalid version id specified",
                    ArgumentName="versionId",
                    ArgumentValue=version_id,
                )

            s3_object = self.objects.get(key)

            if not s3_object:
                raise NoSuchKey("The specified key does not exist.", Key=key)

        else:
            # versioned (or suspended) bucket: the store is a VersionedKeyStore
            self.objects: VersionedKeyStore
            if version_id:
                s3_object_version = self.objects.get(key, version_id)
                if not s3_object_version:
                    raise NoSuchVersion(
                        "The specified version does not exist.",
                        Key=key,
                        VersionId=version_id,
                    )
                elif isinstance(s3_object_version, S3DeleteMarker):
                    # HEAD against a delete marker returns a bare 405 without a body
                    if http_method == "HEAD":
                        raise CommonServiceException(
                            code="405",
                            message="Method Not Allowed",
                            status_code=405,
                        )

                    raise MethodNotAllowed(
                        "The specified method is not allowed against this resource.",
                        Method=http_method,
                        ResourceType="DeleteMarker",
                        DeleteMarker=True,
                        Allow="DELETE",
                        VersionId=s3_object_version.version_id,
                    )
                return s3_object_version

            # no explicit version requested: resolve the current version
            s3_object = self.objects.get(key)

            if not s3_object:
                raise NoSuchKey("The specified key does not exist.", Key=key)

            elif isinstance(s3_object, S3DeleteMarker):
                # the current version is a delete marker: GET/HEAD behave as a
                # missing key, everything else is rejected
                if http_method not in ("HEAD", "GET"):
                    raise MethodNotAllowed(
                        "The specified method is not allowed against this resource.",
                        Method=http_method,
                        ResourceType="DeleteMarker",
                        DeleteMarker=True,
                        Allow="DELETE",
                        VersionId=s3_object.version_id,
                    )

                raise NoSuchKey(
                    "The specified key does not exist.",
                    Key=key,
                    DeleteMarker=True,
                    VersionId=s3_object.version_id,
                )

        return s3_object
264

265

266
class S3Object:
    """
    In-memory representation of a single S3 object (one version of a key, when
    the bucket is versioned), carrying its metadata, checksum, encryption and
    object-lock state.
    """

    key: ObjectKey
    version_id: ObjectVersionId | None
    owner: Owner | None
    size: Size | None
    etag: ETag | None
    user_metadata: Metadata
    system_metadata: Metadata
    last_modified: datetime
    expires: datetime | None
    expiration: Expiration | None  # right now, this is stored in the provider cache
    storage_class: StorageClass | ObjectStorageClass
    encryption: ServerSideEncryption | None  # inherit bucket
    kms_key_id: SSEKMSKeyId | None  # inherit bucket
    bucket_key_enabled: bool | None  # inherit bucket
    sse_key_hash: SSECustomerKeyMD5 | None
    # ``checksum_algorithm`` can only be None when SSE-C is set and while creating a Multipart.
    # TODO: remove `| None` when SSE-C is removed from AWS S3
    checksum_algorithm: ChecksumAlgorithm | None
    checksum_value: str | None
    checksum_type: ChecksumType | None
    lock_mode: ObjectLockMode | ObjectLockRetentionMode | None
    lock_legal_status: ObjectLockLegalHoldStatus | None
    lock_until: datetime | None
    website_redirect_location: WebsiteRedirectLocation | None
    acl: AccessControlPolicy | None
    # whether this is the latest version of its key (always True when un-versioned)
    is_current: bool
    # parts of a completed multipart upload, keyed by part number as a string
    parts: dict[str, InternalObjectPart]
    restore: Restore | None
    internal_last_modified: int

    def __init__(
        self,
        key: ObjectKey,
        etag: ETag | None = None,
        size: int | None = None,
        version_id: ObjectVersionId | None = None,
        user_metadata: Metadata | None = None,
        system_metadata: Metadata | None = None,
        storage_class: StorageClass = StorageClass.STANDARD,
        expires: datetime | None = None,
        expiration: Expiration | None = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        checksum_value: str | None = None,
        checksum_type: ChecksumType | None = ChecksumType.FULL_OBJECT,
        encryption: ServerSideEncryption | None = None,
        kms_key_id: SSEKMSKeyId | None = None,
        sse_key_hash: SSECustomerKeyMD5 | None = None,
        bucket_key_enabled: bool = False,
        lock_mode: ObjectLockMode | ObjectLockRetentionMode | None = None,
        lock_legal_status: ObjectLockLegalHoldStatus | None = None,
        lock_until: datetime | None = None,
        website_redirect_location: WebsiteRedirectLocation | None = None,
        acl: AccessControlPolicy | None = None,  # TODO
        owner: Owner | None = None,
    ):
        self.key = key
        self.user_metadata = user_metadata or {}
        self.system_metadata = system_metadata or {}
        self.version_id = version_id
        self.storage_class = storage_class or StorageClass.STANDARD
        self.etag = etag
        self.size = size
        self.expires = expires
        # fall back to CRC64NVME when no algorithm was requested
        self.checksum_algorithm = checksum_algorithm or ChecksumAlgorithm.CRC64NVME
        self.checksum_value = checksum_value
        self.checksum_type = checksum_type
        self.encryption = encryption
        self.kms_key_id = kms_key_id
        self.bucket_key_enabled = bucket_key_enabled
        self.sse_key_hash = sse_key_hash
        self.lock_mode = lock_mode
        self.lock_legal_status = lock_legal_status
        self.lock_until = lock_until
        self.acl = acl
        self.expiration = expiration
        self.website_redirect_location = website_redirect_location
        self.is_current = True
        self.last_modified = datetime.now(tz=_gmt_zone_info)
        self.parts = {}
        self.restore = None
        self.owner = owner
        self.internal_last_modified = 0

    def get_system_metadata_fields(self) -> dict:
        """
        Return the system-defined metadata headers for this object
        (LastModified, ContentLength, ETag, and optionally Expires,
        content-related headers and StorageClass).
        """
        # TODO: change when updating the schema -> make it a property
        headers = {
            "LastModified": self.last_modified_rfc1123,
            "ContentLength": str(self.size),
            "ETag": self.quoted_etag,
        }
        if self.expires:
            headers["Expires"] = self.expires_rfc1123

        # header values must be ISO-8859-1 safe
        for metadata_key, metadata_value in self.system_metadata.items():
            headers[metadata_key] = replace_non_iso_8859_1_characters(metadata_value)

        # STANDARD is the default and is not echoed back as a header
        if self.storage_class != StorageClass.STANDARD:
            headers["StorageClass"] = self.storage_class

        return headers

    @property
    def last_modified_rfc1123(self) -> str:
        # TODO: verify if we need them with proper snapshot testing, for now it's copied from moto
        # Different datetime formats depending on how the key is obtained
        # https://github.com/boto/boto/issues/466
        return rfc_1123_datetime(self.last_modified)

    @property
    def expires_rfc1123(self) -> str:
        # `Expires` rendered in the RFC 1123 format used by HTTP headers
        return rfc_1123_datetime(self.expires)

    @property
    def quoted_etag(self) -> str:
        # S3 returns ETags wrapped in double quotes over HTTP
        return f'"{self.etag}"'

    def is_locked(self, bypass_governance: bool = False) -> bool:
        """
        Whether the object is currently protected by Object Lock.
        A legal hold always locks; a GOVERNANCE retention can be bypassed;
        otherwise the object is locked until its retain-until date passes.
        """
        if self.lock_legal_status == "ON":
            return True

        if bypass_governance and self.lock_mode == ObjectLockMode.GOVERNANCE:
            return False

        if self.lock_until:
            return self.lock_until > datetime.now(tz=_gmt_zone_info)

        return False
394

395

396
class S3DeleteMarker:
    """
    Tombstone entry placed on top of a versioned key's version stack when the
    object is deleted without specifying a version id.
    """

    key: ObjectKey
    version_id: str
    last_modified: datetime
    is_current: bool

    def __init__(self, key: ObjectKey, version_id: ObjectVersionId):
        self.is_current = True
        self.last_modified = datetime.now(tz=_gmt_zone_info)
        self.version_id = version_id
        self.key = key

    @staticmethod
    def is_locked(*args, **kwargs) -> bool:
        """Always False: delete markers can never be protected by Object Lock."""
        return False
412

413

414
class S3Part:
    """A single uploaded part belonging to an in-progress multipart upload."""

    part_number: PartNumber
    etag: ETag | None
    last_modified: datetime
    size: int | None
    checksum_algorithm: ChecksumAlgorithm | None
    checksum_value: str | None

    def __init__(
        self,
        part_number: PartNumber,
        size: int = None,
        etag: ETag = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        checksum_value: str | None = None,
    ):
        # record the upload timestamp first, in GMT like AWS does
        self.last_modified = datetime.now(tz=_gmt_zone_info)
        self.checksum_value = checksum_value
        self.checksum_algorithm = checksum_algorithm
        self.etag = etag
        self.size = size
        self.part_number = part_number

    @property
    def quoted_etag(self) -> str:
        """The ETag wrapped in double quotes, as S3 renders it over HTTP."""
        return '"%s"' % self.etag
440

441

442
class S3Multipart:
    """
    In-memory representation of an in-progress multipart upload: the uploaded
    parts, the checksum configuration, and the target S3Object that will be
    materialized by `complete_multipart`.
    """

    id: MultipartUploadId
    # uploaded parts, keyed by part number as a string
    parts: dict[str, S3Part]
    # the object this upload will become on completion
    object: S3Object
    checksum_value: str | None
    checksum_type: ChecksumType | None
    checksum_algorithm: ChecksumAlgorithm | None
    initiated: datetime
    precondition: bool | None
    initiator: Owner | None
    tagging: dict[str, str] | None

    def __init__(
        self,
        key: ObjectKey,
        storage_class: StorageClass | ObjectStorageClass = StorageClass.STANDARD,
        expires: datetime | None = None,
        expiration: datetime | None = None,  # come from lifecycle
        checksum_algorithm: ChecksumAlgorithm | None = None,
        checksum_type: ChecksumType | None = None,
        encryption: ServerSideEncryption | None = None,  # inherit bucket
        kms_key_id: SSEKMSKeyId | None = None,  # inherit bucket
        bucket_key_enabled: bool = False,  # inherit bucket
        sse_key_hash: SSECustomerKeyMD5 | None = None,
        lock_mode: ObjectLockMode | None = None,
        lock_legal_status: ObjectLockLegalHoldStatus | None = None,
        lock_until: datetime | None = None,
        website_redirect_location: WebsiteRedirectLocation | None = None,
        acl: AccessControlPolicy | None = None,  # TODO
        user_metadata: Metadata | None = None,
        system_metadata: Metadata | None = None,
        initiator: Owner | None = None,
        tagging: dict[str, str] | None = None,
        owner: Owner | None = None,
        precondition: bool | None = None,
    ):
        self.id = token_urlsafe(96)  # MultipartUploadId is 128 characters long
        self.initiated = datetime.now(tz=_gmt_zone_info)
        self.parts = {}
        self.initiator = initiator
        self.tagging = tagging
        self.checksum_value = None
        self.checksum_type = checksum_type
        self.checksum_algorithm = checksum_algorithm
        self.precondition = precondition
        # the future object is created upfront; etag/parts/checksum are filled
        # in by `complete_multipart`
        self.object = S3Object(
            key=key,
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            storage_class=storage_class or StorageClass.STANDARD,
            expires=expires,
            expiration=expiration,
            checksum_algorithm=checksum_algorithm,
            checksum_type=checksum_type,
            encryption=encryption,
            kms_key_id=kms_key_id,
            bucket_key_enabled=bucket_key_enabled,
            sse_key_hash=sse_key_hash,
            lock_mode=lock_mode,
            lock_legal_status=lock_legal_status,
            lock_until=lock_until,
            website_redirect_location=website_redirect_location,
            acl=acl,
            owner=owner,
        )

    def complete_multipart(
        self, parts: CompletedPartList, mpu_size: int = None, validation_checksum: str = None
    ):
        """
        Validate the client-provided part list against the uploaded parts and
        finalize the target object: compute the multipart ETag, the aggregate
        checksum (COMPOSITE or FULL_OBJECT), and the part position map.

        :param parts: the CompletedPart list from the CompleteMultipartUpload call
        :param mpu_size: optional `x-amz-mp-object-size` header value to validate
        :param validation_checksum: optional full-object checksum to validate against
        :raises InvalidPart: unknown part, ETag mismatch, or checksum mismatch
        :raises BadDigest: wrong checksum algorithm/value supplied
        :raises InvalidRequest: missing part checksum or mpu_size mismatch
        :raises EntityTooSmall: a non-last part is below the minimum part size
        """
        last_part_index = len(parts) - 1
        # the multipart ETag is the MD5 of the concatenated raw part ETags
        object_etag = hashlib.md5(usedforsecurity=False)
        has_checksum = self.checksum_algorithm is not None
        checksum_hash = None
        checksum_key = None
        if has_checksum:
            checksum_key = f"Checksum{self.checksum_algorithm.upper()}"
            if self.checksum_type == ChecksumType.COMPOSITE:
                # COMPOSITE: checksum-of-checksums over the decoded part digests
                checksum_hash = get_s3_checksum(self.checksum_algorithm)
            else:
                # FULL_OBJECT: CRC combination over part checksums and sizes
                checksum_hash = CombinedCrcHash(self.checksum_algorithm)

        pos = 0
        parts_map: dict[str, InternalObjectPart] = {}
        for index, part in enumerate(parts):
            part_number = str(part["PartNumber"])
            part_etag = part["ETag"].strip('"')

            s3_part = self.parts.get(part_number)
            # reject unknown parts, ETag mismatches, and checksum fields on an
            # upload that was created without a checksum algorithm
            if (
                not s3_part
                or s3_part.etag != part_etag
                or (not has_checksum and any(k.startswith("Checksum") for k in part))
            ):
                raise InvalidPart(
                    "One or more of the specified parts could not be found.  "
                    "The part may not have been uploaded, "
                    "or the specified entity tag may not match the part's entity tag.",
                    ETag=part_etag,
                    PartNumber=part_number,
                    UploadId=self.id,
                )

            if has_checksum:
                if not (part_checksum := part.get(checksum_key)):
                    if self.checksum_type == ChecksumType.COMPOSITE:
                        # weird case, they still try to validate a different checksum type than the multipart
                        for field in part:
                            if field.startswith("Checksum"):
                                algo = field.removeprefix("Checksum").lower()
                                raise BadDigest(
                                    f"The {algo} you specified for part {part_number} did not match what we received."
                                )

                        raise InvalidRequest(
                            f"The upload was created using a {self.checksum_algorithm.lower()} checksum. "
                            f"The complete request must include the checksum for each part. "
                            f"It was missing for part {part_number} in the request."
                        )
                elif part_checksum != s3_part.checksum_value:
                    raise InvalidPart(
                        "One or more of the specified parts could not be found.  The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
                        ETag=part_etag,
                        PartNumber=part_number,
                        UploadId=self.id,
                    )

                # fold the stored (base64) part checksum into the aggregate
                part_checksum_value = base64.b64decode(s3_part.checksum_value)
                if self.checksum_type == ChecksumType.COMPOSITE:
                    checksum_hash.update(part_checksum_value)
                else:
                    checksum_hash.combine(part_checksum_value, s3_part.size)

            elif any(k.startswith("Checksum") for k in part):
                raise InvalidPart(
                    "One or more of the specified parts could not be found.  The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
                    ETag=part_etag,
                    PartNumber=part_number,
                    UploadId=self.id,
                )

            # every part except the last must meet the minimum part size
            if index != last_part_index and s3_part.size < S3_UPLOAD_PART_MIN_SIZE:
                raise EntityTooSmall(
                    "Your proposed upload is smaller than the minimum allowed size",
                    ETag=part_etag,
                    PartNumber=part_number,
                    MinSizeAllowed=S3_UPLOAD_PART_MIN_SIZE,
                    ProposedSize=s3_part.size,
                )

            object_etag.update(bytes.fromhex(s3_part.etag))
            # keep track of the parts size, as it can be queried afterward on the object as a Range
            internal_part = InternalObjectPart(
                _position=pos,
                Size=s3_part.size,
                ETag=s3_part.etag,
                PartNumber=s3_part.part_number,
            )
            if has_checksum and self.checksum_type == ChecksumType.COMPOSITE:
                internal_part[checksum_key] = s3_part.checksum_value

            parts_map[part_number] = internal_part
            pos += s3_part.size

        if mpu_size and mpu_size != pos:
            raise InvalidRequest(
                f"The provided 'x-amz-mp-object-size' header value {mpu_size} "
                f"does not match what was computed: {pos}"
            )

        if has_checksum:
            checksum_value = base64.b64encode(checksum_hash.digest()).decode()
            if self.checksum_type == ChecksumType.COMPOSITE:
                # composite checksums carry a "-<part count>" suffix
                checksum_value = f"{checksum_value}-{len(parts)}"

            elif self.checksum_type == ChecksumType.FULL_OBJECT:
                if validation_checksum and validation_checksum != checksum_value:
                    raise BadDigest(
                        f"The {self.object.checksum_algorithm.lower()} you specified did not match the calculated checksum."
                    )

            self.checksum_value = checksum_value
            self.object.checksum_value = checksum_value

        # multipart ETags also carry a "-<part count>" suffix
        multipart_etag = f"{object_etag.hexdigest()}-{len(parts)}"
        self.object.etag = multipart_etag
        self.object.parts = parts_map
628

629

630
class KeyStore:
    """
    Key store for an un-versioned S3 bucket: a flat mapping from object key to
    the single object stored under that key.
    """

    _store: dict[ObjectKey, S3Object | S3DeleteMarker]

    def __init__(self):
        self._store = {}

    def get(self, object_key: ObjectKey) -> S3Object | None:
        """Return the object stored under `object_key`, or None if absent."""
        return self._store.get(object_key)

    def set(self, object_key: ObjectKey, s3_object: S3Object):
        """Store `s3_object` under `object_key`, overwriting any previous entry."""
        self._store[object_key] = s3_object

    def pop(self, object_key: ObjectKey, default=None) -> S3Object | None:
        """Remove and return the object under `object_key`, or `default` if absent."""
        return self._store.pop(object_key, default)

    def values(self, *_, **__) -> list[S3Object | S3DeleteMarker]:
        """
        Return all stored objects. A shallow snapshot is taken first so the
        store may mutate while the caller iterates the result.
        """
        snapshot = self._store.copy()
        return list(snapshot.values())

    def is_empty(self) -> bool:
        """True when the store holds no object at all."""
        return len(self._store) == 0

    def __contains__(self, item):
        return item in self._store
659

660

661
class VersionedKeyStore:
    """
    Object representing an S3 Versioned Bucket's Key Store. An object is mapped by a key, and adding an object to the
    same key will create a new version of it. When deleting the object, a S3DeleteMarker is created and put on top
    of the version stack, to signal the object has been "deleted".
    This object allows easy retrieval and saving of new object versions.
    See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/versioning-workflows.html
    """

    # key -> (version id -> object version). Insertion order of the inner dict
    # encodes recency: the last-inserted entry is the current version.
    _store: dict[ObjectKey, dict[ObjectVersionId, S3Object | S3DeleteMarker]]

    def __init__(self):
        self._store = defaultdict(dict)

    @classmethod
    def from_key_store(cls, keystore: KeyStore) -> "VersionedKeyStore":
        """
        Build a VersionedKeyStore from an un-versioned KeyStore, assigning every
        existing object the special "null" version id.
        """
        new_versioned_keystore = cls()
        for s3_object in keystore.values():
            # TODO: maybe do the object mutation inside the provider instead? but would need to iterate twice
            #  or do this whole operation inside the provider instead, when actually working on versioning
            s3_object.version_id = "null"
            new_versioned_keystore.set(object_key=s3_object.key, s3_object=s3_object)

        return new_versioned_keystore

    def get(
        self, object_key: ObjectKey, version_id: ObjectVersionId = None
    ) -> S3Object | S3DeleteMarker | None:
        """
        :param object_key: the key of the Object we need to retrieve
        :param version_id: Optional, if not specified, return the current version (last one inserted)
        :return: an S3Object or S3DeleteMarker
        """
        if not version_id and (versions := self._store.get(object_key)):
            # the current version is the last entry inserted into the inner dict
            for version_id in reversed(versions):
                return versions.get(version_id)

        return self._store.get(object_key, {}).get(version_id)

    def set(self, object_key: ObjectKey, s3_object: S3Object | S3DeleteMarker):
        """
        Set an S3 object, using its already set VersionId.
        If the bucket versioning is `Enabled`, then we're just inserting a new Version.
        If the bucket versioning is `Suspended`, the current object version will be set to `null`, so if setting a new
        object at the same key, we will override it at the `null` versionId entry.
        :param object_key: the key of the Object we are setting
        :param s3_object: the S3 object or S3DeleteMarker to set
        :return: None
        """
        existing_s3_object = self.get(object_key)
        if existing_s3_object:
            # the new version supersedes the previous current one
            existing_s3_object.is_current = False

        self._store[object_key][s3_object.version_id] = s3_object

    def pop(
        self, object_key: ObjectKey, version_id: ObjectVersionId = None, default=None
    ) -> S3Object | S3DeleteMarker | None:
        """
        Remove and return the given version of the object.
        :param object_key: the key of the Object to remove
        :param version_id: the version to remove (the entry stored under `None` if not given)
        :param default: returned when the key or version does not exist
        :return: the removed S3Object or S3DeleteMarker, or `default`
        """
        versions = self._store.get(object_key)
        if not versions:
            # fix: honor `default` like `KeyStore.pop` and `dict.pop` do,
            # instead of always returning None for an unknown key
            return default

        object_version = versions.pop(version_id, default)
        if not versions:
            # last version removed: drop the key entirely
            self._store.pop(object_key)
        else:
            # the previous remaining version becomes current again
            existing_s3_object = self.get(object_key)
            existing_s3_object.is_current = True

        return object_version

    def values(self, with_versions: bool = False) -> list[S3Object | S3DeleteMarker]:
        """
        :param with_versions: if True, return every stored version; otherwise only
        each key's current version, skipping keys whose current version is a DeleteMarker
        :return: a list of S3Objects (and S3DeleteMarkers when `with_versions`)
        """
        if with_versions:
            # we create a shallow copy with dict to avoid size changed during iteration
            return [
                object_version
                for values in dict(self._store).values()
                for object_version in dict(values).values()
            ]

        # if `with_versions` is False, then we need to return only the current version if it's not a DeleteMarker
        objects = []
        for object_key, versions in dict(self._store).items():
            # we're getting the last set object in the versions dictionary
            for version_id in reversed(versions):
                current_object = versions[version_id]
                if isinstance(current_object, S3DeleteMarker):
                    break

                objects.append(versions[version_id])
                break

        return objects

    def is_empty(self) -> bool:
        """True when no key has any version stored."""
        return not self._store

    def __contains__(self, item):
        return item in self._store
760

761

762
class S3Store(BaseStore):
    """LocalStack store holding the S3 provider state (buckets, global name map, tags)."""

    # all buckets of the account; CrossRegionAttribute suggests a single shared
    # mapping across regions (S3 bucket names live in one namespace per account)
    buckets: dict[BucketName, S3Bucket] = CrossRegionAttribute(default=dict)
    # maps a bucket name to the account that owns it, shared across all accounts
    # (used to enforce the globally unique bucket namespace)
    global_bucket_map: dict[BucketName, AccountId] = CrossAccountAttribute(default=dict)
    # lazily populated ID of the AWS-managed KMS key used for default SSE-KMS
    # encryption; starts as an empty string
    aws_managed_kms_key_id: SSEKMSKeyId = LocalAttribute(default=str)
    # resource tags, stored per region (LocalAttribute)
    tags: Tags = LocalAttribute(default=Tags)
1✔
767

768

769
class BucketCorsIndex:
    """Lazily built, cached index of all bucket names and their CORS configurations.

    Both caches are built together on first property access and dropped via
    `invalidate()` (e.g. when buckets or their CORS rules change), so they always
    stay consistent with each other.
    """

    def __init__(self):
        self._cors_index_cache: dict[BucketName, CORSConfiguration] | None = None
        self._bucket_index_cache: set[BucketName] | None = None

    @property
    def cors(self) -> dict[str, CORSConfiguration]:
        """Mapping of bucket name to CORS configuration; buckets without CORS are absent."""
        if self._cors_index_cache is None:
            self._refresh()
        return self._cors_index_cache

    @property
    def buckets(self) -> set[str]:
        """Set of all known bucket names across every account."""
        if self._bucket_index_cache is None:
            self._refresh()
        return self._bucket_index_cache

    def invalidate(self):
        """Drop both caches; they are rebuilt on the next property access."""
        self._cors_index_cache = None
        self._bucket_index_cache = None

    def _refresh(self) -> None:
        # rebuild both caches in one pass so they cannot drift apart
        self._bucket_index_cache, self._cors_index_cache = self._build_index()

    @staticmethod
    def _build_index() -> tuple[set[BucketName], dict[BucketName, CORSConfiguration]]:
        buckets = set()
        cors_index = {}
        # we create a shallow copy with dict to avoid size changed during iteration,
        # as the store could have a new account or region created by other requests;
        # iterate only the values — the account id itself is not needed here
        for regions in dict(s3_stores).values():
            # `S3Store.buckets` is a CrossRegionAttribute, so reading it through any
            # one region (us-east-1) yields every bucket of the account
            for bucket_name, bucket in dict(regions[AWS_REGION_US_EAST_1].buckets).items():
                bucket: S3Bucket
                buckets.add(bucket_name)
                if bucket.cors_rules is not None:
                    cors_index[bucket_name] = bucket.cors_rules

        return buckets, cors_index
1✔
804

805

806
class EncryptionParameters(NamedTuple):
    """Resolved server-side-encryption parameters for an S3 bucket/object operation."""

    # the SSE algorithm to apply (ServerSideEncryption enum from the S3 API)
    encryption: ServerSideEncryption
    # KMS key ID to use — presumably only meaningful for KMS-based encryption
    kms_key_id: SSEKMSKeyId
    # whether S3 Bucket Keys are enabled for SSE-KMS
    bucket_key_enabled: BucketKeyEnabled
1✔
810

811

812
class ObjectLockParameters(NamedTuple):
    """Resolved S3 Object Lock settings (retention and legal hold) for an object."""

    # date until which the object version is retained
    lock_until: ObjectLockRetainUntilDate
    # legal-hold status of the object (ObjectLockLegalHoldStatus enum)
    lock_legal_status: ObjectLockLegalHoldStatus
    # retention mode; accepts either the bucket-default or per-object enum type
    lock_mode: ObjectLockMode | ObjectLockRetentionMode
1✔
816

817

818
# process-wide registry of S3Store instances, bundled per account and region
s3_stores = AccountRegionBundle[S3Store]("s3", S3Store)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc