
localstack / localstack / 533d4262-4f08-49b7-b7ba-18c46e51ac1a

02 Jun 2025 06:43PM UTC coverage: 86.752% (+0.1%) from 86.654%
push | circleci | web-flow
APIGW: implement Canary Deployments CRUD logic (#12694)

142 of 147 new or added lines in 3 files covered. (96.6%)

187 existing lines in 16 files now uncovered.

64937 of 74854 relevant lines covered (86.75%)

0.87 hits per line
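
For reference, the headline percentages follow directly from the raw counts above. A quick sketch (plain Python, using only the numbers shown in this report) reproduces them:

    # Re-deriving the headline figures from the counts in this report
    covered, relevant = 64_937, 74_854
    print(f"overall coverage: {covered / relevant:.3%}")              # -> 86.752%

    new_covered, new_added = 142, 147
    print(f"new/added lines covered: {new_covered / new_added:.1%}")  # -> 96.6%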

Source File

/localstack-core/localstack/services/s3/provider.py (95.73% file coverage)
1
import base64
1✔
2
import copy
1✔
3
import datetime
1✔
4
import json
1✔
5
import logging
1✔
6
import re
1✔
7
from collections import defaultdict
1✔
8
from inspect import signature
1✔
9
from io import BytesIO
1✔
10
from operator import itemgetter
1✔
11
from typing import IO, Optional, Union
1✔
12
from urllib import parse as urlparse
1✔
13
from zoneinfo import ZoneInfo
1✔
14

15
from localstack import config
1✔
16
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
17
from localstack.aws.api.s3 import (
1✔
18
    MFA,
19
    AbortMultipartUploadOutput,
20
    AccelerateConfiguration,
21
    AccessControlPolicy,
22
    AccessDenied,
23
    AccountId,
24
    AnalyticsConfiguration,
25
    AnalyticsId,
26
    BadDigest,
27
    Body,
28
    Bucket,
29
    BucketAlreadyExists,
30
    BucketAlreadyOwnedByYou,
31
    BucketCannedACL,
32
    BucketLifecycleConfiguration,
33
    BucketLoggingStatus,
34
    BucketName,
35
    BucketNotEmpty,
36
    BucketRegion,
37
    BucketVersioningStatus,
38
    BypassGovernanceRetention,
39
    ChecksumAlgorithm,
40
    ChecksumCRC32,
41
    ChecksumCRC32C,
42
    ChecksumCRC64NVME,
43
    ChecksumSHA1,
44
    ChecksumSHA256,
45
    ChecksumType,
46
    CommonPrefix,
47
    CompletedMultipartUpload,
48
    CompleteMultipartUploadOutput,
49
    ConditionalRequestConflict,
50
    ConfirmRemoveSelfBucketAccess,
51
    ContentMD5,
52
    CopyObjectOutput,
53
    CopyObjectRequest,
54
    CopyObjectResult,
55
    CopyPartResult,
56
    CORSConfiguration,
57
    CreateBucketOutput,
58
    CreateBucketRequest,
59
    CreateMultipartUploadOutput,
60
    CreateMultipartUploadRequest,
61
    CrossLocationLoggingProhibitted,
62
    Delete,
63
    DeletedObject,
64
    DeleteMarkerEntry,
65
    DeleteObjectOutput,
66
    DeleteObjectsOutput,
67
    DeleteObjectTaggingOutput,
68
    Delimiter,
69
    EncodingType,
70
    Error,
71
    Expiration,
72
    FetchOwner,
73
    GetBucketAccelerateConfigurationOutput,
74
    GetBucketAclOutput,
75
    GetBucketAnalyticsConfigurationOutput,
76
    GetBucketCorsOutput,
77
    GetBucketEncryptionOutput,
78
    GetBucketIntelligentTieringConfigurationOutput,
79
    GetBucketInventoryConfigurationOutput,
80
    GetBucketLifecycleConfigurationOutput,
81
    GetBucketLocationOutput,
82
    GetBucketLoggingOutput,
83
    GetBucketOwnershipControlsOutput,
84
    GetBucketPolicyOutput,
85
    GetBucketPolicyStatusOutput,
86
    GetBucketReplicationOutput,
87
    GetBucketRequestPaymentOutput,
88
    GetBucketTaggingOutput,
89
    GetBucketVersioningOutput,
90
    GetBucketWebsiteOutput,
91
    GetObjectAclOutput,
92
    GetObjectAttributesOutput,
93
    GetObjectAttributesParts,
94
    GetObjectAttributesRequest,
95
    GetObjectLegalHoldOutput,
96
    GetObjectLockConfigurationOutput,
97
    GetObjectOutput,
98
    GetObjectRequest,
99
    GetObjectRetentionOutput,
100
    GetObjectTaggingOutput,
101
    GetObjectTorrentOutput,
102
    GetPublicAccessBlockOutput,
103
    HeadBucketOutput,
104
    HeadObjectOutput,
105
    HeadObjectRequest,
106
    IfMatch,
107
    IfMatchInitiatedTime,
108
    IfMatchLastModifiedTime,
109
    IfMatchSize,
110
    IfNoneMatch,
111
    IntelligentTieringConfiguration,
112
    IntelligentTieringId,
113
    InvalidArgument,
114
    InvalidBucketName,
115
    InvalidDigest,
116
    InvalidLocationConstraint,
117
    InvalidObjectState,
118
    InvalidPartNumber,
119
    InvalidPartOrder,
120
    InvalidStorageClass,
121
    InvalidTargetBucketForLogging,
122
    InventoryConfiguration,
123
    InventoryId,
124
    KeyMarker,
125
    LifecycleRules,
126
    ListBucketAnalyticsConfigurationsOutput,
127
    ListBucketIntelligentTieringConfigurationsOutput,
128
    ListBucketInventoryConfigurationsOutput,
129
    ListBucketsOutput,
130
    ListMultipartUploadsOutput,
131
    ListObjectsOutput,
132
    ListObjectsV2Output,
133
    ListObjectVersionsOutput,
134
    ListPartsOutput,
135
    Marker,
136
    MaxBuckets,
137
    MaxKeys,
138
    MaxParts,
139
    MaxUploads,
140
    MethodNotAllowed,
141
    MissingSecurityHeader,
142
    MpuObjectSize,
143
    MultipartUpload,
144
    MultipartUploadId,
145
    NoSuchBucket,
146
    NoSuchBucketPolicy,
147
    NoSuchCORSConfiguration,
148
    NoSuchKey,
149
    NoSuchLifecycleConfiguration,
150
    NoSuchPublicAccessBlockConfiguration,
151
    NoSuchTagSet,
152
    NoSuchUpload,
153
    NoSuchWebsiteConfiguration,
154
    NotificationConfiguration,
155
    Object,
156
    ObjectIdentifier,
157
    ObjectKey,
158
    ObjectLockConfiguration,
159
    ObjectLockConfigurationNotFoundError,
160
    ObjectLockEnabled,
161
    ObjectLockLegalHold,
162
    ObjectLockMode,
163
    ObjectLockRetention,
164
    ObjectLockToken,
165
    ObjectOwnership,
166
    ObjectVersion,
167
    ObjectVersionId,
168
    ObjectVersionStorageClass,
169
    OptionalObjectAttributesList,
170
    Owner,
171
    OwnershipControls,
172
    OwnershipControlsNotFoundError,
173
    Part,
174
    PartNumber,
175
    PartNumberMarker,
176
    Policy,
177
    PostResponse,
178
    PreconditionFailed,
179
    Prefix,
180
    PublicAccessBlockConfiguration,
181
    PutBucketAclRequest,
182
    PutBucketLifecycleConfigurationOutput,
183
    PutObjectAclOutput,
184
    PutObjectAclRequest,
185
    PutObjectLegalHoldOutput,
186
    PutObjectLockConfigurationOutput,
187
    PutObjectOutput,
188
    PutObjectRequest,
189
    PutObjectRetentionOutput,
190
    PutObjectTaggingOutput,
191
    ReplicationConfiguration,
192
    ReplicationConfigurationNotFoundError,
193
    RequestPayer,
194
    RequestPaymentConfiguration,
195
    RestoreObjectOutput,
196
    RestoreRequest,
197
    S3Api,
198
    ServerSideEncryption,
199
    ServerSideEncryptionConfiguration,
200
    SkipValidation,
201
    SSECustomerAlgorithm,
202
    SSECustomerKey,
203
    SSECustomerKeyMD5,
204
    StartAfter,
205
    StorageClass,
206
    Tagging,
207
    Token,
208
    TransitionDefaultMinimumObjectSize,
209
    UploadIdMarker,
210
    UploadPartCopyOutput,
211
    UploadPartCopyRequest,
212
    UploadPartOutput,
213
    UploadPartRequest,
214
    VersionIdMarker,
215
    VersioningConfiguration,
216
    WebsiteConfiguration,
217
)
218
from localstack.aws.api.s3 import NotImplemented as NotImplementedException
1✔
219
from localstack.aws.handlers import (
1✔
220
    modify_service_response,
221
    preprocess_request,
222
    serve_custom_service_request_handlers,
223
)
224
from localstack.constants import AWS_REGION_US_EAST_1
1✔
225
from localstack.services.edge import ROUTER
1✔
226
from localstack.services.plugins import ServiceLifecycleHook
1✔
227
from localstack.services.s3.codec import AwsChunkedDecoder
1✔
228
from localstack.services.s3.constants import (
1✔
229
    ALLOWED_HEADER_OVERRIDES,
230
    ARCHIVES_STORAGE_CLASSES,
231
    CHECKSUM_ALGORITHMS,
232
    DEFAULT_BUCKET_ENCRYPTION,
233
)
234
from localstack.services.s3.cors import S3CorsHandler, s3_cors_request_handler
1✔
235
from localstack.services.s3.exceptions import (
1✔
236
    InvalidBucketOwnerAWSAccountID,
237
    InvalidBucketState,
238
    InvalidRequest,
239
    MalformedPolicy,
240
    MalformedXML,
241
    NoSuchConfiguration,
242
    NoSuchObjectLockConfiguration,
243
    UnexpectedContent,
244
)
245
from localstack.services.s3.models import (
1✔
246
    BucketCorsIndex,
247
    EncryptionParameters,
248
    ObjectLockParameters,
249
    S3Bucket,
250
    S3DeleteMarker,
251
    S3Multipart,
252
    S3Object,
253
    S3Part,
254
    S3Store,
255
    VersionedKeyStore,
256
    s3_stores,
257
)
258
from localstack.services.s3.notifications import NotificationDispatcher, S3EventNotificationContext
1✔
259
from localstack.services.s3.presigned_url import validate_post_policy
1✔
260
from localstack.services.s3.storage.core import LimitedIterableStream, S3ObjectStore
1✔
261
from localstack.services.s3.storage.ephemeral import EphemeralS3ObjectStore
1✔
262
from localstack.services.s3.utils import (
1✔
263
    ObjectRange,
264
    add_expiration_days_to_datetime,
265
    base_64_content_md5_to_etag,
266
    create_redirect_for_post_request,
267
    create_s3_kms_managed_key_for_region,
268
    etag_to_base_64_content_md5,
269
    extract_bucket_key_version_id_from_copy_source,
270
    generate_safe_version_id,
271
    get_canned_acl,
272
    get_class_attrs_from_spec_class,
273
    get_failed_precondition_copy_source,
274
    get_full_default_bucket_location,
275
    get_kms_key_arn,
276
    get_lifecycle_rule_from_object,
277
    get_owner_for_account_id,
278
    get_permission_from_header,
279
    get_retention_from_now,
280
    get_s3_checksum_algorithm_from_request,
281
    get_s3_checksum_algorithm_from_trailing_headers,
282
    get_system_metadata_from_request,
283
    get_unique_key_id,
284
    is_bucket_name_valid,
285
    is_version_older_than_other,
286
    parse_copy_source_range_header,
287
    parse_post_object_tagging_xml,
288
    parse_range_header,
289
    parse_tagging_header,
290
    s3_response_handler,
291
    serialize_expiration_header,
292
    str_to_rfc_1123_datetime,
293
    validate_dict_fields,
294
    validate_failed_precondition,
295
    validate_kms_key_id,
296
    validate_tag_set,
297
)
298
from localstack.services.s3.validation import (
1✔
299
    parse_grants_in_headers,
300
    validate_acl_acp,
301
    validate_bucket_analytics_configuration,
302
    validate_bucket_intelligent_tiering_configuration,
303
    validate_canned_acl,
304
    validate_checksum_value,
305
    validate_cors_configuration,
306
    validate_inventory_configuration,
307
    validate_lifecycle_configuration,
308
    validate_object_key,
309
    validate_sse_c,
310
    validate_website_configuration,
311
)
312
from localstack.services.s3.website_hosting import register_website_hosting_routes
1✔
313
from localstack.state import AssetDirectory, StateVisitor
1✔
314
from localstack.utils.aws.arns import s3_bucket_name
1✔
315
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
316

317
LOG = logging.getLogger(__name__)
1✔
318

319
STORAGE_CLASSES = get_class_attrs_from_spec_class(StorageClass)
1✔
320
SSE_ALGORITHMS = get_class_attrs_from_spec_class(ServerSideEncryption)
1✔
321
OBJECT_OWNERSHIPS = get_class_attrs_from_spec_class(ObjectOwnership)
1✔
322

323
DEFAULT_S3_TMP_DIR = "/tmp/localstack-s3-storage"
1✔
324

325

326
class S3Provider(S3Api, ServiceLifecycleHook):
1✔
327
    def __init__(self, storage_backend: S3ObjectStore = None) -> None:
1✔
328
        super().__init__()
1✔
329
        self._storage_backend = storage_backend or EphemeralS3ObjectStore(DEFAULT_S3_TMP_DIR)
1✔
330
        self._notification_dispatcher = NotificationDispatcher()
1✔
331
        self._cors_handler = S3CorsHandler(BucketCorsIndex())
1✔
332

333
        # runtime cache of Lifecycle Expiration headers, as they need to be calculated every time we fetch an object
334
        # in case the rules have changed
335
        self._expiration_cache: dict[BucketName, dict[ObjectKey, Expiration]] = defaultdict(dict)
1✔
336

337
    def on_after_init(self):
1✔
338
        preprocess_request.append(self._cors_handler)
1✔
339
        serve_custom_service_request_handlers.append(s3_cors_request_handler)
1✔
340
        modify_service_response.append(self.service, s3_response_handler)
1✔
341
        register_website_hosting_routes(router=ROUTER)
1✔
342

343
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
344
        visitor.visit(s3_stores)
×
345
        visitor.visit(AssetDirectory(self.service, self._storage_backend.root_directory))
×
346

347
    def on_before_state_save(self):
1✔
348
        self._storage_backend.flush()
×
349

350
    def on_after_state_reset(self):
1✔
351
        self._cors_handler.invalidate_cache()
×
352

353
    def on_after_state_load(self):
1✔
354
        self._cors_handler.invalidate_cache()
×
355

356
    def on_before_stop(self):
1✔
357
        self._notification_dispatcher.shutdown()
1✔
358
        self._storage_backend.close()
1✔
359

360
    def _notify(
1✔
361
        self,
362
        context: RequestContext,
363
        s3_bucket: S3Bucket,
364
        s3_object: S3Object | S3DeleteMarker = None,
365
        s3_notif_ctx: S3EventNotificationContext = None,
366
    ):
367
        """
368
        :param context: the RequestContext, to retrieve more information about the incoming notification
369
        :param s3_bucket: the S3Bucket object
370
        :param s3_object: the S3Object object if S3EventNotificationContext is not given
371
        :param s3_notif_ctx: S3EventNotificationContext, in case we need specific data only available in the API call
372
        :return:
373
        """
374
        if s3_bucket.notification_configuration:
1✔
375
            if not s3_notif_ctx:
1✔
376
                s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
1✔
377
                    context,
378
                    s3_bucket=s3_bucket,
379
                    s3_object=s3_object,
380
                )
381

382
            self._notification_dispatcher.send_notifications(
1✔
383
                s3_notif_ctx, s3_bucket.notification_configuration
384
            )
385

386
    def _verify_notification_configuration(
1✔
387
        self,
388
        notification_configuration: NotificationConfiguration,
389
        skip_destination_validation: SkipValidation,
390
        context: RequestContext,
391
        bucket_name: str,
392
    ):
393
        self._notification_dispatcher.verify_configuration(
1✔
394
            notification_configuration, skip_destination_validation, context, bucket_name
395
        )
396

397
    def _get_expiration_header(
1✔
398
        self,
399
        lifecycle_rules: LifecycleRules,
400
        bucket: BucketName,
401
        s3_object: S3Object,
402
        object_tags: dict[str, str],
403
    ) -> Expiration:
404
        """
405
        This method will check if the key matches a Lifecycle filter, and return the serialized header if that's
406
        the case. We're caching it because it can change depending on the rules set on the bucket.
407
        We can't use `lru_cache` as the parameters need to be hashable
408
        :param lifecycle_rules: the bucket LifecycleRules
409
        :param s3_object: S3Object
410
        :param object_tags: the object tags
411
        :return: the Expiration header if there's a rule matching
412
        """
413
        if cached_exp := self._expiration_cache.get(bucket, {}).get(s3_object.key):
1✔
414
            return cached_exp
1✔
415

416
        if lifecycle_rule := get_lifecycle_rule_from_object(
1✔
417
            lifecycle_rules, s3_object.key, s3_object.size, object_tags
418
        ):
419
            expiration_header = serialize_expiration_header(
1✔
420
                lifecycle_rule["ID"],
421
                lifecycle_rule["Expiration"],
422
                s3_object.last_modified,
423
            )
424
            self._expiration_cache[bucket][s3_object.key] = expiration_header
1✔
425
            return expiration_header
1✔
426

427
    def _get_cross_account_bucket(
1✔
428
        self,
429
        context: RequestContext,
430
        bucket_name: BucketName,
431
        *,
432
        expected_bucket_owner: AccountId = None,
433
    ) -> tuple[S3Store, S3Bucket]:
434
        if expected_bucket_owner and not re.fullmatch(r"\w{12}", expected_bucket_owner):
1✔
435
            raise InvalidBucketOwnerAWSAccountID(
1✔
436
                f"The value of the expected bucket owner parameter must be an AWS Account ID... [{expected_bucket_owner}]",
437
            )
438

439
        store = self.get_store(context.account_id, context.region)
1✔
440
        if not (s3_bucket := store.buckets.get(bucket_name)):
1✔
441
            if not (account_id := store.global_bucket_map.get(bucket_name)):
1✔
442
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)
1✔
443

444
            store = self.get_store(account_id, context.region)
1✔
445
            if not (s3_bucket := store.buckets.get(bucket_name)):
1✔
446
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)
×
447

448
        if expected_bucket_owner and s3_bucket.bucket_account_id != expected_bucket_owner:
1✔
449
            raise AccessDenied("Access Denied")
1✔
450

451
        return store, s3_bucket
1✔
452

453
    @staticmethod
1✔
454
    def get_store(account_id: str, region_name: str) -> S3Store:
1✔
455
        # Use default account id for external access? would need an anonymous one
456
        return s3_stores[account_id][region_name]
1✔
457

458
    @handler("CreateBucket", expand=False)
1✔
459
    def create_bucket(
1✔
460
        self,
461
        context: RequestContext,
462
        request: CreateBucketRequest,
463
    ) -> CreateBucketOutput:
464
        bucket_name = request["Bucket"]
1✔
465

466
        if not is_bucket_name_valid(bucket_name):
1✔
467
            raise InvalidBucketName("The specified bucket is not valid.", BucketName=bucket_name)
1✔
468

469
        # the XML parser returns an empty dict if the body contains the following:
470
        # <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />
471
        # but it also returns an empty dict if the body is fully empty. We need to differentiate the 2 cases by checking
472
        # if the body is empty or not
473
        if context.request.data and (
1✔
474
            (create_bucket_configuration := request.get("CreateBucketConfiguration")) is not None
475
        ):
476
            if not (bucket_region := create_bucket_configuration.get("LocationConstraint")):
1✔
477
                raise MalformedXML()
1✔
478

479
            if context.region == AWS_REGION_US_EAST_1:
1✔
480
                if bucket_region == "us-east-1":
1✔
481
                    raise InvalidLocationConstraint(
1✔
482
                        "The specified location-constraint is not valid",
483
                        LocationConstraint=bucket_region,
484
                    )
485
            elif context.region != bucket_region:
1✔
486
                raise CommonServiceException(
1✔
487
                    code="IllegalLocationConstraintException",
488
                    message=f"The {bucket_region} location constraint is incompatible for the region specific endpoint this request was sent to.",
489
                )
490
        else:
491
            bucket_region = AWS_REGION_US_EAST_1
1✔
492
            if context.region != bucket_region:
1✔
493
                raise CommonServiceException(
1✔
494
                    code="IllegalLocationConstraintException",
495
                    message="The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.",
496
                )
497

498
        store = self.get_store(context.account_id, bucket_region)
1✔
499

500
        if bucket_name in store.global_bucket_map:
1✔
501
            existing_bucket_owner = store.global_bucket_map[bucket_name]
1✔
502
            if existing_bucket_owner != context.account_id:
1✔
503
                raise BucketAlreadyExists()
1✔
504

505
            # if the existing bucket has the same owner, the behaviour will depend on the region
506
            if bucket_region != "us-east-1":
1✔
507
                raise BucketAlreadyOwnedByYou(
1✔
508
                    "Your previous request to create the named bucket succeeded and you already own it.",
509
                    BucketName=bucket_name,
510
                )
511
            else:
512
                # CreateBucket is idempotent in us-east-1
513
                return CreateBucketOutput(Location=f"/{bucket_name}")
1✔
514

515
        if (
1✔
516
            object_ownership := request.get("ObjectOwnership")
517
        ) is not None and object_ownership not in OBJECT_OWNERSHIPS:
518
            raise InvalidArgument(
1✔
519
                f"Invalid x-amz-object-ownership header: {object_ownership}",
520
                ArgumentName="x-amz-object-ownership",
521
            )
522
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
523
        owner = get_owner_for_account_id(context.account_id)
1✔
524
        acl = get_access_control_policy_for_new_resource_request(request, owner=owner)
1✔
525
        s3_bucket = S3Bucket(
1✔
526
            name=bucket_name,
527
            account_id=context.account_id,
528
            bucket_region=bucket_region,
529
            owner=owner,
530
            acl=acl,
531
            object_ownership=request.get("ObjectOwnership"),
532
            object_lock_enabled_for_bucket=request.get("ObjectLockEnabledForBucket"),
533
        )
534

535
        store.buckets[bucket_name] = s3_bucket
1✔
536
        store.global_bucket_map[bucket_name] = s3_bucket.bucket_account_id
1✔
537
        self._cors_handler.invalidate_cache()
1✔
538
        self._storage_backend.create_bucket(bucket_name)
1✔
539

540
        # Location is always contained in response -> full url for LocationConstraint outside us-east-1
541
        location = (
1✔
542
            f"/{bucket_name}"
543
            if bucket_region == "us-east-1"
544
            else get_full_default_bucket_location(bucket_name)
545
        )
546
        response = CreateBucketOutput(Location=location)
1✔
547
        return response
1✔
548

549
    def delete_bucket(
1✔
550
        self,
551
        context: RequestContext,
552
        bucket: BucketName,
553
        expected_bucket_owner: AccountId = None,
554
        **kwargs,
555
    ) -> None:
556
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
557

558
        # the bucket still contains objects
559
        if not s3_bucket.objects.is_empty():
1✔
560
            message = "The bucket you tried to delete is not empty"
1✔
561
            if s3_bucket.versioning_status:
1✔
562
                message += ". You must delete all versions in the bucket."
1✔
563
            raise BucketNotEmpty(
1✔
564
                message,
565
                BucketName=bucket,
566
            )
567

568
        store.buckets.pop(bucket)
1✔
569
        store.global_bucket_map.pop(bucket)
1✔
570
        self._cors_handler.invalidate_cache()
1✔
571
        self._expiration_cache.pop(bucket, None)
1✔
572
        # clean up the storage backend
573
        self._storage_backend.delete_bucket(bucket)
1✔
574

575
    def list_buckets(
1✔
576
        self,
577
        context: RequestContext,
578
        max_buckets: MaxBuckets = None,
579
        continuation_token: Token = None,
580
        prefix: Prefix = None,
581
        bucket_region: BucketRegion = None,
582
        **kwargs,
583
    ) -> ListBucketsOutput:
584
        owner = get_owner_for_account_id(context.account_id)
1✔
585
        store = self.get_store(context.account_id, context.region)
1✔
586

587
        decoded_continuation_token = (
1✔
588
            to_str(base64.urlsafe_b64decode(continuation_token.encode()))
589
            if continuation_token
590
            else None
591
        )
592

593
        count = 0
1✔
594
        buckets: list[Bucket] = []
1✔
595
        next_continuation_token = None
1✔
596

597
        # Comparing strings with case sensitivity since AWS is case-sensitive
598
        for bucket in sorted(store.buckets.values(), key=lambda r: r.name):
1✔
599
            if continuation_token and bucket.name < decoded_continuation_token:
1✔
600
                continue
1✔
601

602
            if prefix and not bucket.name.startswith(prefix):
1✔
603
                continue
1✔
604

605
            if bucket_region and not bucket.bucket_region == bucket_region:
1✔
606
                continue
1✔
607

608
            if max_buckets and count >= max_buckets:
1✔
609
                next_continuation_token = to_str(base64.urlsafe_b64encode(bucket.name.encode()))
1✔
610
                break
1✔
611

612
            output_bucket = Bucket(
1✔
613
                Name=bucket.name,
614
                CreationDate=bucket.creation_date,
615
                BucketRegion=bucket.bucket_region,
616
            )
617
            buckets.append(output_bucket)
1✔
618
            count += 1
1✔
619

620
        return ListBucketsOutput(
1✔
621
            Owner=owner, Buckets=buckets, Prefix=prefix, ContinuationToken=next_continuation_token
622
        )
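
(Editor's illustration, not part of provider.py.) The ContinuationToken built above is just the name of the first bucket that did not fit on the page, urlsafe-base64 encoded; on the next ListBuckets call it is decoded and every bucket whose name sorts before it is skipped. A minimal round-trip sketch:

    import base64

    token = base64.urlsafe_b64encode("my-bucket".encode()).decode()          # opaque token returned to the client
    assert base64.urlsafe_b64decode(token.encode()).decode() == "my-bucket"  # decoded on the follow-up request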
623

624
    def head_bucket(
1✔
625
        self,
626
        context: RequestContext,
627
        bucket: BucketName,
628
        expected_bucket_owner: AccountId = None,
629
        **kwargs,
630
    ) -> HeadBucketOutput:
631
        store = self.get_store(context.account_id, context.region)
1✔
632
        if not (s3_bucket := store.buckets.get(bucket)):
1✔
633
            if not (account_id := store.global_bucket_map.get(bucket)):
1✔
634
                # just to return the 404 error message
635
                raise NoSuchBucket()
1✔
636

637
            store = self.get_store(account_id, context.region)
1✔
638
            if not (s3_bucket := store.buckets.get(bucket)):
1✔
639
                # just to return the 404 error message
640
                raise NoSuchBucket()
×
641

642
        # TODO: this call is also used to check if the user has access/authorization for the bucket
643
        #  it can return 403
644
        return HeadBucketOutput(BucketRegion=s3_bucket.bucket_region)
1✔
645

646
    def get_bucket_location(
1✔
647
        self,
648
        context: RequestContext,
649
        bucket: BucketName,
650
        expected_bucket_owner: AccountId = None,
651
        **kwargs,
652
    ) -> GetBucketLocationOutput:
653
        """
654
        When implementing the ASF provider, this operation is implemented because:
655
        - The spec defines a root element GetBucketLocationOutput containing a LocationConstraint member, where
656
          S3 actually just returns the LocationConstraint on the root level (only operation so far that we know of).
657
        - We circumvent the root level element here by patching the spec such that this operation returns a
658
          single "payload" (the XML body response), which causes the serializer to directly take the payload element.
659
        - The above "hack" causes the fix in the serializer to not be picked up here as we're passing the XML body as
660
          the payload, which is why we need to manually do this here by manipulating the string.
661
        Botocore implements this hack for parsing the response in `botocore.handlers.py#parse_get_bucket_location`
662
        """
663
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
664

665
        location_constraint = (
1✔
666
            '<?xml version="1.0" encoding="UTF-8"?>\n'
667
            '<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">{{location}}</LocationConstraint>'
668
        )
669

670
        location = s3_bucket.bucket_region if s3_bucket.bucket_region != "us-east-1" else ""
1✔
671
        location_constraint = location_constraint.replace("{{location}}", location)
1✔
672

673
        response = GetBucketLocationOutput(LocationConstraint=location_constraint)
1✔
674
        return response
1✔
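
(Editor's illustration, not part of provider.py.) For a bucket outside us-east-1 the string assembled above is returned as the raw XML payload, with the region substituted for {{location}}; us-east-1 buckets get an empty element. For example, assuming a bucket in eu-west-1:

    template = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">{{location}}</LocationConstraint>'
    )
    print(template.replace("{{location}}", "eu-west-1"))
    # <?xml version="1.0" encoding="UTF-8"?>
    # <LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">eu-west-1</LocationConstraint>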
675

676
    @handler("PutObject", expand=False)
1✔
677
    def put_object(
1✔
678
        self,
679
        context: RequestContext,
680
        request: PutObjectRequest,
681
    ) -> PutObjectOutput:
682
        # TODO: validate order of validation
683
        # TODO: still need to handle following parameters
684
        #  request_payer: RequestPayer = None,
685
        bucket_name = request["Bucket"]
1✔
686
        key = request["Key"]
1✔
687
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
688

689
        if (storage_class := request.get("StorageClass")) is not None and (
1✔
690
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
691
        ):
692
            raise InvalidStorageClass(
1✔
693
                "The storage class you specified is not valid", StorageClassRequested=storage_class
694
            )
695

696
        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
1✔
697
            validate_kms_key_id(sse_kms_key_id, s3_bucket)
1✔
698

699
        validate_object_key(key)
1✔
700

701
        if_match = request.get("IfMatch")
1✔
702
        if (if_none_match := request.get("IfNoneMatch")) and if_match:
1✔
703
            raise NotImplementedException(
704
                "A header you provided implies functionality that is not implemented",
705
                Header="If-Match,If-None-Match",
706
                additionalMessage="Multiple conditional request headers present in the request",
707
            )
708

709
        elif (if_none_match and if_none_match != "*") or (if_match and if_match == "*"):
1✔
710
            header_name = "If-None-Match" if if_none_match else "If-Match"
1✔
711
            raise NotImplementedException(
712
                "A header you provided implies functionality that is not implemented",
713
                Header=header_name,
714
                additionalMessage=f"We don't accept the provided value of {header_name} header for this API",
715
            )
716

717
        system_metadata = get_system_metadata_from_request(request)
1✔
718
        if not system_metadata.get("ContentType"):
1✔
719
            system_metadata["ContentType"] = "binary/octet-stream"
1✔
720

721
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
722

723
        etag_content_md5 = ""
1✔
724
        if content_md5 := request.get("ContentMD5"):
1✔
725
            # assert that the received ContentMD5 is a properly b64 encoded value that fits an MD5 hash length
726
            etag_content_md5 = base_64_content_md5_to_etag(content_md5)
1✔
727
            if not etag_content_md5:
1✔
728
                raise InvalidDigest(
1✔
729
                    "The Content-MD5 you specified was invalid.",
730
                    Content_MD5=content_md5,
731
                )
732

733
        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
1✔
734
        checksum_value = (
1✔
735
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
736
        )
737

738
        # TODO: we're not encrypting the object with the provided key for now
739
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
740
        validate_sse_c(
1✔
741
            algorithm=request.get("SSECustomerAlgorithm"),
742
            encryption_key=request.get("SSECustomerKey"),
743
            encryption_key_md5=sse_c_key_md5,
744
            server_side_encryption=request.get("ServerSideEncryption"),
745
        )
746

747
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
748
            request,
749
            s3_bucket,
750
            store,
751
        )
752

753
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)
1✔
754

755
        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)
1✔
756

757
        if tagging := request.get("Tagging"):
1✔
758
            tagging = parse_tagging_header(tagging)
1✔
759

760
        s3_object = S3Object(
1✔
761
            key=key,
762
            version_id=version_id,
763
            storage_class=storage_class,
764
            expires=request.get("Expires"),
765
            user_metadata=request.get("Metadata"),
766
            system_metadata=system_metadata,
767
            checksum_algorithm=checksum_algorithm,
768
            checksum_value=checksum_value,
769
            encryption=encryption_parameters.encryption,
770
            kms_key_id=encryption_parameters.kms_key_id,
771
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
772
            sse_key_hash=sse_c_key_md5,
773
            lock_mode=lock_parameters.lock_mode,
774
            lock_legal_status=lock_parameters.lock_legal_status,
775
            lock_until=lock_parameters.lock_until,
776
            website_redirect_location=request.get("WebsiteRedirectLocation"),
777
            acl=acl,
778
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depend on Bucket settings
779
        )
780

781
        body = request.get("Body")
1✔
782
        # check if chunked request
783
        headers = context.request.headers
1✔
784
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
1✔
785
            "STREAMING-"
786
        ) or "aws-chunked" in headers.get("content-encoding", "")
787
        if is_aws_chunked:
1✔
788
            checksum_algorithm = (
1✔
789
                checksum_algorithm
790
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
791
            )
792
            if checksum_algorithm:
1✔
793
                s3_object.checksum_algorithm = checksum_algorithm
1✔
794

795
            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
1✔
796
            body = AwsChunkedDecoder(body, decoded_content_length, s3_object=s3_object)
1✔
797

798
            # S3 removes the `aws-chunked` value from ContentEncoding
799
            if content_encoding := s3_object.system_metadata.pop("ContentEncoding", None):
1✔
800
                encodings = [enc for enc in content_encoding.split(",") if enc != "aws-chunked"]
1✔
801
                if encodings:
1✔
802
                    s3_object.system_metadata["ContentEncoding"] = ",".join(encodings)
1✔
803

804
        with self._storage_backend.open(bucket_name, s3_object, mode="w") as s3_stored_object:
1✔
805
            # as we are inside the lock here, if multiple concurrent requests happen for the same object, the first
806
            # one to finish succeeds, and subsequent ones will raise exceptions. Once the first write finishes, we
807
            # release the lock and other requests can check this condition
808
            if if_none_match and object_exists_for_precondition_write(s3_bucket, key):
1✔
809
                raise PreconditionFailed(
1✔
810
                    "At least one of the pre-conditions you specified did not hold",
811
                    Condition="If-None-Match",
812
                )
813

814
            elif if_match:
1✔
815
                verify_object_equality_precondition_write(s3_bucket, key, if_match)
1✔
816

817
            s3_stored_object.write(body)
1✔
818

819
            if s3_object.checksum_algorithm:
1✔
820
                if not s3_object.checksum_value:
1✔
821
                    s3_object.checksum_value = s3_stored_object.checksum
1✔
822
                elif not validate_checksum_value(s3_object.checksum_value, checksum_algorithm):
1✔
823
                    self._storage_backend.remove(bucket_name, s3_object)
1✔
824
                    raise InvalidRequest(
1✔
825
                        f"Value for x-amz-checksum-{s3_object.checksum_algorithm.lower()} header is invalid."
826
                    )
827
                elif s3_object.checksum_value != s3_stored_object.checksum:
1✔
828
                    self._storage_backend.remove(bucket_name, s3_object)
1✔
829
                    raise BadDigest(
1✔
830
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
831
                    )
832

833
            # TODO: handle ContentMD5 and ChecksumAlgorithm in a handler for all requests except requests with a
834
            #  streaming body. We can use the specs to verify which operations need to have the checksum validated
835
            if content_md5:
1✔
836
                calculated_md5 = etag_to_base_64_content_md5(s3_stored_object.etag)
1✔
837
                if calculated_md5 != content_md5:
1✔
838
                    self._storage_backend.remove(bucket_name, s3_object)
1✔
839
                    raise BadDigest(
1✔
840
                        "The Content-MD5 you specified did not match what we received.",
841
                        ExpectedDigest=etag_content_md5,
842
                        CalculatedDigest=calculated_md5,
843
                    )
844

845
            s3_bucket.objects.set(key, s3_object)
1✔
846

847
        # in case we are overwriting an object, delete the tags entry
848
        key_id = get_unique_key_id(bucket_name, key, version_id)
1✔
849
        store.TAGS.tags.pop(key_id, None)
1✔
850
        if tagging:
1✔
851
            store.TAGS.tags[key_id] = tagging
1✔
852

853
        # RequestCharged: Optional[RequestCharged]  # TODO
854
        response = PutObjectOutput(
1✔
855
            ETag=s3_object.quoted_etag,
856
        )
857
        if s3_bucket.versioning_status == "Enabled":
1✔
858
            response["VersionId"] = s3_object.version_id
1✔
859

860
        if s3_object.checksum_algorithm:
1✔
861
            response[f"Checksum{s3_object.checksum_algorithm}"] = s3_object.checksum_value
1✔
862
            response["ChecksumType"] = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)
1✔
863

864
        if s3_bucket.lifecycle_rules:
1✔
865
            if expiration_header := self._get_expiration_header(
1✔
866
                s3_bucket.lifecycle_rules,
867
                bucket_name,
868
                s3_object,
869
                store.TAGS.tags.get(key_id, {}),
870
            ):
871
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
872
                #  apply them every time we get/head an object
873
                response["Expiration"] = expiration_header
1✔
874

875
        add_encryption_to_response(response, s3_object=s3_object)
1✔
876
        if sse_c_key_md5:
1✔
877
            response["SSECustomerAlgorithm"] = "AES256"
1✔
878
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
879

880
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
881

882
        return response
1✔
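
(Editor's illustration, not part of provider.py.) The chunked-upload branch above keys off two request headers; the sketch below mirrors that detection and the ContentEncoding cleanup, using a hypothetical set of header values:

    headers = {
        "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD",  # hypothetical SigV4 streaming value
        "content-encoding": "aws-chunked,gzip",
    }
    is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith("STREAMING-") or (
        "aws-chunked" in headers.get("content-encoding", "")
    )
    # S3 drops the transport-only "aws-chunked" token and keeps the real encoding
    encodings = [enc for enc in headers["content-encoding"].split(",") if enc != "aws-chunked"]
    assert is_aws_chunked and encodings == ["gzip"]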
883

884
    @handler("GetObject", expand=False)
1✔
885
    def get_object(
1✔
886
        self,
887
        context: RequestContext,
888
        request: GetObjectRequest,
889
    ) -> GetObjectOutput:
890
        # TODO: missing handling parameters:
891
        #  request_payer: RequestPayer = None,
892
        #  expected_bucket_owner: AccountId = None,
893

894
        bucket_name = request["Bucket"]
1✔
895
        object_key = request["Key"]
1✔
896
        version_id = request.get("VersionId")
1✔
897
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
898

899
        s3_object = s3_bucket.get_object(
1✔
900
            key=object_key,
901
            version_id=version_id,
902
            http_method="GET",
903
        )
904
        if s3_object.expires and s3_object.expires < datetime.datetime.now(
1✔
905
            tz=s3_object.expires.tzinfo
906
        ):
907
            # TODO: old behaviour was deleting key instantly if expired. AWS cleans up only once a day generally
908
            #  you can still HeadObject on it and you get the expiry time until scheduled deletion
909
            kwargs = {"Key": object_key}
1✔
910
            if version_id:
1✔
911
                kwargs["VersionId"] = version_id
×
912
            raise NoSuchKey("The specified key does not exist.", **kwargs)
1✔
913

914
        if s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not s3_object.restore:
1✔
915
            raise InvalidObjectState(
1✔
916
                "The operation is not valid for the object's storage class",
917
                StorageClass=s3_object.storage_class,
918
            )
919

920
        if not config.S3_SKIP_KMS_KEY_VALIDATION and s3_object.kms_key_id:
1✔
921
            validate_kms_key_id(kms_key=s3_object.kms_key_id, bucket=s3_bucket)
1✔
922

923
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
924
        # we're using getattr access because when restoring, the field might not exist
925
        # TODO: cleanup at next major release
926
        if sse_key_hash := getattr(s3_object, "sse_key_hash", None):
1✔
927
            if sse_key_hash and not sse_c_key_md5:
1✔
928
                raise InvalidRequest(
1✔
929
                    "The object was stored using a form of Server Side Encryption. "
930
                    "The correct parameters must be provided to retrieve the object."
931
                )
932
            elif sse_key_hash != sse_c_key_md5:
1✔
933
                raise AccessDenied(
1✔
934
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
935
                )
936

937
        validate_sse_c(
1✔
938
            algorithm=request.get("SSECustomerAlgorithm"),
939
            encryption_key=request.get("SSECustomerKey"),
940
            encryption_key_md5=sse_c_key_md5,
941
        )
942

943
        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)
1✔
944

945
        range_header = request.get("Range")
1✔
946
        part_number = request.get("PartNumber")
1✔
947
        if range_header and part_number:
1✔
948
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
1✔
949
        range_data = None
1✔
950
        if range_header:
1✔
951
            range_data = parse_range_header(range_header, s3_object.size)
1✔
952
        elif part_number:
1✔
953
            range_data = get_part_range(s3_object, part_number)
1✔
954

955
        # we deliberately do not call `.close()` on the s3_stored_object to keep the read lock acquired. When passing
956
        # the object to Werkzeug, the handler will call `.close()` after finishing iterating over `__iter__`.
957
        # this can however lead to deadlocks if an exception happens between the call and returning the object.
958
        # Be careful when adding validation between this call and the `return` of `S3Provider.get_object`
959
        s3_stored_object = self._storage_backend.open(bucket_name, s3_object, mode="r")
1✔
960

961
        # this is a hacky way to verify the object hasn't been modified between `s3_object = s3_bucket.get_object`
962
        # and the storage backend call. If it has been modified, now that we're in the read lock, we can safely fetch
963
        # the object again
964
        if s3_stored_object.last_modified != s3_object.internal_last_modified:
1✔
965
            s3_object = s3_bucket.get_object(
1✔
966
                key=object_key,
967
                version_id=version_id,
968
                http_method="GET",
969
            )
970

971
        response = GetObjectOutput(
1✔
972
            AcceptRanges="bytes",
973
            **s3_object.get_system_metadata_fields(),
974
        )
975
        if s3_object.user_metadata:
1✔
976
            response["Metadata"] = s3_object.user_metadata
1✔
977

978
        if s3_object.parts and request.get("PartNumber"):
1✔
979
            response["PartsCount"] = len(s3_object.parts)
1✔
980

981
        if s3_object.version_id:
1✔
982
            response["VersionId"] = s3_object.version_id
1✔
983

984
        if s3_object.website_redirect_location:
1✔
985
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location
1✔
986

987
        if s3_object.restore:
1✔
988
            response["Restore"] = s3_object.restore
×
989

990
        checksum_value = None
1✔
991
        if checksum_algorithm := s3_object.checksum_algorithm:
1✔
992
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
1✔
993
                checksum_value = s3_object.checksum_value
1✔
994

995
        if range_data:
1✔
996
            s3_stored_object.seek(range_data.begin)
1✔
997
            response["Body"] = LimitedIterableStream(
1✔
998
                s3_stored_object, max_length=range_data.content_length
999
            )
1000
            response["ContentRange"] = range_data.content_range
1✔
1001
            response["ContentLength"] = range_data.content_length
1✔
1002
            response["StatusCode"] = 206
1✔
1003
            if range_data.content_length == s3_object.size and checksum_value:
1✔
1004
                response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
1✔
1005
                response["ChecksumType"] = getattr(
1✔
1006
                    s3_object, "checksum_type", ChecksumType.FULL_OBJECT
1007
                )
1008
        else:
1009
            response["Body"] = s3_stored_object
1✔
1010
            if checksum_value:
1✔
1011
                response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
1✔
1012
                response["ChecksumType"] = getattr(
1✔
1013
                    s3_object, "checksum_type", ChecksumType.FULL_OBJECT
1014
                )
1015

1016
        add_encryption_to_response(response, s3_object=s3_object)
1✔
1017

1018
        if object_tags := store.TAGS.tags.get(
1✔
1019
            get_unique_key_id(bucket_name, object_key, version_id)
1020
        ):
1021
            response["TagCount"] = len(object_tags)
1✔
1022

1023
        if s3_object.is_current and s3_bucket.lifecycle_rules:
1✔
1024
            if expiration_header := self._get_expiration_header(
1✔
1025
                s3_bucket.lifecycle_rules,
1026
                bucket_name,
1027
                s3_object,
1028
                object_tags,
1029
            ):
1030
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
1031
                #  apply them every time we get/head an object
1032
                response["Expiration"] = expiration_header
1✔
1033

1034
        # TODO: missing returned fields
1035
        #     RequestCharged: Optional[RequestCharged]
1036
        #     ReplicationStatus: Optional[ReplicationStatus]
1037

1038
        if s3_object.lock_mode:
1✔
1039
            response["ObjectLockMode"] = s3_object.lock_mode
×
1040
            if s3_object.lock_until:
×
1041
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
×
1042
        if s3_object.lock_legal_status:
1✔
1043
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status
×
1044

1045
        if sse_c_key_md5:
1✔
1046
            response["SSECustomerAlgorithm"] = "AES256"
1✔
1047
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
1048

1049
        for request_param, response_param in ALLOWED_HEADER_OVERRIDES.items():
1✔
1050
            if request_param_value := request.get(request_param):
1✔
1051
                response[response_param] = request_param_value
1✔
1052

1053
        return response
1✔
1054

1055
    @handler("HeadObject", expand=False)
1✔
1056
    def head_object(
1✔
1057
        self,
1058
        context: RequestContext,
1059
        request: HeadObjectRequest,
1060
    ) -> HeadObjectOutput:
1061
        bucket_name = request["Bucket"]
1✔
1062
        object_key = request["Key"]
1✔
1063
        version_id = request.get("VersionId")
1✔
1064
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
1065

1066
        s3_object = s3_bucket.get_object(
1✔
1067
            key=object_key,
1068
            version_id=version_id,
1069
            http_method="HEAD",
1070
        )
1071

1072
        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)
1✔
1073

1074
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
1075
        if s3_object.sse_key_hash:
1✔
1076
            if not sse_c_key_md5:
1✔
1077
                raise InvalidRequest(
×
1078
                    "The object was stored using a form of Server Side Encryption. "
1079
                    "The correct parameters must be provided to retrieve the object."
1080
                )
1081
            elif s3_object.sse_key_hash != sse_c_key_md5:
1✔
1082
                raise AccessDenied(
1✔
1083
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
1084
                )
1085

1086
        validate_sse_c(
1✔
1087
            algorithm=request.get("SSECustomerAlgorithm"),
1088
            encryption_key=request.get("SSECustomerKey"),
1089
            encryption_key_md5=sse_c_key_md5,
1090
        )
1091

1092
        response = HeadObjectOutput(
1✔
1093
            AcceptRanges="bytes",
1094
            **s3_object.get_system_metadata_fields(),
1095
        )
1096
        if s3_object.user_metadata:
1✔
1097
            response["Metadata"] = s3_object.user_metadata
1✔
1098

1099
        if checksum_algorithm := s3_object.checksum_algorithm:
1✔
1100
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
1✔
1101
                response[f"Checksum{checksum_algorithm.upper()}"] = s3_object.checksum_value
1✔
1102
                response["ChecksumType"] = getattr(
1✔
1103
                    s3_object, "checksum_type", ChecksumType.FULL_OBJECT
1104
                )
1105

1106
        if s3_object.parts and request.get("PartNumber"):
1✔
1107
            response["PartsCount"] = len(s3_object.parts)
1✔
1108

1109
        if s3_object.version_id:
1✔
1110
            response["VersionId"] = s3_object.version_id
1✔
1111

1112
        if s3_object.website_redirect_location:
1✔
1113
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location
1✔
1114

1115
        if s3_object.restore:
1✔
1116
            response["Restore"] = s3_object.restore
1✔
1117

1118
        range_header = request.get("Range")
1✔
1119
        part_number = request.get("PartNumber")
1✔
1120
        if range_header and part_number:
1✔
1121
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
×
1122
        range_data = None
1✔
1123
        if range_header:
1✔
1124
            range_data = parse_range_header(range_header, s3_object.size)
×
1125
        elif part_number:
1✔
1126
            range_data = get_part_range(s3_object, part_number)
1✔
1127

1128
        if range_data:
1✔
1129
            response["ContentLength"] = range_data.content_length
1✔
1130
            response["ContentRange"] = range_data.content_range
1✔
1131
            response["StatusCode"] = 206
1✔
1132

1133
        add_encryption_to_response(response, s3_object=s3_object)
1✔
1134

1135
        # if you specify the VersionId, AWS won't return the Expiration header, even if that's the current version
1136
        if not version_id and s3_bucket.lifecycle_rules:
1✔
1137
            object_tags = store.TAGS.tags.get(
1✔
1138
                get_unique_key_id(bucket_name, object_key, s3_object.version_id)
1139
            )
1140
            if expiration_header := self._get_expiration_header(
1✔
1141
                s3_bucket.lifecycle_rules,
1142
                bucket_name,
1143
                s3_object,
1144
                object_tags,
1145
            ):
1146
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
1147
                #  apply them every time we get/head an object
1148
                response["Expiration"] = expiration_header
1✔
1149

1150
        if s3_object.lock_mode:
1✔
1151
            response["ObjectLockMode"] = s3_object.lock_mode
1✔
1152
            if s3_object.lock_until:
1✔
1153
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
1✔
1154
        if s3_object.lock_legal_status:
1✔
1155
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status
1✔
1156

1157
        if sse_c_key_md5:
1✔
1158
            response["SSECustomerAlgorithm"] = "AES256"
1✔
1159
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
1160

1161
        # TODO: missing return fields:
1162
        #  ArchiveStatus: Optional[ArchiveStatus]
1163
        #  RequestCharged: Optional[RequestCharged]
1164
        #  ReplicationStatus: Optional[ReplicationStatus]
1165

1166
        return response
1✔
1167

1168
    def delete_object(
1✔
1169
        self,
1170
        context: RequestContext,
1171
        bucket: BucketName,
1172
        key: ObjectKey,
1173
        mfa: MFA = None,
1174
        version_id: ObjectVersionId = None,
1175
        request_payer: RequestPayer = None,
1176
        bypass_governance_retention: BypassGovernanceRetention = None,
1177
        expected_bucket_owner: AccountId = None,
1178
        if_match: IfMatch = None,
1179
        if_match_last_modified_time: IfMatchLastModifiedTime = None,
1180
        if_match_size: IfMatchSize = None,
1181
        **kwargs,
1182
    ) -> DeleteObjectOutput:
1183
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1184

1185
        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
1✔
1186
            raise InvalidArgument(
1✔
1187
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
1188
                ArgumentName="x-amz-bypass-governance-retention",
1189
            )
1190

1191
        if s3_bucket.versioning_status is None:
1✔
1192
            if version_id and version_id != "null":
1✔
1193
                raise InvalidArgument(
1✔
1194
                    "Invalid version id specified",
1195
                    ArgumentName="versionId",
1196
                    ArgumentValue=version_id,
1197
                )
1198

1199
            found_object = s3_bucket.objects.pop(key, None)
1✔
1200
            # TODO: RequestCharged
1201
            if found_object:
1✔
1202
                self._storage_backend.remove(bucket, found_object)
1✔
1203
                self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
1✔
1204
                store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
1✔
1205

1206
            return DeleteObjectOutput()
1✔
1207

1208
        if not version_id:
1✔
1209
            delete_marker_id = generate_version_id(s3_bucket.versioning_status)
1✔
1210
            delete_marker = S3DeleteMarker(key=key, version_id=delete_marker_id)
1✔
1211
            s3_bucket.objects.set(key, delete_marker)
1✔
1212
            s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
1✔
1213
                context,
1214
                s3_bucket=s3_bucket,
1215
                s3_object=delete_marker,
1216
            )
1217
            s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
1✔
1218
            self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)
1✔
1219

1220
            return DeleteObjectOutput(VersionId=delete_marker.version_id, DeleteMarker=True)
1✔
1221

1222
        if key not in s3_bucket.objects:
1✔
1223
            return DeleteObjectOutput()
×
1224

1225
        if not (s3_object := s3_bucket.objects.get(key, version_id)):
1✔
1226
            raise InvalidArgument(
1✔
1227
                "Invalid version id specified",
1228
                ArgumentName="versionId",
1229
                ArgumentValue=version_id,
1230
            )
1231

1232
        if s3_object.is_locked(bypass_governance_retention):
1✔
1233
            raise AccessDenied("Access Denied because object protected by object lock.")
1✔
1234

1235
        s3_bucket.objects.pop(object_key=key, version_id=version_id)
1✔
1236
        response = DeleteObjectOutput(VersionId=s3_object.version_id)
1✔
1237

1238
        if isinstance(s3_object, S3DeleteMarker):
1✔
1239
            response["DeleteMarker"] = True
1✔
1240
        else:
1241
            self._storage_backend.remove(bucket, s3_object)
1✔
1242
            store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
1✔
1243
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
1244

1245
        return response
1✔
1246

1247
    def delete_objects(
        self,
        context: RequestContext,
        bucket: BucketName,
        delete: Delete,
        mfa: MFA = None,
        request_payer: RequestPayer = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> DeleteObjectsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
            raise InvalidArgument(
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
                ArgumentName="x-amz-bypass-governance-retention",
            )

        objects: list[ObjectIdentifier] = delete.get("Objects")
        if not objects:
            raise MalformedXML()

        # TODO: max 1000 delete at once? test against AWS?

        quiet = delete.get("Quiet", False)
        deleted = []
        errors = []

        to_remove = []
        for to_delete_object in objects:
            object_key = to_delete_object.get("Key")
            version_id = to_delete_object.get("VersionId")
            if s3_bucket.versioning_status is None:
                if version_id and version_id != "null":
                    errors.append(
                        Error(
                            Code="NoSuchVersion",
                            Key=object_key,
                            Message="The specified version does not exist.",
                            VersionId=version_id,
                        )
                    )
                    continue

                found_object = s3_bucket.objects.pop(object_key, None)
                if found_object:
                    to_remove.append(found_object)
                    self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
                    store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)
                # small hack to not create a fake object for nothing
                elif s3_bucket.notification_configuration:
                    # DeleteObjects is a bit weird, even if the object didn't exist, S3 will trigger a notification
                    # for a non-existing object being deleted
                    self._notify(
                        context, s3_bucket=s3_bucket, s3_object=S3Object(key=object_key, etag="")
                    )

                if not quiet:
                    deleted.append(DeletedObject(Key=object_key))

                continue

            if not version_id:
                delete_marker_id = generate_version_id(s3_bucket.versioning_status)
                delete_marker = S3DeleteMarker(key=object_key, version_id=delete_marker_id)
                s3_bucket.objects.set(object_key, delete_marker)
                s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
                    context,
                    s3_bucket=s3_bucket,
                    s3_object=delete_marker,
                )
                s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
                self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)

                if not quiet:
                    deleted.append(
                        DeletedObject(
                            DeleteMarker=True,
                            DeleteMarkerVersionId=delete_marker_id,
                            Key=object_key,
                        )
                    )
                continue

            if not (
                found_object := s3_bucket.objects.get(object_key=object_key, version_id=version_id)
            ):
                errors.append(
                    Error(
                        Code="NoSuchVersion",
                        Key=object_key,
                        Message="The specified version does not exist.",
                        VersionId=version_id,
                    )
                )
                continue

            if found_object.is_locked(bypass_governance_retention):
                errors.append(
                    Error(
                        Code="AccessDenied",
                        Key=object_key,
                        Message="Access Denied because object protected by object lock.",
                        VersionId=version_id,
                    )
                )
                continue

            s3_bucket.objects.pop(object_key=object_key, version_id=version_id)
            if not quiet:
                deleted_object = DeletedObject(
                    Key=object_key,
                    VersionId=version_id,
                )
                if isinstance(found_object, S3DeleteMarker):
                    deleted_object["DeleteMarker"] = True
                    deleted_object["DeleteMarkerVersionId"] = found_object.version_id

                deleted.append(deleted_object)

            if isinstance(found_object, S3Object):
                to_remove.append(found_object)

            self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
            store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)

        # TODO: request charged
        self._storage_backend.remove(bucket, to_remove)
        response: DeleteObjectsOutput = {}
        # AWS validated: the list of Deleted objects is unordered, multiple identical calls can return different results
        if errors:
            response["Errors"] = errors
        if not quiet:
            response["Deleted"] = deleted

        return response

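    # --- Illustrative usage sketch (not part of the provider implementation) ---
    # DeleteObjects on an unversioned bucket: every listed key is reported as Deleted, even if it
    # never existed (see the non-existing-object notification hack above). Names are assumptions.
    #
    #   import boto3
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack endpoint
    #   resp = s3.delete_objects(
    #       Bucket="example-bucket",
    #       Delete={"Objects": [{"Key": "a.txt"}, {"Key": "does-not-exist.txt"}], "Quiet": False},
    #   )
    #   print([d["Key"] for d in resp["Deleted"]])  # both keys are listed as Deleted
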
    @handler("CopyObject", expand=False)
    def copy_object(
        self,
        context: RequestContext,
        request: CopyObjectRequest,
    ) -> CopyObjectOutput:
        # request_payer: RequestPayer = None,  # TODO:
        dest_bucket = request["Bucket"]
        dest_key = request["Key"]
        validate_object_key(dest_key)
        store, dest_s3_bucket = self._get_cross_account_bucket(context, dest_bucket)

        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
            request.get("CopySource")
        )
        _, src_s3_bucket = self._get_cross_account_bucket(context, src_bucket)

        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
            validate_kms_key_id(sse_kms_key_id, dest_s3_bucket)

        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
        try:
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
        except MethodNotAllowed:
            raise InvalidRequest(
                "The source of a copy request may not specifically refer to a delete marker by version id."
            )

        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
            raise InvalidObjectState(
                "Operation is not valid for the source object's storage class",
                StorageClass=src_s3_object.storage_class,
            )

        if failed_condition := get_failed_precondition_copy_source(
            request, src_s3_object.last_modified, src_s3_object.etag
        ):
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition=failed_condition,
            )

        source_sse_c_key_md5 = request.get("CopySourceSSECustomerKeyMD5")
        if src_s3_object.sse_key_hash:
            if not source_sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif src_s3_object.sse_key_hash != source_sse_c_key_md5:
                raise AccessDenied("Access Denied")

        validate_sse_c(
            algorithm=request.get("CopySourceSSECustomerAlgorithm"),
            encryption_key=request.get("CopySourceSSECustomerKey"),
            encryption_key_md5=source_sse_c_key_md5,
        )

        target_sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        server_side_encryption = request.get("ServerSideEncryption")
        # validate target SSE-C parameters
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=target_sse_c_key_md5,
            server_side_encryption=server_side_encryption,
        )

        # TODO validate order of validation
        storage_class = request.get("StorageClass")
        metadata_directive = request.get("MetadataDirective")
        website_redirect_location = request.get("WebsiteRedirectLocation")
        # we need to check for identity of the object, to see if the default one has been changed
        is_default_encryption = (
            dest_s3_bucket.encryption_rule is DEFAULT_BUCKET_ENCRYPTION
            and src_s3_object.encryption == "AES256"
        )
        if (
            src_bucket == dest_bucket
            and src_key == dest_key
            and not any(
                (
                    storage_class,
                    server_side_encryption,
                    target_sse_c_key_md5,
                    metadata_directive == "REPLACE",
                    website_redirect_location,
                    dest_s3_bucket.encryption_rule
                    and not is_default_encryption,  # S3 will allow copy in place if the bucket has encryption configured
                    src_s3_object.restore,
                )
            )
        ):
            raise InvalidRequest(
                "This copy request is illegal because it is trying to copy an object to itself without changing the "
                "object's metadata, storage class, website redirect location or encryption attributes."
            )

        if tagging := request.get("Tagging"):
            tagging = parse_tagging_header(tagging)

        if metadata_directive == "REPLACE":
            user_metadata = request.get("Metadata")
            system_metadata = get_system_metadata_from_request(request)
            if not system_metadata.get("ContentType"):
                system_metadata["ContentType"] = "binary/octet-stream"
        else:
            user_metadata = src_s3_object.user_metadata
            system_metadata = src_s3_object.system_metadata

        dest_version_id = generate_version_id(dest_s3_bucket.versioning_status)

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            dest_s3_bucket,
            store,
        )
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(
            request, dest_s3_bucket
        )

        acl = get_access_control_policy_for_new_resource_request(
            request, owner=dest_s3_bucket.owner
        )
        checksum_algorithm = request.get("ChecksumAlgorithm")

        s3_object = S3Object(
            key=dest_key,
            size=src_s3_object.size,
            version_id=dest_version_id,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm or src_s3_object.checksum_algorithm,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=request.get(
                "BucketKeyEnabled"
            ),  # CopyObject does not inherit from the bucket here
            sse_key_hash=target_sse_c_key_md5,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=website_redirect_location,
            expiration=None,  # TODO, from lifecycle
            acl=acl,
            owner=dest_s3_bucket.owner,
        )

        with self._storage_backend.copy(
            src_bucket=src_bucket,
            src_object=src_s3_object,
            dest_bucket=dest_bucket,
            dest_object=s3_object,
        ) as s3_stored_object:
            s3_object.checksum_value = s3_stored_object.checksum or src_s3_object.checksum_value
            s3_object.etag = s3_stored_object.etag or src_s3_object.etag

            dest_s3_bucket.objects.set(dest_key, s3_object)

        dest_key_id = get_unique_key_id(dest_bucket, dest_key, dest_version_id)

        if (request.get("TaggingDirective")) == "REPLACE":
            store.TAGS.tags[dest_key_id] = tagging or {}
        else:
            src_key_id = get_unique_key_id(src_bucket, src_key, src_s3_object.version_id)
            src_tags = store.TAGS.tags.get(src_key_id, {})
            store.TAGS.tags[dest_key_id] = copy.copy(src_tags)

        copy_object_result = CopyObjectResult(
            ETag=s3_object.quoted_etag,
            LastModified=s3_object.last_modified,
        )
        if s3_object.checksum_algorithm:
            copy_object_result[f"Checksum{s3_object.checksum_algorithm.upper()}"] = (
                s3_object.checksum_value
            )

        response = CopyObjectOutput(
            CopyObjectResult=copy_object_result,
        )

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.expiration:
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime

        add_encryption_to_response(response, s3_object=s3_object)
        if target_sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = target_sse_c_key_md5

        if (
            src_s3_bucket.versioning_status
            and src_s3_object.version_id
            and src_s3_object.version_id != "null"
        ):
            response["CopySourceVersionId"] = src_s3_object.version_id

        # RequestCharged: Optional[RequestCharged] # TODO
        self._notify(context, s3_bucket=dest_s3_bucket, s3_object=s3_object)

        return response

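    # --- Illustrative usage sketch (not part of the provider implementation) ---
    # A copy onto the same bucket/key is rejected unless something changes; replacing the metadata
    # is the usual workaround, as validated above. Endpoint and names are assumptions.
    #
    #   import boto3
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack endpoint
    #   s3.copy_object(
    #       Bucket="example-bucket",
    #       Key="example-key",
    #       CopySource={"Bucket": "example-bucket", "Key": "example-key"},
    #       MetadataDirective="REPLACE",
    #       Metadata={"refreshed": "true"},
    #   )
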
    def list_objects(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        marker: Marker = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        common_prefixes = set()
        count = 0
        is_truncated = False
        next_key_marker = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)

        s3_objects: list[Object] = []

        all_keys = sorted(s3_bucket.objects.values(), key=lambda r: r.key)
        last_key = all_keys[-1] if all_keys else None

        # sort by key
        for s3_object in all_keys:
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key
            # skip all keys that alphabetically come before key_marker
            if marker:
                if key <= marker:
                    continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                if prefix_including_delimiter in common_prefixes or (
                    marker and marker.startswith(prefix_including_delimiter)
                ):
                    continue

            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)
            else:
                # TODO: add RestoreStatus if present
                object_data = Object(
                    Key=key,
                    ETag=s3_object.quoted_etag,
                    Owner=s3_bucket.owner,  # TODO: verify reality
                    Size=s3_object.size,
                    LastModified=s3_object.last_modified,
                    StorageClass=s3_object.storage_class,
                )

                if s3_object.checksum_algorithm:
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
                    object_data["ChecksumType"] = getattr(
                        s3_object, "checksum_type", ChecksumType.FULL_OBJECT
                    )

                s3_objects.append(object_data)

            # we just added a CommonPrefix or an Object, increase the counter
            count += 1
            if count >= max_keys and last_key.key != s3_object.key:
                is_truncated = True
                if prefix_including_delimiter:
                    next_key_marker = prefix_including_delimiter
                elif s3_objects:
                    next_key_marker = s3_objects[-1]["Key"]
                break

        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]

        response = ListObjectsOutput(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix or "",
            Marker=marker or "",
        )
        if s3_objects:
            response["Contents"] = s3_objects
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if delimiter and next_key_marker:
            response["NextMarker"] = next_key_marker
        if s3_bucket.bucket_region != "us-east-1":
            response["BucketRegion"] = s3_bucket.bucket_region

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response

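    # --- Illustrative usage sketch (not part of the provider implementation) ---
    # Prefix/Delimiter grouping as implemented above: keys under a "folder" collapse into
    # CommonPrefixes. Endpoint, bucket and prefix are assumptions for illustration only.
    #
    #   import boto3
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack endpoint
    #   resp = s3.list_objects(Bucket="example-bucket", Prefix="photos/", Delimiter="/")
    #   keys = [o["Key"] for o in resp.get("Contents", [])]
    #   folders = [p["Prefix"] for p in resp.get("CommonPrefixes", [])]
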
    def list_objects_v2(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        continuation_token: Token = None,
        fetch_owner: FetchOwner = None,
        start_after: StartAfter = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectsV2Output:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if continuation_token == "":
            raise InvalidArgument(
                "The continuation token provided is incorrect",
                ArgumentName="continuation-token",
            )

        common_prefixes = set()
        count = 0
        is_truncated = False
        next_continuation_token = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)
        decoded_continuation_token = (
            to_str(base64.urlsafe_b64decode(continuation_token.encode()))
            if continuation_token
            else None
        )

        s3_objects: list[Object] = []

        # sort by key
        for s3_object in sorted(s3_bucket.objects.values(), key=lambda r: r.key):
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key

            # skip all keys that alphabetically come before continuation_token
            if continuation_token:
                if key < decoded_continuation_token:
                    continue

            elif start_after:
                if key <= start_after:
                    continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # separate keys that contain the same string between the prefix and the first occurrence of the delimiter
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                # if the CommonPrefix is already in the CommonPrefixes, it doesn't count towards MaxKey, we can skip
                # the entry without increasing the counter. We need to iterate over all of these entries before
                # returning the next continuation marker, to properly start at the next key after this CommonPrefix
                if prefix_including_delimiter in common_prefixes:
                    continue

            # After skipping all entries, verify we're not over the MaxKeys before adding a new entry
            if count >= max_keys:
                is_truncated = True
                next_continuation_token = to_str(base64.urlsafe_b64encode(s3_object.key.encode()))
                break

            # if we found a new CommonPrefix, add it to the CommonPrefixes
            # else, it means it's a new Object, add it to the Contents
            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)
            else:
                # TODO: add RestoreStatus if present
                object_data = Object(
                    Key=key,
                    ETag=s3_object.quoted_etag,
                    Size=s3_object.size,
                    LastModified=s3_object.last_modified,
                    StorageClass=s3_object.storage_class,
                )

                if fetch_owner:
                    object_data["Owner"] = s3_bucket.owner

                if s3_object.checksum_algorithm:
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
                    object_data["ChecksumType"] = getattr(
                        s3_object, "checksum_type", ChecksumType.FULL_OBJECT
                    )

                s3_objects.append(object_data)

            # we just added either a CommonPrefix or an Object to the List, increase the counter by one
            count += 1

        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]

        response = ListObjectsV2Output(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix or "",
            KeyCount=count,
        )
        if s3_objects:
            response["Contents"] = s3_objects
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if next_continuation_token:
            response["NextContinuationToken"] = next_continuation_token

        if continuation_token:
            response["ContinuationToken"] = continuation_token
        elif start_after:
            response["StartAfter"] = start_after

        if s3_bucket.bucket_region != "us-east-1":
            response["BucketRegion"] = s3_bucket.bucket_region

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response

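    # --- Illustrative usage sketch (not part of the provider implementation) ---
    # Paging through ListObjectsV2 using the base64-encoded continuation token produced above;
    # a small MaxKeys forces truncation. Endpoint and bucket name are assumptions.
    #
    #   import boto3
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack endpoint
    #   token = None
    #   while True:
    #       kwargs = {"Bucket": "example-bucket", "MaxKeys": 2}
    #       if token:
    #           kwargs["ContinuationToken"] = token
    #       page = s3.list_objects_v2(**kwargs)
    #       for obj in page.get("Contents", []):
    #           print(obj["Key"])
    #       if not page.get("IsTruncated"):
    #           break
    #       token = page["NextContinuationToken"]
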
    def list_object_versions(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        key_marker: KeyMarker = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        version_id_marker: VersionIdMarker = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectVersionsOutput:
        if version_id_marker and not key_marker:
            raise InvalidArgument(
                "A version-id marker cannot be specified without a key marker.",
                ArgumentName="version-id-marker",
                ArgumentValue=version_id_marker,
            )

        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        common_prefixes = set()
        count = 0
        is_truncated = False
        next_key_marker = None
        next_version_id_marker = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)
        version_key_marker_found = False

        object_versions: list[ObjectVersion] = []
        delete_markers: list[DeleteMarkerEntry] = []

        all_versions = s3_bucket.objects.values(with_versions=True)
        # sort by key, and last-modified-date, to get the last version first
        all_versions.sort(key=lambda r: (r.key, -r.last_modified.timestamp()))
        last_version = all_versions[-1] if all_versions else None

        for version in all_versions:
            key = urlparse.quote(version.key) if encoding_type else version.key
            # skip all keys that alphabetically come before key_marker
            if key_marker:
                if key < key_marker:
                    continue
                elif key == key_marker:
                    if not version_id_marker:
                        continue
                    # as the keys are ordered by time, once we found the key marker, we can return the next one
                    if version.version_id == version_id_marker:
                        version_key_marker_found = True
                        continue

                    # it is possible that the version_id_marker related object has been deleted, in that case, start
                    # as soon as the next version id is older than the version id marker (meaning this version was
                    # next after the now-deleted version)
                    elif is_version_older_than_other(version.version_id, version_id_marker):
                        version_key_marker_found = True

                    elif not version_key_marker_found:
                        # as long as we have not passed the version_key_marker, skip the versions
                        continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                if prefix_including_delimiter in common_prefixes or (
                    key_marker and key_marker.startswith(prefix_including_delimiter)
                ):
                    continue

            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)

            elif isinstance(version, S3DeleteMarker):
                delete_marker = DeleteMarkerEntry(
                    Key=key,
                    Owner=s3_bucket.owner,
                    VersionId=version.version_id,
                    IsLatest=version.is_current,
                    LastModified=version.last_modified,
                )
                delete_markers.append(delete_marker)
            else:
                # TODO: add RestoreStatus if present
                object_version = ObjectVersion(
                    Key=key,
                    ETag=version.quoted_etag,
                    Owner=s3_bucket.owner,  # TODO: verify reality
                    Size=version.size,
                    VersionId=version.version_id or "null",
                    LastModified=version.last_modified,
                    IsLatest=version.is_current,
                    # TODO: verify this, are other class possible?
                    # StorageClass=version.storage_class,
                    StorageClass=ObjectVersionStorageClass.STANDARD,
                )

                if version.checksum_algorithm:
                    object_version["ChecksumAlgorithm"] = [version.checksum_algorithm]
                    object_version["ChecksumType"] = getattr(
                        version, "checksum_type", ChecksumType.FULL_OBJECT
                    )

                object_versions.append(object_version)

            # we just added a CommonPrefix, an Object or a DeleteMarker, increase the counter
            count += 1
            if count >= max_keys and last_version.version_id != version.version_id:
                is_truncated = True
                if prefix_including_delimiter:
                    next_key_marker = prefix_including_delimiter
                else:
                    next_key_marker = version.key
                    next_version_id_marker = version.version_id
                break

        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]

        response = ListObjectVersionsOutput(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix,
            KeyMarker=key_marker or "",
            VersionIdMarker=version_id_marker or "",
        )
        if object_versions:
            response["Versions"] = object_versions
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delete_markers:
            response["DeleteMarkers"] = delete_markers
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if next_key_marker:
            response["NextKeyMarker"] = next_key_marker
        if next_version_id_marker:
            response["NextVersionIdMarker"] = next_version_id_marker

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response

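    # --- Illustrative usage sketch (not part of the provider implementation) ---
    # Listing all versions and delete markers of a key on a versioned bucket, as returned by the
    # handler above. Endpoint, bucket and key names are assumptions for illustration only.
    #
    #   import boto3
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack endpoint
    #   resp = s3.list_object_versions(Bucket="example-bucket", Prefix="example-key")
    #   for v in resp.get("Versions", []):
    #       print("version", v["VersionId"], v["IsLatest"])
    #   for dm in resp.get("DeleteMarkers", []):
    #       print("delete marker", dm["VersionId"])
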
    @handler("GetObjectAttributes", expand=False)
    def get_object_attributes(
        self,
        context: RequestContext,
        request: GetObjectAttributesRequest,
    ) -> GetObjectAttributesOutput:
        bucket_name = request["Bucket"]
        object_key = request["Key"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=request.get("VersionId"),
            http_method="GET",
        )

        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        if s3_object.sse_key_hash:
            if not sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif s3_object.sse_key_hash != sse_c_key_md5:
                raise AccessDenied("Access Denied")

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        object_attrs = request.get("ObjectAttributes", [])
        response = GetObjectAttributesOutput()
        if "ETag" in object_attrs:
            response["ETag"] = s3_object.etag
        if "StorageClass" in object_attrs:
            response["StorageClass"] = s3_object.storage_class
        if "ObjectSize" in object_attrs:
            response["ObjectSize"] = s3_object.size
        if "Checksum" in object_attrs and (checksum_algorithm := s3_object.checksum_algorithm):
            if s3_object.parts:
                checksum_value = s3_object.checksum_value.split("-")[0]
            else:
                checksum_value = s3_object.checksum_value
            response["Checksum"] = {
                f"Checksum{checksum_algorithm.upper()}": checksum_value,
                "ChecksumType": getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT),
            }

        response["LastModified"] = s3_object.last_modified

        if s3_bucket.versioning_status:
            response["VersionId"] = s3_object.version_id

        if "ObjectParts" in object_attrs and s3_object.parts:
            # TODO: implements ObjectParts, this is basically a simplified `ListParts` call on the object, we might
            #  need to store more data about the Parts once we implement checksums for them
            response["ObjectParts"] = GetObjectAttributesParts(TotalPartsCount=len(s3_object.parts))

        return response

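    # --- Illustrative usage sketch (not part of the provider implementation) ---
    # Requesting a subset of attributes; only the requested fields (plus LastModified) are
    # returned by the handler above. Endpoint, bucket and key names are assumptions.
    #
    #   import boto3
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack endpoint
    #   resp = s3.get_object_attributes(
    #       Bucket="example-bucket",
    #       Key="example-key",
    #       ObjectAttributes=["ETag", "ObjectSize", "StorageClass", "Checksum"],
    #   )
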
    def restore_object(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        restore_request: RestoreRequest = None,
        request_payer: RequestPayer = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> RestoreObjectOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="GET",  # TODO: verify http method
        )
        if s3_object.storage_class not in ARCHIVES_STORAGE_CLASSES:
            raise InvalidObjectState(StorageClass=s3_object.storage_class)

        # TODO: moto only supported the "Days" parameter from RestoreRequest and ignored the others;
        #  we only implement the same functionality for now

        # if a request was already done and the object was available, and we're updating it, set the status code to 200
        status_code = 200 if s3_object.restore else 202
        restore_days = restore_request.get("Days")
        if not restore_days:
            LOG.debug("LocalStack does not support restore SELECT requests yet.")
            return RestoreObjectOutput()

        restore_expiration_date = add_expiration_days_to_datetime(
            datetime.datetime.now(datetime.UTC), restore_days
        )
        # TODO: add a way to transition from ongoing-request=true to false? for now it is instant
        s3_object.restore = f'ongoing-request="false", expiry-date="{restore_expiration_date}"'

        s3_notif_ctx_initiated = S3EventNotificationContext.from_request_context_native(
            context,
            s3_bucket=s3_bucket,
            s3_object=s3_object,
        )
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_initiated)
        # But because it's instant in LocalStack, we can directly send the Completed notification as well
        # We just need to copy the context so that we don't mutate the first context while it could be sent
        # And modify its event type from `ObjectRestore:Post` to `ObjectRestore:Completed`
        s3_notif_ctx_completed = copy.copy(s3_notif_ctx_initiated)
        s3_notif_ctx_completed.event_type = s3_notif_ctx_completed.event_type.replace(
            "Post", "Completed"
        )
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_completed)

        # TODO: request charged
        return RestoreObjectOutput(StatusCode=status_code)

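    # --- Illustrative usage sketch (not part of the provider implementation) ---
    # Restoring an archived object; in LocalStack the restore completes instantly, as noted above.
    # Endpoint, bucket and key names are assumptions for illustration only.
    #
    #   import boto3
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack endpoint
    #   s3.put_object(Bucket="example-bucket", Key="archived.bin", Body=b"data", StorageClass="GLACIER")
    #   resp = s3.restore_object(
    #       Bucket="example-bucket", Key="archived.bin", RestoreRequest={"Days": 1}
    #   )
    #   # first call returns 202; repeating it while the object is restored returns 200
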
    @handler("CreateMultipartUpload", expand=False)
    def create_multipart_upload(
        self,
        context: RequestContext,
        request: CreateMultipartUploadRequest,
    ) -> CreateMultipartUploadOutput:
        # TODO: handle missing parameters:
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        if (storage_class := request.get("StorageClass")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
            validate_kms_key_id(sse_kms_key_id, s3_bucket)

        if tagging := request.get("Tagging"):
            tagging = parse_tagging_header(tagging_header=tagging)

        key = request["Key"]

        system_metadata = get_system_metadata_from_request(request)
        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        checksum_algorithm = request.get("ChecksumAlgorithm")
        if checksum_algorithm and checksum_algorithm not in CHECKSUM_ALGORITHMS:
            raise InvalidRequest(
                "Checksum algorithm provided is unsupported. Please try again with any of the valid types: [CRC32, CRC32C, SHA1, SHA256]"
            )

        if not (checksum_type := request.get("ChecksumType")) and checksum_algorithm:
            if checksum_algorithm == ChecksumAlgorithm.CRC64NVME:
                checksum_type = ChecksumType.FULL_OBJECT
            else:
                checksum_type = ChecksumType.COMPOSITE
        elif checksum_type and not checksum_algorithm:
            raise InvalidRequest(
                "The x-amz-checksum-type header can only be used with the x-amz-checksum-algorithm header."
            )

        if (
            checksum_type == ChecksumType.COMPOSITE
            and checksum_algorithm == ChecksumAlgorithm.CRC64NVME
        ):
            raise InvalidRequest(
                "The COMPOSITE checksum type cannot be used with the crc64nvme checksum algorithm."
            )
        elif checksum_type == ChecksumType.FULL_OBJECT and checksum_algorithm.upper().startswith(
            "SHA"
        ):
            raise InvalidRequest(
                f"The FULL_OBJECT checksum type cannot be used with the {checksum_algorithm.lower()} checksum algorithm."
            )

        # TODO: we're not encrypting the object with the provided key for now
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
            server_side_encryption=request.get("ServerSideEncryption"),
        )

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            s3_bucket,
            store,
        )
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)

        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)

        # validate encryption values
        s3_multipart = S3Multipart(
            key=key,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=request.get("Metadata"),
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_type=checksum_type,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            sse_key_hash=sse_c_key_md5,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=request.get("WebsiteRedirectLocation"),
            expiration=None,  # TODO, from lifecycle, or should it be updated with config?
            acl=acl,
            initiator=get_owner_for_account_id(context.account_id),
            tagging=tagging,
            owner=s3_bucket.owner,
            precondition=object_exists_for_precondition_write(s3_bucket, key),
        )
        # it seems if there is SSE-C on the multipart, AWS S3 will override the default Checksum behavior (but not on
        # PutObject)
        if sse_c_key_md5:
            s3_multipart.object.checksum_algorithm = None

        s3_bucket.multiparts[s3_multipart.id] = s3_multipart

        response = CreateMultipartUploadOutput(
            Bucket=bucket_name, Key=key, UploadId=s3_multipart.id
        )

        if checksum_algorithm:
            response["ChecksumAlgorithm"] = checksum_algorithm
            response["ChecksumType"] = checksum_type

        add_encryption_to_response(response, s3_object=s3_multipart.object)
        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        # TODO: missing response fields we're not currently supporting
        # - AbortDate: lifecycle related, not currently supported, todo
        # - AbortRuleId: lifecycle related, not currently supported, todo
        # - RequestCharged: todo

        return response

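    # --- Illustrative usage sketch (not part of the provider implementation) ---
    # Starting a multipart upload with a checksum algorithm; per the logic above, a SHA256
    # algorithm without an explicit ChecksumType defaults to COMPOSITE. Names are assumptions.
    #
    #   import boto3
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack endpoint
    #   resp = s3.create_multipart_upload(
    #       Bucket="example-bucket", Key="big-file.bin", ChecksumAlgorithm="SHA256"
    #   )
    #   upload_id = resp["UploadId"]  # response also echoes ChecksumAlgorithm and ChecksumType
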
    @handler("UploadPart", expand=False)
    def upload_part(
        self,
        context: RequestContext,
        request: UploadPartRequest,
    ) -> UploadPartOutput:
        # TODO: missing following parameters:
        #  content_length: ContentLength = None, ->validate?
        #  content_md5: ContentMD5 = None, -> validate?
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        upload_id = request.get("UploadId")
        if not (
            s3_multipart := s3_bucket.multiparts.get(upload_id)
        ) or s3_multipart.object.key != request.get("Key"):
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
            raise InvalidArgument(
                "Part number must be an integer between 1 and 10000, inclusive",
                ArgumentName="partNumber",
                ArgumentValue=part_number,
            )

        if content_md5 := request.get("ContentMD5"):
            # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
            if not base_64_content_md5_to_etag(content_md5):
                raise InvalidDigest(
                    "The Content-MD5 you specified was invalid.",
                    Content_MD5=content_md5,
                )

        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
        checksum_value = (
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
        )

        # TODO: we're not encrypting the object with the provided key for now
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        if (s3_multipart.object.sse_key_hash and not sse_c_key_md5) or (
            sse_c_key_md5 and not s3_multipart.object.sse_key_hash
        ):
            raise InvalidRequest(
                "The multipart upload initiate requested encryption. "
                "Subsequent part requests must include the appropriate encryption parameters."
            )
        elif (
            s3_multipart.object.sse_key_hash
            and sse_c_key_md5
            and s3_multipart.object.sse_key_hash != sse_c_key_md5
        ):
            raise InvalidRequest(
                "The provided encryption parameters did not match the ones used originally."
            )

        s3_part = S3Part(
            part_number=part_number,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
        )
        body = request.get("Body")
        headers = context.request.headers
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
            "STREAMING-"
        ) or "aws-chunked" in headers.get("content-encoding", "")
        # check if chunked request
        if is_aws_chunked:
            checksum_algorithm = (
                checksum_algorithm
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
            )
            if checksum_algorithm:
                s3_part.checksum_algorithm = checksum_algorithm

            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
            body = AwsChunkedDecoder(body, decoded_content_length, s3_part)

        if (
            s3_multipart.checksum_algorithm
            and s3_part.checksum_algorithm != s3_multipart.checksum_algorithm
        ):
            error_req_checksum = checksum_algorithm.lower() if checksum_algorithm else "null"
            error_mp_checksum = (
                s3_multipart.object.checksum_algorithm.lower()
                if s3_multipart.object.checksum_algorithm
                else "null"
            )
            if not error_mp_checksum == "null":
                raise InvalidRequest(
                    f"Checksum Type mismatch occurred, expected checksum Type: {error_mp_checksum}, actual checksum Type: {error_req_checksum}"
                )

        stored_multipart = self._storage_backend.get_multipart(bucket_name, s3_multipart)
        with stored_multipart.open(s3_part, mode="w") as stored_s3_part:
            try:
                stored_s3_part.write(body)
            except Exception:
                stored_multipart.remove_part(s3_part)
                raise

            if checksum_algorithm:
                if not validate_checksum_value(s3_part.checksum_value, checksum_algorithm):
                    stored_multipart.remove_part(s3_part)
                    raise InvalidRequest(
                        f"Value for x-amz-checksum-{s3_part.checksum_algorithm.lower()} header is invalid."
                    )
                elif s3_part.checksum_value != stored_s3_part.checksum:
                    stored_multipart.remove_part(s3_part)
                    raise BadDigest(
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
                    )

            if content_md5:
                calculated_md5 = etag_to_base_64_content_md5(s3_part.etag)
                if calculated_md5 != content_md5:
                    stored_multipart.remove_part(s3_part)
                    raise BadDigest(
                        "The Content-MD5 you specified did not match what we received.",
                        ExpectedDigest=content_md5,
                        CalculatedDigest=calculated_md5,
                    )

            s3_multipart.parts[part_number] = s3_part

        response = UploadPartOutput(
            ETag=s3_part.quoted_etag,
        )

        add_encryption_to_response(response, s3_object=s3_multipart.object)
        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        if s3_part.checksum_algorithm:
            response[f"Checksum{s3_part.checksum_algorithm.upper()}"] = s3_part.checksum_value

        # TODO: RequestCharged: Optional[RequestCharged]
        return response

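    # --- Illustrative usage sketch (not part of the provider implementation) ---
    # Uploading a single part for the multipart upload started above; the returned ETag is later
    # needed for CompleteMultipartUpload. Names and the 5 MiB body size are assumptions.
    #
    #   import boto3
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack endpoint
    #   part = s3.upload_part(
    #       Bucket="example-bucket",
    #       Key="big-file.bin",
    #       UploadId=upload_id,
    #       PartNumber=1,
    #       Body=b"0" * (5 * 1024 * 1024),
    #   )
    #   etag_part_1 = part["ETag"]
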
    @handler("UploadPartCopy", expand=False)
1✔
2394
    def upload_part_copy(
1✔
2395
        self,
2396
        context: RequestContext,
2397
        request: UploadPartCopyRequest,
2398
    ) -> UploadPartCopyOutput:
2399
        # TODO: handle following parameters:
2400
        #  copy_source_if_match: CopySourceIfMatch = None,
2401
        #  copy_source_if_modified_since: CopySourceIfModifiedSince = None,
2402
        #  copy_source_if_none_match: CopySourceIfNoneMatch = None,
2403
        #  copy_source_if_unmodified_since: CopySourceIfUnmodifiedSince = None,
2404
        #  request_payer: RequestPayer = None,
2405
        dest_bucket = request["Bucket"]
1✔
2406
        dest_key = request["Key"]
1✔
2407
        store = self.get_store(context.account_id, context.region)
1✔
2408
        # TODO: validate cross-account UploadPartCopy
2409
        if not (dest_s3_bucket := store.buckets.get(dest_bucket)):
1✔
2410
            raise NoSuchBucket("The specified bucket does not exist", BucketName=dest_bucket)
×
2411

2412
        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
1✔
2413
            request.get("CopySource")
2414
        )
2415

2416
        if not (src_s3_bucket := store.buckets.get(src_bucket)):
1✔
2417
            raise NoSuchBucket("The specified bucket does not exist", BucketName=src_bucket)
×
2418

2419
        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
2420
        try:
1✔
2421
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
1✔
2422
        except MethodNotAllowed:
×
2423
            raise InvalidRequest(
×
2424
                "The source of a copy request may not specifically refer to a delete marker by version id."
2425
            )
2426

2427
        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
1✔
2428
            raise InvalidObjectState(
×
2429
                "Operation is not valid for the source object's storage class",
2430
                StorageClass=src_s3_object.storage_class,
2431
            )
2432

2433
        upload_id = request.get("UploadId")
1✔
2434
        if (
1✔
2435
            not (s3_multipart := dest_s3_bucket.multiparts.get(upload_id))
2436
            or s3_multipart.object.key != dest_key
2437
        ):
2438
            raise NoSuchUpload(
×
2439
                "The specified upload does not exist. "
2440
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2441
                UploadId=upload_id,
2442
            )
2443

2444
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
1✔
2445
            raise InvalidArgument(
×
2446
                "Part number must be an integer between 1 and 10000, inclusive",
2447
                ArgumentName="partNumber",
2448
                ArgumentValue=part_number,
2449
            )
2450

2451
        source_range = request.get("CopySourceRange")
1✔
2452
        # TODO implement copy source IF (done in ASF provider)
2453

2454
        range_data: Optional[ObjectRange] = None
1✔
2455
        if source_range:
1✔
2456
            range_data = parse_copy_source_range_header(source_range, src_s3_object.size)
1✔
2457

2458
        s3_part = S3Part(part_number=part_number)
1✔
2459

2460
        stored_multipart = self._storage_backend.get_multipart(dest_bucket, s3_multipart)
1✔
2461
        stored_multipart.copy_from_object(s3_part, src_bucket, src_s3_object, range_data)
1✔
2462

2463
        s3_multipart.parts[part_number] = s3_part
1✔
2464

2465
        # TODO: return those fields (checksum not handled currently in moto for parts)
2466
        # ChecksumCRC32: Optional[ChecksumCRC32]
2467
        # ChecksumCRC32C: Optional[ChecksumCRC32C]
2468
        # ChecksumSHA1: Optional[ChecksumSHA1]
2469
        # ChecksumSHA256: Optional[ChecksumSHA256]
2470
        #     RequestCharged: Optional[RequestCharged]
2471

2472
        result = CopyPartResult(
1✔
2473
            ETag=s3_part.quoted_etag,
2474
            LastModified=s3_part.last_modified,
2475
        )
2476

2477
        response = UploadPartCopyOutput(
1✔
2478
            CopyPartResult=result,
2479
        )
2480

2481
        if src_s3_bucket.versioning_status and src_s3_object.version_id:
1✔
2482
            response["CopySourceVersionId"] = src_s3_object.version_id
×
2483

2484
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2485

2486
        return response
1✔
2487

2488
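    # Illustrative sketch (comment only): copying a byte range of an existing object into a
    # multipart part, which is what UploadPartCopy above implements. Names and the range are
    # placeholders; CopySourceRange uses the "bytes=start-end" syntax parsed by
    # parse_copy_source_range_header.
    #
    #   import boto3
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   mpu = s3.create_multipart_upload(Bucket="dest-bucket", Key="dest-key")
    #   copy = s3.upload_part_copy(
    #       Bucket="dest-bucket",
    #       Key="dest-key",
    #       UploadId=mpu["UploadId"],
    #       PartNumber=1,
    #       CopySource={"Bucket": "src-bucket", "Key": "src-key"},
    #       CopySourceRange="bytes=0-5242879",  # first 5 MiB of the source object
    #   )
    #   etag = copy["CopyPartResult"]["ETag"]
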
    def complete_multipart_upload(
1✔
2489
        self,
2490
        context: RequestContext,
2491
        bucket: BucketName,
2492
        key: ObjectKey,
2493
        upload_id: MultipartUploadId,
2494
        multipart_upload: CompletedMultipartUpload = None,
2495
        checksum_crc32: ChecksumCRC32 = None,
2496
        checksum_crc32_c: ChecksumCRC32C = None,
2497
        checksum_crc64_nvme: ChecksumCRC64NVME = None,
2498
        checksum_sha1: ChecksumSHA1 = None,
2499
        checksum_sha256: ChecksumSHA256 = None,
2500
        checksum_type: ChecksumType = None,
2501
        mpu_object_size: MpuObjectSize = None,
2502
        request_payer: RequestPayer = None,
2503
        expected_bucket_owner: AccountId = None,
2504
        if_match: IfMatch = None,
2505
        if_none_match: IfNoneMatch = None,
2506
        sse_customer_algorithm: SSECustomerAlgorithm = None,
2507
        sse_customer_key: SSECustomerKey = None,
2508
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
2509
        **kwargs,
2510
    ) -> CompleteMultipartUploadOutput:
2511
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2512

2513
        if (
1✔
2514
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2515
            or s3_multipart.object.key != key
2516
        ):
2517
            raise NoSuchUpload(
1✔
2518
                "The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.",
2519
                UploadId=upload_id,
2520
            )
2521

2522
        if if_none_match and if_match:
1✔
2523
            raise NotImplementedException(
2524
                "A header you provided implies functionality that is not implemented",
2525
                Header="If-Match,If-None-Match",
2526
                additionalMessage="Multiple conditional request headers present in the request",
2527
            )
2528

2529
        elif if_none_match:
1✔
2530
            if if_none_match != "*":
1✔
2531
                raise NotImplementedException(
2532
                    "A header you provided implies functionality that is not implemented",
2533
                    Header="If-None-Match",
2534
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2535
                )
2536
            if object_exists_for_precondition_write(s3_bucket, key):
1✔
2537
                raise PreconditionFailed(
1✔
2538
                    "At least one of the pre-conditions you specified did not hold",
2539
                    Condition="If-None-Match",
2540
                )
2541
            elif s3_multipart.precondition:
1✔
2542
                raise ConditionalRequestConflict(
1✔
2543
                    "The conditional request cannot succeed due to a conflicting operation against this resource.",
2544
                    Condition="If-None-Match",
2545
                    Key=key,
2546
                )
2547

2548
        elif if_match:
1✔
2549
            if if_match == "*":
1✔
2550
                raise NotImplementedException(
2551
                    "A header you provided implies functionality that is not implemented",
2552
                    Header="If-None-Match",
2553
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2554
                )
2555
            verify_object_equality_precondition_write(
1✔
2556
                s3_bucket, key, if_match, initiated=s3_multipart.initiated
2557
            )
2558

2559
        parts = multipart_upload.get("Parts", [])
1✔
2560
        if not parts:
1✔
2561
            raise InvalidRequest("You must specify at least one part")
1✔
2562

2563
        parts_numbers = [part.get("PartNumber") for part in parts]
1✔
2564
        # TODO: it seems that with new S3 data integrity, sorting might not be mandatory depending on checksum type
2565
        # see https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
2566
        # sorted is very fast (fastest) if the list is already sorted, which should be the case
2567
        if sorted(parts_numbers) != parts_numbers:
1✔
2568
            raise InvalidPartOrder(
1✔
2569
                "The list of parts was not in ascending order. Parts must be ordered by part number.",
2570
                UploadId=upload_id,
2571
            )
2572

2573
        mpu_checksum_algorithm = s3_multipart.checksum_algorithm
1✔
2574
        mpu_checksum_type = getattr(s3_multipart, "checksum_type", None)
1✔
2575

2576
        if checksum_type and checksum_type != mpu_checksum_type:
1✔
2577
            raise InvalidRequest(
1✔
2578
                f"The upload was created using the {mpu_checksum_type or 'null'} checksum mode. "
2579
                f"The complete request must use the same checksum mode."
2580
            )
2581

2582
        # generate the versionId before completing, in case the bucket versioning status has changed between
2583
        # creation and completion? AWS validates this
2584
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
2585
        s3_multipart.object.version_id = version_id
1✔
2586

2587
        # we're inspecting the signature of `complete_multipart`, in case the multipart has been restored from
2588
        # persistence. If the multipart was persisted by an older version, do not validate those parameters
2589
        # TODO: remove for next major version (minor?)
2590
        if signature(s3_multipart.complete_multipart).parameters.get("mpu_size"):
1✔
2591
            checksum_algorithm = mpu_checksum_algorithm.lower() if mpu_checksum_algorithm else None
1✔
2592
            checksum_map = {
1✔
2593
                "crc32": checksum_crc32,
2594
                "crc32c": checksum_crc32_c,
2595
                "crc64nvme": checksum_crc64_nvme,
2596
                "sha1": checksum_sha1,
2597
                "sha256": checksum_sha256,
2598
            }
2599
            checksum_value = checksum_map.get(checksum_algorithm)
1✔
2600
            s3_multipart.complete_multipart(
1✔
2601
                parts, mpu_size=mpu_object_size, validation_checksum=checksum_value
2602
            )
2603
            if mpu_checksum_algorithm and (
1✔
2604
                (
2605
                    checksum_value
2606
                    and mpu_checksum_type == ChecksumType.FULL_OBJECT
2607
                    and not checksum_type
2608
                )
2609
                or any(
2610
                    checksum_value
2611
                    for checksum_type, checksum_value in checksum_map.items()
2612
                    if checksum_type != checksum_algorithm
2613
                )
2614
            ):
2615
                # this is not ideal, but this validation comes last... after the validation of individual parts
2616
                s3_multipart.object.parts.clear()
1✔
2617
                raise BadDigest(
1✔
2618
                    f"The {mpu_checksum_algorithm.lower()} you specified did not match the calculated checksum."
2619
                )
2620
        else:
2621
            s3_multipart.complete_multipart(parts)
×
2622

2623
        stored_multipart = self._storage_backend.get_multipart(bucket, s3_multipart)
1✔
2624
        stored_multipart.complete_multipart(
1✔
2625
            [s3_multipart.parts.get(part_number) for part_number in parts_numbers]
2626
        )
2627
        if not s3_multipart.checksum_algorithm and s3_multipart.object.checksum_algorithm:
1✔
2628
            with self._storage_backend.open(
1✔
2629
                bucket, s3_multipart.object, mode="r"
2630
            ) as s3_stored_object:
2631
                s3_multipart.object.checksum_value = s3_stored_object.checksum
1✔
2632
                s3_multipart.object.checksum_type = ChecksumType.FULL_OBJECT
1✔
2633

2634
        s3_object = s3_multipart.object
1✔
2635

2636
        s3_bucket.objects.set(key, s3_object)
1✔
2637

2638
        # remove the multipart now that it's complete
2639
        self._storage_backend.remove_multipart(bucket, s3_multipart)
1✔
2640
        s3_bucket.multiparts.pop(s3_multipart.id, None)
1✔
2641

2642
        key_id = get_unique_key_id(bucket, key, version_id)
1✔
2643
        store.TAGS.tags.pop(key_id, None)
1✔
2644
        if s3_multipart.tagging:
1✔
2645
            store.TAGS.tags[key_id] = s3_multipart.tagging
×
2646

2647
        # RequestCharged: Optional[RequestCharged] TODO
2648

2649
        response = CompleteMultipartUploadOutput(
1✔
2650
            Bucket=bucket,
2651
            Key=key,
2652
            ETag=s3_object.quoted_etag,
2653
            Location=f"{get_full_default_bucket_location(bucket)}{key}",
2654
        )
2655

2656
        if s3_object.version_id:
1✔
2657
            response["VersionId"] = s3_object.version_id
×
2658

2659
        # it seems AWS is not returning checksum related fields if the object has KMS encryption ¯\_(ツ)_/¯
2660
        # but it still generates them, and they can be retrieved with regular GetObject and such operations
2661
        if s3_object.checksum_algorithm and not s3_object.kms_key_id:
1✔
2662
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
1✔
2663
            response["ChecksumType"] = s3_object.checksum_type
1✔
2664

2665
        if s3_object.expiration:
1✔
2666
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime
×
2667

2668
        add_encryption_to_response(response, s3_object=s3_object)
1✔
2669

2670
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
2671

2672
        return response
1✔
2673

2674
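    # Illustrative sketch (comment only): the minimal client flow that exercises
    # CompleteMultipartUpload above. Bucket/key names are placeholders; parts must be listed
    # in ascending PartNumber order or the InvalidPartOrder branch above is hit.
    #
    #   import boto3
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   mpu = s3.create_multipart_upload(Bucket="my-bucket", Key="my-key")
    #   part = s3.upload_part(
    #       Bucket="my-bucket",
    #       Key="my-key",
    #       UploadId=mpu["UploadId"],
    #       PartNumber=1,
    #       Body=b"0" * (5 * 1024 * 1024),
    #   )
    #   s3.complete_multipart_upload(
    #       Bucket="my-bucket",
    #       Key="my-key",
    #       UploadId=mpu["UploadId"],
    #       MultipartUpload={"Parts": [{"ETag": part["ETag"], "PartNumber": 1}]},
    #   )
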
    def abort_multipart_upload(
1✔
2675
        self,
2676
        context: RequestContext,
2677
        bucket: BucketName,
2678
        key: ObjectKey,
2679
        upload_id: MultipartUploadId,
2680
        request_payer: RequestPayer = None,
2681
        expected_bucket_owner: AccountId = None,
2682
        if_match_initiated_time: IfMatchInitiatedTime = None,
2683
        **kwargs,
2684
    ) -> AbortMultipartUploadOutput:
2685
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2686

2687
        if (
1✔
2688
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2689
            or s3_multipart.object.key != key
2690
        ):
2691
            raise NoSuchUpload(
1✔
2692
                "The specified upload does not exist. "
2693
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2694
                UploadId=upload_id,
2695
            )
2696
        s3_bucket.multiparts.pop(upload_id, None)
1✔
2697

2698
        self._storage_backend.remove_multipart(bucket, s3_multipart)
1✔
2699
        response = AbortMultipartUploadOutput()
1✔
2700
        # TODO: requestCharged
2701
        return response
1✔
2702

2703
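    # Illustrative sketch (comment only): aborting an upload and observing the NoSuchUpload
    # error raised above once the upload id is gone. Names are placeholders.
    #
    #   import boto3
    #   from botocore.exceptions import ClientError
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   mpu = s3.create_multipart_upload(Bucket="my-bucket", Key="my-key")
    #   s3.abort_multipart_upload(Bucket="my-bucket", Key="my-key", UploadId=mpu["UploadId"])
    #   try:
    #       s3.list_parts(Bucket="my-bucket", Key="my-key", UploadId=mpu["UploadId"])
    #   except ClientError as e:
    #       assert e.response["Error"]["Code"] == "NoSuchUpload"
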
    def list_parts(
1✔
2704
        self,
2705
        context: RequestContext,
2706
        bucket: BucketName,
2707
        key: ObjectKey,
2708
        upload_id: MultipartUploadId,
2709
        max_parts: MaxParts = None,
2710
        part_number_marker: PartNumberMarker = None,
2711
        request_payer: RequestPayer = None,
2712
        expected_bucket_owner: AccountId = None,
2713
        sse_customer_algorithm: SSECustomerAlgorithm = None,
2714
        sse_customer_key: SSECustomerKey = None,
2715
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
2716
        **kwargs,
2717
    ) -> ListPartsOutput:
2718
        # TODO: implement MaxParts
2719
        # TODO: implement PartNumberMarker
2720
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2721

2722
        if (
1✔
2723
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2724
            or s3_multipart.object.key != key
2725
        ):
2726
            raise NoSuchUpload(
1✔
2727
                "The specified upload does not exist. "
2728
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2729
                UploadId=upload_id,
2730
            )
2731

2732
        #     AbortDate: Optional[AbortDate] TODO: lifecycle
2733
        #     AbortRuleId: Optional[AbortRuleId] TODO: lifecycle
2734
        #     RequestCharged: Optional[RequestCharged]
2735

2736
        count = 0
1✔
2737
        is_truncated = False
1✔
2738
        part_number_marker = part_number_marker or 0
1✔
2739
        max_parts = max_parts or 1000
1✔
2740

2741
        parts = []
1✔
2742
        all_parts = sorted(s3_multipart.parts.items())
1✔
2743
        last_part_number = all_parts[-1][0] if all_parts else None
1✔
2744
        for part_number, part in all_parts:
1✔
2745
            if part_number <= part_number_marker:
1✔
2746
                continue
1✔
2747
            part_item = Part(
1✔
2748
                ETag=part.quoted_etag,
2749
                LastModified=part.last_modified,
2750
                PartNumber=part_number,
2751
                Size=part.size,
2752
            )
2753
            if s3_multipart.checksum_algorithm:
1✔
2754
                part_item[f"Checksum{part.checksum_algorithm.upper()}"] = part.checksum_value
1✔
2755

2756
            parts.append(part_item)
1✔
2757
            count += 1
1✔
2758

2759
            if count >= max_parts and part.part_number != last_part_number:
1✔
2760
                is_truncated = True
1✔
2761
                break
1✔
2762

2763
        response = ListPartsOutput(
1✔
2764
            Bucket=bucket,
2765
            Key=key,
2766
            UploadId=upload_id,
2767
            Initiator=s3_multipart.initiator,
2768
            Owner=s3_multipart.initiator,
2769
            StorageClass=s3_multipart.object.storage_class,
2770
            IsTruncated=is_truncated,
2771
            MaxParts=max_parts,
2772
            PartNumberMarker=0,
2773
            NextPartNumberMarker=0,
2774
        )
2775
        if parts:
1✔
2776
            response["Parts"] = parts
1✔
2777
            last_part = parts[-1]["PartNumber"]
1✔
2778
            response["NextPartNumberMarker"] = last_part
1✔
2779

2780
        if part_number_marker:
1✔
2781
            response["PartNumberMarker"] = part_number_marker
1✔
2782
        if s3_multipart.checksum_algorithm:
1✔
2783
            response["ChecksumAlgorithm"] = s3_multipart.object.checksum_algorithm
1✔
2784
            response["ChecksumType"] = getattr(s3_multipart, "checksum_type", None)
1✔
2785

2786
        return response
1✔
2787

2788
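    # Illustrative sketch (comment only): paginating parts with MaxParts/PartNumberMarker,
    # matching the truncation logic above. Names and the upload id are placeholders.
    #
    #   import boto3
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   marker = 0
    #   parts = []
    #   while True:
    #       page = s3.list_parts(
    #           Bucket="my-bucket",
    #           Key="my-key",
    #           UploadId="<upload-id>",
    #           MaxParts=100,
    #           PartNumberMarker=marker,
    #       )
    #       parts.extend(page.get("Parts", []))
    #       if not page["IsTruncated"]:
    #           break
    #       marker = page["NextPartNumberMarker"]
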
    def list_multipart_uploads(
1✔
2789
        self,
2790
        context: RequestContext,
2791
        bucket: BucketName,
2792
        delimiter: Delimiter = None,
2793
        encoding_type: EncodingType = None,
2794
        key_marker: KeyMarker = None,
2795
        max_uploads: MaxUploads = None,
2796
        prefix: Prefix = None,
2797
        upload_id_marker: UploadIdMarker = None,
2798
        expected_bucket_owner: AccountId = None,
2799
        request_payer: RequestPayer = None,
2800
        **kwargs,
2801
    ) -> ListMultipartUploadsOutput:
2802
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2803

2804
        common_prefixes = set()
1✔
2805
        count = 0
1✔
2806
        is_truncated = False
1✔
2807
        max_uploads = max_uploads or 1000
1✔
2808
        prefix = prefix or ""
1✔
2809
        delimiter = delimiter or ""
1✔
2810
        if encoding_type:
1✔
2811
            prefix = urlparse.quote(prefix)
1✔
2812
            delimiter = urlparse.quote(delimiter)
1✔
2813
        upload_id_marker_found = False
1✔
2814

2815
        if key_marker and upload_id_marker:
1✔
2816
            multipart = s3_bucket.multiparts.get(upload_id_marker)
1✔
2817
            if multipart:
1✔
2818
                key = (
1✔
2819
                    urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
2820
                )
2821
            else:
2822
                # set key to None so it fails if the multipart is not found
2823
                key = None
×
2824

2825
            if key_marker != key:
1✔
2826
                raise InvalidArgument(
1✔
2827
                    "Invalid uploadId marker",
2828
                    ArgumentName="upload-id-marker",
2829
                    ArgumentValue=upload_id_marker,
2830
                )
2831

2832
        uploads = []
1✔
2833
        # sort by key and initiated
2834
        all_multiparts = sorted(
1✔
2835
            s3_bucket.multiparts.values(), key=lambda r: (r.object.key, r.initiated.timestamp())
2836
        )
2837
        last_multipart = all_multiparts[-1] if all_multiparts else None
1✔
2838

2839
        for multipart in all_multiparts:
1✔
2840
            key = urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
1✔
2841
            # skip all keys that come before the key_marker
2842
            if key_marker:
1✔
2843
                if key < key_marker:
1✔
2844
                    continue
1✔
2845
                elif key == key_marker:
1✔
2846
                    if not upload_id_marker:
1✔
2847
                        continue
1✔
2848
                    # as the keys are ordered by time, once we find the key marker, we can return the next one
2849
                    if multipart.id == upload_id_marker:
1✔
2850
                        upload_id_marker_found = True
1✔
2851
                        continue
1✔
2852
                    elif not upload_id_marker_found:
1✔
2853
                        # as long as we have not passed the upload_id_marker, skip the uploads
2854
                        continue
1✔
2855

2856
            # Filter for keys that start with prefix
2857
            if prefix and not key.startswith(prefix):
1✔
2858
                continue
1✔
2859

2860
            # see ListObjectsV2 for the logic comments (shared logic here)
2861
            prefix_including_delimiter = None
1✔
2862
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
2863
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
2864
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
2865

2866
                if prefix_including_delimiter in common_prefixes or (
1✔
2867
                    key_marker and key_marker.startswith(prefix_including_delimiter)
2868
                ):
2869
                    continue
1✔
2870

2871
            if prefix_including_delimiter:
1✔
2872
                common_prefixes.add(prefix_including_delimiter)
1✔
2873
            else:
2874
                multipart_upload = MultipartUpload(
1✔
2875
                    UploadId=multipart.id,
2876
                    Key=multipart.object.key,
2877
                    Initiated=multipart.initiated,
2878
                    StorageClass=multipart.object.storage_class,
2879
                    Owner=multipart.initiator,  # TODO: check the difference
2880
                    Initiator=multipart.initiator,
2881
                )
2882
                if multipart.checksum_algorithm:
1✔
2883
                    multipart_upload["ChecksumAlgorithm"] = multipart.checksum_algorithm
1✔
2884
                    multipart_upload["ChecksumType"] = getattr(multipart, "checksum_type", None)
1✔
2885

2886
                uploads.append(multipart_upload)
1✔
2887

2888
            count += 1
1✔
2889
            if count >= max_uploads and last_multipart.id != multipart.id:
1✔
2890
                is_truncated = True
1✔
2891
                break
1✔
2892

2893
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
2894

2895
        response = ListMultipartUploadsOutput(
1✔
2896
            Bucket=bucket,
2897
            IsTruncated=is_truncated,
2898
            MaxUploads=max_uploads or 1000,
2899
            KeyMarker=key_marker or "",
2900
            UploadIdMarker=upload_id_marker or "" if key_marker else "",
2901
            NextKeyMarker="",
2902
            NextUploadIdMarker="",
2903
        )
2904
        if uploads:
1✔
2905
            response["Uploads"] = uploads
1✔
2906
            last_upload = uploads[-1]
1✔
2907
            response["NextKeyMarker"] = last_upload["Key"]
1✔
2908
            response["NextUploadIdMarker"] = last_upload["UploadId"]
1✔
2909
        if delimiter:
1✔
2910
            response["Delimiter"] = delimiter
1✔
2911
        if prefix:
1✔
2912
            response["Prefix"] = prefix
1✔
2913
        if encoding_type:
1✔
2914
            response["EncodingType"] = EncodingType.url
1✔
2915
        if common_prefixes:
1✔
2916
            response["CommonPrefixes"] = common_prefixes
1✔
2917

2918
        return response
1✔
2919

2920
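    # Illustrative sketch (comment only): listing in-progress uploads grouped by a delimiter,
    # which exercises the CommonPrefixes logic above. Names and prefixes are placeholders.
    #
    #   import boto3
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   listing = s3.list_multipart_uploads(
    #       Bucket="my-bucket",
    #       Prefix="logs/",
    #       Delimiter="/",
    #       MaxUploads=100,
    #   )
    #   upload_ids = [u["UploadId"] for u in listing.get("Uploads", [])]
    #   sub_prefixes = [p["Prefix"] for p in listing.get("CommonPrefixes", [])]
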
    def put_bucket_versioning(
1✔
2921
        self,
2922
        context: RequestContext,
2923
        bucket: BucketName,
2924
        versioning_configuration: VersioningConfiguration,
2925
        content_md5: ContentMD5 = None,
2926
        checksum_algorithm: ChecksumAlgorithm = None,
2927
        mfa: MFA = None,
2928
        expected_bucket_owner: AccountId = None,
2929
        **kwargs,
2930
    ) -> None:
2931
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2932
        if not (versioning_status := versioning_configuration.get("Status")):
1✔
2933
            raise CommonServiceException(
1✔
2934
                code="IllegalVersioningConfigurationException",
2935
                message="The Versioning element must be specified",
2936
            )
2937

2938
        if versioning_status not in ("Enabled", "Suspended"):
1✔
2939
            raise MalformedXML()
1✔
2940

2941
        if s3_bucket.object_lock_enabled and versioning_status == "Suspended":
1✔
2942
            raise InvalidBucketState(
1✔
2943
                "An Object Lock configuration is present on this bucket, so the versioning state cannot be changed."
2944
            )
2945

2946
        if not s3_bucket.versioning_status:
1✔
2947
            s3_bucket.objects = VersionedKeyStore.from_key_store(s3_bucket.objects)
1✔
2948

2949
        s3_bucket.versioning_status = versioning_status
1✔
2950

2951
    def get_bucket_versioning(
1✔
2952
        self,
2953
        context: RequestContext,
2954
        bucket: BucketName,
2955
        expected_bucket_owner: AccountId = None,
2956
        **kwargs,
2957
    ) -> GetBucketVersioningOutput:
2958
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2959

2960
        if not s3_bucket.versioning_status:
1✔
2961
            return GetBucketVersioningOutput()
1✔
2962

2963
        return GetBucketVersioningOutput(Status=s3_bucket.versioning_status)
1✔
2964

2965
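    # Illustrative sketch (comment only): enabling versioning and reading it back, which maps
    # to the two handlers above. The bucket name is a placeholder.
    #
    #   import boto3
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   s3.put_bucket_versioning(
    #       Bucket="my-bucket",
    #       VersioningConfiguration={"Status": "Enabled"},
    #   )
    #   assert s3.get_bucket_versioning(Bucket="my-bucket")["Status"] == "Enabled"
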
    def get_bucket_encryption(
1✔
2966
        self,
2967
        context: RequestContext,
2968
        bucket: BucketName,
2969
        expected_bucket_owner: AccountId = None,
2970
        **kwargs,
2971
    ) -> GetBucketEncryptionOutput:
2972
        # AWS now encrypts buckets by default with AES256, see:
2973
        # https://docs.aws.amazon.com/AmazonS3/latest/userguide/default-bucket-encryption.html
2974
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2975

2976
        if not s3_bucket.encryption_rule:
1✔
2977
            return GetBucketEncryptionOutput()
×
2978

2979
        return GetBucketEncryptionOutput(
1✔
2980
            ServerSideEncryptionConfiguration={"Rules": [s3_bucket.encryption_rule]}
2981
        )
2982

2983
    def put_bucket_encryption(
1✔
2984
        self,
2985
        context: RequestContext,
2986
        bucket: BucketName,
2987
        server_side_encryption_configuration: ServerSideEncryptionConfiguration,
2988
        content_md5: ContentMD5 = None,
2989
        checksum_algorithm: ChecksumAlgorithm = None,
2990
        expected_bucket_owner: AccountId = None,
2991
        **kwargs,
2992
    ) -> None:
2993
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2994

2995
        if not (rules := server_side_encryption_configuration.get("Rules")):
1✔
2996
            raise MalformedXML()
1✔
2997

2998
        if len(rules) != 1 or not (
1✔
2999
            encryption := rules[0].get("ApplyServerSideEncryptionByDefault")
3000
        ):
3001
            raise MalformedXML()
1✔
3002

3003
        if not (sse_algorithm := encryption.get("SSEAlgorithm")):
1✔
3004
            raise MalformedXML()
×
3005

3006
        if sse_algorithm not in SSE_ALGORITHMS:
1✔
3007
            raise MalformedXML()
×
3008

3009
        if sse_algorithm != ServerSideEncryption.aws_kms and "KMSMasterKeyID" in encryption:
1✔
3010
            raise InvalidArgument(
1✔
3011
                "a KMSMasterKeyID is not applicable if the default sse algorithm is not aws:kms or aws:kms:dsse",
3012
                ArgumentName="ApplyServerSideEncryptionByDefault",
3013
            )
3014
        # elif master_kms_key := encryption.get("KMSMasterKeyID"):
3015
        # TODO: validate KMS key? not currently done in moto
3016
        # You can pass either the KeyId or the KeyArn. If cross-account, it has to be the ARN.
3017
        # It's always saved as the ARN in the bucket configuration.
3018
        # kms_key_arn = get_kms_key_arn(master_kms_key, s3_bucket.bucket_account_id)
3019
        # encryption["KMSMasterKeyID"] = master_kms_key
3020

3021
        s3_bucket.encryption_rule = rules[0]
1✔
3022

3023
    def delete_bucket_encryption(
1✔
3024
        self,
3025
        context: RequestContext,
3026
        bucket: BucketName,
3027
        expected_bucket_owner: AccountId = None,
3028
        **kwargs,
3029
    ) -> None:
3030
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3031

3032
        s3_bucket.encryption_rule = None
1✔
3033

3034
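    # Illustrative sketch (comment only): setting a default SSE-KMS encryption rule, the shape
    # validated by put_bucket_encryption above. The key id is a placeholder and is not
    # validated against KMS here.
    #
    #   import boto3
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   s3.put_bucket_encryption(
    #       Bucket="my-bucket",
    #       ServerSideEncryptionConfiguration={
    #           "Rules": [
    #               {
    #                   "ApplyServerSideEncryptionByDefault": {
    #                       "SSEAlgorithm": "aws:kms",
    #                       "KMSMasterKeyID": "<kms-key-id-or-arn>",
    #                   },
    #                   "BucketKeyEnabled": True,
    #               }
    #           ]
    #       },
    #   )
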
    def put_bucket_notification_configuration(
1✔
3035
        self,
3036
        context: RequestContext,
3037
        bucket: BucketName,
3038
        notification_configuration: NotificationConfiguration,
3039
        expected_bucket_owner: AccountId = None,
3040
        skip_destination_validation: SkipValidation = None,
3041
        **kwargs,
3042
    ) -> None:
3043
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3044

3045
        self._verify_notification_configuration(
1✔
3046
            notification_configuration, skip_destination_validation, context, bucket
3047
        )
3048
        s3_bucket.notification_configuration = notification_configuration
1✔
3049

3050
    def get_bucket_notification_configuration(
1✔
3051
        self,
3052
        context: RequestContext,
3053
        bucket: BucketName,
3054
        expected_bucket_owner: AccountId = None,
3055
        **kwargs,
3056
    ) -> NotificationConfiguration:
3057
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3058

3059
        return s3_bucket.notification_configuration or NotificationConfiguration()
1✔
3060

3061
    def put_bucket_tagging(
1✔
3062
        self,
3063
        context: RequestContext,
3064
        bucket: BucketName,
3065
        tagging: Tagging,
3066
        content_md5: ContentMD5 = None,
3067
        checksum_algorithm: ChecksumAlgorithm = None,
3068
        expected_bucket_owner: AccountId = None,
3069
        **kwargs,
3070
    ) -> None:
3071
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3072

3073
        if "TagSet" not in tagging:
1✔
3074
            raise MalformedXML()
×
3075

3076
        validate_tag_set(tagging["TagSet"], type_set="bucket")
1✔
3077

3078
        # remove the previous tags before setting the new ones, it overwrites the whole TagSet
3079
        store.TAGS.tags.pop(s3_bucket.bucket_arn, None)
1✔
3080
        store.TAGS.tag_resource(s3_bucket.bucket_arn, tags=tagging["TagSet"])
1✔
3081

3082
    def get_bucket_tagging(
1✔
3083
        self,
3084
        context: RequestContext,
3085
        bucket: BucketName,
3086
        expected_bucket_owner: AccountId = None,
3087
        **kwargs,
3088
    ) -> GetBucketTaggingOutput:
3089
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3090
        tag_set = store.TAGS.list_tags_for_resource(s3_bucket.bucket_arn, root_name="Tags")["Tags"]
1✔
3091
        if not tag_set:
1✔
3092
            raise NoSuchTagSet(
1✔
3093
                "The TagSet does not exist",
3094
                BucketName=bucket,
3095
            )
3096

3097
        return GetBucketTaggingOutput(TagSet=tag_set)
1✔
3098

3099
    def delete_bucket_tagging(
1✔
3100
        self,
3101
        context: RequestContext,
3102
        bucket: BucketName,
3103
        expected_bucket_owner: AccountId = None,
3104
        **kwargs,
3105
    ) -> None:
3106
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3107

3108
        store.TAGS.tags.pop(s3_bucket.bucket_arn, None)
1✔
3109

3110
    def put_object_tagging(
1✔
3111
        self,
3112
        context: RequestContext,
3113
        bucket: BucketName,
3114
        key: ObjectKey,
3115
        tagging: Tagging,
3116
        version_id: ObjectVersionId = None,
3117
        content_md5: ContentMD5 = None,
3118
        checksum_algorithm: ChecksumAlgorithm = None,
3119
        expected_bucket_owner: AccountId = None,
3120
        request_payer: RequestPayer = None,
3121
        **kwargs,
3122
    ) -> PutObjectTaggingOutput:
3123
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3124

3125
        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="PUT")
1✔
3126

3127
        if "TagSet" not in tagging:
1✔
3128
            raise MalformedXML()
×
3129

3130
        validate_tag_set(tagging["TagSet"], type_set="object")
1✔
3131

3132
        key_id = get_unique_key_id(bucket, key, s3_object.version_id)
1✔
3133
        # remove the previous tags before setting the new ones, it overwrites the whole TagSet
3134
        store.TAGS.tags.pop(key_id, None)
1✔
3135
        store.TAGS.tag_resource(key_id, tags=tagging["TagSet"])
1✔
3136
        response = PutObjectTaggingOutput()
1✔
3137
        if s3_object.version_id:
1✔
3138
            response["VersionId"] = s3_object.version_id
1✔
3139

3140
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
3141

3142
        return response
1✔
3143

3144
    def get_object_tagging(
1✔
3145
        self,
3146
        context: RequestContext,
3147
        bucket: BucketName,
3148
        key: ObjectKey,
3149
        version_id: ObjectVersionId = None,
3150
        expected_bucket_owner: AccountId = None,
3151
        request_payer: RequestPayer = None,
3152
        **kwargs,
3153
    ) -> GetObjectTaggingOutput:
3154
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3155

3156
        try:
1✔
3157
            s3_object = s3_bucket.get_object(key=key, version_id=version_id)
1✔
3158
        except NoSuchKey as e:
1✔
3159
            # it seems GetObjectTagging does not work like all other operations, so we need to raise a different
3160
            # exception. As we already need to catch it because of the format of the Key, it is not worth modifying the
3161
            # `S3Bucket.get_object` signature for one operation.
3162
            if s3_bucket.versioning_status and (
1✔
3163
                s3_object_version := s3_bucket.objects.get(key, version_id)
3164
            ):
3165
                raise MethodNotAllowed(
1✔
3166
                    "The specified method is not allowed against this resource.",
3167
                    Method="GET",
3168
                    ResourceType="DeleteMarker",
3169
                    DeleteMarker=True,
3170
                    Allow="DELETE",
3171
                    VersionId=s3_object_version.version_id,
3172
                )
3173

3174
            # There is a weird, AWS-validated bug in S3: the returned key contains the bucket name as well
3175
            # follow AWS on this one
3176
            e.Key = f"{bucket}/{key}"
1✔
3177
            raise e
1✔
3178

3179
        tag_set = store.TAGS.list_tags_for_resource(
1✔
3180
            get_unique_key_id(bucket, key, s3_object.version_id)
3181
        )["Tags"]
3182
        response = GetObjectTaggingOutput(TagSet=tag_set)
1✔
3183
        if s3_object.version_id:
1✔
3184
            response["VersionId"] = s3_object.version_id
1✔
3185

3186
        return response
1✔
3187

3188
    def delete_object_tagging(
1✔
3189
        self,
3190
        context: RequestContext,
3191
        bucket: BucketName,
3192
        key: ObjectKey,
3193
        version_id: ObjectVersionId = None,
3194
        expected_bucket_owner: AccountId = None,
3195
        **kwargs,
3196
    ) -> DeleteObjectTaggingOutput:
3197
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3198

3199
        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="DELETE")
1✔
3200

3201
        store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
1✔
3202
        response = DeleteObjectTaggingOutput()
1✔
3203
        if s3_object.version_id:
1✔
3204
            response["VersionId"] = s3_object.version_id
×
3205

3206
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
3207

3208
        return response
1✔
3209

3210
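    # Illustrative sketch (comment only): the object tagging round trip implemented above
    # (put, get, delete), optionally pinned to a specific VersionId. Names are placeholders.
    #
    #   import boto3
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   s3.put_object_tagging(
    #       Bucket="my-bucket",
    #       Key="my-key",
    #       Tagging={"TagSet": [{"Key": "env", "Value": "test"}]},
    #   )
    #   tags = s3.get_object_tagging(Bucket="my-bucket", Key="my-key")["TagSet"]
    #   s3.delete_object_tagging(Bucket="my-bucket", Key="my-key")
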
    def put_bucket_cors(
1✔
3211
        self,
3212
        context: RequestContext,
3213
        bucket: BucketName,
3214
        cors_configuration: CORSConfiguration,
3215
        content_md5: ContentMD5 = None,
3216
        checksum_algorithm: ChecksumAlgorithm = None,
3217
        expected_bucket_owner: AccountId = None,
3218
        **kwargs,
3219
    ) -> None:
3220
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3221
        validate_cors_configuration(cors_configuration)
1✔
3222
        s3_bucket.cors_rules = cors_configuration
1✔
3223
        self._cors_handler.invalidate_cache()
1✔
3224

3225
    def get_bucket_cors(
1✔
3226
        self,
3227
        context: RequestContext,
3228
        bucket: BucketName,
3229
        expected_bucket_owner: AccountId = None,
3230
        **kwargs,
3231
    ) -> GetBucketCorsOutput:
3232
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3233

3234
        if not s3_bucket.cors_rules:
1✔
3235
            raise NoSuchCORSConfiguration(
1✔
3236
                "The CORS configuration does not exist",
3237
                BucketName=bucket,
3238
            )
3239
        return GetBucketCorsOutput(CORSRules=s3_bucket.cors_rules["CORSRules"])
1✔
3240

3241
    def delete_bucket_cors(
1✔
3242
        self,
3243
        context: RequestContext,
3244
        bucket: BucketName,
3245
        expected_bucket_owner: AccountId = None,
3246
        **kwargs,
3247
    ) -> None:
3248
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3249

3250
        if s3_bucket.cors_rules:
1✔
3251
            self._cors_handler.invalidate_cache()
1✔
3252
            s3_bucket.cors_rules = None
1✔
3253

3254
    def get_bucket_lifecycle_configuration(
1✔
3255
        self,
3256
        context: RequestContext,
3257
        bucket: BucketName,
3258
        expected_bucket_owner: AccountId = None,
3259
        **kwargs,
3260
    ) -> GetBucketLifecycleConfigurationOutput:
3261
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3262

3263
        if not s3_bucket.lifecycle_rules:
1✔
3264
            raise NoSuchLifecycleConfiguration(
1✔
3265
                "The lifecycle configuration does not exist",
3266
                BucketName=bucket,
3267
            )
3268

3269
        return GetBucketLifecycleConfigurationOutput(
1✔
3270
            Rules=s3_bucket.lifecycle_rules,
3271
            # TODO: remove for next major version, safe access to new attribute
3272
            TransitionDefaultMinimumObjectSize=getattr(
3273
                s3_bucket,
3274
                "transition_default_minimum_object_size",
3275
                TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
3276
            ),
3277
        )
3278

3279
    def put_bucket_lifecycle_configuration(
1✔
3280
        self,
3281
        context: RequestContext,
3282
        bucket: BucketName,
3283
        checksum_algorithm: ChecksumAlgorithm = None,
3284
        lifecycle_configuration: BucketLifecycleConfiguration = None,
3285
        expected_bucket_owner: AccountId = None,
3286
        transition_default_minimum_object_size: TransitionDefaultMinimumObjectSize = None,
3287
        **kwargs,
3288
    ) -> PutBucketLifecycleConfigurationOutput:
3289
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3290

3291
        transition_min_obj_size = (
1✔
3292
            transition_default_minimum_object_size
3293
            or TransitionDefaultMinimumObjectSize.all_storage_classes_128K
3294
        )
3295

3296
        if transition_min_obj_size not in (
1✔
3297
            TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
3298
            TransitionDefaultMinimumObjectSize.varies_by_storage_class,
3299
        ):
3300
            raise InvalidRequest(
1✔
3301
                f"Invalid TransitionDefaultMinimumObjectSize found: {transition_min_obj_size}"
3302
            )
3303

3304
        validate_lifecycle_configuration(lifecycle_configuration)
1✔
3305
        # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to apply them
3306
        #  every time we get/head an object
3307
        # for now, we keep a cache and get it every time we fetch an object
3308
        s3_bucket.lifecycle_rules = lifecycle_configuration["Rules"]
1✔
3309
        s3_bucket.transition_default_minimum_object_size = transition_min_obj_size
1✔
3310
        self._expiration_cache[bucket].clear()
1✔
3311
        return PutBucketLifecycleConfigurationOutput(
1✔
3312
            TransitionDefaultMinimumObjectSize=transition_min_obj_size
3313
        )
3314

3315
    def delete_bucket_lifecycle(
1✔
3316
        self,
3317
        context: RequestContext,
3318
        bucket: BucketName,
3319
        expected_bucket_owner: AccountId = None,
3320
        **kwargs,
3321
    ) -> None:
3322
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3323

3324
        s3_bucket.lifecycle_rules = None
1✔
3325
        self._expiration_cache[bucket].clear()
1✔
3326

3327
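    # Illustrative sketch (comment only): a minimal lifecycle configuration matching what
    # put_bucket_lifecycle_configuration above accepts. The TransitionDefaultMinimumObjectSize
    # parameter requires a reasonably recent boto3; names and values are placeholders.
    #
    #   import boto3
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   s3.put_bucket_lifecycle_configuration(
    #       Bucket="my-bucket",
    #       LifecycleConfiguration={
    #           "Rules": [
    #               {
    #                   "ID": "expire-logs",
    #                   "Filter": {"Prefix": "logs/"},
    #                   "Status": "Enabled",
    #                   "Expiration": {"Days": 30},
    #               }
    #           ]
    #       },
    #       TransitionDefaultMinimumObjectSize="all_storage_classes_128K",
    #   )
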
    def put_bucket_analytics_configuration(
1✔
3328
        self,
3329
        context: RequestContext,
3330
        bucket: BucketName,
3331
        id: AnalyticsId,
3332
        analytics_configuration: AnalyticsConfiguration,
3333
        expected_bucket_owner: AccountId = None,
3334
        **kwargs,
3335
    ) -> None:
3336
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3337

3338
        validate_bucket_analytics_configuration(
1✔
3339
            id=id, analytics_configuration=analytics_configuration
3340
        )
3341

3342
        s3_bucket.analytics_configurations[id] = analytics_configuration
1✔
3343

3344
    def get_bucket_analytics_configuration(
1✔
3345
        self,
3346
        context: RequestContext,
3347
        bucket: BucketName,
3348
        id: AnalyticsId,
3349
        expected_bucket_owner: AccountId = None,
3350
        **kwargs,
3351
    ) -> GetBucketAnalyticsConfigurationOutput:
3352
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3353

3354
        if not (analytic_config := s3_bucket.analytics_configurations.get(id)):
1✔
3355
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3356

3357
        return GetBucketAnalyticsConfigurationOutput(AnalyticsConfiguration=analytic_config)
1✔
3358

3359
    def list_bucket_analytics_configurations(
1✔
3360
        self,
3361
        context: RequestContext,
3362
        bucket: BucketName,
3363
        continuation_token: Token = None,
3364
        expected_bucket_owner: AccountId = None,
3365
        **kwargs,
3366
    ) -> ListBucketAnalyticsConfigurationsOutput:
3367
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3368

3369
        return ListBucketAnalyticsConfigurationsOutput(
1✔
3370
            IsTruncated=False,
3371
            AnalyticsConfigurationList=sorted(
3372
                s3_bucket.analytics_configurations.values(),
3373
                key=itemgetter("Id"),
3374
            ),
3375
        )
3376

3377
    def delete_bucket_analytics_configuration(
1✔
3378
        self,
3379
        context: RequestContext,
3380
        bucket: BucketName,
3381
        id: AnalyticsId,
3382
        expected_bucket_owner: AccountId = None,
3383
        **kwargs,
3384
    ) -> None:
3385
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3386

3387
        if not s3_bucket.analytics_configurations.pop(id, None):
1✔
3388
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3389

3390
    def put_bucket_intelligent_tiering_configuration(
1✔
3391
        self,
3392
        context: RequestContext,
3393
        bucket: BucketName,
3394
        id: IntelligentTieringId,
3395
        intelligent_tiering_configuration: IntelligentTieringConfiguration,
3396
        **kwargs,
3397
    ) -> None:
3398
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3399

3400
        validate_bucket_intelligent_tiering_configuration(id, intelligent_tiering_configuration)
1✔
3401

3402
        s3_bucket.intelligent_tiering_configurations[id] = intelligent_tiering_configuration
1✔
3403

3404
    def get_bucket_intelligent_tiering_configuration(
1✔
3405
        self, context: RequestContext, bucket: BucketName, id: IntelligentTieringId, **kwargs
3406
    ) -> GetBucketIntelligentTieringConfigurationOutput:
3407
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3408

3409
        if not (itier_config := s3_bucket.intelligent_tiering_configurations.get(id)):
1✔
3410
            raise NoSuchConfiguration("The specified configuration does not exist.")
×
3411

3412
        return GetBucketIntelligentTieringConfigurationOutput(
1✔
3413
            IntelligentTieringConfiguration=itier_config
3414
        )
3415

3416
    def delete_bucket_intelligent_tiering_configuration(
1✔
3417
        self, context: RequestContext, bucket: BucketName, id: IntelligentTieringId, **kwargs
3418
    ) -> None:
3419
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3420

3421
        if not s3_bucket.intelligent_tiering_configurations.pop(id, None):
1✔
3422
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3423

3424
    def list_bucket_intelligent_tiering_configurations(
1✔
3425
        self,
3426
        context: RequestContext,
3427
        bucket: BucketName,
3428
        continuation_token: Token = None,
3429
        **kwargs,
3430
    ) -> ListBucketIntelligentTieringConfigurationsOutput:
3431
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3432

3433
        return ListBucketIntelligentTieringConfigurationsOutput(
1✔
3434
            IsTruncated=False,
3435
            IntelligentTieringConfigurationList=sorted(
3436
                s3_bucket.intelligent_tiering_configurations.values(),
3437
                key=itemgetter("Id"),
3438
            ),
3439
        )
3440

3441
    def put_bucket_inventory_configuration(
1✔
3442
        self,
3443
        context: RequestContext,
3444
        bucket: BucketName,
3445
        id: InventoryId,
3446
        inventory_configuration: InventoryConfiguration,
3447
        expected_bucket_owner: AccountId = None,
3448
        **kwargs,
3449
    ) -> None:
3450
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3451

3452
        validate_inventory_configuration(
1✔
3453
            config_id=id, inventory_configuration=inventory_configuration
3454
        )
3455
        s3_bucket.inventory_configurations[id] = inventory_configuration
1✔
3456

3457
    def get_bucket_inventory_configuration(
1✔
3458
        self,
3459
        context: RequestContext,
3460
        bucket: BucketName,
3461
        id: InventoryId,
3462
        expected_bucket_owner: AccountId = None,
3463
        **kwargs,
3464
    ) -> GetBucketInventoryConfigurationOutput:
3465
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3466

3467
        if not (inv_config := s3_bucket.inventory_configurations.get(id)):
1✔
3468
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3469
        return GetBucketInventoryConfigurationOutput(InventoryConfiguration=inv_config)
1✔
3470

3471
    def list_bucket_inventory_configurations(
1✔
3472
        self,
3473
        context: RequestContext,
3474
        bucket: BucketName,
3475
        continuation_token: Token = None,
3476
        expected_bucket_owner: AccountId = None,
3477
        **kwargs,
3478
    ) -> ListBucketInventoryConfigurationsOutput:
3479
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3480

3481
        return ListBucketInventoryConfigurationsOutput(
1✔
3482
            IsTruncated=False,
3483
            InventoryConfigurationList=sorted(
3484
                s3_bucket.inventory_configurations.values(), key=itemgetter("Id")
3485
            ),
3486
        )
3487

3488
    def delete_bucket_inventory_configuration(
1✔
3489
        self,
3490
        context: RequestContext,
3491
        bucket: BucketName,
3492
        id: InventoryId,
3493
        expected_bucket_owner: AccountId = None,
3494
        **kwargs,
3495
    ) -> None:
3496
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3497

3498
        if not s3_bucket.inventory_configurations.pop(id, None):
1✔
3499
            raise NoSuchConfiguration("The specified configuration does not exist.")
×
3500

3501
    def get_bucket_website(
1✔
3502
        self,
3503
        context: RequestContext,
3504
        bucket: BucketName,
3505
        expected_bucket_owner: AccountId = None,
3506
        **kwargs,
3507
    ) -> GetBucketWebsiteOutput:
3508
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3509

3510
        if not s3_bucket.website_configuration:
1✔
3511
            raise NoSuchWebsiteConfiguration(
1✔
3512
                "The specified bucket does not have a website configuration",
3513
                BucketName=bucket,
3514
            )
3515
        return s3_bucket.website_configuration
1✔
3516

3517
    def put_bucket_website(
1✔
3518
        self,
3519
        context: RequestContext,
3520
        bucket: BucketName,
3521
        website_configuration: WebsiteConfiguration,
3522
        content_md5: ContentMD5 = None,
3523
        checksum_algorithm: ChecksumAlgorithm = None,
3524
        expected_bucket_owner: AccountId = None,
3525
        **kwargs,
3526
    ) -> None:
3527
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3528

3529
        validate_website_configuration(website_configuration)
1✔
3530
        s3_bucket.website_configuration = website_configuration
1✔
3531

3532
    def delete_bucket_website(
1✔
3533
        self,
3534
        context: RequestContext,
3535
        bucket: BucketName,
3536
        expected_bucket_owner: AccountId = None,
3537
        **kwargs,
3538
    ) -> None:
3539
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3540
        # does not raise an error if the bucket does not have a config; it will simply return
3541
        s3_bucket.website_configuration = None
1✔
3542

3543
    def get_object_lock_configuration(
1✔
3544
        self,
3545
        context: RequestContext,
3546
        bucket: BucketName,
3547
        expected_bucket_owner: AccountId = None,
3548
        **kwargs,
3549
    ) -> GetObjectLockConfigurationOutput:
3550
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3551
        if not s3_bucket.object_lock_enabled:
1✔
3552
            raise ObjectLockConfigurationNotFoundError(
1✔
3553
                "Object Lock configuration does not exist for this bucket",
3554
                BucketName=bucket,
3555
            )
3556

3557
        response = GetObjectLockConfigurationOutput(
1✔
3558
            ObjectLockConfiguration=ObjectLockConfiguration(
3559
                ObjectLockEnabled=ObjectLockEnabled.Enabled
3560
            )
3561
        )
3562
        if s3_bucket.object_lock_default_retention:
1✔
3563
            response["ObjectLockConfiguration"]["Rule"] = {
1✔
3564
                "DefaultRetention": s3_bucket.object_lock_default_retention
3565
            }
3566

3567
        return response
1✔
3568

3569
    def put_object_lock_configuration(
1✔
3570
        self,
3571
        context: RequestContext,
3572
        bucket: BucketName,
3573
        object_lock_configuration: ObjectLockConfiguration = None,
3574
        request_payer: RequestPayer = None,
3575
        token: ObjectLockToken = None,
3576
        content_md5: ContentMD5 = None,
3577
        checksum_algorithm: ChecksumAlgorithm = None,
3578
        expected_bucket_owner: AccountId = None,
3579
        **kwargs,
3580
    ) -> PutObjectLockConfigurationOutput:
3581
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3582
        if s3_bucket.versioning_status != "Enabled":
1✔
3583
            raise InvalidBucketState(
1✔
3584
                "Versioning must be 'Enabled' on the bucket to apply a Object Lock configuration"
3585
            )
3586

3587
        if (
1✔
3588
            not object_lock_configuration
3589
            or object_lock_configuration.get("ObjectLockEnabled") != "Enabled"
3590
        ):
3591
            raise MalformedXML()
1✔
3592

3593
        if "Rule" not in object_lock_configuration:
1✔
3594
            s3_bucket.object_lock_default_retention = None
1✔
3595
            if not s3_bucket.object_lock_enabled:
1✔
3596
                s3_bucket.object_lock_enabled = True
1✔
3597

3598
            return PutObjectLockConfigurationOutput()
1✔
3599
        elif not (rule := object_lock_configuration["Rule"]) or not (
1✔
3600
            default_retention := rule.get("DefaultRetention")
3601
        ):
3602
            raise MalformedXML()
1✔
3603

3604
        if "Mode" not in default_retention or (
1✔
3605
            ("Days" in default_retention and "Years" in default_retention)
3606
            or ("Days" not in default_retention and "Years" not in default_retention)
3607
        ):
3608
            raise MalformedXML()
1✔
3609

3610
        s3_bucket.object_lock_default_retention = default_retention
1✔
3611
        if not s3_bucket.object_lock_enabled:
1✔
3612
            s3_bucket.object_lock_enabled = True
×
3613

3614
        return PutObjectLockConfigurationOutput()
1✔
3615

3616
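    # Illustrative sketch (comment only): object lock requires the bucket to be created with
    # ObjectLockEnabledForBucket (which also enables versioning), after which a default
    # retention rule can be set as validated above. Names and durations are placeholders.
    #
    #   import boto3
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   s3.create_bucket(Bucket="locked-bucket", ObjectLockEnabledForBucket=True)
    #   s3.put_object_lock_configuration(
    #       Bucket="locked-bucket",
    #       ObjectLockConfiguration={
    #           "ObjectLockEnabled": "Enabled",
    #           "Rule": {"DefaultRetention": {"Mode": "GOVERNANCE", "Days": 1}},
    #       },
    #   )
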
    def get_object_legal_hold(
1✔
3617
        self,
3618
        context: RequestContext,
3619
        bucket: BucketName,
3620
        key: ObjectKey,
3621
        version_id: ObjectVersionId = None,
3622
        request_payer: RequestPayer = None,
3623
        expected_bucket_owner: AccountId = None,
3624
        **kwargs,
3625
    ) -> GetObjectLegalHoldOutput:
3626
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3627
        if not s3_bucket.object_lock_enabled:
1✔
3628
            raise InvalidRequest("Bucket is missing Object Lock Configuration")
1✔
3629

3630
        s3_object = s3_bucket.get_object(
1✔
3631
            key=key,
3632
            version_id=version_id,
3633
            http_method="GET",
3634
        )
3635
        if not s3_object.lock_legal_status:
1✔
3636
            raise NoSuchObjectLockConfiguration(
1✔
3637
                "The specified object does not have a ObjectLock configuration"
3638
            )
3639

3640
        return GetObjectLegalHoldOutput(
1✔
3641
            LegalHold=ObjectLockLegalHold(Status=s3_object.lock_legal_status)
3642
        )
3643

3644
    def put_object_legal_hold(
1✔
3645
        self,
3646
        context: RequestContext,
3647
        bucket: BucketName,
3648
        key: ObjectKey,
3649
        legal_hold: ObjectLockLegalHold = None,
3650
        request_payer: RequestPayer = None,
3651
        version_id: ObjectVersionId = None,
3652
        content_md5: ContentMD5 = None,
3653
        checksum_algorithm: ChecksumAlgorithm = None,
3654
        expected_bucket_owner: AccountId = None,
3655
        **kwargs,
3656
    ) -> PutObjectLegalHoldOutput:
3657
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3658

3659
        if not legal_hold:
1✔
3660
            raise MalformedXML()
1✔
3661

3662
        if not s3_bucket.object_lock_enabled:
1✔
3663
            raise InvalidRequest("Bucket is missing Object Lock Configuration")
1✔
3664

3665
        s3_object = s3_bucket.get_object(
1✔
3666
            key=key,
3667
            version_id=version_id,
3668
            http_method="PUT",
3669
        )
3670
        # TODO: check casing
3671
        if not (status := legal_hold.get("Status")) or status not in ("ON", "OFF"):
1✔
3672
            raise MalformedXML()
×
3673

3674
        s3_object.lock_legal_status = status
1✔
3675

3676
        # TODO: return RequestCharged
3677
        return PutObjectLegalHoldOutput()
1✔
3678

3679
    def get_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectRetentionOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="GET",
        )
        if not s3_object.lock_mode:
            raise NoSuchObjectLockConfiguration(
                "The specified object does not have a ObjectLock configuration"
            )

        return GetObjectRetentionOutput(
            Retention=ObjectLockRetention(
                Mode=s3_object.lock_mode,
                RetainUntilDate=s3_object.lock_until,
            )
        )

    def put_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        retention: ObjectLockRetention = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectRetentionOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="PUT",
        )

        if retention and not validate_dict_fields(
            retention, required_fields={"Mode", "RetainUntilDate"}
        ):
            raise MalformedXML()

        if retention and retention["RetainUntilDate"] < datetime.datetime.now(datetime.UTC):
            # weirdly, this date is formatted as follows: Tue Dec 31 16:00:00 PST 2019
            # it contains the timezone as PST, even if you target a bucket in Europe or Asia
            pst_datetime = retention["RetainUntilDate"].astimezone(tz=ZoneInfo("US/Pacific"))
            raise InvalidArgument(
                "The retain until date must be in the future!",
                ArgumentName="RetainUntilDate",
                ArgumentValue=pst_datetime.strftime("%a %b %d %H:%M:%S %Z %Y"),
            )

        if (
            not retention
            or (s3_object.lock_until and s3_object.lock_until > retention["RetainUntilDate"])
        ) and not (
            bypass_governance_retention and s3_object.lock_mode == ObjectLockMode.GOVERNANCE
        ):
            raise AccessDenied("Access Denied because object protected by object lock.")

        s3_object.lock_mode = retention["Mode"] if retention else None
        s3_object.lock_until = retention["RetainUntilDate"] if retention else None

        # TODO: return RequestCharged
        return PutObjectRetentionOutput()

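    # Illustrative sketch (not part of the provider): a client-side call that exercises the
    # retention logic above, assuming a hypothetical bucket "example-bucket" created with
    # ObjectLockEnabledForBucket=True and an existing key "report.csv". The LocalStack edge
    # endpoint is typically http://localhost:4566.
    #
    #   import datetime
    #   import boto3
    #
    #   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
    #   s3.put_object_retention(
    #       Bucket="example-bucket",
    #       Key="report.csv",
    #       Retention={
    #           "Mode": "GOVERNANCE",
    #           "RetainUntilDate": datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=30),
    #       },
    #   )
    #
    # Shortening or clearing a GOVERNANCE retention afterwards requires
    # BypassGovernanceRetention=True, otherwise the AccessDenied branch above is hit.
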
    def put_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        request_payment_configuration: RequestPaymentConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        # TODO: this currently only mocks the operation, but its actual effect is not emulated
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        payer = request_payment_configuration.get("Payer")
        if payer not in ["Requester", "BucketOwner"]:
            raise MalformedXML()

        s3_bucket.payer = payer

    def get_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketRequestPaymentOutput:
        # TODO: this currently only mocks the operation, but its actual effect is not emulated
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        return GetBucketRequestPaymentOutput(Payer=s3_bucket.payer)

    def get_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketOwnershipControlsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.object_ownership:
            raise OwnershipControlsNotFoundError(
                "The bucket ownership controls were not found",
                BucketName=bucket,
            )

        return GetBucketOwnershipControlsOutput(
            OwnershipControls={"Rules": [{"ObjectOwnership": s3_bucket.object_ownership}]}
        )

    def put_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        ownership_controls: OwnershipControls,
        content_md5: ContentMD5 | None = None,
        expected_bucket_owner: AccountId | None = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        **kwargs,
    ) -> None:
        # TODO: this currently only mocks the operation, but its actual effect is not emulated;
        #  for example, it should almost entirely forbid ACL usage when set to BucketOwnerEnforced
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (rules := ownership_controls.get("Rules")) or len(rules) > 1:
            raise MalformedXML()

        rule = rules[0]
        if (object_ownership := rule.get("ObjectOwnership")) not in OBJECT_OWNERSHIPS:
            raise MalformedXML()

        s3_bucket.object_ownership = object_ownership

    def delete_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.object_ownership = None

    def get_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetPublicAccessBlockOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.public_access_block:
            raise NoSuchPublicAccessBlockConfiguration(
                "The public access block configuration was not found", BucketName=bucket
            )

        return GetPublicAccessBlockOutput(
            PublicAccessBlockConfiguration=s3_bucket.public_access_block
        )

    def put_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        public_access_block_configuration: PublicAccessBlockConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        # TODO: this currently only mocks the operation, but its actual effect is not emulated,
        #  as we do not enforce ACLs directly. Also, this should take the most restrictive between S3Control and the
        #  bucket configuration. See s3control
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        public_access_block_fields = {
            "BlockPublicAcls",
            "BlockPublicPolicy",
            "IgnorePublicAcls",
            "RestrictPublicBuckets",
        }
        if not validate_dict_fields(
            public_access_block_configuration,
            required_fields=set(),
            optional_fields=public_access_block_fields,
        ):
            raise MalformedXML()

        for field in public_access_block_fields:
            if public_access_block_configuration.get(field) is None:
                public_access_block_configuration[field] = False

        s3_bucket.public_access_block = public_access_block_configuration

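    # Illustrative sketch (not part of the provider): flags omitted from the configuration are
    # normalized to False by the loop above. Assuming a hypothetical bucket "example-bucket" and
    # a boto3 client `s3` pointed at LocalStack:
    #
    #   s3.put_public_access_block(
    #       Bucket="example-bucket",
    #       PublicAccessBlockConfiguration={"BlockPublicAcls": True},
    #   )
    #   config = s3.get_public_access_block(Bucket="example-bucket")["PublicAccessBlockConfiguration"]
    #   # config == {"BlockPublicAcls": True, "BlockPublicPolicy": False,
    #   #            "IgnorePublicAcls": False, "RestrictPublicBuckets": False}
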
    def delete_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.public_access_block = None

    def get_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyOutput:
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )
        if not s3_bucket.policy:
            raise NoSuchBucketPolicy(
                "The bucket policy does not exist",
                BucketName=bucket,
            )
        return GetBucketPolicyOutput(Policy=s3_bucket.policy)

    def put_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        policy: Policy,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        confirm_remove_self_bucket_access: ConfirmRemoveSelfBucketAccess = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not policy or policy[0] != "{":
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")
        try:
            json_policy = json.loads(policy)
            if not json_policy:
                # TODO: add more validation around the policy?
                raise MalformedPolicy("Missing required field Statement")
        except ValueError:
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")

        s3_bucket.policy = policy

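    # Illustrative sketch (not part of the provider): the validation above only checks that the
    # policy is non-empty JSON whose first byte is "{". A minimal document that passes it could be,
    # assuming a hypothetical bucket "example-bucket" and a boto3 client `s3`:
    #
    #   policy = json.dumps({
    #       "Version": "2012-10-17",
    #       "Statement": [{
    #           "Effect": "Allow",
    #           "Principal": "*",
    #           "Action": "s3:GetObject",
    #           "Resource": "arn:aws:s3:::example-bucket/*",
    #       }],
    #   })
    #   s3.put_bucket_policy(Bucket="example-bucket", Policy=policy)
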
    def delete_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        s3_bucket.policy = None

    def get_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> GetBucketAccelerateConfigurationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        response = GetBucketAccelerateConfigurationOutput()
        if s3_bucket.accelerate_status:
            response["Status"] = s3_bucket.accelerate_status

        return response

    def put_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        accelerate_configuration: AccelerateConfiguration,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if "." in bucket:
            raise InvalidRequest(
                "S3 Transfer Acceleration is not supported for buckets with periods (.) in their names"
            )

        if not (status := accelerate_configuration.get("Status")) or status not in (
            "Enabled",
            "Suspended",
        ):
            raise MalformedXML()

        s3_bucket.accelerate_status = status

    def put_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        bucket_logging_status: BucketLoggingStatus,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (logging_config := bucket_logging_status.get("LoggingEnabled")):
            s3_bucket.logging = {}
            return

        # the target bucket must be in the same account
        if not (target_bucket_name := logging_config.get("TargetBucket")):
            raise MalformedXML()

        if not logging_config.get("TargetPrefix"):
            logging_config["TargetPrefix"] = ""

        # TODO: validate Grants

        if not (target_s3_bucket := store.buckets.get(target_bucket_name)):
            raise InvalidTargetBucketForLogging(
                "The target bucket for logging does not exist",
                TargetBucket=target_bucket_name,
            )

        source_bucket_region = s3_bucket.bucket_region
        if target_s3_bucket.bucket_region != source_bucket_region:
            raise (
                CrossLocationLoggingProhibitted(
                    "Cross S3 location logging not allowed. ",
                    TargetBucketLocation=target_s3_bucket.bucket_region,
                )
                if source_bucket_region == AWS_REGION_US_EAST_1
                else CrossLocationLoggingProhibitted(
                    "Cross S3 location logging not allowed. ",
                    SourceBucketLocation=source_bucket_region,
                    TargetBucketLocation=target_s3_bucket.bucket_region,
                )
            )

        s3_bucket.logging = logging_config

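    # Illustrative sketch (not part of the provider): the target bucket must exist in the same
    # account and region as the source bucket, otherwise the checks above raise. Assuming
    # hypothetical buckets "example-bucket" and "example-log-bucket" in the same region and a
    # boto3 client `s3`:
    #
    #   s3.put_bucket_logging(
    #       Bucket="example-bucket",
    #       BucketLoggingStatus={
    #           "LoggingEnabled": {"TargetBucket": "example-log-bucket", "TargetPrefix": "logs/"}
    #       },
    #   )
    #
    # Sending an empty BucketLoggingStatus takes the early-return branch above and disables logging.
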
    def get_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLoggingOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.logging:
            return GetBucketLoggingOutput()

        return GetBucketLoggingOutput(LoggingEnabled=s3_bucket.logging)

    def put_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        replication_configuration: ReplicationConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        token: ObjectLockToken = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.versioning_status == BucketVersioningStatus.Enabled:
            raise InvalidRequest(
                "Versioning must be 'Enabled' on the bucket to apply a replication configuration"
            )

        if not (rules := replication_configuration.get("Rules")):
            raise MalformedXML()

        for rule in rules:
            if "ID" not in rule:
                rule["ID"] = short_uid()

            dest_bucket_arn = rule.get("Destination", {}).get("Bucket")
            dest_bucket_name = s3_bucket_name(dest_bucket_arn)
            if (
                not (dest_s3_bucket := store.buckets.get(dest_bucket_name))
                or not dest_s3_bucket.versioning_status == BucketVersioningStatus.Enabled
            ):
                # according to AWS testing, the same exception is raised if the bucket does not exist
                # or if versioning was disabled
                raise InvalidRequest("Destination bucket must have versioning enabled.")

        # TODO: more validation on input
        s3_bucket.replication = replication_configuration

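    # Illustrative sketch (not part of the provider): both the source and the destination bucket
    # must have versioning enabled before the rules above are accepted. Assuming hypothetical
    # buckets "example-src" and "example-dst" and a boto3 client `s3`:
    #
    #   for name in ("example-src", "example-dst"):
    #       s3.put_bucket_versioning(Bucket=name, VersioningConfiguration={"Status": "Enabled"})
    #   s3.put_bucket_replication(
    #       Bucket="example-src",
    #       ReplicationConfiguration={
    #           "Role": "arn:aws:iam::000000000000:role/replication-role",
    #           "Rules": [
    #               {"Status": "Enabled", "Prefix": "", "Destination": {"Bucket": "arn:aws:s3:::example-dst"}}
    #           ],
    #       },
    #   )
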
    def get_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketReplicationOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.replication:
            raise ReplicationConfigurationNotFoundError(
                "The replication configuration was not found",
                BucketName=bucket,
            )

        return GetBucketReplicationOutput(ReplicationConfiguration=s3_bucket.replication)

    def delete_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.replication = None

    @handler("PutBucketAcl", expand=False)
1✔
4135
    def put_bucket_acl(
1✔
4136
        self,
4137
        context: RequestContext,
4138
        request: PutBucketAclRequest,
4139
    ) -> None:
4140
        bucket = request["Bucket"]
1✔
4141
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4142
        acp = get_access_control_policy_from_acl_request(
1✔
4143
            request=request, owner=s3_bucket.owner, request_body=context.request.data
4144
        )
4145
        s3_bucket.acl = acp
1✔
4146

4147
    def get_bucket_acl(
1✔
4148
        self,
4149
        context: RequestContext,
4150
        bucket: BucketName,
4151
        expected_bucket_owner: AccountId = None,
4152
        **kwargs,
4153
    ) -> GetBucketAclOutput:
4154
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4155

4156
        return GetBucketAclOutput(Owner=s3_bucket.acl["Owner"], Grants=s3_bucket.acl["Grants"])
1✔
4157

4158
    @handler("PutObjectAcl", expand=False)
1✔
4159
    def put_object_acl(
1✔
4160
        self,
4161
        context: RequestContext,
4162
        request: PutObjectAclRequest,
4163
    ) -> PutObjectAclOutput:
4164
        bucket = request["Bucket"]
1✔
4165
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4166

4167
        s3_object = s3_bucket.get_object(
1✔
4168
            key=request["Key"],
4169
            version_id=request.get("VersionId"),
4170
            http_method="PUT",
4171
        )
4172
        acp = get_access_control_policy_from_acl_request(
1✔
4173
            request=request, owner=s3_object.owner, request_body=context.request.data
4174
        )
4175
        previous_acl = s3_object.acl
1✔
4176
        s3_object.acl = acp
1✔
4177

4178
        if previous_acl != acp:
1✔
4179
            self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
4180

4181
        # TODO: RequestCharged
4182
        return PutObjectAclOutput()
1✔
4183

4184
    def get_object_acl(
1✔
4185
        self,
4186
        context: RequestContext,
4187
        bucket: BucketName,
4188
        key: ObjectKey,
4189
        version_id: ObjectVersionId = None,
4190
        request_payer: RequestPayer = None,
4191
        expected_bucket_owner: AccountId = None,
4192
        **kwargs,
4193
    ) -> GetObjectAclOutput:
4194
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4195

4196
        s3_object = s3_bucket.get_object(
1✔
4197
            key=key,
4198
            version_id=version_id,
4199
        )
4200
        # TODO: RequestCharged
4201
        return GetObjectAclOutput(Owner=s3_object.acl["Owner"], Grants=s3_object.acl["Grants"])
1✔
4202

4203
    def get_bucket_policy_status(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyStatusOutput:
        raise NotImplementedError

    def get_object_torrent(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectTorrentOutput:
        raise NotImplementedError

    def post_object(
        self, context: RequestContext, bucket: BucketName, body: IO[Body] = None, **kwargs
    ) -> PostResponse:
        if "multipart/form-data" not in context.request.headers.get("Content-Type", ""):
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition="Bucket POST must be of the enclosure-type multipart/form-data",
            )
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html
        # TODO: signature validation is not implemented for pre-signed POST
        # policy validation is not implemented either, except expiration and mandatory fields
        # This operation is the only one using a form for storing the request data. We will have to do some manual
        # parsing here, as no specs are present for this and no client directly implements this operation.
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        form = context.request.form
        object_key = context.request.form.get("key")

        if "file" in form:
            # in AWS, you can pass the file content as a string in the form field and not as a file object
            file_data = to_bytes(form["file"])
            object_content_length = len(file_data)
            stream = BytesIO(file_data)
        else:
            # this is the default behaviour
            fileobj = context.request.files["file"]
            stream = fileobj.stream
            # stream is a SpooledTemporaryFile, so we can seek the stream to know its length, necessary for policy
            # validation
            original_pos = stream.tell()
            object_content_length = stream.seek(0, 2)
            # reset the stream and put it back at its original position
            stream.seek(original_pos, 0)

            if "${filename}" in object_key:
                # TODO: ${filename} is actually usable in all form fields
                # See https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/PresignedPost.html
                # > The string ${filename} is automatically replaced with the name of the file provided by the user and
                # is recognized by all form fields.
                object_key = object_key.replace("${filename}", fileobj.filename)

        # TODO: see if we need to pass additional metadata not contained in the policy from the table under
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions
        additional_policy_metadata = {
            "bucket": bucket,
            "content_length": object_content_length,
        }
        validate_post_policy(form, additional_policy_metadata)

        if canned_acl := form.get("acl"):
            validate_canned_acl(canned_acl)
            acp = get_canned_acl(canned_acl, owner=s3_bucket.owner)
        else:
            acp = get_canned_acl(BucketCannedACL.private, owner=s3_bucket.owner)

        post_system_settable_headers = [
            "Cache-Control",
            "Content-Type",
            "Content-Disposition",
            "Content-Encoding",
        ]
        system_metadata = {}
        for system_metadata_field in post_system_settable_headers:
            if field_value := form.get(system_metadata_field):
                system_metadata[system_metadata_field.replace("-", "")] = field_value

        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        user_metadata = {
            field.removeprefix("x-amz-meta-").lower(): form.get(field)
            for field in form
            if field.startswith("x-amz-meta-")
        }

        if tagging := form.get("tagging"):
            # this is weird: it's raw XML in the form, so we need to parse it directly
            tagging = parse_post_object_tagging_xml(tagging)

        if (storage_class := form.get("x-amz-storage-class")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        encryption_request = {
            "ServerSideEncryption": form.get("x-amz-server-side-encryption"),
            "SSEKMSKeyId": form.get("x-amz-server-side-encryption-aws-kms-key-id"),
            "BucketKeyEnabled": form.get("x-amz-server-side-encryption-bucket-key-enabled"),
        }

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            encryption_request,
            s3_bucket,
            store,
        )

        checksum_algorithm = form.get("x-amz-checksum-algorithm")
        checksum_value = (
            form.get(f"x-amz-checksum-{checksum_algorithm.lower()}") if checksum_algorithm else None
        )
        expires = (
            str_to_rfc_1123_datetime(expires_str) if (expires_str := form.get("Expires")) else None
        )

        version_id = generate_version_id(s3_bucket.versioning_status)

        s3_object = S3Object(
            key=object_key,
            version_id=version_id,
            storage_class=storage_class,
            expires=expires,
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            website_redirect_location=form.get("x-amz-website-redirect-location"),
            acl=acp,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depend on Bucket settings
        )

        with self._storage_backend.open(bucket, s3_object, mode="w") as s3_stored_object:
            s3_stored_object.write(stream)

            if not s3_object.checksum_value:
                s3_object.checksum_value = s3_stored_object.checksum

            elif checksum_algorithm and s3_object.checksum_value != s3_stored_object.checksum:
                self._storage_backend.remove(bucket, s3_object)
                raise InvalidRequest(
                    f"Value for x-amz-checksum-{checksum_algorithm.lower()} header is invalid."
                )

            s3_bucket.objects.set(object_key, s3_object)

        # in case we are overwriting an object, delete the tags entry
        key_id = get_unique_key_id(bucket, object_key, version_id)
        store.TAGS.tags.pop(key_id, None)
        if tagging:
            store.TAGS.tags[key_id] = tagging

        response = PostResponse()
        # hacky way to set the etag in the headers as well: two locations for one value
        response["ETagHeader"] = s3_object.quoted_etag

        if redirect := form.get("success_action_redirect"):
            # we need to create the redirect, as the parser could not return the moto-calculated one
            try:
                redirect = create_redirect_for_post_request(
                    base_redirect=redirect,
                    bucket=bucket,
                    object_key=object_key,
                    etag=s3_object.quoted_etag,
                )
                response["LocationHeader"] = redirect
                response["StatusCode"] = 303
            except ValueError:
                # If S3 cannot interpret the URL, it acts as if the field is not present.
                response["StatusCode"] = form.get("success_action_status", 204)

        elif status_code := form.get("success_action_status"):
            response["StatusCode"] = status_code
        else:
            response["StatusCode"] = 204

        response["LocationHeader"] = response.get(
            "LocationHeader", f"{get_full_default_bucket_location(bucket)}{object_key}"
        )

        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
            response["ChecksumType"] = ChecksumType.FULL_OBJECT

        if s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket,
                s3_object,
                store.TAGS.tags.get(key_id, {}),
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them every time we get/head an object
                response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        if response["StatusCode"] == "201":
            # if the StatusCode is 201, S3 returns an XML body with additional information
            response["ETag"] = s3_object.quoted_etag
            response["Bucket"] = bucket
            response["Key"] = object_key
            response["Location"] = response["LocationHeader"]

        return response


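# Illustrative sketch (not part of the provider): post_object above is what serves browser-style
# form uploads. Assuming a hypothetical bucket "example-bucket" and the usual LocalStack edge
# endpoint, a presigned POST generated with boto3 can be replayed with requests:
#
#   import boto3
#   import requests
#
#   s3 = boto3.client("s3", endpoint_url="http://localhost:4566")
#   post = s3.generate_presigned_post(Bucket="example-bucket", Key="uploads/${filename}")
#   with open("photo.jpg", "rb") as f:
#       requests.post(post["url"], data=post["fields"], files={"file": ("photo.jpg", f)})
#
# The "${filename}" placeholder in the key is replaced with the uploaded file's name, as handled
# in the method body above.
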
def generate_version_id(bucket_versioning_status: str) -> str | None:
    if not bucket_versioning_status:
        return None
    elif bucket_versioning_status.lower() == "enabled":
        return generate_safe_version_id()
    else:
        return "null"


def add_encryption_to_response(response: dict, s3_object: S3Object):
    if encryption := s3_object.encryption:
        response["ServerSideEncryption"] = encryption
        if encryption == ServerSideEncryption.aws_kms:
            response["SSEKMSKeyId"] = s3_object.kms_key_id
            if s3_object.bucket_key_enabled:
                response["BucketKeyEnabled"] = s3_object.bucket_key_enabled


def get_encryption_parameters_from_request_and_bucket(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
    store: S3Store,
) -> EncryptionParameters:
    if request.get("SSECustomerKey"):
        # we return early, because ServerSideEncryption does not apply if the request has SSE-C
        return EncryptionParameters(None, None, False)

    encryption = request.get("ServerSideEncryption")
    kms_key_id = request.get("SSEKMSKeyId")
    bucket_key_enabled = request.get("BucketKeyEnabled")
    if s3_bucket.encryption_rule:
        bucket_key_enabled = bucket_key_enabled or s3_bucket.encryption_rule.get("BucketKeyEnabled")
        encryption = (
            encryption
            or s3_bucket.encryption_rule["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]
        )
        if encryption == ServerSideEncryption.aws_kms:
            key_id = kms_key_id or s3_bucket.encryption_rule[
                "ApplyServerSideEncryptionByDefault"
            ].get("KMSMasterKeyID")
            kms_key_id = get_kms_key_arn(
                key_id, s3_bucket.bucket_account_id, s3_bucket.bucket_region
            )
            if not kms_key_id:
                # if no key is provided, AWS will use an AWS managed KMS key
                # create it if it doesn't already exist, and save it in the store per region
                if not store.aws_managed_kms_key_id:
                    managed_kms_key_id = create_s3_kms_managed_key_for_region(
                        s3_bucket.bucket_account_id, s3_bucket.bucket_region
                    )
                    store.aws_managed_kms_key_id = managed_kms_key_id

                kms_key_id = store.aws_managed_kms_key_id

    return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)


def get_object_lock_parameters_from_bucket_and_request(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
):
    # TODO: also validate here?
    lock_mode = request.get("ObjectLockMode")
    lock_legal_status = request.get("ObjectLockLegalHoldStatus")
    lock_until = request.get("ObjectLockRetainUntilDate")

    if default_retention := s3_bucket.object_lock_default_retention:
        lock_mode = lock_mode or default_retention.get("Mode")
        if lock_mode and not lock_until:
            lock_until = get_retention_from_now(
                days=default_retention.get("Days"),
                years=default_retention.get("Years"),
            )

    return ObjectLockParameters(lock_until, lock_legal_status, lock_mode)


def get_part_range(s3_object: S3Object, part_number: PartNumber) -> ObjectRange:
    """
    Calculate the range value from a part number for an S3 Object
    :param s3_object: S3Object
    :param part_number: the wanted part from the S3Object
    :return: an ObjectRange used to return only a slice of an Object
    """
    if not s3_object.parts:
        if part_number > 1:
            raise InvalidPartNumber(
                "The requested partnumber is not satisfiable",
                PartNumberRequested=part_number,
                ActualPartCount=1,
            )
        return ObjectRange(
            begin=0,
            end=s3_object.size - 1,
            content_length=s3_object.size,
            content_range=f"bytes 0-{s3_object.size - 1}/{s3_object.size}",
        )
    elif not (part_data := s3_object.parts.get(part_number)):
        raise InvalidPartNumber(
            "The requested partnumber is not satisfiable",
            PartNumberRequested=part_number,
            ActualPartCount=len(s3_object.parts),
        )

    begin, part_length = part_data
    end = begin + part_length - 1
    return ObjectRange(
        begin=begin,
        end=end,
        content_length=part_length,
        content_range=f"bytes {begin}-{end}/{s3_object.size}",
    )


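# Illustrative worked example (not part of the provider): for a hypothetical object uploaded in two
# parts of 5 MiB and 1 MiB, s3_object.parts maps part numbers to (offset, length) pairs, e.g.
# {1: (0, 5242880), 2: (5242880, 1048576)}. Requesting part_number=2 in get_part_range then yields
# begin=5242880, end=6291455, content_length=1048576 and the content range
# "bytes 5242880-6291455/6291456".
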
def get_acl_headers_from_request(
    request: Union[
        PutObjectRequest,
        CreateMultipartUploadRequest,
        CopyObjectRequest,
        CreateBucketRequest,
        PutBucketAclRequest,
        PutObjectAclRequest,
    ],
) -> list[tuple[str, str]]:
    permission_keys = [
        "GrantFullControl",
        "GrantRead",
        "GrantReadACP",
        "GrantWrite",
        "GrantWriteACP",
    ]
    acl_headers = [
        (permission, grant_header)
        for permission in permission_keys
        if (grant_header := request.get(permission))
    ]
    return acl_headers


def get_access_control_policy_from_acl_request(
    request: Union[PutBucketAclRequest, PutObjectAclRequest],
    owner: Owner,
    request_body: bytes,
) -> AccessControlPolicy:
    canned_acl = request.get("ACL")
    acl_headers = get_acl_headers_from_request(request)

    # FIXME: this is very dirty, but the parser does not differentiate between an empty body and an empty XML node.
    # Errors are different depending on that data, so we need to access the context. Modifying the parser for this
    # use case seems dangerous.
    is_acp_in_body = request_body

    if not (canned_acl or acl_headers or is_acp_in_body):
        raise MissingSecurityHeader(
            "Your request was missing a required header", MissingHeaderName="x-amz-acl"
        )

    elif canned_acl and acl_headers:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")

    elif (canned_acl or acl_headers) and is_acp_in_body:
        raise UnexpectedContent("This request does not support content")

    if canned_acl:
        validate_canned_acl(canned_acl)
        acp = get_canned_acl(canned_acl, owner=owner)

    elif acl_headers:
        grants = []
        for permission, grantees_values in acl_headers:
            permission = get_permission_from_header(permission)
            partial_grants = parse_grants_in_headers(permission, grantees_values)
            grants.extend(partial_grants)

        acp = AccessControlPolicy(Owner=owner, Grants=grants)
    else:
        acp = request.get("AccessControlPolicy")
        validate_acl_acp(acp)
        if (
            owner.get("DisplayName")
            and acp["Grants"]
            and "DisplayName" not in acp["Grants"][0]["Grantee"]
        ):
            acp["Grants"][0]["Grantee"]["DisplayName"] = owner["DisplayName"]

    return acp


def get_access_control_policy_for_new_resource_request(
    request: Union[
        PutObjectRequest, CreateMultipartUploadRequest, CopyObjectRequest, CreateBucketRequest
    ],
    owner: Owner,
) -> AccessControlPolicy:
    # TODO: this is basic ACL, not taking into account Bucket settings. Revisit once we really implement ACLs.
    canned_acl = request.get("ACL")
    acl_headers = get_acl_headers_from_request(request)

    if not (canned_acl or acl_headers):
        return get_canned_acl(BucketCannedACL.private, owner=owner)

    elif canned_acl and acl_headers:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")

    if canned_acl:
        validate_canned_acl(canned_acl)
        return get_canned_acl(canned_acl, owner=owner)

    grants = []
    for permission, grantees_values in acl_headers:
        permission = get_permission_from_header(permission)
        partial_grants = parse_grants_in_headers(permission, grantees_values)
        grants.extend(partial_grants)

    return AccessControlPolicy(Owner=owner, Grants=grants)


def object_exists_for_precondition_write(s3_bucket: S3Bucket, key: ObjectKey) -> bool:
    return (existing := s3_bucket.objects.get(key)) and not isinstance(existing, S3DeleteMarker)


def verify_object_equality_precondition_write(
    s3_bucket: S3Bucket,
    key: ObjectKey,
    etag: str,
    initiated: datetime.datetime | None = None,
) -> None:
    existing = s3_bucket.objects.get(key)
    if not existing or isinstance(existing, S3DeleteMarker):
        raise NoSuchKey("The specified key does not exist.", Key=key)

    if not existing.etag == etag.strip('"'):
        raise PreconditionFailed(
            "At least one of the pre-conditions you specified did not hold",
            Condition="If-Match",
        )

    if initiated and initiated < existing.last_modified:
        raise ConditionalRequestConflict(
            "The conditional request cannot succeed due to a conflicting operation against this resource.",
            Condition="If-Match",
            Key=key,
        )