localstack / localstack / beb06ec3-fc0e-4100-b8f7-fbc8389f86d7 (push, circleci, web-flow)
08 May 2025 05:15PM UTC coverage: 86.63% (-0.03%) from 86.66%
Commit: CFn v2: Skip media type assertion (#12597)
64324 of 74251 relevant lines covered (86.63%), 0.87 hits per line

Source File: /localstack-core/localstack/services/s3/provider.py (95.52% of lines covered)

import base64
import copy
import datetime
import json
import logging
import re
from collections import defaultdict
from inspect import signature
from io import BytesIO
from operator import itemgetter
from typing import IO, Optional, Union
from urllib import parse as urlparse
from zoneinfo import ZoneInfo

from localstack import config
from localstack.aws.api import CommonServiceException, RequestContext, handler
from localstack.aws.api.s3 import (
    MFA,
    AbortMultipartUploadOutput,
    AccelerateConfiguration,
    AccessControlPolicy,
    AccessDenied,
    AccountId,
    AnalyticsConfiguration,
    AnalyticsId,
    BadDigest,
    Body,
    Bucket,
    BucketAlreadyExists,
    BucketAlreadyOwnedByYou,
    BucketCannedACL,
    BucketLifecycleConfiguration,
    BucketLoggingStatus,
    BucketName,
    BucketNotEmpty,
    BucketRegion,
    BucketVersioningStatus,
    BypassGovernanceRetention,
    ChecksumAlgorithm,
    ChecksumCRC32,
    ChecksumCRC32C,
    ChecksumCRC64NVME,
    ChecksumSHA1,
    ChecksumSHA256,
    ChecksumType,
    CommonPrefix,
    CompletedMultipartUpload,
    CompleteMultipartUploadOutput,
    ConditionalRequestConflict,
    ConfirmRemoveSelfBucketAccess,
    ContentMD5,
    CopyObjectOutput,
    CopyObjectRequest,
    CopyObjectResult,
    CopyPartResult,
    CORSConfiguration,
    CreateBucketOutput,
    CreateBucketRequest,
    CreateMultipartUploadOutput,
    CreateMultipartUploadRequest,
    CrossLocationLoggingProhibitted,
    Delete,
    DeletedObject,
    DeleteMarkerEntry,
    DeleteObjectOutput,
    DeleteObjectsOutput,
    DeleteObjectTaggingOutput,
    Delimiter,
    EncodingType,
    Error,
    Expiration,
    FetchOwner,
    GetBucketAccelerateConfigurationOutput,
    GetBucketAclOutput,
    GetBucketAnalyticsConfigurationOutput,
    GetBucketCorsOutput,
    GetBucketEncryptionOutput,
    GetBucketIntelligentTieringConfigurationOutput,
    GetBucketInventoryConfigurationOutput,
    GetBucketLifecycleConfigurationOutput,
    GetBucketLocationOutput,
    GetBucketLoggingOutput,
    GetBucketOwnershipControlsOutput,
    GetBucketPolicyOutput,
    GetBucketPolicyStatusOutput,
    GetBucketReplicationOutput,
    GetBucketRequestPaymentOutput,
    GetBucketTaggingOutput,
    GetBucketVersioningOutput,
    GetBucketWebsiteOutput,
    GetObjectAclOutput,
    GetObjectAttributesOutput,
    GetObjectAttributesParts,
    GetObjectAttributesRequest,
    GetObjectLegalHoldOutput,
    GetObjectLockConfigurationOutput,
    GetObjectOutput,
    GetObjectRequest,
    GetObjectRetentionOutput,
    GetObjectTaggingOutput,
    GetObjectTorrentOutput,
    GetPublicAccessBlockOutput,
    HeadBucketOutput,
    HeadObjectOutput,
    HeadObjectRequest,
    IfMatch,
    IfMatchInitiatedTime,
    IfMatchLastModifiedTime,
    IfMatchSize,
    IfNoneMatch,
    IntelligentTieringConfiguration,
    IntelligentTieringId,
    InvalidArgument,
    InvalidBucketName,
    InvalidDigest,
    InvalidLocationConstraint,
    InvalidObjectState,
    InvalidPartNumber,
    InvalidPartOrder,
    InvalidStorageClass,
    InvalidTargetBucketForLogging,
    InventoryConfiguration,
    InventoryId,
    KeyMarker,
    LifecycleRules,
    ListBucketAnalyticsConfigurationsOutput,
    ListBucketIntelligentTieringConfigurationsOutput,
    ListBucketInventoryConfigurationsOutput,
    ListBucketsOutput,
    ListMultipartUploadsOutput,
    ListObjectsOutput,
    ListObjectsV2Output,
    ListObjectVersionsOutput,
    ListPartsOutput,
    Marker,
    MaxBuckets,
    MaxKeys,
    MaxParts,
    MaxUploads,
    MethodNotAllowed,
    MissingSecurityHeader,
    MpuObjectSize,
    MultipartUpload,
    MultipartUploadId,
    NoSuchBucket,
    NoSuchBucketPolicy,
    NoSuchCORSConfiguration,
    NoSuchKey,
    NoSuchLifecycleConfiguration,
    NoSuchPublicAccessBlockConfiguration,
    NoSuchTagSet,
    NoSuchUpload,
    NoSuchWebsiteConfiguration,
    NotificationConfiguration,
    Object,
    ObjectIdentifier,
    ObjectKey,
    ObjectLockConfiguration,
    ObjectLockConfigurationNotFoundError,
    ObjectLockEnabled,
    ObjectLockLegalHold,
    ObjectLockMode,
    ObjectLockRetention,
    ObjectLockToken,
    ObjectOwnership,
    ObjectVersion,
    ObjectVersionId,
    ObjectVersionStorageClass,
    OptionalObjectAttributesList,
    Owner,
    OwnershipControls,
    OwnershipControlsNotFoundError,
    Part,
    PartNumber,
    PartNumberMarker,
    Policy,
    PostResponse,
    PreconditionFailed,
    Prefix,
    PublicAccessBlockConfiguration,
    PutBucketAclRequest,
    PutBucketLifecycleConfigurationOutput,
    PutObjectAclOutput,
    PutObjectAclRequest,
    PutObjectLegalHoldOutput,
    PutObjectLockConfigurationOutput,
    PutObjectOutput,
    PutObjectRequest,
    PutObjectRetentionOutput,
    PutObjectTaggingOutput,
    ReplicationConfiguration,
    ReplicationConfigurationNotFoundError,
    RequestPayer,
    RequestPaymentConfiguration,
    RestoreObjectOutput,
    RestoreRequest,
    S3Api,
    ServerSideEncryption,
    ServerSideEncryptionConfiguration,
    SkipValidation,
    SSECustomerAlgorithm,
    SSECustomerKey,
    SSECustomerKeyMD5,
    StartAfter,
    StorageClass,
    Tagging,
    Token,
    TransitionDefaultMinimumObjectSize,
    UploadIdMarker,
    UploadPartCopyOutput,
    UploadPartCopyRequest,
    UploadPartOutput,
    UploadPartRequest,
    VersionIdMarker,
    VersioningConfiguration,
    WebsiteConfiguration,
)
from localstack.aws.api.s3 import NotImplemented as NotImplementedException
from localstack.aws.handlers import (
    modify_service_response,
    preprocess_request,
    serve_custom_service_request_handlers,
)
from localstack.constants import AWS_REGION_US_EAST_1
from localstack.services.edge import ROUTER
from localstack.services.plugins import ServiceLifecycleHook
from localstack.services.s3.codec import AwsChunkedDecoder
from localstack.services.s3.constants import (
    ALLOWED_HEADER_OVERRIDES,
    ARCHIVES_STORAGE_CLASSES,
    CHECKSUM_ALGORITHMS,
    DEFAULT_BUCKET_ENCRYPTION,
)
from localstack.services.s3.cors import S3CorsHandler, s3_cors_request_handler
from localstack.services.s3.exceptions import (
    InvalidBucketOwnerAWSAccountID,
    InvalidBucketState,
    InvalidRequest,
    MalformedPolicy,
    MalformedXML,
    NoSuchConfiguration,
    NoSuchObjectLockConfiguration,
    UnexpectedContent,
)
from localstack.services.s3.models import (
    BucketCorsIndex,
    EncryptionParameters,
    ObjectLockParameters,
    S3Bucket,
    S3DeleteMarker,
    S3Multipart,
    S3Object,
    S3Part,
    S3Store,
    VersionedKeyStore,
    s3_stores,
)
from localstack.services.s3.notifications import NotificationDispatcher, S3EventNotificationContext
from localstack.services.s3.presigned_url import validate_post_policy
from localstack.services.s3.storage.core import LimitedIterableStream, S3ObjectStore
from localstack.services.s3.storage.ephemeral import EphemeralS3ObjectStore
from localstack.services.s3.utils import (
    ObjectRange,
    add_expiration_days_to_datetime,
    base_64_content_md5_to_etag,
    create_redirect_for_post_request,
    create_s3_kms_managed_key_for_region,
    etag_to_base_64_content_md5,
    extract_bucket_key_version_id_from_copy_source,
    generate_safe_version_id,
    get_canned_acl,
    get_class_attrs_from_spec_class,
    get_failed_precondition_copy_source,
    get_full_default_bucket_location,
    get_kms_key_arn,
    get_lifecycle_rule_from_object,
    get_owner_for_account_id,
    get_permission_from_header,
    get_retention_from_now,
    get_s3_checksum_algorithm_from_request,
    get_s3_checksum_algorithm_from_trailing_headers,
    get_system_metadata_from_request,
    get_unique_key_id,
    is_bucket_name_valid,
    is_version_older_than_other,
    parse_copy_source_range_header,
    parse_post_object_tagging_xml,
    parse_range_header,
    parse_tagging_header,
    s3_response_handler,
    serialize_expiration_header,
    str_to_rfc_1123_datetime,
    validate_dict_fields,
    validate_failed_precondition,
    validate_kms_key_id,
    validate_tag_set,
)
from localstack.services.s3.validation import (
    parse_grants_in_headers,
    validate_acl_acp,
    validate_bucket_analytics_configuration,
    validate_bucket_intelligent_tiering_configuration,
    validate_canned_acl,
    validate_checksum_value,
    validate_cors_configuration,
    validate_inventory_configuration,
    validate_lifecycle_configuration,
    validate_object_key,
    validate_sse_c,
    validate_website_configuration,
)
from localstack.services.s3.website_hosting import register_website_hosting_routes
from localstack.state import AssetDirectory, StateVisitor
from localstack.utils.aws.arns import s3_bucket_name
from localstack.utils.strings import short_uid, to_bytes, to_str

LOG = logging.getLogger(__name__)

STORAGE_CLASSES = get_class_attrs_from_spec_class(StorageClass)
SSE_ALGORITHMS = get_class_attrs_from_spec_class(ServerSideEncryption)
OBJECT_OWNERSHIPS = get_class_attrs_from_spec_class(ObjectOwnership)

DEFAULT_S3_TMP_DIR = "/tmp/localstack-s3-storage"


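# Bucket and object metadata is kept in the in-memory `s3_stores` (per account and region,
# see `get_store` below), while object payloads go through the injectable `S3ObjectStore`
# backend (an `EphemeralS3ObjectStore` rooted at DEFAULT_S3_TMP_DIR by default).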
class S3Provider(S3Api, ServiceLifecycleHook):
    def __init__(self, storage_backend: S3ObjectStore = None) -> None:
        super().__init__()
        self._storage_backend = storage_backend or EphemeralS3ObjectStore(DEFAULT_S3_TMP_DIR)
        self._notification_dispatcher = NotificationDispatcher()
        self._cors_handler = S3CorsHandler(BucketCorsIndex())

        # runtime cache of Lifecycle Expiration headers, as they need to be calculated everytime we fetch an object
        # in case the rules have changed
        self._expiration_cache: dict[BucketName, dict[ObjectKey, Expiration]] = defaultdict(dict)

    def on_after_init(self):
        preprocess_request.append(self._cors_handler)
        serve_custom_service_request_handlers.append(s3_cors_request_handler)
        modify_service_response.append(self.service, s3_response_handler)
        register_website_hosting_routes(router=ROUTER)

    def accept_state_visitor(self, visitor: StateVisitor):
        visitor.visit(s3_stores)
        visitor.visit(AssetDirectory(self.service, self._storage_backend.root_directory))

    def on_before_state_save(self):
        self._storage_backend.flush()

    def on_after_state_reset(self):
        self._cors_handler.invalidate_cache()

    def on_after_state_load(self):
        self._cors_handler.invalidate_cache()

    def on_before_stop(self):
        self._notification_dispatcher.shutdown()
        self._storage_backend.close()

    def _notify(
        self,
        context: RequestContext,
        s3_bucket: S3Bucket,
        s3_object: S3Object | S3DeleteMarker = None,
        s3_notif_ctx: S3EventNotificationContext = None,
    ):
        """
        :param context: the RequestContext, to retrieve more information about the incoming notification
        :param s3_bucket: the S3Bucket object
        :param s3_object: the S3Object object if S3EventNotificationContext is not given
        :param s3_notif_ctx: S3EventNotificationContext, in case we need specific data only available in the API call
        :return:
        """
        if s3_bucket.notification_configuration:
            if not s3_notif_ctx:
                s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
                    context,
                    s3_bucket=s3_bucket,
                    s3_object=s3_object,
                )

            self._notification_dispatcher.send_notifications(
                s3_notif_ctx, s3_bucket.notification_configuration
            )

    def _verify_notification_configuration(
        self,
        notification_configuration: NotificationConfiguration,
        skip_destination_validation: SkipValidation,
        context: RequestContext,
        bucket_name: str,
    ):
        self._notification_dispatcher.verify_configuration(
            notification_configuration, skip_destination_validation, context, bucket_name
        )

    def _get_expiration_header(
        self,
        lifecycle_rules: LifecycleRules,
        bucket: BucketName,
        s3_object: S3Object,
        object_tags: dict[str, str],
    ) -> Expiration:
        """
        This method will check if the key matches a Lifecycle filter, and return the serializer header if that's
        the case. We're caching it because it can change depending on the set rules on the bucket.
        We can't use `lru_cache` as the parameters needs to be hashable
        :param lifecycle_rules: the bucket LifecycleRules
        :param s3_object: S3Object
        :param object_tags: the object tags
        :return: the Expiration header if there's a rule matching
        """
        if cached_exp := self._expiration_cache.get(bucket, {}).get(s3_object.key):
            return cached_exp

        if lifecycle_rule := get_lifecycle_rule_from_object(
            lifecycle_rules, s3_object.key, s3_object.size, object_tags
        ):
            expiration_header = serialize_expiration_header(
                lifecycle_rule["ID"],
                lifecycle_rule["Expiration"],
                s3_object.last_modified,
            )
            self._expiration_cache[bucket][s3_object.key] = expiration_header
            return expiration_header

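    # Bucket resolution is cross-account: the caller's own store is checked first, then the
    # global bucket map is used to locate the owning account's store; NoSuchBucket is raised
    # only when neither lookup finds the bucket.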
    def _get_cross_account_bucket(
        self,
        context: RequestContext,
        bucket_name: BucketName,
        *,
        expected_bucket_owner: AccountId = None,
    ) -> tuple[S3Store, S3Bucket]:
        if expected_bucket_owner and not re.fullmatch(r"\w{12}", expected_bucket_owner):
            raise InvalidBucketOwnerAWSAccountID(
                f"The value of the expected bucket owner parameter must be an AWS Account ID... [{expected_bucket_owner}]",
            )

        store = self.get_store(context.account_id, context.region)
        if not (s3_bucket := store.buckets.get(bucket_name)):
            if not (account_id := store.global_bucket_map.get(bucket_name)):
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)

            store = self.get_store(account_id, context.region)
            if not (s3_bucket := store.buckets.get(bucket_name)):
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)

        if expected_bucket_owner and s3_bucket.bucket_account_id != expected_bucket_owner:
            raise AccessDenied("Access Denied")

        return store, s3_bucket

    @staticmethod
    def get_store(account_id: str, region_name: str) -> S3Store:
        # Use default account id for external access? would need an anonymous one
        return s3_stores[account_id][region_name]

    @handler("CreateBucket", expand=False)
    def create_bucket(
        self,
        context: RequestContext,
        request: CreateBucketRequest,
    ) -> CreateBucketOutput:
        bucket_name = request["Bucket"]

        if not is_bucket_name_valid(bucket_name):
            raise InvalidBucketName("The specified bucket is not valid.", BucketName=bucket_name)

        # the XML parser returns an empty dict if the body contains the following:
        # <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />
        # but it also returns an empty dict if the body is fully empty. We need to differentiate the 2 cases by checking
        # if the body is empty or not
        if context.request.data and (
            (create_bucket_configuration := request.get("CreateBucketConfiguration")) is not None
        ):
            if not (bucket_region := create_bucket_configuration.get("LocationConstraint")):
                raise MalformedXML()

            if context.region == AWS_REGION_US_EAST_1:
                if bucket_region == "us-east-1":
                    raise InvalidLocationConstraint(
                        "The specified location-constraint is not valid",
                        LocationConstraint=bucket_region,
                    )
            elif context.region != bucket_region:
                raise CommonServiceException(
                    code="IllegalLocationConstraintException",
                    message=f"The {bucket_region} location constraint is incompatible for the region specific endpoint this request was sent to.",
                )
        else:
            bucket_region = AWS_REGION_US_EAST_1
            if context.region != bucket_region:
                raise CommonServiceException(
                    code="IllegalLocationConstraintException",
                    message="The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.",
                )

        store = self.get_store(context.account_id, bucket_region)

        if bucket_name in store.global_bucket_map:
            existing_bucket_owner = store.global_bucket_map[bucket_name]
            if existing_bucket_owner != context.account_id:
                raise BucketAlreadyExists()

            # if the existing bucket has the same owner, the behaviour will depend on the region
            if bucket_region != "us-east-1":
                raise BucketAlreadyOwnedByYou(
                    "Your previous request to create the named bucket succeeded and you already own it.",
                    BucketName=bucket_name,
                )
            else:
                # CreateBucket is idempotent in us-east-1
                return CreateBucketOutput(Location=f"/{bucket_name}")

        if (
            object_ownership := request.get("ObjectOwnership")
        ) is not None and object_ownership not in OBJECT_OWNERSHIPS:
            raise InvalidArgument(
                f"Invalid x-amz-object-ownership header: {object_ownership}",
                ArgumentName="x-amz-object-ownership",
            )
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
        owner = get_owner_for_account_id(context.account_id)
        acl = get_access_control_policy_for_new_resource_request(request, owner=owner)
        s3_bucket = S3Bucket(
            name=bucket_name,
            account_id=context.account_id,
            bucket_region=bucket_region,
            owner=owner,
            acl=acl,
            object_ownership=request.get("ObjectOwnership"),
            object_lock_enabled_for_bucket=request.get("ObjectLockEnabledForBucket"),
        )

        store.buckets[bucket_name] = s3_bucket
        store.global_bucket_map[bucket_name] = s3_bucket.bucket_account_id
        self._cors_handler.invalidate_cache()
        self._storage_backend.create_bucket(bucket_name)

        # Location is always contained in response -> full url for LocationConstraint outside us-east-1
        location = (
            f"/{bucket_name}"
            if bucket_region == "us-east-1"
            else get_full_default_bucket_location(bucket_name)
        )
        response = CreateBucketOutput(Location=location)
        return response

    def delete_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        # the bucket still contains objects
        if not s3_bucket.objects.is_empty():
            message = "The bucket you tried to delete is not empty"
            if s3_bucket.versioning_status:
                message += ". You must delete all versions in the bucket."
            raise BucketNotEmpty(
                message,
                BucketName=bucket,
            )

        store.buckets.pop(bucket)
        store.global_bucket_map.pop(bucket)
        self._cors_handler.invalidate_cache()
        self._expiration_cache.pop(bucket, None)
        # clean up the storage backend
        self._storage_backend.delete_bucket(bucket)

    def list_buckets(
        self,
        context: RequestContext,
        max_buckets: MaxBuckets = None,
        continuation_token: Token = None,
        prefix: Prefix = None,
        bucket_region: BucketRegion = None,
        **kwargs,
    ) -> ListBucketsOutput:
        # TODO add support for max_buckets, continuation_token, prefix, and bucket_region
        owner = get_owner_for_account_id(context.account_id)
        store = self.get_store(context.account_id, context.region)
        buckets = [
            Bucket(Name=bucket.name, CreationDate=bucket.creation_date)
            for bucket in store.buckets.values()
        ]
        return ListBucketsOutput(Owner=owner, Buckets=buckets)

    def head_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> HeadBucketOutput:
        store = self.get_store(context.account_id, context.region)
        if not (s3_bucket := store.buckets.get(bucket)):
            if not (account_id := store.global_bucket_map.get(bucket)):
                # just to return the 404 error message
                raise NoSuchBucket()

            store = self.get_store(account_id, context.region)
            if not (s3_bucket := store.buckets.get(bucket)):
                # just to return the 404 error message
                raise NoSuchBucket()

        # TODO: this call is also used to check if the user has access/authorization for the bucket
        #  it can return 403
        return HeadBucketOutput(BucketRegion=s3_bucket.bucket_region)

    def get_bucket_location(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLocationOutput:
        """
        When implementing the ASF provider, this operation is implemented because:
        - The spec defines a root element GetBucketLocationOutput containing a LocationConstraint member, where
          S3 actually just returns the LocationConstraint on the root level (only operation so far that we know of).
        - We circumvent the root level element here by patching the spec such that this operation returns a
          single "payload" (the XML body response), which causes the serializer to directly take the payload element.
        - The above "hack" causes the fix in the serializer to not be picked up here as we're passing the XML body as
          the payload, which is why we need to manually do this here by manipulating the string.
        Botocore implements this hack for parsing the response in `botocore.handlers.py#parse_get_bucket_location`
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        location_constraint = (
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">{{location}}</LocationConstraint>'
        )

        location = s3_bucket.bucket_region if s3_bucket.bucket_region != "us-east-1" else ""
        location_constraint = location_constraint.replace("{{location}}", location)

        response = GetBucketLocationOutput(LocationConstraint=location_constraint)
        return response

    @handler("PutObject", expand=False)
    def put_object(
        self,
        context: RequestContext,
        request: PutObjectRequest,
    ) -> PutObjectOutput:
        # TODO: validate order of validation
        # TODO: still need to handle following parameters
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        key = request["Key"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        if (storage_class := request.get("StorageClass")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
            validate_kms_key_id(sse_kms_key_id, s3_bucket)

        validate_object_key(key)

        if_match = request.get("IfMatch")
        if (if_none_match := request.get("IfNoneMatch")) and if_match:
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header="If-Match,If-None-Match",
                additionalMessage="Multiple conditional request headers present in the request",
            )

        elif (if_none_match and if_none_match != "*") or (if_match and if_match == "*"):
            header_name = "If-None-Match" if if_none_match else "If-Match"
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header=header_name,
                additionalMessage=f"We don't accept the provided value of {header_name} header for this API",
            )

        system_metadata = get_system_metadata_from_request(request)
        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        version_id = generate_version_id(s3_bucket.versioning_status)

        etag_content_md5 = ""
        if content_md5 := request.get("ContentMD5"):
            # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
            etag_content_md5 = base_64_content_md5_to_etag(content_md5)
            if not etag_content_md5:
                raise InvalidDigest(
                    "The Content-MD5 you specified was invalid.",
                    Content_MD5=content_md5,
                )

        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
        checksum_value = (
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
        )

        # TODO: we're not encrypting the object with the provided key for now
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
            server_side_encryption=request.get("ServerSideEncryption"),
        )

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            s3_bucket,
            store,
        )

        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)

        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)

        if tagging := request.get("Tagging"):
            tagging = parse_tagging_header(tagging)

        s3_object = S3Object(
            key=key,
            version_id=version_id,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=request.get("Metadata"),
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            sse_key_hash=sse_c_key_md5,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=request.get("WebsiteRedirectLocation"),
            acl=acl,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depends on Bucket settings
        )

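        # `aws-chunked` (SigV4 streaming) uploads are detected below via the x-amz-content-sha256
        # and Content-Encoding headers; their body is wrapped in AwsChunkedDecoder so the chunk
        # framing is decoded before the payload is written to the storage backend, and a checksum
        # algorithm may also be derived from the x-amz-trailer header.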
        body = request.get("Body")
        # check if chunked request
        headers = context.request.headers
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
            "STREAMING-"
        ) or "aws-chunked" in headers.get("content-encoding", "")
        if is_aws_chunked:
            checksum_algorithm = (
                checksum_algorithm
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
            )
            if checksum_algorithm:
                s3_object.checksum_algorithm = checksum_algorithm

            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
            body = AwsChunkedDecoder(body, decoded_content_length, s3_object=s3_object)

            # S3 removes the `aws-chunked` value from ContentEncoding
            if content_encoding := s3_object.system_metadata.pop("ContentEncoding", None):
                encodings = [enc for enc in content_encoding.split(",") if enc != "aws-chunked"]
                if encodings:
                    s3_object.system_metadata["ContentEncoding"] = ",".join(encodings)

        with self._storage_backend.open(bucket_name, s3_object, mode="w") as s3_stored_object:
            # as we are inside the lock here, if multiple concurrent requests happen for the same object, it's the first
            # one to finish to succeed, and subsequent will raise exceptions. Once the first write finishes, we're
            # opening the lock and other requests can check this condition
            if if_none_match and object_exists_for_precondition_write(s3_bucket, key):
                raise PreconditionFailed(
                    "At least one of the pre-conditions you specified did not hold",
                    Condition="If-None-Match",
                )

            elif if_match:
                verify_object_equality_precondition_write(s3_bucket, key, if_match)

            s3_stored_object.write(body)

            if s3_object.checksum_algorithm:
                if not s3_object.checksum_value:
                    s3_object.checksum_value = s3_stored_object.checksum
                elif not validate_checksum_value(s3_object.checksum_value, checksum_algorithm):
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise InvalidRequest(
                        f"Value for x-amz-checksum-{s3_object.checksum_algorithm.lower()} header is invalid."
                    )
                elif s3_object.checksum_value != s3_stored_object.checksum:
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise BadDigest(
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
                    )

            # TODO: handle ContentMD5 and ChecksumAlgorithm in a handler for all requests except requests with a
            #  streaming body. We can use the specs to verify which operations needs to have the checksum validated
            if content_md5:
                calculated_md5 = etag_to_base_64_content_md5(s3_stored_object.etag)
                if calculated_md5 != content_md5:
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise BadDigest(
                        "The Content-MD5 you specified did not match what we received.",
                        ExpectedDigest=etag_content_md5,
                        CalculatedDigest=calculated_md5,
                    )

            s3_bucket.objects.set(key, s3_object)

        # in case we are overriding an object, delete the tags entry
        key_id = get_unique_key_id(bucket_name, key, version_id)
        store.TAGS.tags.pop(key_id, None)
        if tagging:
            store.TAGS.tags[key_id] = tagging

        # RequestCharged: Optional[RequestCharged]  # TODO
        response = PutObjectOutput(
            ETag=s3_object.quoted_etag,
        )
        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            response[f"Checksum{s3_object.checksum_algorithm}"] = s3_object.checksum_value
            response["ChecksumType"] = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)

        if s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                store.TAGS.tags.get(key_id, {}),
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)
        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response

    @handler("GetObject", expand=False)
    def get_object(
        self,
        context: RequestContext,
        request: GetObjectRequest,
    ) -> GetObjectOutput:
        # TODO: missing handling parameters:
        #  request_payer: RequestPayer = None,
        #  expected_bucket_owner: AccountId = None,

        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="GET",
        )
        if s3_object.expires and s3_object.expires < datetime.datetime.now(
            tz=s3_object.expires.tzinfo
        ):
            # TODO: old behaviour was deleting key instantly if expired. AWS cleans up only once a day generally
            #  you can still HeadObject on it and you get the expiry time until scheduled deletion
            kwargs = {"Key": object_key}
            if version_id:
                kwargs["VersionId"] = version_id
            raise NoSuchKey("The specified key does not exist.", **kwargs)

        if s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not s3_object.restore:
            raise InvalidObjectState(
                "The operation is not valid for the object's storage class",
                StorageClass=s3_object.storage_class,
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and s3_object.kms_key_id:
            validate_kms_key_id(kms_key=s3_object.kms_key_id, bucket=s3_bucket)

        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        # we're using getattr access because when restoring, the field might not exist
        # TODO: cleanup at next major release
        if sse_key_hash := getattr(s3_object, "sse_key_hash", None):
            if sse_key_hash and not sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif sse_key_hash != sse_c_key_md5:
                raise AccessDenied(
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
                )

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

        range_header = request.get("Range")
        part_number = request.get("PartNumber")
        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)

        # we deliberately do not call `.close()` on the s3_stored_object to keep the read lock acquired. When passing
        # the object to Werkzeug, the handler will call `.close()` after finishing iterating over `__iter__`.
        # this can however lead to deadlocks if an exception happens between the call and returning the object.
        # Be careful into adding validation between this call and `return` of `S3Provider.get_object`
        s3_stored_object = self._storage_backend.open(bucket_name, s3_object, mode="r")

        # this is a hacky way to verify the object hasn't been modified between `s3_object = s3_bucket.get_object`
        # and the storage backend call. If it has been modified, now that we're in the read lock, we can safely fetch
        # the object again
        if s3_stored_object.last_modified != s3_object.internal_last_modified:
            s3_object = s3_bucket.get_object(
                key=object_key,
                version_id=version_id,
                http_method="GET",
            )

        response = GetObjectOutput(
            AcceptRanges="bytes",
            **s3_object.get_system_metadata_fields(),
        )
        if s3_object.user_metadata:
            response["Metadata"] = s3_object.user_metadata

        if s3_object.parts and request.get("PartNumber"):
            response["PartsCount"] = len(s3_object.parts)

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.website_redirect_location:
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

        if s3_object.restore:
            response["Restore"] = s3_object.restore

        checksum_value = None
        if checksum_algorithm := s3_object.checksum_algorithm:
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                checksum_value = s3_object.checksum_value

        if range_data:
            s3_stored_object.seek(range_data.begin)
            response["Body"] = LimitedIterableStream(
                s3_stored_object, max_length=range_data.content_length
            )
            response["ContentRange"] = range_data.content_range
            response["ContentLength"] = range_data.content_length
            response["StatusCode"] = 206
            if range_data.content_length == s3_object.size and checksum_value:
                response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                response["ChecksumType"] = getattr(
                    s3_object, "checksum_type", ChecksumType.FULL_OBJECT
                )
        else:
            response["Body"] = s3_stored_object
            if checksum_value:
                response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                response["ChecksumType"] = getattr(
                    s3_object, "checksum_type", ChecksumType.FULL_OBJECT
                )

        add_encryption_to_response(response, s3_object=s3_object)

        if object_tags := store.TAGS.tags.get(
            get_unique_key_id(bucket_name, object_key, version_id)
        ):
            response["TagCount"] = len(object_tags)

        if s3_object.is_current and s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                object_tags,
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        # TODO: missing returned fields
        #     RequestCharged: Optional[RequestCharged]
        #     ReplicationStatus: Optional[ReplicationStatus]

        if s3_object.lock_mode:
            response["ObjectLockMode"] = s3_object.lock_mode
            if s3_object.lock_until:
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
        if s3_object.lock_legal_status:
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        for request_param, response_param in ALLOWED_HEADER_OVERRIDES.items():
            if request_param_value := request.get(request_param):
                response[response_param] = request_param_value

        return response

    @handler("HeadObject", expand=False)
    def head_object(
        self,
        context: RequestContext,
        request: HeadObjectRequest,
    ) -> HeadObjectOutput:
        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="HEAD",
        )

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        if s3_object.sse_key_hash:
            if not sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif s3_object.sse_key_hash != sse_c_key_md5:
                raise AccessDenied(
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
                )

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        response = HeadObjectOutput(
            AcceptRanges="bytes",
            **s3_object.get_system_metadata_fields(),
        )
        if s3_object.user_metadata:
            response["Metadata"] = s3_object.user_metadata

        if checksum_algorithm := s3_object.checksum_algorithm:
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                response[f"Checksum{checksum_algorithm.upper()}"] = s3_object.checksum_value
                response["ChecksumType"] = getattr(
                    s3_object, "checksum_type", ChecksumType.FULL_OBJECT
                )

        if s3_object.parts and request.get("PartNumber"):
            response["PartsCount"] = len(s3_object.parts)

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.website_redirect_location:
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

        if s3_object.restore:
            response["Restore"] = s3_object.restore

        range_header = request.get("Range")
        part_number = request.get("PartNumber")
        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)

        if range_data:
            response["ContentLength"] = range_data.content_length
            response["ContentRange"] = range_data.content_range
            response["StatusCode"] = 206

        add_encryption_to_response(response, s3_object=s3_object)

        # if you specify the VersionId, AWS won't return the Expiration header, even if that's the current version
        if not version_id and s3_bucket.lifecycle_rules:
            object_tags = store.TAGS.tags.get(
                get_unique_key_id(bucket_name, object_key, s3_object.version_id)
            )
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                object_tags,
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        if s3_object.lock_mode:
            response["ObjectLockMode"] = s3_object.lock_mode
            if s3_object.lock_until:
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
        if s3_object.lock_legal_status:
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        # TODO: missing return fields:
        #  ArchiveStatus: Optional[ArchiveStatus]
        #  RequestCharged: Optional[RequestCharged]
        #  ReplicationStatus: Optional[ReplicationStatus]

        return response

    def delete_object(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        mfa: MFA = None,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        expected_bucket_owner: AccountId = None,
        if_match: IfMatch = None,
        if_match_last_modified_time: IfMatchLastModifiedTime = None,
        if_match_size: IfMatchSize = None,
        **kwargs,
    ) -> DeleteObjectOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
            raise InvalidArgument(
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
                ArgumentName="x-amz-bypass-governance-retention",
            )

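        # Three cases follow: unversioned buckets remove the object outright; buckets with
        # versioning configured and no version id only add a delete marker; a specific version id
        # permanently removes that version, subject to object lock.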
        if s3_bucket.versioning_status is None:
            if version_id and version_id != "null":
                raise InvalidArgument(
                    "Invalid version id specified",
                    ArgumentName="versionId",
                    ArgumentValue=version_id,
                )

            found_object = s3_bucket.objects.pop(key, None)
            # TODO: RequestCharged
            if found_object:
                self._storage_backend.remove(bucket, found_object)
                self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
                store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)

            return DeleteObjectOutput()

        if not version_id:
            delete_marker_id = generate_version_id(s3_bucket.versioning_status)
            delete_marker = S3DeleteMarker(key=key, version_id=delete_marker_id)
            s3_bucket.objects.set(key, delete_marker)
            s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
                context,
                s3_bucket=s3_bucket,
                s3_object=delete_marker,
            )
            s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
            self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)

            return DeleteObjectOutput(VersionId=delete_marker.version_id, DeleteMarker=True)

        if key not in s3_bucket.objects:
            return DeleteObjectOutput()

        if not (s3_object := s3_bucket.objects.get(key, version_id)):
            raise InvalidArgument(
                "Invalid version id specified",
                ArgumentName="versionId",
                ArgumentValue=version_id,
            )

        if s3_object.is_locked(bypass_governance_retention):
            raise AccessDenied("Access Denied because object protected by object lock.")

        s3_bucket.objects.pop(object_key=key, version_id=version_id)
        response = DeleteObjectOutput(VersionId=s3_object.version_id)

        if isinstance(s3_object, S3DeleteMarker):
            response["DeleteMarker"] = True
        else:
            self._storage_backend.remove(bucket, s3_object)
            store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response

    def delete_objects(
        self,
        context: RequestContext,
        bucket: BucketName,
        delete: Delete,
        mfa: MFA = None,
        request_payer: RequestPayer = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> DeleteObjectsOutput:
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
            raise InvalidArgument(
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
                ArgumentName="x-amz-bypass-governance-retention",
            )

        objects: list[ObjectIdentifier] = delete.get("Objects")
        if not objects:
            raise MalformedXML()

        # TODO: max 1000 delete at once? test against AWS?

        quiet = delete.get("Quiet", False)
        deleted = []
        errors = []

        to_remove = []
        for to_delete_object in objects:
            object_key = to_delete_object.get("Key")
            version_id = to_delete_object.get("VersionId")
            if s3_bucket.versioning_status is None:
                if version_id and version_id != "null":
                    errors.append(
                        Error(
                            Code="NoSuchVersion",
                            Key=object_key,
                            Message="The specified version does not exist.",
                            VersionId=version_id,
                        )
                    )
                    continue

                found_object = s3_bucket.objects.pop(object_key, None)
                if found_object:
                    to_remove.append(found_object)
                    self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
                    store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)
                # small hack to not create a fake object for nothing
                elif s3_bucket.notification_configuration:
                    # DeleteObjects is a bit weird, even if the object didn't exist, S3 will trigger a notification
                    # for a non-existing object being deleted
                    self._notify(
                        context, s3_bucket=s3_bucket, s3_object=S3Object(key=object_key, etag="")
                    )

                if not quiet:
                    deleted.append(DeletedObject(Key=object_key))

                continue

            if not version_id:
                delete_marker_id = generate_version_id(s3_bucket.versioning_status)
                delete_marker = S3DeleteMarker(key=object_key, version_id=delete_marker_id)
                s3_bucket.objects.set(object_key, delete_marker)
                s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
                    context,
                    s3_bucket=s3_bucket,
                    s3_object=delete_marker,
                )
                s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
                self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)

                if not quiet:
                    deleted.append(
                        DeletedObject(
                            DeleteMarker=True,
                            DeleteMarkerVersionId=delete_marker_id,
                            Key=object_key,
                        )
                    )
                continue

            if not (
                found_object := s3_bucket.objects.get(object_key=object_key, version_id=version_id)
            ):
                errors.append(
                    Error(
                        Code="NoSuchVersion",
                        Key=object_key,
                        Message="The specified version does not exist.",
                        VersionId=version_id,
                    )
                )
                continue

            if found_object.is_locked(bypass_governance_retention):
                errors.append(
                    Error(
                        Code="AccessDenied",
                        Key=object_key,
                        Message="Access Denied because object protected by object lock.",
                        VersionId=version_id,
                    )
                )
                continue

            s3_bucket.objects.pop(object_key=object_key, version_id=version_id)
1✔
1327
            if not quiet:
1✔
1328
                deleted_object = DeletedObject(
1✔
1329
                    Key=object_key,
1330
                    VersionId=version_id,
1331
                )
1332
                if isinstance(found_object, S3DeleteMarker):
1✔
1333
                    deleted_object["DeleteMarker"] = True
1✔
1334
                    deleted_object["DeleteMarkerVersionId"] = found_object.version_id
1✔
1335

1336
                deleted.append(deleted_object)
1✔
1337

1338
            if isinstance(found_object, S3Object):
1✔
1339
                to_remove.append(found_object)
1✔
1340

1341
            self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
1✔
1342
            store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)
1✔
1343

1344
        # TODO: request charged
1345
        self._storage_backend.remove(bucket, to_remove)
1✔
1346
        response: DeleteObjectsOutput = {}
1✔
1347
        # AWS validated: the list of Deleted objects is unordered, multiple identical calls can return different results
1348
        if errors:
1✔
1349
            response["Errors"] = errors
1✔
1350
        if not quiet:
1✔
1351
            response["Deleted"] = deleted
1✔
1352

1353
        return response
1✔
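
A short sketch of exercising the batch loop above; it assumes the same LocalStack endpoint and an existing demo-bucket, and shows how Deleted and Errors come back (on an unversioned bucket, a bogus VersionId turns into a NoSuchVersion error entry, as coded above).

import boto3

# assumption: LocalStack endpoint and dummy credentials; bucket and keys are illustrative
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

resp = s3.delete_objects(
    Bucket="demo-bucket",
    Delete={
        "Objects": [
            {"Key": "logs/2024-01-01.txt"},
            {"Key": "logs/missing.txt", "VersionId": "does-not-exist"},
        ],
        # Quiet=True would suppress the per-key "Deleted" entries; errors are always returned
        "Quiet": False,
    },
)
print(resp.get("Deleted", []))  # unordered list of successfully deleted keys
print(resp.get("Errors", []))   # e.g. NoSuchVersion for the bogus version id
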
1354

1355
    @handler("CopyObject", expand=False)
1✔
1356
    def copy_object(
1✔
1357
        self,
1358
        context: RequestContext,
1359
        request: CopyObjectRequest,
1360
    ) -> CopyObjectOutput:
1361
        # request_payer: RequestPayer = None,  # TODO:
1362
        dest_bucket = request["Bucket"]
1✔
1363
        dest_key = request["Key"]
1✔
1364
        validate_object_key(dest_key)
1✔
1365
        store, dest_s3_bucket = self._get_cross_account_bucket(context, dest_bucket)
1✔
1366

1367
        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
1✔
1368
            request.get("CopySource")
1369
        )
1370
        _, src_s3_bucket = self._get_cross_account_bucket(context, src_bucket)
1✔
1371

1372
        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
1✔
1373
            validate_kms_key_id(sse_kms_key_id, dest_s3_bucket)
1✔
1374

1375
        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
1376
        try:
1✔
1377
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
1✔
1378
        except MethodNotAllowed:
×
1379
            raise InvalidRequest(
×
1380
                "The source of a copy request may not specifically refer to a delete marker by version id."
1381
            )
1382

1383
        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
1✔
1384
            raise InvalidObjectState(
×
1385
                "Operation is not valid for the source object's storage class",
1386
                StorageClass=src_s3_object.storage_class,
1387
            )
1388

1389
        if failed_condition := get_failed_precondition_copy_source(
1✔
1390
            request, src_s3_object.last_modified, src_s3_object.etag
1391
        ):
1392
            raise PreconditionFailed(
1✔
1393
                "At least one of the pre-conditions you specified did not hold",
1394
                Condition=failed_condition,
1395
            )
1396

1397
        source_sse_c_key_md5 = request.get("CopySourceSSECustomerKeyMD5")
1✔
1398
        if src_s3_object.sse_key_hash:
1✔
1399
            if not source_sse_c_key_md5:
1✔
1400
                raise InvalidRequest(
1✔
1401
                    "The object was stored using a form of Server Side Encryption. "
1402
                    "The correct parameters must be provided to retrieve the object."
1403
                )
1404
            elif src_s3_object.sse_key_hash != source_sse_c_key_md5:
1✔
1405
                raise AccessDenied("Access Denied")
×
1406

1407
        validate_sse_c(
1✔
1408
            algorithm=request.get("CopySourceSSECustomerAlgorithm"),
1409
            encryption_key=request.get("CopySourceSSECustomerKey"),
1410
            encryption_key_md5=source_sse_c_key_md5,
1411
        )
1412

1413
        target_sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
1414
        server_side_encryption = request.get("ServerSideEncryption")
1✔
1415
        # validate target SSE-C parameters
1416
        validate_sse_c(
1✔
1417
            algorithm=request.get("SSECustomerAlgorithm"),
1418
            encryption_key=request.get("SSECustomerKey"),
1419
            encryption_key_md5=target_sse_c_key_md5,
1420
            server_side_encryption=server_side_encryption,
1421
        )
1422

1423
        # TODO validate order of validation
1424
        storage_class = request.get("StorageClass")
1✔
1425
        metadata_directive = request.get("MetadataDirective")
1✔
1426
        website_redirect_location = request.get("WebsiteRedirectLocation")
1✔
1427
        # we need to check the identity of the encryption rule object, to see if the bucket default has been changed
1428
        is_default_encryption = (
1✔
1429
            dest_s3_bucket.encryption_rule is DEFAULT_BUCKET_ENCRYPTION
1430
            and src_s3_object.encryption == "AES256"
1431
        )
1432
        if (
1✔
1433
            src_bucket == dest_bucket
1434
            and src_key == dest_key
1435
            and not any(
1436
                (
1437
                    storage_class,
1438
                    server_side_encryption,
1439
                    target_sse_c_key_md5,
1440
                    metadata_directive == "REPLACE",
1441
                    website_redirect_location,
1442
                    dest_s3_bucket.encryption_rule
1443
                    and not is_default_encryption,  # S3 will allow copy in place if the bucket has encryption configured
1444
                    src_s3_object.restore,
1445
                )
1446
            )
1447
        ):
1448
            raise InvalidRequest(
1✔
1449
                "This copy request is illegal because it is trying to copy an object to itself without changing the "
1450
                "object's metadata, storage class, website redirect location or encryption attributes."
1451
            )
1452

1453
        if tagging := request.get("Tagging"):
1✔
1454
            tagging = parse_tagging_header(tagging)
1✔
1455

1456
        if metadata_directive == "REPLACE":
1✔
1457
            user_metadata = request.get("Metadata")
1✔
1458
            system_metadata = get_system_metadata_from_request(request)
1✔
1459
            if not system_metadata.get("ContentType"):
1✔
1460
                system_metadata["ContentType"] = "binary/octet-stream"
1✔
1461
        else:
1462
            user_metadata = src_s3_object.user_metadata
1✔
1463
            system_metadata = src_s3_object.system_metadata
1✔
1464

1465
        dest_version_id = generate_version_id(dest_s3_bucket.versioning_status)
1✔
1466

1467
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
1468
            request,
1469
            dest_s3_bucket,
1470
            store,
1471
        )
1472
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(
1✔
1473
            request, dest_s3_bucket
1474
        )
1475

1476
        acl = get_access_control_policy_for_new_resource_request(
1✔
1477
            request, owner=dest_s3_bucket.owner
1478
        )
1479
        checksum_algorithm = request.get("ChecksumAlgorithm")
1✔
1480

1481
        s3_object = S3Object(
1✔
1482
            key=dest_key,
1483
            size=src_s3_object.size,
1484
            version_id=dest_version_id,
1485
            storage_class=storage_class,
1486
            expires=request.get("Expires"),
1487
            user_metadata=user_metadata,
1488
            system_metadata=system_metadata,
1489
            checksum_algorithm=checksum_algorithm or src_s3_object.checksum_algorithm,
1490
            encryption=encryption_parameters.encryption,
1491
            kms_key_id=encryption_parameters.kms_key_id,
1492
            bucket_key_enabled=request.get(
1493
                "BucketKeyEnabled"
1494
            ),  # CopyObject does not inherit from the bucket here
1495
            sse_key_hash=target_sse_c_key_md5,
1496
            lock_mode=lock_parameters.lock_mode,
1497
            lock_legal_status=lock_parameters.lock_legal_status,
1498
            lock_until=lock_parameters.lock_until,
1499
            website_redirect_location=website_redirect_location,
1500
            expiration=None,  # TODO, from lifecycle
1501
            acl=acl,
1502
            owner=dest_s3_bucket.owner,
1503
        )
1504

1505
        with self._storage_backend.copy(
1✔
1506
            src_bucket=src_bucket,
1507
            src_object=src_s3_object,
1508
            dest_bucket=dest_bucket,
1509
            dest_object=s3_object,
1510
        ) as s3_stored_object:
1511
            s3_object.checksum_value = s3_stored_object.checksum or src_s3_object.checksum_value
1✔
1512
            s3_object.etag = s3_stored_object.etag or src_s3_object.etag
1✔
1513

1514
            dest_s3_bucket.objects.set(dest_key, s3_object)
1✔
1515

1516
        dest_key_id = get_unique_key_id(dest_bucket, dest_key, dest_version_id)
1✔
1517

1518
        if (request.get("TaggingDirective")) == "REPLACE":
1✔
1519
            store.TAGS.tags[dest_key_id] = tagging or {}
1✔
1520
        else:
1521
            src_key_id = get_unique_key_id(src_bucket, src_key, src_s3_object.version_id)
1✔
1522
            src_tags = store.TAGS.tags.get(src_key_id, {})
1✔
1523
            store.TAGS.tags[dest_key_id] = copy.copy(src_tags)
1✔
1524

1525
        copy_object_result = CopyObjectResult(
1✔
1526
            ETag=s3_object.quoted_etag,
1527
            LastModified=s3_object.last_modified,
1528
        )
1529
        if s3_object.checksum_algorithm:
1✔
1530
            copy_object_result[f"Checksum{s3_object.checksum_algorithm.upper()}"] = (
1✔
1531
                s3_object.checksum_value
1532
            )
1533

1534
        response = CopyObjectOutput(
1✔
1535
            CopyObjectResult=copy_object_result,
1536
        )
1537

1538
        if s3_object.version_id:
1✔
1539
            response["VersionId"] = s3_object.version_id
1✔
1540

1541
        if s3_object.expiration:
1✔
1542
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime
×
1543

1544
        add_encryption_to_response(response, s3_object=s3_object)
1✔
1545
        if target_sse_c_key_md5:
1✔
1546
            response["SSECustomerAlgorithm"] = "AES256"
1✔
1547
            response["SSECustomerKeyMD5"] = target_sse_c_key_md5
1✔
1548

1549
        if (
1✔
1550
            src_s3_bucket.versioning_status
1551
            and src_s3_object.version_id
1552
            and src_s3_object.version_id != "null"
1553
        ):
1554
            response["CopySourceVersionId"] = src_s3_object.version_id
1✔
1555

1556
        # RequestCharged: Optional[RequestCharged] # TODO
1557
        self._notify(context, s3_bucket=dest_s3_bucket, s3_object=s3_object)
1✔
1558

1559
        return response
1✔
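
To illustrate the copy-in-place restriction and the metadata/tagging directives handled above, a minimal boto3 sketch; endpoint, credentials and object names are assumptions.

import boto3

# assumption: LocalStack endpoint and dummy credentials; names are illustrative
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

copy_source = {"Bucket": "demo-bucket", "Key": "report.txt"}

# copying an object onto itself without changing anything is rejected (InvalidRequest above),
# but replacing the metadata makes the in-place copy legal
s3.copy_object(
    Bucket="demo-bucket",
    Key="report.txt",
    CopySource=copy_source,
    MetadataDirective="REPLACE",
    Metadata={"reviewed": "true"},
    ContentType="text/plain",
)

# TaggingDirective works the same way: COPY (the default) carries the source tags over,
# REPLACE uses the Tagging string from this request instead
s3.copy_object(
    Bucket="demo-bucket",
    Key="report-copy.txt",
    CopySource=copy_source,
    TaggingDirective="REPLACE",
    Tagging="team=data&tier=gold",
)
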
1560

1561
    def list_objects(
1✔
1562
        self,
1563
        context: RequestContext,
1564
        bucket: BucketName,
1565
        delimiter: Delimiter = None,
1566
        encoding_type: EncodingType = None,
1567
        marker: Marker = None,
1568
        max_keys: MaxKeys = None,
1569
        prefix: Prefix = None,
1570
        request_payer: RequestPayer = None,
1571
        expected_bucket_owner: AccountId = None,
1572
        optional_object_attributes: OptionalObjectAttributesList = None,
1573
        **kwargs,
1574
    ) -> ListObjectsOutput:
1575
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1576

1577
        common_prefixes = set()
1✔
1578
        count = 0
1✔
1579
        is_truncated = False
1✔
1580
        next_key_marker = None
1✔
1581
        max_keys = max_keys or 1000
1✔
1582
        prefix = prefix or ""
1✔
1583
        delimiter = delimiter or ""
1✔
1584
        if encoding_type:
1✔
1585
            prefix = urlparse.quote(prefix)
1✔
1586
            delimiter = urlparse.quote(delimiter)
1✔
1587

1588
        s3_objects: list[Object] = []
1✔
1589

1590
        all_keys = sorted(s3_bucket.objects.values(), key=lambda r: r.key)
1✔
1591
        last_key = all_keys[-1] if all_keys else None
1✔
1592

1593
        # sort by key
1594
        for s3_object in all_keys:
1✔
1595
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key
1✔
1596
            # skip all keys that alphabetically come before the marker
1597
            if marker:
1✔
1598
                if key <= marker:
1✔
1599
                    continue
1✔
1600

1601
            # Filter for keys that start with prefix
1602
            if prefix and not key.startswith(prefix):
1✔
1603
                continue
×
1604

1605
            # see ListObjectsV2 for the logic comments (shared logic here)
1606
            prefix_including_delimiter = None
1✔
1607
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
1608
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
1609
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
1610

1611
                if prefix_including_delimiter in common_prefixes or (
1✔
1612
                    marker and marker.startswith(prefix_including_delimiter)
1613
                ):
1614
                    continue
1✔
1615

1616
            if prefix_including_delimiter:
1✔
1617
                common_prefixes.add(prefix_including_delimiter)
1✔
1618
            else:
1619
                # TODO: add RestoreStatus if present
1620
                object_data = Object(
1✔
1621
                    Key=key,
1622
                    ETag=s3_object.quoted_etag,
1623
                    Owner=s3_bucket.owner,  # TODO: verify reality
1624
                    Size=s3_object.size,
1625
                    LastModified=s3_object.last_modified,
1626
                    StorageClass=s3_object.storage_class,
1627
                )
1628

1629
                if s3_object.checksum_algorithm:
1✔
1630
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
1✔
1631
                    object_data["ChecksumType"] = getattr(
1✔
1632
                        s3_object, "checksum_type", ChecksumType.FULL_OBJECT
1633
                    )
1634

1635
                s3_objects.append(object_data)
1✔
1636

1637
            # we just added a CommonPrefix or an Object, increase the counter
1638
            count += 1
1✔
1639
            if count >= max_keys and last_key.key != s3_object.key:
1✔
1640
                is_truncated = True
1✔
1641
                if prefix_including_delimiter:
1✔
1642
                    next_key_marker = prefix_including_delimiter
1✔
1643
                elif s3_objects:
1✔
1644
                    next_key_marker = s3_objects[-1]["Key"]
1✔
1645
                break
1✔
1646

1647
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
1648

1649
        response = ListObjectsOutput(
1✔
1650
            IsTruncated=is_truncated,
1651
            Name=bucket,
1652
            MaxKeys=max_keys,
1653
            Prefix=prefix or "",
1654
            Marker=marker or "",
1655
        )
1656
        if s3_objects:
1✔
1657
            response["Contents"] = s3_objects
1✔
1658
        if encoding_type:
1✔
1659
            response["EncodingType"] = EncodingType.url
1✔
1660
        if delimiter:
1✔
1661
            response["Delimiter"] = delimiter
1✔
1662
        if common_prefixes:
1✔
1663
            response["CommonPrefixes"] = common_prefixes
1✔
1664
        if delimiter and next_key_marker:
1✔
1665
            response["NextMarker"] = next_key_marker
1✔
1666
        if s3_bucket.bucket_region != "us-east-1":
1✔
1667
            response["BucketRegion"] = s3_bucket.bucket_region
×
1668

1669
        # RequestCharged: Optional[RequestCharged]  # TODO
1670
        return response
1✔
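
A small sketch of how the prefix/delimiter logic above rolls keys up into CommonPrefixes; the endpoint, credentials and keys are illustrative.

import boto3

# assumption: LocalStack endpoint and dummy credentials; bucket and keys are made up
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

for key in ["logs/2024/01.txt", "logs/2024/02.txt", "logs/2025/01.txt", "readme.txt"]:
    s3.put_object(Bucket="demo-bucket", Key=key, Body=b"x")

resp = s3.list_objects(Bucket="demo-bucket", Prefix="logs/", Delimiter="/")
# keys sharing "logs/<year>/" are rolled up into CommonPrefixes instead of Contents
print([p["Prefix"] for p in resp.get("CommonPrefixes", [])])  # ['logs/2024/', 'logs/2025/']
print([o["Key"] for o in resp.get("Contents", [])])           # [] - everything was rolled up
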
1671

1672
    def list_objects_v2(
1✔
1673
        self,
1674
        context: RequestContext,
1675
        bucket: BucketName,
1676
        delimiter: Delimiter = None,
1677
        encoding_type: EncodingType = None,
1678
        max_keys: MaxKeys = None,
1679
        prefix: Prefix = None,
1680
        continuation_token: Token = None,
1681
        fetch_owner: FetchOwner = None,
1682
        start_after: StartAfter = None,
1683
        request_payer: RequestPayer = None,
1684
        expected_bucket_owner: AccountId = None,
1685
        optional_object_attributes: OptionalObjectAttributesList = None,
1686
        **kwargs,
1687
    ) -> ListObjectsV2Output:
1688
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1689

1690
        if continuation_token == "":
1✔
1691
            raise InvalidArgument(
1✔
1692
                "The continuation token provided is incorrect",
1693
                ArgumentName="continuation-token",
1694
            )
1695

1696
        common_prefixes = set()
1✔
1697
        count = 0
1✔
1698
        is_truncated = False
1✔
1699
        next_continuation_token = None
1✔
1700
        max_keys = max_keys or 1000
1✔
1701
        prefix = prefix or ""
1✔
1702
        delimiter = delimiter or ""
1✔
1703
        if encoding_type:
1✔
1704
            prefix = urlparse.quote(prefix)
1✔
1705
            delimiter = urlparse.quote(delimiter)
1✔
1706
        decoded_continuation_token = (
1✔
1707
            to_str(base64.urlsafe_b64decode(continuation_token.encode()))
1708
            if continuation_token
1709
            else None
1710
        )
1711

1712
        s3_objects: list[Object] = []
1✔
1713

1714
        # sort by key
1715
        for s3_object in sorted(s3_bucket.objects.values(), key=lambda r: r.key):
1✔
1716
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key
1✔
1717

1718
            # skip all keys that alphabetically come before continuation_token
1719
            if continuation_token:
1✔
1720
                if key < decoded_continuation_token:
1✔
1721
                    continue
1✔
1722

1723
            elif start_after:
1✔
1724
                if key <= start_after:
1✔
1725
                    continue
1✔
1726

1727
            # Filter for keys that start with prefix
1728
            if prefix and not key.startswith(prefix):
1✔
1729
                continue
1✔
1730

1731
            # separate keys that contain the same string between the prefix and the first occurrence of the delimiter
1732
            prefix_including_delimiter = None
1✔
1733
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
1734
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
1735
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
1736

1737
                # if the CommonPrefix is already in the CommonPrefixes, it doesn't count towards MaxKey, we can skip
1738
                # the entry without increasing the counter. We need to iterate over all of these entries before
1739
                # returning the next continuation marker, to properly start at the next key after this CommonPrefix
1740
                if prefix_including_delimiter in common_prefixes:
1✔
1741
                    continue
1✔
1742

1743
            # After skipping all entries, verify we're not over the MaxKeys before adding a new entry
1744
            if count >= max_keys:
1✔
1745
                is_truncated = True
1✔
1746
                next_continuation_token = to_str(base64.urlsafe_b64encode(s3_object.key.encode()))
1✔
1747
                break
1✔
1748

1749
            # if we found a new CommonPrefix, add it to the CommonPrefixes
1750
            # else, it means it's a new Object, add it to the Contents
1751
            if prefix_including_delimiter:
1✔
1752
                common_prefixes.add(prefix_including_delimiter)
1✔
1753
            else:
1754
                # TODO: add RestoreStatus if present
1755
                object_data = Object(
1✔
1756
                    Key=key,
1757
                    ETag=s3_object.quoted_etag,
1758
                    Size=s3_object.size,
1759
                    LastModified=s3_object.last_modified,
1760
                    StorageClass=s3_object.storage_class,
1761
                )
1762

1763
                if fetch_owner:
1✔
1764
                    object_data["Owner"] = s3_bucket.owner
×
1765

1766
                if s3_object.checksum_algorithm:
1✔
1767
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
1✔
1768
                    object_data["ChecksumType"] = getattr(
1✔
1769
                        s3_object, "checksum_type", ChecksumType.FULL_OBJECT
1770
                    )
1771

1772
                s3_objects.append(object_data)
1✔
1773

1774
            # we just added either a CommonPrefix or an Object to the List, increase the counter by one
1775
            count += 1
1✔
1776

1777
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
1778

1779
        response = ListObjectsV2Output(
1✔
1780
            IsTruncated=is_truncated,
1781
            Name=bucket,
1782
            MaxKeys=max_keys,
1783
            Prefix=prefix or "",
1784
            KeyCount=count,
1785
        )
1786
        if s3_objects:
1✔
1787
            response["Contents"] = s3_objects
1✔
1788
        if encoding_type:
1✔
1789
            response["EncodingType"] = EncodingType.url
1✔
1790
        if delimiter:
1✔
1791
            response["Delimiter"] = delimiter
1✔
1792
        if common_prefixes:
1✔
1793
            response["CommonPrefixes"] = common_prefixes
1✔
1794
        if next_continuation_token:
1✔
1795
            response["NextContinuationToken"] = next_continuation_token
1✔
1796

1797
        if continuation_token:
1✔
1798
            response["ContinuationToken"] = continuation_token
1✔
1799
        elif start_after:
1✔
1800
            response["StartAfter"] = start_after
1✔
1801

1802
        if s3_bucket.bucket_region != "us-east-1":
1✔
1803
            response["BucketRegion"] = s3_bucket.bucket_region
1✔
1804

1805
        # RequestCharged: Optional[RequestCharged]  # TODO
1806
        return response
1✔
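
As the code above shows, the V2 continuation token is just the next key, URL-safe base64-encoded. A standalone sketch of that round trip, handy when debugging paginated listings (the key name is made up):

import base64

def encode_continuation_token(next_key: str) -> str:
    # mirrors base64.urlsafe_b64encode(s3_object.key.encode()) in list_objects_v2 above
    return base64.urlsafe_b64encode(next_key.encode()).decode()

def decode_continuation_token(token: str) -> str:
    # mirrors base64.urlsafe_b64decode(continuation_token.encode()) above
    return base64.urlsafe_b64decode(token.encode()).decode()

token = encode_continuation_token("logs/2024/02.txt")
print(token)                             # e.g. bG9ncy8yMDI0LzAyLnR4dA==
print(decode_continuation_token(token))  # logs/2024/02.txt
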
1807

1808
    def list_object_versions(
1✔
1809
        self,
1810
        context: RequestContext,
1811
        bucket: BucketName,
1812
        delimiter: Delimiter = None,
1813
        encoding_type: EncodingType = None,
1814
        key_marker: KeyMarker = None,
1815
        max_keys: MaxKeys = None,
1816
        prefix: Prefix = None,
1817
        version_id_marker: VersionIdMarker = None,
1818
        expected_bucket_owner: AccountId = None,
1819
        request_payer: RequestPayer = None,
1820
        optional_object_attributes: OptionalObjectAttributesList = None,
1821
        **kwargs,
1822
    ) -> ListObjectVersionsOutput:
1823
        if version_id_marker and not key_marker:
1✔
1824
            raise InvalidArgument(
1✔
1825
                "A version-id marker cannot be specified without a key marker.",
1826
                ArgumentName="version-id-marker",
1827
                ArgumentValue=version_id_marker,
1828
            )
1829

1830
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1831
        common_prefixes = set()
1✔
1832
        count = 0
1✔
1833
        is_truncated = False
1✔
1834
        next_key_marker = None
1✔
1835
        next_version_id_marker = None
1✔
1836
        max_keys = max_keys or 1000
1✔
1837
        prefix = prefix or ""
1✔
1838
        delimiter = delimiter or ""
1✔
1839
        if encoding_type:
1✔
1840
            prefix = urlparse.quote(prefix)
1✔
1841
            delimiter = urlparse.quote(delimiter)
1✔
1842
        version_key_marker_found = False
1✔
1843

1844
        object_versions: list[ObjectVersion] = []
1✔
1845
        delete_markers: list[DeleteMarkerEntry] = []
1✔
1846

1847
        all_versions = s3_bucket.objects.values(with_versions=True)
1✔
1848
        # sort by key and last-modified date, to get the latest version first
1849
        all_versions.sort(key=lambda r: (r.key, -r.last_modified.timestamp()))
1✔
1850
        last_version = all_versions[-1] if all_versions else None
1✔
1851

1852
        for version in all_versions:
1✔
1853
            key = urlparse.quote(version.key) if encoding_type else version.key
1✔
1854
            # skip all keys that alphabetically come before key_marker
1855
            if key_marker:
1✔
1856
                if key < key_marker:
1✔
1857
                    continue
1✔
1858
                elif key == key_marker:
1✔
1859
                    if not version_id_marker:
1✔
1860
                        continue
1✔
1861
                    # as the keys are ordered by time, once we found the key marker, we can return the next one
1862
                    if version.version_id == version_id_marker:
1✔
1863
                        version_key_marker_found = True
1✔
1864
                        continue
1✔
1865

1866
                    # it is possible that the version_id_marker related object has been deleted, in that case, start
1867
                    # as soon as the next version id is older than the version id marker (meaning this version was
1868
                    # next after the now-deleted version)
1869
                    elif is_version_older_than_other(version.version_id, version_id_marker):
1✔
1870
                        version_key_marker_found = True
1✔
1871

1872
                    elif not version_key_marker_found:
1✔
1873
                        # as long as we have not passed the version_key_marker, skip the versions
1874
                        continue
1✔
1875

1876
            # Filter for keys that start with prefix
1877
            if prefix and not key.startswith(prefix):
1✔
1878
                continue
1✔
1879

1880
            # see ListObjectsV2 for the logic comments (shared logic here)
1881
            prefix_including_delimiter = None
1✔
1882
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
1883
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
1884
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
1885

1886
                if prefix_including_delimiter in common_prefixes or (
1✔
1887
                    key_marker and key_marker.startswith(prefix_including_delimiter)
1888
                ):
1889
                    continue
1✔
1890

1891
            if prefix_including_delimiter:
1✔
1892
                common_prefixes.add(prefix_including_delimiter)
1✔
1893

1894
            elif isinstance(version, S3DeleteMarker):
1✔
1895
                delete_marker = DeleteMarkerEntry(
1✔
1896
                    Key=key,
1897
                    Owner=s3_bucket.owner,
1898
                    VersionId=version.version_id,
1899
                    IsLatest=version.is_current,
1900
                    LastModified=version.last_modified,
1901
                )
1902
                delete_markers.append(delete_marker)
1✔
1903
            else:
1904
                # TODO: add RestoreStatus if present
1905
                object_version = ObjectVersion(
1✔
1906
                    Key=key,
1907
                    ETag=version.quoted_etag,
1908
                    Owner=s3_bucket.owner,  # TODO: verify reality
1909
                    Size=version.size,
1910
                    VersionId=version.version_id or "null",
1911
                    LastModified=version.last_modified,
1912
                    IsLatest=version.is_current,
1913
                    # TODO: verify this, are other classes possible?
1914
                    # StorageClass=version.storage_class,
1915
                    StorageClass=ObjectVersionStorageClass.STANDARD,
1916
                )
1917

1918
                if version.checksum_algorithm:
1✔
1919
                    object_version["ChecksumAlgorithm"] = [version.checksum_algorithm]
1✔
1920
                    object_version["ChecksumType"] = getattr(
1✔
1921
                        version, "checksum_type", ChecksumType.FULL_OBJECT
1922
                    )
1923

1924
                object_versions.append(object_version)
1✔
1925

1926
            # we just added a CommonPrefix, an Object or a DeleteMarker, increase the counter
1927
            count += 1
1✔
1928
            if count >= max_keys and last_version.version_id != version.version_id:
1✔
1929
                is_truncated = True
1✔
1930
                if prefix_including_delimiter:
1✔
1931
                    next_key_marker = prefix_including_delimiter
1✔
1932
                else:
1933
                    next_key_marker = version.key
1✔
1934
                    next_version_id_marker = version.version_id
1✔
1935
                break
1✔
1936

1937
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
1938

1939
        response = ListObjectVersionsOutput(
1✔
1940
            IsTruncated=is_truncated,
1941
            Name=bucket,
1942
            MaxKeys=max_keys,
1943
            Prefix=prefix,
1944
            KeyMarker=key_marker or "",
1945
            VersionIdMarker=version_id_marker or "",
1946
        )
1947
        if object_versions:
1✔
1948
            response["Versions"] = object_versions
1✔
1949
        if encoding_type:
1✔
1950
            response["EncodingType"] = EncodingType.url
1✔
1951
        if delete_markers:
1✔
1952
            response["DeleteMarkers"] = delete_markers
1✔
1953
        if delimiter:
1✔
1954
            response["Delimiter"] = delimiter
1✔
1955
        if common_prefixes:
1✔
1956
            response["CommonPrefixes"] = common_prefixes
1✔
1957
        if next_key_marker:
1✔
1958
            response["NextKeyMarker"] = next_key_marker
1✔
1959
        if next_version_id_marker:
1✔
1960
            response["NextVersionIdMarker"] = next_version_id_marker
1✔
1961

1962
        # RequestCharged: Optional[RequestCharged]  # TODO
1963
        return response
1✔
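
A pagination sketch for the key/version marker handling above, walking all versions and delete markers two entries at a time; endpoint, credentials and bucket name are assumptions.

import boto3

# assumption: LocalStack endpoint and dummy credentials; bucket name is illustrative
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

key_marker, version_id_marker = None, None
while True:
    kwargs = {"Bucket": "demo-bucket", "MaxKeys": 2}
    if key_marker:
        kwargs["KeyMarker"] = key_marker
    if version_id_marker:
        kwargs["VersionIdMarker"] = version_id_marker
    page = s3.list_object_versions(**kwargs)
    for v in page.get("Versions", []):
        print("version", v["Key"], v["VersionId"], v["IsLatest"])
    for d in page.get("DeleteMarkers", []):
        print("delete marker", d["Key"], d["VersionId"])
    if not page["IsTruncated"]:
        break
    # NextKeyMarker/NextVersionIdMarker point at the last entry returned, as computed above
    key_marker = page.get("NextKeyMarker")
    version_id_marker = page.get("NextVersionIdMarker")
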
1964

1965
    @handler("GetObjectAttributes", expand=False)
1✔
1966
    def get_object_attributes(
1✔
1967
        self,
1968
        context: RequestContext,
1969
        request: GetObjectAttributesRequest,
1970
    ) -> GetObjectAttributesOutput:
1971
        bucket_name = request["Bucket"]
1✔
1972
        object_key = request["Key"]
1✔
1973
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
1974

1975
        s3_object = s3_bucket.get_object(
1✔
1976
            key=object_key,
1977
            version_id=request.get("VersionId"),
1978
            http_method="GET",
1979
        )
1980

1981
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
1982
        if s3_object.sse_key_hash:
1✔
1983
            if not sse_c_key_md5:
1✔
1984
                raise InvalidRequest(
×
1985
                    "The object was stored using a form of Server Side Encryption. "
1986
                    "The correct parameters must be provided to retrieve the object."
1987
                )
1988
            elif s3_object.sse_key_hash != sse_c_key_md5:
1✔
1989
                raise AccessDenied("Access Denied")
×
1990

1991
        validate_sse_c(
1✔
1992
            algorithm=request.get("SSECustomerAlgorithm"),
1993
            encryption_key=request.get("SSECustomerKey"),
1994
            encryption_key_md5=sse_c_key_md5,
1995
        )
1996

1997
        object_attrs = request.get("ObjectAttributes", [])
1✔
1998
        response = GetObjectAttributesOutput()
1✔
1999
        if "ETag" in object_attrs:
1✔
2000
            response["ETag"] = s3_object.etag
1✔
2001
        if "StorageClass" in object_attrs:
1✔
2002
            response["StorageClass"] = s3_object.storage_class
1✔
2003
        if "ObjectSize" in object_attrs:
1✔
2004
            response["ObjectSize"] = s3_object.size
1✔
2005
        if "Checksum" in object_attrs and (checksum_algorithm := s3_object.checksum_algorithm):
1✔
2006
            if s3_object.parts:
1✔
2007
                checksum_value = s3_object.checksum_value.split("-")[0]
1✔
2008
            else:
2009
                checksum_value = s3_object.checksum_value
1✔
2010
            response["Checksum"] = {
1✔
2011
                f"Checksum{checksum_algorithm.upper()}": checksum_value,
2012
                "ChecksumType": getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT),
2013
            }
2014

2015
        response["LastModified"] = s3_object.last_modified
1✔
2016

2017
        if s3_bucket.versioning_status:
1✔
2018
            response["VersionId"] = s3_object.version_id
1✔
2019

2020
        if "ObjectParts" in object_attrs and s3_object.parts:
1✔
2021
            # TODO: implement ObjectParts, this is basically a simplified `ListParts` call on the object, we might
2022
            #  need to store more data about the Parts once we implement checksums for them
2023
            response["ObjectParts"] = GetObjectAttributesParts(TotalPartsCount=len(s3_object.parts))
1✔
2024

2025
        return response
1✔
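
A minimal GetObjectAttributes call against the handler above; note that ETag comes back unquoted here, unlike GetObject/HeadObject. Endpoint, credentials and names are assumptions.

import boto3

# assumption: LocalStack endpoint and dummy credentials; names are illustrative
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

attrs = s3.get_object_attributes(
    Bucket="demo-bucket",
    Key="report.txt",
    ObjectAttributes=["ETag", "ObjectSize", "StorageClass", "Checksum", "ObjectParts"],
)
print(attrs.get("ETag"))         # unquoted ETag, as returned by the handler above
print(attrs.get("ObjectSize"))
print(attrs.get("Checksum"))     # only present if the object was uploaded with a checksum
print(attrs.get("ObjectParts"))  # only present for multipart uploads
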
2026

2027
    def restore_object(
1✔
2028
        self,
2029
        context: RequestContext,
2030
        bucket: BucketName,
2031
        key: ObjectKey,
2032
        version_id: ObjectVersionId = None,
2033
        restore_request: RestoreRequest = None,
2034
        request_payer: RequestPayer = None,
2035
        checksum_algorithm: ChecksumAlgorithm = None,
2036
        expected_bucket_owner: AccountId = None,
2037
        **kwargs,
2038
    ) -> RestoreObjectOutput:
2039
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2040

2041
        s3_object = s3_bucket.get_object(
1✔
2042
            key=key,
2043
            version_id=version_id,
2044
            http_method="GET",  # TODO: verify http method
2045
        )
2046
        if s3_object.storage_class not in ARCHIVES_STORAGE_CLASSES:
1✔
2047
            raise InvalidObjectState(StorageClass=s3_object.storage_class)
×
2048

2049
        # TODO: moto only supported the "Days" parameter from RestoreRequest, and ignored the others
2050
        # we will only implement the same functionality for now
2051

2052
        # if a request was already done and the object was available, and we're updating it, set the status code to 200
2053
        status_code = 200 if s3_object.restore else 202
1✔
2054
        restore_days = restore_request.get("Days")
1✔
2055
        if not restore_days:
1✔
2056
            LOG.debug("LocalStack does not support restore SELECT requests yet.")
×
2057
            return RestoreObjectOutput()
×
2058

2059
        restore_expiration_date = add_expiration_days_to_datetime(
1✔
2060
            datetime.datetime.now(datetime.UTC), restore_days
2061
        )
2062
        # TODO: add a way to transition from ongoing-request=true to false? for now it is instant
2063
        s3_object.restore = f'ongoing-request="false", expiry-date="{restore_expiration_date}"'
1✔
2064

2065
        s3_notif_ctx_initiated = S3EventNotificationContext.from_request_context_native(
1✔
2066
            context,
2067
            s3_bucket=s3_bucket,
2068
            s3_object=s3_object,
2069
        )
2070
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_initiated)
1✔
2071
        # But because it's instant in LocalStack, we can directly send the Completed notification as well
2072
        # We just need to copy the context so that we don't mutate the first context while it could be sent
2073
        # And modify its event type from `ObjectRestore:Post` to `ObjectRestore:Completed`
2074
        s3_notif_ctx_completed = copy.copy(s3_notif_ctx_initiated)
1✔
2075
        s3_notif_ctx_completed.event_type = s3_notif_ctx_completed.event_type.replace(
1✔
2076
            "Post", "Completed"
2077
        )
2078
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_completed)
1✔
2079

2080
        # TODO: request charged
2081
        return RestoreObjectOutput(StatusCode=status_code)
1✔
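
The restore state set above uses the same format S3 returns in the x-amz-restore header. A sketch (endpoint, credentials and names are assumptions) that triggers the restore and reads the header back via HeadObject:

import re
import boto3

# assumption: LocalStack endpoint and dummy credentials; names are illustrative
s3 = boto3.client("s3", endpoint_url="http://localhost:4566",
                  aws_access_key_id="test", aws_secret_access_key="test",
                  region_name="us-east-1")

s3.put_object(Bucket="demo-bucket", Key="archive.bin", Body=b"x", StorageClass="GLACIER")
s3.restore_object(Bucket="demo-bucket", Key="archive.bin", RestoreRequest={"Days": 1})

head = s3.head_object(Bucket="demo-bucket", Key="archive.bin")
restore = head.get("Restore", "")
# e.g. 'ongoing-request="false", expiry-date="..."' as built in restore_object above
match = re.search(r'ongoing-request="(\w+)"', restore)
if match:
    print("restore in progress:", match.group(1) == "true")
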
2082

2083
    @handler("CreateMultipartUpload", expand=False)
1✔
2084
    def create_multipart_upload(
1✔
2085
        self,
2086
        context: RequestContext,
2087
        request: CreateMultipartUploadRequest,
2088
    ) -> CreateMultipartUploadOutput:
2089
        # TODO: handle missing parameters:
2090
        #  request_payer: RequestPayer = None,
2091
        bucket_name = request["Bucket"]
1✔
2092
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
2093

2094
        if (storage_class := request.get("StorageClass")) is not None and (
1✔
2095
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
2096
        ):
2097
            raise InvalidStorageClass(
1✔
2098
                "The storage class you specified is not valid", StorageClassRequested=storage_class
2099
            )
2100

2101
        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
1✔
2102
            validate_kms_key_id(sse_kms_key_id, s3_bucket)
1✔
2103

2104
        if tagging := request.get("Tagging"):
1✔
2105
            tagging = parse_tagging_header(tagging_header=tagging)
×
2106

2107
        key = request["Key"]
1✔
2108

2109
        system_metadata = get_system_metadata_from_request(request)
1✔
2110
        if not system_metadata.get("ContentType"):
1✔
2111
            system_metadata["ContentType"] = "binary/octet-stream"
1✔
2112

2113
        checksum_algorithm = request.get("ChecksumAlgorithm")
1✔
2114
        if checksum_algorithm and checksum_algorithm not in CHECKSUM_ALGORITHMS:
1✔
2115
            raise InvalidRequest(
1✔
2116
                "Checksum algorithm provided is unsupported. Please try again with any of the valid types: [CRC32, CRC32C, SHA1, SHA256]"
2117
            )
2118

2119
        if not (checksum_type := request.get("ChecksumType")) and checksum_algorithm:
1✔
2120
            if checksum_algorithm == ChecksumAlgorithm.CRC64NVME:
1✔
2121
                checksum_type = ChecksumType.FULL_OBJECT
1✔
2122
            else:
2123
                checksum_type = ChecksumType.COMPOSITE
1✔
2124
        elif checksum_type and not checksum_algorithm:
1✔
2125
            raise InvalidRequest(
1✔
2126
                "The x-amz-checksum-type header can only be used with the x-amz-checksum-algorithm header."
2127
            )
2128

2129
        if (
1✔
2130
            checksum_type == ChecksumType.COMPOSITE
2131
            and checksum_algorithm == ChecksumAlgorithm.CRC64NVME
2132
        ):
2133
            raise InvalidRequest(
1✔
2134
                "The COMPOSITE checksum type cannot be used with the crc64nvme checksum algorithm."
2135
            )
2136
        elif checksum_type == ChecksumType.FULL_OBJECT and checksum_algorithm.upper().startswith(
1✔
2137
            "SHA"
2138
        ):
2139
            raise InvalidRequest(
1✔
2140
                f"The FULL_OBJECT checksum type cannot be used with the {checksum_algorithm.lower()} checksum algorithm."
2141
            )
2142

2143
        # TODO: we're not encrypting the object with the provided key for now
2144
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
2145
        validate_sse_c(
1✔
2146
            algorithm=request.get("SSECustomerAlgorithm"),
2147
            encryption_key=request.get("SSECustomerKey"),
2148
            encryption_key_md5=sse_c_key_md5,
2149
            server_side_encryption=request.get("ServerSideEncryption"),
2150
        )
2151

2152
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
2153
            request,
2154
            s3_bucket,
2155
            store,
2156
        )
2157
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)
1✔
2158

2159
        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)
1✔
2160

2161
        # validate encryption values
2162
        s3_multipart = S3Multipart(
1✔
2163
            key=key,
2164
            storage_class=storage_class,
2165
            expires=request.get("Expires"),
2166
            user_metadata=request.get("Metadata"),
2167
            system_metadata=system_metadata,
2168
            checksum_algorithm=checksum_algorithm,
2169
            checksum_type=checksum_type,
2170
            encryption=encryption_parameters.encryption,
2171
            kms_key_id=encryption_parameters.kms_key_id,
2172
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
2173
            sse_key_hash=sse_c_key_md5,
2174
            lock_mode=lock_parameters.lock_mode,
2175
            lock_legal_status=lock_parameters.lock_legal_status,
2176
            lock_until=lock_parameters.lock_until,
2177
            website_redirect_location=request.get("WebsiteRedirectLocation"),
2178
            expiration=None,  # TODO, from lifecycle, or should it be updated with config?
2179
            acl=acl,
2180
            initiator=get_owner_for_account_id(context.account_id),
2181
            tagging=tagging,
2182
            owner=s3_bucket.owner,
2183
            precondition=object_exists_for_precondition_write(s3_bucket, key),
2184
        )
2185
        # it seems if there is SSE-C on the multipart, AWS S3 will override the default Checksum behavior (but not on
2186
        # PutObject)
2187
        if sse_c_key_md5:
1✔
2188
            s3_multipart.object.checksum_algorithm = None
1✔
2189

2190
        s3_bucket.multiparts[s3_multipart.id] = s3_multipart
1✔
2191

2192
        response = CreateMultipartUploadOutput(
1✔
2193
            Bucket=bucket_name, Key=key, UploadId=s3_multipart.id
2194
        )
2195

2196
        if checksum_algorithm:
1✔
2197
            response["ChecksumAlgorithm"] = checksum_algorithm
1✔
2198
            response["ChecksumType"] = checksum_type
1✔
2199

2200
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2201
        if sse_c_key_md5:
1✔
2202
            response["SSECustomerAlgorithm"] = "AES256"
1✔
2203
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
2204

2205
        # TODO: missing response fields we're not currently supporting
2206
        # - AbortDate: lifecycle related, not currently supported, todo
2207
        # - AbortRuleId: lifecycle related, not currently supported, todo
2208
        # - RequestCharged: todo
2209

2210
        return response
1✔
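
The checksum validation above reduces to a small pairing table. A standalone sketch of the same rules, useful when choosing a ChecksumAlgorithm/ChecksumType combination for CreateMultipartUpload; the helper names are local to this sketch:

def default_checksum_type(algorithm: str) -> str:
    # CRC64NVME multiparts default to FULL_OBJECT, everything else to COMPOSITE,
    # mirroring the branch in create_multipart_upload above
    return "FULL_OBJECT" if algorithm == "CRC64NVME" else "COMPOSITE"

def validate_pairing(algorithm: str, checksum_type: str) -> None:
    if checksum_type == "COMPOSITE" and algorithm == "CRC64NVME":
        raise ValueError("COMPOSITE cannot be used with crc64nvme")
    if checksum_type == "FULL_OBJECT" and algorithm.startswith("SHA"):
        raise ValueError(f"FULL_OBJECT cannot be used with {algorithm.lower()}")

validate_pairing("CRC32", default_checksum_type("CRC32"))          # ok: COMPOSITE
validate_pairing("CRC64NVME", default_checksum_type("CRC64NVME"))  # ok: FULL_OBJECT
# validate_pairing("SHA256", "FULL_OBJECT")  # would raise, as in the provider above
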
2211

2212
    @handler("UploadPart", expand=False)
1✔
2213
    def upload_part(
1✔
2214
        self,
2215
        context: RequestContext,
2216
        request: UploadPartRequest,
2217
    ) -> UploadPartOutput:
2218
        # TODO: missing following parameters:
2219
        #  content_length: ContentLength = None, ->validate?
2220
        #  content_md5: ContentMD5 = None, -> validate?
2221
        #  request_payer: RequestPayer = None,
2222
        bucket_name = request["Bucket"]
1✔
2223
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
2224

2225
        upload_id = request.get("UploadId")
1✔
2226
        if not (
1✔
2227
            s3_multipart := s3_bucket.multiparts.get(upload_id)
2228
        ) or s3_multipart.object.key != request.get("Key"):
2229
            raise NoSuchUpload(
1✔
2230
                "The specified upload does not exist. "
2231
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2232
                UploadId=upload_id,
2233
            )
2234
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
1✔
2235
            raise InvalidArgument(
1✔
2236
                "Part number must be an integer between 1 and 10000, inclusive",
2237
                ArgumentName="partNumber",
2238
                ArgumentValue=part_number,
2239
            )
2240

2241
        if content_md5 := request.get("ContentMD5"):
1✔
2242
            # assert that the received ContentMD5 is a properly base64-encoded value that fits an MD5 hash length
2243
            if not base_64_content_md5_to_etag(content_md5):
1✔
2244
                raise InvalidDigest(
1✔
2245
                    "The Content-MD5 you specified was invalid.",
2246
                    Content_MD5=content_md5,
2247
                )
2248

2249
        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
1✔
2250
        checksum_value = (
1✔
2251
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
2252
        )
2253

2254
        # TODO: we're not encrypting the object with the provided key for now
2255
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
2256
        validate_sse_c(
1✔
2257
            algorithm=request.get("SSECustomerAlgorithm"),
2258
            encryption_key=request.get("SSECustomerKey"),
2259
            encryption_key_md5=sse_c_key_md5,
2260
        )
2261

2262
        if (s3_multipart.object.sse_key_hash and not sse_c_key_md5) or (
1✔
2263
            sse_c_key_md5 and not s3_multipart.object.sse_key_hash
2264
        ):
2265
            raise InvalidRequest(
1✔
2266
                "The multipart upload initiate requested encryption. "
2267
                "Subsequent part requests must include the appropriate encryption parameters."
2268
            )
2269
        elif (
1✔
2270
            s3_multipart.object.sse_key_hash
2271
            and sse_c_key_md5
2272
            and s3_multipart.object.sse_key_hash != sse_c_key_md5
2273
        ):
2274
            raise InvalidRequest(
1✔
2275
                "The provided encryption parameters did not match the ones used originally."
2276
            )
2277

2278
        s3_part = S3Part(
1✔
2279
            part_number=part_number,
2280
            checksum_algorithm=checksum_algorithm,
2281
            checksum_value=checksum_value,
2282
        )
2283
        body = request.get("Body")
1✔
2284
        headers = context.request.headers
1✔
2285
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
1✔
2286
            "STREAMING-"
2287
        ) or "aws-chunked" in headers.get("content-encoding", "")
2288
        # check if chunked request
2289
        if is_aws_chunked:
1✔
2290
            checksum_algorithm = (
1✔
2291
                checksum_algorithm
2292
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
2293
            )
2294
            if checksum_algorithm:
1✔
2295
                s3_part.checksum_algorithm = checksum_algorithm
×
2296

2297
            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
1✔
2298
            body = AwsChunkedDecoder(body, decoded_content_length, s3_part)
1✔
2299

2300
        if (
1✔
2301
            s3_multipart.checksum_algorithm
2302
            and s3_part.checksum_algorithm != s3_multipart.checksum_algorithm
2303
        ):
2304
            error_req_checksum = checksum_algorithm.lower() if checksum_algorithm else "null"
1✔
2305
            error_mp_checksum = (
1✔
2306
                s3_multipart.object.checksum_algorithm.lower()
2307
                if s3_multipart.object.checksum_algorithm
2308
                else "null"
2309
            )
2310
            if error_mp_checksum != "null":
1✔
2311
                raise InvalidRequest(
1✔
2312
                    f"Checksum Type mismatch occurred, expected checksum Type: {error_mp_checksum}, actual checksum Type: {error_req_checksum}"
2313
                )
2314

2315
        stored_multipart = self._storage_backend.get_multipart(bucket_name, s3_multipart)
1✔
2316
        with stored_multipart.open(s3_part, mode="w") as stored_s3_part:
1✔
2317
            try:
1✔
2318
                stored_s3_part.write(body)
1✔
2319
            except Exception:
1✔
2320
                stored_multipart.remove_part(s3_part)
1✔
2321
                raise
1✔
2322

2323
            if checksum_algorithm:
1✔
2324
                if not validate_checksum_value(s3_part.checksum_value, checksum_algorithm):
1✔
2325
                    stored_multipart.remove_part(s3_part)
1✔
2326
                    raise InvalidRequest(
1✔
2327
                        f"Value for x-amz-checksum-{s3_part.checksum_algorithm.lower()} header is invalid."
2328
                    )
2329
                elif s3_part.checksum_value != stored_s3_part.checksum:
1✔
2330
                    stored_multipart.remove_part(s3_part)
1✔
2331
                    raise BadDigest(
1✔
2332
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
2333
                    )
2334

2335
            if content_md5:
1✔
2336
                calculated_md5 = etag_to_base_64_content_md5(s3_part.etag)
1✔
2337
                if calculated_md5 != content_md5:
1✔
2338
                    stored_multipart.remove_part(s3_part)
1✔
2339
                    raise BadDigest(
1✔
2340
                        "The Content-MD5 you specified did not match what we received.",
2341
                        ExpectedDigest=content_md5,
2342
                        CalculatedDigest=calculated_md5,
2343
                    )
2344

2345
            s3_multipart.parts[part_number] = s3_part
1✔
2346

2347
        response = UploadPartOutput(
1✔
2348
            ETag=s3_part.quoted_etag,
2349
        )
2350

2351
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2352
        if sse_c_key_md5:
1✔
2353
            response["SSECustomerAlgorithm"] = "AES256"
1✔
2354
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
2355

2356
        if s3_part.checksum_algorithm:
1✔
2357
            response[f"Checksum{s3_part.checksum_algorithm.upper()}"] = s3_part.checksum_value
1✔
2358

2359
        # TODO: RequestCharged: Optional[RequestCharged]
2360
        return response
1✔
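
The Content-MD5 check above compares against the base64-encoded MD5 digest of the raw part body. A minimal sketch of computing that value (and a SHA256 checksum header) client-side; the upload_part call in the trailing comment is indicative only:

import base64
import hashlib

part_body = b"0" * (5 * 1024 * 1024)  # parts other than the last must be at least 5 MiB on AWS

# what the provider compares against etag_to_base_64_content_md5(s3_part.etag)
content_md5 = base64.b64encode(hashlib.md5(part_body).digest()).decode()

# value for the x-amz-checksum-sha256 header / ChecksumSHA256 parameter
checksum_sha256 = base64.b64encode(hashlib.sha256(part_body).digest()).decode()

print(content_md5, checksum_sha256)
# e.g. s3.upload_part(Bucket=..., Key=..., UploadId=..., PartNumber=1,
#                     Body=part_body, ContentMD5=content_md5,
#                     ChecksumSHA256=checksum_sha256)
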
2361

2362
    @handler("UploadPartCopy", expand=False)
1✔
2363
    def upload_part_copy(
1✔
2364
        self,
2365
        context: RequestContext,
2366
        request: UploadPartCopyRequest,
2367
    ) -> UploadPartCopyOutput:
2368
        # TODO: handle following parameters:
2369
        #  copy_source_if_match: CopySourceIfMatch = None,
2370
        #  copy_source_if_modified_since: CopySourceIfModifiedSince = None,
2371
        #  copy_source_if_none_match: CopySourceIfNoneMatch = None,
2372
        #  copy_source_if_unmodified_since: CopySourceIfUnmodifiedSince = None,
2373
        #  request_payer: RequestPayer = None,
2374
        dest_bucket = request["Bucket"]
1✔
2375
        dest_key = request["Key"]
1✔
2376
        store = self.get_store(context.account_id, context.region)
1✔
2377
        # TODO: validate cross-account UploadPartCopy
2378
        if not (dest_s3_bucket := store.buckets.get(dest_bucket)):
1✔
2379
            raise NoSuchBucket("The specified bucket does not exist", BucketName=dest_bucket)
×
2380

2381
        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
1✔
2382
            request.get("CopySource")
2383
        )
2384

2385
        if not (src_s3_bucket := store.buckets.get(src_bucket)):
1✔
2386
            raise NoSuchBucket("The specified bucket does not exist", BucketName=src_bucket)
×
2387

2388
        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
2389
        try:
1✔
2390
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
1✔
2391
        except MethodNotAllowed:
×
2392
            raise InvalidRequest(
×
2393
                "The source of a copy request may not specifically refer to a delete marker by version id."
2394
            )
2395

2396
        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
1✔
2397
            raise InvalidObjectState(
×
2398
                "Operation is not valid for the source object's storage class",
2399
                StorageClass=src_s3_object.storage_class,
2400
            )
2401

2402
        upload_id = request.get("UploadId")
1✔
2403
        if (
1✔
2404
            not (s3_multipart := dest_s3_bucket.multiparts.get(upload_id))
2405
            or s3_multipart.object.key != dest_key
2406
        ):
2407
            raise NoSuchUpload(
×
2408
                "The specified upload does not exist. "
2409
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2410
                UploadId=upload_id,
2411
            )
2412

2413
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
1✔
2414
            raise InvalidArgument(
×
2415
                "Part number must be an integer between 1 and 10000, inclusive",
2416
                ArgumentName="partNumber",
2417
                ArgumentValue=part_number,
2418
            )
2419

2420
        source_range = request.get("CopySourceRange")
1✔
2421
        # TODO implement copy source IF (done in ASF provider)
2422

2423
        range_data: Optional[ObjectRange] = None
1✔
2424
        if source_range:
1✔
2425
            range_data = parse_copy_source_range_header(source_range, src_s3_object.size)
1✔
2426

2427
        s3_part = S3Part(part_number=part_number)
1✔
2428

2429
        stored_multipart = self._storage_backend.get_multipart(dest_bucket, s3_multipart)
1✔
2430
        stored_multipart.copy_from_object(s3_part, src_bucket, src_s3_object, range_data)
1✔
2431

2432
        s3_multipart.parts[part_number] = s3_part
1✔
2433

2434
        # TODO: return those fields (checksum not handled currently in moto for parts)
2435
        # ChecksumCRC32: Optional[ChecksumCRC32]
2436
        # ChecksumCRC32C: Optional[ChecksumCRC32C]
2437
        # ChecksumSHA1: Optional[ChecksumSHA1]
2438
        # ChecksumSHA256: Optional[ChecksumSHA256]
2439
        #     RequestCharged: Optional[RequestCharged]
2440

2441
        result = CopyPartResult(
1✔
2442
            ETag=s3_part.quoted_etag,
2443
            LastModified=s3_part.last_modified,
2444
        )
2445

2446
        response = UploadPartCopyOutput(
1✔
2447
            CopyPartResult=result,
2448
        )
2449

2450
        if src_s3_bucket.versioning_status and src_s3_object.version_id:
1✔
2451
            response["CopySourceVersionId"] = src_s3_object.version_id
×
2452

2453
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2454

2455
        return response
1✔
2456
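The part-number bounds and optional CopySourceRange handled above can be exercised from the client side. Below is a minimal sketch against a LocalStack endpoint; the bucket/key names, the pre-created source object, the default edge port 4566, and the dummy credentials are assumptions for illustration only.

import boto3

s3 = boto3.client(
    "s3", endpoint_url="http://localhost:4566",
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1",
)
s3.create_bucket(Bucket="source-bucket")
s3.create_bucket(Bucket="target-bucket")
s3.put_object(Bucket="source-bucket", Key="source-object", Body=b"x" * (6 * 1024 * 1024))

mpu = s3.create_multipart_upload(Bucket="target-bucket", Key="big-object")
copied = s3.upload_part_copy(
    Bucket="target-bucket",
    Key="big-object",
    UploadId=mpu["UploadId"],
    PartNumber=1,  # must be between 1 and 10000, inclusive
    CopySource={"Bucket": "source-bucket", "Key": "source-object"},
    CopySourceRange="bytes=0-5242879",  # first 5 MiB of the source object
)
print(copied["CopyPartResult"]["ETag"])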

2457
    def complete_multipart_upload(
1✔
2458
        self,
2459
        context: RequestContext,
2460
        bucket: BucketName,
2461
        key: ObjectKey,
2462
        upload_id: MultipartUploadId,
2463
        multipart_upload: CompletedMultipartUpload = None,
2464
        checksum_crc32: ChecksumCRC32 = None,
2465
        checksum_crc32_c: ChecksumCRC32C = None,
2466
        checksum_crc64_nvme: ChecksumCRC64NVME = None,
2467
        checksum_sha1: ChecksumSHA1 = None,
2468
        checksum_sha256: ChecksumSHA256 = None,
2469
        checksum_type: ChecksumType = None,
2470
        mpu_object_size: MpuObjectSize = None,
2471
        request_payer: RequestPayer = None,
2472
        expected_bucket_owner: AccountId = None,
2473
        if_match: IfMatch = None,
2474
        if_none_match: IfNoneMatch = None,
2475
        sse_customer_algorithm: SSECustomerAlgorithm = None,
2476
        sse_customer_key: SSECustomerKey = None,
2477
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
2478
        **kwargs,
2479
    ) -> CompleteMultipartUploadOutput:
2480
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2481

2482
        if (
1✔
2483
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2484
            or s3_multipart.object.key != key
2485
        ):
2486
            raise NoSuchUpload(
1✔
2487
                "The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.",
2488
                UploadId=upload_id,
2489
            )
2490

2491
        if if_none_match and if_match:
1✔
2492
            raise NotImplementedException(
2493
                "A header you provided implies functionality that is not implemented",
2494
                Header="If-Match,If-None-Match",
2495
                additionalMessage="Multiple conditional request headers present in the request",
2496
            )
2497

2498
        elif if_none_match:
1✔
2499
            if if_none_match != "*":
1✔
2500
                raise NotImplementedException(
2501
                    "A header you provided implies functionality that is not implemented",
2502
                    Header="If-None-Match",
2503
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2504
                )
2505
            if object_exists_for_precondition_write(s3_bucket, key):
1✔
2506
                raise PreconditionFailed(
1✔
2507
                    "At least one of the pre-conditions you specified did not hold",
2508
                    Condition="If-None-Match",
2509
                )
2510
            elif s3_multipart.precondition:
1✔
2511
                raise ConditionalRequestConflict(
1✔
2512
                    "The conditional request cannot succeed due to a conflicting operation against this resource.",
2513
                    Condition="If-None-Match",
2514
                    Key=key,
2515
                )
2516

2517
        elif if_match:
1✔
2518
            if if_match == "*":
1✔
2519
                raise NotImplementedException(
2520
                    "A header you provided implies functionality that is not implemented",
2521
                    Header="If-None-Match",
2522
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2523
                )
2524
            verify_object_equality_precondition_write(
1✔
2525
                s3_bucket, key, if_match, initiated=s3_multipart.initiated
2526
            )
2527

2528
        parts = multipart_upload.get("Parts", [])
1✔
2529
        if not parts:
1✔
2530
            raise InvalidRequest("You must specify at least one part")
1✔
2531

2532
        parts_numbers = [part.get("PartNumber") for part in parts]
1✔
2533
        # TODO: it seems that with new S3 data integrity, sorting might not be mandatory depending on checksum type
2534
        # see https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
2535
        # sorted() is very fast (fastest) when the list is already sorted, which should be the case here
2536
        if sorted(parts_numbers) != parts_numbers:
1✔
2537
            raise InvalidPartOrder(
1✔
2538
                "The list of parts was not in ascending order. Parts must be ordered by part number.",
2539
                UploadId=upload_id,
2540
            )
2541

2542
        mpu_checksum_algorithm = s3_multipart.checksum_algorithm
1✔
2543
        mpu_checksum_type = getattr(s3_multipart, "checksum_type", None)
1✔
2544

2545
        if checksum_type and checksum_type != mpu_checksum_type:
1✔
2546
            raise InvalidRequest(
1✔
2547
                f"The upload was created using the {mpu_checksum_type or 'null'} checksum mode. "
2548
                f"The complete request must use the same checksum mode."
2549
            )
2550

2551
        # generate the versionId before completing, in case the bucket versioning status has changed between
2552
        # creation and completion? AWS validates this
2553
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
2554
        s3_multipart.object.version_id = version_id
1✔
2555

2556
        # we're inspecting the signature of `complete_multipart`, in case the multipart has been restored from
2557
        # persistence. if we do not have a new version, do not validate those parameters
2558
        # TODO: remove for next major version (minor?)
2559
        if signature(s3_multipart.complete_multipart).parameters.get("mpu_size"):
1✔
2560
            checksum_algorithm = mpu_checksum_algorithm.lower() if mpu_checksum_algorithm else None
1✔
2561
            checksum_map = {
1✔
2562
                "crc32": checksum_crc32,
2563
                "crc32c": checksum_crc32_c,
2564
                "crc64nvme": checksum_crc64_nvme,
2565
                "sha1": checksum_sha1,
2566
                "sha256": checksum_sha256,
2567
            }
2568
            checksum_value = checksum_map.get(checksum_algorithm)
1✔
2569
            s3_multipart.complete_multipart(
1✔
2570
                parts, mpu_size=mpu_object_size, validation_checksum=checksum_value
2571
            )
2572
            if mpu_checksum_algorithm and (
1✔
2573
                (
2574
                    checksum_value
2575
                    and mpu_checksum_type == ChecksumType.FULL_OBJECT
2576
                    and not checksum_type
2577
                )
2578
                or any(
2579
                    checksum_value
2580
                    for checksum_type, checksum_value in checksum_map.items()
2581
                    if checksum_type != checksum_algorithm
2582
                )
2583
            ):
2584
                # this is not ideal, but this validation comes last... after the validation of individual parts
2585
                s3_multipart.object.parts.clear()
1✔
2586
                raise BadDigest(
1✔
2587
                    f"The {mpu_checksum_algorithm.lower()} you specified did not match the calculated checksum."
2588
                )
2589
        else:
2590
            s3_multipart.complete_multipart(parts)
×
2591

2592
        stored_multipart = self._storage_backend.get_multipart(bucket, s3_multipart)
1✔
2593
        stored_multipart.complete_multipart(
1✔
2594
            [s3_multipart.parts.get(part_number) for part_number in parts_numbers]
2595
        )
2596
        if not s3_multipart.checksum_algorithm and s3_multipart.object.checksum_algorithm:
1✔
2597
            with self._storage_backend.open(
1✔
2598
                bucket, s3_multipart.object, mode="r"
2599
            ) as s3_stored_object:
2600
                s3_multipart.object.checksum_value = s3_stored_object.checksum
1✔
2601
                s3_multipart.object.checksum_type = ChecksumType.FULL_OBJECT
1✔
2602

2603
        s3_object = s3_multipart.object
1✔
2604

2605
        s3_bucket.objects.set(key, s3_object)
1✔
2606

2607
        # remove the multipart now that it's complete
2608
        self._storage_backend.remove_multipart(bucket, s3_multipart)
1✔
2609
        s3_bucket.multiparts.pop(s3_multipart.id, None)
1✔
2610

2611
        key_id = get_unique_key_id(bucket, key, version_id)
1✔
2612
        store.TAGS.tags.pop(key_id, None)
1✔
2613
        if s3_multipart.tagging:
1✔
2614
            store.TAGS.tags[key_id] = s3_multipart.tagging
×
2615

2616
        # RequestCharged: Optional[RequestCharged] TODO
2617

2618
        response = CompleteMultipartUploadOutput(
1✔
2619
            Bucket=bucket,
2620
            Key=key,
2621
            ETag=s3_object.quoted_etag,
2622
            Location=f"{get_full_default_bucket_location(bucket)}{key}",
2623
        )
2624

2625
        if s3_object.version_id:
1✔
2626
            response["VersionId"] = s3_object.version_id
×
2627

2628
        # it seems AWS is not returning checksum related fields if the object has KMS encryption ¯\_(ツ)_/¯
2629
        # but it still generates them, and they can be retrieved with regular GetObject and such operations
2630
        if s3_object.checksum_algorithm and not s3_object.kms_key_id:
1✔
2631
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
1✔
2632
            response["ChecksumType"] = s3_object.checksum_type
1✔
2633

2634
        if s3_object.expiration:
1✔
2635
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime
×
2636

2637
        add_encryption_to_response(response, s3_object=s3_object)
1✔
2638

2639
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
2640

2641
        return response
1✔
2642
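From a client, the ascending-part-order and checksum-mode checks above roughly translate to the sketch below; the bucket/key names, the 5 MiB part body, the endpoint, and the dummy credentials are assumptions. Note that, as per the comment in the code, the response omits the checksum fields if the object uses KMS encryption.

import boto3

s3 = boto3.client(
    "s3", endpoint_url="http://localhost:4566",
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1",
)
s3.create_bucket(Bucket="demo-bucket")
mpu = s3.create_multipart_upload(Bucket="demo-bucket", Key="assembled", ChecksumAlgorithm="SHA256")
part = s3.upload_part(
    Bucket="demo-bucket",
    Key="assembled",
    UploadId=mpu["UploadId"],
    PartNumber=1,
    Body=b"x" * (5 * 1024 * 1024),
    ChecksumAlgorithm="SHA256",
)
# parts must be listed in ascending PartNumber order, otherwise InvalidPartOrder is raised
completed = s3.complete_multipart_upload(
    Bucket="demo-bucket",
    Key="assembled",
    UploadId=mpu["UploadId"],
    MultipartUpload={
        "Parts": [
            {"PartNumber": 1, "ETag": part["ETag"], "ChecksumSHA256": part["ChecksumSHA256"]}
        ]
    },
)
print(completed["ETag"], completed.get("ChecksumSHA256"))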

2643
    def abort_multipart_upload(
1✔
2644
        self,
2645
        context: RequestContext,
2646
        bucket: BucketName,
2647
        key: ObjectKey,
2648
        upload_id: MultipartUploadId,
2649
        request_payer: RequestPayer = None,
2650
        expected_bucket_owner: AccountId = None,
2651
        if_match_initiated_time: IfMatchInitiatedTime = None,
2652
        **kwargs,
2653
    ) -> AbortMultipartUploadOutput:
2654
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2655

2656
        if (
1✔
2657
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2658
            or s3_multipart.object.key != key
2659
        ):
2660
            raise NoSuchUpload(
1✔
2661
                "The specified upload does not exist. "
2662
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2663
                UploadId=upload_id,
2664
            )
2665
        s3_bucket.multiparts.pop(upload_id, None)
1✔
2666

2667
        self._storage_backend.remove_multipart(bucket, s3_multipart)
1✔
2668
        response = AbortMultipartUploadOutput()
1✔
2669
        # TODO: requestCharged
2670
        return response
1✔
2671

2672
    def list_parts(
1✔
2673
        self,
2674
        context: RequestContext,
2675
        bucket: BucketName,
2676
        key: ObjectKey,
2677
        upload_id: MultipartUploadId,
2678
        max_parts: MaxParts = None,
2679
        part_number_marker: PartNumberMarker = None,
2680
        request_payer: RequestPayer = None,
2681
        expected_bucket_owner: AccountId = None,
2682
        sse_customer_algorithm: SSECustomerAlgorithm = None,
2683
        sse_customer_key: SSECustomerKey = None,
2684
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
2685
        **kwargs,
2686
    ) -> ListPartsOutput:
2687
        # TODO: implement MaxParts
2688
        # TODO: implement PartNumberMarker
2689
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2690

2691
        if (
1✔
2692
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2693
            or s3_multipart.object.key != key
2694
        ):
2695
            raise NoSuchUpload(
1✔
2696
                "The specified upload does not exist. "
2697
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2698
                UploadId=upload_id,
2699
            )
2700

2701
        #     AbortDate: Optional[AbortDate] TODO: lifecycle
2702
        #     AbortRuleId: Optional[AbortRuleId] TODO: lifecycle
2703
        #     RequestCharged: Optional[RequestCharged]
2704

2705
        count = 0
1✔
2706
        is_truncated = False
1✔
2707
        part_number_marker = part_number_marker or 0
1✔
2708
        max_parts = max_parts or 1000
1✔
2709

2710
        parts = []
1✔
2711
        all_parts = sorted(s3_multipart.parts.items())
1✔
2712
        last_part_number = all_parts[-1][0] if all_parts else None
1✔
2713
        for part_number, part in all_parts:
1✔
2714
            if part_number <= part_number_marker:
1✔
2715
                continue
1✔
2716
            part_item = Part(
1✔
2717
                ETag=part.quoted_etag,
2718
                LastModified=part.last_modified,
2719
                PartNumber=part_number,
2720
                Size=part.size,
2721
            )
2722
            if s3_multipart.checksum_algorithm:
1✔
2723
                part_item[f"Checksum{part.checksum_algorithm.upper()}"] = part.checksum_value
1✔
2724

2725
            parts.append(part_item)
1✔
2726
            count += 1
1✔
2727

2728
            if count >= max_parts and part.part_number != last_part_number:
1✔
2729
                is_truncated = True
1✔
2730
                break
1✔
2731

2732
        response = ListPartsOutput(
1✔
2733
            Bucket=bucket,
2734
            Key=key,
2735
            UploadId=upload_id,
2736
            Initiator=s3_multipart.initiator,
2737
            Owner=s3_multipart.initiator,
2738
            StorageClass=s3_multipart.object.storage_class,
2739
            IsTruncated=is_truncated,
2740
            MaxParts=max_parts,
2741
            PartNumberMarker=0,
2742
            NextPartNumberMarker=0,
2743
        )
2744
        if parts:
1✔
2745
            response["Parts"] = parts
1✔
2746
            last_part = parts[-1]["PartNumber"]
1✔
2747
            response["NextPartNumberMarker"] = last_part
1✔
2748

2749
        if part_number_marker:
1✔
2750
            response["PartNumberMarker"] = part_number_marker
1✔
2751
        if s3_multipart.checksum_algorithm:
1✔
2752
            response["ChecksumAlgorithm"] = s3_multipart.object.checksum_algorithm
1✔
2753
            response["ChecksumType"] = getattr(s3_multipart, "checksum_type", None)
1✔
2754

2755
        return response
1✔
2756
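The MaxParts/PartNumberMarker pagination implemented above could be driven from a client as in the following sketch; the bucket/key names, the tiny part bodies (small parts are only rejected at completion time), the endpoint, and the credentials are assumptions.

import boto3

s3 = boto3.client(
    "s3", endpoint_url="http://localhost:4566",
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1",
)
s3.create_bucket(Bucket="demo-bucket")
mpu = s3.create_multipart_upload(Bucket="demo-bucket", Key="assembled")
for number in (1, 2, 3):
    s3.upload_part(
        Bucket="demo-bucket", Key="assembled", UploadId=mpu["UploadId"],
        PartNumber=number, Body=b"data",
    )

# page through the parts, two at a time
marker = 0
while True:
    page = s3.list_parts(
        Bucket="demo-bucket", Key="assembled", UploadId=mpu["UploadId"],
        MaxParts=2, PartNumberMarker=marker,
    )
    for part in page.get("Parts", []):
        print(part["PartNumber"], part["ETag"], part["Size"])
    if not page["IsTruncated"]:
        break
    marker = page["NextPartNumberMarker"]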

2757
    def list_multipart_uploads(
1✔
2758
        self,
2759
        context: RequestContext,
2760
        bucket: BucketName,
2761
        delimiter: Delimiter = None,
2762
        encoding_type: EncodingType = None,
2763
        key_marker: KeyMarker = None,
2764
        max_uploads: MaxUploads = None,
2765
        prefix: Prefix = None,
2766
        upload_id_marker: UploadIdMarker = None,
2767
        expected_bucket_owner: AccountId = None,
2768
        request_payer: RequestPayer = None,
2769
        **kwargs,
2770
    ) -> ListMultipartUploadsOutput:
2771
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2772

2773
        common_prefixes = set()
1✔
2774
        count = 0
1✔
2775
        is_truncated = False
1✔
2776
        max_uploads = max_uploads or 1000
1✔
2777
        prefix = prefix or ""
1✔
2778
        delimiter = delimiter or ""
1✔
2779
        if encoding_type:
1✔
2780
            prefix = urlparse.quote(prefix)
1✔
2781
            delimiter = urlparse.quote(delimiter)
1✔
2782
        upload_id_marker_found = False
1✔
2783

2784
        if key_marker and upload_id_marker:
1✔
2785
            multipart = s3_bucket.multiparts.get(upload_id_marker)
1✔
2786
            if multipart:
1✔
2787
                key = (
1✔
2788
                    urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
2789
                )
2790
            else:
2791
                # set key to None so the marker check below fails if the multipart is not found
2792
                key = None
×
2793

2794
            if key_marker != key:
1✔
2795
                raise InvalidArgument(
1✔
2796
                    "Invalid uploadId marker",
2797
                    ArgumentName="upload-id-marker",
2798
                    ArgumentValue=upload_id_marker,
2799
                )
2800

2801
        uploads = []
1✔
2802
        # sort by key and initiated
2803
        all_multiparts = sorted(
1✔
2804
            s3_bucket.multiparts.values(), key=lambda r: (r.object.key, r.initiated.timestamp())
2805
        )
2806
        last_multipart = all_multiparts[-1] if all_multiparts else None
1✔
2807

2808
        for multipart in all_multiparts:
1✔
2809
            key = urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
1✔
2810
            # skip all keys that come before the key_marker
2811
            if key_marker:
1✔
2812
                if key < key_marker:
1✔
2813
                    continue
1✔
2814
                elif key == key_marker:
1✔
2815
                    if not upload_id_marker:
1✔
2816
                        continue
1✔
2817
                    # uploads for the same key are ordered by initiation time; once we have found the
                    # upload_id_marker, we can return the following ones
2818
                    if multipart.id == upload_id_marker:
1✔
2819
                        upload_id_marker_found = True
1✔
2820
                        continue
1✔
2821
                    elif not upload_id_marker_found:
1✔
2822
                        # as long as we have not passed the upload_id_marker, skip the uploads
2823
                        continue
1✔
2824

2825
            # Filter for keys that start with prefix
2826
            if prefix and not key.startswith(prefix):
1✔
2827
                continue
1✔
2828

2829
            # see ListObjectsV2 for the logic comments (shared logic here)
2830
            prefix_including_delimiter = None
1✔
2831
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
2832
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
2833
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
2834

2835
                if prefix_including_delimiter in common_prefixes or (
1✔
2836
                    key_marker and key_marker.startswith(prefix_including_delimiter)
2837
                ):
2838
                    continue
1✔
2839

2840
            if prefix_including_delimiter:
1✔
2841
                common_prefixes.add(prefix_including_delimiter)
1✔
2842
            else:
2843
                multipart_upload = MultipartUpload(
1✔
2844
                    UploadId=multipart.id,
2845
                    Key=multipart.object.key,
2846
                    Initiated=multipart.initiated,
2847
                    StorageClass=multipart.object.storage_class,
2848
                    Owner=multipart.initiator,  # TODO: check the difference
2849
                    Initiator=multipart.initiator,
2850
                )
2851
                if multipart.checksum_algorithm:
1✔
2852
                    multipart_upload["ChecksumAlgorithm"] = multipart.checksum_algorithm
1✔
2853
                    multipart_upload["ChecksumType"] = getattr(multipart, "checksum_type", None)
1✔
2854

2855
                uploads.append(multipart_upload)
1✔
2856

2857
            count += 1
1✔
2858
            if count >= max_uploads and last_multipart.id != multipart.id:
1✔
2859
                is_truncated = True
1✔
2860
                break
1✔
2861

2862
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
2863

2864
        response = ListMultipartUploadsOutput(
1✔
2865
            Bucket=bucket,
2866
            IsTruncated=is_truncated,
2867
            MaxUploads=max_uploads or 1000,
2868
            KeyMarker=key_marker or "",
2869
            UploadIdMarker=upload_id_marker or "" if key_marker else "",
2870
            NextKeyMarker="",
2871
            NextUploadIdMarker="",
2872
        )
2873
        if uploads:
1✔
2874
            response["Uploads"] = uploads
1✔
2875
            last_upload = uploads[-1]
1✔
2876
            response["NextKeyMarker"] = last_upload["Key"]
1✔
2877
            response["NextUploadIdMarker"] = last_upload["UploadId"]
1✔
2878
        if delimiter:
1✔
2879
            response["Delimiter"] = delimiter
1✔
2880
        if prefix:
1✔
2881
            response["Prefix"] = prefix
1✔
2882
        if encoding_type:
1✔
2883
            response["EncodingType"] = EncodingType.url
1✔
2884
        if common_prefixes:
1✔
2885
            response["CommonPrefixes"] = common_prefixes
1✔
2886

2887
        return response
1✔
2888
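The prefix/delimiter grouping and the key/upload-id markers above mirror the ListObjectsV2-style pagination; a small sketch of listing in-progress uploads under a prefix follows, with bucket and key names, the endpoint, and the credentials assumed for illustration.

import boto3

s3 = boto3.client(
    "s3", endpoint_url="http://localhost:4566",
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1",
)
s3.create_bucket(Bucket="demo-bucket")
for key in ("videos/a.mp4", "videos/clips/b.mp4", "images/c.png"):
    s3.create_multipart_upload(Bucket="demo-bucket", Key=key)

listing = s3.list_multipart_uploads(
    Bucket="demo-bucket", Prefix="videos/", Delimiter="/", MaxUploads=100
)
for upload in listing.get("Uploads", []):
    print(upload["Key"], upload["UploadId"], upload["Initiated"])
for prefix in listing.get("CommonPrefixes", []):
    print("common prefix:", prefix["Prefix"])  # e.g. "videos/clips/"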

2889
    def put_bucket_versioning(
1✔
2890
        self,
2891
        context: RequestContext,
2892
        bucket: BucketName,
2893
        versioning_configuration: VersioningConfiguration,
2894
        content_md5: ContentMD5 = None,
2895
        checksum_algorithm: ChecksumAlgorithm = None,
2896
        mfa: MFA = None,
2897
        expected_bucket_owner: AccountId = None,
2898
        **kwargs,
2899
    ) -> None:
2900
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2901
        if not (versioning_status := versioning_configuration.get("Status")):
1✔
2902
            raise CommonServiceException(
1✔
2903
                code="IllegalVersioningConfigurationException",
2904
                message="The Versioning element must be specified",
2905
            )
2906

2907
        if versioning_status not in ("Enabled", "Suspended"):
1✔
2908
            raise MalformedXML()
1✔
2909

2910
        if s3_bucket.object_lock_enabled and versioning_status == "Suspended":
1✔
2911
            raise InvalidBucketState(
1✔
2912
                "An Object Lock configuration is present on this bucket, so the versioning state cannot be changed."
2913
            )
2914

2915
        if not s3_bucket.versioning_status:
1✔
2916
            s3_bucket.objects = VersionedKeyStore.from_key_store(s3_bucket.objects)
1✔
2917

2918
        s3_bucket.versioning_status = versioning_status
1✔
2919

2920
    def get_bucket_versioning(
1✔
2921
        self,
2922
        context: RequestContext,
2923
        bucket: BucketName,
2924
        expected_bucket_owner: AccountId = None,
2925
        **kwargs,
2926
    ) -> GetBucketVersioningOutput:
2927
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2928

2929
        if not s3_bucket.versioning_status:
1✔
2930
            return GetBucketVersioningOutput()
1✔
2931

2932
        return GetBucketVersioningOutput(Status=s3_bucket.versioning_status)
1✔
2933
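A short client-side sketch of the versioning round trip above; the bucket name, endpoint, and credentials are assumptions.

import boto3

s3 = boto3.client(
    "s3", endpoint_url="http://localhost:4566",
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1",
)
s3.create_bucket(Bucket="demo-bucket")
s3.put_bucket_versioning(
    Bucket="demo-bucket",
    VersioningConfiguration={"Status": "Enabled"},  # only "Enabled" or "Suspended" are accepted
)
print(s3.get_bucket_versioning(Bucket="demo-bucket").get("Status"))
# suspending versioning on a bucket with Object Lock enabled raises InvalidBucketState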

2934
    def get_bucket_encryption(
1✔
2935
        self,
2936
        context: RequestContext,
2937
        bucket: BucketName,
2938
        expected_bucket_owner: AccountId = None,
2939
        **kwargs,
2940
    ) -> GetBucketEncryptionOutput:
2941
        # AWS now encrypts buckets by default with AES256, see:
2942
        # https://docs.aws.amazon.com/AmazonS3/latest/userguide/default-bucket-encryption.html
2943
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2944

2945
        if not s3_bucket.encryption_rule:
1✔
2946
            return GetBucketEncryptionOutput()
×
2947

2948
        return GetBucketEncryptionOutput(
1✔
2949
            ServerSideEncryptionConfiguration={"Rules": [s3_bucket.encryption_rule]}
2950
        )
2951

2952
    def put_bucket_encryption(
1✔
2953
        self,
2954
        context: RequestContext,
2955
        bucket: BucketName,
2956
        server_side_encryption_configuration: ServerSideEncryptionConfiguration,
2957
        content_md5: ContentMD5 = None,
2958
        checksum_algorithm: ChecksumAlgorithm = None,
2959
        expected_bucket_owner: AccountId = None,
2960
        **kwargs,
2961
    ) -> None:
2962
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2963

2964
        if not (rules := server_side_encryption_configuration.get("Rules")):
1✔
2965
            raise MalformedXML()
1✔
2966

2967
        if len(rules) != 1 or not (
1✔
2968
            encryption := rules[0].get("ApplyServerSideEncryptionByDefault")
2969
        ):
2970
            raise MalformedXML()
1✔
2971

2972
        if not (sse_algorithm := encryption.get("SSEAlgorithm")):
1✔
2973
            raise MalformedXML()
×
2974

2975
        if sse_algorithm not in SSE_ALGORITHMS:
1✔
2976
            raise MalformedXML()
×
2977

2978
        if sse_algorithm != ServerSideEncryption.aws_kms and "KMSMasterKeyID" in encryption:
1✔
2979
            raise InvalidArgument(
1✔
2980
                "a KMSMasterKeyID is not applicable if the default sse algorithm is not aws:kms or aws:kms:dsse",
2981
                ArgumentName="ApplyServerSideEncryptionByDefault",
2982
            )
2983
        # elif master_kms_key := encryption.get("KMSMasterKeyID"):
2984
        # TODO: validate KMS key? not currently done in moto
2985
        # You can pass either the KeyId or the KeyArn. If cross-account, it has to be the ARN.
2986
        # It's always saved as the ARN in the bucket configuration.
2987
        # kms_key_arn = get_kms_key_arn(master_kms_key, s3_bucket.bucket_account_id)
2988
        # encryption["KMSMasterKeyID"] = master_kms_key
2989

2990
        s3_bucket.encryption_rule = rules[0]
1✔
2991
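The rule validation above expects exactly one rule carrying ApplyServerSideEncryptionByDefault; a minimal client sketch is shown below, with the bucket name, endpoint, and credentials assumed.

import boto3

s3 = boto3.client(
    "s3", endpoint_url="http://localhost:4566",
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1",
)
s3.create_bucket(Bucket="demo-bucket")
s3.put_bucket_encryption(
    Bucket="demo-bucket",
    ServerSideEncryptionConfiguration={
        "Rules": [
            {
                # "KMSMasterKeyID" is only valid together with aws:kms / aws:kms:dsse
                "ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"},
            }
        ]
    },
)
print(s3.get_bucket_encryption(Bucket="demo-bucket")["ServerSideEncryptionConfiguration"])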

2992
    def delete_bucket_encryption(
1✔
2993
        self,
2994
        context: RequestContext,
2995
        bucket: BucketName,
2996
        expected_bucket_owner: AccountId = None,
2997
        **kwargs,
2998
    ) -> None:
2999
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3000

3001
        s3_bucket.encryption_rule = None
1✔
3002

3003
    def put_bucket_notification_configuration(
1✔
3004
        self,
3005
        context: RequestContext,
3006
        bucket: BucketName,
3007
        notification_configuration: NotificationConfiguration,
3008
        expected_bucket_owner: AccountId = None,
3009
        skip_destination_validation: SkipValidation = None,
3010
        **kwargs,
3011
    ) -> None:
3012
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3013

3014
        self._verify_notification_configuration(
1✔
3015
            notification_configuration, skip_destination_validation, context, bucket
3016
        )
3017
        s3_bucket.notification_configuration = notification_configuration
1✔
3018

3019
    def get_bucket_notification_configuration(
1✔
3020
        self,
3021
        context: RequestContext,
3022
        bucket: BucketName,
3023
        expected_bucket_owner: AccountId = None,
3024
        **kwargs,
3025
    ) -> NotificationConfiguration:
3026
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3027

3028
        return s3_bucket.notification_configuration or NotificationConfiguration()
1✔
3029

3030
    def put_bucket_tagging(
1✔
3031
        self,
3032
        context: RequestContext,
3033
        bucket: BucketName,
3034
        tagging: Tagging,
3035
        content_md5: ContentMD5 = None,
3036
        checksum_algorithm: ChecksumAlgorithm = None,
3037
        expected_bucket_owner: AccountId = None,
3038
        **kwargs,
3039
    ) -> None:
3040
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3041

3042
        if "TagSet" not in tagging:
1✔
3043
            raise MalformedXML()
×
3044

3045
        validate_tag_set(tagging["TagSet"], type_set="bucket")
1✔
3046

3047
        # remove the previous tags before setting the new ones; the operation overwrites the whole TagSet
3048
        store.TAGS.tags.pop(s3_bucket.bucket_arn, None)
1✔
3049
        store.TAGS.tag_resource(s3_bucket.bucket_arn, tags=tagging["TagSet"])
1✔
3050

3051
    def get_bucket_tagging(
1✔
3052
        self,
3053
        context: RequestContext,
3054
        bucket: BucketName,
3055
        expected_bucket_owner: AccountId = None,
3056
        **kwargs,
3057
    ) -> GetBucketTaggingOutput:
3058
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3059
        tag_set = store.TAGS.list_tags_for_resource(s3_bucket.bucket_arn, root_name="Tags")["Tags"]
1✔
3060
        if not tag_set:
1✔
3061
            raise NoSuchTagSet(
1✔
3062
                "The TagSet does not exist",
3063
                BucketName=bucket,
3064
            )
3065

3066
        return GetBucketTaggingOutput(TagSet=tag_set)
1✔
3067

3068
    def delete_bucket_tagging(
1✔
3069
        self,
3070
        context: RequestContext,
3071
        bucket: BucketName,
3072
        expected_bucket_owner: AccountId = None,
3073
        **kwargs,
3074
    ) -> None:
3075
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3076

3077
        store.TAGS.tags.pop(s3_bucket.bucket_arn, None)
1✔
3078

3079
    def put_object_tagging(
1✔
3080
        self,
3081
        context: RequestContext,
3082
        bucket: BucketName,
3083
        key: ObjectKey,
3084
        tagging: Tagging,
3085
        version_id: ObjectVersionId = None,
3086
        content_md5: ContentMD5 = None,
3087
        checksum_algorithm: ChecksumAlgorithm = None,
3088
        expected_bucket_owner: AccountId = None,
3089
        request_payer: RequestPayer = None,
3090
        **kwargs,
3091
    ) -> PutObjectTaggingOutput:
3092
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3093

3094
        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="PUT")
1✔
3095

3096
        if "TagSet" not in tagging:
1✔
3097
            raise MalformedXML()
×
3098

3099
        validate_tag_set(tagging["TagSet"], type_set="object")
1✔
3100

3101
        key_id = get_unique_key_id(bucket, key, s3_object.version_id)
1✔
3102
        # remove the previous tags before setting the new ones; the operation overwrites the whole TagSet
3103
        store.TAGS.tags.pop(key_id, None)
1✔
3104
        store.TAGS.tag_resource(key_id, tags=tagging["TagSet"])
1✔
3105
        response = PutObjectTaggingOutput()
1✔
3106
        if s3_object.version_id:
1✔
3107
            response["VersionId"] = s3_object.version_id
1✔
3108

3109
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
3110

3111
        return response
1✔
3112

3113
    def get_object_tagging(
1✔
3114
        self,
3115
        context: RequestContext,
3116
        bucket: BucketName,
3117
        key: ObjectKey,
3118
        version_id: ObjectVersionId = None,
3119
        expected_bucket_owner: AccountId = None,
3120
        request_payer: RequestPayer = None,
3121
        **kwargs,
3122
    ) -> GetObjectTaggingOutput:
3123
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3124

3125
        try:
1✔
3126
            s3_object = s3_bucket.get_object(key=key, version_id=version_id)
1✔
3127
        except NoSuchKey as e:
1✔
3128
            # it seems GetObjectTagging does not work like all other operations, so we need to raise a different
3129
            # exception. As we already need to catch it because of the format of the Key, it is not worth modifying the
3130
            # `S3Bucket.get_object` signature for one operation.
3131
            if s3_bucket.versioning_status and (
1✔
3132
                s3_object_version := s3_bucket.objects.get(key, version_id)
3133
            ):
3134
                raise MethodNotAllowed(
1✔
3135
                    "The specified method is not allowed against this resource.",
3136
                    Method="GET",
3137
                    ResourceType="DeleteMarker",
3138
                    DeleteMarker=True,
3139
                    Allow="DELETE",
3140
                    VersionId=s3_object_version.version_id,
3141
                )
3142

3143
            # There is a weird, AWS-validated bug in S3: the returned key contains the bucket name as well
3144
            # follow AWS on this one
3145
            e.Key = f"{bucket}/{key}"
1✔
3146
            raise e
1✔
3147

3148
        tag_set = store.TAGS.list_tags_for_resource(
1✔
3149
            get_unique_key_id(bucket, key, s3_object.version_id)
3150
        )["Tags"]
3151
        response = GetObjectTaggingOutput(TagSet=tag_set)
1✔
3152
        if s3_object.version_id:
1✔
3153
            response["VersionId"] = s3_object.version_id
1✔
3154

3155
        return response
1✔
3156
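Since PutObjectTagging replaces the whole TagSet (see the comment above), a client round trip might look like the following sketch; bucket/key names, the endpoint, and the credentials are assumptions.

import boto3

s3 = boto3.client(
    "s3", endpoint_url="http://localhost:4566",
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1",
)
s3.create_bucket(Bucket="demo-bucket")
s3.put_object(Bucket="demo-bucket", Key="report.csv", Body=b"a,b,c")
s3.put_object_tagging(
    Bucket="demo-bucket",
    Key="report.csv",
    Tagging={"TagSet": [{"Key": "team", "Value": "data"}]},  # overwrites any previous TagSet
)
print(s3.get_object_tagging(Bucket="demo-bucket", Key="report.csv")["TagSet"])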

3157
    def delete_object_tagging(
1✔
3158
        self,
3159
        context: RequestContext,
3160
        bucket: BucketName,
3161
        key: ObjectKey,
3162
        version_id: ObjectVersionId = None,
3163
        expected_bucket_owner: AccountId = None,
3164
        **kwargs,
3165
    ) -> DeleteObjectTaggingOutput:
3166
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3167

3168
        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="DELETE")
1✔
3169

3170
        store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
1✔
3171
        response = DeleteObjectTaggingOutput()
1✔
3172
        if s3_object.version_id:
1✔
3173
            response["VersionId"] = s3_object.version_id
×
3174

3175
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
3176

3177
        return response
1✔
3178

3179
    def put_bucket_cors(
1✔
3180
        self,
3181
        context: RequestContext,
3182
        bucket: BucketName,
3183
        cors_configuration: CORSConfiguration,
3184
        content_md5: ContentMD5 = None,
3185
        checksum_algorithm: ChecksumAlgorithm = None,
3186
        expected_bucket_owner: AccountId = None,
3187
        **kwargs,
3188
    ) -> None:
3189
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3190
        validate_cors_configuration(cors_configuration)
1✔
3191
        s3_bucket.cors_rules = cors_configuration
1✔
3192
        self._cors_handler.invalidate_cache()
1✔
3193
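A minimal sketch of putting a CORS configuration that the handler above validates and caches; the bucket name, the allowed origin, the endpoint, and the credentials are assumptions.

import boto3

s3 = boto3.client(
    "s3", endpoint_url="http://localhost:4566",
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1",
)
s3.create_bucket(Bucket="demo-bucket")
s3.put_bucket_cors(
    Bucket="demo-bucket",
    CORSConfiguration={
        "CORSRules": [
            {
                "AllowedOrigins": ["https://app.example.com"],
                "AllowedMethods": ["GET", "PUT"],
                "AllowedHeaders": ["*"],
                "MaxAgeSeconds": 3000,
            }
        ]
    },
)
print(s3.get_bucket_cors(Bucket="demo-bucket")["CORSRules"])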

3194
    def get_bucket_cors(
1✔
3195
        self,
3196
        context: RequestContext,
3197
        bucket: BucketName,
3198
        expected_bucket_owner: AccountId = None,
3199
        **kwargs,
3200
    ) -> GetBucketCorsOutput:
3201
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3202

3203
        if not s3_bucket.cors_rules:
1✔
3204
            raise NoSuchCORSConfiguration(
1✔
3205
                "The CORS configuration does not exist",
3206
                BucketName=bucket,
3207
            )
3208
        return GetBucketCorsOutput(CORSRules=s3_bucket.cors_rules["CORSRules"])
1✔
3209

3210
    def delete_bucket_cors(
1✔
3211
        self,
3212
        context: RequestContext,
3213
        bucket: BucketName,
3214
        expected_bucket_owner: AccountId = None,
3215
        **kwargs,
3216
    ) -> None:
3217
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3218

3219
        if s3_bucket.cors_rules:
1✔
3220
            self._cors_handler.invalidate_cache()
1✔
3221
            s3_bucket.cors_rules = None
1✔
3222

3223
    def get_bucket_lifecycle_configuration(
1✔
3224
        self,
3225
        context: RequestContext,
3226
        bucket: BucketName,
3227
        expected_bucket_owner: AccountId = None,
3228
        **kwargs,
3229
    ) -> GetBucketLifecycleConfigurationOutput:
3230
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3231

3232
        if not s3_bucket.lifecycle_rules:
1✔
3233
            raise NoSuchLifecycleConfiguration(
1✔
3234
                "The lifecycle configuration does not exist",
3235
                BucketName=bucket,
3236
            )
3237

3238
        return GetBucketLifecycleConfigurationOutput(
1✔
3239
            Rules=s3_bucket.lifecycle_rules,
3240
            # TODO: remove for next major version, safe access to new attribute
3241
            TransitionDefaultMinimumObjectSize=getattr(
3242
                s3_bucket,
3243
                "transition_default_minimum_object_size",
3244
                TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
3245
            ),
3246
        )
3247

3248
    def put_bucket_lifecycle_configuration(
1✔
3249
        self,
3250
        context: RequestContext,
3251
        bucket: BucketName,
3252
        checksum_algorithm: ChecksumAlgorithm = None,
3253
        lifecycle_configuration: BucketLifecycleConfiguration = None,
3254
        expected_bucket_owner: AccountId = None,
3255
        transition_default_minimum_object_size: TransitionDefaultMinimumObjectSize = None,
3256
        **kwargs,
3257
    ) -> PutBucketLifecycleConfigurationOutput:
3258
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3259

3260
        transition_min_obj_size = (
1✔
3261
            transition_default_minimum_object_size
3262
            or TransitionDefaultMinimumObjectSize.all_storage_classes_128K
3263
        )
3264

3265
        if transition_min_obj_size not in (
1✔
3266
            TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
3267
            TransitionDefaultMinimumObjectSize.varies_by_storage_class,
3268
        ):
3269
            raise InvalidRequest(
1✔
3270
                f"Invalid TransitionDefaultMinimumObjectSize found: {transition_min_obj_size}"
3271
            )
3272

3273
        validate_lifecycle_configuration(lifecycle_configuration)
1✔
3274
        # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to apply them
3275
        #  every time we get/head an object
3276
        # for now, we keep a cache and get it every time we fetch an object
3277
        s3_bucket.lifecycle_rules = lifecycle_configuration["Rules"]
1✔
3278
        s3_bucket.transition_default_minimum_object_size = transition_min_obj_size
1✔
3279
        self._expiration_cache[bucket].clear()
1✔
3280
        return PutBucketLifecycleConfigurationOutput(
1✔
3281
            TransitionDefaultMinimumObjectSize=transition_min_obj_size
3282
        )
3283
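A sketch of setting a lifecycle configuration as validated above; when TransitionDefaultMinimumObjectSize is omitted it defaults to all_storage_classes_128K. The bucket name, the rule contents, the endpoint, and the credentials are assumptions.

import boto3

s3 = boto3.client(
    "s3", endpoint_url="http://localhost:4566",
    aws_access_key_id="test", aws_secret_access_key="test", region_name="us-east-1",
)
s3.create_bucket(Bucket="demo-bucket")
s3.put_bucket_lifecycle_configuration(
    Bucket="demo-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-tmp",
                "Filter": {"Prefix": "tmp/"},
                "Status": "Enabled",
                "Expiration": {"Days": 1},
            }
        ]
    },
)
print(s3.get_bucket_lifecycle_configuration(Bucket="demo-bucket")["Rules"])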

3284
    def delete_bucket_lifecycle(
1✔
3285
        self,
3286
        context: RequestContext,
3287
        bucket: BucketName,
3288
        expected_bucket_owner: AccountId = None,
3289
        **kwargs,
3290
    ) -> None:
3291
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3292

3293
        s3_bucket.lifecycle_rules = None
1✔
3294
        self._expiration_cache[bucket].clear()
1✔
3295

3296
    def put_bucket_analytics_configuration(
1✔
3297
        self,
3298
        context: RequestContext,
3299
        bucket: BucketName,
3300
        id: AnalyticsId,
3301
        analytics_configuration: AnalyticsConfiguration,
3302
        expected_bucket_owner: AccountId = None,
3303
        **kwargs,
3304
    ) -> None:
3305
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3306

3307
        validate_bucket_analytics_configuration(
1✔
3308
            id=id, analytics_configuration=analytics_configuration
3309
        )
3310

3311
        s3_bucket.analytics_configurations[id] = analytics_configuration
1✔
3312

3313
    def get_bucket_analytics_configuration(
1✔
3314
        self,
3315
        context: RequestContext,
3316
        bucket: BucketName,
3317
        id: AnalyticsId,
3318
        expected_bucket_owner: AccountId = None,
3319
        **kwargs,
3320
    ) -> GetBucketAnalyticsConfigurationOutput:
3321
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3322

3323
        if not (analytic_config := s3_bucket.analytics_configurations.get(id)):
1✔
3324
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3325

3326
        return GetBucketAnalyticsConfigurationOutput(AnalyticsConfiguration=analytic_config)
1✔
3327

3328
    def list_bucket_analytics_configurations(
1✔
3329
        self,
3330
        context: RequestContext,
3331
        bucket: BucketName,
3332
        continuation_token: Token = None,
3333
        expected_bucket_owner: AccountId = None,
3334
        **kwargs,
3335
    ) -> ListBucketAnalyticsConfigurationsOutput:
3336
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3337

3338
        return ListBucketAnalyticsConfigurationsOutput(
1✔
3339
            IsTruncated=False,
3340
            AnalyticsConfigurationList=sorted(
3341
                s3_bucket.analytics_configurations.values(),
3342
                key=itemgetter("Id"),
3343
            ),
3344
        )
3345

3346
    def delete_bucket_analytics_configuration(
1✔
3347
        self,
3348
        context: RequestContext,
3349
        bucket: BucketName,
3350
        id: AnalyticsId,
3351
        expected_bucket_owner: AccountId = None,
3352
        **kwargs,
3353
    ) -> None:
3354
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3355

3356
        if not s3_bucket.analytics_configurations.pop(id, None):
1✔
3357
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3358

3359
    def put_bucket_intelligent_tiering_configuration(
1✔
3360
        self,
3361
        context: RequestContext,
3362
        bucket: BucketName,
3363
        id: IntelligentTieringId,
3364
        intelligent_tiering_configuration: IntelligentTieringConfiguration,
3365
        **kwargs,
3366
    ) -> None:
3367
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3368

3369
        validate_bucket_intelligent_tiering_configuration(id, intelligent_tiering_configuration)
1✔
3370

3371
        s3_bucket.intelligent_tiering_configurations[id] = intelligent_tiering_configuration
1✔
3372

3373
    def get_bucket_intelligent_tiering_configuration(
1✔
3374
        self, context: RequestContext, bucket: BucketName, id: IntelligentTieringId, **kwargs
3375
    ) -> GetBucketIntelligentTieringConfigurationOutput:
3376
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3377

3378
        if not (itier_config := s3_bucket.intelligent_tiering_configurations.get(id)):
1✔
3379
            raise NoSuchConfiguration("The specified configuration does not exist.")
×
3380

3381
        return GetBucketIntelligentTieringConfigurationOutput(
1✔
3382
            IntelligentTieringConfiguration=itier_config
3383
        )
3384

3385
    def delete_bucket_intelligent_tiering_configuration(
1✔
3386
        self, context: RequestContext, bucket: BucketName, id: IntelligentTieringId, **kwargs
3387
    ) -> None:
3388
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3389

3390
        if not s3_bucket.intelligent_tiering_configurations.pop(id, None):
1✔
3391
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3392

3393
    def list_bucket_intelligent_tiering_configurations(
1✔
3394
        self,
3395
        context: RequestContext,
3396
        bucket: BucketName,
3397
        continuation_token: Token = None,
3398
        **kwargs,
3399
    ) -> ListBucketIntelligentTieringConfigurationsOutput:
3400
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3401

3402
        return ListBucketIntelligentTieringConfigurationsOutput(
1✔
3403
            IsTruncated=False,
3404
            IntelligentTieringConfigurationList=sorted(
3405
                s3_bucket.intelligent_tiering_configurations.values(),
3406
                key=itemgetter("Id"),
3407
            ),
3408
        )
3409

3410
    def put_bucket_inventory_configuration(
1✔
3411
        self,
3412
        context: RequestContext,
3413
        bucket: BucketName,
3414
        id: InventoryId,
3415
        inventory_configuration: InventoryConfiguration,
3416
        expected_bucket_owner: AccountId = None,
3417
        **kwargs,
3418
    ) -> None:
3419
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3420

3421
        validate_inventory_configuration(
1✔
3422
            config_id=id, inventory_configuration=inventory_configuration
3423
        )
3424
        s3_bucket.inventory_configurations[id] = inventory_configuration
1✔
3425

3426
    def get_bucket_inventory_configuration(
1✔
3427
        self,
3428
        context: RequestContext,
3429
        bucket: BucketName,
3430
        id: InventoryId,
3431
        expected_bucket_owner: AccountId = None,
3432
        **kwargs,
3433
    ) -> GetBucketInventoryConfigurationOutput:
3434
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3435

3436
        if not (inv_config := s3_bucket.inventory_configurations.get(id)):
1✔
3437
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3438
        return GetBucketInventoryConfigurationOutput(InventoryConfiguration=inv_config)
1✔
3439

3440
    def list_bucket_inventory_configurations(
1✔
3441
        self,
3442
        context: RequestContext,
3443
        bucket: BucketName,
3444
        continuation_token: Token = None,
3445
        expected_bucket_owner: AccountId = None,
3446
        **kwargs,
3447
    ) -> ListBucketInventoryConfigurationsOutput:
3448
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3449

3450
        return ListBucketInventoryConfigurationsOutput(
1✔
3451
            IsTruncated=False,
3452
            InventoryConfigurationList=sorted(
3453
                s3_bucket.inventory_configurations.values(), key=itemgetter("Id")
3454
            ),
3455
        )
3456

3457
    def delete_bucket_inventory_configuration(
1✔
3458
        self,
3459
        context: RequestContext,
3460
        bucket: BucketName,
3461
        id: InventoryId,
3462
        expected_bucket_owner: AccountId = None,
3463
        **kwargs,
3464
    ) -> None:
3465
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3466

3467
        if not s3_bucket.inventory_configurations.pop(id, None):
1✔
3468
            raise NoSuchConfiguration("The specified configuration does not exist.")
×
3469

3470
    def get_bucket_website(
1✔
3471
        self,
3472
        context: RequestContext,
3473
        bucket: BucketName,
3474
        expected_bucket_owner: AccountId = None,
3475
        **kwargs,
3476
    ) -> GetBucketWebsiteOutput:
3477
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3478

3479
        if not s3_bucket.website_configuration:
1✔
3480
            raise NoSuchWebsiteConfiguration(
1✔
3481
                "The specified bucket does not have a website configuration",
3482
                BucketName=bucket,
3483
            )
3484
        return s3_bucket.website_configuration
1✔
3485

3486
    def put_bucket_website(
1✔
3487
        self,
3488
        context: RequestContext,
3489
        bucket: BucketName,
3490
        website_configuration: WebsiteConfiguration,
3491
        content_md5: ContentMD5 = None,
3492
        checksum_algorithm: ChecksumAlgorithm = None,
3493
        expected_bucket_owner: AccountId = None,
3494
        **kwargs,
3495
    ) -> None:
3496
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3497

3498
        validate_website_configuration(website_configuration)
1✔
3499
        s3_bucket.website_configuration = website_configuration
1✔
3500

3501
    def delete_bucket_website(
1✔
3502
        self,
3503
        context: RequestContext,
3504
        bucket: BucketName,
3505
        expected_bucket_owner: AccountId = None,
3506
        **kwargs,
3507
    ) -> None:
3508
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3509
        # does not raise an error if the bucket does not have a config; it simply returns
3510
        s3_bucket.website_configuration = None
1✔
3511

3512
    def get_object_lock_configuration(
1✔
3513
        self,
3514
        context: RequestContext,
3515
        bucket: BucketName,
3516
        expected_bucket_owner: AccountId = None,
3517
        **kwargs,
3518
    ) -> GetObjectLockConfigurationOutput:
3519
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3520
        if not s3_bucket.object_lock_enabled:
1✔
3521
            raise ObjectLockConfigurationNotFoundError(
1✔
3522
                "Object Lock configuration does not exist for this bucket",
3523
                BucketName=bucket,
3524
            )
3525

3526
        response = GetObjectLockConfigurationOutput(
1✔
3527
            ObjectLockConfiguration=ObjectLockConfiguration(
3528
                ObjectLockEnabled=ObjectLockEnabled.Enabled
3529
            )
3530
        )
3531
        if s3_bucket.object_lock_default_retention:
1✔
3532
            response["ObjectLockConfiguration"]["Rule"] = {
1✔
3533
                "DefaultRetention": s3_bucket.object_lock_default_retention
3534
            }
3535

3536
        return response
1✔
3537

3538
    def put_object_lock_configuration(
1✔
3539
        self,
3540
        context: RequestContext,
3541
        bucket: BucketName,
3542
        object_lock_configuration: ObjectLockConfiguration = None,
3543
        request_payer: RequestPayer = None,
3544
        token: ObjectLockToken = None,
3545
        content_md5: ContentMD5 = None,
3546
        checksum_algorithm: ChecksumAlgorithm = None,
3547
        expected_bucket_owner: AccountId = None,
3548
        **kwargs,
3549
    ) -> PutObjectLockConfigurationOutput:
3550
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3551
        if s3_bucket.versioning_status != "Enabled":
1✔
3552
            raise InvalidBucketState(
1✔
3553
                "Versioning must be 'Enabled' on the bucket to apply a Object Lock configuration"
3554
            )
3555

3556
        if (
1✔
3557
            not object_lock_configuration
3558
            or object_lock_configuration.get("ObjectLockEnabled") != "Enabled"
3559
        ):
3560
            raise MalformedXML()
1✔
3561

3562
        if "Rule" not in object_lock_configuration:
1✔
3563
            s3_bucket.object_lock_default_retention = None
1✔
3564
            if not s3_bucket.object_lock_enabled:
1✔
3565
                s3_bucket.object_lock_enabled = True
1✔
3566

3567
            return PutObjectLockConfigurationOutput()
1✔
3568
        elif not (rule := object_lock_configuration["Rule"]) or not (
1✔
3569
            default_retention := rule.get("DefaultRetention")
3570
        ):
3571
            raise MalformedXML()
1✔
3572

3573
        if "Mode" not in default_retention or (
1✔
3574
            ("Days" in default_retention and "Years" in default_retention)
3575
            or ("Days" not in default_retention and "Years" not in default_retention)
3576
        ):
3577
            raise MalformedXML()
1✔
3578

3579
        s3_bucket.object_lock_default_retention = default_retention
1✔
3580
        if not s3_bucket.object_lock_enabled:
1✔
3581
            s3_bucket.object_lock_enabled = True
×
3582

3583
        return PutObjectLockConfigurationOutput()
1✔
3584

3585
    def get_object_legal_hold(
1✔
3586
        self,
3587
        context: RequestContext,
3588
        bucket: BucketName,
3589
        key: ObjectKey,
3590
        version_id: ObjectVersionId = None,
3591
        request_payer: RequestPayer = None,
3592
        expected_bucket_owner: AccountId = None,
3593
        **kwargs,
3594
    ) -> GetObjectLegalHoldOutput:
3595
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3596
        if not s3_bucket.object_lock_enabled:
1✔
3597
            raise InvalidRequest("Bucket is missing Object Lock Configuration")
1✔
3598

3599
        s3_object = s3_bucket.get_object(
1✔
3600
            key=key,
3601
            version_id=version_id,
3602
            http_method="GET",
3603
        )
3604
        if not s3_object.lock_legal_status:
1✔
3605
            raise NoSuchObjectLockConfiguration(
1✔
3606
                "The specified object does not have a ObjectLock configuration"
3607
            )
3608

3609
        return GetObjectLegalHoldOutput(
1✔
3610
            LegalHold=ObjectLockLegalHold(Status=s3_object.lock_legal_status)
3611
        )
3612

3613
    def put_object_legal_hold(
1✔
3614
        self,
3615
        context: RequestContext,
3616
        bucket: BucketName,
3617
        key: ObjectKey,
3618
        legal_hold: ObjectLockLegalHold = None,
3619
        request_payer: RequestPayer = None,
3620
        version_id: ObjectVersionId = None,
3621
        content_md5: ContentMD5 = None,
3622
        checksum_algorithm: ChecksumAlgorithm = None,
3623
        expected_bucket_owner: AccountId = None,
3624
        **kwargs,
3625
    ) -> PutObjectLegalHoldOutput:
3626
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3627

3628
        if not legal_hold:
1✔
3629
            raise MalformedXML()
1✔
3630

3631
        if not s3_bucket.object_lock_enabled:
1✔
3632
            raise InvalidRequest("Bucket is missing Object Lock Configuration")
1✔
3633

3634
        s3_object = s3_bucket.get_object(
1✔
3635
            key=key,
3636
            version_id=version_id,
3637
            http_method="PUT",
3638
        )
3639
        # TODO: check casing
3640
        if not (status := legal_hold.get("Status")) or status not in ("ON", "OFF"):
1✔
3641
            raise MalformedXML()
×
3642

3643
        s3_object.lock_legal_status = status
1✔
3644

3645
        # TODO: return RequestCharged
3646
        return PutObjectLegalHoldOutput()
1✔
3647

3648
    def get_object_retention(
1✔
3649
        self,
3650
        context: RequestContext,
3651
        bucket: BucketName,
3652
        key: ObjectKey,
3653
        version_id: ObjectVersionId = None,
3654
        request_payer: RequestPayer = None,
3655
        expected_bucket_owner: AccountId = None,
3656
        **kwargs,
3657
    ) -> GetObjectRetentionOutput:
3658
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3659
        if not s3_bucket.object_lock_enabled:
1✔
3660
            raise InvalidRequest("Bucket is missing Object Lock Configuration")
1✔
3661

3662
        s3_object = s3_bucket.get_object(
1✔
3663
            key=key,
3664
            version_id=version_id,
3665
            http_method="GET",
3666
        )
3667
        if not s3_object.lock_mode:
1✔
3668
            raise NoSuchObjectLockConfiguration(
1✔
3669
                "The specified object does not have a ObjectLock configuration"
3670
            )
3671

3672
        return GetObjectRetentionOutput(
1✔
3673
            Retention=ObjectLockRetention(
3674
                Mode=s3_object.lock_mode,
3675
                RetainUntilDate=s3_object.lock_until,
3676
            )
3677
        )
3678

3679
    def put_object_retention(
1✔
3680
        self,
3681
        context: RequestContext,
3682
        bucket: BucketName,
3683
        key: ObjectKey,
3684
        retention: ObjectLockRetention = None,
3685
        request_payer: RequestPayer = None,
3686
        version_id: ObjectVersionId = None,
3687
        bypass_governance_retention: BypassGovernanceRetention = None,
3688
        content_md5: ContentMD5 = None,
3689
        checksum_algorithm: ChecksumAlgorithm = None,
3690
        expected_bucket_owner: AccountId = None,
3691
        **kwargs,
3692
    ) -> PutObjectRetentionOutput:
3693
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3694
        if not s3_bucket.object_lock_enabled:
1✔
3695
            raise InvalidRequest("Bucket is missing Object Lock Configuration")
1✔
3696

3697
        s3_object = s3_bucket.get_object(
1✔
3698
            key=key,
3699
            version_id=version_id,
3700
            http_method="PUT",
3701
        )
3702

3703
        if retention and not validate_dict_fields(
1✔
3704
            retention, required_fields={"Mode", "RetainUntilDate"}
3705
        ):
3706
            raise MalformedXML()
1✔
3707

3708
        if retention and retention["RetainUntilDate"] < datetime.datetime.now(datetime.UTC):
1✔
3709
            # weirdly, this date is formatted as follows: Tue Dec 31 16:00:00 PST 2019
3710
            # it contains the timezone as PST, even if you target a bucket in Europe or Asia
3711
            pst_datetime = retention["RetainUntilDate"].astimezone(tz=ZoneInfo("US/Pacific"))
1✔
3712
            raise InvalidArgument(
1✔
3713
                "The retain until date must be in the future!",
3714
                ArgumentName="RetainUntilDate",
3715
                ArgumentValue=pst_datetime.strftime("%a %b %d %H:%M:%S %Z %Y"),
3716
            )
3717

3718
        if (
1✔
3719
            not retention
3720
            or (s3_object.lock_until and s3_object.lock_until > retention["RetainUntilDate"])
3721
        ) and not (
3722
            bypass_governance_retention and s3_object.lock_mode == ObjectLockMode.GOVERNANCE
3723
        ):
3724
            raise AccessDenied("Access Denied because object protected by object lock.")
1✔
3725

3726
        s3_object.lock_mode = retention["Mode"] if retention else None
1✔
3727
        s3_object.lock_until = retention["RetainUntilDate"] if retention else None
1✔
3728

3729
        # TODO: return RequestCharged
3730
        return PutObjectRetentionOutput()
1✔
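As an illustrative sketch only (bucket, key and date are placeholders), a retention window handled by put_object_retention above could be set like this; RetainUntilDate must lie in the future, as validated above.

import datetime

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack edge endpoint
until = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=1)
s3.put_object_retention(
    Bucket="locked-bucket",
    Key="report.txt",
    Retention={"Mode": "GOVERNANCE", "RetainUntilDate": until},
)
print(s3.get_object_retention(Bucket="locked-bucket", Key="report.txt")["Retention"]["Mode"])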
3731

3732
    def put_bucket_request_payment(
1✔
3733
        self,
3734
        context: RequestContext,
3735
        bucket: BucketName,
3736
        request_payment_configuration: RequestPaymentConfiguration,
3737
        content_md5: ContentMD5 = None,
3738
        checksum_algorithm: ChecksumAlgorithm = None,
3739
        expected_bucket_owner: AccountId = None,
3740
        **kwargs,
3741
    ) -> None:
3742
        # TODO: this currently only mocks the operation; its actual effect is not emulated
3743
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3744

3745
        payer = request_payment_configuration.get("Payer")
1✔
3746
        if payer not in ["Requester", "BucketOwner"]:
1✔
3747
            raise MalformedXML()
1✔
3748

3749
        s3_bucket.payer = payer
1✔
3750

3751
    def get_bucket_request_payment(
1✔
3752
        self,
3753
        context: RequestContext,
3754
        bucket: BucketName,
3755
        expected_bucket_owner: AccountId = None,
3756
        **kwargs,
3757
    ) -> GetBucketRequestPaymentOutput:
3758
        # TODO: this currently only mocks the operation; its actual effect is not emulated
3759
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3760

3761
        return GetBucketRequestPaymentOutput(Payer=s3_bucket.payer)
1✔
3762

3763
    def get_bucket_ownership_controls(
1✔
3764
        self,
3765
        context: RequestContext,
3766
        bucket: BucketName,
3767
        expected_bucket_owner: AccountId = None,
3768
        **kwargs,
3769
    ) -> GetBucketOwnershipControlsOutput:
3770
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3771

3772
        if not s3_bucket.object_ownership:
1✔
3773
            raise OwnershipControlsNotFoundError(
1✔
3774
                "The bucket ownership controls were not found",
3775
                BucketName=bucket,
3776
            )
3777

3778
        return GetBucketOwnershipControlsOutput(
1✔
3779
            OwnershipControls={"Rules": [{"ObjectOwnership": s3_bucket.object_ownership}]}
3780
        )
3781

3782
    def put_bucket_ownership_controls(
1✔
3783
        self,
3784
        context: RequestContext,
3785
        bucket: BucketName,
3786
        ownership_controls: OwnershipControls,
3787
        content_md5: ContentMD5 = None,
3788
        expected_bucket_owner: AccountId = None,
3789
        **kwargs,
3790
    ) -> None:
3791
        # TODO: this currently only mocks the operation; its actual effect is not emulated
3792
        #  for example, it would almost forbid ACL usage when set to BucketOwnerEnforced
3793
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3794

3795
        if not (rules := ownership_controls.get("Rules")) or len(rules) > 1:
1✔
3796
            raise MalformedXML()
1✔
3797

3798
        rule = rules[0]
1✔
3799
        if (object_ownership := rule.get("ObjectOwnership")) not in OBJECT_OWNERSHIPS:
1✔
3800
            raise MalformedXML()
1✔
3801

3802
        s3_bucket.object_ownership = object_ownership
1✔
3803

3804
    def delete_bucket_ownership_controls(
1✔
3805
        self,
3806
        context: RequestContext,
3807
        bucket: BucketName,
3808
        expected_bucket_owner: AccountId = None,
3809
        **kwargs,
3810
    ) -> None:
3811
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3812

3813
        s3_bucket.object_ownership = None
1✔
3814

3815
    def get_public_access_block(
1✔
3816
        self,
3817
        context: RequestContext,
3818
        bucket: BucketName,
3819
        expected_bucket_owner: AccountId = None,
3820
        **kwargs,
3821
    ) -> GetPublicAccessBlockOutput:
3822
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3823

3824
        if not s3_bucket.public_access_block:
1✔
3825
            raise NoSuchPublicAccessBlockConfiguration(
1✔
3826
                "The public access block configuration was not found", BucketName=bucket
3827
            )
3828

3829
        return GetPublicAccessBlockOutput(
1✔
3830
            PublicAccessBlockConfiguration=s3_bucket.public_access_block
3831
        )
3832

3833
    def put_public_access_block(
1✔
3834
        self,
3835
        context: RequestContext,
3836
        bucket: BucketName,
3837
        public_access_block_configuration: PublicAccessBlockConfiguration,
3838
        content_md5: ContentMD5 = None,
3839
        checksum_algorithm: ChecksumAlgorithm = None,
3840
        expected_bucket_owner: AccountId = None,
3841
        **kwargs,
3842
    ) -> None:
3843
        # TODO: this currently only mocks the operation; its actual effect is not emulated
3844
        #  as we do not enforce ACL directly. Also, this should take the most restrictive between S3Control and the
3845
        #  bucket configuration. See s3control
3846
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3847

3848
        public_access_block_fields = {
1✔
3849
            "BlockPublicAcls",
3850
            "BlockPublicPolicy",
3851
            "IgnorePublicAcls",
3852
            "RestrictPublicBuckets",
3853
        }
3854
        if not validate_dict_fields(
1✔
3855
            public_access_block_configuration,
3856
            required_fields=set(),
3857
            optional_fields=public_access_block_fields,
3858
        ):
3859
            raise MalformedXML()
×
3860

3861
        for field in public_access_block_fields:
1✔
3862
            if public_access_block_configuration.get(field) is None:
1✔
3863
                public_access_block_configuration[field] = False
1✔
3864

3865
        s3_bucket.public_access_block = public_access_block_configuration
1✔
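A short sketch (names assumed) of the defaulting behaviour implemented above: configuration fields omitted from the request are stored as False.

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack edge endpoint
s3.put_public_access_block(
    Bucket="my-bucket",
    PublicAccessBlockConfiguration={"BlockPublicAcls": True},  # the other three fields are omitted
)
print(s3.get_public_access_block(Bucket="my-bucket")["PublicAccessBlockConfiguration"])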
3866

3867
    def delete_public_access_block(
1✔
3868
        self,
3869
        context: RequestContext,
3870
        bucket: BucketName,
3871
        expected_bucket_owner: AccountId = None,
3872
        **kwargs,
3873
    ) -> None:
3874
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3875

3876
        s3_bucket.public_access_block = None
1✔
3877

3878
    def get_bucket_policy(
1✔
3879
        self,
3880
        context: RequestContext,
3881
        bucket: BucketName,
3882
        expected_bucket_owner: AccountId = None,
3883
        **kwargs,
3884
    ) -> GetBucketPolicyOutput:
3885
        store, s3_bucket = self._get_cross_account_bucket(
1✔
3886
            context, bucket, expected_bucket_owner=expected_bucket_owner
3887
        )
3888
        if not s3_bucket.policy:
1✔
3889
            raise NoSuchBucketPolicy(
1✔
3890
                "The bucket policy does not exist",
3891
                BucketName=bucket,
3892
            )
3893
        return GetBucketPolicyOutput(Policy=s3_bucket.policy)
1✔
3894

3895
    def put_bucket_policy(
1✔
3896
        self,
3897
        context: RequestContext,
3898
        bucket: BucketName,
3899
        policy: Policy,
3900
        content_md5: ContentMD5 = None,
3901
        checksum_algorithm: ChecksumAlgorithm = None,
3902
        confirm_remove_self_bucket_access: ConfirmRemoveSelfBucketAccess = None,
3903
        expected_bucket_owner: AccountId = None,
3904
        **kwargs,
3905
    ) -> None:
3906
        store, s3_bucket = self._get_cross_account_bucket(
1✔
3907
            context, bucket, expected_bucket_owner=expected_bucket_owner
3908
        )
3909

3910
        if not policy or policy[0] != "{":
1✔
3911
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")
1✔
3912
        try:
1✔
3913
            json_policy = json.loads(policy)
1✔
3914
            if not json_policy:
1✔
3915
                # TODO: add more validation around the policy?
3916
                raise MalformedPolicy("Missing required field Statement")
1✔
3917
        except ValueError:
1✔
3918
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")
×
3919

3920
        s3_bucket.policy = policy
1✔
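A sketch of the validation above: the policy body must be JSON whose first byte is '{', so it is typically produced with json.dumps; the bucket name and statement are placeholders.

import json

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack edge endpoint
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::my-bucket/*",
        }
    ],
}
s3.put_bucket_policy(Bucket="my-bucket", Policy=json.dumps(policy))
print(s3.get_bucket_policy(Bucket="my-bucket")["Policy"])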
3921

3922
    def delete_bucket_policy(
1✔
3923
        self,
3924
        context: RequestContext,
3925
        bucket: BucketName,
3926
        expected_bucket_owner: AccountId = None,
3927
        **kwargs,
3928
    ) -> None:
3929
        store, s3_bucket = self._get_cross_account_bucket(
1✔
3930
            context, bucket, expected_bucket_owner=expected_bucket_owner
3931
        )
3932

3933
        s3_bucket.policy = None
1✔
3934

3935
    def get_bucket_accelerate_configuration(
1✔
3936
        self,
3937
        context: RequestContext,
3938
        bucket: BucketName,
3939
        expected_bucket_owner: AccountId = None,
3940
        request_payer: RequestPayer = None,
3941
        **kwargs,
3942
    ) -> GetBucketAccelerateConfigurationOutput:
3943
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3944

3945
        response = GetBucketAccelerateConfigurationOutput()
1✔
3946
        if s3_bucket.accelerate_status:
1✔
3947
            response["Status"] = s3_bucket.accelerate_status
1✔
3948

3949
        return response
1✔
3950

3951
    def put_bucket_accelerate_configuration(
1✔
3952
        self,
3953
        context: RequestContext,
3954
        bucket: BucketName,
3955
        accelerate_configuration: AccelerateConfiguration,
3956
        expected_bucket_owner: AccountId = None,
3957
        checksum_algorithm: ChecksumAlgorithm = None,
3958
        **kwargs,
3959
    ) -> None:
3960
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3961

3962
        if "." in bucket:
1✔
3963
            raise InvalidRequest(
1✔
3964
                "S3 Transfer Acceleration is not supported for buckets with periods (.) in their names"
3965
            )
3966

3967
        if not (status := accelerate_configuration.get("Status")) or status not in (
1✔
3968
            "Enabled",
3969
            "Suspended",
3970
        ):
3971
            raise MalformedXML()
1✔
3972

3973
        s3_bucket.accelerate_status = status
1✔
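A brief sketch (bucket name assumed; it must not contain a "." per the check above) of setting the accelerate status handled here.

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack edge endpoint
s3.put_bucket_accelerate_configuration(
    Bucket="fast-bucket", AccelerateConfiguration={"Status": "Enabled"}
)
print(s3.get_bucket_accelerate_configuration(Bucket="fast-bucket").get("Status"))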
3974

3975
    def put_bucket_logging(
1✔
3976
        self,
3977
        context: RequestContext,
3978
        bucket: BucketName,
3979
        bucket_logging_status: BucketLoggingStatus,
3980
        content_md5: ContentMD5 = None,
3981
        checksum_algorithm: ChecksumAlgorithm = None,
3982
        expected_bucket_owner: AccountId = None,
3983
        **kwargs,
3984
    ) -> None:
3985
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3986

3987
        if not (logging_config := bucket_logging_status.get("LoggingEnabled")):
1✔
3988
            s3_bucket.logging = {}
1✔
3989
            return
1✔
3990

3991
        # the target bucket must be in the same account
3992
        if not (target_bucket_name := logging_config.get("TargetBucket")):
1✔
3993
            raise MalformedXML()
×
3994

3995
        if not logging_config.get("TargetPrefix"):
1✔
3996
            logging_config["TargetPrefix"] = ""
×
3997

3998
        # TODO: validate Grants
3999

4000
        if not (target_s3_bucket := store.buckets.get(target_bucket_name)):
1✔
4001
            raise InvalidTargetBucketForLogging(
1✔
4002
                "The target bucket for logging does not exist",
4003
                TargetBucket=target_bucket_name,
4004
            )
4005

4006
        source_bucket_region = s3_bucket.bucket_region
1✔
4007
        if target_s3_bucket.bucket_region != source_bucket_region:
1✔
4008
            raise (
1✔
4009
                CrossLocationLoggingProhibitted(
4010
                    "Cross S3 location logging not allowed. ",
4011
                    TargetBucketLocation=target_s3_bucket.bucket_region,
4012
                )
4013
                if source_bucket_region == AWS_REGION_US_EAST_1
4014
                else CrossLocationLoggingProhibitted(
4015
                    "Cross S3 location logging not allowed. ",
4016
                    SourceBucketLocation=source_bucket_region,
4017
                    TargetBucketLocation=target_s3_bucket.bucket_region,
4018
                )
4019
            )
4020

4021
        s3_bucket.logging = logging_config
1✔
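An illustrative sketch of the constraint enforced above: the logging target bucket must exist and live in the same region as the source bucket; all names are assumptions.

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack edge endpoint
for name in ("source-bucket", "log-target"):
    s3.create_bucket(Bucket=name)
s3.put_bucket_logging(
    Bucket="source-bucket",
    BucketLoggingStatus={
        "LoggingEnabled": {"TargetBucket": "log-target", "TargetPrefix": "access/"}
    },
)
print(s3.get_bucket_logging(Bucket="source-bucket").get("LoggingEnabled"))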
4022

4023
    def get_bucket_logging(
1✔
4024
        self,
4025
        context: RequestContext,
4026
        bucket: BucketName,
4027
        expected_bucket_owner: AccountId = None,
4028
        **kwargs,
4029
    ) -> GetBucketLoggingOutput:
4030
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4031

4032
        if not s3_bucket.logging:
1✔
4033
            return GetBucketLoggingOutput()
1✔
4034

4035
        return GetBucketLoggingOutput(LoggingEnabled=s3_bucket.logging)
1✔
4036

4037
    def put_bucket_replication(
1✔
4038
        self,
4039
        context: RequestContext,
4040
        bucket: BucketName,
4041
        replication_configuration: ReplicationConfiguration,
4042
        content_md5: ContentMD5 = None,
4043
        checksum_algorithm: ChecksumAlgorithm = None,
4044
        token: ObjectLockToken = None,
4045
        expected_bucket_owner: AccountId = None,
4046
        **kwargs,
4047
    ) -> None:
4048
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4049
        if not s3_bucket.versioning_status == BucketVersioningStatus.Enabled:
1✔
4050
            raise InvalidRequest(
1✔
4051
                "Versioning must be 'Enabled' on the bucket to apply a replication configuration"
4052
            )
4053

4054
        if not (rules := replication_configuration.get("Rules")):
1✔
4055
            raise MalformedXML()
1✔
4056

4057
        for rule in rules:
1✔
4058
            if "ID" not in rule:
1✔
4059
                rule["ID"] = short_uid()
1✔
4060

4061
            dest_bucket_arn = rule.get("Destination", {}).get("Bucket")
1✔
4062
            dest_bucket_name = s3_bucket_name(dest_bucket_arn)
1✔
4063
            if (
1✔
4064
                not (dest_s3_bucket := store.buckets.get(dest_bucket_name))
4065
                or not dest_s3_bucket.versioning_status == BucketVersioningStatus.Enabled
4066
            ):
4067
                # according to AWS testing the same exception is raised if the bucket does not exist
4068
                # or if versioning was disabled
4069
                raise InvalidRequest("Destination bucket must have versioning enabled.")
1✔
4070

4071
        # TODO more validation on input
4072
        s3_bucket.replication = replication_configuration
1✔
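A hedged sketch of the versioning requirement validated above: both source and destination buckets must have versioning enabled before a replication configuration is accepted; the names and role ARN are assumptions.

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack edge endpoint
for name in ("src-bucket", "dst-bucket"):
    s3.create_bucket(Bucket=name)
    s3.put_bucket_versioning(Bucket=name, VersioningConfiguration={"Status": "Enabled"})
s3.put_bucket_replication(
    Bucket="src-bucket",
    ReplicationConfiguration={
        "Role": "arn:aws:iam::000000000000:role/replication-role",  # assumed role ARN
        "Rules": [
            {
                # "ID" is optional; the provider above generates one when it is missing
                "Status": "Enabled",
                "Destination": {"Bucket": "arn:aws:s3:::dst-bucket"},
            }
        ],
    },
)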
4073

4074
    def get_bucket_replication(
1✔
4075
        self,
4076
        context: RequestContext,
4077
        bucket: BucketName,
4078
        expected_bucket_owner: AccountId = None,
4079
        **kwargs,
4080
    ) -> GetBucketReplicationOutput:
4081
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4082

4083
        if not s3_bucket.replication:
1✔
4084
            raise ReplicationConfigurationNotFoundError(
1✔
4085
                "The replication configuration was not found",
4086
                BucketName=bucket,
4087
            )
4088

4089
        return GetBucketReplicationOutput(ReplicationConfiguration=s3_bucket.replication)
1✔
4090

4091
    def delete_bucket_replication(
1✔
4092
        self,
4093
        context: RequestContext,
4094
        bucket: BucketName,
4095
        expected_bucket_owner: AccountId = None,
4096
        **kwargs,
4097
    ) -> None:
4098
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4099

4100
        s3_bucket.replication = None
1✔
4101

4102
    @handler("PutBucketAcl", expand=False)
1✔
4103
    def put_bucket_acl(
1✔
4104
        self,
4105
        context: RequestContext,
4106
        request: PutBucketAclRequest,
4107
    ) -> None:
4108
        bucket = request["Bucket"]
1✔
4109
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4110
        acp = get_access_control_policy_from_acl_request(
1✔
4111
            request=request, owner=s3_bucket.owner, request_body=context.request.data
4112
        )
4113
        s3_bucket.acl = acp
1✔
4114

4115
    def get_bucket_acl(
1✔
4116
        self,
4117
        context: RequestContext,
4118
        bucket: BucketName,
4119
        expected_bucket_owner: AccountId = None,
4120
        **kwargs,
4121
    ) -> GetBucketAclOutput:
4122
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4123

4124
        return GetBucketAclOutput(Owner=s3_bucket.acl["Owner"], Grants=s3_bucket.acl["Grants"])
1✔
4125

4126
    @handler("PutObjectAcl", expand=False)
1✔
4127
    def put_object_acl(
1✔
4128
        self,
4129
        context: RequestContext,
4130
        request: PutObjectAclRequest,
4131
    ) -> PutObjectAclOutput:
4132
        bucket = request["Bucket"]
1✔
4133
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4134

4135
        s3_object = s3_bucket.get_object(
1✔
4136
            key=request["Key"],
4137
            version_id=request.get("VersionId"),
4138
            http_method="PUT",
4139
        )
4140
        acp = get_access_control_policy_from_acl_request(
1✔
4141
            request=request, owner=s3_object.owner, request_body=context.request.data
4142
        )
4143
        previous_acl = s3_object.acl
1✔
4144
        s3_object.acl = acp
1✔
4145

4146
        if previous_acl != acp:
1✔
4147
            self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
4148

4149
        # TODO: RequestCharged
4150
        return PutObjectAclOutput()
1✔
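A small sketch (names assumed) of the ACL handlers above; a canned ACL and explicit grant headers are mutually exclusive, as enforced by the helpers further below.

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack edge endpoint
s3.put_object_acl(Bucket="my-bucket", Key="report.txt", ACL="public-read")
print(s3.get_object_acl(Bucket="my-bucket", Key="report.txt")["Grants"])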
4151

4152
    def get_object_acl(
1✔
4153
        self,
4154
        context: RequestContext,
4155
        bucket: BucketName,
4156
        key: ObjectKey,
4157
        version_id: ObjectVersionId = None,
4158
        request_payer: RequestPayer = None,
4159
        expected_bucket_owner: AccountId = None,
4160
        **kwargs,
4161
    ) -> GetObjectAclOutput:
4162
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4163

4164
        s3_object = s3_bucket.get_object(
1✔
4165
            key=key,
4166
            version_id=version_id,
4167
        )
4168
        # TODO: RequestCharged
4169
        return GetObjectAclOutput(Owner=s3_object.acl["Owner"], Grants=s3_object.acl["Grants"])
1✔
4170

4171
    def get_bucket_policy_status(
1✔
4172
        self,
4173
        context: RequestContext,
4174
        bucket: BucketName,
4175
        expected_bucket_owner: AccountId = None,
4176
        **kwargs,
4177
    ) -> GetBucketPolicyStatusOutput:
4178
        raise NotImplementedError
4179

4180
    def get_object_torrent(
1✔
4181
        self,
4182
        context: RequestContext,
4183
        bucket: BucketName,
4184
        key: ObjectKey,
4185
        request_payer: RequestPayer = None,
4186
        expected_bucket_owner: AccountId = None,
4187
        **kwargs,
4188
    ) -> GetObjectTorrentOutput:
4189
        raise NotImplementedError
4190

4191
    def post_object(
1✔
4192
        self, context: RequestContext, bucket: BucketName, body: IO[Body] = None, **kwargs
4193
    ) -> PostResponse:
4194
        if "multipart/form-data" not in context.request.headers.get("Content-Type", ""):
1✔
4195
            raise PreconditionFailed(
1✔
4196
                "At least one of the pre-conditions you specified did not hold",
4197
                Condition="Bucket POST must be of the enclosure-type multipart/form-data",
4198
            )
4199
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html
4200
        # TODO: signature validation is not implemented for pre-signed POST
4201
        # policy validation is not implemented either, except expiration and mandatory fields
4202
        # This operation is the only one using a form for storing the request data. We will have to do some manual
4203
        # parsing here, as no specs are present for this and no client directly implements this operation.
4204
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4205

4206
        form = context.request.form
1✔
4207
        object_key = context.request.form.get("key")
1✔
4208

4209
        if "file" in form:
1✔
4210
            # in AWS, you can pass the file content as a string in the form field and not as a file object
4211
            file_data = to_bytes(form["file"])
1✔
4212
            object_content_length = len(file_data)
1✔
4213
            stream = BytesIO(file_data)
1✔
4214
        else:
4215
            # this is the default behaviour
4216
            fileobj = context.request.files["file"]
1✔
4217
            stream = fileobj.stream
1✔
4218
            # stream is a SpooledTemporaryFile, so we can seek the stream to know its length, necessary for policy
4219
            # validation
4220
            original_pos = stream.tell()
1✔
4221
            object_content_length = stream.seek(0, 2)
1✔
4222
            # reset the stream and put it back at its original position
4223
            stream.seek(original_pos, 0)
1✔
4224

4225
            if "${filename}" in object_key:
1✔
4226
                # TODO: ${filename} is actually usable in all form fields
4227
                # See https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/PresignedPost.html
4228
                # > The string ${filename} is automatically replaced with the name of the file provided by the user and
4229
                # is recognized by all form fields.
4230
                object_key = object_key.replace("${filename}", fileobj.filename)
1✔
4231

4232
        # TODO: see if we need to pass additional metadata not contained in the policy from the table under
4233
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions
4234
        additional_policy_metadata = {
1✔
4235
            "bucket": bucket,
4236
            "content_length": object_content_length,
4237
        }
4238
        validate_post_policy(form, additional_policy_metadata)
1✔
4239

4240
        if canned_acl := form.get("acl"):
1✔
4241
            validate_canned_acl(canned_acl)
×
4242
            acp = get_canned_acl(canned_acl, owner=s3_bucket.owner)
×
4243
        else:
4244
            acp = get_canned_acl(BucketCannedACL.private, owner=s3_bucket.owner)
1✔
4245

4246
        post_system_settable_headers = [
1✔
4247
            "Cache-Control",
4248
            "Content-Type",
4249
            "Content-Disposition",
4250
            "Content-Encoding",
4251
        ]
4252
        system_metadata = {}
1✔
4253
        for system_metadata_field in post_system_settable_headers:
1✔
4254
            if field_value := form.get(system_metadata_field):
1✔
4255
                system_metadata[system_metadata_field.replace("-", "")] = field_value
1✔
4256

4257
        if not system_metadata.get("ContentType"):
1✔
4258
            system_metadata["ContentType"] = "binary/octet-stream"
1✔
4259

4260
        user_metadata = {
1✔
4261
            field.removeprefix("x-amz-meta-").lower(): form.get(field)
4262
            for field in form
4263
            if field.startswith("x-amz-meta-")
4264
        }
4265

4266
        if tagging := form.get("tagging"):
1✔
4267
            # this is unusual, as the tagging is raw XML in the form field, so we need to parse it ourselves
4268
            tagging = parse_post_object_tagging_xml(tagging)
1✔
4269

4270
        if (storage_class := form.get("x-amz-storage-class")) is not None and (
1✔
4271
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
4272
        ):
4273
            raise InvalidStorageClass(
1✔
4274
                "The storage class you specified is not valid", StorageClassRequested=storage_class
4275
            )
4276

4277
        encryption_request = {
1✔
4278
            "ServerSideEncryption": form.get("x-amz-server-side-encryption"),
4279
            "SSEKMSKeyId": form.get("x-amz-server-side-encryption-aws-kms-key-id"),
4280
            "BucketKeyEnabled": form.get("x-amz-server-side-encryption-bucket-key-enabled"),
4281
        }
4282

4283
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
4284
            encryption_request,
4285
            s3_bucket,
4286
            store,
4287
        )
4288

4289
        checksum_algorithm = form.get("x-amz-checksum-algorithm")
1✔
4290
        checksum_value = (
1✔
4291
            form.get(f"x-amz-checksum-{checksum_algorithm.lower()}") if checksum_algorithm else None
4292
        )
4293
        expires = (
1✔
4294
            str_to_rfc_1123_datetime(expires_str) if (expires_str := form.get("Expires")) else None
4295
        )
4296

4297
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
4298

4299
        s3_object = S3Object(
1✔
4300
            key=object_key,
4301
            version_id=version_id,
4302
            storage_class=storage_class,
4303
            expires=expires,
4304
            user_metadata=user_metadata,
4305
            system_metadata=system_metadata,
4306
            checksum_algorithm=checksum_algorithm,
4307
            checksum_value=checksum_value,
4308
            encryption=encryption_parameters.encryption,
4309
            kms_key_id=encryption_parameters.kms_key_id,
4310
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
4311
            website_redirect_location=form.get("x-amz-website-redirect-location"),
4312
            acl=acp,
4313
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depend on Bucket settings
4314
        )
4315

4316
        with self._storage_backend.open(bucket, s3_object, mode="w") as s3_stored_object:
1✔
4317
            s3_stored_object.write(stream)
1✔
4318

4319
            if not s3_object.checksum_value:
1✔
4320
                s3_object.checksum_value = s3_stored_object.checksum
1✔
4321

4322
            elif checksum_algorithm and s3_object.checksum_value != s3_stored_object.checksum:
×
4323
                self._storage_backend.remove(bucket, s3_object)
×
4324
                raise InvalidRequest(
×
4325
                    f"Value for x-amz-checksum-{checksum_algorithm.lower()} header is invalid."
4326
                )
4327

4328
            s3_bucket.objects.set(object_key, s3_object)
1✔
4329

4330
        # in case we are overwriting an object, delete the tags entry
4331
        key_id = get_unique_key_id(bucket, object_key, version_id)
1✔
4332
        store.TAGS.tags.pop(key_id, None)
1✔
4333
        if tagging:
1✔
4334
            store.TAGS.tags[key_id] = tagging
1✔
4335

4336
        response = PostResponse()
1✔
4337
        # hacky way to set the etag in the headers as well: two locations for one value
4338
        response["ETagHeader"] = s3_object.quoted_etag
1✔
4339

4340
        if redirect := form.get("success_action_redirect"):
1✔
4341
            # we need to create the redirect, as the parser could not return the moto-calculated one
4342
            try:
1✔
4343
                redirect = create_redirect_for_post_request(
1✔
4344
                    base_redirect=redirect,
4345
                    bucket=bucket,
4346
                    object_key=object_key,
4347
                    etag=s3_object.quoted_etag,
4348
                )
4349
                response["LocationHeader"] = redirect
1✔
4350
                response["StatusCode"] = 303
1✔
4351
            except ValueError:
1✔
4352
                # If S3 cannot interpret the URL, it acts as if the field is not present.
4353
                response["StatusCode"] = form.get("success_action_status", 204)
1✔
4354

4355
        elif status_code := form.get("success_action_status"):
1✔
4356
            response["StatusCode"] = status_code
1✔
4357
        else:
4358
            response["StatusCode"] = 204
1✔
4359

4360
        response["LocationHeader"] = response.get(
1✔
4361
            "LocationHeader", f"{get_full_default_bucket_location(bucket)}{object_key}"
4362
        )
4363

4364
        if s3_bucket.versioning_status == "Enabled":
1✔
4365
            response["VersionId"] = s3_object.version_id
×
4366

4367
        if s3_object.checksum_algorithm:
1✔
4368
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
1✔
4369
            response["ChecksumType"] = ChecksumType.FULL_OBJECT
1✔
4370

4371
        if s3_bucket.lifecycle_rules:
1✔
4372
            if expiration_header := self._get_expiration_header(
×
4373
                s3_bucket.lifecycle_rules,
4374
                bucket,
4375
                s3_object,
4376
                store.TAGS.tags.get(key_id, {}),
4377
            ):
4378
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
4379
                #  apply them every time we get/head an object
4380
                response["Expiration"] = expiration_header
×
4381

4382
        add_encryption_to_response(response, s3_object=s3_object)
1✔
4383

4384
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
4385

4386
        if response["StatusCode"] == "201":
1✔
4387
            # if the StatusCode is 201, S3 returns an XML body with additional information
4388
            response["ETag"] = s3_object.quoted_etag
1✔
4389
            response["Bucket"] = bucket
1✔
4390
            response["Key"] = object_key
1✔
4391
            response["Location"] = response["LocationHeader"]
1✔
4392

4393
        return response
1✔
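A sketch of driving the form-based POST path above from a client, using boto3's generate_presigned_post together with the requests library; the endpoint, names and fields are assumptions, and policy signature validation is not enforced here anyway.

import boto3
import requests

s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack edge endpoint
post = s3.generate_presigned_post(
    Bucket="my-bucket",
    Key="uploads/${filename}",  # ${filename} is replaced with the uploaded file name
    Fields={"success_action_status": "201"},
    Conditions=[["starts-with", "$success_action_status", ""]],
    ExpiresIn=3600,
)
resp = requests.post(
    post["url"],
    data=post["fields"],
    files={"file": ("report.txt", b"hello world")},
)
print(resp.status_code)  # 201 also returns an XML body with ETag, Bucket, Key and Location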
4394

4395

4396
def generate_version_id(bucket_versioning_status: str) -> str | None:
1✔
4397
    if not bucket_versioning_status:
1✔
4398
        return None
1✔
4399
    elif bucket_versioning_status.lower() == "enabled":
1✔
4400
        return generate_safe_version_id()
1✔
4401
    else:
4402
        return "null"
1✔
4403

4404

4405
def add_encryption_to_response(response: dict, s3_object: S3Object):
1✔
4406
    if encryption := s3_object.encryption:
1✔
4407
        response["ServerSideEncryption"] = encryption
1✔
4408
        if encryption == ServerSideEncryption.aws_kms:
1✔
4409
            response["SSEKMSKeyId"] = s3_object.kms_key_id
1✔
4410
            if s3_object.bucket_key_enabled:
1✔
4411
                response["BucketKeyEnabled"] = s3_object.bucket_key_enabled
1✔
4412

4413

4414
def get_encryption_parameters_from_request_and_bucket(
1✔
4415
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
4416
    s3_bucket: S3Bucket,
4417
    store: S3Store,
4418
) -> EncryptionParameters:
4419
    if request.get("SSECustomerKey"):
1✔
4420
        # we return early, because ServerSideEncryption does not apply if the request has SSE-C
4421
        return EncryptionParameters(None, None, False)
1✔
4422

4423
    encryption = request.get("ServerSideEncryption")
1✔
4424
    kms_key_id = request.get("SSEKMSKeyId")
1✔
4425
    bucket_key_enabled = request.get("BucketKeyEnabled")
1✔
4426
    if s3_bucket.encryption_rule:
1✔
4427
        bucket_key_enabled = bucket_key_enabled or s3_bucket.encryption_rule.get("BucketKeyEnabled")
1✔
4428
        encryption = (
1✔
4429
            encryption
4430
            or s3_bucket.encryption_rule["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]
4431
        )
4432
        if encryption == ServerSideEncryption.aws_kms:
1✔
4433
            key_id = kms_key_id or s3_bucket.encryption_rule[
1✔
4434
                "ApplyServerSideEncryptionByDefault"
4435
            ].get("KMSMasterKeyID")
4436
            kms_key_id = get_kms_key_arn(
1✔
4437
                key_id, s3_bucket.bucket_account_id, s3_bucket.bucket_region
4438
            )
4439
            if not kms_key_id:
1✔
4440
                # if no key is provided, AWS will use an AWS managed KMS key
4441
                # create it if it doesn't already exist, and save it in the store per region
4442
                if not store.aws_managed_kms_key_id:
1✔
4443
                    managed_kms_key_id = create_s3_kms_managed_key_for_region(
1✔
4444
                        s3_bucket.bucket_account_id, s3_bucket.bucket_region
4445
                    )
4446
                    store.aws_managed_kms_key_id = managed_kms_key_id
1✔
4447

4448
                kms_key_id = store.aws_managed_kms_key_id
1✔
4449

4450
    return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)
1✔
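A sketch (names assumed) of the fallback implemented above: with a bucket-level SSE-KMS default and no per-request encryption headers, a plain put_object is still encrypted with the bucket rule, or with an AWS-managed key when no key ID is configured.

import boto3

s3 = boto3.client("s3", endpoint_url="http://localhost:4566")  # assumed LocalStack edge endpoint
s3.put_bucket_encryption(
    Bucket="my-bucket",
    ServerSideEncryptionConfiguration={
        "Rules": [
            {
                "ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "aws:kms"},
                "BucketKeyEnabled": True,
            }
        ]
    },
)
resp = s3.put_object(Bucket="my-bucket", Key="secret.txt", Body=b"data")
print(resp["ServerSideEncryption"], resp.get("SSEKMSKeyId"))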
4451

4452

4453
def get_object_lock_parameters_from_bucket_and_request(
1✔
4454
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
4455
    s3_bucket: S3Bucket,
4456
):
4457
    # TODO: also validate here?
4458
    lock_mode = request.get("ObjectLockMode")
1✔
4459
    lock_legal_status = request.get("ObjectLockLegalHoldStatus")
1✔
4460
    lock_until = request.get("ObjectLockRetainUntilDate")
1✔
4461

4462
    if default_retention := s3_bucket.object_lock_default_retention:
1✔
4463
        lock_mode = lock_mode or default_retention.get("Mode")
1✔
4464
        if lock_mode and not lock_until:
1✔
4465
            lock_until = get_retention_from_now(
1✔
4466
                days=default_retention.get("Days"),
4467
                years=default_retention.get("Years"),
4468
            )
4469

4470
    return ObjectLockParameters(lock_until, lock_legal_status, lock_mode)
1✔
4471

4472

4473
def get_part_range(s3_object: S3Object, part_number: PartNumber) -> ObjectRange:
1✔
4474
    """
4475
    Calculate the range value from a part number for an S3 Object
4476
    :param s3_object: S3Object
4477
    :param part_number: the requested part of the S3Object
4478
    :return: an ObjectRange used to return only a slice of an Object
4479
    """
4480
    if not s3_object.parts:
1✔
4481
        if part_number > 1:
1✔
4482
            raise InvalidPartNumber(
1✔
4483
                "The requested partnumber is not satisfiable",
4484
                PartNumberRequested=part_number,
4485
                ActualPartCount=1,
4486
            )
4487
        return ObjectRange(
1✔
4488
            begin=0,
4489
            end=s3_object.size - 1,
4490
            content_length=s3_object.size,
4491
            content_range=f"bytes 0-{s3_object.size - 1}/{s3_object.size}",
4492
        )
4493
    elif not (part_data := s3_object.parts.get(part_number)):
1✔
4494
        raise InvalidPartNumber(
1✔
4495
            "The requested partnumber is not satisfiable",
4496
            PartNumberRequested=part_number,
4497
            ActualPartCount=len(s3_object.parts),
4498
        )
4499

4500
    begin, part_length = part_data
1✔
4501
    end = begin + part_length - 1
1✔
4502
    return ObjectRange(
1✔
4503
        begin=begin,
4504
        end=end,
4505
        content_length=part_length,
4506
        content_range=f"bytes {begin}-{end}/{s3_object.size}",
4507
    )
4508

4509

4510
def get_acl_headers_from_request(
1✔
4511
    request: Union[
4512
        PutObjectRequest,
4513
        CreateMultipartUploadRequest,
4514
        CopyObjectRequest,
4515
        CreateBucketRequest,
4516
        PutBucketAclRequest,
4517
        PutObjectAclRequest,
4518
    ],
4519
) -> list[tuple[str, str]]:
4520
    permission_keys = [
1✔
4521
        "GrantFullControl",
4522
        "GrantRead",
4523
        "GrantReadACP",
4524
        "GrantWrite",
4525
        "GrantWriteACP",
4526
    ]
4527
    acl_headers = [
1✔
4528
        (permission, grant_header)
4529
        for permission in permission_keys
4530
        if (grant_header := request.get(permission))
4531
    ]
4532
    return acl_headers
1✔
4533

4534

4535
def get_access_control_policy_from_acl_request(
1✔
4536
    request: Union[PutBucketAclRequest, PutObjectAclRequest],
4537
    owner: Owner,
4538
    request_body: bytes,
4539
) -> AccessControlPolicy:
4540
    canned_acl = request.get("ACL")
1✔
4541
    acl_headers = get_acl_headers_from_request(request)
1✔
4542

4543
    # FIXME: this is very dirty, but the parser does not differentiate between an empty body and an empty XML node
4544
    # errors are different depending on that data, so we need to access the context. Modifying the parser for this
4545
    # use case seems dangerous
4546
    is_acp_in_body = request_body
1✔
4547

4548
    if not (canned_acl or acl_headers or is_acp_in_body):
1✔
4549
        raise MissingSecurityHeader(
1✔
4550
            "Your request was missing a required header", MissingHeaderName="x-amz-acl"
4551
        )
4552

4553
    elif canned_acl and acl_headers:
1✔
4554
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")
1✔
4555

4556
    elif (canned_acl or acl_headers) and is_acp_in_body:
1✔
4557
        raise UnexpectedContent("This request does not support content")
1✔
4558

4559
    if canned_acl:
1✔
4560
        validate_canned_acl(canned_acl)
1✔
4561
        acp = get_canned_acl(canned_acl, owner=owner)
1✔
4562

4563
    elif acl_headers:
1✔
4564
        grants = []
1✔
4565
        for permission, grantees_values in acl_headers:
1✔
4566
            permission = get_permission_from_header(permission)
1✔
4567
            partial_grants = parse_grants_in_headers(permission, grantees_values)
1✔
4568
            grants.extend(partial_grants)
1✔
4569

4570
        acp = AccessControlPolicy(Owner=owner, Grants=grants)
1✔
4571
    else:
4572
        acp = request.get("AccessControlPolicy")
1✔
4573
        validate_acl_acp(acp)
1✔
4574
        if (
1✔
4575
            owner.get("DisplayName")
4576
            and acp["Grants"]
4577
            and "DisplayName" not in acp["Grants"][0]["Grantee"]
4578
        ):
4579
            acp["Grants"][0]["Grantee"]["DisplayName"] = owner["DisplayName"]
1✔
4580

4581
    return acp
1✔
4582

4583

4584
def get_access_control_policy_for_new_resource_request(
1✔
4585
    request: Union[
4586
        PutObjectRequest, CreateMultipartUploadRequest, CopyObjectRequest, CreateBucketRequest
4587
    ],
4588
    owner: Owner,
4589
) -> AccessControlPolicy:
4590
    # TODO: this is basic ACL, not taking into account Bucket settings. Revisit once we really implement ACLs.
4591
    canned_acl = request.get("ACL")
1✔
4592
    acl_headers = get_acl_headers_from_request(request)
1✔
4593

4594
    if not (canned_acl or acl_headers):
1✔
4595
        return get_canned_acl(BucketCannedACL.private, owner=owner)
1✔
4596

4597
    elif canned_acl and acl_headers:
1✔
4598
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")
×
4599

4600
    if canned_acl:
1✔
4601
        validate_canned_acl(canned_acl)
1✔
4602
        return get_canned_acl(canned_acl, owner=owner)
1✔
4603

4604
    grants = []
×
4605
    for permission, grantees_values in acl_headers:
×
4606
        permission = get_permission_from_header(permission)
×
4607
        partial_grants = parse_grants_in_headers(permission, grantees_values)
×
4608
        grants.extend(partial_grants)
×
4609

4610
    return AccessControlPolicy(Owner=owner, Grants=grants)
×
4611

4612

4613
def object_exists_for_precondition_write(s3_bucket: S3Bucket, key: ObjectKey) -> bool:
1✔
4614
    return (existing := s3_bucket.objects.get(key)) and not isinstance(existing, S3DeleteMarker)
1✔
4615

4616

4617
def verify_object_equality_precondition_write(
1✔
4618
    s3_bucket: S3Bucket,
4619
    key: ObjectKey,
4620
    etag: str,
4621
    initiated: datetime.datetime | None = None,
4622
) -> None:
4623
    existing = s3_bucket.objects.get(key)
1✔
4624
    if not existing or isinstance(existing, S3DeleteMarker):
1✔
4625
        raise NoSuchKey("The specified key does not exist.", Key=key)
1✔
4626

4627
    if not existing.etag == etag.strip('"'):
1✔
4628
        raise PreconditionFailed(
1✔
4629
            "At least one of the pre-conditions you specified did not hold",
4630
            Condition="If-Match",
4631
        )
4632

4633
    if initiated and initiated < existing.last_modified:
1✔
4634
        raise ConditionalRequestConflict(
1✔
4635
            "The conditional request cannot succeed due to a conflicting operation against this resource.",
4636
            Condition="If-Match",
4637
            Key=key,
4638
        )