localstack / localstack, build 21697093787 (push, github, web-flow)
04 Feb 2026 09:56PM UTC, coverage: 86.962% (-0.004% from 86.966%)
Commit: improve system information sent in session and container_info (#13680)

10 of 17 new or added lines in 2 files covered (58.82%).
222 existing lines in 17 files now uncovered.
70560 of 81139 relevant lines covered (86.96%).
0.87 hits per line.

Source file: /localstack-core/localstack/services/s3/models.py (99.01% covered; uncovered lines are marked inline)

import base64
import hashlib
import logging
from collections import defaultdict
from datetime import datetime
from secrets import token_urlsafe
from typing import Literal, NamedTuple, Union
from zoneinfo import ZoneInfo

from localstack.aws.api import CommonServiceException
from localstack.aws.api.s3 import (
    AccessControlPolicy,
    AccountId,
    AnalyticsConfiguration,
    AnalyticsId,
    BadDigest,
    BucketAccelerateStatus,
    BucketKeyEnabled,
    BucketLocationConstraint,
    BucketName,
    BucketRegion,
    BucketVersioningStatus,
    ChecksumAlgorithm,
    ChecksumType,
    CompletedPartList,
    CORSConfiguration,
    DefaultRetention,
    EntityTooSmall,
    ETag,
    Expiration,
    IntelligentTieringConfiguration,
    IntelligentTieringId,
    InvalidArgument,
    InvalidPart,
    InventoryConfiguration,
    InventoryId,
    LifecycleRules,
    LoggingEnabled,
    Metadata,
    MethodNotAllowed,
    MetricsConfiguration,
    MetricsId,
    MultipartUploadId,
    NoSuchKey,
    NoSuchVersion,
    NotificationConfiguration,
    ObjectKey,
    ObjectLockLegalHoldStatus,
    ObjectLockMode,
    ObjectLockRetainUntilDate,
    ObjectLockRetentionMode,
    ObjectOwnership,
    ObjectStorageClass,
    ObjectVersionId,
    Owner,
    Part,
    PartNumber,
    Payer,
    Policy,
    PublicAccessBlockConfiguration,
    ReplicationConfiguration,
    Restore,
    ServerSideEncryption,
    ServerSideEncryptionRule,
    Size,
    SSECustomerKeyMD5,
    SSEKMSKeyId,
    StorageClass,
    TransitionDefaultMinimumObjectSize,
    WebsiteConfiguration,
    WebsiteRedirectLocation,
)
from localstack.constants import AWS_REGION_US_EAST_1
from localstack.services.s3.constants import (
    DEFAULT_BUCKET_ENCRYPTION,
    DEFAULT_PUBLIC_BLOCK_ACCESS,
    S3_UPLOAD_PART_MIN_SIZE,
)
from localstack.services.s3.exceptions import InvalidRequest
from localstack.services.s3.headers import replace_non_iso_8859_1_characters
from localstack.services.s3.utils import (
    CombinedCrcHash,
    get_s3_checksum,
    rfc_1123_datetime,
)
from localstack.services.stores import (
    AccountRegionBundle,
    BaseStore,
    CrossAccountAttribute,
    CrossRegionAttribute,
    LocalAttribute,
)
from localstack.utils.aws import arns
from localstack.utils.tagging import TaggingService

LOG = logging.getLogger(__name__)

_gmt_zone_info = ZoneInfo("GMT")


class InternalObjectPart(Part):
    _position: int


class S3Bucket:
    name: BucketName
    bucket_account_id: AccountId
    bucket_region: BucketRegion
    bucket_arn: str
    location_constraint: BucketLocationConstraint | Literal[""]
    creation_date: datetime
    multiparts: dict[MultipartUploadId, "S3Multipart"]
    objects: Union["KeyStore", "VersionedKeyStore"]
    versioning_status: BucketVersioningStatus | None
    lifecycle_rules: LifecycleRules | None
    transition_default_minimum_object_size: TransitionDefaultMinimumObjectSize | None
    policy: Policy | None
    website_configuration: WebsiteConfiguration | None
    acl: AccessControlPolicy
    cors_rules: CORSConfiguration | None
    logging: LoggingEnabled
    notification_configuration: NotificationConfiguration
    payer: Payer
    encryption_rule: ServerSideEncryptionRule | None
    public_access_block: PublicAccessBlockConfiguration | None
    accelerate_status: BucketAccelerateStatus | None
    object_lock_enabled: bool
    object_ownership: ObjectOwnership
    intelligent_tiering_configurations: dict[IntelligentTieringId, IntelligentTieringConfiguration]
    analytics_configurations: dict[AnalyticsId, AnalyticsConfiguration]
    inventory_configurations: dict[InventoryId, InventoryConfiguration]
    metric_configurations: dict[MetricsId, MetricsConfiguration]
    object_lock_default_retention: DefaultRetention | None
    replication: ReplicationConfiguration | None
    owner: Owner

    # set all bucket parameters here
    def __init__(
        self,
        name: BucketName,
        account_id: AccountId,
        bucket_region: BucketRegion,
        owner: Owner,
        acl: AccessControlPolicy = None,
        object_ownership: ObjectOwnership = None,
        object_lock_enabled_for_bucket: bool = None,
        location_constraint: BucketLocationConstraint | Literal[""] = "",
    ):
        self.name = name
        self.bucket_account_id = account_id
        self.bucket_region = bucket_region
        self.bucket_arn = arns.s3_bucket_arn(self.name, region=bucket_region)
        self.location_constraint = location_constraint
        # If ObjectLock is enabled, it forces the bucket to be versioned as well
        self.versioning_status = None if not object_lock_enabled_for_bucket else "Enabled"
        self.objects = KeyStore() if not object_lock_enabled_for_bucket else VersionedKeyStore()
        self.object_ownership = object_ownership or ObjectOwnership.BucketOwnerEnforced
        self.object_lock_enabled = object_lock_enabled_for_bucket
        self.encryption_rule = DEFAULT_BUCKET_ENCRYPTION
        self.creation_date = datetime.now(tz=_gmt_zone_info)
        self.payer = Payer.BucketOwner
        self.public_access_block = DEFAULT_PUBLIC_BLOCK_ACCESS
        self.multiparts = {}
        self.notification_configuration = {}
        self.logging = {}
        self.cors_rules = None
        self.lifecycle_rules = None
        self.transition_default_minimum_object_size = None
        self.website_configuration = None
        self.policy = None
        self.accelerate_status = None
        self.intelligent_tiering_configurations = {}
        self.analytics_configurations = {}
        self.inventory_configurations = {}
        self.metric_configurations = {}
        self.object_lock_default_retention = {}
        self.replication = None
        self.acl = acl
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
        self.owner = owner

    def get_object(
        self,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        http_method: Literal["GET", "PUT", "HEAD", "DELETE"] = "GET",
    ) -> "S3Object":
        """
        :param key: the Object Key
        :param version_id: optional, the versionId of the object
        :param http_method: the HTTP method of the original call. This is necessary for the exception if the bucket is
        versioned or suspended
        see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/DeleteMarker.html
        :return: the S3Object from the bucket
        :raises NoSuchKey: if the object key does not exist at all, or if the object is a DeleteMarker
        :raises MethodNotAllowed: if the object is a DeleteMarker and the operation is not allowed against it
        """

        if self.versioning_status is None:
            if version_id and version_id != "null":
                raise InvalidArgument(
                    "Invalid version id specified",
                    ArgumentName="versionId",
                    ArgumentValue=version_id,
                )

            s3_object = self.objects.get(key)

            if not s3_object:
                raise NoSuchKey("The specified key does not exist.", Key=key)

        else:
            self.objects: VersionedKeyStore
            if version_id:
                s3_object_version = self.objects.get(key, version_id)
                if not s3_object_version:
                    raise NoSuchVersion(
                        "The specified version does not exist.",
                        Key=key,
                        VersionId=version_id,
                    )
                elif isinstance(s3_object_version, S3DeleteMarker):
                    if http_method == "HEAD":
                        raise CommonServiceException(
                            code="405",
                            message="Method Not Allowed",
                            status_code=405,
                        )

                    raise MethodNotAllowed(
                        "The specified method is not allowed against this resource.",
                        Method=http_method,
                        ResourceType="DeleteMarker",
                        DeleteMarker=True,
                        Allow="DELETE",
                        VersionId=s3_object_version.version_id,
                    )
                return s3_object_version

            s3_object = self.objects.get(key)

            if not s3_object:
                raise NoSuchKey("The specified key does not exist.", Key=key)

            elif isinstance(s3_object, S3DeleteMarker):
                if http_method not in ("HEAD", "GET"):
                    raise MethodNotAllowed(
                        "The specified method is not allowed against this resource.",
                        Method=http_method,
                        ResourceType="DeleteMarker",
                        DeleteMarker=True,
                        Allow="DELETE",
                        VersionId=s3_object.version_id,
                    )

                raise NoSuchKey(
                    "The specified key does not exist.",
                    Key=key,
                    DeleteMarker=True,
                    VersionId=s3_object.version_id,
                )

        return s3_object
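

# A minimal usage sketch (editor's addition, not part of the original module)
# of how get_object resolves versions and delete markers. The owner fields,
# keys, and version ids below are assumptions for illustration only.
def _example_get_object_behavior():
    bucket = S3Bucket(
        name="demo-bucket",
        account_id="000000000000",
        bucket_region="us-east-1",
        owner=Owner(DisplayName="demo", ID="demo-id"),
        object_lock_enabled_for_bucket=True,  # forces versioning and a VersionedKeyStore
    )
    obj = S3Object(key="report.csv", etag="d41d8cd98f00b204e9800998ecf8427e", size=0, version_id="v1")
    bucket.objects.set("report.csv", obj)
    # a delete marker becomes the current version, hiding the object
    bucket.objects.set("report.csv", S3DeleteMarker("report.csv", version_id="v2"))

    assert bucket.get_object("report.csv", version_id="v1") is obj
    try:
        bucket.get_object("report.csv")  # current version is a delete marker
    except NoSuchKey:
        pass
    try:
        bucket.get_object("report.csv", version_id="v2")  # GET on the marker itself
    except MethodNotAllowed:
        pass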


class S3Object:
    key: ObjectKey
    version_id: ObjectVersionId | None
    owner: Owner | None
    size: Size | None
    etag: ETag | None
    user_metadata: Metadata
    system_metadata: Metadata
    last_modified: datetime
    expires: datetime | None
    expiration: Expiration | None  # right now, this is stored in the provider cache
    storage_class: StorageClass | ObjectStorageClass
    encryption: ServerSideEncryption | None  # inherit bucket
    kms_key_id: SSEKMSKeyId | None  # inherit bucket
    bucket_key_enabled: bool | None  # inherit bucket
    sse_key_hash: SSECustomerKeyMD5 | None
    checksum_algorithm: ChecksumAlgorithm
    checksum_value: str | None
    checksum_type: ChecksumType | None
    lock_mode: ObjectLockMode | ObjectLockRetentionMode | None
    lock_legal_status: ObjectLockLegalHoldStatus | None
    lock_until: datetime | None
    website_redirect_location: WebsiteRedirectLocation | None
    acl: AccessControlPolicy | None
    is_current: bool
    parts: dict[str, InternalObjectPart]
    restore: Restore | None
    internal_last_modified: int

    def __init__(
        self,
        key: ObjectKey,
        etag: ETag | None = None,
        size: int | None = None,
        version_id: ObjectVersionId | None = None,
        user_metadata: Metadata | None = None,
        system_metadata: Metadata | None = None,
        storage_class: StorageClass = StorageClass.STANDARD,
        expires: datetime | None = None,
        expiration: Expiration | None = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        checksum_value: str | None = None,
        checksum_type: ChecksumType | None = ChecksumType.FULL_OBJECT,
        encryption: ServerSideEncryption | None = None,
        kms_key_id: SSEKMSKeyId | None = None,
        sse_key_hash: SSECustomerKeyMD5 | None = None,
        bucket_key_enabled: bool = False,
        lock_mode: ObjectLockMode | ObjectLockRetentionMode | None = None,
        lock_legal_status: ObjectLockLegalHoldStatus | None = None,
        lock_until: datetime | None = None,
        website_redirect_location: WebsiteRedirectLocation | None = None,
        acl: AccessControlPolicy | None = None,  # TODO
        owner: Owner | None = None,
    ):
        self.key = key
        self.user_metadata = user_metadata or {}
        self.system_metadata = system_metadata or {}
        self.version_id = version_id
        self.storage_class = storage_class or StorageClass.STANDARD
        self.etag = etag
        self.size = size
        self.expires = expires
        self.checksum_algorithm = checksum_algorithm or ChecksumAlgorithm.CRC64NVME
        self.checksum_value = checksum_value
        self.checksum_type = checksum_type
        self.encryption = encryption
        self.kms_key_id = kms_key_id
        self.bucket_key_enabled = bucket_key_enabled
        self.sse_key_hash = sse_key_hash
        self.lock_mode = lock_mode
        self.lock_legal_status = lock_legal_status
        self.lock_until = lock_until
        self.acl = acl
        self.expiration = expiration
        self.website_redirect_location = website_redirect_location
        self.is_current = True
        self.last_modified = datetime.now(tz=_gmt_zone_info)
        self.parts = {}
        self.restore = None
        self.owner = owner
        self.internal_last_modified = 0

    def get_system_metadata_fields(self) -> dict:
        # TODO: change when updating the schema -> make it a property
        headers = {
            "LastModified": self.last_modified_rfc1123,
            "ContentLength": str(self.size),
            "ETag": self.quoted_etag,
        }
        if self.expires:
            headers["Expires"] = self.expires_rfc1123

        for metadata_key, metadata_value in self.system_metadata.items():
            headers[metadata_key] = replace_non_iso_8859_1_characters(metadata_value)

        if self.storage_class != StorageClass.STANDARD:
            headers["StorageClass"] = self.storage_class

        return headers

    @property
    def last_modified_rfc1123(self) -> str:
        # TODO: verify if we need them with proper snapshot testing, for now it's copied from moto
        # Different datetime formats depending on how the key is obtained
        # https://github.com/boto/boto/issues/466
        return rfc_1123_datetime(self.last_modified)

    @property
    def expires_rfc1123(self) -> str:
        return rfc_1123_datetime(self.expires)

    @property
    def quoted_etag(self) -> str:
        return f'"{self.etag}"'

    def is_locked(self, bypass_governance: bool = False) -> bool:
        if self.lock_legal_status == "ON":
            return True

        if bypass_governance and self.lock_mode == ObjectLockMode.GOVERNANCE:
            return False

        if self.lock_until:
            return self.lock_until > datetime.now(tz=_gmt_zone_info)

        return False
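

# A small sketch (editor's addition, not part of the original module) of the
# object-lock precedence implemented in is_locked above: a legal hold always
# wins, a GOVERNANCE retention can be bypassed, and otherwise the
# retain-until date decides. The keys and dates are illustrative.
def _example_is_locked_precedence():
    from datetime import timedelta

    retained = S3Object(
        key="locked.bin",
        lock_mode=ObjectLockMode.GOVERNANCE,
        lock_until=datetime.now(tz=_gmt_zone_info) + timedelta(days=1),
    )
    assert retained.is_locked() is True
    assert retained.is_locked(bypass_governance=True) is False

    held = S3Object(key="held.bin", lock_legal_status="ON")
    assert held.is_locked(bypass_governance=True) is True  # legal hold is never bypassed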


class S3DeleteMarker:
    key: ObjectKey
    version_id: str
    last_modified: datetime
    is_current: bool

    def __init__(self, key: ObjectKey, version_id: ObjectVersionId):
        self.key = key
        self.version_id = version_id
        self.last_modified = datetime.now(tz=_gmt_zone_info)
        self.is_current = True

    @staticmethod
    def is_locked(*args, **kwargs) -> bool:
        # an S3DeleteMarker cannot be lock protected
        return False


class S3Part:
    part_number: PartNumber
    etag: ETag | None
    last_modified: datetime
    size: int | None
    checksum_algorithm: ChecksumAlgorithm | None
    checksum_value: str | None

    def __init__(
        self,
        part_number: PartNumber,
        size: int = None,
        etag: ETag = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        checksum_value: str | None = None,
    ):
        self.last_modified = datetime.now(tz=_gmt_zone_info)
        self.part_number = part_number
        self.size = size
        self.etag = etag
        self.checksum_algorithm = checksum_algorithm
        self.checksum_value = checksum_value

    @property
    def quoted_etag(self) -> str:
        return f'"{self.etag}"'


class S3Multipart:
    id: MultipartUploadId
    parts: dict[str, S3Part]
    object: S3Object
    checksum_value: str | None
    checksum_type: ChecksumType | None
    checksum_algorithm: ChecksumAlgorithm | None
    initiated: datetime
    precondition: bool | None
    initiator: Owner | None
    tagging: dict[str, str] | None

    def __init__(
        self,
        key: ObjectKey,
        storage_class: StorageClass | ObjectStorageClass = StorageClass.STANDARD,
        expires: datetime | None = None,
        expiration: datetime | None = None,  # come from lifecycle
        checksum_algorithm: ChecksumAlgorithm | None = None,
        checksum_type: ChecksumType | None = None,
        encryption: ServerSideEncryption | None = None,  # inherit bucket
        kms_key_id: SSEKMSKeyId | None = None,  # inherit bucket
        bucket_key_enabled: bool = False,  # inherit bucket
        sse_key_hash: SSECustomerKeyMD5 | None = None,
        lock_mode: ObjectLockMode | None = None,
        lock_legal_status: ObjectLockLegalHoldStatus | None = None,
        lock_until: datetime | None = None,
        website_redirect_location: WebsiteRedirectLocation | None = None,
        acl: AccessControlPolicy | None = None,  # TODO
        user_metadata: Metadata | None = None,
        system_metadata: Metadata | None = None,
        initiator: Owner | None = None,
        tagging: dict[str, str] | None = None,
        owner: Owner | None = None,
        precondition: bool | None = None,
    ):
        self.id = token_urlsafe(96)  # MultipartUploadId is 128 characters long
        self.initiated = datetime.now(tz=_gmt_zone_info)
        self.parts = {}
        self.initiator = initiator
        self.tagging = tagging
        self.checksum_value = None
        self.checksum_type = checksum_type
        self.checksum_algorithm = checksum_algorithm
        self.precondition = precondition
        self.object = S3Object(
            key=key,
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            storage_class=storage_class or StorageClass.STANDARD,
            expires=expires,
            expiration=expiration,
            checksum_algorithm=checksum_algorithm,
            checksum_type=checksum_type,
            encryption=encryption,
            kms_key_id=kms_key_id,
            bucket_key_enabled=bucket_key_enabled,
            sse_key_hash=sse_key_hash,
            lock_mode=lock_mode,
            lock_legal_status=lock_legal_status,
            lock_until=lock_until,
            website_redirect_location=website_redirect_location,
            acl=acl,
            owner=owner,
        )

    def complete_multipart(
        self, parts: CompletedPartList, mpu_size: int = None, validation_checksum: str = None
    ):
        last_part_index = len(parts) - 1
        object_etag = hashlib.md5(usedforsecurity=False)
        has_checksum = self.checksum_algorithm is not None
        checksum_hash = None
        checksum_key = None
        if has_checksum:
            checksum_key = f"Checksum{self.checksum_algorithm.upper()}"
            if self.checksum_type == ChecksumType.COMPOSITE:
                checksum_hash = get_s3_checksum(self.checksum_algorithm)
            else:
                checksum_hash = CombinedCrcHash(self.checksum_algorithm)

        pos = 0
        parts_map: dict[str, InternalObjectPart] = {}
        for index, part in enumerate(parts):
            part_number = str(part["PartNumber"])
            part_etag = part["ETag"].strip('"')

            s3_part = self.parts.get(part_number)
            if (
                not s3_part
                or s3_part.etag != part_etag
                or (not has_checksum and any(k.startswith("Checksum") for k in part))
            ):
                raise InvalidPart(
                    "One or more of the specified parts could not be found.  "
                    "The part may not have been uploaded, "
                    "or the specified entity tag may not match the part's entity tag.",
                    ETag=part_etag,
                    PartNumber=part_number,
                    UploadId=self.id,
                )

            if has_checksum:
                if not (part_checksum := part.get(checksum_key)):
                    if self.checksum_type == ChecksumType.COMPOSITE:
                        # weird case, they still try to validate a different checksum type than the multipart
                        for field in part:
                            if field.startswith("Checksum"):
                                algo = field.removeprefix("Checksum").lower()
                                raise BadDigest(
                                    f"The {algo} you specified for part {part_number} did not match what we received."
                                )

                        raise InvalidRequest(
                            f"The upload was created using a {self.checksum_algorithm.lower()} checksum. "
                            f"The complete request must include the checksum for each part. "
                            f"It was missing for part {part_number} in the request."
                        )
                elif part_checksum != s3_part.checksum_value:
                    raise InvalidPart(
                        "One or more of the specified parts could not be found.  The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
                        ETag=part_etag,
                        PartNumber=part_number,
                        UploadId=self.id,
                    )

                part_checksum_value = base64.b64decode(s3_part.checksum_value)
                if self.checksum_type == ChecksumType.COMPOSITE:
                    checksum_hash.update(part_checksum_value)
                else:
                    checksum_hash.combine(part_checksum_value, s3_part.size)

            elif any(k.startswith("Checksum") for k in part):
                raise InvalidPart(  # not covered in this build
                    "One or more of the specified parts could not be found.  The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
                    ETag=part_etag,
                    PartNumber=part_number,
                    UploadId=self.id,
                )

            if index != last_part_index and s3_part.size < S3_UPLOAD_PART_MIN_SIZE:
                raise EntityTooSmall(
                    "Your proposed upload is smaller than the minimum allowed size",
                    ETag=part_etag,
                    PartNumber=part_number,
                    MinSizeAllowed=S3_UPLOAD_PART_MIN_SIZE,
                    ProposedSize=s3_part.size,
                )

            object_etag.update(bytes.fromhex(s3_part.etag))
            # keep track of the parts size, as it can be queried afterward on the object as a Range
            internal_part = InternalObjectPart(
                _position=pos,
                Size=s3_part.size,
                ETag=s3_part.etag,
                PartNumber=s3_part.part_number,
            )
            if has_checksum and self.checksum_type == ChecksumType.COMPOSITE:
                internal_part[checksum_key] = s3_part.checksum_value

            parts_map[part_number] = internal_part
            pos += s3_part.size

        if mpu_size and mpu_size != pos:
            raise InvalidRequest(
                f"The provided 'x-amz-mp-object-size' header value {mpu_size} "
                f"does not match what was computed: {pos}"
            )

        if has_checksum:
            checksum_value = base64.b64encode(checksum_hash.digest()).decode()
            if self.checksum_type == ChecksumType.COMPOSITE:
                checksum_value = f"{checksum_value}-{len(parts)}"

            elif self.checksum_type == ChecksumType.FULL_OBJECT:
                if validation_checksum and validation_checksum != checksum_value:
                    raise BadDigest(
                        f"The {self.object.checksum_algorithm.lower()} you specified did not match the calculated checksum."
                    )

            self.checksum_value = checksum_value
            self.object.checksum_value = checksum_value

        multipart_etag = f"{object_etag.hexdigest()}-{len(parts)}"
        self.object.etag = multipart_etag
        self.object.parts = parts_map
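

# A worked sketch (editor's addition, not part of the original module) of the
# multipart ETag computed at the end of complete_multipart above: the MD5 of
# the concatenated binary part digests, suffixed with the part count. The
# part bodies are arbitrary example data.
def _example_multipart_etag():
    part_bodies = [b"a" * 5, b"b" * 3]
    part_etags = [hashlib.md5(body, usedforsecurity=False).hexdigest() for body in part_bodies]

    combined = hashlib.md5(usedforsecurity=False)
    for etag in part_etags:
        combined.update(bytes.fromhex(etag))
    multipart_etag = f"{combined.hexdigest()}-{len(part_etags)}"

    # the result looks like "<32 hex chars>-2" and differs from the MD5 of the full payload
    assert multipart_etag.endswith("-2")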


class KeyStore:
    """
    Object representing an unversioned S3 bucket's key store. An object is mapped by a key, and you can simply
    retrieve the object from that key.
    """

    _store: dict[ObjectKey, S3Object | S3DeleteMarker]

    def __init__(self):
        self._store = {}

    def get(self, object_key: ObjectKey) -> S3Object | None:
        return self._store.get(object_key)

    def set(self, object_key: ObjectKey, s3_object: S3Object):
        self._store[object_key] = s3_object

    def pop(self, object_key: ObjectKey, default=None) -> S3Object | None:
        return self._store.pop(object_key, default)

    def values(self, *_, **__) -> list[S3Object | S3DeleteMarker]:
        # we create a shallow copy with dict to avoid size changed during iteration
        return list(dict(self._store).values())

    def is_empty(self) -> bool:
        return not self._store

    def __contains__(self, item):
        return item in self._store  # not covered in this build


class VersionedKeyStore:
    """
    Object representing a versioned S3 bucket's key store. An object is mapped by a key, and adding an object to the
    same key will create a new version of it. When deleting the object, an S3DeleteMarker is created and put on top
    of the version stack, to signal that the object has been "deleted".
    This object allows easy retrieval and saving of new object versions.
    See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/versioning-workflows.html
    """

    _store: dict[ObjectKey, dict[ObjectVersionId, S3Object | S3DeleteMarker]]

    def __init__(self):
        self._store = defaultdict(dict)

    @classmethod
    def from_key_store(cls, keystore: KeyStore) -> "VersionedKeyStore":
        new_versioned_keystore = cls()
        for s3_object in keystore.values():
            # TODO: maybe do the object mutation inside the provider instead? but would need to iterate twice
            #  or do this whole operation inside the provider instead, when actually working on versioning
            s3_object.version_id = "null"
            new_versioned_keystore.set(object_key=s3_object.key, s3_object=s3_object)

        return new_versioned_keystore

    def get(
        self, object_key: ObjectKey, version_id: ObjectVersionId = None
    ) -> S3Object | S3DeleteMarker | None:
        """
        :param object_key: the key of the Object we need to retrieve
        :param version_id: Optional, if not specified, return the current version (last one inserted)
        :return: an S3Object or S3DeleteMarker
        """
        if not version_id and (versions := self._store.get(object_key)):
            for version_id in reversed(versions):
                return versions.get(version_id)

        return self._store.get(object_key, {}).get(version_id)

    def set(self, object_key: ObjectKey, s3_object: S3Object | S3DeleteMarker):
        """
        Set an S3 object, using its already set VersionId.
        If the bucket versioning is `Enabled`, then we're just inserting a new Version.
        If the bucket versioning is `Suspended`, the current object version will be set to `null`, so if setting a new
        object at the same key, we will override it at the `null` versionId entry.
        :param object_key: the key of the Object we are setting
        :param s3_object: the S3 object or S3DeleteMarker to set
        :return: None
        """
        existing_s3_object = self.get(object_key)
        if existing_s3_object:
            existing_s3_object.is_current = False

        self._store[object_key][s3_object.version_id] = s3_object

    def pop(
        self, object_key: ObjectKey, version_id: ObjectVersionId = None, default=None
    ) -> S3Object | S3DeleteMarker | None:
        versions = self._store.get(object_key)
        if not versions:
            return None  # not covered in this build

        object_version = versions.pop(version_id, default)
        if not versions:
            self._store.pop(object_key)
        else:
            existing_s3_object = self.get(object_key)
            existing_s3_object.is_current = True

        return object_version

    def values(self, with_versions: bool = False) -> list[S3Object | S3DeleteMarker]:
        if with_versions:
            # we create a shallow copy with dict to avoid size changed during iteration
            return [
                object_version
                for values in dict(self._store).values()
                for object_version in dict(values).values()
            ]

        # if `with_versions` is False, then we need to return only the current version if it's not a DeleteMarker
        objects = []
        for object_key, versions in dict(self._store).items():
            # we're getting the last set object in the versions dictionary
            for version_id in reversed(versions):
                current_object = versions[version_id]
                if isinstance(current_object, S3DeleteMarker):
                    break

                objects.append(versions[version_id])
                break

        return objects

    def is_empty(self) -> bool:
        return not self._store

    def __contains__(self, item):
        return item in self._store
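

# A minimal sketch (editor's addition, not part of the original module)
# contrasting the two key stores: VersionedKeyStore stacks versions per key,
# and values() hides keys whose current version is a delete marker. The keys
# and version ids are illustrative.
def _example_versioned_key_store():
    store = VersionedKeyStore()
    store.set("a.txt", S3Object(key="a.txt", version_id="v1"))
    store.set("a.txt", S3Object(key="a.txt", version_id="v2"))
    store.set("b.txt", S3DeleteMarker("b.txt", version_id="v1"))

    assert store.get("a.txt").version_id == "v2"  # current version wins
    assert store.get("a.txt", version_id="v1") is not None
    assert [o.key for o in store.values()] == ["a.txt"]  # "b.txt" is hidden
    assert len(store.values(with_versions=True)) == 3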


class S3Store(BaseStore):
    buckets: dict[BucketName, S3Bucket] = CrossRegionAttribute(default=dict)
    global_bucket_map: dict[BucketName, AccountId] = CrossAccountAttribute(default=dict)
    aws_managed_kms_key_id: SSEKMSKeyId = LocalAttribute(default=str)

    # static tagging service instance
    TAGS: TaggingService = CrossAccountAttribute(default=TaggingService)


class BucketCorsIndex:
    def __init__(self):
        self._cors_index_cache = None
        self._bucket_index_cache = None

    @property
    def cors(self) -> dict[str, CORSConfiguration]:
        if self._cors_index_cache is None:
            self._bucket_index_cache, self._cors_index_cache = self._build_index()  # not covered in this build
        return self._cors_index_cache

    @property
    def buckets(self) -> set[str]:
        if self._bucket_index_cache is None:
            self._bucket_index_cache, self._cors_index_cache = self._build_index()
        return self._bucket_index_cache

    def invalidate(self):
        self._cors_index_cache = None
        self._bucket_index_cache = None

    @staticmethod
    def _build_index() -> tuple[set[BucketName], dict[BucketName, CORSConfiguration]]:
        buckets = set()
        cors_index = {}
        # we create a shallow copy with dict to avoid size changed during iteration, as the store could have a new
        # account or region created by any other request
        for account_id, regions in dict(s3_stores).items():
            for bucket_name, bucket in dict(regions[AWS_REGION_US_EAST_1].buckets).items():
                bucket: S3Bucket
                buckets.add(bucket_name)
                if bucket.cors_rules is not None:
                    cors_index[bucket_name] = bucket.cors_rules

        return buckets, cors_index
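

# A short sketch (editor's addition, not part of the original module) of the
# lazy-cache pattern used by BucketCorsIndex: both indexes are built on first
# property access and must be invalidated whenever a bucket or its CORS
# configuration changes.
def _example_cors_index_cache():
    index = BucketCorsIndex()
    _ = index.buckets  # first access builds both caches from s3_stores
    _ = index.cors     # second access is served from the cache
    index.invalidate()  # call after CreateBucket/DeleteBucket/PutBucketCors
    _ = index.buckets  # rebuilt on next access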


class EncryptionParameters(NamedTuple):
    encryption: ServerSideEncryption
    kms_key_id: SSEKMSKeyId
    bucket_key_enabled: BucketKeyEnabled


class ObjectLockParameters(NamedTuple):
    lock_until: ObjectLockRetainUntilDate
    lock_legal_status: ObjectLockLegalHoldStatus
    lock_mode: ObjectLockMode | ObjectLockRetentionMode


s3_stores = AccountRegionBundle[S3Store]("s3", S3Store)
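

# A closing sketch (editor's addition, not part of the original module): how
# the NamedTuples above bundle per-request parameters, and how a store is
# resolved from the account/region bundle. The account id is illustrative,
# and the `aws_kms` enum member name and bundle indexing are assumptions
# based on common LocalStack conventions.
def _example_store_access():
    enc = EncryptionParameters(
        encryption=ServerSideEncryption.aws_kms,  # assumed member name for "aws:kms"
        kms_key_id="assumed-key-id",
        bucket_key_enabled=False,
    )
    assert enc.encryption == ServerSideEncryption.aws_kms

    store = s3_stores["000000000000"][AWS_REGION_US_EAST_1]
    assert isinstance(store.buckets, dict)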