• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
22334798432

push

github

web-flow
S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.71
/localstack-core/localstack/services/s3/provider.py
1
import base64
1✔
2
import contextlib
1✔
3
import copy
1✔
4
import datetime
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
from collections import defaultdict
1✔
9
from inspect import signature
1✔
10
from io import BytesIO
1✔
11
from operator import itemgetter
1✔
12
from threading import RLock
1✔
13
from typing import IO
1✔
14
from urllib import parse as urlparse
1✔
15
from zoneinfo import ZoneInfo
1✔
16

17
from localstack import config
1✔
18
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
19
from localstack.aws.api.s3 import (
1✔
20
    MFA,
21
    AbortMultipartUploadOutput,
22
    AccelerateConfiguration,
23
    AccessControlPolicy,
24
    AccessDenied,
25
    AccountId,
26
    AnalyticsConfiguration,
27
    AnalyticsId,
28
    AuthorizationHeaderMalformed,
29
    BadDigest,
30
    Body,
31
    Bucket,
32
    BucketAlreadyExists,
33
    BucketAlreadyOwnedByYou,
34
    BucketCannedACL,
35
    BucketLifecycleConfiguration,
36
    BucketLocationConstraint,
37
    BucketLoggingStatus,
38
    BucketName,
39
    BucketNotEmpty,
40
    BucketRegion,
41
    BucketVersioningStatus,
42
    BypassGovernanceRetention,
43
    ChecksumAlgorithm,
44
    ChecksumCRC32,
45
    ChecksumCRC32C,
46
    ChecksumCRC64NVME,
47
    ChecksumSHA1,
48
    ChecksumSHA256,
49
    ChecksumType,
50
    CommonPrefix,
51
    CompletedMultipartUpload,
52
    CompleteMultipartUploadOutput,
53
    ConditionalRequestConflict,
54
    ConfirmRemoveSelfBucketAccess,
55
    ContentMD5,
56
    CopyObjectOutput,
57
    CopyObjectRequest,
58
    CopyObjectResult,
59
    CopyPartResult,
60
    CORSConfiguration,
61
    CreateBucketOutput,
62
    CreateBucketRequest,
63
    CreateMultipartUploadOutput,
64
    CreateMultipartUploadRequest,
65
    CrossLocationLoggingProhibitted,
66
    Delete,
67
    DeletedObject,
68
    DeleteMarkerEntry,
69
    DeleteObjectOutput,
70
    DeleteObjectsOutput,
71
    DeleteObjectTaggingOutput,
72
    Delimiter,
73
    EncodingType,
74
    Error,
75
    Expiration,
76
    FetchOwner,
77
    GetBucketAccelerateConfigurationOutput,
78
    GetBucketAclOutput,
79
    GetBucketAnalyticsConfigurationOutput,
80
    GetBucketCorsOutput,
81
    GetBucketEncryptionOutput,
82
    GetBucketIntelligentTieringConfigurationOutput,
83
    GetBucketInventoryConfigurationOutput,
84
    GetBucketLifecycleConfigurationOutput,
85
    GetBucketLocationOutput,
86
    GetBucketLoggingOutput,
87
    GetBucketMetricsConfigurationOutput,
88
    GetBucketOwnershipControlsOutput,
89
    GetBucketPolicyOutput,
90
    GetBucketPolicyStatusOutput,
91
    GetBucketReplicationOutput,
92
    GetBucketRequestPaymentOutput,
93
    GetBucketTaggingOutput,
94
    GetBucketVersioningOutput,
95
    GetBucketWebsiteOutput,
96
    GetObjectAclOutput,
97
    GetObjectAttributesOutput,
98
    GetObjectAttributesParts,
99
    GetObjectAttributesRequest,
100
    GetObjectLegalHoldOutput,
101
    GetObjectLockConfigurationOutput,
102
    GetObjectOutput,
103
    GetObjectRequest,
104
    GetObjectRetentionOutput,
105
    GetObjectTaggingOutput,
106
    GetObjectTorrentOutput,
107
    GetPublicAccessBlockOutput,
108
    HeadBucketOutput,
109
    HeadObjectOutput,
110
    HeadObjectRequest,
111
    IfMatch,
112
    IfMatchInitiatedTime,
113
    IfMatchLastModifiedTime,
114
    IfMatchSize,
115
    IfNoneMatch,
116
    IntelligentTieringConfiguration,
117
    IntelligentTieringId,
118
    InvalidArgument,
119
    InvalidBucketName,
120
    InvalidDigest,
121
    InvalidObjectState,
122
    InvalidPartNumber,
123
    InvalidPartOrder,
124
    InvalidStorageClass,
125
    InvalidTargetBucketForLogging,
126
    InventoryConfiguration,
127
    InventoryId,
128
    KeyMarker,
129
    LifecycleRules,
130
    ListBucketAnalyticsConfigurationsOutput,
131
    ListBucketIntelligentTieringConfigurationsOutput,
132
    ListBucketInventoryConfigurationsOutput,
133
    ListBucketMetricsConfigurationsOutput,
134
    ListBucketsOutput,
135
    ListMultipartUploadsOutput,
136
    ListObjectsOutput,
137
    ListObjectsV2Output,
138
    ListObjectVersionsOutput,
139
    ListPartsOutput,
140
    Marker,
141
    MaxBuckets,
142
    MaxKeys,
143
    MaxParts,
144
    MaxUploads,
145
    MethodNotAllowed,
146
    MetricsConfiguration,
147
    MetricsId,
148
    MissingSecurityHeader,
149
    MpuObjectSize,
150
    MultipartUpload,
151
    MultipartUploadId,
152
    NoSuchBucket,
153
    NoSuchBucketPolicy,
154
    NoSuchCORSConfiguration,
155
    NoSuchKey,
156
    NoSuchLifecycleConfiguration,
157
    NoSuchPublicAccessBlockConfiguration,
158
    NoSuchTagSet,
159
    NoSuchUpload,
160
    NoSuchWebsiteConfiguration,
161
    NotificationConfiguration,
162
    Object,
163
    ObjectIdentifier,
164
    ObjectKey,
165
    ObjectLockConfiguration,
166
    ObjectLockConfigurationNotFoundError,
167
    ObjectLockEnabled,
168
    ObjectLockLegalHold,
169
    ObjectLockMode,
170
    ObjectLockRetention,
171
    ObjectLockToken,
172
    ObjectOwnership,
173
    ObjectPart,
174
    ObjectVersion,
175
    ObjectVersionId,
176
    ObjectVersionStorageClass,
177
    OptionalObjectAttributesList,
178
    Owner,
179
    OwnershipControls,
180
    OwnershipControlsNotFoundError,
181
    Part,
182
    PartNumber,
183
    PartNumberMarker,
184
    Policy,
185
    PostResponse,
186
    PreconditionFailed,
187
    Prefix,
188
    PublicAccessBlockConfiguration,
189
    PutBucketAclRequest,
190
    PutBucketLifecycleConfigurationOutput,
191
    PutObjectAclOutput,
192
    PutObjectAclRequest,
193
    PutObjectLegalHoldOutput,
194
    PutObjectLockConfigurationOutput,
195
    PutObjectOutput,
196
    PutObjectRequest,
197
    PutObjectRetentionOutput,
198
    PutObjectTaggingOutput,
199
    ReplicationConfiguration,
200
    ReplicationConfigurationNotFoundError,
201
    RequestPayer,
202
    RequestPaymentConfiguration,
203
    RestoreObjectOutput,
204
    RestoreRequest,
205
    S3Api,
206
    ServerSideEncryption,
207
    ServerSideEncryptionConfiguration,
208
    SkipValidation,
209
    SSECustomerAlgorithm,
210
    SSECustomerKey,
211
    SSECustomerKeyMD5,
212
    StartAfter,
213
    StorageClass,
214
    Tag,
215
    Tagging,
216
    TagSet,
217
    Token,
218
    TransitionDefaultMinimumObjectSize,
219
    UploadIdMarker,
220
    UploadPartCopyOutput,
221
    UploadPartCopyRequest,
222
    UploadPartOutput,
223
    UploadPartRequest,
224
    VersionIdMarker,
225
    VersioningConfiguration,
226
    WebsiteConfiguration,
227
)
228
from localstack.aws.api.s3 import NotImplemented as NotImplementedException
1✔
229
from localstack.aws.handlers import (
1✔
230
    modify_service_response,
231
    preprocess_request,
232
    serve_custom_service_request_handlers,
233
)
234
from localstack.constants import AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1
1✔
235
from localstack.services.edge import ROUTER
1✔
236
from localstack.services.plugins import ServiceLifecycleHook
1✔
237
from localstack.services.s3.codec import AwsChunkedDecoder
1✔
238
from localstack.services.s3.constants import (
1✔
239
    ALLOWED_HEADER_OVERRIDES,
240
    ARCHIVES_STORAGE_CLASSES,
241
    CHECKSUM_ALGORITHMS,
242
    DEFAULT_BUCKET_ENCRYPTION,
243
    S3_HOST_ID,
244
)
245
from localstack.services.s3.cors import S3CorsHandler, s3_cors_request_handler
1✔
246
from localstack.services.s3.exceptions import (
1✔
247
    InvalidBucketOwnerAWSAccountID,
248
    InvalidBucketState,
249
    InvalidRequest,
250
    MalformedPolicy,
251
    MalformedXML,
252
    NoSuchConfiguration,
253
    NoSuchObjectLockConfiguration,
254
    TooManyConfigurations,
255
    UnexpectedContent,
256
)
257
from localstack.services.s3.models import (
1✔
258
    BucketCorsIndex,
259
    EncryptionParameters,
260
    ObjectLockParameters,
261
    S3Bucket,
262
    S3DeleteMarker,
263
    S3Multipart,
264
    S3Object,
265
    S3Part,
266
    S3Store,
267
    VersionedKeyStore,
268
    s3_stores,
269
)
270
from localstack.services.s3.notifications import NotificationDispatcher, S3EventNotificationContext
1✔
271
from localstack.services.s3.presigned_url import validate_post_policy
1✔
272
from localstack.services.s3.storage.core import LimitedIterableStream, S3ObjectStore
1✔
273
from localstack.services.s3.storage.ephemeral import EphemeralS3ObjectStore
1✔
274
from localstack.services.s3.utils import (
1✔
275
    ObjectRange,
276
    add_expiration_days_to_datetime,
277
    base_64_content_md5_to_etag,
278
    create_redirect_for_post_request,
279
    create_s3_kms_managed_key_for_region,
280
    decode_continuation_token,
281
    decode_user_metadata,
282
    encode_continuation_token,
283
    encode_user_metadata,
284
    etag_to_base_64_content_md5,
285
    extract_bucket_key_version_id_from_copy_source,
286
    generate_safe_version_id,
287
    get_bucket_location_xml,
288
    get_canned_acl,
289
    get_class_attrs_from_spec_class,
290
    get_failed_precondition_copy_source,
291
    get_failed_upload_part_copy_source_preconditions,
292
    get_full_default_bucket_location,
293
    get_kms_key_arn,
294
    get_lifecycle_rule_from_object,
295
    get_owner_for_account_id,
296
    get_permission_from_header,
297
    get_retention_from_now,
298
    get_s3_checksum_algorithm_from_request,
299
    get_s3_checksum_algorithm_from_trailing_headers,
300
    get_system_metadata_from_request,
301
    get_unique_key_id,
302
    get_url_encoded_object_location,
303
    header_name_from_capitalized_param,
304
    is_bucket_name_valid,
305
    is_version_older_than_other,
306
    parse_copy_source_range_header,
307
    parse_post_object_tagging_xml,
308
    parse_range_header,
309
    parse_tagging_header,
310
    s3_response_handler,
311
    serialize_expiration_header,
312
    str_to_rfc_1123_datetime,
313
    validate_dict_fields,
314
    validate_failed_precondition,
315
    validate_kms_key_id,
316
    validate_location_constraint,
317
    validate_tag_set,
318
)
319
from localstack.services.s3.validation import (
1✔
320
    parse_grants_in_headers,
321
    validate_acl_acp,
322
    validate_bucket_analytics_configuration,
323
    validate_bucket_intelligent_tiering_configuration,
324
    validate_canned_acl,
325
    validate_checksum_value,
326
    validate_cors_configuration,
327
    validate_encoding_type,
328
    validate_inventory_configuration,
329
    validate_lifecycle_configuration,
330
    validate_object_key,
331
    validate_sse_c,
332
    validate_website_configuration,
333
)
334
from localstack.services.s3.website_hosting import register_website_hosting_routes
1✔
335
from localstack.state import AssetDirectory, StateVisitor
1✔
336
from localstack.utils.aws.arns import s3_bucket_name
1✔
337
from localstack.utils.aws.aws_stack import get_valid_regions_for_service
1✔
338
from localstack.utils.collections import select_from_typed_dict
1✔
339
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
340

341
LOG = logging.getLogger(__name__)
1✔
342

343
STORAGE_CLASSES = get_class_attrs_from_spec_class(StorageClass)
1✔
344
SSE_ALGORITHMS = get_class_attrs_from_spec_class(ServerSideEncryption)
1✔
345
OBJECT_OWNERSHIPS = get_class_attrs_from_spec_class(ObjectOwnership)
1✔
346
OBJECT_LOCK_MODES = get_class_attrs_from_spec_class(ObjectLockMode)
1✔
347

348
DEFAULT_S3_TMP_DIR = "/tmp/localstack-s3-storage"
1✔
349

350

351
class S3Provider(S3Api, ServiceLifecycleHook):
1✔
352
    def __init__(self, storage_backend: S3ObjectStore = None) -> None:
        """Create the S3 provider.

        :param storage_backend: object storage implementation to use; when not given,
            an ephemeral filesystem-backed store rooted at DEFAULT_S3_TMP_DIR is created.
        """
        super().__init__()
        if not storage_backend:
            storage_backend = EphemeralS3ObjectStore(DEFAULT_S3_TMP_DIR)
        self._storage_backend = storage_backend
        self._notification_dispatcher = NotificationDispatcher()
        self._cors_handler = S3CorsHandler(BucketCorsIndex())
        # TODO: add lock for keys for PutObject, only way to support precondition writes for versioned buckets
        # per-bucket, per-key re-entrant locks used for conditional (If-Match/If-None-Match) writes
        self._preconditions_locks = defaultdict(lambda: defaultdict(RLock))

        # runtime cache of Lifecycle Expiration headers: they must be recalculated on every
        # object fetch in case the bucket's lifecycle rules have changed since last computed
        self._expiration_cache: dict[BucketName, dict[ObjectKey, Expiration]] = defaultdict(dict)
363

364
    def on_after_init(self):
        """Wire the provider into the LocalStack gateway after startup.

        Registers, in order: the CORS preflight handler, the custom CORS request
        handler, the S3-specific response serializer hook, and the static website
        hosting routes on the edge router.
        """
        preprocess_request.append(self._cors_handler)
        serve_custom_service_request_handlers.append(s3_cors_request_handler)
        modify_service_response.append(self.service, s3_response_handler)
        register_website_hosting_routes(router=ROUTER)
369

370
    def accept_state_visitor(self, visitor: StateVisitor):
        """Expose the provider's state to a StateVisitor (e.g. for persistence).

        Both the in-memory stores and the storage backend's on-disk root directory
        are visited.
        """
        visitor.visit(s3_stores)
        visitor.visit(AssetDirectory(self.service, self._storage_backend.root_directory))
373

374
    def on_before_state_save(self):
        # flush any buffered object data to the backend before state is persisted
        self._storage_backend.flush()
376

377
    def on_after_state_reset(self):
        # a state reset invalidates everything the CORS handler cached about buckets
        self._cors_handler.invalidate_cache()
379

380
    def on_after_state_load(self):
        # loaded state may contain different buckets/CORS configs: drop the cached index
        self._cors_handler.invalidate_cache()
382

383
    def on_before_stop(self):
        # stop dispatching notifications first, then release the storage backend's resources
        self._notification_dispatcher.shutdown()
        self._storage_backend.close()
386

387
    def _notify(
        self,
        context: RequestContext,
        s3_bucket: S3Bucket,
        s3_object: S3Object | S3DeleteMarker = None,
        s3_notif_ctx: S3EventNotificationContext = None,
    ):
        """Dispatch bucket event notifications for the current request, if configured.

        :param context: the RequestContext, to retrieve more information about the incoming notification
        :param s3_bucket: the S3Bucket object
        :param s3_object: the S3Object object if S3EventNotificationContext is not given
        :param s3_notif_ctx: S3EventNotificationContext, in case we need specific data only available in the API call
        :return:
        """
        notification_configuration = s3_bucket.notification_configuration
        # buckets without a notification configuration produce no events
        if not notification_configuration:
            return

        if not s3_notif_ctx:
            # build the event context from the request when the caller did not supply one
            s3_notif_ctx = S3EventNotificationContext.from_request_context(
                context,
                s3_bucket=s3_bucket,
                s3_object=s3_object,
            )

        self._notification_dispatcher.send_notifications(
            s3_notif_ctx, notification_configuration
        )
412

413
    def _verify_notification_configuration(
        self,
        notification_configuration: NotificationConfiguration,
        skip_destination_validation: SkipValidation,
        context: RequestContext,
        bucket_name: str,
    ):
        """Validate a bucket notification configuration via the dispatcher.

        Presumably the dispatcher checks the configured destinations; the
        `skip_destination_validation` flag is forwarded to it unchanged — TODO confirm
        exact validation semantics in NotificationDispatcher.verify_configuration.
        """
        self._notification_dispatcher.verify_configuration(
            notification_configuration, skip_destination_validation, context, bucket_name
        )
423

424
    def _get_expiration_header(
        self,
        lifecycle_rules: LifecycleRules,
        bucket: BucketName,
        s3_object: S3Object,
        object_tags: dict[str, str],
    ) -> Expiration:
        """Return the serialized Expiration header for the object if a lifecycle rule matches.

        Results are memoized per bucket/key in ``self._expiration_cache`` because the
        header can change whenever the bucket's rules change, and ``lru_cache`` cannot
        be used as the parameters are not hashable.

        :param lifecycle_rules: the bucket LifecycleRules
        :param bucket: the bucket name, used as the cache namespace
        :param s3_object: S3Object
        :param object_tags: the object tags
        :return: the Expiration header if there's a rule matching, else None
        """
        object_key = s3_object.key
        cached_header = self._expiration_cache.get(bucket, {}).get(object_key)
        if cached_header:
            return cached_header

        matched_rule = get_lifecycle_rule_from_object(
            lifecycle_rules, object_key, s3_object.size, object_tags
        )
        if not matched_rule:
            return None

        expiration_header = serialize_expiration_header(
            matched_rule["ID"],
            matched_rule["Expiration"],
            s3_object.last_modified,
        )
        self._expiration_cache[bucket][object_key] = expiration_header
        return expiration_header
453

454
    def _get_cross_account_bucket(
        self,
        context: RequestContext,
        bucket_name: BucketName,
        *,
        expected_bucket_owner: AccountId = None,
    ) -> tuple[S3Store, S3Bucket]:
        """Resolve a bucket by name, falling back to the global bucket map for buckets
        owned by a different account than the caller's.

        :param context: the RequestContext of the incoming call
        :param bucket_name: the bucket to resolve
        :param expected_bucket_owner: optional 12-character account id the caller expects
        :raises InvalidBucketOwnerAWSAccountID: if expected_bucket_owner is malformed
        :raises NoSuchBucket: if the bucket exists in no account
        :raises AccessDenied: if the bucket's owning account differs from expected_bucket_owner
        :return: the (store, bucket) pair from the account actually owning the bucket
        """
        # NOTE(review): \w{12} also accepts letters/underscores, not only digits —
        # confirm this looseness is intentional before tightening to \d{12}
        if expected_bucket_owner and not re.fullmatch(r"\w{12}", expected_bucket_owner):
            raise InvalidBucketOwnerAWSAccountID(
                f"The value of the expected bucket owner parameter must be an AWS Account ID... [{expected_bucket_owner}]",
            )

        store = self.get_store(context.account_id, context.region)
        if not (s3_bucket := store.buckets.get(bucket_name)):
            # not in the caller's store: look up the owning account in the global map
            if not (account_id := store.global_bucket_map.get(bucket_name)):
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)

            store = self.get_store(account_id, context.region)
            if not (s3_bucket := store.buckets.get(bucket_name)):
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)

        if expected_bucket_owner and s3_bucket.bucket_account_id != expected_bucket_owner:
            raise AccessDenied("Access Denied")

        return store, s3_bucket
479

480
    def _create_bucket_tags(self, bucket: S3Bucket, tags: TagSet):
        """Store the given TagSet against the bucket's ARN in the owning account's store."""
        owning_store = self.get_store(bucket.bucket_account_id, bucket.bucket_region)
        tag_map = {tag["Key"]: tag["Value"] for tag in tags}
        owning_store.tags.update_tags(bucket.bucket_arn, tag_map)
483

484
    def _remove_all_bucket_tags(self, bucket: S3Bucket):
        """Delete every tag attached to the bucket's ARN in the owning account's store."""
        owning_store = self.get_store(bucket.bucket_account_id, bucket.bucket_region)
        owning_store.tags.delete_all_tags(bucket.bucket_arn)
487

488
    def _list_bucket_tags(self, bucket: S3Bucket) -> TagSet:
        """Return the bucket's tags as a TagSet (list of {Key, Value} dicts)."""
        owning_store = self.get_store(bucket.bucket_account_id, bucket.bucket_region)
        tag_map = owning_store.tags.get_tags(bucket.bucket_arn)
        tag_set = []
        for tag_key, tag_value in tag_map.items():
            tag_set.append({"Key": tag_key, "Value": tag_value})
        return tag_set
492

493
    @staticmethod
    def _create_object_tags(store: S3Store, key_id: str, tags: dict[str, str]):
        # object tags live in the store's tag registry, keyed by the object's unique key id
        store.tags.update_tags(key_id, tags)
496

497
    @staticmethod
    def _remove_all_object_tags(store: S3Store, key_id: str):
        # drop all tags registered under the object's unique key id
        store.tags.delete_all_tags(key_id)
500

501
    @staticmethod
    def _list_object_tags(store: S3Store, key_id: str) -> dict[str, str]:
        # return the tag mapping registered under the object's unique key id
        return store.tags.get_tags(key_id)
504

505
    @staticmethod
    def get_store(account_id: str, region_name: str) -> S3Store:
        """Return the account/region-scoped S3Store.

        :param account_id: the AWS account id owning the store
        :param region_name: the region the store belongs to
        :return: the S3Store for that account and region
        """
        # TODO: use default account id for external access? would need an anonymous one
        return s3_stores[account_id][region_name]
509

510
    @handler("CreateBucket", expand=False)
    def create_bucket(
        self,
        context: RequestContext,
        request: CreateBucketRequest,
    ) -> CreateBucketOutput:
        """Create a bucket, registering it in the account store and the global bucket map.

        Handles region resolution from LocationConstraint (empty -> us-east-1, legacy
        "EU" -> eu-west-1), bucket-name/tag/ownership validation, and the AWS-specific
        idempotency rule: re-creating an own bucket in us-east-1 without tags succeeds.

        :param context: the RequestContext
        :param request: the expanded CreateBucketRequest
        :raises AuthorizationHeaderMalformed: if the request region is 'aws-global'
        :raises InvalidBucketName: if the bucket name fails validation
        :raises BucketAlreadyExists: if another account owns a bucket with this name
        :raises BucketAlreadyOwnedByYou: if the caller owns it (outside us-east-1, or with tags)
        :raises InvalidArgument: for an invalid x-amz-object-ownership header
        :return: the bucket Location and ARN
        """
        if context.region == "aws-global":
            # TODO: extend this logic to probably all the provider, and maybe all services. S3 is the most impacted
            #  right now so this will help users to properly set a region in their config
            # See the `TestS3.test_create_bucket_aws_global` test
            raise AuthorizationHeaderMalformed(
                f"The authorization header is malformed; the region 'aws-global' is wrong; expecting '{AWS_REGION_US_EAST_1}'",
                HostId=S3_HOST_ID,
                Region=AWS_REGION_US_EAST_1,
            )

        bucket_name = request["Bucket"]

        if not is_bucket_name_valid(bucket_name):
            raise InvalidBucketName("The specified bucket is not valid.", BucketName=bucket_name)

        create_bucket_configuration = request.get("CreateBucketConfiguration") or {}

        bucket_tags = create_bucket_configuration.get("Tags", [])
        if bucket_tags:
            validate_tag_set(bucket_tags, type_set="create-bucket")

        location_constraint = create_bucket_configuration.get("LocationConstraint", "")
        validate_location_constraint(context.region, location_constraint)

        # empty LocationConstraint defaults to us-east-1; legacy "EU" maps to eu-west-1
        bucket_region = location_constraint
        if not location_constraint:
            bucket_region = AWS_REGION_US_EAST_1
        if location_constraint == BucketLocationConstraint.EU:
            bucket_region = AWS_REGION_EU_WEST_1

        store = self.get_store(context.account_id, bucket_region)

        if bucket_name in store.global_bucket_map:
            existing_bucket_owner = store.global_bucket_map[bucket_name]
            if existing_bucket_owner != context.account_id:
                raise BucketAlreadyExists()

            # if the existing bucket has the same owner, the behaviour will depend on the region and if the request has
            # tags
            if bucket_region != AWS_REGION_US_EAST_1 or bucket_tags:
                raise BucketAlreadyOwnedByYou(
                    "Your previous request to create the named bucket succeeded and you already own it.",
                    BucketName=bucket_name,
                )
            else:
                existing_bucket = store.buckets[bucket_name]
                # CreateBucket is idempotent in us-east-1
                return CreateBucketOutput(
                    Location=f"/{bucket_name}",
                    BucketArn=existing_bucket.bucket_arn,
                )

        if (
            object_ownership := request.get("ObjectOwnership")
        ) is not None and object_ownership not in OBJECT_OWNERSHIPS:
            raise InvalidArgument(
                f"Invalid x-amz-object-ownership header: {object_ownership}",
                ArgumentName="x-amz-object-ownership",
            )
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
        owner = get_owner_for_account_id(context.account_id)
        acl = get_access_control_policy_for_new_resource_request(request, owner=owner)

        s3_bucket = S3Bucket(
            name=bucket_name,
            account_id=context.account_id,
            bucket_region=bucket_region,
            owner=owner,
            acl=acl,
            object_ownership=request.get("ObjectOwnership"),
            object_lock_enabled_for_bucket=request.get("ObjectLockEnabledForBucket") or False,
            location_constraint=location_constraint,
        )

        store.buckets[bucket_name] = s3_bucket
        store.global_bucket_map[bucket_name] = s3_bucket.bucket_account_id
        if bucket_tags:
            self._create_bucket_tags(s3_bucket, bucket_tags)
        self._cors_handler.invalidate_cache()
        self._storage_backend.create_bucket(bucket_name)

        # Location is always contained in response -> full url for LocationConstraint outside us-east-1
        # (consistency fix: compare against the constant used throughout, not a literal)
        location = (
            f"/{bucket_name}"
            if bucket_region == AWS_REGION_US_EAST_1
            else get_full_default_bucket_location(bucket_name)
        )
        response = CreateBucketOutput(Location=location, BucketArn=s3_bucket.bucket_arn)
        return response
605

606
    def delete_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Delete an empty bucket and all provider-side state attached to it.

        :param context: the RequestContext
        :param bucket: the bucket name
        :param expected_bucket_owner: unused here; ownership is resolved cross-account
        :raises BucketNotEmpty: if the bucket still contains objects (or versions)
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.objects.is_empty():
            # refuse deletion while objects remain; versioned buckets get an extra hint
            error_message = "The bucket you tried to delete is not empty"
            if s3_bucket.versioning_status:
                error_message += ". You must delete all versions in the bucket."
            raise BucketNotEmpty(
                error_message,
                BucketName=bucket,
            )

        # unregister the bucket from the owning store and the global name map
        store.buckets.pop(bucket)
        store.global_bucket_map.pop(bucket)
        # invalidate per-bucket caches and locks
        self._cors_handler.invalidate_cache()
        self._expiration_cache.pop(bucket, None)
        self._preconditions_locks.pop(bucket, None)
        # clean up the storage backend and the bucket's tags
        self._storage_backend.delete_bucket(bucket)
        self._remove_all_bucket_tags(s3_bucket)
633

634
    def list_buckets(
        self,
        context: RequestContext,
        max_buckets: MaxBuckets = None,
        continuation_token: Token = None,
        prefix: Prefix = None,
        bucket_region: BucketRegion = None,
        **kwargs,
    ) -> ListBucketsOutput:
        """List the caller's buckets, name-sorted, with optional filtering and pagination.

        :param context: the RequestContext
        :param max_buckets: page size; when exceeded, a ContinuationToken is returned
        :param continuation_token: base64-encoded bucket name to resume from
        :param prefix: only include buckets whose name starts with this prefix
        :param bucket_region: only include buckets in this region
        :raises InvalidArgument: if bucket_region is not a valid region (unless
            ALLOW_NONSTANDARD_REGIONS is set)
        :return: the filtered page of buckets with Owner and pagination metadata
        """
        if bucket_region and not config.ALLOW_NONSTANDARD_REGIONS:
            if bucket_region not in get_valid_regions_for_service(self.service):
                raise InvalidArgument(
                    f"Argument value {bucket_region} is not a valid AWS Region",
                    ArgumentName="bucket-region",
                )

        owner = get_owner_for_account_id(context.account_id)
        store = self.get_store(context.account_id, context.region)

        # the continuation token is the base64-encoded name of the first bucket of the next page
        decoded_continuation_token = (
            to_str(base64.urlsafe_b64decode(continuation_token.encode()))
            if continuation_token
            else None
        )

        count = 0
        buckets: list[Bucket] = []
        next_continuation_token = None

        # Comparing strings with case sensitivity since AWS is case-sensitive
        for bucket in sorted(store.buckets.values(), key=lambda r: r.name):
            # skip everything before the continuation point
            if continuation_token and bucket.name < decoded_continuation_token:
                continue

            if prefix and not bucket.name.startswith(prefix):
                continue

            if bucket_region and not bucket.bucket_region == bucket_region:
                continue

            if max_buckets and count >= max_buckets:
                # page full: the current bucket becomes the next page's starting point
                next_continuation_token = to_str(base64.urlsafe_b64encode(bucket.name.encode()))
                break

            output_bucket = Bucket(
                Name=bucket.name,
                CreationDate=bucket.creation_date,
                BucketRegion=bucket.bucket_region,
                BucketArn=bucket.bucket_arn,
            )
            buckets.append(output_bucket)
            count += 1

        return ListBucketsOutput(
            Owner=owner, Buckets=buckets, Prefix=prefix, ContinuationToken=next_continuation_token
        )
690

691
    def head_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> HeadBucketOutput:
        """Return minimal bucket metadata, or a bare 404 if the bucket does not exist.

        Unlike `_get_cross_account_bucket`, the NoSuchBucket errors here carry no
        message/BucketName: HEAD responses have no body, only the 404 status matters.

        :param context: the RequestContext
        :param bucket: the bucket name
        :param expected_bucket_owner: not enforced here — TODO confirm whether it should be
        :raises AuthorizationHeaderMalformed: if the request region is 'aws-global'
        :raises NoSuchBucket: if the bucket is not found in any account
        :return: the bucket's region and ARN
        """
        if context.region == "aws-global":
            # TODO: extend this logic to probably all the provider, and maybe all services. S3 is the most impacted
            #  right now so this will help users to properly set a region in their config
            # See the `TestS3.test_create_bucket_aws_global` test
            raise AuthorizationHeaderMalformed(
                f"The authorization header is malformed; the region 'aws-global' is wrong; expecting '{AWS_REGION_US_EAST_1}'",
                HostId=S3_HOST_ID,
                Region=AWS_REGION_US_EAST_1,
            )

        store = self.get_store(context.account_id, context.region)
        if not (s3_bucket := store.buckets.get(bucket)):
            # fall back to the cross-account lookup via the global bucket map
            if not (account_id := store.global_bucket_map.get(bucket)):
                # just to return the 404 error message
                raise NoSuchBucket()

            store = self.get_store(account_id, context.region)
            if not (s3_bucket := store.buckets.get(bucket)):
                # just to return the 404 error message
                raise NoSuchBucket()

        # TODO: this call is also used to check if the user has access/authorization for the bucket
        #  it can return 403
        return HeadBucketOutput(
            BucketRegion=s3_bucket.bucket_region, BucketArn=s3_bucket.bucket_arn
        )
724

725
    def get_bucket_location(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLocationOutput:
        """Return the bucket's LocationConstraint.

        This operation is implemented explicitly because real S3 returns the
        LocationConstraint at the root level of the XML response rather than wrapped
        in a GetBucketLocationOutput element as the spec declares (the only known
        operation doing so). The spec is patched so this operation returns a single
        "payload" (the XML body), which the serializer emits directly — so the XML
        string has to be produced manually here instead of relying on the serializer
        fix. Botocore applies the mirror-image hack when parsing, in
        `botocore.handlers.py#parse_get_bucket_location`.
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        location_xml = get_bucket_location_xml(s3_bucket.location_constraint)
        return GetBucketLocationOutput(LocationConstraint=location_xml)
747

748
    @handler("PutObject", expand=False)
    def put_object(
        self,
        context: RequestContext,
        request: PutObjectRequest,
    ) -> PutObjectOutput:
        """Store an object in the given bucket (S3 PutObject).

        Validates storage class, KMS key, key name, conditional-write headers,
        Content-MD5 and checksum inputs, then writes the body through the
        storage backend under a per-key precondition lock (for versioned
        buckets), replaces the object's tags, and emits a bucket notification.

        :param context: the incoming request context (used for raw headers and notifications)
        :param request: the parsed PutObject request parameters
        :return: PutObjectOutput with ETag and, where applicable, VersionId,
                 checksum fields, Expiration and SSE response headers
        :raises InvalidStorageClass, InvalidDigest, InvalidRequest, BadDigest,
                 PreconditionFailed, NotImplementedException
        """
        # TODO: validate order of validation
        # TODO: still need to handle following parameters
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        key = request["Key"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        # OUTPOSTS is explicitly rejected even though it is a known storage class
        if (storage_class := request.get("StorageClass")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
            validate_kms_key_id(sse_kms_key_id, s3_bucket)

        validate_object_key(key)

        # conditional writes: S3 only supports one of If-Match / If-None-Match here,
        # and only exact-ETag If-Match and wildcard If-None-Match values
        if_match = request.get("IfMatch")
        if (if_none_match := request.get("IfNoneMatch")) and if_match:
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header="If-Match,If-None-Match",
                additionalMessage="Multiple conditional request headers present in the request",
            )

        elif (if_none_match and if_none_match != "*") or (if_match and if_match == "*"):
            header_name = "If-None-Match" if if_none_match else "If-Match"
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header=header_name,
                additionalMessage=f"We don't accept the provided value of {header_name} header for this API",
            )

        system_metadata = get_system_metadata_from_request(request)
        if not system_metadata.get("ContentType"):
            # S3's default ContentType when the client does not send one
            system_metadata["ContentType"] = "binary/octet-stream"

        user_metadata = decode_user_metadata(request.get("Metadata"))

        version_id = generate_version_id(s3_bucket.versioning_status)
        if version_id != "null":
            # if we are in a versioned bucket, we need to lock around the full key (all the versions)
            # because object versions have locks per version
            precondition_lock = self._preconditions_locks[bucket_name][key]
        else:
            precondition_lock = contextlib.nullcontext()

        etag_content_md5 = ""
        if content_md5 := request.get("ContentMD5"):
            # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
            etag_content_md5 = base_64_content_md5_to_etag(content_md5)
            if not etag_content_md5:
                raise InvalidDigest(
                    "The Content-MD5 you specified was invalid.",
                    Content_MD5=content_md5,
                )

        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
        checksum_value = (
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
        )

        # TODO: we're not encrypting the object with the provided key for now
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
            server_side_encryption=request.get("ServerSideEncryption"),
        )

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            s3_bucket,
            store,
        )

        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)

        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)

        if tagging := request.get("Tagging"):
            # the Tagging header is URL-encoded query-string style; parse it to a dict
            tagging = parse_tagging_header(tagging)

        s3_object = S3Object(
            key=key,
            version_id=version_id,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            sse_key_hash=sse_c_key_md5,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=request.get("WebsiteRedirectLocation"),
            acl=acl,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depends on Bucket settings
        )

        body = request.get("Body")
        # check if chunked request
        headers = context.request.headers
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
            "STREAMING-"
        ) or "aws-chunked" in headers.get("content-encoding", "")
        if is_aws_chunked:
            # chunked uploads may declare the checksum algorithm in trailing headers
            # instead of regular request parameters
            checksum_algorithm = (
                checksum_algorithm
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
            )
            if checksum_algorithm:
                s3_object.checksum_algorithm = checksum_algorithm

            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
            body = AwsChunkedDecoder(body, decoded_content_length, s3_object=s3_object)

            # S3 removes the `aws-chunked` value from ContentEncoding
            if content_encoding := s3_object.system_metadata.pop("ContentEncoding", None):
                encodings = [enc for enc in content_encoding.split(",") if enc != "aws-chunked"]
                if encodings:
                    s3_object.system_metadata["ContentEncoding"] = ",".join(encodings)

        with (
            precondition_lock,
            self._storage_backend.open(bucket_name, s3_object, mode="w") as s3_stored_object,
        ):
            # as we are inside the lock here, if multiple concurrent requests happen for the same object, it's the first
            # one to finish to succeed, and subsequent will raise exceptions. Once the first write finishes, we're
            # opening the lock and other requests can check this condition
            if if_none_match and object_exists_for_precondition_write(s3_bucket, key):
                raise PreconditionFailed(
                    "At least one of the pre-conditions you specified did not hold",
                    Condition="If-None-Match",
                )

            elif if_match:
                verify_object_equality_precondition_write(s3_bucket, key, if_match)

            s3_stored_object.write(body)

            # checksum verification: if the client did not send a value we adopt the
            # computed one; a mismatching value rolls back the stored data
            if s3_object.checksum_algorithm:
                if not s3_object.checksum_value:
                    s3_object.checksum_value = s3_stored_object.checksum
                elif not validate_checksum_value(s3_object.checksum_value, checksum_algorithm):
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise InvalidRequest(
                        f"Value for x-amz-checksum-{s3_object.checksum_algorithm.lower()} header is invalid."
                    )
                elif s3_object.checksum_value != s3_stored_object.checksum:
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise BadDigest(
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
                    )

            # TODO: handle ContentMD5 and ChecksumAlgorithm in a handler for all requests except requests with a
            #  streaming body. We can use the specs to verify which operations needs to have the checksum validated
            if content_md5:
                calculated_md5 = etag_to_base_64_content_md5(s3_stored_object.etag)
                if calculated_md5 != content_md5:
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise BadDigest(
                        "The Content-MD5 you specified did not match what we received.",
                        ExpectedDigest=etag_content_md5,
                        CalculatedDigest=calculated_md5,
                    )

            # the object only becomes visible in the bucket after the write succeeded
            s3_bucket.objects.set(key, s3_object)

        # in case we are overriding an object, delete the tags entry
        key_id = get_unique_key_id(bucket_name, key, version_id)
        store.tags.delete_all_tags(key_id)
        if tagging:
            self._create_object_tags(store, key_id, tagging)

        # RequestCharged: Optional[RequestCharged]  # TODO
        response = PutObjectOutput(
            ETag=s3_object.quoted_etag,
        )
        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            response[f"Checksum{s3_object.checksum_algorithm}"] = s3_object.checksum_value
            response["ChecksumType"] = s3_object.checksum_type

        if s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                self._list_object_tags(store, key_id),
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)
        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response
1✔
966

967
    @handler("GetObject", expand=False)
    def get_object(
        self,
        context: RequestContext,
        request: GetObjectRequest,
    ) -> GetObjectOutput:
        """Retrieve an object (S3 GetObject).

        Validates storage class / SSE-C / preconditions, opens a read handle on
        the storage backend, then assembles the response headers (metadata,
        checksums, range data, tags, lock info, header overrides).

        :param context: the incoming request context
        :param request: the parsed GetObject request parameters
        :return: GetObjectOutput with the body stream and response headers
        :raises NoSuchKey-like errors (from ``s3_bucket.get_object``),
                InvalidObjectState, InvalidRequest, AccessDenied, InvalidArgument
        """
        # TODO: missing handling parameters:
        #  request_payer: RequestPayer = None,
        #  expected_bucket_owner: AccountId = None,

        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="GET",
        )

        # archived objects can only be fetched after a restore
        if s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not s3_object.restore:
            raise InvalidObjectState(
                "The operation is not valid for the object's storage class",
                StorageClass=s3_object.storage_class,
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and s3_object.kms_key_id:
            validate_kms_key_id(kms_key=s3_object.kms_key_id, bucket=s3_bucket)

        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        # SSE-C protected objects require the caller to resend the (correct) key
        if s3_object.sse_key_hash:
            # fix: dropped the redundant `s3_object.sse_key_hash and` that the outer
            # condition already guarantees; now consistent with `head_object`
            if not sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif s3_object.sse_key_hash != sse_c_key_md5:
                raise AccessDenied(
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
                )

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

        range_header = request.get("Range")
        part_number = request.get("PartNumber")
        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)

        # we deliberately do not call `.close()` on the s3_stored_object to keep the read lock acquired. When passing
        # the object to Werkzeug, the handler will call `.close()` after finishing iterating over `__iter__`.
        # this can however lead to deadlocks if an exception happens between the call and returning the object.
        # Be careful into adding validation between this call and `return` of `S3Provider.get_object`
        s3_stored_object = self._storage_backend.open(bucket_name, s3_object, mode="r")

        # this is a hacky way to verify the object hasn't been modified between `s3_object = s3_bucket.get_object`
        # and the storage backend call. If it has been modified, now that we're in the read lock, we can safely fetch
        # the object again
        if s3_stored_object.last_modified != s3_object.internal_last_modified:
            s3_object = s3_bucket.get_object(
                key=object_key,
                version_id=version_id,
                http_method="GET",
            )

        response = GetObjectOutput(
            AcceptRanges="bytes",
            **s3_object.get_system_metadata_fields(),
        )
        if s3_object.user_metadata:
            response["Metadata"] = encode_user_metadata(s3_object.user_metadata)

        if s3_object.parts and request.get("PartNumber"):
            response["PartsCount"] = len(s3_object.parts)

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.website_redirect_location:
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

        if s3_object.restore:
            response["Restore"] = s3_object.restore

        # checksums are only returned when the client opted in via ChecksumMode=ENABLED
        checksum_value = None
        checksum_type = None
        if checksum_algorithm := s3_object.checksum_algorithm:
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                checksum_value = s3_object.checksum_value
                checksum_type = s3_object.checksum_type

        if range_data:
            s3_stored_object.seek(range_data.begin)
            response["Body"] = LimitedIterableStream(
                s3_stored_object, max_length=range_data.content_length
            )
            response["ContentRange"] = range_data.content_range
            response["ContentLength"] = range_data.content_length
            response["StatusCode"] = 206
            if checksum_value:
                # for COMPOSITE multipart objects fetched by part number, return the
                # per-part checksum rather than the whole-object one
                if s3_object.parts and part_number and checksum_type == ChecksumType.COMPOSITE:
                    part_data = s3_object.parts[str(part_number)]
                    checksum_key = f"Checksum{checksum_algorithm.upper()}"
                    response[checksum_key] = part_data.get(checksum_key)
                    response["ChecksumType"] = ChecksumType.COMPOSITE

                # it means either the range header means the whole object, or that a multipart upload with `FULL_OBJECT`
                # only had one part
                elif range_data.content_length == s3_object.size:
                    response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                    response["ChecksumType"] = checksum_type
        else:
            response["Body"] = s3_stored_object
            if checksum_value:
                response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                response["ChecksumType"] = checksum_type

        add_encryption_to_response(response, s3_object=s3_object)

        # NOTE(review): this uses the request's VersionId (possibly None) while
        # head_object uses s3_object.version_id — presumably get_unique_key_id
        # normalizes; verify against get_unique_key_id's handling of None
        object_tags = self._list_object_tags(
            store, get_unique_key_id(bucket_name, object_key, version_id)
        )

        if tag_count := len(object_tags):
            response["TagCount"] = tag_count
        if s3_object.is_current and s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                object_tags,
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        # TODO: missing returned fields
        #     RequestCharged: Optional[RequestCharged]
        #     ReplicationStatus: Optional[ReplicationStatus]

        if s3_object.lock_mode:
            response["ObjectLockMode"] = s3_object.lock_mode
            if s3_object.lock_until:
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
        if s3_object.lock_legal_status:
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        # response-header overrides (e.g. response-content-type) must be ISO-8859-1
        # representable, since they are emitted as raw HTTP header values
        for request_param, response_param in ALLOWED_HEADER_OVERRIDES.items():
            if request_param_value := request.get(request_param):
                if isinstance(request_param_value, str):
                    try:
                        request_param_value.encode("latin-1")
                    except UnicodeEncodeError:
                        raise InvalidArgument(
                            "Header value cannot be represented using ISO-8859-1.",
                            ArgumentName=header_name_from_capitalized_param(request_param),
                            ArgumentValue=request_param_value,
                            HostId=S3_HOST_ID,
                        )

                response[response_param] = request_param_value

        return response
1✔
1145

1146
    @handler("HeadObject", expand=False)
    def head_object(
        self,
        context: RequestContext,
        request: HeadObjectRequest,
    ) -> HeadObjectOutput:
        """Return an object's metadata without its body (S3 HeadObject).

        Mirrors the header assembly of ``get_object`` (preconditions, SSE-C
        validation, checksums, range/part data, tags, lock info) but never
        opens the storage backend, so no body stream is attached.

        :param context: the incoming request context
        :param request: the parsed HeadObject request parameters
        :return: HeadObjectOutput with metadata headers only
        :raises InvalidRequest, AccessDenied (SSE-C), precondition errors
        """
        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="HEAD",
        )

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

        # SSE-C protected objects require the caller to resend the (correct) key
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        if s3_object.sse_key_hash:
            if not sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif s3_object.sse_key_hash != sse_c_key_md5:
                raise AccessDenied(
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
                )

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        response = HeadObjectOutput(
            AcceptRanges="bytes",
            **s3_object.get_system_metadata_fields(),
        )
        if s3_object.user_metadata:
            response["Metadata"] = encode_user_metadata(s3_object.user_metadata)

        # checksums are only returned when the client opted in via ChecksumMode=ENABLED
        checksum_value = None
        checksum_type = None
        if checksum_algorithm := s3_object.checksum_algorithm:
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                checksum_value = s3_object.checksum_value
                checksum_type = s3_object.checksum_type

        if s3_object.parts and request.get("PartNumber"):
            response["PartsCount"] = len(s3_object.parts)

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.website_redirect_location:
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

        if s3_object.restore:
            response["Restore"] = s3_object.restore

        range_header = request.get("Range")
        part_number = request.get("PartNumber")
        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)

        if range_data:
            response["ContentLength"] = range_data.content_length
            response["ContentRange"] = range_data.content_range
            response["StatusCode"] = 206
            if checksum_value:
                # for COMPOSITE multipart objects fetched by part number, return the
                # per-part checksum rather than the whole-object one
                if s3_object.parts and part_number and checksum_type == ChecksumType.COMPOSITE:
                    part_data = s3_object.parts[str(part_number)]
                    checksum_key = f"Checksum{checksum_algorithm.upper()}"
                    response[checksum_key] = part_data.get(checksum_key)
                    response["ChecksumType"] = ChecksumType.COMPOSITE

                # it means either the range header means the whole object, or that a multipart upload with `FULL_OBJECT`
                # only had one part
                elif range_data.content_length == s3_object.size:
                    response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                    response["ChecksumType"] = checksum_type
        elif checksum_value:
            response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
            response["ChecksumType"] = checksum_type

        add_encryption_to_response(response, s3_object=s3_object)
        object_tags = self._list_object_tags(
            store, get_unique_key_id(bucket_name, object_key, s3_object.version_id)
        )
        if tag_count := len(object_tags):
            response["TagCount"] = tag_count

        # if you specify the VersionId, AWS won't return the Expiration header, even if that's the current version
        if not version_id and s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                object_tags,
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        if s3_object.lock_mode:
            response["ObjectLockMode"] = s3_object.lock_mode
            if s3_object.lock_until:
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
        if s3_object.lock_legal_status:
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        # TODO: missing return fields:
        #  ArchiveStatus: Optional[ArchiveStatus]
        #  RequestCharged: Optional[RequestCharged]
        #  ReplicationStatus: Optional[ReplicationStatus]

        return response
1✔
1275

1276
    def delete_object(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        mfa: MFA = None,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        expected_bucket_owner: AccountId = None,
        if_match: IfMatch = None,
        if_match_last_modified_time: IfMatchLastModifiedTime = None,
        if_match_size: IfMatchSize = None,
        **kwargs,
    ) -> DeleteObjectOutput:
        """Delete an object or object version (S3 DeleteObject).

        Three paths, matching S3 semantics:
        - unversioned bucket: remove the object outright;
        - versioned bucket without VersionId: insert a delete marker;
        - versioned bucket with VersionId: permanently remove that version
          (subject to Object Lock).

        Delete preconditions (If-Match / size / last-modified) are only valid
        for Directory Buckets, which LocalStack does not support → rejected.

        :return: DeleteObjectOutput with VersionId/DeleteMarker where applicable
        :raises InvalidArgument, AccessDenied, NotImplementedException
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
            raise InvalidArgument(
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
                ArgumentName="x-amz-bypass-governance-retention",
            )

        # TODO: this is only supported for Directory Buckets
        non_supported_precondition = None
        if if_match:
            non_supported_precondition = "If-Match"
        if if_match_size:
            non_supported_precondition = "x-amz-if-match-size"
        if if_match_last_modified_time:
            non_supported_precondition = "x-amz-if-match-last-modified-time"
        if non_supported_precondition:
            LOG.warning(
                "DeleteObject Preconditions is only supported for Directory Buckets. "
                "LocalStack does not support Directory Buckets yet."
            )
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header=non_supported_precondition,
            )

        # path 1: bucket never had versioning — a delete is permanent
        if s3_bucket.versioning_status is None:
            # "null" is the only version id that can exist in an unversioned bucket
            if version_id and version_id != "null":
                raise InvalidArgument(
                    "Invalid version id specified",
                    ArgumentName="versionId",
                    ArgumentValue=version_id,
                )

            found_object = s3_bucket.objects.pop(key, None)
            # TODO: RequestCharged
            if found_object:
                self._storage_backend.remove(bucket, found_object)
                self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
                self._remove_all_object_tags(store, get_unique_key_id(bucket, key, version_id))

            # deleting a non-existing key is still a success in S3
            return DeleteObjectOutput()

        # path 2: versioned bucket, no VersionId — create a delete marker instead of
        # removing data, and emit a "...MarkerCreated" notification
        if not version_id:
            delete_marker_id = generate_version_id(s3_bucket.versioning_status)
            delete_marker = S3DeleteMarker(key=key, version_id=delete_marker_id)
            s3_bucket.objects.set(key, delete_marker)
            s3_notif_ctx = S3EventNotificationContext.from_request_context(
                context,
                s3_bucket=s3_bucket,
                s3_object=delete_marker,
            )
            s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
            self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)

            return DeleteObjectOutput(VersionId=delete_marker.version_id, DeleteMarker=True)

        # path 3: versioned bucket with an explicit VersionId — permanent delete
        if key not in s3_bucket.objects:
            return DeleteObjectOutput()

        if not (s3_object := s3_bucket.objects.get(key, version_id)):
            raise InvalidArgument(
                "Invalid version id specified",
                ArgumentName="versionId",
                ArgumentValue=version_id,
            )

        if s3_object.is_locked(bypass_governance_retention):
            raise AccessDenied("Access Denied because object protected by object lock.")

        s3_bucket.objects.pop(object_key=key, version_id=version_id)
        response = DeleteObjectOutput(VersionId=s3_object.version_id)

        if isinstance(s3_object, S3DeleteMarker):
            # delete markers have no stored data or tags to clean up
            response["DeleteMarker"] = True
        else:
            self._storage_backend.remove(bucket, s3_object)
            self._remove_all_object_tags(store, get_unique_key_id(bucket, key, version_id))
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        if key not in s3_bucket.objects:
            # we clean up keys that do not have any object versions in them anymore
            self._preconditions_locks[bucket].pop(key, None)

        return response
1✔
1376

1377
    def delete_objects(
        self,
        context: RequestContext,
        bucket: BucketName,
        delete: Delete,
        mfa: MFA = None,
        request_payer: RequestPayer = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> DeleteObjectsOutput:
        """Batch-delete the objects listed in ``delete`` from ``bucket``.

        Mirrors S3 semantics in three regimes:
        - unversioned bucket: the key is removed outright (a VersionId other
          than "null" yields a NoSuchVersion error entry);
        - versioned bucket, no VersionId: a delete marker is created;
        - versioned bucket with VersionId: that exact version (object or
          delete marker) is removed.

        Per-key failures (NoSuchVersion, object-lock AccessDenied) are collected
        into the ``Errors`` list instead of raising, as AWS does. Successful
        deletions are reported in ``Deleted`` unless ``Quiet`` is set.

        :raises InvalidArgument: if bypass-governance-retention is passed on a
            bucket without Object Lock enabled
        :raises MalformedXML: if the request contains no ``Objects``
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        # the bypass header is only legal on Object Lock enabled buckets
        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
            raise InvalidArgument(
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
                ArgumentName="x-amz-bypass-governance-retention",
            )

        objects: list[ObjectIdentifier] = delete.get("Objects")
        if not objects:
            raise MalformedXML()

        # TODO: max 1000 delete at once? test against AWS?

        quiet = delete.get("Quiet", False)
        deleted = []
        errors = []

        # storage-backend removal is deferred and done in one batch at the end
        to_remove = []
        # keys deleted by explicit VersionId, checked afterwards for lock cleanup
        versioned_keys = set()
        for to_delete_object in objects:
            object_key = to_delete_object.get("Key")
            version_id = to_delete_object.get("VersionId")
            if s3_bucket.versioning_status is None:
                # unversioned bucket: only the implicit "null" version may be addressed
                if version_id and version_id != "null":
                    errors.append(
                        Error(
                            Code="NoSuchVersion",
                            Key=object_key,
                            Message="The specified version does not exist.",
                            VersionId=version_id,
                        )
                    )
                    continue

                found_object = s3_bucket.objects.pop(object_key, None)
                if found_object:
                    to_remove.append(found_object)
                    self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
                    self._remove_all_object_tags(
                        store, get_unique_key_id(bucket, object_key, version_id)
                    )
                # small hack to not create a fake object for nothing
                elif s3_bucket.notification_configuration:
                    # DeleteObjects is a bit weird, even if the object didn't exist, S3 will trigger a notification
                    # for a non-existing object being deleted
                    self._notify(
                        context, s3_bucket=s3_bucket, s3_object=S3Object(key=object_key, etag="")
                    )

                # AWS reports the key as Deleted even when it did not exist
                if not quiet:
                    deleted.append(DeletedObject(Key=object_key))

                continue

            if not version_id:
                # versioned bucket without an explicit version: create a delete marker
                delete_marker_id = generate_version_id(s3_bucket.versioning_status)
                delete_marker = S3DeleteMarker(key=object_key, version_id=delete_marker_id)
                s3_bucket.objects.set(object_key, delete_marker)
                s3_notif_ctx = S3EventNotificationContext.from_request_context(
                    context,
                    s3_bucket=s3_bucket,
                    s3_object=delete_marker,
                )
                # turn e.g. "ObjectRemoved:Delete" into the MarkerCreated event flavor
                s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
                self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)

                if not quiet:
                    deleted.append(
                        DeletedObject(
                            DeleteMarker=True,
                            DeleteMarkerVersionId=delete_marker_id,
                            Key=object_key,
                        )
                    )
                continue

            # explicit VersionId on a versioned bucket: it must exist
            if not (
                found_object := s3_bucket.objects.get(object_key=object_key, version_id=version_id)
            ):
                errors.append(
                    Error(
                        Code="NoSuchVersion",
                        Key=object_key,
                        Message="The specified version does not exist.",
                        VersionId=version_id,
                    )
                )
                continue

            # object-lock protection is reported per key, not raised
            if found_object.is_locked(bypass_governance_retention):
                errors.append(
                    Error(
                        Code="AccessDenied",
                        Key=object_key,
                        Message="Access Denied because object protected by object lock.",
                        VersionId=version_id,
                    )
                )
                continue

            s3_bucket.objects.pop(object_key=object_key, version_id=version_id)
            versioned_keys.add(object_key)

            if not quiet:
                deleted_object = DeletedObject(
                    Key=object_key,
                    VersionId=version_id,
                )
                if isinstance(found_object, S3DeleteMarker):
                    deleted_object["DeleteMarker"] = True
                    deleted_object["DeleteMarkerVersionId"] = found_object.version_id

                deleted.append(deleted_object)

            # delete markers have no stored payload, only real objects get removed from storage
            if isinstance(found_object, S3Object):
                to_remove.append(found_object)

            self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
            self._remove_all_object_tags(store, get_unique_key_id(bucket, object_key, version_id))

        for versioned_key in versioned_keys:
            # we clean up keys that do not have any object versions in them anymore
            if versioned_key not in s3_bucket.objects:
                self._preconditions_locks[bucket].pop(versioned_key, None)

        # TODO: request charged
        self._storage_backend.remove(bucket, to_remove)
        response: DeleteObjectsOutput = {}
        # AWS validated: the list of Deleted objects is unordered, multiple identical calls can return different results
        if errors:
            response["Errors"] = errors
        if not quiet:
            response["Deleted"] = deleted

        return response
1525

1526
    @handler("CopyObject", expand=False)
    def copy_object(
        self,
        context: RequestContext,
        request: CopyObjectRequest,
    ) -> CopyObjectOutput:
        """Copy an object, possibly across buckets, replicating AWS validation order.

        Pipeline: conditional-header checks (If-Match/If-None-Match support is
        limited, as on AWS), source resolution from ``CopySource``, KMS/SSE-C
        validation for both source and target, copy-source preconditions,
        copy-in-place rejection, metadata/tagging directives, then the actual
        storage copy under a per-key precondition lock, and finally tag
        propagation and response assembly.

        :raises NotImplementedException: for unsupported conditional-header combinations
        :raises InvalidRequest: for delete-marker sources, SSE-C mismatches, or
            an unmodified copy-in-place
        :raises InvalidObjectState: if the source is archived and not restored
        :raises PreconditionFailed: if a copy-source or destination-write
            precondition does not hold
        """
        # request_payer: RequestPayer = None,  # TODO:
        dest_bucket = request["Bucket"]
        dest_key = request["Key"]

        if_match = request.get("IfMatch")
        if_none_match = request.get("IfNoneMatch")

        # AWS rejects both conditional headers together on CopyObject
        if if_none_match and if_match:
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header="If-Match,If-None-Match",
                additionalMessage="Multiple conditional request headers present in the request",
            )

        # only If-None-Match: "*" and concrete If-Match ETags are accepted here
        elif (if_none_match and if_none_match != "*") or (if_match and if_match == "*"):
            header_name = "If-None-Match" if if_none_match else "If-Match"
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header=header_name,
                additionalMessage=f"We don't accept the provided value of {header_name} header for this API",
            )

        validate_object_key(dest_key)
        store, dest_s3_bucket = self._get_cross_account_bucket(context, dest_bucket)

        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
            request.get("CopySource")
        )
        _, src_s3_bucket = self._get_cross_account_bucket(context, src_bucket)

        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
            validate_kms_key_id(sse_kms_key_id, dest_s3_bucket)

        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
        try:
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
        except MethodNotAllowed:
            # a delete marker addressed by explicit version id is not a valid copy source
            raise InvalidRequest(
                "The source of a copy request may not specifically refer to a delete marker by version id."
            )

        # archived (Glacier-class) sources must be restored before they can be copied
        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
            raise InvalidObjectState(
                "Operation is not valid for the source object's storage class",
                StorageClass=src_s3_object.storage_class,
            )

        # x-amz-copy-source-if-* preconditions against the source object
        if failed_condition := get_failed_precondition_copy_source(
            request, src_s3_object.last_modified, src_s3_object.etag
        ):
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition=failed_condition,
            )

        # an SSE-C encrypted source requires the matching customer key material
        source_sse_c_key_md5 = request.get("CopySourceSSECustomerKeyMD5")
        if src_s3_object.sse_key_hash:
            if not source_sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif src_s3_object.sse_key_hash != source_sse_c_key_md5:
                raise AccessDenied("Access Denied")

        validate_sse_c(
            algorithm=request.get("CopySourceSSECustomerAlgorithm"),
            encryption_key=request.get("CopySourceSSECustomerKey"),
            encryption_key_md5=source_sse_c_key_md5,
        )

        target_sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        server_side_encryption = request.get("ServerSideEncryption")
        # validate target SSE-C parameters
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=target_sse_c_key_md5,
            server_side_encryption=server_side_encryption,
        )

        # TODO validate order of validation
        storage_class = request.get("StorageClass")
        metadata_directive = request.get("MetadataDirective")
        website_redirect_location = request.get("WebsiteRedirectLocation")
        # we need to check for identity of the object, to see if the default one has been changed
        is_default_encryption = (
            dest_s3_bucket.encryption_rule is DEFAULT_BUCKET_ENCRYPTION
            and src_s3_object.encryption == "AES256"
        )
        # copy-in-place (same bucket and key) is only legal if something about the
        # object actually changes; otherwise AWS rejects the request
        if (
            src_bucket == dest_bucket
            and src_key == dest_key
            and not any(
                (
                    storage_class,
                    server_side_encryption,
                    target_sse_c_key_md5,
                    metadata_directive == "REPLACE",
                    website_redirect_location,
                    dest_s3_bucket.encryption_rule
                    and not is_default_encryption,  # S3 will allow copy in place if the bucket has encryption configured
                    src_s3_object.restore,
                )
            )
        ):
            raise InvalidRequest(
                "This copy request is illegal because it is trying to copy an object to itself without changing the "
                "object's metadata, storage class, website redirect location or encryption attributes."
            )

        if tagging := request.get("Tagging"):
            tagging = parse_tagging_header(tagging)

        # MetadataDirective=REPLACE takes metadata from the request, otherwise it is copied
        if metadata_directive == "REPLACE":
            user_metadata = decode_user_metadata(request.get("Metadata"))
            system_metadata = get_system_metadata_from_request(request)
            if not system_metadata.get("ContentType"):
                system_metadata["ContentType"] = "binary/octet-stream"
        else:
            user_metadata = src_s3_object.user_metadata
            system_metadata = src_s3_object.system_metadata

        dest_version_id = generate_version_id(dest_s3_bucket.versioning_status)
        if dest_version_id != "null":
            # if we are in a versioned bucket, we need to lock around the full key (all the versions)
            # because object versions have locks per version
            precondition_lock = self._preconditions_locks[dest_bucket][dest_key]
        else:
            precondition_lock = contextlib.nullcontext()

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            dest_s3_bucket,
            store,
        )
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(
            request, dest_s3_bucket
        )

        acl = get_access_control_policy_for_new_resource_request(
            request, owner=dest_s3_bucket.owner
        )
        checksum_algorithm = request.get("ChecksumAlgorithm")

        # build the destination object; etag/checksum are filled in after the copy
        s3_object = S3Object(
            key=dest_key,
            size=src_s3_object.size,
            version_id=dest_version_id,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm or src_s3_object.checksum_algorithm,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=request.get(
                "BucketKeyEnabled"
            ),  # CopyObject does not inherit from the bucket here
            sse_key_hash=target_sse_c_key_md5,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=website_redirect_location,
            expiration=None,  # TODO, from lifecycle
            acl=acl,
            owner=dest_s3_bucket.owner,
        )

        with (
            precondition_lock,
            self._storage_backend.copy(
                src_bucket=src_bucket,
                src_object=src_s3_object,
                dest_bucket=dest_bucket,
                dest_object=s3_object,
            ) as s3_stored_object,
        ):
            # Check destination write preconditions inside the lock to prevent race conditions.
            if if_none_match and object_exists_for_precondition_write(dest_s3_bucket, dest_key):
                raise PreconditionFailed(
                    "At least one of the pre-conditions you specified did not hold",
                    Condition="If-None-Match",
                )

            elif if_match:
                verify_object_equality_precondition_write(dest_s3_bucket, dest_key, if_match)

            # fall back to the source values if the storage layer did not recompute them
            s3_object.checksum_value = s3_stored_object.checksum or src_s3_object.checksum_value
            s3_object.etag = s3_stored_object.etag or src_s3_object.etag

            dest_s3_bucket.objects.set(dest_key, s3_object)

        dest_key_id = get_unique_key_id(dest_bucket, dest_key, dest_version_id)

        # TaggingDirective=REPLACE uses the request tags, otherwise tags are copied from the source
        if (request.get("TaggingDirective")) == "REPLACE":
            self._remove_all_object_tags(store, dest_key_id)
            self._create_object_tags(store, dest_key_id, tagging or {})
        else:
            src_key_id = get_unique_key_id(src_bucket, src_key, src_s3_object.version_id)
            src_tags = self._list_object_tags(store, src_key_id)
            self._remove_all_object_tags(store, dest_key_id)
            self._create_object_tags(store, dest_key_id, src_tags)

        copy_object_result = CopyObjectResult(
            ETag=s3_object.quoted_etag,
            LastModified=s3_object.last_modified,
        )
        if s3_object.checksum_algorithm:
            copy_object_result[f"Checksum{s3_object.checksum_algorithm.upper()}"] = (
                s3_object.checksum_value
            )
            copy_object_result["ChecksumType"] = s3_object.checksum_type

        response = CopyObjectOutput(
            CopyObjectResult=copy_object_result,
        )

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.expiration:
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime

        add_encryption_to_response(response, s3_object=s3_object)
        if target_sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = target_sse_c_key_md5

        # the source version id is only echoed back for versioned source buckets
        if (
            src_s3_bucket.versioning_status
            and src_s3_object.version_id
            and src_s3_object.version_id != "null"
        ):
            response["CopySourceVersionId"] = src_s3_object.version_id

        # RequestCharged: Optional[RequestCharged] # TODO
        self._notify(context, s3_bucket=dest_s3_bucket, s3_object=s3_object)

        return response
1772

1773
    def list_objects(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        marker: Marker = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectsOutput:
        """List the bucket's current objects (V1 API), honoring prefix/delimiter
        grouping into CommonPrefixes and marker-based pagination."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        validate_encoding_type(encoding_type)

        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type == EncodingType.url:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)

        grouped_prefixes = set()
        contents: list[Object] = []
        entry_count = 0
        truncated = False
        marker_for_next_page = None

        # enumerate current versions in lexicographic key order
        sorted_objects = sorted(s3_bucket.objects.values(), key=lambda obj: obj.key)
        final_entry = sorted_objects[-1] if sorted_objects else None

        for candidate in sorted_objects:
            key = urlparse.quote(candidate.key) if encoding_type else candidate.key

            # everything up to and including the marker was returned on a previous page
            if marker and key <= marker:
                continue

            if prefix and not key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            group_prefix = None
            remainder = key.removeprefix(prefix)
            if delimiter and delimiter in remainder:
                before_delimiter = remainder.partition(delimiter)[0]
                group_prefix = f"{prefix}{before_delimiter}{delimiter}"
                if group_prefix in grouped_prefixes or (
                    marker and marker.startswith(group_prefix)
                ):
                    continue

            if group_prefix:
                grouped_prefixes.add(group_prefix)
            else:
                # TODO: add RestoreStatus if present
                entry = Object(
                    Key=key,
                    ETag=candidate.quoted_etag,
                    Owner=s3_bucket.owner,  # TODO: verify reality
                    Size=candidate.size,
                    LastModified=candidate.last_modified,
                    StorageClass=candidate.storage_class,
                )
                if candidate.checksum_algorithm:
                    entry["ChecksumAlgorithm"] = [candidate.checksum_algorithm]
                    entry["ChecksumType"] = candidate.checksum_type
                contents.append(entry)

            # one CommonPrefix or one Object was just emitted; count it towards MaxKeys
            entry_count += 1
            # only truncate if the page is full AND there is something left after this key
            if entry_count >= max_keys and final_entry.key != candidate.key:
                truncated = True
                if group_prefix:
                    marker_for_next_page = group_prefix
                elif contents:
                    marker_for_next_page = contents[-1]["Key"]
                break

        common_prefixes = [CommonPrefix(Prefix=p) for p in sorted(grouped_prefixes)]

        response = ListObjectsOutput(
            IsTruncated=truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix or "",
            Marker=marker or "",
        )
        if contents:
            response["Contents"] = contents
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if delimiter and marker_for_next_page:
            response["NextMarker"] = marker_for_next_page
        if s3_bucket.bucket_region != "us-east-1":
            response["BucketRegion"] = s3_bucket.bucket_region

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response
1882

1883
    def list_objects_v2(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        continuation_token: Token = None,
        fetch_owner: FetchOwner = None,
        start_after: StartAfter = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectsV2Output:
        """List the bucket's current objects (V2 API), with prefix/delimiter
        grouping and continuation-token pagination."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        # an empty (but present) continuation token is rejected, like on AWS
        if continuation_token == "":
            raise InvalidArgument(
                "The continuation token provided is incorrect",
                ArgumentName="continuation-token",
            )
        validate_encoding_type(encoding_type)

        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        start_after = start_after or ""
        decoded_continuation_token = decode_continuation_token(continuation_token)

        if encoding_type == EncodingType.url:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)
            start_after = urlparse.quote(start_after)
            decoded_continuation_token = urlparse.quote(decoded_continuation_token)

        grouped_prefixes = set()
        contents: list[Object] = []
        entry_count = 0
        truncated = False
        next_token = None

        # enumerate current versions in lexicographic key order
        for candidate in sorted(s3_bucket.objects.values(), key=lambda obj: obj.key):
            key = urlparse.quote(candidate.key) if encoding_type else candidate.key

            # resume from the continuation token when given; StartAfter only applies
            # on the first page
            if continuation_token:
                if key < decoded_continuation_token:
                    continue
            elif start_after and key <= start_after:
                continue

            if prefix and not key.startswith(prefix):
                continue

            # keys sharing the substring between the prefix and the first delimiter
            # collapse into a single CommonPrefix
            group_prefix = None
            remainder = key.removeprefix(prefix)
            if delimiter and delimiter in remainder:
                before_delimiter = remainder.partition(delimiter)[0]
                group_prefix = f"{prefix}{before_delimiter}{delimiter}"

                # a CommonPrefix already recorded does not count towards MaxKeys; keep
                # scanning past its members so a possible continuation token points at
                # the first key after the whole group
                if group_prefix in grouped_prefixes:
                    continue

            # the page is full: stop before emitting this entry and hand out a token for it
            if entry_count >= max_keys:
                truncated = True
                next_token = encode_continuation_token(candidate.key)
                break

            # a fresh CommonPrefix goes into CommonPrefixes, anything else into Contents
            if group_prefix:
                grouped_prefixes.add(group_prefix)
            else:
                # TODO: add RestoreStatus if present
                entry = Object(
                    Key=key,
                    ETag=candidate.quoted_etag,
                    Size=candidate.size,
                    LastModified=candidate.last_modified,
                    StorageClass=candidate.storage_class,
                )
                if fetch_owner:
                    entry["Owner"] = s3_bucket.owner
                if candidate.checksum_algorithm:
                    entry["ChecksumAlgorithm"] = [candidate.checksum_algorithm]
                    entry["ChecksumType"] = candidate.checksum_type
                contents.append(entry)

            # one CommonPrefix or one Object emitted; count it towards MaxKeys
            entry_count += 1

        common_prefixes = [CommonPrefix(Prefix=p) for p in sorted(grouped_prefixes)]

        response = ListObjectsV2Output(
            IsTruncated=truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix or "",
            KeyCount=entry_count,
        )
        if contents:
            response["Contents"] = contents
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if next_token:
            response["NextContinuationToken"] = next_token

        if continuation_token:
            response["ContinuationToken"] = continuation_token
        elif start_after:
            response["StartAfter"] = start_after

        if s3_bucket.bucket_region != "us-east-1":
            response["BucketRegion"] = s3_bucket.bucket_region

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response
2017

2018
    def list_object_versions(
1✔
2019
        self,
2020
        context: RequestContext,
2021
        bucket: BucketName,
2022
        delimiter: Delimiter = None,
2023
        encoding_type: EncodingType = None,
2024
        key_marker: KeyMarker = None,
2025
        max_keys: MaxKeys = None,
2026
        prefix: Prefix = None,
2027
        version_id_marker: VersionIdMarker = None,
2028
        expected_bucket_owner: AccountId = None,
2029
        request_payer: RequestPayer = None,
2030
        optional_object_attributes: OptionalObjectAttributesList = None,
2031
        **kwargs,
2032
    ) -> ListObjectVersionsOutput:
2033
        if version_id_marker and not key_marker:
1✔
2034
            raise InvalidArgument(
1✔
2035
                "A version-id marker cannot be specified without a key marker.",
2036
                ArgumentName="version-id-marker",
2037
                ArgumentValue=version_id_marker,
2038
            )
2039
        validate_encoding_type(encoding_type)
1✔
2040

2041
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2042
        common_prefixes = set()
1✔
2043
        count = 0
1✔
2044
        is_truncated = False
1✔
2045
        next_key_marker = None
1✔
2046
        next_version_id_marker = None
1✔
2047
        max_keys = max_keys or 1000
1✔
2048
        prefix = prefix or ""
1✔
2049
        delimiter = delimiter or ""
1✔
2050
        if encoding_type == EncodingType.url:
1✔
2051
            prefix = urlparse.quote(prefix)
1✔
2052
            delimiter = urlparse.quote(delimiter)
1✔
2053
        version_key_marker_found = False
1✔
2054

2055
        object_versions: list[ObjectVersion] = []
1✔
2056
        delete_markers: list[DeleteMarkerEntry] = []
1✔
2057

2058
        all_versions = s3_bucket.objects.values(with_versions=True)
1✔
2059
        # sort by key, and last-modified-date, to get the last version first
2060
        all_versions.sort(key=lambda r: (r.key, -r.last_modified.timestamp()))
1✔
2061
        last_version = all_versions[-1] if all_versions else None
1✔
2062

2063
        for version in all_versions:
1✔
2064
            key = urlparse.quote(version.key) if encoding_type else version.key
1✔
2065
            # skip all keys that alphabetically come before key_marker
2066
            if key_marker:
1✔
2067
                if key < key_marker:
1✔
2068
                    continue
1✔
2069
                elif key == key_marker:
1✔
2070
                    if not version_id_marker:
1✔
2071
                        continue
1✔
2072
                    # as the keys are ordered by time, once we found the key marker, we can return the next one
2073
                    if version.version_id == version_id_marker:
1✔
2074
                        version_key_marker_found = True
1✔
2075
                        continue
1✔
2076

2077
                    # it is possible that the version_id_marker related object has been deleted, in that case, start
2078
                    # as soon as the next version id is older than the version id marker (meaning this version was
2079
                    # next after the now-deleted version)
2080
                    elif is_version_older_than_other(version.version_id, version_id_marker):
1✔
2081
                        version_key_marker_found = True
1✔
2082

2083
                    elif not version_key_marker_found:
1✔
2084
                        # as long as we have not passed the version_key_marker, skip the versions
2085
                        continue
1✔
2086

2087
            # Filter for keys that start with prefix
2088
            if prefix and not key.startswith(prefix):
1✔
2089
                continue
1✔
2090

2091
            # see ListObjectsV2 for the logic comments (shared logic here)
2092
            prefix_including_delimiter = None
1✔
2093
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
2094
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
2095
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
2096

2097
                if prefix_including_delimiter in common_prefixes or (
1✔
2098
                    key_marker and key_marker.startswith(prefix_including_delimiter)
2099
                ):
2100
                    continue
1✔
2101

2102
            if prefix_including_delimiter:
1✔
2103
                common_prefixes.add(prefix_including_delimiter)
1✔
2104

2105
            elif isinstance(version, S3DeleteMarker):
1✔
2106
                delete_marker = DeleteMarkerEntry(
1✔
2107
                    Key=key,
2108
                    Owner=s3_bucket.owner,
2109
                    VersionId=version.version_id,
2110
                    IsLatest=version.is_current,
2111
                    LastModified=version.last_modified,
2112
                )
2113
                delete_markers.append(delete_marker)
1✔
2114
            else:
2115
                # TODO: add RestoreStatus if present
2116
                object_version = ObjectVersion(
1✔
2117
                    Key=key,
2118
                    ETag=version.quoted_etag,
2119
                    Owner=s3_bucket.owner,  # TODO: verify reality
2120
                    Size=version.size,
2121
                    VersionId=version.version_id or "null",
2122
                    LastModified=version.last_modified,
2123
                    IsLatest=version.is_current,
2124
                    # TODO: verify this, are other class possible?
2125
                    # StorageClass=version.storage_class,
2126
                    StorageClass=ObjectVersionStorageClass.STANDARD,
2127
                )
2128

2129
                if version.checksum_algorithm:
1✔
2130
                    object_version["ChecksumAlgorithm"] = [version.checksum_algorithm]
1✔
2131
                    object_version["ChecksumType"] = version.checksum_type
1✔
2132

2133
                object_versions.append(object_version)
1✔
2134

2135
            # we just added a CommonPrefix, an Object or a DeleteMarker, increase the counter
2136
            count += 1
1✔
2137
            if count >= max_keys and last_version.version_id != version.version_id:
1✔
2138
                is_truncated = True
1✔
2139
                if prefix_including_delimiter:
1✔
2140
                    next_key_marker = prefix_including_delimiter
1✔
2141
                else:
2142
                    next_key_marker = version.key
1✔
2143
                    next_version_id_marker = version.version_id
1✔
2144
                break
1✔
2145

2146
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
2147

2148
        response = ListObjectVersionsOutput(
1✔
2149
            IsTruncated=is_truncated,
2150
            Name=bucket,
2151
            MaxKeys=max_keys,
2152
            Prefix=prefix,
2153
            KeyMarker=key_marker or "",
2154
            VersionIdMarker=version_id_marker or "",
2155
        )
2156
        if object_versions:
1✔
2157
            response["Versions"] = object_versions
1✔
2158
        if encoding_type:
1✔
2159
            response["EncodingType"] = EncodingType.url
1✔
2160
        if delete_markers:
1✔
2161
            response["DeleteMarkers"] = delete_markers
1✔
2162
        if delimiter:
1✔
2163
            response["Delimiter"] = delimiter
1✔
2164
        if common_prefixes:
1✔
2165
            response["CommonPrefixes"] = common_prefixes
1✔
2166
        if next_key_marker:
1✔
2167
            response["NextKeyMarker"] = next_key_marker
1✔
2168
        if next_version_id_marker:
1✔
2169
            response["NextVersionIdMarker"] = next_version_id_marker
1✔
2170

2171
        # RequestCharged: Optional[RequestCharged]  # TODO
2172
        return response
1✔
2173

2174
    @handler("GetObjectAttributes", expand=False)
1✔
2175
    def get_object_attributes(
1✔
2176
        self,
2177
        context: RequestContext,
2178
        request: GetObjectAttributesRequest,
2179
    ) -> GetObjectAttributesOutput:
2180
        bucket_name = request["Bucket"]
1✔
2181
        object_key = request["Key"]
1✔
2182
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
2183

2184
        s3_object = s3_bucket.get_object(
1✔
2185
            key=object_key,
2186
            version_id=request.get("VersionId"),
2187
            http_method="GET",
2188
        )
2189

2190
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
2191
        if s3_object.sse_key_hash:
1✔
2192
            if not sse_c_key_md5:
1✔
2193
                raise InvalidRequest(
×
2194
                    "The object was stored using a form of Server Side Encryption. "
2195
                    "The correct parameters must be provided to retrieve the object."
2196
                )
2197
            elif s3_object.sse_key_hash != sse_c_key_md5:
1✔
2198
                raise AccessDenied("Access Denied")
×
2199

2200
        validate_sse_c(
1✔
2201
            algorithm=request.get("SSECustomerAlgorithm"),
2202
            encryption_key=request.get("SSECustomerKey"),
2203
            encryption_key_md5=sse_c_key_md5,
2204
        )
2205

2206
        object_attrs = request.get("ObjectAttributes", [])
1✔
2207
        response = GetObjectAttributesOutput()
1✔
2208
        object_checksum_type = s3_object.checksum_type
1✔
2209
        if "ETag" in object_attrs:
1✔
2210
            response["ETag"] = s3_object.etag
1✔
2211
        if "StorageClass" in object_attrs:
1✔
2212
            response["StorageClass"] = s3_object.storage_class
1✔
2213
        if "ObjectSize" in object_attrs:
1✔
2214
            response["ObjectSize"] = s3_object.size
1✔
2215
        if "Checksum" in object_attrs and (checksum_algorithm := s3_object.checksum_algorithm):
1✔
2216
            if s3_object.parts:
1✔
2217
                checksum_value = s3_object.checksum_value.split("-")[0]
1✔
2218
            else:
2219
                checksum_value = s3_object.checksum_value
1✔
2220
            response["Checksum"] = {
1✔
2221
                f"Checksum{checksum_algorithm.upper()}": checksum_value,
2222
                "ChecksumType": object_checksum_type,
2223
            }
2224

2225
        response["LastModified"] = s3_object.last_modified
1✔
2226

2227
        if s3_bucket.versioning_status:
1✔
2228
            response["VersionId"] = s3_object.version_id
1✔
2229

2230
        if "ObjectParts" in object_attrs and s3_object.parts:
1✔
2231
            if object_checksum_type == ChecksumType.FULL_OBJECT:
1✔
2232
                response["ObjectParts"] = GetObjectAttributesParts(
1✔
2233
                    TotalPartsCount=len(s3_object.parts)
2234
                )
2235
            else:
2236
                # this is basically a simplified `ListParts` call on the object, only returned when the checksum type is
2237
                # COMPOSITE
2238
                count = 0
1✔
2239
                is_truncated = False
1✔
2240
                part_number_marker = request.get("PartNumberMarker") or 0
1✔
2241
                max_parts = request.get("MaxParts") or 1000
1✔
2242

2243
                parts = []
1✔
2244
                all_parts = sorted(
1✔
2245
                    (int(part_number), part) for part_number, part in s3_object.parts.items()
2246
                )
2247
                last_part_number, last_part = all_parts[-1]
1✔
2248

2249
                for part_number, part in all_parts:
1✔
2250
                    if part_number <= part_number_marker:
1✔
2251
                        continue
1✔
2252
                    part_item = select_from_typed_dict(ObjectPart, part)
1✔
2253

2254
                    parts.append(part_item)
1✔
2255
                    count += 1
1✔
2256

2257
                    if count >= max_parts and part["PartNumber"] != last_part_number:
1✔
2258
                        is_truncated = True
1✔
2259
                        break
1✔
2260

2261
                object_parts = GetObjectAttributesParts(
1✔
2262
                    TotalPartsCount=len(s3_object.parts),
2263
                    IsTruncated=is_truncated,
2264
                    MaxParts=max_parts,
2265
                    PartNumberMarker=part_number_marker,
2266
                    NextPartNumberMarker=0,
2267
                )
2268
                if parts:
1✔
2269
                    object_parts["Parts"] = parts
1✔
2270
                    object_parts["NextPartNumberMarker"] = parts[-1]["PartNumber"]
1✔
2271

2272
                response["ObjectParts"] = object_parts
1✔
2273

2274
        return response
1✔
2275

2276
    def restore_object(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        restore_request: RestoreRequest = None,
        request_payer: RequestPayer = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> RestoreObjectOutput:
        """
        Restore an archived object so it becomes retrievable again.

        Only the ``Days`` field of the restore request is honored; the restore is
        instantaneous in LocalStack, so both the ``ObjectRestore:Post`` and
        ``ObjectRestore:Completed`` notifications are dispatched back to back.

        :raises InvalidObjectState: if the object is not in an archive storage class
        :return: a RestoreObjectOutput carrying a 200 (already restored) or 202 status code
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="GET",  # TODO: verify http method
        )
        # restoring is only valid for objects in an archive storage class
        if s3_object.storage_class not in ARCHIVES_STORAGE_CLASSES:
            raise InvalidObjectState(StorageClass=s3_object.storage_class)

        # TODO: moto was only supported "Days" parameters from RestoreRequest, and was ignoring the others
        # will only implement only the same functionality for now

        # a repeated restore on an already-available object answers 200 instead of 202
        http_status = 200 if s3_object.restore else 202
        days = restore_request.get("Days")
        if not days:
            LOG.debug("LocalStack does not support restore SELECT requests yet.")
            return RestoreObjectOutput()

        expiry = add_expiration_days_to_datetime(datetime.datetime.now(datetime.UTC), days)
        # TODO: add a way to transition from ongoing-request=true to false? for now it is instant
        s3_object.restore = f'ongoing-request="false", expiry-date="{expiry}"'

        initiated_ctx = S3EventNotificationContext.from_request_context(
            context,
            s3_bucket=s3_bucket,
            s3_object=s3_object,
        )
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=initiated_ctx)
        # the restore completes instantly here, so the Completed notification follows
        # immediately; work on a copy so the already-dispatched context is not mutated,
        # and switch the event type from `ObjectRestore:Post` to `ObjectRestore:Completed`
        completed_ctx = copy.copy(initiated_ctx)
        completed_ctx.event_type = completed_ctx.event_type.replace("Post", "Completed")
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=completed_ctx)

        # TODO: request charged
        return RestoreObjectOutput(StatusCode=http_status)
    @handler("CreateMultipartUpload", expand=False)
1✔
2333
    def create_multipart_upload(
1✔
2334
        self,
2335
        context: RequestContext,
2336
        request: CreateMultipartUploadRequest,
2337
    ) -> CreateMultipartUploadOutput:
2338
        # TODO: handle missing parameters:
2339
        #  request_payer: RequestPayer = None,
2340
        bucket_name = request["Bucket"]
1✔
2341
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
2342

2343
        if (storage_class := request.get("StorageClass")) is not None and (
1✔
2344
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
2345
        ):
2346
            raise InvalidStorageClass(
1✔
2347
                "The storage class you specified is not valid", StorageClassRequested=storage_class
2348
            )
2349

2350
        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
1✔
2351
            validate_kms_key_id(sse_kms_key_id, s3_bucket)
1✔
2352

2353
        if tagging := request.get("Tagging"):
1✔
2354
            tagging = parse_tagging_header(tagging_header=tagging)
×
2355

2356
        key = request["Key"]
1✔
2357

2358
        system_metadata = get_system_metadata_from_request(request)
1✔
2359
        if not system_metadata.get("ContentType"):
1✔
2360
            system_metadata["ContentType"] = "binary/octet-stream"
1✔
2361

2362
        user_metadata = decode_user_metadata(request.get("Metadata"))
1✔
2363

2364
        checksum_algorithm = request.get("ChecksumAlgorithm")
1✔
2365
        if checksum_algorithm and checksum_algorithm not in CHECKSUM_ALGORITHMS:
1✔
2366
            raise InvalidRequest(
1✔
2367
                "Checksum algorithm provided is unsupported. Please try again with any of the valid types: [CRC32, CRC32C, CRC64NVME, SHA1, SHA256]"
2368
            )
2369

2370
        if not (checksum_type := request.get("ChecksumType")) and checksum_algorithm:
1✔
2371
            if checksum_algorithm == ChecksumAlgorithm.CRC64NVME:
1✔
2372
                checksum_type = ChecksumType.FULL_OBJECT
1✔
2373
            else:
2374
                checksum_type = ChecksumType.COMPOSITE
1✔
2375
        elif checksum_type and not checksum_algorithm:
1✔
2376
            raise InvalidRequest(
1✔
2377
                "The x-amz-checksum-type header can only be used with the x-amz-checksum-algorithm header."
2378
            )
2379

2380
        if (
1✔
2381
            checksum_type == ChecksumType.COMPOSITE
2382
            and checksum_algorithm == ChecksumAlgorithm.CRC64NVME
2383
        ):
2384
            raise InvalidRequest(
1✔
2385
                "The COMPOSITE checksum type cannot be used with the crc64nvme checksum algorithm."
2386
            )
2387
        elif checksum_type == ChecksumType.FULL_OBJECT and checksum_algorithm.upper().startswith(
1✔
2388
            "SHA"
2389
        ):
2390
            raise InvalidRequest(
1✔
2391
                f"The FULL_OBJECT checksum type cannot be used with the {checksum_algorithm.lower()} checksum algorithm."
2392
            )
2393

2394
        # TODO: we're not encrypting the object with the provided key for now
2395
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
2396
        validate_sse_c(
1✔
2397
            algorithm=request.get("SSECustomerAlgorithm"),
2398
            encryption_key=request.get("SSECustomerKey"),
2399
            encryption_key_md5=sse_c_key_md5,
2400
            server_side_encryption=request.get("ServerSideEncryption"),
2401
        )
2402

2403
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
2404
            request,
2405
            s3_bucket,
2406
            store,
2407
        )
2408
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)
1✔
2409

2410
        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)
1✔
2411

2412
        initiator = get_owner_for_account_id(context.account_id)
1✔
2413
        # This is weird, but for all other operations, AWS does not return a DisplayName anymore except for the
2414
        # `initiator` field in Multipart related operation. We will probably remove this soon once AWS changes that
2415
        initiator["DisplayName"] = "webfile"
1✔
2416

2417
        s3_multipart = S3Multipart(
1✔
2418
            key=key,
2419
            storage_class=storage_class,
2420
            expires=request.get("Expires"),
2421
            user_metadata=user_metadata,
2422
            system_metadata=system_metadata,
2423
            checksum_algorithm=checksum_algorithm,
2424
            checksum_type=checksum_type,
2425
            encryption=encryption_parameters.encryption,
2426
            kms_key_id=encryption_parameters.kms_key_id,
2427
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
2428
            sse_key_hash=sse_c_key_md5,
2429
            lock_mode=lock_parameters.lock_mode,
2430
            lock_legal_status=lock_parameters.lock_legal_status,
2431
            lock_until=lock_parameters.lock_until,
2432
            website_redirect_location=request.get("WebsiteRedirectLocation"),
2433
            expiration=None,  # TODO, from lifecycle, or should it be updated with config?
2434
            acl=acl,
2435
            initiator=initiator,
2436
            tagging=tagging,
2437
            owner=s3_bucket.owner,
2438
            precondition=object_exists_for_precondition_write(s3_bucket, key),
2439
        )
2440
        # it seems if there is SSE-C on the multipart, AWS S3 will override the default Checksum behavior (but not on
2441
        # PutObject)
2442
        if sse_c_key_md5:
1✔
2443
            s3_multipart.object.checksum_algorithm = None
1✔
2444

2445
        s3_bucket.multiparts[s3_multipart.id] = s3_multipart
1✔
2446

2447
        response = CreateMultipartUploadOutput(
1✔
2448
            Bucket=bucket_name, Key=key, UploadId=s3_multipart.id
2449
        )
2450

2451
        if checksum_algorithm:
1✔
2452
            response["ChecksumAlgorithm"] = checksum_algorithm
1✔
2453
            response["ChecksumType"] = checksum_type
1✔
2454

2455
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2456
        if sse_c_key_md5:
1✔
2457
            response["SSECustomerAlgorithm"] = "AES256"
1✔
2458
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
2459

2460
        # TODO: missing response fields we're not currently supporting
2461
        # - AbortDate: lifecycle related,not currently supported, todo
2462
        # - AbortRuleId: lifecycle related, not currently supported, todo
2463
        # - RequestCharged: todo
2464

2465
        return response
1✔
2466

2467
    @handler("UploadPart", expand=False)
1✔
2468
    def upload_part(
1✔
2469
        self,
2470
        context: RequestContext,
2471
        request: UploadPartRequest,
2472
    ) -> UploadPartOutput:
2473
        # TODO: missing following parameters:
2474
        #  content_length: ContentLength = None, ->validate?
2475
        #  content_md5: ContentMD5 = None, -> validate?
2476
        #  request_payer: RequestPayer = None,
2477
        bucket_name = request["Bucket"]
1✔
2478
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
2479

2480
        upload_id = request.get("UploadId")
1✔
2481
        if not (
1✔
2482
            s3_multipart := s3_bucket.multiparts.get(upload_id)
2483
        ) or s3_multipart.object.key != request.get("Key"):
2484
            raise NoSuchUpload(
1✔
2485
                "The specified upload does not exist. "
2486
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2487
                UploadId=upload_id,
2488
            )
2489
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
1✔
2490
            raise InvalidArgument(
1✔
2491
                "Part number must be an integer between 1 and 10000, inclusive",
2492
                ArgumentName="partNumber",
2493
                ArgumentValue=part_number,
2494
            )
2495

2496
        if content_md5 := request.get("ContentMD5"):
1✔
2497
            # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
2498
            if not base_64_content_md5_to_etag(content_md5):
1✔
2499
                raise InvalidDigest(
1✔
2500
                    "The Content-MD5 you specified was invalid.",
2501
                    Content_MD5=content_md5,
2502
                )
2503

2504
        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
1✔
2505
        checksum_value = (
1✔
2506
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
2507
        )
2508

2509
        # TODO: we're not encrypting the object with the provided key for now
2510
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
2511
        validate_sse_c(
1✔
2512
            algorithm=request.get("SSECustomerAlgorithm"),
2513
            encryption_key=request.get("SSECustomerKey"),
2514
            encryption_key_md5=sse_c_key_md5,
2515
        )
2516

2517
        if (s3_multipart.object.sse_key_hash and not sse_c_key_md5) or (
1✔
2518
            sse_c_key_md5 and not s3_multipart.object.sse_key_hash
2519
        ):
2520
            raise InvalidRequest(
1✔
2521
                "The multipart upload initiate requested encryption. "
2522
                "Subsequent part requests must include the appropriate encryption parameters."
2523
            )
2524
        elif (
1✔
2525
            s3_multipart.object.sse_key_hash
2526
            and sse_c_key_md5
2527
            and s3_multipart.object.sse_key_hash != sse_c_key_md5
2528
        ):
2529
            raise InvalidRequest(
1✔
2530
                "The provided encryption parameters did not match the ones used originally."
2531
            )
2532

2533
        s3_part = S3Part(
1✔
2534
            part_number=part_number,
2535
            checksum_algorithm=checksum_algorithm,
2536
            checksum_value=checksum_value,
2537
        )
2538
        body = request.get("Body")
1✔
2539
        headers = context.request.headers
1✔
2540
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
1✔
2541
            "STREAMING-"
2542
        ) or "aws-chunked" in headers.get("content-encoding", "")
2543
        # check if chunked request
2544
        if is_aws_chunked:
1✔
2545
            checksum_algorithm = (
1✔
2546
                checksum_algorithm
2547
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
2548
            )
2549
            if checksum_algorithm:
1✔
2550
                s3_part.checksum_algorithm = checksum_algorithm
×
2551

2552
            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
1✔
2553
            body = AwsChunkedDecoder(body, decoded_content_length, s3_part)
1✔
2554

2555
        if (
1✔
2556
            s3_multipart.checksum_algorithm
2557
            and s3_part.checksum_algorithm != s3_multipart.checksum_algorithm
2558
        ):
2559
            error_req_checksum = checksum_algorithm.lower() if checksum_algorithm else "null"
1✔
2560
            error_mp_checksum = (
1✔
2561
                s3_multipart.object.checksum_algorithm.lower()
2562
                if s3_multipart.object.checksum_algorithm
2563
                else "null"
2564
            )
2565
            if not error_mp_checksum == "null":
1✔
2566
                raise InvalidRequest(
1✔
2567
                    f"Checksum Type mismatch occurred, expected checksum Type: {error_mp_checksum}, actual checksum Type: {error_req_checksum}"
2568
                )
2569

2570
        stored_multipart = self._storage_backend.get_multipart(bucket_name, s3_multipart)
1✔
2571
        with stored_multipart.open(s3_part, mode="w") as stored_s3_part:
1✔
2572
            try:
1✔
2573
                stored_s3_part.write(body)
1✔
2574
            except Exception:
1✔
2575
                stored_multipart.remove_part(s3_part)
1✔
2576
                raise
1✔
2577

2578
            if checksum_algorithm:
1✔
2579
                if not validate_checksum_value(s3_part.checksum_value, checksum_algorithm):
1✔
2580
                    stored_multipart.remove_part(s3_part)
1✔
2581
                    raise InvalidRequest(
1✔
2582
                        f"Value for x-amz-checksum-{s3_part.checksum_algorithm.lower()} header is invalid."
2583
                    )
2584
                elif s3_part.checksum_value != stored_s3_part.checksum:
1✔
2585
                    stored_multipart.remove_part(s3_part)
1✔
2586
                    raise BadDigest(
1✔
2587
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
2588
                    )
2589

2590
            if content_md5:
1✔
2591
                calculated_md5 = etag_to_base_64_content_md5(s3_part.etag)
1✔
2592
                if calculated_md5 != content_md5:
1✔
2593
                    stored_multipart.remove_part(s3_part)
1✔
2594
                    raise BadDigest(
1✔
2595
                        "The Content-MD5 you specified did not match what we received.",
2596
                        ExpectedDigest=content_md5,
2597
                        CalculatedDigest=calculated_md5,
2598
                    )
2599

2600
            s3_multipart.parts[str(part_number)] = s3_part
1✔
2601

2602
        response = UploadPartOutput(
1✔
2603
            ETag=s3_part.quoted_etag,
2604
        )
2605

2606
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2607
        if sse_c_key_md5:
1✔
2608
            response["SSECustomerAlgorithm"] = "AES256"
1✔
2609
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
2610

2611
        if s3_part.checksum_algorithm:
1✔
2612
            response[f"Checksum{s3_part.checksum_algorithm.upper()}"] = s3_part.checksum_value
1✔
2613

2614
        # TODO: RequestCharged: Optional[RequestCharged]
2615
        return response
1✔
2616

2617
    @handler("UploadPartCopy", expand=False)
1✔
2618
    def upload_part_copy(
1✔
2619
        self,
2620
        context: RequestContext,
2621
        request: UploadPartCopyRequest,
2622
    ) -> UploadPartCopyOutput:
2623
        # TODO: handle following parameters:
2624
        #  SSECustomerAlgorithm: Optional[SSECustomerAlgorithm]
2625
        #  SSECustomerKey: Optional[SSECustomerKey]
2626
        #  SSECustomerKeyMD5: Optional[SSECustomerKeyMD5]
2627
        #  CopySourceSSECustomerAlgorithm: Optional[CopySourceSSECustomerAlgorithm]
2628
        #  CopySourceSSECustomerKey: Optional[CopySourceSSECustomerKey]
2629
        #  CopySourceSSECustomerKeyMD5: Optional[CopySourceSSECustomerKeyMD5]
2630
        #  RequestPayer: Optional[RequestPayer]
2631
        #  ExpectedBucketOwner: Optional[AccountId]
2632
        #  ExpectedSourceBucketOwner: Optional[AccountId]
2633
        dest_bucket = request["Bucket"]
1✔
2634
        dest_key = request["Key"]
1✔
2635
        store = self.get_store(context.account_id, context.region)
1✔
2636
        # TODO: validate cross-account UploadPartCopy
2637
        if not (dest_s3_bucket := store.buckets.get(dest_bucket)):
1✔
2638
            raise NoSuchBucket("The specified bucket does not exist", BucketName=dest_bucket)
×
2639

2640
        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
1✔
2641
            request.get("CopySource")
2642
        )
2643

2644
        if not (src_s3_bucket := store.buckets.get(src_bucket)):
1✔
2645
            raise NoSuchBucket("The specified bucket does not exist", BucketName=src_bucket)
×
2646

2647
        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
2648
        try:
1✔
2649
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
1✔
2650
        except MethodNotAllowed:
×
2651
            raise InvalidRequest(
×
2652
                "The source of a copy request may not specifically refer to a delete marker by version id."
2653
            )
2654

2655
        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
1✔
2656
            raise InvalidObjectState(
×
2657
                "Operation is not valid for the source object's storage class",
2658
                StorageClass=src_s3_object.storage_class,
2659
            )
2660

2661
        upload_id = request.get("UploadId")
1✔
2662
        if (
1✔
2663
            not (s3_multipart := dest_s3_bucket.multiparts.get(upload_id))
2664
            or s3_multipart.object.key != dest_key
2665
        ):
2666
            raise NoSuchUpload(
×
2667
                "The specified upload does not exist. "
2668
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2669
                UploadId=upload_id,
2670
            )
2671

2672
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
1✔
2673
            raise InvalidArgument(
×
2674
                "Part number must be an integer between 1 and 10000, inclusive",
2675
                ArgumentName="partNumber",
2676
                ArgumentValue=part_number,
2677
            )
2678

2679
        source_range = request.get("CopySourceRange")
1✔
2680
        # TODO implement copy source IF
2681

2682
        range_data: ObjectRange | None = None
1✔
2683
        if source_range:
1✔
2684
            range_data = parse_copy_source_range_header(source_range, src_s3_object.size)
1✔
2685

2686
        if precondition := get_failed_upload_part_copy_source_preconditions(
1✔
2687
            request, src_s3_object.last_modified, src_s3_object.etag
2688
        ):
2689
            raise PreconditionFailed(
1✔
2690
                "At least one of the pre-conditions you specified did not hold",
2691
                Condition=precondition,
2692
            )
2693

2694
        s3_part = S3Part(part_number=part_number)
1✔
2695
        if s3_multipart.checksum_algorithm:
1✔
2696
            s3_part.checksum_algorithm = s3_multipart.checksum_algorithm
1✔
2697

2698
        stored_multipart = self._storage_backend.get_multipart(dest_bucket, s3_multipart)
1✔
2699
        stored_multipart.copy_from_object(s3_part, src_bucket, src_s3_object, range_data)
1✔
2700

2701
        s3_multipart.parts[str(part_number)] = s3_part
1✔
2702

2703
        # TODO: return those fields
2704
        #     RequestCharged: Optional[RequestCharged]
2705

2706
        result = CopyPartResult(
1✔
2707
            ETag=s3_part.quoted_etag,
2708
            LastModified=s3_part.last_modified,
2709
        )
2710

2711
        response = UploadPartCopyOutput(
1✔
2712
            CopyPartResult=result,
2713
        )
2714

2715
        if src_s3_bucket.versioning_status and src_s3_object.version_id:
1✔
2716
            response["CopySourceVersionId"] = src_s3_object.version_id
×
2717

2718
        if s3_part.checksum_algorithm:
1✔
2719
            result[f"Checksum{s3_part.checksum_algorithm.upper()}"] = s3_part.checksum_value
1✔
2720

2721
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2722

2723
        return response
1✔
2724

2725
    def complete_multipart_upload(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        upload_id: MultipartUploadId,
        multipart_upload: CompletedMultipartUpload = None,
        checksum_crc32: ChecksumCRC32 = None,
        checksum_crc32_c: ChecksumCRC32C = None,
        checksum_crc64_nvme: ChecksumCRC64NVME = None,
        checksum_sha1: ChecksumSHA1 = None,
        checksum_sha256: ChecksumSHA256 = None,
        checksum_type: ChecksumType = None,
        mpu_object_size: MpuObjectSize = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        if_match: IfMatch = None,
        if_none_match: IfNoneMatch = None,
        sse_customer_algorithm: SSECustomerAlgorithm = None,
        sse_customer_key: SSECustomerKey = None,
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
        **kwargs,
    ) -> CompleteMultipartUploadOutput:
        """Assemble the previously uploaded parts of a multipart upload into the final object.

        Validates the upload id, the conditional-write headers (`If-Match` /
        `If-None-Match`), part ordering and the composite checksum, then stores the
        assembled object in the bucket, applies the upload's tagging, removes the
        multipart state, and emits a bucket notification.

        Raises:
            NoSuchUpload: unknown upload id, or the upload belongs to another key.
            NotImplementedException: unsupported conditional-header combinations.
            PreconditionFailed / ConditionalRequestConflict: `If-None-Match: *` races.
            InvalidRequest / InvalidPartOrder / BadDigest: part or checksum validation.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        # the upload id must exist in this bucket AND be bound to the requested key
        if (
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
            or s3_multipart.object.key != key
        ):
            raise NoSuchUpload(
                "The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        if if_none_match and if_match:
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header="If-Match,If-None-Match",
                additionalMessage="Multiple conditional request headers present in the request",
            )

        elif if_none_match:
            # TODO: improve concurrency mechanism for `if_none_match` and `if_match`
            # only `If-None-Match: *` ("create only if absent") is supported
            if if_none_match != "*":
                raise NotImplementedException(
                    "A header you provided implies functionality that is not implemented",
                    Header="If-None-Match",
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
                )
            if object_exists_for_precondition_write(s3_bucket, key):
                raise PreconditionFailed(
                    "At least one of the pre-conditions you specified did not hold",
                    Condition="If-None-Match",
                )
            elif s3_multipart.precondition:
                raise ConditionalRequestConflict(
                    "The conditional request cannot succeed due to a conflicting operation against this resource.",
                    Condition="If-None-Match",
                    Key=key,
                )

        elif if_match:
            # `If-Match: *` is rejected; only a concrete ETag is accepted
            if if_match == "*":
                raise NotImplementedException(
                    "A header you provided implies functionality that is not implemented",
                    Header="If-None-Match",
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
                )
            verify_object_equality_precondition_write(
                s3_bucket, key, if_match, initiated=s3_multipart.initiated
            )

        parts = multipart_upload.get("Parts", [])
        if not parts:
            raise InvalidRequest("You must specify at least one part")

        parts_numbers = [part.get("PartNumber") for part in parts]
        # TODO: it seems that with new S3 data integrity, sorting might not be mandatory depending on checksum type
        # see https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
        # sorted is very fast (fastest) if the list is already sorted, which should be the case
        if sorted(parts_numbers) != parts_numbers:
            raise InvalidPartOrder(
                "The list of parts was not in ascending order. Parts must be ordered by part number.",
                UploadId=upload_id,
            )

        mpu_checksum_algorithm = s3_multipart.checksum_algorithm
        mpu_checksum_type = s3_multipart.checksum_type

        # the checksum mode of the Complete request must match the one used at creation
        if checksum_type and checksum_type != mpu_checksum_type:
            raise InvalidRequest(
                f"The upload was created using the {mpu_checksum_type or 'null'} checksum mode. "
                f"The complete request must use the same checksum mode."
            )

        # generate the versionId before completing, in case the bucket versioning status has changed between
        # creation and completion? AWS validate this
        version_id = generate_version_id(s3_bucket.versioning_status)
        s3_multipart.object.version_id = version_id

        # we're inspecting the signature of `complete_multipart`, in case the multipart has been restored from
        # persistence. if we do not have a new version, do not validate those parameters
        # TODO: remove for next major version (minor?)
        if signature(s3_multipart.complete_multipart).parameters.get("mpu_size"):
            checksum_algorithm = mpu_checksum_algorithm.lower() if mpu_checksum_algorithm else None
            checksum_map = {
                "crc32": checksum_crc32,
                "crc32c": checksum_crc32_c,
                "crc64nvme": checksum_crc64_nvme,
                "sha1": checksum_sha1,
                "sha256": checksum_sha256,
            }
            checksum_value = checksum_map.get(checksum_algorithm)
            s3_multipart.complete_multipart(
                parts, mpu_size=mpu_object_size, validation_checksum=checksum_value
            )
            # reject when a checksum was supplied for FULL_OBJECT mode without the
            # matching ChecksumType header, or for an algorithm other than the MPU's
            if mpu_checksum_algorithm and (
                (
                    checksum_value
                    and mpu_checksum_type == ChecksumType.FULL_OBJECT
                    and not checksum_type
                )
                or any(
                    checksum_value
                    for checksum_type, checksum_value in checksum_map.items()
                    if checksum_type != checksum_algorithm
                )
            ):
                # this is not ideal, but this validation comes last... after the validation of individual parts
                s3_multipart.object.parts.clear()
                raise BadDigest(
                    f"The {mpu_checksum_algorithm.lower()} you specified did not match the calculated checksum."
                )
        else:
            s3_multipart.complete_multipart(parts)

        stored_multipart = self._storage_backend.get_multipart(bucket, s3_multipart)
        stored_multipart.complete_multipart(
            [s3_multipart.parts.get(str(part_number)) for part_number in parts_numbers]
        )
        # no algorithm on the MPU but one on the object: derive the full-object
        # checksum from the assembled stored data
        if not s3_multipart.checksum_algorithm and s3_multipart.object.checksum_algorithm:
            with self._storage_backend.open(
                bucket, s3_multipart.object, mode="r"
            ) as s3_stored_object:
                s3_multipart.object.checksum_value = s3_stored_object.checksum
                s3_multipart.object.checksum_type = ChecksumType.FULL_OBJECT

        s3_object = s3_multipart.object

        s3_bucket.objects.set(key, s3_object)

        # remove the multipart now that it's complete
        self._storage_backend.remove_multipart(bucket, s3_multipart)
        s3_bucket.multiparts.pop(s3_multipart.id, None)

        # tagging set at CreateMultipartUpload time replaces any previous tags of the key
        key_id = get_unique_key_id(bucket, key, version_id)
        self._remove_all_object_tags(store, key_id)
        if s3_multipart.tagging:
            self._create_object_tags(store, key_id, s3_multipart.tagging)

        # RequestCharged: Optional[RequestCharged] TODO

        response = CompleteMultipartUploadOutput(
            Bucket=bucket,
            Key=key,
            ETag=s3_object.quoted_etag,
            Location=get_url_encoded_object_location(bucket, key),
        )

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        # it seems AWS is not returning checksum related fields if the object has KMS encryption ¯\_(ツ)_/¯
        # but it still generates them, and they can be retrieved with regular GetObject and such operations
        if s3_object.checksum_algorithm and not s3_object.kms_key_id:
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
            response["ChecksumType"] = s3_object.checksum_type

        if s3_object.expiration:
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime

        add_encryption_to_response(response, s3_object=s3_object)

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response
    def abort_multipart_upload(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        upload_id: MultipartUploadId,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        if_match_initiated_time: IfMatchInitiatedTime = None,
        **kwargs,
    ) -> AbortMultipartUploadOutput:
        """Abort an in-progress multipart upload and discard its stored parts."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_multipart = s3_bucket.multiparts.get(upload_id)
        # the upload id must exist and be bound to the requested key
        if s3_multipart is None or s3_multipart.object.key != key:
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        # drop the in-memory entry first, then the backing storage
        s3_bucket.multiparts.pop(upload_id, None)
        self._storage_backend.remove_multipart(bucket, s3_multipart)

        # TODO: requestCharged
        return AbortMultipartUploadOutput()
    def list_parts(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        upload_id: MultipartUploadId,
        max_parts: MaxParts = None,
        part_number_marker: PartNumberMarker = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        sse_customer_algorithm: SSECustomerAlgorithm = None,
        sse_customer_key: SSECustomerKey = None,
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
        **kwargs,
    ) -> ListPartsOutput:
        """List the parts uploaded so far for a multipart upload, in part-number order.

        Supports pagination via `part_number_marker` (exclusive lower bound) and
        `max_parts` (page size, default 1000); `NextPartNumberMarker` in the
        response is the marker for the next page.

        Raises:
            NoSuchUpload: unknown upload id, or the upload belongs to another key.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if (
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
            or s3_multipart.object.key != key
        ):
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        count = 0
        is_truncated = False
        part_number_marker = part_number_marker or 0
        max_parts = max_parts or 1000

        parts = []
        # parts are keyed by stringified part number; sort numerically for the listing
        all_parts = sorted(
            (int(part_number), part) for part_number, part in s3_multipart.parts.items()
        )
        last_part_number = all_parts[-1][0] if all_parts else None
        for part_number, part in all_parts:
            # marker is exclusive: start after the given part number
            if part_number <= part_number_marker:
                continue
            part_item = Part(
                ETag=part.quoted_etag,
                LastModified=part.last_modified,
                PartNumber=part_number,
                Size=part.size,
            )
            if s3_multipart.checksum_algorithm and part.checksum_algorithm:
                part_item[f"Checksum{part.checksum_algorithm.upper()}"] = part.checksum_value

            parts.append(part_item)
            count += 1

            # only truncated if the page is full AND parts remain after this one
            if count >= max_parts and part.part_number != last_part_number:
                is_truncated = True
                break

        response = ListPartsOutput(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            Initiator=s3_multipart.initiator,
            Owner=s3_multipart.object.owner,
            StorageClass=s3_multipart.object.storage_class,
            IsTruncated=is_truncated,
            MaxParts=max_parts,
            PartNumberMarker=0,
            NextPartNumberMarker=0,
        )
        if parts:
            response["Parts"] = parts
            last_part = parts[-1]["PartNumber"]
            response["NextPartNumberMarker"] = last_part

        if part_number_marker:
            response["PartNumberMarker"] = part_number_marker
        if s3_multipart.checksum_algorithm:
            response["ChecksumAlgorithm"] = s3_multipart.object.checksum_algorithm
            response["ChecksumType"] = s3_multipart.checksum_type

        #     AbortDate: Optional[AbortDate] TODO: lifecycle
        #     AbortRuleId: Optional[AbortRuleId] TODO: lifecycle
        #     RequestCharged: Optional[RequestCharged]

        return response
    def list_multipart_uploads(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        key_marker: KeyMarker = None,
        max_uploads: MaxUploads = None,
        prefix: Prefix = None,
        upload_id_marker: UploadIdMarker = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> ListMultipartUploadsOutput:
        """List in-progress multipart uploads of a bucket, ordered by (key, initiated).

        Supports `prefix`/`delimiter` grouping into CommonPrefixes, URL
        encoding-type, and pagination via the (`key_marker`, `upload_id_marker`)
        pair, mirroring the pagination style of object-version listings.

        Raises:
            InvalidArgument: `upload_id_marker` does not match `key_marker`.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        validate_encoding_type(encoding_type)

        common_prefixes = set()
        count = 0
        is_truncated = False
        max_uploads = max_uploads or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type == EncodingType.url:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)
        upload_id_marker_found = False

        # when both markers are given, the upload-id marker must resolve to an
        # upload whose key equals the key marker
        if key_marker and upload_id_marker:
            multipart = s3_bucket.multiparts.get(upload_id_marker)
            if multipart:
                key = (
                    urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
                )
            else:
                # set key to None so it fails if the multipart is not Found
                key = None

            if key_marker != key:
                raise InvalidArgument(
                    "Invalid uploadId marker",
                    ArgumentName="upload-id-marker",
                    ArgumentValue=upload_id_marker,
                )

        uploads = []
        # sort by key and initiated
        all_multiparts = sorted(
            s3_bucket.multiparts.values(), key=lambda r: (r.object.key, r.initiated.timestamp())
        )
        last_multipart = all_multiparts[-1] if all_multiparts else None

        for multipart in all_multiparts:
            key = urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
            # skip all keys that are different than key_marker
            if key_marker:
                if key < key_marker:
                    continue
                elif key == key_marker:
                    if not upload_id_marker:
                        continue
                    # as the keys are ordered by time, once we found the key marker, we can return the next one
                    if multipart.id == upload_id_marker:
                        upload_id_marker_found = True
                        continue
                    elif not upload_id_marker_found:
                        # as long as we have not passed the version_key_marker, skip the versions
                        continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                if prefix_including_delimiter in common_prefixes or (
                    key_marker and key_marker.startswith(prefix_including_delimiter)
                ):
                    continue

            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)
            else:
                multipart_upload = MultipartUpload(
                    UploadId=multipart.id,
                    Key=multipart.object.key,
                    Initiated=multipart.initiated,
                    StorageClass=multipart.object.storage_class,
                    Owner=multipart.object.owner,
                    Initiator=multipart.initiator,
                )
                if multipart.checksum_algorithm:
                    multipart_upload["ChecksumAlgorithm"] = multipart.checksum_algorithm
                    multipart_upload["ChecksumType"] = multipart.checksum_type

                uploads.append(multipart_upload)

            count += 1
            # only truncated if the page is full AND uploads remain after this one
            if count >= max_uploads and last_multipart.id != multipart.id:
                is_truncated = True
                break

        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]

        response = ListMultipartUploadsOutput(
            Bucket=bucket,
            IsTruncated=is_truncated,
            MaxUploads=max_uploads or 1000,
            KeyMarker=key_marker or "",
            UploadIdMarker=upload_id_marker or "" if key_marker else "",
            NextKeyMarker="",
            NextUploadIdMarker="",
        )
        if uploads:
            response["Uploads"] = uploads
            last_upload = uploads[-1]
            response["NextKeyMarker"] = last_upload["Key"]
            response["NextUploadIdMarker"] = last_upload["UploadId"]
        if delimiter:
            response["Delimiter"] = delimiter
        if prefix:
            response["Prefix"] = prefix
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes

        return response
    def put_bucket_versioning(
        self,
        context: RequestContext,
        bucket: BucketName,
        versioning_configuration: VersioningConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        mfa: MFA = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Set the versioning state of a bucket to Enabled or Suspended."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        requested_status = versioning_configuration.get("Status")
        if not requested_status:
            raise CommonServiceException(
                code="IllegalVersioningConfigurationException",
                message="The Versioning element must be specified",
            )

        if requested_status not in ("Enabled", "Suspended"):
            raise MalformedXML()

        # Object Lock requires versioning; it cannot be suspended afterwards
        if requested_status == "Suspended" and s3_bucket.object_lock_enabled:
            raise InvalidBucketState(
                "An Object Lock configuration is present on this bucket, so the versioning state cannot be changed."
            )

        # first transition to a versioned state: upgrade the plain key store
        if not s3_bucket.versioning_status:
            s3_bucket.objects = VersionedKeyStore.from_key_store(s3_bucket.objects)

        s3_bucket.versioning_status = requested_status
    def get_bucket_versioning(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketVersioningOutput:
        """Return the bucket versioning status; empty output if never configured."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        status = s3_bucket.versioning_status
        if status:
            return GetBucketVersioningOutput(Status=status)
        return GetBucketVersioningOutput()
    def get_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketEncryptionOutput:
        """Return the bucket's default encryption rule."""
        # AWS now encrypts bucket by default with AES256, see:
        # https://docs.aws.amazon.com/AmazonS3/latest/userguide/default-bucket-encryption.html
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        rule = s3_bucket.encryption_rule
        if not rule:
            return GetBucketEncryptionOutput()

        configuration = {"Rules": [rule]}
        return GetBucketEncryptionOutput(ServerSideEncryptionConfiguration=configuration)
    def put_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        server_side_encryption_configuration: ServerSideEncryptionConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Set the default server-side-encryption rule of a bucket.

        Validates that exactly one rule with a known SSEAlgorithm is provided and
        that a KMSMasterKeyID only accompanies a KMS-based algorithm.

        Raises:
            MalformedXML: missing/multiple rules or unknown SSEAlgorithm.
            InvalidArgument: KMSMasterKeyID given with a non-KMS algorithm.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not (rules := server_side_encryption_configuration.get("Rules")):
            raise MalformedXML()

        if len(rules) != 1 or not (
            encryption := rules[0].get("ApplyServerSideEncryptionByDefault")
        ):
            raise MalformedXML()

        if not (sse_algorithm := encryption.get("SSEAlgorithm")):
            raise MalformedXML()

        if sse_algorithm not in SSE_ALGORITHMS:
            raise MalformedXML()

        # fix: a KMSMasterKeyID is valid with both aws:kms and aws:kms:dsse, as the
        # error message below states; previously only aws:kms was exempted, wrongly
        # rejecting aws:kms:dsse configurations that specify a key id
        if (
            sse_algorithm not in (ServerSideEncryption.aws_kms, ServerSideEncryption.aws_kms_dsse)
            and "KMSMasterKeyID" in encryption
        ):
            raise InvalidArgument(
                "a KMSMasterKeyID is not applicable if the default sse algorithm is not aws:kms or aws:kms:dsse",
                ArgumentName="ApplyServerSideEncryptionByDefault",
            )
        # elif master_kms_key := encryption.get("KMSMasterKeyID"):
        # TODO: validate KMS key? not currently done in moto
        # You can pass either the KeyId or the KeyArn. If cross-account, it has to be the ARN.
        # It's always saved as the ARN in the bucket configuration.
        # kms_key_arn = get_kms_key_arn(master_kms_key, s3_bucket.bucket_account_id)
        # encryption["KMSMasterKeyID"] = master_kms_key

        s3_bucket.encryption_rule = rules[0]
    def delete_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the bucket's default encryption rule."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        s3_bucket.encryption_rule = None
    def put_bucket_notification_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        notification_configuration: NotificationConfiguration,
        expected_bucket_owner: AccountId = None,
        skip_destination_validation: SkipValidation = None,
        **kwargs,
    ) -> None:
        """Validate and store the bucket's event notification configuration."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        # validation may check the destinations (SQS/SNS/Lambda) unless skipped
        self._verify_notification_configuration(
            notification_configuration, skip_destination_validation, context, bucket
        )
        s3_bucket.notification_configuration = notification_configuration
    def get_bucket_notification_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> NotificationConfiguration:
        """Return the bucket's notification configuration, or an empty one."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        configuration = s3_bucket.notification_configuration
        if configuration:
            return configuration
        return NotificationConfiguration()
    def put_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        tagging: Tagging,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Replace the whole tag set of a bucket."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if "TagSet" not in tagging:
            raise MalformedXML()

        tag_set = tagging["TagSet"] or []
        validate_tag_set(tag_set, type_set="bucket")

        # PutBucketTagging overwrites the full TagSet: clear, then re-create
        self._remove_all_bucket_tags(s3_bucket)
        self._create_bucket_tags(s3_bucket, tag_set)
    def get_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketTaggingOutput:
        """Return the bucket's tag set; NoSuchTagSet when no tags exist."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        tag_set = self._list_bucket_tags(s3_bucket)
        if tag_set:
            return GetBucketTaggingOutput(TagSet=tag_set)

        raise NoSuchTagSet(
            "The TagSet does not exist",
            BucketName=bucket,
        )
    def delete_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Clear all tags of a bucket."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        # This operation doesn't remove the tags from the store like deleting a resource does, it just sets them as empty.
        self._remove_all_bucket_tags(s3_bucket)
        self._create_bucket_tags(s3_bucket, [])
    def put_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        tagging: Tagging,
        version_id: ObjectVersionId = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> PutObjectTaggingOutput:
        """Replace the whole tag set of an object (or of a specific version)."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="PUT")

        if "TagSet" not in tagging:
            raise MalformedXML()

        tag_set = tagging["TagSet"] or []
        validate_tag_set(tag_set, type_set="object")

        # PutObjectTagging overwrites the full TagSet: clear, then re-create
        key_id = get_unique_key_id(bucket, key, s3_object.version_id)
        self._remove_all_object_tags(store, key_id)
        new_tags = {tag["Key"]: tag["Value"] for tag in tag_set}
        self._create_object_tags(store, key_id, new_tags)

        response = PutObjectTaggingOutput()
        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response
    def get_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> GetObjectTaggingOutput:
        """Return the tag set of an object (or of a specific version)."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        try:
            s3_object = s3_bucket.get_object(key=key, version_id=version_id)
        except NoSuchKey as e:
            # GetObjectTagging does not behave like the other operations: when the
            # entry is a delete marker in a versioned bucket, it must answer with
            # MethodNotAllowed instead of NoSuchKey. As the exception must be caught
            # anyway to fix the Key format, handle that special case here rather than
            # changing the `S3Bucket.get_object` signature for one operation.
            if s3_bucket.versioning_status:
                s3_object_version = s3_bucket.objects.get(key, version_id)
                if s3_object_version:
                    raise MethodNotAllowed(
                        "The specified method is not allowed against this resource.",
                        Method="GET",
                        ResourceType="DeleteMarker",
                        DeleteMarker=True,
                        Allow="DELETE",
                        VersionId=s3_object_version.version_id,
                    )

            # There a weird AWS validated bug in S3: the returned key contains the bucket name as well
            # follow AWS on this one
            e.Key = f"{bucket}/{key}"
            raise e

        key_id = get_unique_key_id(bucket, key, s3_object.version_id)
        object_tags = self._list_object_tags(store, key_id)
        tag_set = [Tag(Key=tag_key, Value=tag_value) for tag_key, tag_value in object_tags.items()]

        response = GetObjectTaggingOutput(TagSet=tag_set)
        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        return response
    def delete_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> DeleteObjectTaggingOutput:
        """Remove every tag from an object and emit the matching bucket notification."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="DELETE")

        unique_id = get_unique_key_id(bucket, key, s3_object.version_id)
        self._remove_all_object_tags(store, unique_id)

        response = DeleteObjectTaggingOutput()
        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response
    def put_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        cors_configuration: CORSConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate and store the bucket's CORS rules, then refresh the CORS cache."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)
        validate_cors_configuration(cors_configuration)
        s3_bucket.cors_rules = cors_configuration
        # the stored rules changed, so any cached CORS decision is now stale
        self._cors_handler.invalidate_cache()
    def get_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketCorsOutput:
        """Return the bucket's CORS rules, raising if none are configured."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        cors_rules = s3_bucket.cors_rules
        if not cors_rules:
            raise NoSuchCORSConfiguration(
                "The CORS configuration does not exist",
                BucketName=bucket,
            )
        return GetBucketCorsOutput(CORSRules=cors_rules["CORSRules"])
    def delete_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Delete the bucket's CORS rules; a no-op when none are configured."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if s3_bucket.cors_rules:
            s3_bucket.cors_rules = None
            # drop cached CORS decisions derived from the removed configuration
            self._cors_handler.invalidate_cache()
    def get_bucket_lifecycle_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLifecycleConfigurationOutput:
        """Return the bucket's lifecycle rules, raising if none are configured."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        rules = s3_bucket.lifecycle_rules
        if not rules:
            raise NoSuchLifecycleConfiguration(
                "The lifecycle configuration does not exist",
                BucketName=bucket,
            )

        return GetBucketLifecycleConfigurationOutput(
            Rules=rules,
            TransitionDefaultMinimumObjectSize=s3_bucket.transition_default_minimum_object_size,
        )
    def put_bucket_lifecycle_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        checksum_algorithm: ChecksumAlgorithm = None,
        lifecycle_configuration: BucketLifecycleConfiguration = None,
        expected_bucket_owner: AccountId = None,
        transition_default_minimum_object_size: TransitionDefaultMinimumObjectSize = None,
        **kwargs,
    ) -> PutBucketLifecycleConfigurationOutput:
        """Validate and store the bucket's lifecycle rules.

        Lifecycle effects are not applied eagerly to existing objects; expiration
        values are computed lazily on object access, so the per-bucket expiration
        cache is invalidated here.
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        min_object_size = (
            transition_default_minimum_object_size
            or TransitionDefaultMinimumObjectSize.all_storage_classes_128K
        )

        accepted_sizes = (
            TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
            TransitionDefaultMinimumObjectSize.varies_by_storage_class,
        )
        if min_object_size not in accepted_sizes:
            raise InvalidRequest(
                f"Invalid TransitionDefaultMinimumObjectSize found: {min_object_size}"
            )

        validate_lifecycle_configuration(lifecycle_configuration)
        # TODO: we either apply the lifecycle to existing objects when we set the new rules,
        #  or we need to apply them everytime we get/head an object
        # for now, we keep a cache and get it everytime we fetch an object
        s3_bucket.lifecycle_rules = lifecycle_configuration["Rules"]
        s3_bucket.transition_default_minimum_object_size = min_object_size
        self._expiration_cache[bucket].clear()
        return PutBucketLifecycleConfigurationOutput(
            TransitionDefaultMinimumObjectSize=min_object_size
        )
    def delete_bucket_lifecycle(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the bucket's lifecycle rules and drop cached expiration values."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_bucket.lifecycle_rules = None
        self._expiration_cache[bucket].clear()
    def put_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        analytics_configuration: AnalyticsConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate and store an analytics configuration under the given id."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_bucket_analytics_configuration(
            id=id, analytics_configuration=analytics_configuration
        )
        # a put with an existing id simply overwrites the stored configuration
        s3_bucket.analytics_configurations[id] = analytics_configuration
    def get_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketAnalyticsConfigurationOutput:
        """Return the analytics configuration stored under the given id."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        analytics_config = s3_bucket.analytics_configurations.get(id)
        if not analytics_config:
            raise NoSuchConfiguration("The specified configuration does not exist.")

        return GetBucketAnalyticsConfigurationOutput(AnalyticsConfiguration=analytics_config)
    def list_bucket_analytics_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketAnalyticsConfigurationsOutput:
        """List all analytics configurations of the bucket, ordered by Id."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        configurations = list(s3_bucket.analytics_configurations.values())
        configurations.sort(key=itemgetter("Id"))
        # pagination is not emulated, so the listing is never truncated
        return ListBucketAnalyticsConfigurationsOutput(
            IsTruncated=False,
            AnalyticsConfigurationList=configurations,
        )
    def delete_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Delete the analytics configuration with the given id, raising if absent."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        removed = s3_bucket.analytics_configurations.pop(id, None)
        if not removed:
            raise NoSuchConfiguration("The specified configuration does not exist.")
    def put_bucket_intelligent_tiering_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: IntelligentTieringId,
        intelligent_tiering_configuration: IntelligentTieringConfiguration,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> None:
        """Validate and store an intelligent-tiering configuration under the given id."""
        # TODO add support for expected_bucket_owner
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_bucket_intelligent_tiering_configuration(id, intelligent_tiering_configuration)

        # a put with an existing id simply overwrites the stored configuration
        s3_bucket.intelligent_tiering_configurations[id] = intelligent_tiering_configuration
    def get_bucket_intelligent_tiering_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: IntelligentTieringId,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> GetBucketIntelligentTieringConfigurationOutput:
        """Return the intelligent-tiering configuration stored under the given id."""
        # TODO add support for expected_bucket_owner
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        tiering_config = s3_bucket.intelligent_tiering_configurations.get(id)
        if not tiering_config:
            raise NoSuchConfiguration("The specified configuration does not exist.")

        return GetBucketIntelligentTieringConfigurationOutput(
            IntelligentTieringConfiguration=tiering_config
        )
    def delete_bucket_intelligent_tiering_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: IntelligentTieringId,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> None:
        """Delete the intelligent-tiering configuration with the given id, raising if absent."""
        # TODO add support for expected_bucket_owner
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        removed = s3_bucket.intelligent_tiering_configurations.pop(id, None)
        if not removed:
            raise NoSuchConfiguration("The specified configuration does not exist.")
    def list_bucket_intelligent_tiering_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token | None = None,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> ListBucketIntelligentTieringConfigurationsOutput:
        """List all intelligent-tiering configurations of the bucket, ordered by Id."""
        # TODO add support for expected_bucket_owner
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        configurations = list(s3_bucket.intelligent_tiering_configurations.values())
        configurations.sort(key=itemgetter("Id"))
        # pagination is not emulated, so the listing is never truncated
        return ListBucketIntelligentTieringConfigurationsOutput(
            IsTruncated=False,
            IntelligentTieringConfigurationList=configurations,
        )
    def put_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        inventory_configuration: InventoryConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate and store an inventory configuration under the given id."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_inventory_configuration(
            config_id=id, inventory_configuration=inventory_configuration
        )
        # a put with an existing id simply overwrites the stored configuration
        s3_bucket.inventory_configurations[id] = inventory_configuration
    def get_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketInventoryConfigurationOutput:
        """Return the inventory configuration stored under the given id."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        inventory_config = s3_bucket.inventory_configurations.get(id)
        if not inventory_config:
            raise NoSuchConfiguration("The specified configuration does not exist.")
        return GetBucketInventoryConfigurationOutput(InventoryConfiguration=inventory_config)
    def list_bucket_inventory_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketInventoryConfigurationsOutput:
        """List all inventory configurations of the bucket, ordered by Id."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        configurations = list(s3_bucket.inventory_configurations.values())
        configurations.sort(key=itemgetter("Id"))
        # pagination is not emulated, so the listing is never truncated
        return ListBucketInventoryConfigurationsOutput(
            IsTruncated=False,
            InventoryConfigurationList=configurations,
        )
    def delete_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Delete the inventory configuration with the given id, raising if absent."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        removed = s3_bucket.inventory_configurations.pop(id, None)
        if not removed:
            raise NoSuchConfiguration("The specified configuration does not exist.")
    def get_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketWebsiteOutput:
        """Return the bucket's website configuration, raising if none exists."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        website_config = s3_bucket.website_configuration
        if not website_config:
            raise NoSuchWebsiteConfiguration(
                "The specified bucket does not have a website configuration",
                BucketName=bucket,
            )
        return website_config
    def put_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        website_configuration: WebsiteConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate and store the bucket's static-website configuration."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        validate_website_configuration(website_configuration)
        s3_bucket.website_configuration = website_configuration
    def delete_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the bucket's website configuration; a no-op when none exists."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)
        # AWS does not raise when no configuration was set; simply clear it
        s3_bucket.website_configuration = None
    def get_object_lock_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectLockConfigurationOutput:
        """Return the bucket's Object Lock configuration, raising if not enabled."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise ObjectLockConfigurationNotFoundError(
                "Object Lock configuration does not exist for this bucket",
                BucketName=bucket,
            )

        lock_configuration = ObjectLockConfiguration(ObjectLockEnabled=ObjectLockEnabled.Enabled)
        # the default retention rule is optional and only present when one was set
        default_retention = s3_bucket.object_lock_default_retention
        if default_retention:
            lock_configuration["Rule"] = {"DefaultRetention": default_retention}

        return GetObjectLockConfigurationOutput(ObjectLockConfiguration=lock_configuration)
    def put_object_lock_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        object_lock_configuration: ObjectLockConfiguration = None,
        request_payer: RequestPayer = None,
        token: ObjectLockToken = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectLockConfigurationOutput:
        """Enable Object Lock on a bucket and optionally set a default retention rule.

        Validation mirrors AWS behavior: the bucket must have versioning enabled,
        the configuration must declare ``ObjectLockEnabled=Enabled``, and a
        ``Rule`` (if present) must carry a ``DefaultRetention`` with a valid
        ``Mode`` and exactly one of ``Days`` or ``Years``.

        :raises InvalidBucketState: if versioning is not enabled on the bucket
        :raises MalformedXML: if the configuration or its retention rule is invalid
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        # Object Lock can only be applied to versioned buckets
        if s3_bucket.versioning_status != "Enabled":
            raise InvalidBucketState(
                "Versioning must be 'Enabled' on the bucket to apply a Object Lock configuration"
            )

        if (
            not object_lock_configuration
            or object_lock_configuration.get("ObjectLockEnabled") != "Enabled"
        ):
            raise MalformedXML()

        if "Rule" not in object_lock_configuration:
            # no rule: enable Object Lock but clear any existing default retention
            s3_bucket.object_lock_default_retention = None
            if not s3_bucket.object_lock_enabled:
                s3_bucket.object_lock_enabled = True

            return PutObjectLockConfigurationOutput()
        elif not (rule := object_lock_configuration["Rule"]) or not (
            default_retention := rule.get("DefaultRetention")
        ):
            # a Rule element must contain a non-empty DefaultRetention
            raise MalformedXML()

        # a default retention needs a Mode and exactly one of Days/Years
        if "Mode" not in default_retention or (
            ("Days" in default_retention and "Years" in default_retention)
            or ("Days" not in default_retention and "Years" not in default_retention)
        ):
            raise MalformedXML()

        if default_retention["Mode"] not in OBJECT_LOCK_MODES:
            raise MalformedXML()

        s3_bucket.object_lock_default_retention = default_retention
        if not s3_bucket.object_lock_enabled:
            s3_bucket.object_lock_enabled = True

        return PutObjectLockConfigurationOutput()
    def get_object_legal_hold(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectLegalHoldOutput:
        """Return the legal-hold status of an object version."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="GET")
        legal_status = s3_object.lock_legal_status
        if not legal_status:
            raise NoSuchObjectLockConfiguration(
                "The specified object does not have a ObjectLock configuration"
            )

        return GetObjectLegalHoldOutput(LegalHold=ObjectLockLegalHold(Status=legal_status))
    def put_object_legal_hold(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        legal_hold: ObjectLockLegalHold = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectLegalHoldOutput:
        """Set the legal-hold status ("ON"/"OFF") of an object version.

        :raises MalformedXML: if the request body is missing or its Status is invalid
        :raises InvalidRequest: if the bucket does not have Object Lock enabled
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not legal_hold:
            raise MalformedXML()

        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="PUT",
        )
        # TODO: check casing
        if not (status := legal_hold.get("Status")) or status not in ("ON", "OFF"):
            raise MalformedXML()

        s3_object.lock_legal_status = status

        # TODO: return RequestCharged
        # fix: this previously returned PutObjectRetentionOutput(); the operation's
        # declared output shape is PutObjectLegalHoldOutput (both are empty apart
        # from RequestCharged, so runtime behavior is unchanged)
        return PutObjectLegalHoldOutput()
    def get_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectRetentionOutput:
        """Return the Object Lock retention (mode and retain-until date) of an object version."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="GET")
        if not s3_object.lock_mode:
            raise NoSuchObjectLockConfiguration(
                "The specified object does not have a ObjectLock configuration"
            )

        retention = ObjectLockRetention(
            Mode=s3_object.lock_mode,
            RetainUntilDate=s3_object.lock_until,
        )
        return GetObjectRetentionOutput(Retention=retention)
    def put_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        retention: ObjectLockRetention = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectRetentionOutput:
        """Set (or clear) the Object Lock retention of an object version.

        Tightening retention is always allowed. Loosening it — clearing the
        retention, moving the retain-until date earlier, or downgrading
        COMPLIANCE to GOVERNANCE — is rejected unless the existing lock is in
        GOVERNANCE mode and the caller set ``bypass_governance_retention``.

        :raises InvalidRequest: if the bucket does not have Object Lock enabled
        :raises MalformedXML: if the retention body is structurally invalid
        :raises InvalidArgument: if the retain-until date is in the past
        :raises AccessDenied: if the request would weaken a protected lock
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="PUT",
        )

        # an empty body clears retention; a non-empty one must carry both Mode and
        # RetainUntilDate, and the Mode must be a known lock mode
        if retention and (
            not validate_dict_fields(retention, required_fields={"Mode", "RetainUntilDate"})
            or retention["Mode"] not in OBJECT_LOCK_MODES
        ):
            raise MalformedXML()

        if retention and retention["RetainUntilDate"] < datetime.datetime.now(datetime.UTC):
            # weirdly, this date is format as following: Tue Dec 31 16:00:00 PST 2019
            # it contains the timezone as PST, even if you target a bucket in Europe or Asia
            pst_datetime = retention["RetainUntilDate"].astimezone(
                tz=ZoneInfo("America/Los_Angeles")
            )
            raise InvalidArgument(
                "The retain until date must be in the future!",
                ArgumentName="RetainUntilDate",
                ArgumentValue=pst_datetime.strftime("%a %b %d %H:%M:%S %Z %Y"),
            )

        # the request weakens the lock when it clears retention, moves the
        # retain-until date earlier, or downgrades COMPLIANCE to GOVERNANCE
        is_request_reducing_locking = (
            not retention
            or (s3_object.lock_until and s3_object.lock_until > retention["RetainUntilDate"])
            or (
                retention["Mode"] == ObjectLockMode.GOVERNANCE
                and s3_object.lock_mode == ObjectLockMode.COMPLIANCE
            )
        )
        # COMPLIANCE locks can never be weakened; GOVERNANCE locks only with the bypass flag
        if is_request_reducing_locking and (
            s3_object.lock_mode == ObjectLockMode.COMPLIANCE
            or (
                s3_object.lock_mode == ObjectLockMode.GOVERNANCE and not bypass_governance_retention
            )
        ):
            raise AccessDenied("Access Denied because object protected by object lock.")

        s3_object.lock_mode = retention["Mode"] if retention else None
        s3_object.lock_until = retention["RetainUntilDate"] if retention else None

        # TODO: return RequestCharged
        return PutObjectRetentionOutput()
    def put_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        request_payment_configuration: RequestPaymentConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Store the bucket's request-payment setting (mock; no billing effect is emulated)."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        payer = request_payment_configuration.get("Payer")
        if payer not in ("Requester", "BucketOwner"):
            raise MalformedXML()

        s3_bucket.payer = payer
    def get_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketRequestPaymentOutput:
        """Return the bucket's request-payment setting (mock; no billing effect is emulated)."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)
        return GetBucketRequestPaymentOutput(Payer=s3_bucket.payer)
    def get_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketOwnershipControlsOutput:
        """Return the bucket's ownership controls, raising if none are set."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        object_ownership = s3_bucket.object_ownership
        if not object_ownership:
            raise OwnershipControlsNotFoundError(
                "The bucket ownership controls were not found",
                BucketName=bucket,
            )

        # only a single ownership rule is ever stored per bucket
        return GetBucketOwnershipControlsOutput(
            OwnershipControls={"Rules": [{"ObjectOwnership": object_ownership}]}
        )
    def put_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        ownership_controls: OwnershipControls,
        content_md5: ContentMD5 | None = None,
        expected_bucket_owner: AccountId | None = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        **kwargs,
    ) -> None:
        """Store the bucket's object-ownership setting.

        Mock only: the setting is persisted but not enforced (e.g. ACL usage is
        not restricted when set to BucketOwnerEnforced).
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        rules = ownership_controls.get("Rules")
        # exactly one ownership rule is accepted
        if not rules or len(rules) > 1:
            raise MalformedXML()

        object_ownership = rules[0].get("ObjectOwnership")
        if object_ownership not in OBJECT_OWNERSHIPS:
            raise MalformedXML()

        s3_bucket.object_ownership = object_ownership
    def delete_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the bucket's object-ownership setting; a no-op when none is set."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)
        s3_bucket.object_ownership = None
    def get_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetPublicAccessBlockOutput:
        """Return the bucket's public-access-block configuration, raising if none is set."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        pab_config = s3_bucket.public_access_block
        if not pab_config:
            raise NoSuchPublicAccessBlockConfiguration(
                "The public access block configuration was not found", BucketName=bucket
            )

        return GetPublicAccessBlockOutput(PublicAccessBlockConfiguration=pab_config)
    def put_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        public_access_block_configuration: PublicAccessBlockConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Store the bucket's public-access-block configuration.

        Mock only: ACLs are not directly enforced, so the flags are persisted but
        have no access-control effect. The most restrictive combination of this
        and the S3Control account-level setting should apply (see s3control).
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        pab_fields = {
            "BlockPublicAcls",
            "BlockPublicPolicy",
            "IgnorePublicAcls",
            "RestrictPublicBuckets",
        }
        # only the four known flags are accepted, all of them optional
        if not validate_dict_fields(
            public_access_block_configuration,
            required_fields=set(),
            optional_fields=pab_fields,
        ):
            raise MalformedXML()

        # omitted flags default to False, mirroring AWS
        for pab_field in pab_fields:
            if public_access_block_configuration.get(pab_field) is None:
                public_access_block_configuration[pab_field] = False

        s3_bucket.public_access_block = public_access_block_configuration
    def delete_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the public access block configuration from the bucket."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)
        s3_bucket.public_access_block = None
4179

4180
    def get_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyOutput:
        """Return the bucket policy document, raising if no policy is attached."""
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )
        if policy := s3_bucket.policy:
            return GetBucketPolicyOutput(Policy=policy)
        raise NoSuchBucketPolicy(
            "The bucket policy does not exist",
            BucketName=bucket,
        )
4196

4197
    def put_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        policy: Policy,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        confirm_remove_self_bucket_access: ConfirmRemoveSelfBucketAccess = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Attach a policy document to the bucket after basic JSON validation."""
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        # the raw document must be non-empty and start with an opening brace
        if not policy or not policy.startswith("{"):
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")

        try:
            parsed_policy = json.loads(policy)
        except ValueError:
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")

        if not parsed_policy:
            # TODO: add more validation around the policy?
            raise MalformedPolicy("Missing required field Statement")

        # the original (unparsed) document is stored verbatim
        s3_bucket.policy = policy
4223

4224
    def delete_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Detach any policy document from the bucket."""
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )
        s3_bucket.policy = None
4236

4237
    def get_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> GetBucketAccelerateConfigurationOutput:
        """Return the transfer acceleration status; the output is empty when unset."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if status := s3_bucket.accelerate_status:
            return GetBucketAccelerateConfigurationOutput(Status=status)
        return GetBucketAccelerateConfigurationOutput()
4252

4253
    def put_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        accelerate_configuration: AccelerateConfiguration,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> None:
        """Set the bucket's transfer acceleration status (Enabled or Suspended)."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        # acceleration is incompatible with dotted bucket names (virtual-host TLS)
        if "." in bucket:
            raise InvalidRequest(
                "S3 Transfer Acceleration is not supported for buckets with periods (.) in their names"
            )

        status = accelerate_configuration.get("Status")
        if status not in ("Enabled", "Suspended"):
            raise MalformedXML()

        s3_bucket.accelerate_status = status
4276

4277
    def put_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        bucket_logging_status: BucketLoggingStatus,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Enable or disable server access logging for the bucket.

        An absent/empty LoggingEnabled element disables logging. The target
        bucket must exist in the same store and be in the same region as the
        source bucket.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        logging_config = bucket_logging_status.get("LoggingEnabled")
        if not logging_config:
            # no LoggingEnabled element: turn logging off for this bucket
            s3_bucket.logging = {}
            return

        # the target bucket must be in the same account
        target_bucket_name = logging_config.get("TargetBucket")
        if not target_bucket_name:
            raise MalformedXML()

        if not logging_config.get("TargetPrefix"):
            logging_config["TargetPrefix"] = ""

        # TODO: validate Grants

        target_s3_bucket = store.buckets.get(target_bucket_name)
        if not target_s3_bucket:
            raise InvalidTargetBucketForLogging(
                "The target bucket for logging does not exist",
                TargetBucket=target_bucket_name,
            )

        source_bucket_region = s3_bucket.bucket_region
        if target_s3_bucket.bucket_region != source_bucket_region:
            # a us-east-1 source does not include its own location in the error
            if source_bucket_region == AWS_REGION_US_EAST_1:
                raise CrossLocationLoggingProhibitted(
                    "Cross S3 location logging not allowed. ",
                    TargetBucketLocation=target_s3_bucket.bucket_region,
                )
            raise CrossLocationLoggingProhibitted(
                "Cross S3 location logging not allowed. ",
                SourceBucketLocation=source_bucket_region,
                TargetBucketLocation=target_s3_bucket.bucket_region,
            )

        s3_bucket.logging = logging_config
4324

4325
    def get_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLoggingOutput:
        """Return the bucket's logging configuration; empty output when disabled."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if logging_config := s3_bucket.logging:
            return GetBucketLoggingOutput(LoggingEnabled=logging_config)
        return GetBucketLoggingOutput()
4338

4339
    def put_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        replication_configuration: ReplicationConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        token: ObjectLockToken = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Attach a replication configuration to the bucket.

        Both the source and every destination bucket must have versioning enabled.
        Rules without an ID get a generated one.
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        if s3_bucket.versioning_status != BucketVersioningStatus.Enabled:
            raise InvalidRequest(
                "Versioning must be 'Enabled' on the bucket to apply a replication configuration"
            )

        rules = replication_configuration.get("Rules")
        if not rules:
            raise MalformedXML()

        for rule in rules:
            if "ID" not in rule:
                rule["ID"] = short_uid()

            destination_arn = rule.get("Destination", {}).get("Bucket")
            destination_name = s3_bucket_name(destination_arn)
            destination_bucket = store.buckets.get(destination_name)
            if (
                not destination_bucket
                or destination_bucket.versioning_status != BucketVersioningStatus.Enabled
            ):
                # according to AWS testing the same exception is raised if the bucket does not exist
                # or if versioning was disabled
                raise InvalidRequest("Destination bucket must have versioning enabled.")

        # TODO more validation on input
        s3_bucket.replication = replication_configuration
4375

4376
    def get_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketReplicationOutput:
        """Return the bucket's replication configuration, raising if none is set."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        replication = s3_bucket.replication
        if not replication:
            raise ReplicationConfigurationNotFoundError(
                "The replication configuration was not found",
                BucketName=bucket,
            )

        return GetBucketReplicationOutput(ReplicationConfiguration=replication)
4392

4393
    def delete_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the replication configuration from the bucket."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)
        s3_bucket.replication = None
4403

4404
    @handler("PutBucketAcl", expand=False)
    def put_bucket_acl(
        self,
        context: RequestContext,
        request: PutBucketAclRequest,
    ) -> None:
        """Apply the ACL carried by the request (canned, grant headers, or body) to the bucket."""
        bucket_name = request["Bucket"]
        _, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
        s3_bucket.acl = get_access_control_policy_from_acl_request(
            request=request, owner=s3_bucket.owner, request_body=context.request.data
        )
4416

4417
    def get_bucket_acl(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketAclOutput:
        """Return the owner and grants of the bucket's access control list."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        acl = s3_bucket.acl
        return GetBucketAclOutput(Owner=acl["Owner"], Grants=acl["Grants"])
4427

4428
    @handler("PutObjectAcl", expand=False)
    def put_object_acl(
        self,
        context: RequestContext,
        request: PutObjectAclRequest,
    ) -> PutObjectAclOutput:
        """Apply an ACL to an object; a bucket notification is sent only if the ACL changed."""
        bucket_name = request["Bucket"]
        _, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=request["Key"],
            version_id=request.get("VersionId"),
            http_method="PUT",
        )
        new_acl = get_access_control_policy_from_acl_request(
            request=request, owner=s3_object.owner, request_body=context.request.data
        )
        acl_changed = s3_object.acl != new_acl
        s3_object.acl = new_acl

        if acl_changed:
            self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        # TODO: RequestCharged
        return PutObjectAclOutput()
4453

4454
    def get_object_acl(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectAclOutput:
        """Return the owner and grants of an object's access control list."""
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        acl = s3_bucket.get_object(
            key=key,
            version_id=version_id,
        ).acl
        # TODO: RequestCharged
        return GetObjectAclOutput(Owner=acl["Owner"], Grants=acl["Grants"])
4472

4473
    def get_bucket_policy_status(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyStatusOutput:
        """Not implemented in this provider: always raises NotImplementedError."""
        raise NotImplementedError
4481

4482
    def get_object_torrent(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectTorrentOutput:
        """Not implemented in this provider: always raises NotImplementedError."""
        raise NotImplementedError
4492

4493
    def post_object(
        self, context: RequestContext, bucket: BucketName, body: IO[Body] = None, **kwargs
    ) -> PostResponse:
        """Handle a browser-based POST upload (HTML form) into the bucket.

        Parses the multipart/form-data body manually (no client/spec exists for
        this operation), validates the embedded POST policy, stores the object,
        and builds the redirect/status response the form requested.

        :raises PreconditionFailed: if the request is not multipart/form-data.
        :raises InvalidStorageClass: if the form requests an unknown storage class.
        :raises InvalidRequest: if a provided checksum does not match the data.
        """
        if "multipart/form-data" not in context.request.headers.get("Content-Type", ""):
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition="Bucket POST must be of the enclosure-type multipart/form-data",
            )
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html
        # TODO: signature validation is not implemented for pre-signed POST
        # policy validation is not implemented either, except expiration and mandatory fields
        # This operation is the only one using form for storing the request data. We will have to do some manual
        # parsing here, as no specs are present for this, as no client directly implements this operation.
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        form = context.request.form
        object_key = context.request.form.get("key")

        if "file" in form:
            # in AWS, you can pass the file content as a string in the form field and not as a file object
            file_data = to_bytes(form["file"])
            object_content_length = len(file_data)
            stream = BytesIO(file_data)
        else:
            # this is the default behaviour
            fileobj = context.request.files["file"]
            stream = fileobj.stream
            # stream is a SpooledTemporaryFile, so we can seek the stream to know its length, necessary for policy
            # validation
            original_pos = stream.tell()
            object_content_length = stream.seek(0, 2)
            # reset the stream and put it back at its original position
            stream.seek(original_pos, 0)

            # NOTE: fixed from a rendering artifact ("$(unknown)"): the S3 POST
            # placeholder is the literal string "${filename}"
            if "${filename}" in object_key:
                # TODO: ${filename} is actually usable in all form fields
                # See https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/PresignedPost.html
                # > The string ${filename} is automatically replaced with the name of the file provided by the user and
                # is recognized by all form fields.
                object_key = object_key.replace("${filename}", fileobj.filename)

        # TODO: see if we need to pass additional metadata not contained in the policy from the table under
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions
        additional_policy_metadata = {
            "bucket": bucket,
            "content_length": object_content_length,
        }
        validate_post_policy(form, additional_policy_metadata)

        if canned_acl := form.get("acl"):
            validate_canned_acl(canned_acl)
            acp = get_canned_acl(canned_acl, owner=s3_bucket.owner)
        else:
            acp = get_canned_acl(BucketCannedACL.private, owner=s3_bucket.owner)

        post_system_settable_headers = [
            "Cache-Control",
            "Content-Type",
            "Content-Disposition",
            "Content-Encoding",
        ]
        system_metadata = {}
        for system_metadata_field in post_system_settable_headers:
            if field_value := form.get(system_metadata_field):
                system_key = system_metadata_field.replace("-", "")
                system_metadata[system_key] = field_value

        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        user_metadata = {
            field.removeprefix("x-amz-meta-").lower(): form.get(field)
            for field in form
            if field.startswith("x-amz-meta-")
        }

        if tagging := form.get("tagging"):
            # this is weird, as it's direct XML in the form, we need to parse it directly
            tagging = parse_post_object_tagging_xml(tagging)

        if (storage_class := form.get("x-amz-storage-class")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        encryption_request = {
            "ServerSideEncryption": form.get("x-amz-server-side-encryption"),
            "SSEKMSKeyId": form.get("x-amz-server-side-encryption-aws-kms-key-id"),
            "BucketKeyEnabled": form.get("x-amz-server-side-encryption-bucket-key-enabled"),
        }

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            encryption_request,
            s3_bucket,
            store,
        )

        checksum_algorithm = form.get("x-amz-checksum-algorithm")
        checksum_value = (
            form.get(f"x-amz-checksum-{checksum_algorithm.lower()}") if checksum_algorithm else None
        )
        expires = (
            str_to_rfc_1123_datetime(expires_str) if (expires_str := form.get("Expires")) else None
        )

        version_id = generate_version_id(s3_bucket.versioning_status)

        s3_object = S3Object(
            key=object_key,
            version_id=version_id,
            storage_class=storage_class,
            expires=expires,
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            website_redirect_location=form.get("x-amz-website-redirect-location"),
            acl=acp,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depends on Bucket settings
        )

        with self._storage_backend.open(bucket, s3_object, mode="w") as s3_stored_object:
            s3_stored_object.write(stream)

            if not s3_object.checksum_value:
                s3_object.checksum_value = s3_stored_object.checksum

            elif checksum_algorithm and s3_object.checksum_value != s3_stored_object.checksum:
                self._storage_backend.remove(bucket, s3_object)
                raise InvalidRequest(
                    f"Value for x-amz-checksum-{checksum_algorithm.lower()} header is invalid."
                )

            s3_bucket.objects.set(object_key, s3_object)

        # in case we are overriding an object, delete the tags entry
        key_id = get_unique_key_id(bucket, object_key, version_id)
        self._remove_all_object_tags(store, key_id)
        if tagging:
            self._create_object_tags(store, key_id, tagging)

        response = PostResponse()
        # hacky way to set the etag in the headers as well: two locations for one value
        response["ETagHeader"] = s3_object.quoted_etag

        if redirect := form.get("success_action_redirect"):
            # we need to create the redirect, as the parser could not return the moto-calculated one
            try:
                redirect = create_redirect_for_post_request(
                    base_redirect=redirect,
                    bucket=bucket,
                    object_key=object_key,
                    etag=s3_object.quoted_etag,
                )
                response["LocationHeader"] = redirect
                response["StatusCode"] = 303
            except ValueError:
                # If S3 cannot interpret the URL, it acts as if the field is not present.
                response["StatusCode"] = form.get("success_action_status", 204)

        elif status_code := form.get("success_action_status"):
            response["StatusCode"] = status_code
        else:
            response["StatusCode"] = 204

        response["LocationHeader"] = response.get(
            "LocationHeader",
            get_url_encoded_object_location(bucket, object_key),
        )

        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
            response["ChecksumType"] = ChecksumType.FULL_OBJECT

        if s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket,
                s3_object,
                self._list_object_tags(store, key_id),
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        if response["StatusCode"] == "201":
            # if the StatusCode is 201, S3 returns an XML body with additional information
            response["ETag"] = s3_object.quoted_etag
            response["Bucket"] = bucket
            response["Key"] = object_key
            response["Location"] = response["LocationHeader"]

        return response
4698

4699
    def put_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        metrics_configuration: MetricsConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Add or overwrite the metrics configuration stored under `id`.

        At most 1000 configurations may exist per bucket; exceeding the limit is
        only allowed when `id` already exists (the call is then an overwrite).

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: Identifies the metrics configuration being added or updated.
        :param metrics_configuration: A new or updated configuration associated with the given metrics identifier.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        :raises TooManyConfigurations: If the total number of metrics configurations exceeds 1000 AND the provided
            `id` does not already exist.
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        is_update = id in s3_bucket.metric_configurations
        if not is_update and len(s3_bucket.metric_configurations) >= 1000:
            raise TooManyConfigurations("Too many metrics configurations")
        s3_bucket.metric_configurations[id] = metrics_configuration
4732

4733
    def get_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketMetricsConfigurationOutput:
        """Return the metrics configuration stored under `id`.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: The unique identifier of the metrics configuration to retrieve.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: The metrics configuration associated with the given metrics identifier.
        :raises NoSuchConfiguration: If the provided metrics configuration does not exist.
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not (metric_config := s3_bucket.metric_configurations.get(id)):
            raise NoSuchConfiguration("The specified configuration does not exist.")
        return GetBucketMetricsConfigurationOutput(MetricsConfiguration=metric_config)
4759

4760
    def list_bucket_metrics_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketMetricsConfigurationsOutput:
        """List metrics configurations sorted by Id, 100 per page.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param continuation_token: Optional base64-encoded token (the Id to resume
            from) returned by a previous truncated call.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: Up to 100 configurations plus a NextContinuationToken when truncated.
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        page: list[MetricsConfiguration] = []
        next_token = None

        # the token is the base64-encoded Id of the first configuration of the next page
        start_after = (
            to_str(base64.urlsafe_b64decode(continuation_token.encode()))
            if continuation_token
            else None
        )

        for config in sorted(s3_bucket.metric_configurations.values(), key=itemgetter("Id")):
            if start_after is not None and config["Id"] < start_after:
                continue

            if len(page) >= 100:
                next_token = to_str(base64.urlsafe_b64encode(config["Id"].encode()))
                break

            page.append(config)

        return ListBucketMetricsConfigurationsOutput(
            IsTruncated=next_token is not None,
            ContinuationToken=continuation_token,
            NextContinuationToken=next_token,
            MetricsConfigurationList=page,
        )

4810
    def delete_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Delete the metrics configuration stored under `id`.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: The unique identifier of the metrics configuration to delete.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        :raises NoSuchConfiguration: If the provided metrics configuration does not exist.
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not s3_bucket.metric_configurations.pop(id, None):
            raise NoSuchConfiguration("The specified configuration does not exist.")
4835

4836

4837
def generate_version_id(bucket_versioning_status: str) -> str | None:
1✔
4838
    if not bucket_versioning_status:
1✔
4839
        return None
1✔
4840
    elif bucket_versioning_status.lower() == "enabled":
1✔
4841
        return generate_safe_version_id()
1✔
4842
    else:
4843
        return "null"
1✔
4844

4845

4846
def add_encryption_to_response(response: dict, s3_object: S3Object):
    """Copy the object's server-side encryption fields into the response dict."""
    encryption = s3_object.encryption
    if not encryption:
        return

    response["ServerSideEncryption"] = encryption
    if encryption == ServerSideEncryption.aws_kms:
        response["SSEKMSKeyId"] = s3_object.kms_key_id
        if s3_object.bucket_key_enabled:
            response["BucketKeyEnabled"] = s3_object.bucket_key_enabled
4853

4854

4855
def get_encryption_parameters_from_request_and_bucket(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
    store: S3Store,
) -> EncryptionParameters:
    """
    Resolve the effective server-side encryption parameters for a write request,
    combining the request's explicit headers with the bucket's default encryption rule.

    :param request: the incoming put/copy/multipart-create request
    :param s3_bucket: the target bucket, whose encryption rule provides defaults
    :param store: the S3 store, used to cache the AWS-managed KMS key id
    :return: the resolved EncryptionParameters (encryption, kms_key_id, bucket_key_enabled)
    """
    # SSE-C takes precedence: ServerSideEncryption does not apply when a
    # customer-provided key is supplied
    if request.get("SSECustomerKey"):
        return EncryptionParameters(None, None, False)

    encryption = request.get("ServerSideEncryption")
    kms_key_id = request.get("SSEKMSKeyId")
    bucket_key_enabled = request.get("BucketKeyEnabled")

    encryption_rule = s3_bucket.encryption_rule
    if not encryption_rule:
        return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)

    default_encryption = encryption_rule["ApplyServerSideEncryptionByDefault"]
    if not bucket_key_enabled:
        bucket_key_enabled = encryption_rule.get("BucketKeyEnabled")
    if not encryption:
        encryption = default_encryption["SSEAlgorithm"]

    if encryption == ServerSideEncryption.aws_kms:
        key_id = kms_key_id or default_encryption.get("KMSMasterKeyID")
        kms_key_id = get_kms_key_arn(key_id, s3_bucket.bucket_account_id, s3_bucket.bucket_region)
        if not kms_key_id:
            # no key was provided anywhere: AWS falls back to an AWS-managed KMS key;
            # create it lazily and cache it in the store per region
            if not store.aws_managed_kms_key_id:
                store.aws_managed_kms_key_id = create_s3_kms_managed_key_for_region(
                    s3_bucket.bucket_account_id, s3_bucket.bucket_region
                )
            kms_key_id = store.aws_managed_kms_key_id

    return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)
1✔
4892

4893

4894
def get_object_lock_parameters_from_bucket_and_request(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
):
    """
    Resolve the object-lock parameters for a write, validating the request headers
    and falling back to the bucket's default retention when no mode is supplied.

    :param request: the incoming put/copy/multipart-create request
    :param s3_bucket: the target bucket, whose default retention may apply
    :return: the resolved ObjectLockParameters
    :raises InvalidArgument: if mode/retain-until-date are not supplied together,
        or the mode is not a known lock mode
    """
    lock_mode = request.get("ObjectLockMode")
    lock_legal_status = request.get("ObjectLockLegalHoldStatus")
    lock_until = request.get("ObjectLockRetainUntilDate")

    # the lock mode and the retain-until date must always be supplied as a pair
    if bool(lock_mode) != bool(lock_until):
        missing_argument = (
            "x-amz-object-lock-retain-until-date" if lock_mode else "x-amz-object-lock-mode"
        )
        raise InvalidArgument(
            "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied",
            ArgumentName=missing_argument,
        )

    if lock_mode and lock_mode not in OBJECT_LOCK_MODES:
        raise InvalidArgument(
            "Unknown wormMode directive.",
            ArgumentName="x-amz-object-lock-mode",
            ArgumentValue=lock_mode,
        )

    default_retention = s3_bucket.object_lock_default_retention
    if default_retention and not lock_mode:
        # no explicit mode: apply the bucket's default retention settings
        lock_mode = default_retention["Mode"]
        lock_until = get_retention_from_now(
            days=default_retention.get("Days"),
            years=default_retention.get("Years"),
        )

    return ObjectLockParameters(lock_until, lock_legal_status, lock_mode)
1✔
4928

4929

4930
def get_part_range(s3_object: S3Object, part_number: PartNumber) -> ObjectRange:
    """
    Calculate the byte range covered by a single part of an S3 object.

    :param s3_object: S3Object to slice
    :param part_number: the 1-based part number requested
    :return: an ObjectRange used to return only a slice of an Object
    :raises InvalidPartNumber: if the requested part does not exist
    """
    parts = s3_object.parts
    if not parts:
        # a non-multipart object is treated as one part spanning the whole object
        if part_number > 1:
            raise InvalidPartNumber(
                "The requested partnumber is not satisfiable",
                PartNumberRequested=part_number,
                ActualPartCount=1,
            )
        last_byte = s3_object.size - 1
        return ObjectRange(
            begin=0,
            end=last_byte,
            content_length=s3_object.size,
            content_range=f"bytes 0-{last_byte}/{s3_object.size}",
        )

    part_data = parts.get(str(part_number))
    if not part_data:
        raise InvalidPartNumber(
            "The requested partnumber is not satisfiable",
            PartNumberRequested=part_number,
            ActualPartCount=len(parts),
        )

    # TODO: remove for next major version 5.0, compatibility for <= 4.5
    if isinstance(part_data, tuple):
        begin, part_length = part_data
    else:
        begin = part_data["_position"]
        part_length = part_data["Size"]

    end = begin + part_length - 1
    return ObjectRange(
        begin=begin,
        end=end,
        content_length=part_length,
        content_range=f"bytes {begin}-{end}/{s3_object.size}",
    )
4971

4972

4973
def get_acl_headers_from_request(
1✔
4974
    request: PutObjectRequest
4975
    | CreateMultipartUploadRequest
4976
    | CopyObjectRequest
4977
    | CreateBucketRequest
4978
    | PutBucketAclRequest
4979
    | PutObjectAclRequest,
4980
) -> list[tuple[str, str]]:
4981
    permission_keys = [
1✔
4982
        "GrantFullControl",
4983
        "GrantRead",
4984
        "GrantReadACP",
4985
        "GrantWrite",
4986
        "GrantWriteACP",
4987
    ]
4988
    acl_headers = [
1✔
4989
        (permission, grant_header)
4990
        for permission in permission_keys
4991
        if (grant_header := request.get(permission))
4992
    ]
4993
    return acl_headers
1✔
4994

4995

4996
def get_access_control_policy_from_acl_request(
    request: PutBucketAclRequest | PutObjectAclRequest,
    owner: Owner,
    request_body: bytes,
) -> AccessControlPolicy:
    """
    Build the AccessControlPolicy for a Put*Acl request from exactly one of the three
    possible sources: a canned ACL, grant headers, or an ACP document in the body.

    :param request: the PutBucketAcl/PutObjectAcl request
    :param owner: the resource owner, used as the policy owner
    :param request_body: the raw request body, used to detect a body-provided ACP
    :return: the resolved AccessControlPolicy
    :raises MissingSecurityHeader: if no ACL source was supplied at all
    :raises InvalidRequest: if both a canned ACL and grant headers were supplied
    :raises UnexpectedContent: if a body ACP is combined with headers or a canned ACL
    """
    canned_acl = request.get("ACL")
    acl_headers = get_acl_headers_from_request(request)

    # FIXME: this is very dirty, but the parser does not differentiate between an empty body and an empty XML node
    # errors are different depending on that data, so we need to access the context. Modifying the parser for this
    # use case seems dangerous
    has_body_acp = request_body

    if not (canned_acl or acl_headers or has_body_acp):
        raise MissingSecurityHeader(
            "Your request was missing a required header", MissingHeaderName="x-amz-acl"
        )
    if canned_acl and acl_headers:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")
    if has_body_acp and (canned_acl or acl_headers):
        raise UnexpectedContent("This request does not support content")

    if canned_acl:
        validate_canned_acl(canned_acl)
        return get_canned_acl(canned_acl, owner=owner)

    if acl_headers:
        grants = []
        for permission_header, grantees_values in acl_headers:
            permission = get_permission_from_header(permission_header)
            grants.extend(parse_grants_in_headers(permission, grantees_values))
        return AccessControlPolicy(Owner=owner, Grants=grants)

    # body-provided ACP: validate it, then backfill the first grantee's DisplayName
    # from the owner when it is missing
    acp = request.get("AccessControlPolicy")
    validate_acl_acp(acp)
    needs_display_name = (
        owner.get("DisplayName")
        and acp["Grants"]
        and "DisplayName" not in acp["Grants"][0]["Grantee"]
    )
    if needs_display_name:
        acp["Grants"][0]["Grantee"]["DisplayName"] = owner["DisplayName"]

    return acp
1✔
5043

5044

5045
def get_access_control_policy_for_new_resource_request(
    request: PutObjectRequest
    | CreateMultipartUploadRequest
    | CopyObjectRequest
    | CreateBucketRequest,
    owner: Owner,
) -> AccessControlPolicy:
    """
    Build the AccessControlPolicy for a newly created bucket or object from the
    request's canned ACL or grant headers, defaulting to a private ACL.

    :param request: the creating request (put/copy/multipart-create/create-bucket)
    :param owner: the resource owner, used as the policy owner
    :return: the resolved AccessControlPolicy
    :raises InvalidRequest: if both a canned ACL and grant headers were supplied
    """
    # TODO: this is basic ACL, not taking into account Bucket settings. Revisit once we really implement ACLs.
    canned_acl = request.get("ACL")
    acl_headers = get_acl_headers_from_request(request)

    if canned_acl and acl_headers:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")

    if canned_acl:
        validate_canned_acl(canned_acl)
        return get_canned_acl(canned_acl, owner=owner)

    if not acl_headers:
        # nothing specified: new resources default to a private canned ACL
        return get_canned_acl(BucketCannedACL.private, owner=owner)

    grants = []
    for permission_header, grantees_values in acl_headers:
        permission = get_permission_from_header(permission_header)
        grants.extend(parse_grants_in_headers(permission, grantees_values))

    return AccessControlPolicy(Owner=owner, Grants=grants)
×
5073

5074

5075
def object_exists_for_precondition_write(s3_bucket: S3Bucket, key: ObjectKey) -> bool:
1✔
5076
    return (existing := s3_bucket.objects.get(key)) and not isinstance(existing, S3DeleteMarker)
1✔
5077

5078

5079
def verify_object_equality_precondition_write(
    s3_bucket: S3Bucket,
    key: ObjectKey,
    etag: str,
    initiated: datetime.datetime | None = None,
) -> None:
    """
    Enforce an If-Match precondition for a write against the current object state.

    :param s3_bucket: the bucket holding the object
    :param key: the object key the precondition applies to
    :param etag: the expected ETag (surrounding quotes are ignored)
    :param initiated: optional start time of the operation (e.g. multipart upload),
        compared against the object's last-modified time
    :raises NoSuchKey: if the key does not exist or is a delete marker
    :raises PreconditionFailed: if the current ETag does not match
    :raises ConditionalRequestConflict: if the object was modified after ``initiated``
    """
    existing = s3_bucket.objects.get(key)
    if not existing or isinstance(existing, S3DeleteMarker):
        raise NoSuchKey("The specified key does not exist.", Key=key)

    if existing.etag != etag.strip('"'):
        raise PreconditionFailed(
            "At least one of the pre-conditions you specified did not hold",
            Condition="If-Match",
        )

    # a concurrent write after the operation started conflicts with the precondition
    if initiated and initiated < existing.last_modified:
        raise ConditionalRequestConflict(
            "The conditional request cannot succeed due to a conflicting operation against this resource.",
            Condition="If-Match",
            Key=key,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc