• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 4c016940-14bb-4fba-a49d-427f621d8d2d

10 Mar 2025 11:34PM UTC coverage: 86.901% (-0.03%) from 86.929%
4c016940-14bb-4fba-a49d-427f621d8d2d

push

circleci

web-flow
Update CODEOWNERS (#12359)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>

62118 of 71481 relevant lines covered (86.9%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.43
/localstack-core/localstack/services/s3/provider.py
1
import base64
1✔
2
import copy
1✔
3
import datetime
1✔
4
import json
1✔
5
import logging
1✔
6
import re
1✔
7
from collections import defaultdict
1✔
8
from inspect import signature
1✔
9
from io import BytesIO
1✔
10
from operator import itemgetter
1✔
11
from typing import IO, Optional, Union
1✔
12
from urllib import parse as urlparse
1✔
13
from zoneinfo import ZoneInfo
1✔
14

15
from localstack import config
1✔
16
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
17
from localstack.aws.api.s3 import (
1✔
18
    MFA,
19
    AbortMultipartUploadOutput,
20
    AccelerateConfiguration,
21
    AccessControlPolicy,
22
    AccessDenied,
23
    AccountId,
24
    AnalyticsConfiguration,
25
    AnalyticsId,
26
    BadDigest,
27
    Body,
28
    Bucket,
29
    BucketAlreadyExists,
30
    BucketAlreadyOwnedByYou,
31
    BucketCannedACL,
32
    BucketLifecycleConfiguration,
33
    BucketLoggingStatus,
34
    BucketName,
35
    BucketNotEmpty,
36
    BucketRegion,
37
    BucketVersioningStatus,
38
    BypassGovernanceRetention,
39
    ChecksumAlgorithm,
40
    ChecksumCRC32,
41
    ChecksumCRC32C,
42
    ChecksumCRC64NVME,
43
    ChecksumSHA1,
44
    ChecksumSHA256,
45
    ChecksumType,
46
    CommonPrefix,
47
    CompletedMultipartUpload,
48
    CompleteMultipartUploadOutput,
49
    ConditionalRequestConflict,
50
    ConfirmRemoveSelfBucketAccess,
51
    ContentMD5,
52
    CopyObjectOutput,
53
    CopyObjectRequest,
54
    CopyObjectResult,
55
    CopyPartResult,
56
    CORSConfiguration,
57
    CreateBucketOutput,
58
    CreateBucketRequest,
59
    CreateMultipartUploadOutput,
60
    CreateMultipartUploadRequest,
61
    CrossLocationLoggingProhibitted,
62
    Delete,
63
    DeletedObject,
64
    DeleteMarkerEntry,
65
    DeleteObjectOutput,
66
    DeleteObjectsOutput,
67
    DeleteObjectTaggingOutput,
68
    Delimiter,
69
    EncodingType,
70
    Error,
71
    Expiration,
72
    FetchOwner,
73
    GetBucketAccelerateConfigurationOutput,
74
    GetBucketAclOutput,
75
    GetBucketAnalyticsConfigurationOutput,
76
    GetBucketCorsOutput,
77
    GetBucketEncryptionOutput,
78
    GetBucketIntelligentTieringConfigurationOutput,
79
    GetBucketInventoryConfigurationOutput,
80
    GetBucketLifecycleConfigurationOutput,
81
    GetBucketLocationOutput,
82
    GetBucketLoggingOutput,
83
    GetBucketOwnershipControlsOutput,
84
    GetBucketPolicyOutput,
85
    GetBucketPolicyStatusOutput,
86
    GetBucketReplicationOutput,
87
    GetBucketRequestPaymentOutput,
88
    GetBucketTaggingOutput,
89
    GetBucketVersioningOutput,
90
    GetBucketWebsiteOutput,
91
    GetObjectAclOutput,
92
    GetObjectAttributesOutput,
93
    GetObjectAttributesParts,
94
    GetObjectAttributesRequest,
95
    GetObjectLegalHoldOutput,
96
    GetObjectLockConfigurationOutput,
97
    GetObjectOutput,
98
    GetObjectRequest,
99
    GetObjectRetentionOutput,
100
    GetObjectTaggingOutput,
101
    GetObjectTorrentOutput,
102
    GetPublicAccessBlockOutput,
103
    HeadBucketOutput,
104
    HeadObjectOutput,
105
    HeadObjectRequest,
106
    IfMatch,
107
    IfMatchInitiatedTime,
108
    IfMatchLastModifiedTime,
109
    IfMatchSize,
110
    IfNoneMatch,
111
    IntelligentTieringConfiguration,
112
    IntelligentTieringId,
113
    InvalidArgument,
114
    InvalidBucketName,
115
    InvalidDigest,
116
    InvalidLocationConstraint,
117
    InvalidObjectState,
118
    InvalidPartNumber,
119
    InvalidPartOrder,
120
    InvalidStorageClass,
121
    InvalidTargetBucketForLogging,
122
    InventoryConfiguration,
123
    InventoryId,
124
    KeyMarker,
125
    LifecycleRules,
126
    ListBucketAnalyticsConfigurationsOutput,
127
    ListBucketIntelligentTieringConfigurationsOutput,
128
    ListBucketInventoryConfigurationsOutput,
129
    ListBucketsOutput,
130
    ListMultipartUploadsOutput,
131
    ListObjectsOutput,
132
    ListObjectsV2Output,
133
    ListObjectVersionsOutput,
134
    ListPartsOutput,
135
    Marker,
136
    MaxBuckets,
137
    MaxKeys,
138
    MaxParts,
139
    MaxUploads,
140
    MethodNotAllowed,
141
    MissingSecurityHeader,
142
    MpuObjectSize,
143
    MultipartUpload,
144
    MultipartUploadId,
145
    NoSuchBucket,
146
    NoSuchBucketPolicy,
147
    NoSuchCORSConfiguration,
148
    NoSuchKey,
149
    NoSuchLifecycleConfiguration,
150
    NoSuchPublicAccessBlockConfiguration,
151
    NoSuchTagSet,
152
    NoSuchUpload,
153
    NoSuchWebsiteConfiguration,
154
    NotificationConfiguration,
155
    Object,
156
    ObjectIdentifier,
157
    ObjectKey,
158
    ObjectLockConfiguration,
159
    ObjectLockConfigurationNotFoundError,
160
    ObjectLockEnabled,
161
    ObjectLockLegalHold,
162
    ObjectLockMode,
163
    ObjectLockRetention,
164
    ObjectLockToken,
165
    ObjectOwnership,
166
    ObjectVersion,
167
    ObjectVersionId,
168
    ObjectVersionStorageClass,
169
    OptionalObjectAttributesList,
170
    Owner,
171
    OwnershipControls,
172
    OwnershipControlsNotFoundError,
173
    Part,
174
    PartNumber,
175
    PartNumberMarker,
176
    Policy,
177
    PostResponse,
178
    PreconditionFailed,
179
    Prefix,
180
    PublicAccessBlockConfiguration,
181
    PutBucketAclRequest,
182
    PutBucketLifecycleConfigurationOutput,
183
    PutObjectAclOutput,
184
    PutObjectAclRequest,
185
    PutObjectLegalHoldOutput,
186
    PutObjectLockConfigurationOutput,
187
    PutObjectOutput,
188
    PutObjectRequest,
189
    PutObjectRetentionOutput,
190
    PutObjectTaggingOutput,
191
    ReplicationConfiguration,
192
    ReplicationConfigurationNotFoundError,
193
    RequestPayer,
194
    RequestPaymentConfiguration,
195
    RestoreObjectOutput,
196
    RestoreRequest,
197
    S3Api,
198
    ServerSideEncryption,
199
    ServerSideEncryptionConfiguration,
200
    SkipValidation,
201
    SSECustomerAlgorithm,
202
    SSECustomerKey,
203
    SSECustomerKeyMD5,
204
    StartAfter,
205
    StorageClass,
206
    Tagging,
207
    Token,
208
    TransitionDefaultMinimumObjectSize,
209
    UploadIdMarker,
210
    UploadPartCopyOutput,
211
    UploadPartCopyRequest,
212
    UploadPartOutput,
213
    UploadPartRequest,
214
    VersionIdMarker,
215
    VersioningConfiguration,
216
    WebsiteConfiguration,
217
)
218
from localstack.aws.api.s3 import NotImplemented as NotImplementedException
1✔
219
from localstack.aws.handlers import (
1✔
220
    modify_service_response,
221
    preprocess_request,
222
    serve_custom_service_request_handlers,
223
)
224
from localstack.constants import AWS_REGION_US_EAST_1
1✔
225
from localstack.services.edge import ROUTER
1✔
226
from localstack.services.plugins import ServiceLifecycleHook
1✔
227
from localstack.services.s3.codec import AwsChunkedDecoder
1✔
228
from localstack.services.s3.constants import (
1✔
229
    ALLOWED_HEADER_OVERRIDES,
230
    ARCHIVES_STORAGE_CLASSES,
231
    CHECKSUM_ALGORITHMS,
232
    DEFAULT_BUCKET_ENCRYPTION,
233
)
234
from localstack.services.s3.cors import S3CorsHandler, s3_cors_request_handler
1✔
235
from localstack.services.s3.exceptions import (
1✔
236
    InvalidBucketOwnerAWSAccountID,
237
    InvalidBucketState,
238
    InvalidRequest,
239
    MalformedPolicy,
240
    MalformedXML,
241
    NoSuchConfiguration,
242
    NoSuchObjectLockConfiguration,
243
    UnexpectedContent,
244
)
245
from localstack.services.s3.models import (
1✔
246
    BucketCorsIndex,
247
    EncryptionParameters,
248
    ObjectLockParameters,
249
    S3Bucket,
250
    S3DeleteMarker,
251
    S3Multipart,
252
    S3Object,
253
    S3Part,
254
    S3Store,
255
    VersionedKeyStore,
256
    s3_stores,
257
)
258
from localstack.services.s3.notifications import NotificationDispatcher, S3EventNotificationContext
1✔
259
from localstack.services.s3.presigned_url import validate_post_policy
1✔
260
from localstack.services.s3.storage.core import LimitedIterableStream, S3ObjectStore
1✔
261
from localstack.services.s3.storage.ephemeral import EphemeralS3ObjectStore
1✔
262
from localstack.services.s3.utils import (
1✔
263
    ObjectRange,
264
    add_expiration_days_to_datetime,
265
    base_64_content_md5_to_etag,
266
    create_redirect_for_post_request,
267
    create_s3_kms_managed_key_for_region,
268
    etag_to_base_64_content_md5,
269
    extract_bucket_key_version_id_from_copy_source,
270
    generate_safe_version_id,
271
    get_canned_acl,
272
    get_class_attrs_from_spec_class,
273
    get_failed_precondition_copy_source,
274
    get_full_default_bucket_location,
275
    get_kms_key_arn,
276
    get_lifecycle_rule_from_object,
277
    get_owner_for_account_id,
278
    get_permission_from_header,
279
    get_retention_from_now,
280
    get_s3_checksum_algorithm_from_request,
281
    get_s3_checksum_algorithm_from_trailing_headers,
282
    get_system_metadata_from_request,
283
    get_unique_key_id,
284
    is_bucket_name_valid,
285
    is_version_older_than_other,
286
    parse_copy_source_range_header,
287
    parse_post_object_tagging_xml,
288
    parse_range_header,
289
    parse_tagging_header,
290
    s3_response_handler,
291
    serialize_expiration_header,
292
    str_to_rfc_1123_datetime,
293
    validate_dict_fields,
294
    validate_failed_precondition,
295
    validate_kms_key_id,
296
    validate_tag_set,
297
)
298
from localstack.services.s3.validation import (
1✔
299
    parse_grants_in_headers,
300
    validate_acl_acp,
301
    validate_bucket_analytics_configuration,
302
    validate_bucket_intelligent_tiering_configuration,
303
    validate_canned_acl,
304
    validate_checksum_value,
305
    validate_cors_configuration,
306
    validate_inventory_configuration,
307
    validate_lifecycle_configuration,
308
    validate_object_key,
309
    validate_sse_c,
310
    validate_website_configuration,
311
)
312
from localstack.services.s3.website_hosting import register_website_hosting_routes
1✔
313
from localstack.state import AssetDirectory, StateVisitor
1✔
314
from localstack.utils.aws.arns import s3_bucket_name
1✔
315
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
316

317
LOG = logging.getLogger(__name__)
1✔
318

319
STORAGE_CLASSES = get_class_attrs_from_spec_class(StorageClass)
1✔
320
SSE_ALGORITHMS = get_class_attrs_from_spec_class(ServerSideEncryption)
1✔
321
OBJECT_OWNERSHIPS = get_class_attrs_from_spec_class(ObjectOwnership)
1✔
322

323
DEFAULT_S3_TMP_DIR = "/tmp/localstack-s3-storage"
1✔
324

325

326
class S3Provider(S3Api, ServiceLifecycleHook):
1✔
327
    def __init__(self, storage_backend: S3ObjectStore = None) -> None:
1✔
328
        super().__init__()
1✔
329
        self._storage_backend = storage_backend or EphemeralS3ObjectStore(DEFAULT_S3_TMP_DIR)
1✔
330
        self._notification_dispatcher = NotificationDispatcher()
1✔
331
        self._cors_handler = S3CorsHandler(BucketCorsIndex())
1✔
332

333
        # runtime cache of Lifecycle Expiration headers, as they need to be calculated everytime we fetch an object
334
        # in case the rules have changed
335
        self._expiration_cache: dict[BucketName, dict[ObjectKey, Expiration]] = defaultdict(dict)
1✔
336

337
    def on_after_init(self):
1✔
338
        preprocess_request.append(self._cors_handler)
1✔
339
        serve_custom_service_request_handlers.append(s3_cors_request_handler)
1✔
340
        modify_service_response.append(self.service, s3_response_handler)
1✔
341
        register_website_hosting_routes(router=ROUTER)
1✔
342

343
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
344
        visitor.visit(s3_stores)
×
345
        visitor.visit(AssetDirectory(self.service, self._storage_backend.root_directory))
×
346

347
    def on_before_state_save(self):
1✔
348
        self._storage_backend.flush()
×
349

350
    def on_after_state_reset(self):
1✔
351
        self._cors_handler.invalidate_cache()
×
352

353
    def on_after_state_load(self):
1✔
354
        self._cors_handler.invalidate_cache()
×
355

356
    def on_before_stop(self):
1✔
357
        self._notification_dispatcher.shutdown()
1✔
358
        self._storage_backend.close()
1✔
359

360
    def _notify(
1✔
361
        self,
362
        context: RequestContext,
363
        s3_bucket: S3Bucket,
364
        s3_object: S3Object | S3DeleteMarker = None,
365
        s3_notif_ctx: S3EventNotificationContext = None,
366
    ):
367
        """
368
        :param context: the RequestContext, to retrieve more information about the incoming notification
369
        :param s3_bucket: the S3Bucket object
370
        :param s3_object: the S3Object object if S3EventNotificationContext is not given
371
        :param s3_notif_ctx: S3EventNotificationContext, in case we need specific data only available in the API call
372
        :return:
373
        """
374
        if s3_bucket.notification_configuration:
1✔
375
            if not s3_notif_ctx:
1✔
376
                s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
1✔
377
                    context,
378
                    s3_bucket=s3_bucket,
379
                    s3_object=s3_object,
380
                )
381

382
            self._notification_dispatcher.send_notifications(
1✔
383
                s3_notif_ctx, s3_bucket.notification_configuration
384
            )
385

386
    def _verify_notification_configuration(
1✔
387
        self,
388
        notification_configuration: NotificationConfiguration,
389
        skip_destination_validation: SkipValidation,
390
        context: RequestContext,
391
        bucket_name: str,
392
    ):
393
        self._notification_dispatcher.verify_configuration(
1✔
394
            notification_configuration, skip_destination_validation, context, bucket_name
395
        )
396

397
    def _get_expiration_header(
1✔
398
        self,
399
        lifecycle_rules: LifecycleRules,
400
        bucket: BucketName,
401
        s3_object: S3Object,
402
        object_tags: dict[str, str],
403
    ) -> Expiration:
404
        """
405
        This method will check if the key matches a Lifecycle filter, and return the serializer header if that's
406
        the case. We're caching it because it can change depending on the set rules on the bucket.
407
        We can't use `lru_cache` as the parameters needs to be hashable
408
        :param lifecycle_rules: the bucket LifecycleRules
409
        :param s3_object: S3Object
410
        :param object_tags: the object tags
411
        :return: the Expiration header if there's a rule matching
412
        """
413
        if cached_exp := self._expiration_cache.get(bucket, {}).get(s3_object.key):
1✔
414
            return cached_exp
1✔
415

416
        if lifecycle_rule := get_lifecycle_rule_from_object(
1✔
417
            lifecycle_rules, s3_object.key, s3_object.size, object_tags
418
        ):
419
            expiration_header = serialize_expiration_header(
1✔
420
                lifecycle_rule["ID"],
421
                lifecycle_rule["Expiration"],
422
                s3_object.last_modified,
423
            )
424
            self._expiration_cache[bucket][s3_object.key] = expiration_header
1✔
425
            return expiration_header
1✔
426

427
    def _get_cross_account_bucket(
1✔
428
        self,
429
        context: RequestContext,
430
        bucket_name: BucketName,
431
        *,
432
        expected_bucket_owner: AccountId = None,
433
    ) -> tuple[S3Store, S3Bucket]:
434
        if expected_bucket_owner and not re.fullmatch(r"\w{12}", expected_bucket_owner):
1✔
435
            raise InvalidBucketOwnerAWSAccountID(
1✔
436
                f"The value of the expected bucket owner parameter must be an AWS Account ID... [{expected_bucket_owner}]",
437
            )
438

439
        store = self.get_store(context.account_id, context.region)
1✔
440
        if not (s3_bucket := store.buckets.get(bucket_name)):
1✔
441
            if not (account_id := store.global_bucket_map.get(bucket_name)):
1✔
442
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)
1✔
443

444
            store = self.get_store(account_id, context.region)
1✔
445
            if not (s3_bucket := store.buckets.get(bucket_name)):
1✔
446
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)
×
447

448
        if expected_bucket_owner and s3_bucket.bucket_account_id != expected_bucket_owner:
1✔
449
            raise AccessDenied("Access Denied")
1✔
450

451
        return store, s3_bucket
1✔
452

453
    @staticmethod
1✔
454
    def get_store(account_id: str, region_name: str) -> S3Store:
1✔
455
        # Use default account id for external access? would need an anonymous one
456
        return s3_stores[account_id][region_name]
1✔
457

458
    @handler("CreateBucket", expand=False)
1✔
459
    def create_bucket(
1✔
460
        self,
461
        context: RequestContext,
462
        request: CreateBucketRequest,
463
    ) -> CreateBucketOutput:
464
        bucket_name = request["Bucket"]
1✔
465

466
        if not is_bucket_name_valid(bucket_name):
1✔
467
            raise InvalidBucketName("The specified bucket is not valid.", BucketName=bucket_name)
1✔
468

469
        # the XML parser returns an empty dict if the body contains the following:
470
        # <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />
471
        # but it also returns an empty dict if the body is fully empty. We need to differentiate the 2 cases by checking
472
        # if the body is empty or not
473
        if context.request.data and (
1✔
474
            (create_bucket_configuration := request.get("CreateBucketConfiguration")) is not None
475
        ):
476
            if not (bucket_region := create_bucket_configuration.get("LocationConstraint")):
1✔
477
                raise MalformedXML()
1✔
478

479
            if context.region == AWS_REGION_US_EAST_1:
1✔
480
                if bucket_region == "us-east-1":
1✔
481
                    raise InvalidLocationConstraint(
1✔
482
                        "The specified location-constraint is not valid",
483
                        LocationConstraint=bucket_region,
484
                    )
485
            elif context.region != bucket_region:
1✔
486
                raise CommonServiceException(
1✔
487
                    code="IllegalLocationConstraintException",
488
                    message=f"The {bucket_region} location constraint is incompatible for the region specific endpoint this request was sent to.",
489
                )
490
        else:
491
            bucket_region = AWS_REGION_US_EAST_1
1✔
492
            if context.region != bucket_region:
1✔
493
                raise CommonServiceException(
1✔
494
                    code="IllegalLocationConstraintException",
495
                    message="The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.",
496
                )
497

498
        store = self.get_store(context.account_id, bucket_region)
1✔
499

500
        if bucket_name in store.global_bucket_map:
1✔
501
            existing_bucket_owner = store.global_bucket_map[bucket_name]
1✔
502
            if existing_bucket_owner != context.account_id:
1✔
503
                raise BucketAlreadyExists()
1✔
504

505
            # if the existing bucket has the same owner, the behaviour will depend on the region
506
            if bucket_region != "us-east-1":
1✔
507
                raise BucketAlreadyOwnedByYou(
1✔
508
                    "Your previous request to create the named bucket succeeded and you already own it.",
509
                    BucketName=bucket_name,
510
                )
511
            else:
512
                # CreateBucket is idempotent in us-east-1
513
                return CreateBucketOutput(Location=f"/{bucket_name}")
1✔
514

515
        if (
1✔
516
            object_ownership := request.get("ObjectOwnership")
517
        ) is not None and object_ownership not in OBJECT_OWNERSHIPS:
518
            raise InvalidArgument(
1✔
519
                f"Invalid x-amz-object-ownership header: {object_ownership}",
520
                ArgumentName="x-amz-object-ownership",
521
            )
522
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
523
        owner = get_owner_for_account_id(context.account_id)
1✔
524
        acl = get_access_control_policy_for_new_resource_request(request, owner=owner)
1✔
525
        s3_bucket = S3Bucket(
1✔
526
            name=bucket_name,
527
            account_id=context.account_id,
528
            bucket_region=bucket_region,
529
            owner=owner,
530
            acl=acl,
531
            object_ownership=request.get("ObjectOwnership"),
532
            object_lock_enabled_for_bucket=request.get("ObjectLockEnabledForBucket"),
533
        )
534

535
        store.buckets[bucket_name] = s3_bucket
1✔
536
        store.global_bucket_map[bucket_name] = s3_bucket.bucket_account_id
1✔
537
        self._cors_handler.invalidate_cache()
1✔
538
        self._storage_backend.create_bucket(bucket_name)
1✔
539

540
        # Location is always contained in response -> full url for LocationConstraint outside us-east-1
541
        location = (
1✔
542
            f"/{bucket_name}"
543
            if bucket_region == "us-east-1"
544
            else get_full_default_bucket_location(bucket_name)
545
        )
546
        response = CreateBucketOutput(Location=location)
1✔
547
        return response
1✔
548

549
    def delete_bucket(
1✔
550
        self,
551
        context: RequestContext,
552
        bucket: BucketName,
553
        expected_bucket_owner: AccountId = None,
554
        **kwargs,
555
    ) -> None:
556
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
557

558
        # the bucket still contains objects
559
        if not s3_bucket.objects.is_empty():
1✔
560
            message = "The bucket you tried to delete is not empty"
1✔
561
            if s3_bucket.versioning_status:
1✔
562
                message += ". You must delete all versions in the bucket."
1✔
563
            raise BucketNotEmpty(
1✔
564
                message,
565
                BucketName=bucket,
566
            )
567

568
        store.buckets.pop(bucket)
1✔
569
        store.global_bucket_map.pop(bucket)
1✔
570
        self._cors_handler.invalidate_cache()
1✔
571
        self._expiration_cache.pop(bucket, None)
1✔
572
        # clean up the storage backend
573
        self._storage_backend.delete_bucket(bucket)
1✔
574

575
    def list_buckets(
1✔
576
        self,
577
        context: RequestContext,
578
        max_buckets: MaxBuckets = None,
579
        continuation_token: Token = None,
580
        prefix: Prefix = None,
581
        bucket_region: BucketRegion = None,
582
        **kwargs,
583
    ) -> ListBucketsOutput:
584
        # TODO add support for max_buckets, continuation_token, prefix, and bucket_region
585
        owner = get_owner_for_account_id(context.account_id)
1✔
586
        store = self.get_store(context.account_id, context.region)
1✔
587
        buckets = [
1✔
588
            Bucket(Name=bucket.name, CreationDate=bucket.creation_date)
589
            for bucket in store.buckets.values()
590
        ]
591
        return ListBucketsOutput(Owner=owner, Buckets=buckets)
1✔
592

593
    def head_bucket(
1✔
594
        self,
595
        context: RequestContext,
596
        bucket: BucketName,
597
        expected_bucket_owner: AccountId = None,
598
        **kwargs,
599
    ) -> HeadBucketOutput:
600
        store = self.get_store(context.account_id, context.region)
1✔
601
        if not (s3_bucket := store.buckets.get(bucket)):
1✔
602
            if not (account_id := store.global_bucket_map.get(bucket)):
1✔
603
                # just to return the 404 error message
604
                raise NoSuchBucket()
1✔
605

606
            store = self.get_store(account_id, context.region)
×
607
            if not (s3_bucket := store.buckets.get(bucket)):
×
608
                # just to return the 404 error message
609
                raise NoSuchBucket()
×
610

611
        # TODO: this call is also used to check if the user has access/authorization for the bucket
612
        #  it can return 403
613
        return HeadBucketOutput(BucketRegion=s3_bucket.bucket_region)
1✔
614

615
    def get_bucket_location(
1✔
616
        self,
617
        context: RequestContext,
618
        bucket: BucketName,
619
        expected_bucket_owner: AccountId = None,
620
        **kwargs,
621
    ) -> GetBucketLocationOutput:
622
        """
623
        When implementing the ASF provider, this operation is implemented because:
624
        - The spec defines a root element GetBucketLocationOutput containing a LocationConstraint member, where
625
          S3 actually just returns the LocationConstraint on the root level (only operation so far that we know of).
626
        - We circumvent the root level element here by patching the spec such that this operation returns a
627
          single "payload" (the XML body response), which causes the serializer to directly take the payload element.
628
        - The above "hack" causes the fix in the serializer to not be picked up here as we're passing the XML body as
629
          the payload, which is why we need to manually do this here by manipulating the string.
630
        Botocore implements this hack for parsing the response in `botocore.handlers.py#parse_get_bucket_location`
631
        """
632
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
633

634
        location_constraint = (
1✔
635
            '<?xml version="1.0" encoding="UTF-8"?>\n'
636
            '<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">{{location}}</LocationConstraint>'
637
        )
638

639
        location = s3_bucket.bucket_region if s3_bucket.bucket_region != "us-east-1" else ""
1✔
640
        location_constraint = location_constraint.replace("{{location}}", location)
1✔
641

642
        response = GetBucketLocationOutput(LocationConstraint=location_constraint)
1✔
643
        return response
1✔
644

645
    @handler("PutObject", expand=False)
1✔
646
    def put_object(
1✔
647
        self,
648
        context: RequestContext,
649
        request: PutObjectRequest,
650
    ) -> PutObjectOutput:
651
        # TODO: validate order of validation
652
        # TODO: still need to handle following parameters
653
        #  request_payer: RequestPayer = None,
654
        bucket_name = request["Bucket"]
1✔
655
        key = request["Key"]
1✔
656
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
657

658
        if (storage_class := request.get("StorageClass")) is not None and (
1✔
659
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
660
        ):
661
            raise InvalidStorageClass(
1✔
662
                "The storage class you specified is not valid", StorageClassRequested=storage_class
663
            )
664

665
        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
1✔
666
            validate_kms_key_id(sse_kms_key_id, s3_bucket)
1✔
667

668
        validate_object_key(key)
1✔
669

670
        if_match = request.get("IfMatch")
1✔
671
        if (if_none_match := request.get("IfNoneMatch")) and if_match:
1✔
672
            raise NotImplementedException(
673
                "A header you provided implies functionality that is not implemented",
674
                Header="If-Match,If-None-Match",
675
                additionalMessage="Multiple conditional request headers present in the request",
676
            )
677

678
        elif (if_none_match and if_none_match != "*") or (if_match and if_match == "*"):
1✔
679
            header_name = "If-None-Match" if if_none_match else "If-Match"
1✔
680
            raise NotImplementedException(
681
                "A header you provided implies functionality that is not implemented",
682
                Header=header_name,
683
                additionalMessage=f"We don't accept the provided value of {header_name} header for this API",
684
            )
685

686
        system_metadata = get_system_metadata_from_request(request)
1✔
687
        if not system_metadata.get("ContentType"):
1✔
688
            system_metadata["ContentType"] = "binary/octet-stream"
1✔
689

690
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
691

692
        etag_content_md5 = ""
1✔
693
        if content_md5 := request.get("ContentMD5"):
1✔
694
            # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
695
            etag_content_md5 = base_64_content_md5_to_etag(content_md5)
1✔
696
            if not etag_content_md5:
1✔
697
                raise InvalidDigest(
1✔
698
                    "The Content-MD5 you specified was invalid.",
699
                    Content_MD5=content_md5,
700
                )
701

702
        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
1✔
703
        checksum_value = (
1✔
704
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
705
        )
706

707
        # TODO: we're not encrypting the object with the provided key for now
708
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
709
        validate_sse_c(
1✔
710
            algorithm=request.get("SSECustomerAlgorithm"),
711
            encryption_key=request.get("SSECustomerKey"),
712
            encryption_key_md5=sse_c_key_md5,
713
            server_side_encryption=request.get("ServerSideEncryption"),
714
        )
715

716
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
717
            request,
718
            s3_bucket,
719
            store,
720
        )
721

722
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)
1✔
723

724
        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)
1✔
725

726
        if tagging := request.get("Tagging"):
1✔
727
            tagging = parse_tagging_header(tagging)
1✔
728

729
        s3_object = S3Object(
1✔
730
            key=key,
731
            version_id=version_id,
732
            storage_class=storage_class,
733
            expires=request.get("Expires"),
734
            user_metadata=request.get("Metadata"),
735
            system_metadata=system_metadata,
736
            checksum_algorithm=checksum_algorithm,
737
            checksum_value=checksum_value,
738
            encryption=encryption_parameters.encryption,
739
            kms_key_id=encryption_parameters.kms_key_id,
740
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
741
            sse_key_hash=sse_c_key_md5,
742
            lock_mode=lock_parameters.lock_mode,
743
            lock_legal_status=lock_parameters.lock_legal_status,
744
            lock_until=lock_parameters.lock_until,
745
            website_redirect_location=request.get("WebsiteRedirectLocation"),
746
            acl=acl,
747
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depends on Bucket settings
748
        )
749

750
        body = request.get("Body")
1✔
751
        # check if chunked request
752
        headers = context.request.headers
1✔
753
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
1✔
754
            "STREAMING-"
755
        ) or "aws-chunked" in headers.get("content-encoding", "")
756
        if is_aws_chunked:
1✔
757
            checksum_algorithm = (
1✔
758
                checksum_algorithm
759
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
760
            )
761
            if checksum_algorithm:
1✔
762
                s3_object.checksum_algorithm = checksum_algorithm
1✔
763

764
            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
1✔
765
            body = AwsChunkedDecoder(body, decoded_content_length, s3_object=s3_object)
1✔
766

767
            # S3 removes the `aws-chunked` value from ContentEncoding
768
            if content_encoding := s3_object.system_metadata.pop("ContentEncoding", None):
1✔
769
                encodings = [enc for enc in content_encoding.split(",") if enc != "aws-chunked"]
1✔
770
                if encodings:
1✔
771
                    s3_object.system_metadata["ContentEncoding"] = ",".join(encodings)
1✔
772

773
        with self._storage_backend.open(bucket_name, s3_object, mode="w") as s3_stored_object:
1✔
774
            # as we are inside the lock here, if multiple concurrent requests happen for the same object, it's the first
775
            # one to finish to succeed, and subsequent will raise exceptions. Once the first write finishes, we're
776
            # opening the lock and other requests can check this condition
777
            if if_none_match and object_exists_for_precondition_write(s3_bucket, key):
1✔
778
                raise PreconditionFailed(
1✔
779
                    "At least one of the pre-conditions you specified did not hold",
780
                    Condition="If-None-Match",
781
                )
782

783
            elif if_match:
1✔
784
                verify_object_equality_precondition_write(s3_bucket, key, if_match)
1✔
785

786
            s3_stored_object.write(body)
1✔
787

788
            if s3_object.checksum_algorithm:
1✔
789
                if not validate_checksum_value(s3_object.checksum_value, checksum_algorithm):
1✔
790
                    self._storage_backend.remove(bucket_name, s3_object)
1✔
791
                    raise InvalidRequest(
1✔
792
                        f"Value for x-amz-checksum-{s3_object.checksum_algorithm.lower()} header is invalid."
793
                    )
794
                elif s3_object.checksum_value != s3_stored_object.checksum:
1✔
795
                    self._storage_backend.remove(bucket_name, s3_object)
1✔
796
                    raise BadDigest(
1✔
797
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
798
                    )
799

800
            # TODO: handle ContentMD5 and ChecksumAlgorithm in a handler for all requests except requests with a
801
            #  streaming body. We can use the specs to verify which operations needs to have the checksum validated
802
            if content_md5:
1✔
803
                calculated_md5 = etag_to_base_64_content_md5(s3_stored_object.etag)
1✔
804
                if calculated_md5 != content_md5:
1✔
805
                    self._storage_backend.remove(bucket_name, s3_object)
1✔
806
                    raise BadDigest(
1✔
807
                        "The Content-MD5 you specified did not match what we received.",
808
                        ExpectedDigest=etag_content_md5,
809
                        CalculatedDigest=calculated_md5,
810
                    )
811

812
            s3_bucket.objects.set(key, s3_object)
1✔
813

814
        # in case we are overriding an object, delete the tags entry
815
        key_id = get_unique_key_id(bucket_name, key, version_id)
1✔
816
        store.TAGS.tags.pop(key_id, None)
1✔
817
        if tagging:
1✔
818
            store.TAGS.tags[key_id] = tagging
1✔
819

820
        # RequestCharged: Optional[RequestCharged]  # TODO
821
        response = PutObjectOutput(
1✔
822
            ETag=s3_object.quoted_etag,
823
        )
824
        if s3_bucket.versioning_status == "Enabled":
1✔
825
            response["VersionId"] = s3_object.version_id
1✔
826

827
        if s3_object.checksum_algorithm:
1✔
828
            response[f"Checksum{s3_object.checksum_algorithm}"] = s3_object.checksum_value
1✔
829
            response["ChecksumType"] = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)
1✔
830

831
        if s3_bucket.lifecycle_rules:
1✔
832
            if expiration_header := self._get_expiration_header(
1✔
833
                s3_bucket.lifecycle_rules,
834
                bucket_name,
835
                s3_object,
836
                store.TAGS.tags.get(key_id, {}),
837
            ):
838
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
839
                #  apply them everytime we get/head an object
840
                response["Expiration"] = expiration_header
1✔
841

842
        add_encryption_to_response(response, s3_object=s3_object)
1✔
843
        if sse_c_key_md5:
1✔
844
            response["SSECustomerAlgorithm"] = "AES256"
1✔
845
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
846

847
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
848

849
        return response
1✔
850

851
    @handler("GetObject", expand=False)
1✔
852
    def get_object(
1✔
853
        self,
854
        context: RequestContext,
855
        request: GetObjectRequest,
856
    ) -> GetObjectOutput:
857
        # TODO: missing handling parameters:
858
        #  request_payer: RequestPayer = None,
859
        #  expected_bucket_owner: AccountId = None,
860

861
        bucket_name = request["Bucket"]
1✔
862
        object_key = request["Key"]
1✔
863
        version_id = request.get("VersionId")
1✔
864
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
865

866
        s3_object = s3_bucket.get_object(
1✔
867
            key=object_key,
868
            version_id=version_id,
869
            http_method="GET",
870
        )
871
        if s3_object.expires and s3_object.expires < datetime.datetime.now(
1✔
872
            tz=s3_object.expires.tzinfo
873
        ):
874
            # TODO: old behaviour was deleting key instantly if expired. AWS cleans up only once a day generally
875
            #  you can still HeadObject on it and you get the expiry time until scheduled deletion
876
            kwargs = {"Key": object_key}
1✔
877
            if version_id:
1✔
878
                kwargs["VersionId"] = version_id
×
879
            raise NoSuchKey("The specified key does not exist.", **kwargs)
1✔
880

881
        if s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not s3_object.restore:
1✔
882
            raise InvalidObjectState(
1✔
883
                "The operation is not valid for the object's storage class",
884
                StorageClass=s3_object.storage_class,
885
            )
886

887
        if not config.S3_SKIP_KMS_KEY_VALIDATION and s3_object.kms_key_id:
1✔
888
            validate_kms_key_id(kms_key=s3_object.kms_key_id, bucket=s3_bucket)
1✔
889

890
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
891
        # we're using getattr access because when restoring, the field might not exist
892
        # TODO: cleanup at next major release
893
        if sse_key_hash := getattr(s3_object, "sse_key_hash", None):
1✔
894
            if sse_key_hash and not sse_c_key_md5:
1✔
895
                raise InvalidRequest(
1✔
896
                    "The object was stored using a form of Server Side Encryption. "
897
                    "The correct parameters must be provided to retrieve the object."
898
                )
899
            elif sse_key_hash != sse_c_key_md5:
1✔
900
                raise AccessDenied(
1✔
901
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
902
                )
903

904
        validate_sse_c(
1✔
905
            algorithm=request.get("SSECustomerAlgorithm"),
906
            encryption_key=request.get("SSECustomerKey"),
907
            encryption_key_md5=sse_c_key_md5,
908
        )
909

910
        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)
1✔
911

912
        range_header = request.get("Range")
1✔
913
        part_number = request.get("PartNumber")
1✔
914
        if range_header and part_number:
1✔
915
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
1✔
916
        range_data = None
1✔
917
        if range_header:
1✔
918
            range_data = parse_range_header(range_header, s3_object.size)
1✔
919
        elif part_number:
1✔
920
            range_data = get_part_range(s3_object, part_number)
1✔
921

922
        # we deliberately do not call `.close()` on the s3_stored_object to keep the read lock acquired. When passing
923
        # the object to Werkzeug, the handler will call `.close()` after finishing iterating over `__iter__`.
924
        # this can however lead to deadlocks if an exception happens between the call and returning the object.
925
        # Be careful into adding validation between this call and `return` of `S3Provider.get_object`
926
        s3_stored_object = self._storage_backend.open(bucket_name, s3_object, mode="r")
1✔
927

928
        # this is a hacky way to verify the object hasn't been modified between `s3_object = s3_bucket.get_object`
929
        # and the storage backend call. If it has been modified, now that we're in the read lock, we can safely fetch
930
        # the object again
931
        if s3_stored_object.last_modified != s3_object.internal_last_modified:
1✔
932
            s3_object = s3_bucket.get_object(
1✔
933
                key=object_key,
934
                version_id=version_id,
935
                http_method="GET",
936
            )
937

938
        response = GetObjectOutput(
1✔
939
            AcceptRanges="bytes",
940
            **s3_object.get_system_metadata_fields(),
941
        )
942
        if s3_object.user_metadata:
1✔
943
            response["Metadata"] = s3_object.user_metadata
1✔
944

945
        if s3_object.parts and request.get("PartNumber"):
1✔
946
            response["PartsCount"] = len(s3_object.parts)
1✔
947

948
        if s3_object.version_id:
1✔
949
            response["VersionId"] = s3_object.version_id
1✔
950

951
        if s3_object.website_redirect_location:
1✔
952
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location
1✔
953

954
        if s3_object.restore:
1✔
955
            response["Restore"] = s3_object.restore
×
956

957
        checksum_value = None
1✔
958
        if checksum_algorithm := s3_object.checksum_algorithm:
1✔
959
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
1✔
960
                checksum_value = s3_object.checksum_value
1✔
961

962
        if range_data:
1✔
963
            s3_stored_object.seek(range_data.begin)
1✔
964
            response["Body"] = LimitedIterableStream(
1✔
965
                s3_stored_object, max_length=range_data.content_length
966
            )
967
            response["ContentRange"] = range_data.content_range
1✔
968
            response["ContentLength"] = range_data.content_length
1✔
969
            response["StatusCode"] = 206
1✔
970
            if range_data.content_length == s3_object.size and checksum_value:
1✔
971
                response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
1✔
972
                response["ChecksumType"] = getattr(
1✔
973
                    s3_object, "checksum_type", ChecksumType.FULL_OBJECT
974
                )
975
        else:
976
            response["Body"] = s3_stored_object
1✔
977
            if checksum_value:
1✔
978
                response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
1✔
979
                response["ChecksumType"] = getattr(
1✔
980
                    s3_object, "checksum_type", ChecksumType.FULL_OBJECT
981
                )
982

983
        add_encryption_to_response(response, s3_object=s3_object)
1✔
984

985
        if object_tags := store.TAGS.tags.get(
1✔
986
            get_unique_key_id(bucket_name, object_key, version_id)
987
        ):
988
            response["TagCount"] = len(object_tags)
1✔
989

990
        if s3_object.is_current and s3_bucket.lifecycle_rules:
1✔
991
            if expiration_header := self._get_expiration_header(
1✔
992
                s3_bucket.lifecycle_rules,
993
                bucket_name,
994
                s3_object,
995
                object_tags,
996
            ):
997
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
998
                #  apply them everytime we get/head an object
999
                response["Expiration"] = expiration_header
1✔
1000

1001
        # TODO: missing returned fields
1002
        #     RequestCharged: Optional[RequestCharged]
1003
        #     ReplicationStatus: Optional[ReplicationStatus]
1004

1005
        if s3_object.lock_mode:
1✔
1006
            response["ObjectLockMode"] = s3_object.lock_mode
×
1007
            if s3_object.lock_until:
×
1008
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
×
1009
        if s3_object.lock_legal_status:
1✔
1010
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status
×
1011

1012
        if sse_c_key_md5:
1✔
1013
            response["SSECustomerAlgorithm"] = "AES256"
1✔
1014
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
1015

1016
        for request_param, response_param in ALLOWED_HEADER_OVERRIDES.items():
1✔
1017
            if request_param_value := request.get(request_param):
1✔
1018
                response[response_param] = request_param_value
1✔
1019

1020
        return response
1✔
1021

1022
    @handler("HeadObject", expand=False)
1✔
1023
    def head_object(
1✔
1024
        self,
1025
        context: RequestContext,
1026
        request: HeadObjectRequest,
1027
    ) -> HeadObjectOutput:
1028
        bucket_name = request["Bucket"]
1✔
1029
        object_key = request["Key"]
1✔
1030
        version_id = request.get("VersionId")
1✔
1031
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
1032

1033
        s3_object = s3_bucket.get_object(
1✔
1034
            key=object_key,
1035
            version_id=version_id,
1036
            http_method="HEAD",
1037
        )
1038

1039
        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)
1✔
1040

1041
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
1042
        if s3_object.sse_key_hash:
1✔
1043
            if not sse_c_key_md5:
1✔
1044
                raise InvalidRequest(
×
1045
                    "The object was stored using a form of Server Side Encryption. "
1046
                    "The correct parameters must be provided to retrieve the object."
1047
                )
1048
            elif s3_object.sse_key_hash != sse_c_key_md5:
1✔
1049
                raise AccessDenied(
1✔
1050
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
1051
                )
1052

1053
        validate_sse_c(
1✔
1054
            algorithm=request.get("SSECustomerAlgorithm"),
1055
            encryption_key=request.get("SSECustomerKey"),
1056
            encryption_key_md5=sse_c_key_md5,
1057
        )
1058

1059
        response = HeadObjectOutput(
1✔
1060
            AcceptRanges="bytes",
1061
            **s3_object.get_system_metadata_fields(),
1062
        )
1063
        if s3_object.user_metadata:
1✔
1064
            response["Metadata"] = s3_object.user_metadata
1✔
1065

1066
        if checksum_algorithm := s3_object.checksum_algorithm:
1✔
1067
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
1✔
1068
                response[f"Checksum{checksum_algorithm.upper()}"] = s3_object.checksum_value
1✔
1069

1070
        if s3_object.parts and request.get("PartNumber"):
1✔
1071
            response["PartsCount"] = len(s3_object.parts)
1✔
1072

1073
        if s3_object.version_id:
1✔
1074
            response["VersionId"] = s3_object.version_id
1✔
1075

1076
        if s3_object.website_redirect_location:
1✔
1077
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location
1✔
1078

1079
        if s3_object.restore:
1✔
1080
            response["Restore"] = s3_object.restore
1✔
1081

1082
        range_header = request.get("Range")
1✔
1083
        part_number = request.get("PartNumber")
1✔
1084
        if range_header and part_number:
1✔
1085
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
×
1086
        range_data = None
1✔
1087
        if range_header:
1✔
1088
            range_data = parse_range_header(range_header, s3_object.size)
×
1089
        elif part_number:
1✔
1090
            range_data = get_part_range(s3_object, part_number)
1✔
1091

1092
        if range_data:
1✔
1093
            response["ContentLength"] = range_data.content_length
1✔
1094
            response["StatusCode"] = 206
1✔
1095

1096
        add_encryption_to_response(response, s3_object=s3_object)
1✔
1097

1098
        # if you specify the VersionId, AWS won't return the Expiration header, even if that's the current version
1099
        if not version_id and s3_bucket.lifecycle_rules:
1✔
1100
            object_tags = store.TAGS.tags.get(
1✔
1101
                get_unique_key_id(bucket_name, object_key, s3_object.version_id)
1102
            )
1103
            if expiration_header := self._get_expiration_header(
1✔
1104
                s3_bucket.lifecycle_rules,
1105
                bucket_name,
1106
                s3_object,
1107
                object_tags,
1108
            ):
1109
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
1110
                #  apply them everytime we get/head an object
1111
                response["Expiration"] = expiration_header
1✔
1112

1113
        if s3_object.lock_mode:
1✔
1114
            response["ObjectLockMode"] = s3_object.lock_mode
1✔
1115
            if s3_object.lock_until:
1✔
1116
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
1✔
1117
        if s3_object.lock_legal_status:
1✔
1118
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status
1✔
1119

1120
        if sse_c_key_md5:
1✔
1121
            response["SSECustomerAlgorithm"] = "AES256"
1✔
1122
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
1123

1124
        # TODO: missing return fields:
1125
        #  ArchiveStatus: Optional[ArchiveStatus]
1126
        #  RequestCharged: Optional[RequestCharged]
1127
        #  ReplicationStatus: Optional[ReplicationStatus]
1128

1129
        return response
1✔
1130

1131
    def delete_object(
1✔
1132
        self,
1133
        context: RequestContext,
1134
        bucket: BucketName,
1135
        key: ObjectKey,
1136
        mfa: MFA = None,
1137
        version_id: ObjectVersionId = None,
1138
        request_payer: RequestPayer = None,
1139
        bypass_governance_retention: BypassGovernanceRetention = None,
1140
        expected_bucket_owner: AccountId = None,
1141
        if_match: IfMatch = None,
1142
        if_match_last_modified_time: IfMatchLastModifiedTime = None,
1143
        if_match_size: IfMatchSize = None,
1144
        **kwargs,
1145
    ) -> DeleteObjectOutput:
1146
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1147

1148
        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
1✔
1149
            raise InvalidArgument(
1✔
1150
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
1151
                ArgumentName="x-amz-bypass-governance-retention",
1152
            )
1153

1154
        if s3_bucket.versioning_status is None:
1✔
1155
            if version_id and version_id != "null":
1✔
1156
                raise InvalidArgument(
1✔
1157
                    "Invalid version id specified",
1158
                    ArgumentName="versionId",
1159
                    ArgumentValue=version_id,
1160
                )
1161

1162
            found_object = s3_bucket.objects.pop(key, None)
1✔
1163
            # TODO: RequestCharged
1164
            if found_object:
1✔
1165
                self._storage_backend.remove(bucket, found_object)
1✔
1166
                self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
1✔
1167
                store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
1✔
1168

1169
            return DeleteObjectOutput()
1✔
1170

1171
        if not version_id:
1✔
1172
            delete_marker_id = generate_version_id(s3_bucket.versioning_status)
1✔
1173
            delete_marker = S3DeleteMarker(key=key, version_id=delete_marker_id)
1✔
1174
            s3_bucket.objects.set(key, delete_marker)
1✔
1175
            s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
1✔
1176
                context,
1177
                s3_bucket=s3_bucket,
1178
                s3_object=delete_marker,
1179
            )
1180
            s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
1✔
1181
            self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)
1✔
1182

1183
            return DeleteObjectOutput(VersionId=delete_marker.version_id, DeleteMarker=True)
1✔
1184

1185
        if key not in s3_bucket.objects:
1✔
1186
            return DeleteObjectOutput()
×
1187

1188
        if not (s3_object := s3_bucket.objects.get(key, version_id)):
1✔
1189
            raise InvalidArgument(
1✔
1190
                "Invalid version id specified",
1191
                ArgumentName="versionId",
1192
                ArgumentValue=version_id,
1193
            )
1194

1195
        if s3_object.is_locked(bypass_governance_retention):
1✔
1196
            raise AccessDenied("Access Denied because object protected by object lock.")
1✔
1197

1198
        s3_bucket.objects.pop(object_key=key, version_id=version_id)
1✔
1199
        response = DeleteObjectOutput(VersionId=s3_object.version_id)
1✔
1200

1201
        if isinstance(s3_object, S3DeleteMarker):
1✔
1202
            response["DeleteMarker"] = True
1✔
1203
        else:
1204
            self._storage_backend.remove(bucket, s3_object)
1✔
1205
            store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
1✔
1206
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
1207

1208
        return response
1✔
1209

1210
    def delete_objects(
1✔
1211
        self,
1212
        context: RequestContext,
1213
        bucket: BucketName,
1214
        delete: Delete,
1215
        mfa: MFA = None,
1216
        request_payer: RequestPayer = None,
1217
        bypass_governance_retention: BypassGovernanceRetention = None,
1218
        expected_bucket_owner: AccountId = None,
1219
        checksum_algorithm: ChecksumAlgorithm = None,
1220
        **kwargs,
1221
    ) -> DeleteObjectsOutput:
1222
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1223

1224
        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
1✔
1225
            raise InvalidArgument(
1✔
1226
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
1227
                ArgumentName="x-amz-bypass-governance-retention",
1228
            )
1229

1230
        objects: list[ObjectIdentifier] = delete.get("Objects")
1✔
1231
        if not objects:
1✔
1232
            raise MalformedXML()
×
1233

1234
        # TODO: max 1000 delete at once? test against AWS?
1235

1236
        quiet = delete.get("Quiet", False)
1✔
1237
        deleted = []
1✔
1238
        errors = []
1✔
1239

1240
        to_remove = []
1✔
1241
        for to_delete_object in objects:
1✔
1242
            object_key = to_delete_object.get("Key")
1✔
1243
            version_id = to_delete_object.get("VersionId")
1✔
1244
            if s3_bucket.versioning_status is None:
1✔
1245
                if version_id and version_id != "null":
1✔
1246
                    errors.append(
1✔
1247
                        Error(
1248
                            Code="NoSuchVersion",
1249
                            Key=object_key,
1250
                            Message="The specified version does not exist.",
1251
                            VersionId=version_id,
1252
                        )
1253
                    )
1254
                    continue
1✔
1255

1256
                found_object = s3_bucket.objects.pop(object_key, None)
1✔
1257
                if found_object:
1✔
1258
                    to_remove.append(found_object)
1✔
1259
                    self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
1✔
1260
                    store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)
1✔
1261
                # small hack to not create a fake object for nothing
1262
                elif s3_bucket.notification_configuration:
1✔
1263
                    # DeleteObjects is a bit weird, even if the object didn't exist, S3 will trigger a notification
1264
                    # for a non-existing object being deleted
1265
                    self._notify(
1✔
1266
                        context, s3_bucket=s3_bucket, s3_object=S3Object(key=object_key, etag="")
1267
                    )
1268

1269
                if not quiet:
1✔
1270
                    deleted.append(DeletedObject(Key=object_key))
1✔
1271

1272
                continue
1✔
1273

1274
            if not version_id:
1✔
1275
                delete_marker_id = generate_version_id(s3_bucket.versioning_status)
1✔
1276
                delete_marker = S3DeleteMarker(key=object_key, version_id=delete_marker_id)
1✔
1277
                s3_bucket.objects.set(object_key, delete_marker)
1✔
1278
                s3_notif_ctx = S3EventNotificationContext.from_request_context_native(
1✔
1279
                    context,
1280
                    s3_bucket=s3_bucket,
1281
                    s3_object=delete_marker,
1282
                )
1283
                s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
1✔
1284
                self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)
1✔
1285

1286
                if not quiet:
1✔
1287
                    deleted.append(
1✔
1288
                        DeletedObject(
1289
                            DeleteMarker=True,
1290
                            DeleteMarkerVersionId=delete_marker_id,
1291
                            Key=object_key,
1292
                        )
1293
                    )
1294
                continue
1✔
1295

1296
            if not (
1✔
1297
                found_object := s3_bucket.objects.get(object_key=object_key, version_id=version_id)
1298
            ):
1299
                errors.append(
1✔
1300
                    Error(
1301
                        Code="NoSuchVersion",
1302
                        Key=object_key,
1303
                        Message="The specified version does not exist.",
1304
                        VersionId=version_id,
1305
                    )
1306
                )
1307
                continue
1✔
1308

1309
            if found_object.is_locked(bypass_governance_retention):
1✔
1310
                errors.append(
1✔
1311
                    Error(
1312
                        Code="AccessDenied",
1313
                        Key=object_key,
1314
                        Message="Access Denied because object protected by object lock.",
1315
                        VersionId=version_id,
1316
                    )
1317
                )
1318
                continue
1✔
1319

1320
            s3_bucket.objects.pop(object_key=object_key, version_id=version_id)
1✔
1321
            if not quiet:
1✔
1322
                deleted_object = DeletedObject(
1✔
1323
                    Key=object_key,
1324
                    VersionId=version_id,
1325
                )
1326
                if isinstance(found_object, S3DeleteMarker):
1✔
1327
                    deleted_object["DeleteMarker"] = True
1✔
1328
                    deleted_object["DeleteMarkerVersionId"] = found_object.version_id
1✔
1329

1330
                deleted.append(deleted_object)
1✔
1331

1332
            if isinstance(found_object, S3Object):
1✔
1333
                to_remove.append(found_object)
1✔
1334

1335
            self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
1✔
1336
            store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)
1✔
1337

1338
        # TODO: request charged
1339
        self._storage_backend.remove(bucket, to_remove)
1✔
1340
        response: DeleteObjectsOutput = {}
1✔
1341
        # AWS validated: the list of Deleted objects is unordered, multiple identical calls can return different results
1342
        if errors:
1✔
1343
            response["Errors"] = errors
1✔
1344
        if not quiet:
1✔
1345
            response["Deleted"] = deleted
1✔
1346

1347
        return response
1✔
1348

1349
    @handler("CopyObject", expand=False)
1✔
1350
    def copy_object(
1✔
1351
        self,
1352
        context: RequestContext,
1353
        request: CopyObjectRequest,
1354
    ) -> CopyObjectOutput:
1355
        # request_payer: RequestPayer = None,  # TODO:
1356
        dest_bucket = request["Bucket"]
1✔
1357
        dest_key = request["Key"]
1✔
1358
        validate_object_key(dest_key)
1✔
1359
        store, dest_s3_bucket = self._get_cross_account_bucket(context, dest_bucket)
1✔
1360

1361
        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
1✔
1362
            request.get("CopySource")
1363
        )
1364
        _, src_s3_bucket = self._get_cross_account_bucket(context, src_bucket)
1✔
1365

1366
        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
1✔
1367
            validate_kms_key_id(sse_kms_key_id, dest_s3_bucket)
1✔
1368

1369
        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
1370
        try:
1✔
1371
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
1✔
1372
        except MethodNotAllowed:
×
1373
            raise InvalidRequest(
×
1374
                "The source of a copy request may not specifically refer to a delete marker by version id."
1375
            )
1376

1377
        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
1✔
1378
            raise InvalidObjectState(
×
1379
                "Operation is not valid for the source object's storage class",
1380
                StorageClass=src_s3_object.storage_class,
1381
            )
1382

1383
        if failed_condition := get_failed_precondition_copy_source(
1✔
1384
            request, src_s3_object.last_modified, src_s3_object.etag
1385
        ):
1386
            raise PreconditionFailed(
1✔
1387
                "At least one of the pre-conditions you specified did not hold",
1388
                Condition=failed_condition,
1389
            )
1390

1391
        source_sse_c_key_md5 = request.get("CopySourceSSECustomerKeyMD5")
1✔
1392
        if src_s3_object.sse_key_hash:
1✔
1393
            if not source_sse_c_key_md5:
1✔
1394
                raise InvalidRequest(
1✔
1395
                    "The object was stored using a form of Server Side Encryption. "
1396
                    "The correct parameters must be provided to retrieve the object."
1397
                )
1398
            elif src_s3_object.sse_key_hash != source_sse_c_key_md5:
1✔
1399
                raise AccessDenied("Access Denied")
×
1400

1401
        validate_sse_c(
1✔
1402
            algorithm=request.get("CopySourceSSECustomerAlgorithm"),
1403
            encryption_key=request.get("CopySourceSSECustomerKey"),
1404
            encryption_key_md5=source_sse_c_key_md5,
1405
        )
1406

1407
        target_sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
1408
        server_side_encryption = request.get("ServerSideEncryption")
1✔
1409
        # validate target SSE-C parameters
1410
        validate_sse_c(
1✔
1411
            algorithm=request.get("SSECustomerAlgorithm"),
1412
            encryption_key=request.get("SSECustomerKey"),
1413
            encryption_key_md5=target_sse_c_key_md5,
1414
            server_side_encryption=server_side_encryption,
1415
        )
1416

1417
        # TODO validate order of validation
1418
        storage_class = request.get("StorageClass")
1✔
1419
        metadata_directive = request.get("MetadataDirective")
1✔
1420
        website_redirect_location = request.get("WebsiteRedirectLocation")
1✔
1421
        # we need to check for identity of the object, to see if the default one has been changed
1422
        is_default_encryption = (
1✔
1423
            dest_s3_bucket.encryption_rule is DEFAULT_BUCKET_ENCRYPTION
1424
            and src_s3_object.encryption == "AES256"
1425
        )
1426
        if (
1✔
1427
            src_bucket == dest_bucket
1428
            and src_key == dest_key
1429
            and not any(
1430
                (
1431
                    storage_class,
1432
                    server_side_encryption,
1433
                    target_sse_c_key_md5,
1434
                    metadata_directive == "REPLACE",
1435
                    website_redirect_location,
1436
                    dest_s3_bucket.encryption_rule
1437
                    and not is_default_encryption,  # S3 will allow copy in place if the bucket has encryption configured
1438
                    src_s3_object.restore,
1439
                )
1440
            )
1441
        ):
1442
            raise InvalidRequest(
1✔
1443
                "This copy request is illegal because it is trying to copy an object to itself without changing the "
1444
                "object's metadata, storage class, website redirect location or encryption attributes."
1445
            )
1446

1447
        if tagging := request.get("Tagging"):
1✔
1448
            tagging = parse_tagging_header(tagging)
1✔
1449

1450
        if metadata_directive == "REPLACE":
1✔
1451
            user_metadata = request.get("Metadata")
1✔
1452
            system_metadata = get_system_metadata_from_request(request)
1✔
1453
            if not system_metadata.get("ContentType"):
1✔
1454
                system_metadata["ContentType"] = "binary/octet-stream"
1✔
1455
        else:
1456
            user_metadata = src_s3_object.user_metadata
1✔
1457
            system_metadata = src_s3_object.system_metadata
1✔
1458

1459
        dest_version_id = generate_version_id(dest_s3_bucket.versioning_status)
1✔
1460

1461
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
1462
            request,
1463
            dest_s3_bucket,
1464
            store,
1465
        )
1466
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(
1✔
1467
            request, dest_s3_bucket
1468
        )
1469

1470
        acl = get_access_control_policy_for_new_resource_request(
1✔
1471
            request, owner=dest_s3_bucket.owner
1472
        )
1473

1474
        s3_object = S3Object(
1✔
1475
            key=dest_key,
1476
            size=src_s3_object.size,
1477
            version_id=dest_version_id,
1478
            storage_class=storage_class,
1479
            expires=request.get("Expires"),
1480
            user_metadata=user_metadata,
1481
            system_metadata=system_metadata,
1482
            checksum_algorithm=request.get("ChecksumAlgorithm") or src_s3_object.checksum_algorithm,
1483
            encryption=encryption_parameters.encryption,
1484
            kms_key_id=encryption_parameters.kms_key_id,
1485
            bucket_key_enabled=request.get(
1486
                "BucketKeyEnabled"
1487
            ),  # CopyObject does not inherit from the bucket here
1488
            sse_key_hash=target_sse_c_key_md5,
1489
            lock_mode=lock_parameters.lock_mode,
1490
            lock_legal_status=lock_parameters.lock_legal_status,
1491
            lock_until=lock_parameters.lock_until,
1492
            website_redirect_location=website_redirect_location,
1493
            expiration=None,  # TODO, from lifecycle
1494
            acl=acl,
1495
            owner=dest_s3_bucket.owner,
1496
        )
1497

1498
        with self._storage_backend.copy(
1✔
1499
            src_bucket=src_bucket,
1500
            src_object=src_s3_object,
1501
            dest_bucket=dest_bucket,
1502
            dest_object=s3_object,
1503
        ) as s3_stored_object:
1504
            s3_object.checksum_value = s3_stored_object.checksum or src_s3_object.checksum_value
1✔
1505
            s3_object.etag = s3_stored_object.etag or src_s3_object.etag
1✔
1506

1507
            dest_s3_bucket.objects.set(dest_key, s3_object)
1✔
1508

1509
        dest_key_id = get_unique_key_id(dest_bucket, dest_key, dest_version_id)
1✔
1510

1511
        if (request.get("TaggingDirective")) == "REPLACE":
1✔
1512
            store.TAGS.tags[dest_key_id] = tagging or {}
1✔
1513
        else:
1514
            src_key_id = get_unique_key_id(src_bucket, src_key, src_s3_object.version_id)
1✔
1515
            src_tags = store.TAGS.tags.get(src_key_id, {})
1✔
1516
            store.TAGS.tags[dest_key_id] = copy.copy(src_tags)
1✔
1517

1518
        copy_object_result = CopyObjectResult(
1✔
1519
            ETag=s3_object.quoted_etag,
1520
            LastModified=s3_object.last_modified,
1521
        )
1522
        if s3_object.checksum_algorithm:
1✔
1523
            copy_object_result[f"Checksum{s3_object.checksum_algorithm.upper()}"] = (
1✔
1524
                s3_object.checksum_value
1525
            )
1526

1527
        response = CopyObjectOutput(
1✔
1528
            CopyObjectResult=copy_object_result,
1529
        )
1530

1531
        if s3_object.version_id:
1✔
1532
            response["VersionId"] = s3_object.version_id
1✔
1533

1534
        if s3_object.expiration:
1✔
1535
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime
×
1536

1537
        add_encryption_to_response(response, s3_object=s3_object)
1✔
1538
        if target_sse_c_key_md5:
1✔
1539
            response["SSECustomerAlgorithm"] = "AES256"
1✔
1540
            response["SSECustomerKeyMD5"] = target_sse_c_key_md5
1✔
1541

1542
        if (
1✔
1543
            src_s3_bucket.versioning_status
1544
            and src_s3_object.version_id
1545
            and src_s3_object.version_id != "null"
1546
        ):
1547
            response["CopySourceVersionId"] = src_s3_object.version_id
1✔
1548

1549
        # RequestCharged: Optional[RequestCharged] # TODO
1550
        self._notify(context, s3_bucket=dest_s3_bucket, s3_object=s3_object)
1✔
1551

1552
        return response
1✔
1553

1554
    def list_objects(
1✔
1555
        self,
1556
        context: RequestContext,
1557
        bucket: BucketName,
1558
        delimiter: Delimiter = None,
1559
        encoding_type: EncodingType = None,
1560
        marker: Marker = None,
1561
        max_keys: MaxKeys = None,
1562
        prefix: Prefix = None,
1563
        request_payer: RequestPayer = None,
1564
        expected_bucket_owner: AccountId = None,
1565
        optional_object_attributes: OptionalObjectAttributesList = None,
1566
        **kwargs,
1567
    ) -> ListObjectsOutput:
1568
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1569

1570
        common_prefixes = set()
1✔
1571
        count = 0
1✔
1572
        is_truncated = False
1✔
1573
        next_key_marker = None
1✔
1574
        max_keys = max_keys or 1000
1✔
1575
        prefix = prefix or ""
1✔
1576
        delimiter = delimiter or ""
1✔
1577
        if encoding_type:
1✔
1578
            prefix = urlparse.quote(prefix)
1✔
1579
            delimiter = urlparse.quote(delimiter)
1✔
1580

1581
        s3_objects: list[Object] = []
1✔
1582

1583
        all_keys = sorted(s3_bucket.objects.values(), key=lambda r: r.key)
1✔
1584
        last_key = all_keys[-1] if all_keys else None
1✔
1585

1586
        # sort by key
1587
        for s3_object in all_keys:
1✔
1588
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key
1✔
1589
            # skip all keys that alphabetically come before key_marker
1590
            if marker:
1✔
1591
                if key <= marker:
1✔
1592
                    continue
1✔
1593

1594
            # Filter for keys that start with prefix
1595
            if prefix and not key.startswith(prefix):
1✔
1596
                continue
×
1597

1598
            # see ListObjectsV2 for the logic comments (shared logic here)
1599
            prefix_including_delimiter = None
1✔
1600
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
1601
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
1602
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
1603

1604
                if prefix_including_delimiter in common_prefixes or (
1✔
1605
                    marker and marker.startswith(prefix_including_delimiter)
1606
                ):
1607
                    continue
1✔
1608

1609
            if prefix_including_delimiter:
1✔
1610
                common_prefixes.add(prefix_including_delimiter)
1✔
1611
            else:
1612
                # TODO: add RestoreStatus if present
1613
                object_data = Object(
1✔
1614
                    Key=key,
1615
                    ETag=s3_object.quoted_etag,
1616
                    Owner=s3_bucket.owner,  # TODO: verify reality
1617
                    Size=s3_object.size,
1618
                    LastModified=s3_object.last_modified,
1619
                    StorageClass=s3_object.storage_class,
1620
                )
1621

1622
                if s3_object.checksum_algorithm:
1✔
1623
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
1✔
1624
                    object_data["ChecksumType"] = getattr(
1✔
1625
                        s3_object, "checksum_type", ChecksumType.FULL_OBJECT
1626
                    )
1627

1628
                s3_objects.append(object_data)
1✔
1629

1630
            # we just added a CommonPrefix or an Object, increase the counter
1631
            count += 1
1✔
1632
            if count >= max_keys and last_key.key != s3_object.key:
1✔
1633
                is_truncated = True
1✔
1634
                if prefix_including_delimiter:
1✔
1635
                    next_key_marker = prefix_including_delimiter
1✔
1636
                elif s3_objects:
1✔
1637
                    next_key_marker = s3_objects[-1]["Key"]
1✔
1638
                break
1✔
1639

1640
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
1641

1642
        response = ListObjectsOutput(
1✔
1643
            IsTruncated=is_truncated,
1644
            Name=bucket,
1645
            MaxKeys=max_keys,
1646
            Prefix=prefix or "",
1647
            Marker=marker or "",
1648
        )
1649
        if s3_objects:
1✔
1650
            response["Contents"] = s3_objects
1✔
1651
        if encoding_type:
1✔
1652
            response["EncodingType"] = EncodingType.url
1✔
1653
        if delimiter:
1✔
1654
            response["Delimiter"] = delimiter
1✔
1655
        if common_prefixes:
1✔
1656
            response["CommonPrefixes"] = common_prefixes
1✔
1657
        if delimiter and next_key_marker:
1✔
1658
            response["NextMarker"] = next_key_marker
1✔
1659
        if s3_bucket.bucket_region != "us-east-1":
1✔
1660
            response["BucketRegion"] = s3_bucket.bucket_region
×
1661

1662
        # RequestCharged: Optional[RequestCharged]  # TODO
1663
        return response
1✔
1664

1665
    def list_objects_v2(
1✔
1666
        self,
1667
        context: RequestContext,
1668
        bucket: BucketName,
1669
        delimiter: Delimiter = None,
1670
        encoding_type: EncodingType = None,
1671
        max_keys: MaxKeys = None,
1672
        prefix: Prefix = None,
1673
        continuation_token: Token = None,
1674
        fetch_owner: FetchOwner = None,
1675
        start_after: StartAfter = None,
1676
        request_payer: RequestPayer = None,
1677
        expected_bucket_owner: AccountId = None,
1678
        optional_object_attributes: OptionalObjectAttributesList = None,
1679
        **kwargs,
1680
    ) -> ListObjectsV2Output:
1681
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1682

1683
        if continuation_token == "":
1✔
1684
            raise InvalidArgument(
1✔
1685
                "The continuation token provided is incorrect",
1686
                ArgumentName="continuation-token",
1687
            )
1688

1689
        common_prefixes = set()
1✔
1690
        count = 0
1✔
1691
        is_truncated = False
1✔
1692
        next_continuation_token = None
1✔
1693
        max_keys = max_keys or 1000
1✔
1694
        prefix = prefix or ""
1✔
1695
        delimiter = delimiter or ""
1✔
1696
        if encoding_type:
1✔
1697
            prefix = urlparse.quote(prefix)
1✔
1698
            delimiter = urlparse.quote(delimiter)
1✔
1699
        decoded_continuation_token = (
1✔
1700
            to_str(base64.urlsafe_b64decode(continuation_token.encode()))
1701
            if continuation_token
1702
            else None
1703
        )
1704

1705
        s3_objects: list[Object] = []
1✔
1706

1707
        # sort by key
1708
        for s3_object in sorted(s3_bucket.objects.values(), key=lambda r: r.key):
1✔
1709
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key
1✔
1710

1711
            # skip all keys that alphabetically come before continuation_token
1712
            if continuation_token:
1✔
1713
                if key < decoded_continuation_token:
1✔
1714
                    continue
1✔
1715

1716
            elif start_after:
1✔
1717
                if key <= start_after:
1✔
1718
                    continue
1✔
1719

1720
            # Filter for keys that start with prefix
1721
            if prefix and not key.startswith(prefix):
1✔
1722
                continue
1✔
1723

1724
            # separate keys that contain the same string between the prefix and the first occurrence of the delimiter
1725
            prefix_including_delimiter = None
1✔
1726
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
1727
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
1728
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
1729

1730
                # if the CommonPrefix is already in the CommonPrefixes, it doesn't count towards MaxKey, we can skip
1731
                # the entry without increasing the counter. We need to iterate over all of these entries before
1732
                # returning the next continuation marker, to properly start at the next key after this CommonPrefix
1733
                if prefix_including_delimiter in common_prefixes:
1✔
1734
                    continue
1✔
1735

1736
            # After skipping all entries, verify we're not over the MaxKeys before adding a new entry
1737
            if count >= max_keys:
1✔
1738
                is_truncated = True
1✔
1739
                next_continuation_token = to_str(base64.urlsafe_b64encode(s3_object.key.encode()))
1✔
1740
                break
1✔
1741

1742
            # if we found a new CommonPrefix, add it to the CommonPrefixes
1743
            # else, it means it's a new Object, add it to the Contents
1744
            if prefix_including_delimiter:
1✔
1745
                common_prefixes.add(prefix_including_delimiter)
1✔
1746
            else:
1747
                # TODO: add RestoreStatus if present
1748
                object_data = Object(
1✔
1749
                    Key=key,
1750
                    ETag=s3_object.quoted_etag,
1751
                    Size=s3_object.size,
1752
                    LastModified=s3_object.last_modified,
1753
                    StorageClass=s3_object.storage_class,
1754
                )
1755

1756
                if fetch_owner:
1✔
1757
                    object_data["Owner"] = s3_bucket.owner
×
1758

1759
                if s3_object.checksum_algorithm:
1✔
1760
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
1✔
1761
                    object_data["ChecksumType"] = getattr(
1✔
1762
                        s3_object, "checksum_type", ChecksumType.FULL_OBJECT
1763
                    )
1764

1765
                s3_objects.append(object_data)
1✔
1766

1767
            # we just added either a CommonPrefix or an Object to the List, increase the counter by one
1768
            count += 1
1✔
1769

1770
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
1771

1772
        response = ListObjectsV2Output(
1✔
1773
            IsTruncated=is_truncated,
1774
            Name=bucket,
1775
            MaxKeys=max_keys,
1776
            Prefix=prefix or "",
1777
            KeyCount=count,
1778
        )
1779
        if s3_objects:
1✔
1780
            response["Contents"] = s3_objects
1✔
1781
        if encoding_type:
1✔
1782
            response["EncodingType"] = EncodingType.url
1✔
1783
        if delimiter:
1✔
1784
            response["Delimiter"] = delimiter
1✔
1785
        if common_prefixes:
1✔
1786
            response["CommonPrefixes"] = common_prefixes
1✔
1787
        if next_continuation_token:
1✔
1788
            response["NextContinuationToken"] = next_continuation_token
1✔
1789

1790
        if continuation_token:
1✔
1791
            response["ContinuationToken"] = continuation_token
1✔
1792
        elif start_after:
1✔
1793
            response["StartAfter"] = start_after
1✔
1794

1795
        if s3_bucket.bucket_region != "us-east-1":
1✔
1796
            response["BucketRegion"] = s3_bucket.bucket_region
1✔
1797

1798
        # RequestCharged: Optional[RequestCharged]  # TODO
1799
        return response
1✔
1800

1801
    def list_object_versions(
1✔
1802
        self,
1803
        context: RequestContext,
1804
        bucket: BucketName,
1805
        delimiter: Delimiter = None,
1806
        encoding_type: EncodingType = None,
1807
        key_marker: KeyMarker = None,
1808
        max_keys: MaxKeys = None,
1809
        prefix: Prefix = None,
1810
        version_id_marker: VersionIdMarker = None,
1811
        expected_bucket_owner: AccountId = None,
1812
        request_payer: RequestPayer = None,
1813
        optional_object_attributes: OptionalObjectAttributesList = None,
1814
        **kwargs,
1815
    ) -> ListObjectVersionsOutput:
1816
        if version_id_marker and not key_marker:
1✔
1817
            raise InvalidArgument(
1✔
1818
                "A version-id marker cannot be specified without a key marker.",
1819
                ArgumentName="version-id-marker",
1820
                ArgumentValue=version_id_marker,
1821
            )
1822

1823
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1824
        common_prefixes = set()
1✔
1825
        count = 0
1✔
1826
        is_truncated = False
1✔
1827
        next_key_marker = None
1✔
1828
        next_version_id_marker = None
1✔
1829
        max_keys = max_keys or 1000
1✔
1830
        prefix = prefix or ""
1✔
1831
        delimiter = delimiter or ""
1✔
1832
        if encoding_type:
1✔
1833
            prefix = urlparse.quote(prefix)
1✔
1834
            delimiter = urlparse.quote(delimiter)
1✔
1835
        version_key_marker_found = False
1✔
1836

1837
        object_versions: list[ObjectVersion] = []
1✔
1838
        delete_markers: list[DeleteMarkerEntry] = []
1✔
1839

1840
        all_versions = s3_bucket.objects.values(with_versions=True)
1✔
1841
        # sort by key, and last-modified-date, to get the last version first
1842
        all_versions.sort(key=lambda r: (r.key, -r.last_modified.timestamp()))
1✔
1843
        last_version = all_versions[-1] if all_versions else None
1✔
1844

1845
        for version in all_versions:
1✔
1846
            key = urlparse.quote(version.key) if encoding_type else version.key
1✔
1847
            # skip all keys that alphabetically come before key_marker
1848
            if key_marker:
1✔
1849
                if key < key_marker:
1✔
1850
                    continue
1✔
1851
                elif key == key_marker:
1✔
1852
                    if not version_id_marker:
1✔
1853
                        continue
1✔
1854
                    # as the keys are ordered by time, once we found the key marker, we can return the next one
1855
                    if version.version_id == version_id_marker:
1✔
1856
                        version_key_marker_found = True
1✔
1857
                        continue
1✔
1858

1859
                    # it is possible that the version_id_marker related object has been deleted, in that case, start
1860
                    # as soon as the next version id is older than the version id marker (meaning this version was
1861
                    # next after the now-deleted version)
1862
                    elif is_version_older_than_other(version.version_id, version_id_marker):
1✔
1863
                        version_key_marker_found = True
1✔
1864

1865
                    elif not version_key_marker_found:
1✔
1866
                        # as long as we have not passed the version_key_marker, skip the versions
1867
                        continue
1✔
1868

1869
            # Filter for keys that start with prefix
1870
            if prefix and not key.startswith(prefix):
1✔
1871
                continue
1✔
1872

1873
            # see ListObjectsV2 for the logic comments (shared logic here)
1874
            prefix_including_delimiter = None
1✔
1875
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
1876
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
1877
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
1878

1879
                if prefix_including_delimiter in common_prefixes or (
1✔
1880
                    key_marker and key_marker.startswith(prefix_including_delimiter)
1881
                ):
1882
                    continue
1✔
1883

1884
            if prefix_including_delimiter:
1✔
1885
                common_prefixes.add(prefix_including_delimiter)
1✔
1886

1887
            elif isinstance(version, S3DeleteMarker):
1✔
1888
                delete_marker = DeleteMarkerEntry(
1✔
1889
                    Key=key,
1890
                    Owner=s3_bucket.owner,
1891
                    VersionId=version.version_id,
1892
                    IsLatest=version.is_current,
1893
                    LastModified=version.last_modified,
1894
                )
1895
                delete_markers.append(delete_marker)
1✔
1896
            else:
1897
                # TODO: add RestoreStatus if present
1898
                object_version = ObjectVersion(
1✔
1899
                    Key=key,
1900
                    ETag=version.quoted_etag,
1901
                    Owner=s3_bucket.owner,  # TODO: verify reality
1902
                    Size=version.size,
1903
                    VersionId=version.version_id or "null",
1904
                    LastModified=version.last_modified,
1905
                    IsLatest=version.is_current,
1906
                    # TODO: verify this, are other class possible?
1907
                    # StorageClass=version.storage_class,
1908
                    StorageClass=ObjectVersionStorageClass.STANDARD,
1909
                )
1910

1911
                if version.checksum_algorithm:
1✔
1912
                    object_version["ChecksumAlgorithm"] = [version.checksum_algorithm]
1✔
1913
                    object_version["ChecksumType"] = getattr(
1✔
1914
                        version, "checksum_type", ChecksumType.FULL_OBJECT
1915
                    )
1916

1917
                object_versions.append(object_version)
1✔
1918

1919
            # we just added a CommonPrefix, an Object or a DeleteMarker, increase the counter
1920
            count += 1
1✔
1921
            if count >= max_keys and last_version.version_id != version.version_id:
1✔
1922
                is_truncated = True
1✔
1923
                if prefix_including_delimiter:
1✔
1924
                    next_key_marker = prefix_including_delimiter
1✔
1925
                else:
1926
                    next_key_marker = version.key
1✔
1927
                    next_version_id_marker = version.version_id
1✔
1928
                break
1✔
1929

1930
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
1931

1932
        response = ListObjectVersionsOutput(
1✔
1933
            IsTruncated=is_truncated,
1934
            Name=bucket,
1935
            MaxKeys=max_keys,
1936
            Prefix=prefix,
1937
            KeyMarker=key_marker or "",
1938
            VersionIdMarker=version_id_marker or "",
1939
        )
1940
        if object_versions:
1✔
1941
            response["Versions"] = object_versions
1✔
1942
        if encoding_type:
1✔
1943
            response["EncodingType"] = EncodingType.url
1✔
1944
        if delete_markers:
1✔
1945
            response["DeleteMarkers"] = delete_markers
1✔
1946
        if delimiter:
1✔
1947
            response["Delimiter"] = delimiter
1✔
1948
        if common_prefixes:
1✔
1949
            response["CommonPrefixes"] = common_prefixes
1✔
1950
        if next_key_marker:
1✔
1951
            response["NextKeyMarker"] = next_key_marker
1✔
1952
        if next_version_id_marker:
1✔
1953
            response["NextVersionIdMarker"] = next_version_id_marker
1✔
1954

1955
        # RequestCharged: Optional[RequestCharged]  # TODO
1956
        return response
1✔
1957

1958
    @handler("GetObjectAttributes", expand=False)
1✔
1959
    def get_object_attributes(
1✔
1960
        self,
1961
        context: RequestContext,
1962
        request: GetObjectAttributesRequest,
1963
    ) -> GetObjectAttributesOutput:
1964
        bucket_name = request["Bucket"]
1✔
1965
        object_key = request["Key"]
1✔
1966
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
1967

1968
        s3_object = s3_bucket.get_object(
1✔
1969
            key=object_key,
1970
            version_id=request.get("VersionId"),
1971
            http_method="GET",
1972
        )
1973

1974
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
1975
        if s3_object.sse_key_hash:
1✔
1976
            if not sse_c_key_md5:
1✔
1977
                raise InvalidRequest(
×
1978
                    "The object was stored using a form of Server Side Encryption. "
1979
                    "The correct parameters must be provided to retrieve the object."
1980
                )
1981
            elif s3_object.sse_key_hash != sse_c_key_md5:
1✔
1982
                raise AccessDenied("Access Denied")
×
1983

1984
        validate_sse_c(
1✔
1985
            algorithm=request.get("SSECustomerAlgorithm"),
1986
            encryption_key=request.get("SSECustomerKey"),
1987
            encryption_key_md5=sse_c_key_md5,
1988
        )
1989

1990
        object_attrs = request.get("ObjectAttributes", [])
1✔
1991
        response = GetObjectAttributesOutput()
1✔
1992
        if "ETag" in object_attrs:
1✔
1993
            response["ETag"] = s3_object.etag
1✔
1994
        if "StorageClass" in object_attrs:
1✔
1995
            response["StorageClass"] = s3_object.storage_class
1✔
1996
        if "ObjectSize" in object_attrs:
1✔
1997
            response["ObjectSize"] = s3_object.size
1✔
1998
        if "Checksum" in object_attrs and (checksum_algorithm := s3_object.checksum_algorithm):
1✔
1999
            if s3_object.parts:
1✔
2000
                checksum_value = s3_object.checksum_value.split("-")[0]
1✔
2001
            else:
2002
                checksum_value = s3_object.checksum_value
1✔
2003
            response["Checksum"] = {
1✔
2004
                f"Checksum{checksum_algorithm.upper()}": checksum_value,
2005
                "ChecksumType": getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT),
2006
            }
2007

2008
        response["LastModified"] = s3_object.last_modified
1✔
2009

2010
        if s3_bucket.versioning_status:
1✔
2011
            response["VersionId"] = s3_object.version_id
1✔
2012

2013
        if "ObjectParts" in object_attrs and s3_object.parts:
1✔
2014
            # TODO: implements ObjectParts, this is basically a simplified `ListParts` call on the object, we might
2015
            #  need to store more data about the Parts once we implement checksums for them
2016
            response["ObjectParts"] = GetObjectAttributesParts(TotalPartsCount=len(s3_object.parts))
1✔
2017

2018
        return response
1✔
2019

2020
    def restore_object(
1✔
2021
        self,
2022
        context: RequestContext,
2023
        bucket: BucketName,
2024
        key: ObjectKey,
2025
        version_id: ObjectVersionId = None,
2026
        restore_request: RestoreRequest = None,
2027
        request_payer: RequestPayer = None,
2028
        checksum_algorithm: ChecksumAlgorithm = None,
2029
        expected_bucket_owner: AccountId = None,
2030
        **kwargs,
2031
    ) -> RestoreObjectOutput:
2032
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2033

2034
        s3_object = s3_bucket.get_object(
1✔
2035
            key=key,
2036
            version_id=version_id,
2037
            http_method="GET",  # TODO: verify http method
2038
        )
2039
        if s3_object.storage_class not in ARCHIVES_STORAGE_CLASSES:
1✔
2040
            raise InvalidObjectState(StorageClass=s3_object.storage_class)
×
2041

2042
        # TODO: moto was only supported "Days" parameters from RestoreRequest, and was ignoring the others
2043
        # will only implement only the same functionality for now
2044

2045
        # if a request was already done and the object was available, and we're updating it, set the status code to 200
2046
        status_code = 200 if s3_object.restore else 202
1✔
2047
        restore_days = restore_request.get("Days")
1✔
2048
        if not restore_days:
1✔
2049
            LOG.debug("LocalStack does not support restore SELECT requests yet.")
×
2050
            return RestoreObjectOutput()
×
2051

2052
        restore_expiration_date = add_expiration_days_to_datetime(
1✔
2053
            datetime.datetime.now(datetime.UTC), restore_days
2054
        )
2055
        # TODO: add a way to transition from ongoing-request=true to false? for now it is instant
2056
        s3_object.restore = f'ongoing-request="false", expiry-date="{restore_expiration_date}"'
1✔
2057

2058
        s3_notif_ctx_initiated = S3EventNotificationContext.from_request_context_native(
1✔
2059
            context,
2060
            s3_bucket=s3_bucket,
2061
            s3_object=s3_object,
2062
        )
2063
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_initiated)
1✔
2064
        # But because it's instant in LocalStack, we can directly send the Completed notification as well
2065
        # We just need to copy the context so that we don't mutate the first context while it could be sent
2066
        # And modify its event type from `ObjectRestore:Post` to `ObjectRestore:Completed`
2067
        s3_notif_ctx_completed = copy.copy(s3_notif_ctx_initiated)
1✔
2068
        s3_notif_ctx_completed.event_type = s3_notif_ctx_completed.event_type.replace(
1✔
2069
            "Post", "Completed"
2070
        )
2071
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_completed)
1✔
2072

2073
        # TODO: request charged
2074
        return RestoreObjectOutput(StatusCode=status_code)
1✔
2075

2076
    @handler("CreateMultipartUpload", expand=False)
1✔
2077
    def create_multipart_upload(
1✔
2078
        self,
2079
        context: RequestContext,
2080
        request: CreateMultipartUploadRequest,
2081
    ) -> CreateMultipartUploadOutput:
2082
        # TODO: handle missing parameters:
2083
        #  request_payer: RequestPayer = None,
2084
        bucket_name = request["Bucket"]
1✔
2085
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
2086

2087
        if (storage_class := request.get("StorageClass")) is not None and (
1✔
2088
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
2089
        ):
2090
            raise InvalidStorageClass(
1✔
2091
                "The storage class you specified is not valid", StorageClassRequested=storage_class
2092
            )
2093

2094
        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
1✔
2095
            validate_kms_key_id(sse_kms_key_id, s3_bucket)
1✔
2096

2097
        if tagging := request.get("Tagging"):
1✔
2098
            tagging = parse_tagging_header(tagging_header=tagging)
×
2099

2100
        key = request["Key"]
1✔
2101

2102
        system_metadata = get_system_metadata_from_request(request)
1✔
2103
        if not system_metadata.get("ContentType"):
1✔
2104
            system_metadata["ContentType"] = "binary/octet-stream"
1✔
2105

2106
        checksum_algorithm = request.get("ChecksumAlgorithm")
1✔
2107
        if checksum_algorithm and checksum_algorithm not in CHECKSUM_ALGORITHMS:
1✔
2108
            raise InvalidRequest(
1✔
2109
                "Checksum algorithm provided is unsupported. Please try again with any of the valid types: [CRC32, CRC32C, SHA1, SHA256]"
2110
            )
2111

2112
        if not (checksum_type := request.get("ChecksumType")) and checksum_algorithm:
1✔
2113
            if checksum_algorithm == ChecksumAlgorithm.CRC64NVME:
1✔
2114
                checksum_type = ChecksumType.FULL_OBJECT
1✔
2115
            else:
2116
                checksum_type = ChecksumType.COMPOSITE
1✔
2117
        elif checksum_type and not checksum_algorithm:
1✔
2118
            raise InvalidRequest(
1✔
2119
                "The x-amz-checksum-type header can only be used with the x-amz-checksum-algorithm header."
2120
            )
2121

2122
        if (
1✔
2123
            checksum_type == ChecksumType.COMPOSITE
2124
            and checksum_algorithm == ChecksumAlgorithm.CRC64NVME
2125
        ):
2126
            raise InvalidRequest(
1✔
2127
                "The COMPOSITE checksum type cannot be used with the crc64nvme checksum algorithm."
2128
            )
2129
        elif checksum_type == ChecksumType.FULL_OBJECT and checksum_algorithm.upper().startswith(
1✔
2130
            "SHA"
2131
        ):
2132
            raise InvalidRequest(
1✔
2133
                f"The FULL_OBJECT checksum type cannot be used with the {checksum_algorithm.lower()} checksum algorithm."
2134
            )
2135

2136
        # TODO: we're not encrypting the object with the provided key for now
2137
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
2138
        validate_sse_c(
1✔
2139
            algorithm=request.get("SSECustomerAlgorithm"),
2140
            encryption_key=request.get("SSECustomerKey"),
2141
            encryption_key_md5=sse_c_key_md5,
2142
            server_side_encryption=request.get("ServerSideEncryption"),
2143
        )
2144

2145
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
2146
            request,
2147
            s3_bucket,
2148
            store,
2149
        )
2150
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)
1✔
2151

2152
        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)
1✔
2153

2154
        # validate encryption values
2155
        s3_multipart = S3Multipart(
1✔
2156
            key=key,
2157
            storage_class=storage_class,
2158
            expires=request.get("Expires"),
2159
            user_metadata=request.get("Metadata"),
2160
            system_metadata=system_metadata,
2161
            checksum_algorithm=checksum_algorithm,
2162
            checksum_type=checksum_type,
2163
            encryption=encryption_parameters.encryption,
2164
            kms_key_id=encryption_parameters.kms_key_id,
2165
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
2166
            sse_key_hash=sse_c_key_md5,
2167
            lock_mode=lock_parameters.lock_mode,
2168
            lock_legal_status=lock_parameters.lock_legal_status,
2169
            lock_until=lock_parameters.lock_until,
2170
            website_redirect_location=request.get("WebsiteRedirectLocation"),
2171
            expiration=None,  # TODO, from lifecycle, or should it be updated with config?
2172
            acl=acl,
2173
            initiator=get_owner_for_account_id(context.account_id),
2174
            tagging=tagging,
2175
            owner=s3_bucket.owner,
2176
            precondition=object_exists_for_precondition_write(s3_bucket, key),
2177
        )
2178

2179
        s3_bucket.multiparts[s3_multipart.id] = s3_multipart
1✔
2180

2181
        response = CreateMultipartUploadOutput(
1✔
2182
            Bucket=bucket_name, Key=key, UploadId=s3_multipart.id
2183
        )
2184

2185
        if checksum_algorithm:
1✔
2186
            response["ChecksumAlgorithm"] = checksum_algorithm
1✔
2187
            response["ChecksumType"] = checksum_type
1✔
2188

2189
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2190
        if sse_c_key_md5:
1✔
2191
            response["SSECustomerAlgorithm"] = "AES256"
1✔
2192
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
2193

2194
        # TODO: missing response fields we're not currently supporting
2195
        # - AbortDate: lifecycle related,not currently supported, todo
2196
        # - AbortRuleId: lifecycle related, not currently supported, todo
2197
        # - RequestCharged: todo
2198

2199
        return response
1✔
2200

2201
    @handler("UploadPart", expand=False)
1✔
2202
    def upload_part(
1✔
2203
        self,
2204
        context: RequestContext,
2205
        request: UploadPartRequest,
2206
    ) -> UploadPartOutput:
2207
        # TODO: missing following parameters:
2208
        #  content_length: ContentLength = None, ->validate?
2209
        #  content_md5: ContentMD5 = None, -> validate?
2210
        #  request_payer: RequestPayer = None,
2211
        bucket_name = request["Bucket"]
1✔
2212
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)
1✔
2213

2214
        upload_id = request.get("UploadId")
1✔
2215
        if not (
1✔
2216
            s3_multipart := s3_bucket.multiparts.get(upload_id)
2217
        ) or s3_multipart.object.key != request.get("Key"):
2218
            raise NoSuchUpload(
1✔
2219
                "The specified upload does not exist. "
2220
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2221
                UploadId=upload_id,
2222
            )
2223
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
1✔
2224
            raise InvalidArgument(
1✔
2225
                "Part number must be an integer between 1 and 10000, inclusive",
2226
                ArgumentName="partNumber",
2227
                ArgumentValue=part_number,
2228
            )
2229

2230
        if content_md5 := request.get("ContentMD5"):
1✔
2231
            # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
2232
            if not base_64_content_md5_to_etag(content_md5):
1✔
2233
                raise InvalidDigest(
1✔
2234
                    "The Content-MD5 you specified was invalid.",
2235
                    Content_MD5=content_md5,
2236
                )
2237

2238
        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
1✔
2239
        checksum_value = (
1✔
2240
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
2241
        )
2242

2243
        # TODO: we're not encrypting the object with the provided key for now
2244
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
1✔
2245
        validate_sse_c(
1✔
2246
            algorithm=request.get("SSECustomerAlgorithm"),
2247
            encryption_key=request.get("SSECustomerKey"),
2248
            encryption_key_md5=sse_c_key_md5,
2249
        )
2250

2251
        if (s3_multipart.object.sse_key_hash and not sse_c_key_md5) or (
1✔
2252
            sse_c_key_md5 and not s3_multipart.object.sse_key_hash
2253
        ):
2254
            raise InvalidRequest(
1✔
2255
                "The multipart upload initiate requested encryption. "
2256
                "Subsequent part requests must include the appropriate encryption parameters."
2257
            )
2258
        elif (
1✔
2259
            s3_multipart.object.sse_key_hash
2260
            and sse_c_key_md5
2261
            and s3_multipart.object.sse_key_hash != sse_c_key_md5
2262
        ):
2263
            raise InvalidRequest(
1✔
2264
                "The provided encryption parameters did not match the ones used originally."
2265
            )
2266

2267
        s3_part = S3Part(
1✔
2268
            part_number=part_number,
2269
            checksum_algorithm=checksum_algorithm,
2270
            checksum_value=checksum_value,
2271
        )
2272
        body = request.get("Body")
1✔
2273
        headers = context.request.headers
1✔
2274
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
1✔
2275
            "STREAMING-"
2276
        ) or "aws-chunked" in headers.get("content-encoding", "")
2277
        # check if chunked request
2278
        if is_aws_chunked:
1✔
2279
            checksum_algorithm = (
1✔
2280
                checksum_algorithm
2281
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
2282
            )
2283
            if checksum_algorithm:
1✔
2284
                s3_part.checksum_algorithm = checksum_algorithm
×
2285

2286
            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
1✔
2287
            body = AwsChunkedDecoder(body, decoded_content_length, s3_part)
1✔
2288

2289
        if s3_part.checksum_algorithm != s3_multipart.object.checksum_algorithm:
1✔
2290
            error_req_checksum = checksum_algorithm.lower() if checksum_algorithm else "null"
1✔
2291
            error_mp_checksum = (
1✔
2292
                s3_multipart.object.checksum_algorithm.lower()
2293
                if s3_multipart.object.checksum_algorithm
2294
                else "null"
2295
            )
2296
            if not error_mp_checksum == "null":
1✔
2297
                raise InvalidRequest(
1✔
2298
                    f"Checksum Type mismatch occurred, expected checksum Type: {error_mp_checksum}, actual checksum Type: {error_req_checksum}"
2299
                )
2300

2301
        stored_multipart = self._storage_backend.get_multipart(bucket_name, s3_multipart)
1✔
2302
        with stored_multipart.open(s3_part, mode="w") as stored_s3_part:
1✔
2303
            try:
1✔
2304
                stored_s3_part.write(body)
1✔
2305
            except Exception:
1✔
2306
                stored_multipart.remove_part(s3_part)
1✔
2307
                raise
1✔
2308

2309
            if checksum_algorithm:
1✔
2310
                if not validate_checksum_value(s3_part.checksum_value, checksum_algorithm):
1✔
2311
                    stored_multipart.remove_part(s3_part)
1✔
2312
                    raise InvalidRequest(
1✔
2313
                        f"Value for x-amz-checksum-{s3_part.checksum_algorithm.lower()} header is invalid."
2314
                    )
2315
                elif s3_part.checksum_value != stored_s3_part.checksum:
1✔
2316
                    stored_multipart.remove_part(s3_part)
1✔
2317
                    raise BadDigest(
1✔
2318
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
2319
                    )
2320

2321
            if content_md5:
1✔
2322
                calculated_md5 = etag_to_base_64_content_md5(s3_part.etag)
1✔
2323
                if calculated_md5 != content_md5:
1✔
2324
                    stored_multipart.remove_part(s3_part)
1✔
2325
                    raise BadDigest(
1✔
2326
                        "The Content-MD5 you specified did not match what we received.",
2327
                        ExpectedDigest=content_md5,
2328
                        CalculatedDigest=calculated_md5,
2329
                    )
2330

2331
            s3_multipart.parts[part_number] = s3_part
1✔
2332

2333
        response = UploadPartOutput(
1✔
2334
            ETag=s3_part.quoted_etag,
2335
        )
2336

2337
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2338
        if sse_c_key_md5:
1✔
2339
            response["SSECustomerAlgorithm"] = "AES256"
1✔
2340
            response["SSECustomerKeyMD5"] = sse_c_key_md5
1✔
2341

2342
        if s3_part.checksum_algorithm:
1✔
2343
            response[f"Checksum{s3_part.checksum_algorithm.upper()}"] = s3_part.checksum_value
1✔
2344

2345
        # TODO: RequestCharged: Optional[RequestCharged]
2346
        return response
1✔
2347

2348
    @handler("UploadPartCopy", expand=False)
1✔
2349
    def upload_part_copy(
1✔
2350
        self,
2351
        context: RequestContext,
2352
        request: UploadPartCopyRequest,
2353
    ) -> UploadPartCopyOutput:
2354
        # TODO: handle following parameters:
2355
        #  copy_source_if_match: CopySourceIfMatch = None,
2356
        #  copy_source_if_modified_since: CopySourceIfModifiedSince = None,
2357
        #  copy_source_if_none_match: CopySourceIfNoneMatch = None,
2358
        #  copy_source_if_unmodified_since: CopySourceIfUnmodifiedSince = None,
2359
        #  request_payer: RequestPayer = None,
2360
        dest_bucket = request["Bucket"]
1✔
2361
        dest_key = request["Key"]
1✔
2362
        store = self.get_store(context.account_id, context.region)
1✔
2363
        # TODO: validate cross-account UploadPartCopy
2364
        if not (dest_s3_bucket := store.buckets.get(dest_bucket)):
1✔
2365
            raise NoSuchBucket("The specified bucket does not exist", BucketName=dest_bucket)
×
2366

2367
        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
1✔
2368
            request.get("CopySource")
2369
        )
2370

2371
        if not (src_s3_bucket := store.buckets.get(src_bucket)):
1✔
2372
            raise NoSuchBucket("The specified bucket does not exist", BucketName=src_bucket)
×
2373

2374
        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
2375
        try:
1✔
2376
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
1✔
2377
        except MethodNotAllowed:
×
2378
            raise InvalidRequest(
×
2379
                "The source of a copy request may not specifically refer to a delete marker by version id."
2380
            )
2381

2382
        if src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not src_s3_object.restore:
1✔
2383
            raise InvalidObjectState(
×
2384
                "Operation is not valid for the source object's storage class",
2385
                StorageClass=src_s3_object.storage_class,
2386
            )
2387

2388
        upload_id = request.get("UploadId")
1✔
2389
        if (
1✔
2390
            not (s3_multipart := dest_s3_bucket.multiparts.get(upload_id))
2391
            or s3_multipart.object.key != dest_key
2392
        ):
2393
            raise NoSuchUpload(
×
2394
                "The specified upload does not exist. "
2395
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2396
                UploadId=upload_id,
2397
            )
2398

2399
        elif (part_number := request.get("PartNumber", 0)) < 1 or part_number > 10000:
1✔
2400
            raise InvalidArgument(
×
2401
                "Part number must be an integer between 1 and 10000, inclusive",
2402
                ArgumentName="partNumber",
2403
                ArgumentValue=part_number,
2404
            )
2405

2406
        source_range = request.get("CopySourceRange")
1✔
2407
        # TODO implement copy source IF (done in ASF provider)
2408

2409
        range_data: Optional[ObjectRange] = None
1✔
2410
        if source_range:
1✔
2411
            range_data = parse_copy_source_range_header(source_range, src_s3_object.size)
1✔
2412

2413
        s3_part = S3Part(part_number=part_number)
1✔
2414

2415
        stored_multipart = self._storage_backend.get_multipart(dest_bucket, s3_multipart)
1✔
2416
        stored_multipart.copy_from_object(s3_part, src_bucket, src_s3_object, range_data)
1✔
2417

2418
        s3_multipart.parts[part_number] = s3_part
1✔
2419

2420
        # TODO: return those fields (checksum not handled currently in moto for parts)
2421
        # ChecksumCRC32: Optional[ChecksumCRC32]
2422
        # ChecksumCRC32C: Optional[ChecksumCRC32C]
2423
        # ChecksumSHA1: Optional[ChecksumSHA1]
2424
        # ChecksumSHA256: Optional[ChecksumSHA256]
2425
        #     RequestCharged: Optional[RequestCharged]
2426

2427
        result = CopyPartResult(
1✔
2428
            ETag=s3_part.quoted_etag,
2429
            LastModified=s3_part.last_modified,
2430
        )
2431

2432
        response = UploadPartCopyOutput(
1✔
2433
            CopyPartResult=result,
2434
        )
2435

2436
        if src_s3_bucket.versioning_status and src_s3_object.version_id:
1✔
2437
            response["CopySourceVersionId"] = src_s3_object.version_id
×
2438

2439
        add_encryption_to_response(response, s3_object=s3_multipart.object)
1✔
2440

2441
        return response
1✔
2442

2443
    def complete_multipart_upload(
1✔
2444
        self,
2445
        context: RequestContext,
2446
        bucket: BucketName,
2447
        key: ObjectKey,
2448
        upload_id: MultipartUploadId,
2449
        multipart_upload: CompletedMultipartUpload = None,
2450
        checksum_crc32: ChecksumCRC32 = None,
2451
        checksum_crc32_c: ChecksumCRC32C = None,
2452
        checksum_crc64_nvme: ChecksumCRC64NVME = None,
2453
        checksum_sha1: ChecksumSHA1 = None,
2454
        checksum_sha256: ChecksumSHA256 = None,
2455
        checksum_type: ChecksumType = None,
2456
        mpu_object_size: MpuObjectSize = None,
2457
        request_payer: RequestPayer = None,
2458
        expected_bucket_owner: AccountId = None,
2459
        if_match: IfMatch = None,
2460
        if_none_match: IfNoneMatch = None,
2461
        sse_customer_algorithm: SSECustomerAlgorithm = None,
2462
        sse_customer_key: SSECustomerKey = None,
2463
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
2464
        **kwargs,
2465
    ) -> CompleteMultipartUploadOutput:
2466
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2467

2468
        if (
1✔
2469
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2470
            or s3_multipart.object.key != key
2471
        ):
2472
            raise NoSuchUpload(
1✔
2473
                "The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.",
2474
                UploadId=upload_id,
2475
            )
2476

2477
        if if_none_match and if_match:
1✔
2478
            raise NotImplementedException(
2479
                "A header you provided implies functionality that is not implemented",
2480
                Header="If-Match,If-None-Match",
2481
                additionalMessage="Multiple conditional request headers present in the request",
2482
            )
2483

2484
        elif if_none_match:
1✔
2485
            if if_none_match != "*":
1✔
2486
                raise NotImplementedException(
2487
                    "A header you provided implies functionality that is not implemented",
2488
                    Header="If-None-Match",
2489
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2490
                )
2491
            if object_exists_for_precondition_write(s3_bucket, key):
1✔
2492
                raise PreconditionFailed(
1✔
2493
                    "At least one of the pre-conditions you specified did not hold",
2494
                    Condition="If-None-Match",
2495
                )
2496
            elif s3_multipart.precondition:
1✔
2497
                raise ConditionalRequestConflict(
1✔
2498
                    "The conditional request cannot succeed due to a conflicting operation against this resource.",
2499
                    Condition="If-None-Match",
2500
                    Key=key,
2501
                )
2502

2503
        elif if_match:
1✔
2504
            if if_match == "*":
1✔
2505
                raise NotImplementedException(
2506
                    "A header you provided implies functionality that is not implemented",
2507
                    Header="If-None-Match",
2508
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2509
                )
2510
            verify_object_equality_precondition_write(
1✔
2511
                s3_bucket, key, if_match, initiated=s3_multipart.initiated
2512
            )
2513

2514
        parts = multipart_upload.get("Parts", [])
1✔
2515
        if not parts:
1✔
2516
            raise InvalidRequest("You must specify at least one part")
1✔
2517

2518
        parts_numbers = [part.get("PartNumber") for part in parts]
1✔
2519
        # TODO: it seems that with new S3 data integrity, sorting might not be mandatory depending on checksum type
2520
        # see https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
2521
        # sorted is very fast (fastest) if the list is already sorted, which should be the case
2522
        if sorted(parts_numbers) != parts_numbers:
1✔
2523
            raise InvalidPartOrder(
1✔
2524
                "The list of parts was not in ascending order. Parts must be ordered by part number.",
2525
                UploadId=upload_id,
2526
            )
2527

2528
        mpu_checksum_algorithm = s3_multipart.object.checksum_algorithm
1✔
2529
        mpu_checksum_type = getattr(s3_multipart, "checksum_type", None)
1✔
2530

2531
        if checksum_type and checksum_type != mpu_checksum_type:
1✔
2532
            raise InvalidRequest(
1✔
2533
                f"The upload was created using the {mpu_checksum_type or 'null'} checksum mode. "
2534
                f"The complete request must use the same checksum mode."
2535
            )
2536

2537
        # generate the versionId before completing, in case the bucket versioning status has changed between
2538
        # creation and completion? AWS validate this
2539
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
2540
        s3_multipart.object.version_id = version_id
1✔
2541

2542
        # we're inspecting the signature of `complete_multipart`, in case the multipart has been restored from
2543
        # persistence. if we do not have a new version, do not validate those parameters
2544
        # TODO: remove for next major version (minor?)
2545
        if signature(s3_multipart.complete_multipart).parameters.get("mpu_size"):
1✔
2546
            checksum_algorithm = mpu_checksum_algorithm.lower() if mpu_checksum_algorithm else None
1✔
2547
            checksum_map = {
1✔
2548
                "crc32": checksum_crc32,
2549
                "crc32c": checksum_crc32_c,
2550
                "crc64nvme": checksum_crc64_nvme,
2551
                "sha1": checksum_sha1,
2552
                "sha256": checksum_sha256,
2553
            }
2554
            checksum_value = checksum_map.get(checksum_algorithm)
1✔
2555
            s3_multipart.complete_multipart(
1✔
2556
                parts, mpu_size=mpu_object_size, validation_checksum=checksum_value
2557
            )
2558
        else:
2559
            s3_multipart.complete_multipart(parts)
×
2560

2561
        if (
1✔
2562
            mpu_checksum_algorithm
2563
            and not checksum_type
2564
            and mpu_checksum_type == ChecksumType.FULL_OBJECT
2565
        ):
2566
            # this is not ideal, but this validation comes last... after the validation of individual parts
2567
            s3_multipart.object.parts.clear()
1✔
2568
            raise BadDigest(
1✔
2569
                f"The {mpu_checksum_algorithm.lower()} you specified did not match the calculated checksum."
2570
            )
2571

2572
        stored_multipart = self._storage_backend.get_multipart(bucket, s3_multipart)
1✔
2573
        stored_multipart.complete_multipart(
1✔
2574
            [s3_multipart.parts.get(part_number) for part_number in parts_numbers]
2575
        )
2576

2577
        s3_object = s3_multipart.object
1✔
2578

2579
        s3_bucket.objects.set(key, s3_object)
1✔
2580

2581
        # remove the multipart now that it's complete
2582
        self._storage_backend.remove_multipart(bucket, s3_multipart)
1✔
2583
        s3_bucket.multiparts.pop(s3_multipart.id, None)
1✔
2584

2585
        key_id = get_unique_key_id(bucket, key, version_id)
1✔
2586
        store.TAGS.tags.pop(key_id, None)
1✔
2587
        if s3_multipart.tagging:
1✔
2588
            store.TAGS.tags[key_id] = s3_multipart.tagging
×
2589

2590
        # RequestCharged: Optional[RequestCharged] TODO
2591

2592
        response = CompleteMultipartUploadOutput(
1✔
2593
            Bucket=bucket,
2594
            Key=key,
2595
            ETag=s3_object.quoted_etag,
2596
            Location=f"{get_full_default_bucket_location(bucket)}{key}",
2597
        )
2598

2599
        if s3_object.version_id:
1✔
2600
            response["VersionId"] = s3_object.version_id
×
2601

2602
        if s3_object.checksum_algorithm:
1✔
2603
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
1✔
2604
            response["ChecksumType"] = mpu_checksum_type
1✔
2605

2606
        if s3_object.expiration:
1✔
2607
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime
×
2608

2609
        add_encryption_to_response(response, s3_object=s3_object)
1✔
2610

2611
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
2612

2613
        return response
1✔
2614

2615
    def abort_multipart_upload(
1✔
2616
        self,
2617
        context: RequestContext,
2618
        bucket: BucketName,
2619
        key: ObjectKey,
2620
        upload_id: MultipartUploadId,
2621
        request_payer: RequestPayer = None,
2622
        expected_bucket_owner: AccountId = None,
2623
        if_match_initiated_time: IfMatchInitiatedTime = None,
2624
        **kwargs,
2625
    ) -> AbortMultipartUploadOutput:
2626
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2627

2628
        if (
1✔
2629
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2630
            or s3_multipart.object.key != key
2631
        ):
2632
            raise NoSuchUpload(
1✔
2633
                "The specified upload does not exist. "
2634
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2635
                UploadId=upload_id,
2636
            )
2637
        s3_bucket.multiparts.pop(upload_id, None)
1✔
2638

2639
        self._storage_backend.remove_multipart(bucket, s3_multipart)
1✔
2640
        response = AbortMultipartUploadOutput()
1✔
2641
        # TODO: requestCharged
2642
        return response
1✔
2643

2644
    def list_parts(
1✔
2645
        self,
2646
        context: RequestContext,
2647
        bucket: BucketName,
2648
        key: ObjectKey,
2649
        upload_id: MultipartUploadId,
2650
        max_parts: MaxParts = None,
2651
        part_number_marker: PartNumberMarker = None,
2652
        request_payer: RequestPayer = None,
2653
        expected_bucket_owner: AccountId = None,
2654
        sse_customer_algorithm: SSECustomerAlgorithm = None,
2655
        sse_customer_key: SSECustomerKey = None,
2656
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
2657
        **kwargs,
2658
    ) -> ListPartsOutput:
2659
        # TODO: implement MaxParts
2660
        # TODO: implements PartNumberMarker
2661
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2662

2663
        if (
1✔
2664
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2665
            or s3_multipart.object.key != key
2666
        ):
2667
            raise NoSuchUpload(
1✔
2668
                "The specified upload does not exist. "
2669
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
2670
                UploadId=upload_id,
2671
            )
2672

2673
        #     AbortDate: Optional[AbortDate] TODO: lifecycle
2674
        #     AbortRuleId: Optional[AbortRuleId] TODO: lifecycle
2675
        #     RequestCharged: Optional[RequestCharged]
2676

2677
        count = 0
1✔
2678
        is_truncated = False
1✔
2679
        part_number_marker = part_number_marker or 0
1✔
2680
        max_parts = max_parts or 1000
1✔
2681

2682
        parts = []
1✔
2683
        all_parts = sorted(s3_multipart.parts.items())
1✔
2684
        last_part_number = all_parts[-1][0] if all_parts else None
1✔
2685
        for part_number, part in all_parts:
1✔
2686
            if part_number <= part_number_marker:
1✔
2687
                continue
1✔
2688
            part_item = Part(
1✔
2689
                ETag=part.quoted_etag,
2690
                LastModified=part.last_modified,
2691
                PartNumber=part_number,
2692
                Size=part.size,
2693
            )
2694
            if s3_multipart.object.checksum_algorithm:
1✔
2695
                part_item[f"Checksum{part.checksum_algorithm.upper()}"] = part.checksum_value
1✔
2696

2697
            parts.append(part_item)
1✔
2698
            count += 1
1✔
2699

2700
            if count >= max_parts and part.part_number != last_part_number:
1✔
2701
                is_truncated = True
1✔
2702
                break
1✔
2703

2704
        response = ListPartsOutput(
1✔
2705
            Bucket=bucket,
2706
            Key=key,
2707
            UploadId=upload_id,
2708
            Initiator=s3_multipart.initiator,
2709
            Owner=s3_multipart.initiator,
2710
            StorageClass=s3_multipart.object.storage_class,
2711
            IsTruncated=is_truncated,
2712
            MaxParts=max_parts,
2713
            PartNumberMarker=0,
2714
            NextPartNumberMarker=0,
2715
        )
2716
        if parts:
1✔
2717
            response["Parts"] = parts
1✔
2718
            last_part = parts[-1]["PartNumber"]
1✔
2719
            response["NextPartNumberMarker"] = last_part
1✔
2720

2721
        if part_number_marker:
1✔
2722
            response["PartNumberMarker"] = part_number_marker
1✔
2723
        if s3_multipart.object.checksum_algorithm:
1✔
2724
            response["ChecksumAlgorithm"] = s3_multipart.object.checksum_algorithm
1✔
2725
            response["ChecksumType"] = getattr(s3_multipart, "checksum_type", None)
1✔
2726

2727
        return response
1✔
2728

2729
    def list_multipart_uploads(
1✔
2730
        self,
2731
        context: RequestContext,
2732
        bucket: BucketName,
2733
        delimiter: Delimiter = None,
2734
        encoding_type: EncodingType = None,
2735
        key_marker: KeyMarker = None,
2736
        max_uploads: MaxUploads = None,
2737
        prefix: Prefix = None,
2738
        upload_id_marker: UploadIdMarker = None,
2739
        expected_bucket_owner: AccountId = None,
2740
        request_payer: RequestPayer = None,
2741
        **kwargs,
2742
    ) -> ListMultipartUploadsOutput:
2743
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2744

2745
        common_prefixes = set()
1✔
2746
        count = 0
1✔
2747
        is_truncated = False
1✔
2748
        max_uploads = max_uploads or 1000
1✔
2749
        prefix = prefix or ""
1✔
2750
        delimiter = delimiter or ""
1✔
2751
        if encoding_type:
1✔
2752
            prefix = urlparse.quote(prefix)
1✔
2753
            delimiter = urlparse.quote(delimiter)
1✔
2754
        upload_id_marker_found = False
1✔
2755

2756
        if key_marker and upload_id_marker:
1✔
2757
            multipart = s3_bucket.multiparts.get(upload_id_marker)
1✔
2758
            if multipart:
1✔
2759
                key = (
1✔
2760
                    urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
2761
                )
2762
            else:
2763
                # set key to None so it fails if the multipart is not Found
2764
                key = None
×
2765

2766
            if key_marker != key:
1✔
2767
                raise InvalidArgument(
1✔
2768
                    "Invalid uploadId marker",
2769
                    ArgumentName="upload-id-marker",
2770
                    ArgumentValue=upload_id_marker,
2771
                )
2772

2773
        uploads = []
1✔
2774
        # sort by key and initiated
2775
        all_multiparts = sorted(
1✔
2776
            s3_bucket.multiparts.values(), key=lambda r: (r.object.key, r.initiated.timestamp())
2777
        )
2778
        last_multipart = all_multiparts[-1] if all_multiparts else None
1✔
2779

2780
        for multipart in all_multiparts:
1✔
2781
            key = urlparse.quote(multipart.object.key) if encoding_type else multipart.object.key
1✔
2782
            # skip all keys that are different than key_marker
2783
            if key_marker:
1✔
2784
                if key < key_marker:
1✔
2785
                    continue
1✔
2786
                elif key == key_marker:
1✔
2787
                    if not upload_id_marker:
1✔
2788
                        continue
1✔
2789
                    # as the keys are ordered by time, once we found the key marker, we can return the next one
2790
                    if multipart.id == upload_id_marker:
1✔
2791
                        upload_id_marker_found = True
1✔
2792
                        continue
1✔
2793
                    elif not upload_id_marker_found:
1✔
2794
                        # as long as we have not passed the version_key_marker, skip the versions
2795
                        continue
1✔
2796

2797
            # Filter for keys that start with prefix
2798
            if prefix and not key.startswith(prefix):
1✔
2799
                continue
1✔
2800

2801
            # see ListObjectsV2 for the logic comments (shared logic here)
2802
            prefix_including_delimiter = None
1✔
2803
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
1✔
2804
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
1✔
2805
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"
1✔
2806

2807
                if prefix_including_delimiter in common_prefixes or (
1✔
2808
                    key_marker and key_marker.startswith(prefix_including_delimiter)
2809
                ):
2810
                    continue
1✔
2811

2812
            if prefix_including_delimiter:
1✔
2813
                common_prefixes.add(prefix_including_delimiter)
1✔
2814
            else:
2815
                multipart_upload = MultipartUpload(
1✔
2816
                    UploadId=multipart.id,
2817
                    Key=multipart.object.key,
2818
                    Initiated=multipart.initiated,
2819
                    StorageClass=multipart.object.storage_class,
2820
                    Owner=multipart.initiator,  # TODO: check the difference
2821
                    Initiator=multipart.initiator,
2822
                )
2823
                if multipart.object.checksum_algorithm:
1✔
2824
                    multipart_upload["ChecksumAlgorithm"] = multipart.object.checksum_algorithm
1✔
2825
                    multipart_upload["ChecksumType"] = getattr(multipart, "checksum_type", None)
1✔
2826

2827
                uploads.append(multipart_upload)
1✔
2828

2829
            count += 1
1✔
2830
            if count >= max_uploads and last_multipart.id != multipart.id:
1✔
2831
                is_truncated = True
1✔
2832
                break
1✔
2833

2834
        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]
1✔
2835

2836
        response = ListMultipartUploadsOutput(
1✔
2837
            Bucket=bucket,
2838
            IsTruncated=is_truncated,
2839
            MaxUploads=max_uploads or 1000,
2840
            KeyMarker=key_marker or "",
2841
            UploadIdMarker=upload_id_marker or "" if key_marker else "",
2842
            NextKeyMarker="",
2843
            NextUploadIdMarker="",
2844
        )
2845
        if uploads:
1✔
2846
            response["Uploads"] = uploads
1✔
2847
            last_upload = uploads[-1]
1✔
2848
            response["NextKeyMarker"] = last_upload["Key"]
1✔
2849
            response["NextUploadIdMarker"] = last_upload["UploadId"]
1✔
2850
        if delimiter:
1✔
2851
            response["Delimiter"] = delimiter
1✔
2852
        if prefix:
1✔
2853
            response["Prefix"] = prefix
1✔
2854
        if encoding_type:
1✔
2855
            response["EncodingType"] = EncodingType.url
1✔
2856
        if common_prefixes:
1✔
2857
            response["CommonPrefixes"] = common_prefixes
1✔
2858

2859
        return response
1✔
2860

2861
    def put_bucket_versioning(
1✔
2862
        self,
2863
        context: RequestContext,
2864
        bucket: BucketName,
2865
        versioning_configuration: VersioningConfiguration,
2866
        content_md5: ContentMD5 = None,
2867
        checksum_algorithm: ChecksumAlgorithm = None,
2868
        mfa: MFA = None,
2869
        expected_bucket_owner: AccountId = None,
2870
        **kwargs,
2871
    ) -> None:
2872
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2873
        if not (versioning_status := versioning_configuration.get("Status")):
1✔
2874
            raise CommonServiceException(
1✔
2875
                code="IllegalVersioningConfigurationException",
2876
                message="The Versioning element must be specified",
2877
            )
2878

2879
        if versioning_status not in ("Enabled", "Suspended"):
1✔
2880
            raise MalformedXML()
1✔
2881

2882
        if s3_bucket.object_lock_enabled and versioning_status == "Suspended":
1✔
2883
            raise InvalidBucketState(
1✔
2884
                "An Object Lock configuration is present on this bucket, so the versioning state cannot be changed."
2885
            )
2886

2887
        if not s3_bucket.versioning_status:
1✔
2888
            s3_bucket.objects = VersionedKeyStore.from_key_store(s3_bucket.objects)
1✔
2889

2890
        s3_bucket.versioning_status = versioning_status
1✔
2891

2892
    def get_bucket_versioning(
1✔
2893
        self,
2894
        context: RequestContext,
2895
        bucket: BucketName,
2896
        expected_bucket_owner: AccountId = None,
2897
        **kwargs,
2898
    ) -> GetBucketVersioningOutput:
2899
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2900

2901
        if not s3_bucket.versioning_status:
1✔
2902
            return GetBucketVersioningOutput()
1✔
2903

2904
        return GetBucketVersioningOutput(Status=s3_bucket.versioning_status)
1✔
2905

2906
    def get_bucket_encryption(
1✔
2907
        self,
2908
        context: RequestContext,
2909
        bucket: BucketName,
2910
        expected_bucket_owner: AccountId = None,
2911
        **kwargs,
2912
    ) -> GetBucketEncryptionOutput:
2913
        # AWS now encrypts bucket by default with AES256, see:
2914
        # https://docs.aws.amazon.com/AmazonS3/latest/userguide/default-bucket-encryption.html
2915
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2916

2917
        if not s3_bucket.encryption_rule:
1✔
2918
            return GetBucketEncryptionOutput()
×
2919

2920
        return GetBucketEncryptionOutput(
1✔
2921
            ServerSideEncryptionConfiguration={"Rules": [s3_bucket.encryption_rule]}
2922
        )
2923

2924
    def put_bucket_encryption(
1✔
2925
        self,
2926
        context: RequestContext,
2927
        bucket: BucketName,
2928
        server_side_encryption_configuration: ServerSideEncryptionConfiguration,
2929
        content_md5: ContentMD5 = None,
2930
        checksum_algorithm: ChecksumAlgorithm = None,
2931
        expected_bucket_owner: AccountId = None,
2932
        **kwargs,
2933
    ) -> None:
2934
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2935

2936
        if not (rules := server_side_encryption_configuration.get("Rules")):
1✔
2937
            raise MalformedXML()
1✔
2938

2939
        if len(rules) != 1 or not (
1✔
2940
            encryption := rules[0].get("ApplyServerSideEncryptionByDefault")
2941
        ):
2942
            raise MalformedXML()
1✔
2943

2944
        if not (sse_algorithm := encryption.get("SSEAlgorithm")):
1✔
2945
            raise MalformedXML()
×
2946

2947
        if sse_algorithm not in SSE_ALGORITHMS:
1✔
2948
            raise MalformedXML()
×
2949

2950
        if sse_algorithm != ServerSideEncryption.aws_kms and "KMSMasterKeyID" in encryption:
1✔
2951
            raise InvalidArgument(
1✔
2952
                "a KMSMasterKeyID is not applicable if the default sse algorithm is not aws:kms or aws:kms:dsse",
2953
                ArgumentName="ApplyServerSideEncryptionByDefault",
2954
            )
2955
        # elif master_kms_key := encryption.get("KMSMasterKeyID"):
2956
        # TODO: validate KMS key? not currently done in moto
2957
        # You can pass either the KeyId or the KeyArn. If cross-account, it has to be the ARN.
2958
        # It's always saved as the ARN in the bucket configuration.
2959
        # kms_key_arn = get_kms_key_arn(master_kms_key, s3_bucket.bucket_account_id)
2960
        # encryption["KMSMasterKeyID"] = master_kms_key
2961

2962
        s3_bucket.encryption_rule = rules[0]
1✔
2963

2964
    def delete_bucket_encryption(
1✔
2965
        self,
2966
        context: RequestContext,
2967
        bucket: BucketName,
2968
        expected_bucket_owner: AccountId = None,
2969
        **kwargs,
2970
    ) -> None:
2971
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2972

2973
        s3_bucket.encryption_rule = None
1✔
2974

2975
    def put_bucket_notification_configuration(
1✔
2976
        self,
2977
        context: RequestContext,
2978
        bucket: BucketName,
2979
        notification_configuration: NotificationConfiguration,
2980
        expected_bucket_owner: AccountId = None,
2981
        skip_destination_validation: SkipValidation = None,
2982
        **kwargs,
2983
    ) -> None:
2984
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2985

2986
        self._verify_notification_configuration(
1✔
2987
            notification_configuration, skip_destination_validation, context, bucket
2988
        )
2989
        s3_bucket.notification_configuration = notification_configuration
1✔
2990

2991
    def get_bucket_notification_configuration(
1✔
2992
        self,
2993
        context: RequestContext,
2994
        bucket: BucketName,
2995
        expected_bucket_owner: AccountId = None,
2996
        **kwargs,
2997
    ) -> NotificationConfiguration:
2998
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2999

3000
        return s3_bucket.notification_configuration or NotificationConfiguration()
1✔
3001

3002
    def put_bucket_tagging(
1✔
3003
        self,
3004
        context: RequestContext,
3005
        bucket: BucketName,
3006
        tagging: Tagging,
3007
        content_md5: ContentMD5 = None,
3008
        checksum_algorithm: ChecksumAlgorithm = None,
3009
        expected_bucket_owner: AccountId = None,
3010
        **kwargs,
3011
    ) -> None:
3012
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3013

3014
        if "TagSet" not in tagging:
1✔
3015
            raise MalformedXML()
×
3016

3017
        validate_tag_set(tagging["TagSet"], type_set="bucket")
1✔
3018

3019
        # remove the previous tags before setting the new ones, it overwrites the whole TagSet
3020
        store.TAGS.tags.pop(s3_bucket.bucket_arn, None)
1✔
3021
        store.TAGS.tag_resource(s3_bucket.bucket_arn, tags=tagging["TagSet"])
1✔
3022

3023
    def get_bucket_tagging(
1✔
3024
        self,
3025
        context: RequestContext,
3026
        bucket: BucketName,
3027
        expected_bucket_owner: AccountId = None,
3028
        **kwargs,
3029
    ) -> GetBucketTaggingOutput:
3030
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3031
        tag_set = store.TAGS.list_tags_for_resource(s3_bucket.bucket_arn, root_name="Tags")["Tags"]
1✔
3032
        if not tag_set:
1✔
3033
            raise NoSuchTagSet(
1✔
3034
                "The TagSet does not exist",
3035
                BucketName=bucket,
3036
            )
3037

3038
        return GetBucketTaggingOutput(TagSet=tag_set)
1✔
3039

3040
    def delete_bucket_tagging(
1✔
3041
        self,
3042
        context: RequestContext,
3043
        bucket: BucketName,
3044
        expected_bucket_owner: AccountId = None,
3045
        **kwargs,
3046
    ) -> None:
3047
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3048

3049
        store.TAGS.tags.pop(s3_bucket.bucket_arn, None)
1✔
3050

3051
    def put_object_tagging(
1✔
3052
        self,
3053
        context: RequestContext,
3054
        bucket: BucketName,
3055
        key: ObjectKey,
3056
        tagging: Tagging,
3057
        version_id: ObjectVersionId = None,
3058
        content_md5: ContentMD5 = None,
3059
        checksum_algorithm: ChecksumAlgorithm = None,
3060
        expected_bucket_owner: AccountId = None,
3061
        request_payer: RequestPayer = None,
3062
        **kwargs,
3063
    ) -> PutObjectTaggingOutput:
3064
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3065

3066
        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="PUT")
1✔
3067

3068
        if "TagSet" not in tagging:
1✔
3069
            raise MalformedXML()
×
3070

3071
        validate_tag_set(tagging["TagSet"], type_set="object")
1✔
3072

3073
        key_id = get_unique_key_id(bucket, key, s3_object.version_id)
1✔
3074
        # remove the previous tags before setting the new ones, it overwrites the whole TagSet
3075
        store.TAGS.tags.pop(key_id, None)
1✔
3076
        store.TAGS.tag_resource(key_id, tags=tagging["TagSet"])
1✔
3077
        response = PutObjectTaggingOutput()
1✔
3078
        if s3_object.version_id:
1✔
3079
            response["VersionId"] = s3_object.version_id
1✔
3080

3081
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
3082

3083
        return response
1✔
3084

3085
    def get_object_tagging(
1✔
3086
        self,
3087
        context: RequestContext,
3088
        bucket: BucketName,
3089
        key: ObjectKey,
3090
        version_id: ObjectVersionId = None,
3091
        expected_bucket_owner: AccountId = None,
3092
        request_payer: RequestPayer = None,
3093
        **kwargs,
3094
    ) -> GetObjectTaggingOutput:
3095
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3096

3097
        try:
1✔
3098
            s3_object = s3_bucket.get_object(key=key, version_id=version_id)
1✔
3099
        except NoSuchKey as e:
1✔
3100
            # it seems GetObjectTagging does not work like all other operations, so we need to raise a different
3101
            # exception. As we already need to catch it because of the format of the Key, it is not worth to modify the
3102
            # `S3Bucket.get_object` signature for one operation.
3103
            if s3_bucket.versioning_status and (
1✔
3104
                s3_object_version := s3_bucket.objects.get(key, version_id)
3105
            ):
3106
                raise MethodNotAllowed(
1✔
3107
                    "The specified method is not allowed against this resource.",
3108
                    Method="GET",
3109
                    ResourceType="DeleteMarker",
3110
                    DeleteMarker=True,
3111
                    Allow="DELETE",
3112
                    VersionId=s3_object_version.version_id,
3113
                )
3114

3115
            # There a weird AWS validated bug in S3: the returned key contains the bucket name as well
3116
            # follow AWS on this one
3117
            e.Key = f"{bucket}/{key}"
1✔
3118
            raise e
1✔
3119

3120
        tag_set = store.TAGS.list_tags_for_resource(
1✔
3121
            get_unique_key_id(bucket, key, s3_object.version_id)
3122
        )["Tags"]
3123
        response = GetObjectTaggingOutput(TagSet=tag_set)
1✔
3124
        if s3_object.version_id:
1✔
3125
            response["VersionId"] = s3_object.version_id
1✔
3126

3127
        return response
1✔
3128

3129
    def delete_object_tagging(
1✔
3130
        self,
3131
        context: RequestContext,
3132
        bucket: BucketName,
3133
        key: ObjectKey,
3134
        version_id: ObjectVersionId = None,
3135
        expected_bucket_owner: AccountId = None,
3136
        **kwargs,
3137
    ) -> DeleteObjectTaggingOutput:
3138
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3139

3140
        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="DELETE")
1✔
3141

3142
        store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
1✔
3143
        response = DeleteObjectTaggingOutput()
1✔
3144
        if s3_object.version_id:
1✔
3145
            response["VersionId"] = s3_object.version_id
×
3146

3147
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
3148

3149
        return response
1✔
3150

3151
    def put_bucket_cors(
1✔
3152
        self,
3153
        context: RequestContext,
3154
        bucket: BucketName,
3155
        cors_configuration: CORSConfiguration,
3156
        content_md5: ContentMD5 = None,
3157
        checksum_algorithm: ChecksumAlgorithm = None,
3158
        expected_bucket_owner: AccountId = None,
3159
        **kwargs,
3160
    ) -> None:
3161
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3162
        validate_cors_configuration(cors_configuration)
1✔
3163
        s3_bucket.cors_rules = cors_configuration
1✔
3164
        self._cors_handler.invalidate_cache()
1✔
3165

3166
    def get_bucket_cors(
1✔
3167
        self,
3168
        context: RequestContext,
3169
        bucket: BucketName,
3170
        expected_bucket_owner: AccountId = None,
3171
        **kwargs,
3172
    ) -> GetBucketCorsOutput:
3173
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3174

3175
        if not s3_bucket.cors_rules:
1✔
3176
            raise NoSuchCORSConfiguration(
1✔
3177
                "The CORS configuration does not exist",
3178
                BucketName=bucket,
3179
            )
3180
        return GetBucketCorsOutput(CORSRules=s3_bucket.cors_rules["CORSRules"])
1✔
3181

3182
    def delete_bucket_cors(
1✔
3183
        self,
3184
        context: RequestContext,
3185
        bucket: BucketName,
3186
        expected_bucket_owner: AccountId = None,
3187
        **kwargs,
3188
    ) -> None:
3189
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3190

3191
        if s3_bucket.cors_rules:
1✔
3192
            self._cors_handler.invalidate_cache()
1✔
3193
            s3_bucket.cors_rules = None
1✔
3194

3195
    def get_bucket_lifecycle_configuration(
1✔
3196
        self,
3197
        context: RequestContext,
3198
        bucket: BucketName,
3199
        expected_bucket_owner: AccountId = None,
3200
        **kwargs,
3201
    ) -> GetBucketLifecycleConfigurationOutput:
3202
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3203

3204
        if not s3_bucket.lifecycle_rules:
1✔
3205
            raise NoSuchLifecycleConfiguration(
1✔
3206
                "The lifecycle configuration does not exist",
3207
                BucketName=bucket,
3208
            )
3209

3210
        return GetBucketLifecycleConfigurationOutput(
1✔
3211
            Rules=s3_bucket.lifecycle_rules,
3212
            # TODO: remove for next major version, safe access to new attribute
3213
            TransitionDefaultMinimumObjectSize=getattr(
3214
                s3_bucket,
3215
                "transition_default_minimum_object_size",
3216
                TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
3217
            ),
3218
        )
3219

3220
    def put_bucket_lifecycle_configuration(
1✔
3221
        self,
3222
        context: RequestContext,
3223
        bucket: BucketName,
3224
        checksum_algorithm: ChecksumAlgorithm = None,
3225
        lifecycle_configuration: BucketLifecycleConfiguration = None,
3226
        expected_bucket_owner: AccountId = None,
3227
        transition_default_minimum_object_size: TransitionDefaultMinimumObjectSize = None,
3228
        **kwargs,
3229
    ) -> PutBucketLifecycleConfigurationOutput:
3230
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3231

3232
        transition_min_obj_size = (
1✔
3233
            transition_default_minimum_object_size
3234
            or TransitionDefaultMinimumObjectSize.all_storage_classes_128K
3235
        )
3236

3237
        if transition_min_obj_size not in (
1✔
3238
            TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
3239
            TransitionDefaultMinimumObjectSize.varies_by_storage_class,
3240
        ):
3241
            raise InvalidRequest(
1✔
3242
                f"Invalid TransitionDefaultMinimumObjectSize found: {transition_min_obj_size}"
3243
            )
3244

3245
        validate_lifecycle_configuration(lifecycle_configuration)
1✔
3246
        # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to apply them
3247
        #  everytime we get/head an object
3248
        # for now, we keep a cache and get it everytime we fetch an object
3249
        s3_bucket.lifecycle_rules = lifecycle_configuration["Rules"]
1✔
3250
        s3_bucket.transition_default_minimum_object_size = transition_min_obj_size
1✔
3251
        self._expiration_cache[bucket].clear()
1✔
3252
        return PutBucketLifecycleConfigurationOutput(
1✔
3253
            TransitionDefaultMinimumObjectSize=transition_min_obj_size
3254
        )
3255

3256
    def delete_bucket_lifecycle(
1✔
3257
        self,
3258
        context: RequestContext,
3259
        bucket: BucketName,
3260
        expected_bucket_owner: AccountId = None,
3261
        **kwargs,
3262
    ) -> None:
3263
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3264

3265
        s3_bucket.lifecycle_rules = None
1✔
3266
        self._expiration_cache[bucket].clear()
1✔
3267

3268
    def put_bucket_analytics_configuration(
1✔
3269
        self,
3270
        context: RequestContext,
3271
        bucket: BucketName,
3272
        id: AnalyticsId,
3273
        analytics_configuration: AnalyticsConfiguration,
3274
        expected_bucket_owner: AccountId = None,
3275
        **kwargs,
3276
    ) -> None:
3277
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3278

3279
        validate_bucket_analytics_configuration(
1✔
3280
            id=id, analytics_configuration=analytics_configuration
3281
        )
3282

3283
        s3_bucket.analytics_configurations[id] = analytics_configuration
1✔
3284

3285
    def get_bucket_analytics_configuration(
1✔
3286
        self,
3287
        context: RequestContext,
3288
        bucket: BucketName,
3289
        id: AnalyticsId,
3290
        expected_bucket_owner: AccountId = None,
3291
        **kwargs,
3292
    ) -> GetBucketAnalyticsConfigurationOutput:
3293
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3294

3295
        if not (analytic_config := s3_bucket.analytics_configurations.get(id)):
1✔
3296
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3297

3298
        return GetBucketAnalyticsConfigurationOutput(AnalyticsConfiguration=analytic_config)
1✔
3299

3300
    def list_bucket_analytics_configurations(
1✔
3301
        self,
3302
        context: RequestContext,
3303
        bucket: BucketName,
3304
        continuation_token: Token = None,
3305
        expected_bucket_owner: AccountId = None,
3306
        **kwargs,
3307
    ) -> ListBucketAnalyticsConfigurationsOutput:
3308
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3309

3310
        return ListBucketAnalyticsConfigurationsOutput(
1✔
3311
            IsTruncated=False,
3312
            AnalyticsConfigurationList=sorted(
3313
                s3_bucket.analytics_configurations.values(),
3314
                key=itemgetter("Id"),
3315
            ),
3316
        )
3317

3318
    def delete_bucket_analytics_configuration(
1✔
3319
        self,
3320
        context: RequestContext,
3321
        bucket: BucketName,
3322
        id: AnalyticsId,
3323
        expected_bucket_owner: AccountId = None,
3324
        **kwargs,
3325
    ) -> None:
3326
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3327

3328
        if not s3_bucket.analytics_configurations.pop(id, None):
1✔
3329
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3330

3331
    def put_bucket_intelligent_tiering_configuration(
1✔
3332
        self,
3333
        context: RequestContext,
3334
        bucket: BucketName,
3335
        id: IntelligentTieringId,
3336
        intelligent_tiering_configuration: IntelligentTieringConfiguration,
3337
        **kwargs,
3338
    ) -> None:
3339
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3340

3341
        validate_bucket_intelligent_tiering_configuration(id, intelligent_tiering_configuration)
1✔
3342

3343
        s3_bucket.intelligent_tiering_configurations[id] = intelligent_tiering_configuration
1✔
3344

3345
    def get_bucket_intelligent_tiering_configuration(
1✔
3346
        self, context: RequestContext, bucket: BucketName, id: IntelligentTieringId, **kwargs
3347
    ) -> GetBucketIntelligentTieringConfigurationOutput:
3348
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3349

3350
        if not (itier_config := s3_bucket.intelligent_tiering_configurations.get(id)):
1✔
3351
            raise NoSuchConfiguration("The specified configuration does not exist.")
×
3352

3353
        return GetBucketIntelligentTieringConfigurationOutput(
1✔
3354
            IntelligentTieringConfiguration=itier_config
3355
        )
3356

3357
    def delete_bucket_intelligent_tiering_configuration(
1✔
3358
        self, context: RequestContext, bucket: BucketName, id: IntelligentTieringId, **kwargs
3359
    ) -> None:
3360
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3361

3362
        if not s3_bucket.intelligent_tiering_configurations.pop(id, None):
1✔
3363
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3364

3365
    def list_bucket_intelligent_tiering_configurations(
1✔
3366
        self,
3367
        context: RequestContext,
3368
        bucket: BucketName,
3369
        continuation_token: Token = None,
3370
        **kwargs,
3371
    ) -> ListBucketIntelligentTieringConfigurationsOutput:
3372
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3373

3374
        return ListBucketIntelligentTieringConfigurationsOutput(
1✔
3375
            IsTruncated=False,
3376
            IntelligentTieringConfigurationList=sorted(
3377
                s3_bucket.intelligent_tiering_configurations.values(),
3378
                key=itemgetter("Id"),
3379
            ),
3380
        )
3381

3382
    def put_bucket_inventory_configuration(
1✔
3383
        self,
3384
        context: RequestContext,
3385
        bucket: BucketName,
3386
        id: InventoryId,
3387
        inventory_configuration: InventoryConfiguration,
3388
        expected_bucket_owner: AccountId = None,
3389
        **kwargs,
3390
    ) -> None:
3391
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3392

3393
        validate_inventory_configuration(
1✔
3394
            config_id=id, inventory_configuration=inventory_configuration
3395
        )
3396
        s3_bucket.inventory_configurations[id] = inventory_configuration
1✔
3397

3398
    def get_bucket_inventory_configuration(
1✔
3399
        self,
3400
        context: RequestContext,
3401
        bucket: BucketName,
3402
        id: InventoryId,
3403
        expected_bucket_owner: AccountId = None,
3404
        **kwargs,
3405
    ) -> GetBucketInventoryConfigurationOutput:
3406
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3407

3408
        if not (inv_config := s3_bucket.inventory_configurations.get(id)):
1✔
3409
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3410
        return GetBucketInventoryConfigurationOutput(InventoryConfiguration=inv_config)
1✔
3411

3412
    def list_bucket_inventory_configurations(
1✔
3413
        self,
3414
        context: RequestContext,
3415
        bucket: BucketName,
3416
        continuation_token: Token = None,
3417
        expected_bucket_owner: AccountId = None,
3418
        **kwargs,
3419
    ) -> ListBucketInventoryConfigurationsOutput:
3420
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3421

3422
        return ListBucketInventoryConfigurationsOutput(
1✔
3423
            IsTruncated=False,
3424
            InventoryConfigurationList=sorted(
3425
                s3_bucket.inventory_configurations.values(), key=itemgetter("Id")
3426
            ),
3427
        )
3428

3429
    def delete_bucket_inventory_configuration(
1✔
3430
        self,
3431
        context: RequestContext,
3432
        bucket: BucketName,
3433
        id: InventoryId,
3434
        expected_bucket_owner: AccountId = None,
3435
        **kwargs,
3436
    ) -> None:
3437
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3438

3439
        if not s3_bucket.inventory_configurations.pop(id, None):
1✔
3440
            raise NoSuchConfiguration("The specified configuration does not exist.")
×
3441

3442
    def get_bucket_website(
1✔
3443
        self,
3444
        context: RequestContext,
3445
        bucket: BucketName,
3446
        expected_bucket_owner: AccountId = None,
3447
        **kwargs,
3448
    ) -> GetBucketWebsiteOutput:
3449
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3450

3451
        if not s3_bucket.website_configuration:
1✔
3452
            raise NoSuchWebsiteConfiguration(
1✔
3453
                "The specified bucket does not have a website configuration",
3454
                BucketName=bucket,
3455
            )
3456
        return s3_bucket.website_configuration
1✔
3457

3458
    def put_bucket_website(
1✔
3459
        self,
3460
        context: RequestContext,
3461
        bucket: BucketName,
3462
        website_configuration: WebsiteConfiguration,
3463
        content_md5: ContentMD5 = None,
3464
        checksum_algorithm: ChecksumAlgorithm = None,
3465
        expected_bucket_owner: AccountId = None,
3466
        **kwargs,
3467
    ) -> None:
3468
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3469

3470
        validate_website_configuration(website_configuration)
1✔
3471
        s3_bucket.website_configuration = website_configuration
1✔
3472

3473
    def delete_bucket_website(
1✔
3474
        self,
3475
        context: RequestContext,
3476
        bucket: BucketName,
3477
        expected_bucket_owner: AccountId = None,
3478
        **kwargs,
3479
    ) -> None:
3480
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3481
        # does not raise error if the bucket did not have a config, will simply return
3482
        s3_bucket.website_configuration = None
1✔
3483

3484
    def get_object_lock_configuration(
1✔
3485
        self,
3486
        context: RequestContext,
3487
        bucket: BucketName,
3488
        expected_bucket_owner: AccountId = None,
3489
        **kwargs,
3490
    ) -> GetObjectLockConfigurationOutput:
3491
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3492
        if not s3_bucket.object_lock_enabled:
1✔
3493
            raise ObjectLockConfigurationNotFoundError(
1✔
3494
                "Object Lock configuration does not exist for this bucket",
3495
                BucketName=bucket,
3496
            )
3497

3498
        response = GetObjectLockConfigurationOutput(
1✔
3499
            ObjectLockConfiguration=ObjectLockConfiguration(
3500
                ObjectLockEnabled=ObjectLockEnabled.Enabled
3501
            )
3502
        )
3503
        if s3_bucket.object_lock_default_retention:
1✔
3504
            response["ObjectLockConfiguration"]["Rule"] = {
1✔
3505
                "DefaultRetention": s3_bucket.object_lock_default_retention
3506
            }
3507

3508
        return response
1✔
3509

3510
    def put_object_lock_configuration(
1✔
3511
        self,
3512
        context: RequestContext,
3513
        bucket: BucketName,
3514
        object_lock_configuration: ObjectLockConfiguration = None,
3515
        request_payer: RequestPayer = None,
3516
        token: ObjectLockToken = None,
3517
        content_md5: ContentMD5 = None,
3518
        checksum_algorithm: ChecksumAlgorithm = None,
3519
        expected_bucket_owner: AccountId = None,
3520
        **kwargs,
3521
    ) -> PutObjectLockConfigurationOutput:
3522
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3523
        if s3_bucket.versioning_status != "Enabled":
1✔
3524
            raise InvalidBucketState(
1✔
3525
                "Versioning must be 'Enabled' on the bucket to apply a Object Lock configuration"
3526
            )
3527

3528
        if (
1✔
3529
            not object_lock_configuration
3530
            or object_lock_configuration.get("ObjectLockEnabled") != "Enabled"
3531
        ):
3532
            raise MalformedXML()
1✔
3533

3534
        if "Rule" not in object_lock_configuration:
1✔
3535
            s3_bucket.object_lock_default_retention = None
1✔
3536
            if not s3_bucket.object_lock_enabled:
1✔
3537
                s3_bucket.object_lock_enabled = True
1✔
3538

3539
            return PutObjectLockConfigurationOutput()
1✔
3540
        elif not (rule := object_lock_configuration["Rule"]) or not (
1✔
3541
            default_retention := rule.get("DefaultRetention")
3542
        ):
3543
            raise MalformedXML()
1✔
3544

3545
        if "Mode" not in default_retention or (
1✔
3546
            ("Days" in default_retention and "Years" in default_retention)
3547
            or ("Days" not in default_retention and "Years" not in default_retention)
3548
        ):
3549
            raise MalformedXML()
1✔
3550

3551
        s3_bucket.object_lock_default_retention = default_retention
1✔
3552
        if not s3_bucket.object_lock_enabled:
1✔
3553
            s3_bucket.object_lock_enabled = True
×
3554

3555
        return PutObjectLockConfigurationOutput()
1✔
3556

3557
    def get_object_legal_hold(
1✔
3558
        self,
3559
        context: RequestContext,
3560
        bucket: BucketName,
3561
        key: ObjectKey,
3562
        version_id: ObjectVersionId = None,
3563
        request_payer: RequestPayer = None,
3564
        expected_bucket_owner: AccountId = None,
3565
        **kwargs,
3566
    ) -> GetObjectLegalHoldOutput:
3567
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3568
        if not s3_bucket.object_lock_enabled:
1✔
3569
            raise InvalidRequest("Bucket is missing Object Lock Configuration")
1✔
3570

3571
        s3_object = s3_bucket.get_object(
1✔
3572
            key=key,
3573
            version_id=version_id,
3574
            http_method="GET",
3575
        )
3576
        if not s3_object.lock_legal_status:
1✔
3577
            raise NoSuchObjectLockConfiguration(
1✔
3578
                "The specified object does not have a ObjectLock configuration"
3579
            )
3580

3581
        return GetObjectLegalHoldOutput(
1✔
3582
            LegalHold=ObjectLockLegalHold(Status=s3_object.lock_legal_status)
3583
        )
3584

3585
    def put_object_legal_hold(
1✔
3586
        self,
3587
        context: RequestContext,
3588
        bucket: BucketName,
3589
        key: ObjectKey,
3590
        legal_hold: ObjectLockLegalHold = None,
3591
        request_payer: RequestPayer = None,
3592
        version_id: ObjectVersionId = None,
3593
        content_md5: ContentMD5 = None,
3594
        checksum_algorithm: ChecksumAlgorithm = None,
3595
        expected_bucket_owner: AccountId = None,
3596
        **kwargs,
3597
    ) -> PutObjectLegalHoldOutput:
3598
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3599

3600
        if not legal_hold:
1✔
3601
            raise MalformedXML()
1✔
3602

3603
        if not s3_bucket.object_lock_enabled:
1✔
3604
            raise InvalidRequest("Bucket is missing Object Lock Configuration")
1✔
3605

3606
        s3_object = s3_bucket.get_object(
1✔
3607
            key=key,
3608
            version_id=version_id,
3609
            http_method="PUT",
3610
        )
3611
        # TODO: check casing
3612
        if not (status := legal_hold.get("Status")) or status not in ("ON", "OFF"):
1✔
3613
            raise MalformedXML()
×
3614

3615
        s3_object.lock_legal_status = status
1✔
3616

3617
        # TODO: return RequestCharged
3618
        return PutObjectRetentionOutput()
1✔
3619

3620
    def get_object_retention(
1✔
3621
        self,
3622
        context: RequestContext,
3623
        bucket: BucketName,
3624
        key: ObjectKey,
3625
        version_id: ObjectVersionId = None,
3626
        request_payer: RequestPayer = None,
3627
        expected_bucket_owner: AccountId = None,
3628
        **kwargs,
3629
    ) -> GetObjectRetentionOutput:
3630
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3631
        if not s3_bucket.object_lock_enabled:
1✔
3632
            raise InvalidRequest("Bucket is missing Object Lock Configuration")
1✔
3633

3634
        s3_object = s3_bucket.get_object(
1✔
3635
            key=key,
3636
            version_id=version_id,
3637
            http_method="GET",
3638
        )
3639
        if not s3_object.lock_mode:
1✔
3640
            raise NoSuchObjectLockConfiguration(
1✔
3641
                "The specified object does not have a ObjectLock configuration"
3642
            )
3643

3644
        return GetObjectRetentionOutput(
1✔
3645
            Retention=ObjectLockRetention(
3646
                Mode=s3_object.lock_mode,
3647
                RetainUntilDate=s3_object.lock_until,
3648
            )
3649
        )
3650

3651
    def put_object_retention(
1✔
3652
        self,
3653
        context: RequestContext,
3654
        bucket: BucketName,
3655
        key: ObjectKey,
3656
        retention: ObjectLockRetention = None,
3657
        request_payer: RequestPayer = None,
3658
        version_id: ObjectVersionId = None,
3659
        bypass_governance_retention: BypassGovernanceRetention = None,
3660
        content_md5: ContentMD5 = None,
3661
        checksum_algorithm: ChecksumAlgorithm = None,
3662
        expected_bucket_owner: AccountId = None,
3663
        **kwargs,
3664
    ) -> PutObjectRetentionOutput:
3665
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3666
        if not s3_bucket.object_lock_enabled:
1✔
3667
            raise InvalidRequest("Bucket is missing Object Lock Configuration")
1✔
3668

3669
        s3_object = s3_bucket.get_object(
1✔
3670
            key=key,
3671
            version_id=version_id,
3672
            http_method="PUT",
3673
        )
3674

3675
        if retention and not validate_dict_fields(
1✔
3676
            retention, required_fields={"Mode", "RetainUntilDate"}
3677
        ):
3678
            raise MalformedXML()
1✔
3679

3680
        if retention and retention["RetainUntilDate"] < datetime.datetime.now(datetime.UTC):
1✔
3681
            # weirdly, this date is format as following: Tue Dec 31 16:00:00 PST 2019
3682
            # it contains the timezone as PST, even if you target a bucket in Europe or Asia
3683
            pst_datetime = retention["RetainUntilDate"].astimezone(tz=ZoneInfo("US/Pacific"))
1✔
3684
            raise InvalidArgument(
1✔
3685
                "The retain until date must be in the future!",
3686
                ArgumentName="RetainUntilDate",
3687
                ArgumentValue=pst_datetime.strftime("%a %b %d %H:%M:%S %Z %Y"),
3688
            )
3689

3690
        if (
1✔
3691
            not retention
3692
            or (s3_object.lock_until and s3_object.lock_until > retention["RetainUntilDate"])
3693
        ) and not (
3694
            bypass_governance_retention and s3_object.lock_mode == ObjectLockMode.GOVERNANCE
3695
        ):
3696
            raise AccessDenied("Access Denied because object protected by object lock.")
1✔
3697

3698
        s3_object.lock_mode = retention["Mode"] if retention else None
1✔
3699
        s3_object.lock_until = retention["RetainUntilDate"] if retention else None
1✔
3700

3701
        # TODO: return RequestCharged
3702
        return PutObjectRetentionOutput()
1✔
3703

3704
    def put_bucket_request_payment(
1✔
3705
        self,
3706
        context: RequestContext,
3707
        bucket: BucketName,
3708
        request_payment_configuration: RequestPaymentConfiguration,
3709
        content_md5: ContentMD5 = None,
3710
        checksum_algorithm: ChecksumAlgorithm = None,
3711
        expected_bucket_owner: AccountId = None,
3712
        **kwargs,
3713
    ) -> None:
3714
        # TODO: this currently only mock the operation, but its actual effect is not emulated
3715
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3716

3717
        payer = request_payment_configuration.get("Payer")
1✔
3718
        if payer not in ["Requester", "BucketOwner"]:
1✔
3719
            raise MalformedXML()
1✔
3720

3721
        s3_bucket.payer = payer
1✔
3722

3723
    def get_bucket_request_payment(
1✔
3724
        self,
3725
        context: RequestContext,
3726
        bucket: BucketName,
3727
        expected_bucket_owner: AccountId = None,
3728
        **kwargs,
3729
    ) -> GetBucketRequestPaymentOutput:
3730
        # TODO: this currently only mock the operation, but its actual effect is not emulated
3731
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3732

3733
        return GetBucketRequestPaymentOutput(Payer=s3_bucket.payer)
1✔
3734

3735
    def get_bucket_ownership_controls(
1✔
3736
        self,
3737
        context: RequestContext,
3738
        bucket: BucketName,
3739
        expected_bucket_owner: AccountId = None,
3740
        **kwargs,
3741
    ) -> GetBucketOwnershipControlsOutput:
3742
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3743

3744
        if not s3_bucket.object_ownership:
1✔
3745
            raise OwnershipControlsNotFoundError(
1✔
3746
                "The bucket ownership controls were not found",
3747
                BucketName=bucket,
3748
            )
3749

3750
        return GetBucketOwnershipControlsOutput(
1✔
3751
            OwnershipControls={"Rules": [{"ObjectOwnership": s3_bucket.object_ownership}]}
3752
        )
3753

3754
    def put_bucket_ownership_controls(
1✔
3755
        self,
3756
        context: RequestContext,
3757
        bucket: BucketName,
3758
        ownership_controls: OwnershipControls,
3759
        content_md5: ContentMD5 = None,
3760
        expected_bucket_owner: AccountId = None,
3761
        **kwargs,
3762
    ) -> None:
3763
        # TODO: this currently only mock the operation, but its actual effect is not emulated
3764
        #  it for example almost forbid ACL usage when set to BucketOwnerEnforced
3765
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3766

3767
        if not (rules := ownership_controls.get("Rules")) or len(rules) > 1:
1✔
3768
            raise MalformedXML()
1✔
3769

3770
        rule = rules[0]
1✔
3771
        if (object_ownership := rule.get("ObjectOwnership")) not in OBJECT_OWNERSHIPS:
1✔
3772
            raise MalformedXML()
1✔
3773

3774
        s3_bucket.object_ownership = object_ownership
1✔
3775

3776
    def delete_bucket_ownership_controls(
1✔
3777
        self,
3778
        context: RequestContext,
3779
        bucket: BucketName,
3780
        expected_bucket_owner: AccountId = None,
3781
        **kwargs,
3782
    ) -> None:
3783
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3784

3785
        s3_bucket.object_ownership = None
1✔
3786

3787
    def get_public_access_block(
1✔
3788
        self,
3789
        context: RequestContext,
3790
        bucket: BucketName,
3791
        expected_bucket_owner: AccountId = None,
3792
        **kwargs,
3793
    ) -> GetPublicAccessBlockOutput:
3794
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3795

3796
        if not s3_bucket.public_access_block:
1✔
3797
            raise NoSuchPublicAccessBlockConfiguration(
1✔
3798
                "The public access block configuration was not found", BucketName=bucket
3799
            )
3800

3801
        return GetPublicAccessBlockOutput(
1✔
3802
            PublicAccessBlockConfiguration=s3_bucket.public_access_block
3803
        )
3804

3805
    def put_public_access_block(
1✔
3806
        self,
3807
        context: RequestContext,
3808
        bucket: BucketName,
3809
        public_access_block_configuration: PublicAccessBlockConfiguration,
3810
        content_md5: ContentMD5 = None,
3811
        checksum_algorithm: ChecksumAlgorithm = None,
3812
        expected_bucket_owner: AccountId = None,
3813
        **kwargs,
3814
    ) -> None:
3815
        # TODO: this currently only mock the operation, but its actual effect is not emulated
3816
        #  as we do not enforce ACL directly. Also, this should take the most restrictive between S3Control and the
3817
        #  bucket configuration. See s3control
3818
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3819

3820
        public_access_block_fields = {
1✔
3821
            "BlockPublicAcls",
3822
            "BlockPublicPolicy",
3823
            "IgnorePublicAcls",
3824
            "RestrictPublicBuckets",
3825
        }
3826
        if not validate_dict_fields(
1✔
3827
            public_access_block_configuration,
3828
            required_fields=set(),
3829
            optional_fields=public_access_block_fields,
3830
        ):
3831
            raise MalformedXML()
×
3832

3833
        for field in public_access_block_fields:
1✔
3834
            if public_access_block_configuration.get(field) is None:
1✔
3835
                public_access_block_configuration[field] = False
1✔
3836

3837
        s3_bucket.public_access_block = public_access_block_configuration
1✔
3838

3839
    def delete_public_access_block(
1✔
3840
        self,
3841
        context: RequestContext,
3842
        bucket: BucketName,
3843
        expected_bucket_owner: AccountId = None,
3844
        **kwargs,
3845
    ) -> None:
3846
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3847

3848
        s3_bucket.public_access_block = None
1✔
3849

3850
    def get_bucket_policy(
1✔
3851
        self,
3852
        context: RequestContext,
3853
        bucket: BucketName,
3854
        expected_bucket_owner: AccountId = None,
3855
        **kwargs,
3856
    ) -> GetBucketPolicyOutput:
3857
        store, s3_bucket = self._get_cross_account_bucket(
1✔
3858
            context, bucket, expected_bucket_owner=expected_bucket_owner
3859
        )
3860
        if not s3_bucket.policy:
1✔
3861
            raise NoSuchBucketPolicy(
1✔
3862
                "The bucket policy does not exist",
3863
                BucketName=bucket,
3864
            )
3865
        return GetBucketPolicyOutput(Policy=s3_bucket.policy)
1✔
3866

3867
    def put_bucket_policy(
1✔
3868
        self,
3869
        context: RequestContext,
3870
        bucket: BucketName,
3871
        policy: Policy,
3872
        content_md5: ContentMD5 = None,
3873
        checksum_algorithm: ChecksumAlgorithm = None,
3874
        confirm_remove_self_bucket_access: ConfirmRemoveSelfBucketAccess = None,
3875
        expected_bucket_owner: AccountId = None,
3876
        **kwargs,
3877
    ) -> None:
3878
        store, s3_bucket = self._get_cross_account_bucket(
1✔
3879
            context, bucket, expected_bucket_owner=expected_bucket_owner
3880
        )
3881

3882
        if not policy or policy[0] != "{":
1✔
3883
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")
1✔
3884
        try:
1✔
3885
            json_policy = json.loads(policy)
1✔
3886
            if not json_policy:
1✔
3887
                # TODO: add more validation around the policy?
3888
                raise MalformedPolicy("Missing required field Statement")
1✔
3889
        except ValueError:
1✔
3890
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")
×
3891

3892
        s3_bucket.policy = policy
1✔
3893

3894
    def delete_bucket_policy(
1✔
3895
        self,
3896
        context: RequestContext,
3897
        bucket: BucketName,
3898
        expected_bucket_owner: AccountId = None,
3899
        **kwargs,
3900
    ) -> None:
3901
        store, s3_bucket = self._get_cross_account_bucket(
1✔
3902
            context, bucket, expected_bucket_owner=expected_bucket_owner
3903
        )
3904

3905
        s3_bucket.policy = None
1✔
3906

3907
    def get_bucket_accelerate_configuration(
1✔
3908
        self,
3909
        context: RequestContext,
3910
        bucket: BucketName,
3911
        expected_bucket_owner: AccountId = None,
3912
        request_payer: RequestPayer = None,
3913
        **kwargs,
3914
    ) -> GetBucketAccelerateConfigurationOutput:
3915
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3916

3917
        response = GetBucketAccelerateConfigurationOutput()
1✔
3918
        if s3_bucket.accelerate_status:
1✔
3919
            response["Status"] = s3_bucket.accelerate_status
1✔
3920

3921
        return response
1✔
3922

3923
    def put_bucket_accelerate_configuration(
1✔
3924
        self,
3925
        context: RequestContext,
3926
        bucket: BucketName,
3927
        accelerate_configuration: AccelerateConfiguration,
3928
        expected_bucket_owner: AccountId = None,
3929
        checksum_algorithm: ChecksumAlgorithm = None,
3930
        **kwargs,
3931
    ) -> None:
3932
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3933

3934
        if "." in bucket:
1✔
3935
            raise InvalidRequest(
1✔
3936
                "S3 Transfer Acceleration is not supported for buckets with periods (.) in their names"
3937
            )
3938

3939
        if not (status := accelerate_configuration.get("Status")) or status not in (
1✔
3940
            "Enabled",
3941
            "Suspended",
3942
        ):
3943
            raise MalformedXML()
1✔
3944

3945
        s3_bucket.accelerate_status = status
1✔
3946

3947
    def put_bucket_logging(
1✔
3948
        self,
3949
        context: RequestContext,
3950
        bucket: BucketName,
3951
        bucket_logging_status: BucketLoggingStatus,
3952
        content_md5: ContentMD5 = None,
3953
        checksum_algorithm: ChecksumAlgorithm = None,
3954
        expected_bucket_owner: AccountId = None,
3955
        **kwargs,
3956
    ) -> None:
3957
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
3958

3959
        if not (logging_config := bucket_logging_status.get("LoggingEnabled")):
1✔
3960
            s3_bucket.logging = {}
1✔
3961
            return
1✔
3962

3963
        # the target bucket must be in the same account
3964
        if not (target_bucket_name := logging_config.get("TargetBucket")):
1✔
3965
            raise MalformedXML()
×
3966

3967
        if not logging_config.get("TargetPrefix"):
1✔
3968
            logging_config["TargetPrefix"] = ""
×
3969

3970
        # TODO: validate Grants
3971

3972
        if not (target_s3_bucket := store.buckets.get(target_bucket_name)):
1✔
3973
            raise InvalidTargetBucketForLogging(
1✔
3974
                "The target bucket for logging does not exist",
3975
                TargetBucket=target_bucket_name,
3976
            )
3977

3978
        source_bucket_region = s3_bucket.bucket_region
1✔
3979
        if target_s3_bucket.bucket_region != source_bucket_region:
1✔
3980
            raise (
1✔
3981
                CrossLocationLoggingProhibitted(
3982
                    "Cross S3 location logging not allowed. ",
3983
                    TargetBucketLocation=target_s3_bucket.bucket_region,
3984
                )
3985
                if source_bucket_region == AWS_REGION_US_EAST_1
3986
                else CrossLocationLoggingProhibitted(
3987
                    "Cross S3 location logging not allowed. ",
3988
                    SourceBucketLocation=source_bucket_region,
3989
                    TargetBucketLocation=target_s3_bucket.bucket_region,
3990
                )
3991
            )
3992

3993
        s3_bucket.logging = logging_config
1✔
3994

3995
    def get_bucket_logging(
1✔
3996
        self,
3997
        context: RequestContext,
3998
        bucket: BucketName,
3999
        expected_bucket_owner: AccountId = None,
4000
        **kwargs,
4001
    ) -> GetBucketLoggingOutput:
4002
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4003

4004
        if not s3_bucket.logging:
1✔
4005
            return GetBucketLoggingOutput()
1✔
4006

4007
        return GetBucketLoggingOutput(LoggingEnabled=s3_bucket.logging)
1✔
4008

4009
    def put_bucket_replication(
1✔
4010
        self,
4011
        context: RequestContext,
4012
        bucket: BucketName,
4013
        replication_configuration: ReplicationConfiguration,
4014
        content_md5: ContentMD5 = None,
4015
        checksum_algorithm: ChecksumAlgorithm = None,
4016
        token: ObjectLockToken = None,
4017
        expected_bucket_owner: AccountId = None,
4018
        **kwargs,
4019
    ) -> None:
4020
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4021
        if not s3_bucket.versioning_status == BucketVersioningStatus.Enabled:
1✔
4022
            raise InvalidRequest(
1✔
4023
                "Versioning must be 'Enabled' on the bucket to apply a replication configuration"
4024
            )
4025

4026
        if not (rules := replication_configuration.get("Rules")):
1✔
4027
            raise MalformedXML()
1✔
4028

4029
        for rule in rules:
1✔
4030
            if "ID" not in rule:
1✔
4031
                rule["ID"] = short_uid()
1✔
4032

4033
            dest_bucket_arn = rule.get("Destination", {}).get("Bucket")
1✔
4034
            dest_bucket_name = s3_bucket_name(dest_bucket_arn)
1✔
4035
            if (
1✔
4036
                not (dest_s3_bucket := store.buckets.get(dest_bucket_name))
4037
                or not dest_s3_bucket.versioning_status == BucketVersioningStatus.Enabled
4038
            ):
4039
                # according to AWS testing the same exception is raised if the bucket does not exist
4040
                # or if versioning was disabled
4041
                raise InvalidRequest("Destination bucket must have versioning enabled.")
1✔
4042

4043
        # TODO more validation on input
4044
        s3_bucket.replication = replication_configuration
1✔
4045

4046
    def get_bucket_replication(
1✔
4047
        self,
4048
        context: RequestContext,
4049
        bucket: BucketName,
4050
        expected_bucket_owner: AccountId = None,
4051
        **kwargs,
4052
    ) -> GetBucketReplicationOutput:
4053
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4054

4055
        if not s3_bucket.replication:
1✔
4056
            raise ReplicationConfigurationNotFoundError(
1✔
4057
                "The replication configuration was not found",
4058
                BucketName=bucket,
4059
            )
4060

4061
        return GetBucketReplicationOutput(ReplicationConfiguration=s3_bucket.replication)
1✔
4062

4063
    def delete_bucket_replication(
1✔
4064
        self,
4065
        context: RequestContext,
4066
        bucket: BucketName,
4067
        expected_bucket_owner: AccountId = None,
4068
        **kwargs,
4069
    ) -> None:
4070
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4071

4072
        s3_bucket.replication = None
1✔
4073

4074
    @handler("PutBucketAcl", expand=False)
1✔
4075
    def put_bucket_acl(
1✔
4076
        self,
4077
        context: RequestContext,
4078
        request: PutBucketAclRequest,
4079
    ) -> None:
4080
        bucket = request["Bucket"]
1✔
4081
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4082
        acp = get_access_control_policy_from_acl_request(
1✔
4083
            request=request, owner=s3_bucket.owner, request_body=context.request.data
4084
        )
4085
        s3_bucket.acl = acp
1✔
4086

4087
    def get_bucket_acl(
1✔
4088
        self,
4089
        context: RequestContext,
4090
        bucket: BucketName,
4091
        expected_bucket_owner: AccountId = None,
4092
        **kwargs,
4093
    ) -> GetBucketAclOutput:
4094
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4095

4096
        return GetBucketAclOutput(Owner=s3_bucket.acl["Owner"], Grants=s3_bucket.acl["Grants"])
1✔
4097

4098
    @handler("PutObjectAcl", expand=False)
1✔
4099
    def put_object_acl(
1✔
4100
        self,
4101
        context: RequestContext,
4102
        request: PutObjectAclRequest,
4103
    ) -> PutObjectAclOutput:
4104
        bucket = request["Bucket"]
1✔
4105
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4106

4107
        s3_object = s3_bucket.get_object(
1✔
4108
            key=request["Key"],
4109
            version_id=request.get("VersionId"),
4110
            http_method="PUT",
4111
        )
4112
        acp = get_access_control_policy_from_acl_request(
1✔
4113
            request=request, owner=s3_object.owner, request_body=context.request.data
4114
        )
4115
        previous_acl = s3_object.acl
1✔
4116
        s3_object.acl = acp
1✔
4117

4118
        if previous_acl != acp:
1✔
4119
            self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
4120

4121
        # TODO: RequestCharged
4122
        return PutObjectAclOutput()
1✔
4123

4124
    def get_object_acl(
1✔
4125
        self,
4126
        context: RequestContext,
4127
        bucket: BucketName,
4128
        key: ObjectKey,
4129
        version_id: ObjectVersionId = None,
4130
        request_payer: RequestPayer = None,
4131
        expected_bucket_owner: AccountId = None,
4132
        **kwargs,
4133
    ) -> GetObjectAclOutput:
4134
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4135

4136
        s3_object = s3_bucket.get_object(
1✔
4137
            key=key,
4138
            version_id=version_id,
4139
        )
4140
        # TODO: RequestCharged
4141
        return GetObjectAclOutput(Owner=s3_object.acl["Owner"], Grants=s3_object.acl["Grants"])
1✔
4142

4143
    def get_bucket_policy_status(
1✔
4144
        self,
4145
        context: RequestContext,
4146
        bucket: BucketName,
4147
        expected_bucket_owner: AccountId = None,
4148
        **kwargs,
4149
    ) -> GetBucketPolicyStatusOutput:
4150
        raise NotImplementedError
4151

4152
    def get_object_torrent(
1✔
4153
        self,
4154
        context: RequestContext,
4155
        bucket: BucketName,
4156
        key: ObjectKey,
4157
        request_payer: RequestPayer = None,
4158
        expected_bucket_owner: AccountId = None,
4159
        **kwargs,
4160
    ) -> GetObjectTorrentOutput:
4161
        raise NotImplementedError
4162

4163
    def post_object(
1✔
4164
        self, context: RequestContext, bucket: BucketName, body: IO[Body] = None, **kwargs
4165
    ) -> PostResponse:
4166
        if "multipart/form-data" not in context.request.headers.get("Content-Type", ""):
1✔
4167
            raise PreconditionFailed(
1✔
4168
                "At least one of the pre-conditions you specified did not hold",
4169
                Condition="Bucket POST must be of the enclosure-type multipart/form-data",
4170
            )
4171
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html
4172
        # TODO: signature validation is not implemented for pre-signed POST
4173
        # policy validation is not implemented either, except expiration and mandatory fields
4174
        # This operation is the only one using form for storing the request data. We will have to do some manual
4175
        # parsing here, as no specs are present for this, as no client directly implements this operation.
4176
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
4177

4178
        form = context.request.form
1✔
4179
        object_key = context.request.form.get("key")
1✔
4180

4181
        if "file" in form:
1✔
4182
            # in AWS, you can pass the file content as a string in the form field and not as a file object
4183
            file_data = to_bytes(form["file"])
1✔
4184
            object_content_length = len(file_data)
1✔
4185
            stream = BytesIO(file_data)
1✔
4186
        else:
4187
            # this is the default behaviour
4188
            fileobj = context.request.files["file"]
1✔
4189
            stream = fileobj.stream
1✔
4190
            # stream is a SpooledTemporaryFile, so we can seek the stream to know its length, necessary for policy
4191
            # validation
4192
            original_pos = stream.tell()
1✔
4193
            object_content_length = stream.seek(0, 2)
1✔
4194
            # reset the stream and put it back at its original position
4195
            stream.seek(original_pos, 0)
1✔
4196

4197
            if "${filename}" in object_key:
1✔
4198
                # TODO: ${filename} is actually usable in all form fields
4199
                # See https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/PresignedPost.html
4200
                # > The string ${filename} is automatically replaced with the name of the file provided by the user and
4201
                # is recognized by all form fields.
4202
                object_key = object_key.replace("${filename}", fileobj.filename)
×
4203

4204
        # TODO: see if we need to pass additional metadata not contained in the policy from the table under
4205
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions
4206
        additional_policy_metadata = {
1✔
4207
            "bucket": bucket,
4208
            "content_length": object_content_length,
4209
        }
4210
        validate_post_policy(form, additional_policy_metadata)
1✔
4211

4212
        if canned_acl := form.get("acl"):
1✔
4213
            validate_canned_acl(canned_acl)
×
4214
            acp = get_canned_acl(canned_acl, owner=s3_bucket.owner)
×
4215
        else:
4216
            acp = get_canned_acl(BucketCannedACL.private, owner=s3_bucket.owner)
1✔
4217

4218
        post_system_settable_headers = [
1✔
4219
            "Cache-Control",
4220
            "Content-Type",
4221
            "Content-Disposition",
4222
            "Content-Encoding",
4223
        ]
4224
        system_metadata = {}
1✔
4225
        for system_metadata_field in post_system_settable_headers:
1✔
4226
            if field_value := form.get(system_metadata_field):
1✔
4227
                system_metadata[system_metadata_field.replace("-", "")] = field_value
1✔
4228

4229
        if not system_metadata.get("ContentType"):
1✔
4230
            system_metadata["ContentType"] = "binary/octet-stream"
1✔
4231

4232
        user_metadata = {
1✔
4233
            field.removeprefix("x-amz-meta-").lower(): form.get(field)
4234
            for field in form
4235
            if field.startswith("x-amz-meta-")
4236
        }
4237

4238
        if tagging := form.get("tagging"):
1✔
4239
            # this is weird, as it's direct XML in the form, we need to parse it directly
4240
            tagging = parse_post_object_tagging_xml(tagging)
1✔
4241

4242
        if (storage_class := form.get("x-amz-storage-class")) is not None and (
1✔
4243
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
4244
        ):
4245
            raise InvalidStorageClass(
1✔
4246
                "The storage class you specified is not valid", StorageClassRequested=storage_class
4247
            )
4248

4249
        encryption_request = {
1✔
4250
            "ServerSideEncryption": form.get("x-amz-server-side-encryption"),
4251
            "SSEKMSKeyId": form.get("x-amz-server-side-encryption-aws-kms-key-id"),
4252
            "BucketKeyEnabled": form.get("x-amz-server-side-encryption-bucket-key-enabled"),
4253
        }
4254

4255
        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
1✔
4256
            encryption_request,
4257
            s3_bucket,
4258
            store,
4259
        )
4260

4261
        checksum_algorithm = form.get("x-amz-checksum-algorithm")
1✔
4262
        checksum_value = (
1✔
4263
            form.get(f"x-amz-checksum-{checksum_algorithm.lower()}") if checksum_algorithm else None
4264
        )
4265
        expires = (
1✔
4266
            str_to_rfc_1123_datetime(expires_str) if (expires_str := form.get("Expires")) else None
4267
        )
4268

4269
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
4270

4271
        s3_object = S3Object(
1✔
4272
            key=object_key,
4273
            version_id=version_id,
4274
            storage_class=storage_class,
4275
            expires=expires,
4276
            user_metadata=user_metadata,
4277
            system_metadata=system_metadata,
4278
            checksum_algorithm=checksum_algorithm,
4279
            checksum_value=checksum_value,
4280
            encryption=encryption_parameters.encryption,
4281
            kms_key_id=encryption_parameters.kms_key_id,
4282
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
4283
            website_redirect_location=form.get("x-amz-website-redirect-location"),
4284
            acl=acp,
4285
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depends on Bucket settings
4286
        )
4287

4288
        with self._storage_backend.open(bucket, s3_object, mode="w") as s3_stored_object:
1✔
4289
            s3_stored_object.write(stream)
1✔
4290

4291
            if checksum_algorithm and s3_object.checksum_value != s3_stored_object.checksum:
1✔
4292
                self._storage_backend.remove(bucket, s3_object)
×
4293
                raise InvalidRequest(
×
4294
                    f"Value for x-amz-checksum-{checksum_algorithm.lower()} header is invalid."
4295
                )
4296

4297
            s3_bucket.objects.set(object_key, s3_object)
1✔
4298

4299
        # in case we are overriding an object, delete the tags entry
4300
        key_id = get_unique_key_id(bucket, object_key, version_id)
1✔
4301
        store.TAGS.tags.pop(key_id, None)
1✔
4302
        if tagging:
1✔
4303
            store.TAGS.tags[key_id] = tagging
1✔
4304

4305
        response = PostResponse()
1✔
4306
        # hacky way to set the etag in the headers as well: two locations for one value
4307
        response["ETagHeader"] = s3_object.quoted_etag
1✔
4308

4309
        if redirect := form.get("success_action_redirect"):
1✔
4310
            # we need to create the redirect, as the parser could not return the moto-calculated one
4311
            try:
1✔
4312
                redirect = create_redirect_for_post_request(
1✔
4313
                    base_redirect=redirect,
4314
                    bucket=bucket,
4315
                    object_key=object_key,
4316
                    etag=s3_object.quoted_etag,
4317
                )
4318
                response["LocationHeader"] = redirect
1✔
4319
                response["StatusCode"] = 303
1✔
4320
            except ValueError:
1✔
4321
                # If S3 cannot interpret the URL, it acts as if the field is not present.
4322
                response["StatusCode"] = form.get("success_action_status", 204)
1✔
4323

4324
        elif status_code := form.get("success_action_status"):
1✔
4325
            response["StatusCode"] = status_code
1✔
4326
        else:
4327
            response["StatusCode"] = 204
1✔
4328

4329
        response["LocationHeader"] = response.get(
1✔
4330
            "LocationHeader", f"{get_full_default_bucket_location(bucket)}{object_key}"
4331
        )
4332

4333
        if s3_bucket.versioning_status == "Enabled":
1✔
4334
            response["VersionId"] = s3_object.version_id
×
4335

4336
        if s3_object.checksum_algorithm:
1✔
4337
            response[f"Checksum{checksum_algorithm.upper()}"] = s3_object.checksum_value
×
4338

4339
        if s3_bucket.lifecycle_rules:
1✔
4340
            if expiration_header := self._get_expiration_header(
×
4341
                s3_bucket.lifecycle_rules,
4342
                bucket,
4343
                s3_object,
4344
                store.TAGS.tags.get(key_id, {}),
4345
            ):
4346
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
4347
                #  apply them everytime we get/head an object
4348
                response["Expiration"] = expiration_header
×
4349

4350
        add_encryption_to_response(response, s3_object=s3_object)
1✔
4351

4352
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
4353

4354
        if response["StatusCode"] == "201":
1✔
4355
            # if the StatusCode is 201, S3 returns an XML body with additional information
4356
            response["ETag"] = s3_object.quoted_etag
1✔
4357
            response["Bucket"] = bucket
1✔
4358
            response["Key"] = object_key
1✔
4359
            response["Location"] = response["LocationHeader"]
1✔
4360

4361
        return response
1✔
4362

4363

4364
def generate_version_id(bucket_versioning_status: str) -> str | None:
1✔
4365
    if not bucket_versioning_status:
1✔
4366
        return None
1✔
4367
    elif bucket_versioning_status.lower() == "enabled":
1✔
4368
        return generate_safe_version_id()
1✔
4369
    else:
4370
        return "null"
1✔
4371

4372

4373
def add_encryption_to_response(response: dict, s3_object: S3Object):
1✔
4374
    if encryption := s3_object.encryption:
1✔
4375
        response["ServerSideEncryption"] = encryption
1✔
4376
        if encryption == ServerSideEncryption.aws_kms:
1✔
4377
            response["SSEKMSKeyId"] = s3_object.kms_key_id
1✔
4378
            if s3_object.bucket_key_enabled:
1✔
4379
                response["BucketKeyEnabled"] = s3_object.bucket_key_enabled
1✔
4380

4381

4382
def get_encryption_parameters_from_request_and_bucket(
1✔
4383
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
4384
    s3_bucket: S3Bucket,
4385
    store: S3Store,
4386
) -> EncryptionParameters:
4387
    if request.get("SSECustomerKey"):
1✔
4388
        # we return early, because ServerSideEncryption does not apply if the request has SSE-C
4389
        return EncryptionParameters(None, None, False)
1✔
4390

4391
    encryption = request.get("ServerSideEncryption")
1✔
4392
    kms_key_id = request.get("SSEKMSKeyId")
1✔
4393
    bucket_key_enabled = request.get("BucketKeyEnabled")
1✔
4394
    if s3_bucket.encryption_rule:
1✔
4395
        bucket_key_enabled = bucket_key_enabled or s3_bucket.encryption_rule.get("BucketKeyEnabled")
1✔
4396
        encryption = (
1✔
4397
            encryption
4398
            or s3_bucket.encryption_rule["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]
4399
        )
4400
        if encryption == ServerSideEncryption.aws_kms:
1✔
4401
            key_id = kms_key_id or s3_bucket.encryption_rule[
1✔
4402
                "ApplyServerSideEncryptionByDefault"
4403
            ].get("KMSMasterKeyID")
4404
            kms_key_id = get_kms_key_arn(
1✔
4405
                key_id, s3_bucket.bucket_account_id, s3_bucket.bucket_region
4406
            )
4407
            if not kms_key_id:
1✔
4408
                # if not key is provided, AWS will use an AWS managed KMS key
4409
                # create it if it doesn't already exist, and save it in the store per region
4410
                if not store.aws_managed_kms_key_id:
1✔
4411
                    managed_kms_key_id = create_s3_kms_managed_key_for_region(
1✔
4412
                        s3_bucket.bucket_account_id, s3_bucket.bucket_region
4413
                    )
4414
                    store.aws_managed_kms_key_id = managed_kms_key_id
1✔
4415

4416
                kms_key_id = store.aws_managed_kms_key_id
1✔
4417

4418
    return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)
1✔
4419

4420

4421
def get_object_lock_parameters_from_bucket_and_request(
1✔
4422
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
4423
    s3_bucket: S3Bucket,
4424
):
4425
    # TODO: also validate here?
4426
    lock_mode = request.get("ObjectLockMode")
1✔
4427
    lock_legal_status = request.get("ObjectLockLegalHoldStatus")
1✔
4428
    lock_until = request.get("ObjectLockRetainUntilDate")
1✔
4429

4430
    if default_retention := s3_bucket.object_lock_default_retention:
1✔
4431
        lock_mode = lock_mode or default_retention.get("Mode")
1✔
4432
        if lock_mode and not lock_until:
1✔
4433
            lock_until = get_retention_from_now(
1✔
4434
                days=default_retention.get("Days"),
4435
                years=default_retention.get("Years"),
4436
            )
4437

4438
    return ObjectLockParameters(lock_until, lock_legal_status, lock_mode)
1✔
4439

4440

4441
def get_part_range(s3_object: S3Object, part_number: PartNumber) -> ObjectRange:
1✔
4442
    """
4443
    Calculate the range value from a part Number for an S3 Object
4444
    :param s3_object: S3Object
4445
    :param part_number: the wanted part from the S3Object
4446
    :return: an ObjectRange used to return only a slice of an Object
4447
    """
4448
    if not s3_object.parts:
1✔
4449
        if part_number > 1:
1✔
4450
            raise InvalidPartNumber(
1✔
4451
                "The requested partnumber is not satisfiable",
4452
                PartNumberRequested=part_number,
4453
                ActualPartCount=1,
4454
            )
4455
        return ObjectRange(
1✔
4456
            begin=0,
4457
            end=s3_object.size - 1,
4458
            content_length=s3_object.size,
4459
            content_range=f"bytes 0-{s3_object.size - 1}/{s3_object.size}",
4460
        )
4461
    elif not (part_data := s3_object.parts.get(part_number)):
1✔
4462
        raise InvalidPartNumber(
1✔
4463
            "The requested partnumber is not satisfiable",
4464
            PartNumberRequested=part_number,
4465
            ActualPartCount=len(s3_object.parts),
4466
        )
4467

4468
    begin, part_length = part_data
1✔
4469
    end = begin + part_length - 1
1✔
4470
    return ObjectRange(
1✔
4471
        begin=begin,
4472
        end=end,
4473
        content_length=part_length,
4474
        content_range=f"bytes {begin}-{end}/{s3_object.size}",
4475
    )
4476

4477

4478
def get_acl_headers_from_request(
1✔
4479
    request: Union[
4480
        PutObjectRequest,
4481
        CreateMultipartUploadRequest,
4482
        CopyObjectRequest,
4483
        CreateBucketRequest,
4484
        PutBucketAclRequest,
4485
        PutObjectAclRequest,
4486
    ],
4487
) -> list[tuple[str, str]]:
4488
    permission_keys = [
1✔
4489
        "GrantFullControl",
4490
        "GrantRead",
4491
        "GrantReadACP",
4492
        "GrantWrite",
4493
        "GrantWriteACP",
4494
    ]
4495
    acl_headers = [
1✔
4496
        (permission, grant_header)
4497
        for permission in permission_keys
4498
        if (grant_header := request.get(permission))
4499
    ]
4500
    return acl_headers
1✔
4501

4502

4503
def get_access_control_policy_from_acl_request(
1✔
4504
    request: Union[PutBucketAclRequest, PutObjectAclRequest],
4505
    owner: Owner,
4506
    request_body: bytes,
4507
) -> AccessControlPolicy:
4508
    canned_acl = request.get("ACL")
1✔
4509
    acl_headers = get_acl_headers_from_request(request)
1✔
4510

4511
    # FIXME: this is very dirty, but the parser does not differentiate between an empty body and an empty XML node
4512
    # errors are different depending on that data, so we need to access the context. Modifying the parser for this
4513
    # use case seems dangerous
4514
    is_acp_in_body = request_body
1✔
4515

4516
    if not (canned_acl or acl_headers or is_acp_in_body):
1✔
4517
        raise MissingSecurityHeader(
1✔
4518
            "Your request was missing a required header", MissingHeaderName="x-amz-acl"
4519
        )
4520

4521
    elif canned_acl and acl_headers:
1✔
4522
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")
1✔
4523

4524
    elif (canned_acl or acl_headers) and is_acp_in_body:
1✔
4525
        raise UnexpectedContent("This request does not support content")
1✔
4526

4527
    if canned_acl:
1✔
4528
        validate_canned_acl(canned_acl)
1✔
4529
        acp = get_canned_acl(canned_acl, owner=owner)
1✔
4530

4531
    elif acl_headers:
1✔
4532
        grants = []
1✔
4533
        for permission, grantees_values in acl_headers:
1✔
4534
            permission = get_permission_from_header(permission)
1✔
4535
            partial_grants = parse_grants_in_headers(permission, grantees_values)
1✔
4536
            grants.extend(partial_grants)
1✔
4537

4538
        acp = AccessControlPolicy(Owner=owner, Grants=grants)
1✔
4539
    else:
4540
        acp = request.get("AccessControlPolicy")
1✔
4541
        validate_acl_acp(acp)
1✔
4542
        if (
1✔
4543
            owner.get("DisplayName")
4544
            and acp["Grants"]
4545
            and "DisplayName" not in acp["Grants"][0]["Grantee"]
4546
        ):
4547
            acp["Grants"][0]["Grantee"]["DisplayName"] = owner["DisplayName"]
1✔
4548

4549
    return acp
1✔
4550

4551

4552
def get_access_control_policy_for_new_resource_request(
1✔
4553
    request: Union[
4554
        PutObjectRequest, CreateMultipartUploadRequest, CopyObjectRequest, CreateBucketRequest
4555
    ],
4556
    owner: Owner,
4557
) -> AccessControlPolicy:
4558
    # TODO: this is basic ACL, not taking into account Bucket settings. Revisit once we really implement ACLs.
4559
    canned_acl = request.get("ACL")
1✔
4560
    acl_headers = get_acl_headers_from_request(request)
1✔
4561

4562
    if not (canned_acl or acl_headers):
1✔
4563
        return get_canned_acl(BucketCannedACL.private, owner=owner)
1✔
4564

4565
    elif canned_acl and acl_headers:
1✔
4566
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")
×
4567

4568
    if canned_acl:
1✔
4569
        validate_canned_acl(canned_acl)
1✔
4570
        return get_canned_acl(canned_acl, owner=owner)
1✔
4571

4572
    grants = []
×
4573
    for permission, grantees_values in acl_headers:
×
4574
        permission = get_permission_from_header(permission)
×
4575
        partial_grants = parse_grants_in_headers(permission, grantees_values)
×
4576
        grants.extend(partial_grants)
×
4577

4578
    return AccessControlPolicy(Owner=owner, Grants=grants)
×
4579

4580

4581
def object_exists_for_precondition_write(s3_bucket: S3Bucket, key: ObjectKey) -> bool:
1✔
4582
    return (existing := s3_bucket.objects.get(key)) and not isinstance(existing, S3DeleteMarker)
1✔
4583

4584

4585
def verify_object_equality_precondition_write(
1✔
4586
    s3_bucket: S3Bucket,
4587
    key: ObjectKey,
4588
    etag: str,
4589
    initiated: datetime.datetime | None = None,
4590
) -> None:
4591
    existing = s3_bucket.objects.get(key)
1✔
4592
    if not existing or isinstance(existing, S3DeleteMarker):
1✔
4593
        raise NoSuchKey("The specified key does not exist.", Key=key)
1✔
4594

4595
    if not existing.etag == etag.strip('"'):
1✔
4596
        raise PreconditionFailed(
1✔
4597
            "At least one of the pre-conditions you specified did not hold",
4598
            Condition="If-Match",
4599
        )
4600

4601
    if initiated and initiated < existing.last_modified:
1✔
4602
        raise ConditionalRequestConflict(
1✔
4603
            "The conditional request cannot succeed due to a conflicting operation against this resource.",
4604
            Condition="If-Match",
4605
            Key=key,
4606
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc