• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19558051963

20 Nov 2025 05:48PM UTC coverage: 86.859% (-0.05%) from 86.907%
19558051963

push

github

web-flow
Sns:v2 publish (#13399)

199 of 279 new or added lines in 5 files covered. (71.33%)

168 existing lines in 9 files now uncovered.

68851 of 79268 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.75
/localstack-core/localstack/services/s3/provider.py
1
import base64
1✔
2
import contextlib
1✔
3
import copy
1✔
4
import datetime
1✔
5
import json
1✔
6
import logging
1✔
7
import re
1✔
8
from collections import defaultdict
1✔
9
from inspect import signature
1✔
10
from io import BytesIO
1✔
11
from operator import itemgetter
1✔
12
from threading import RLock
1✔
13
from typing import IO
1✔
14
from urllib import parse as urlparse
1✔
15
from zoneinfo import ZoneInfo
1✔
16

17
from localstack import config
1✔
18
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
19
from localstack.aws.api.s3 import (
1✔
20
    MFA,
21
    AbortMultipartUploadOutput,
22
    AccelerateConfiguration,
23
    AccessControlPolicy,
24
    AccessDenied,
25
    AccountId,
26
    AnalyticsConfiguration,
27
    AnalyticsId,
28
    AuthorizationHeaderMalformed,
29
    BadDigest,
30
    Body,
31
    Bucket,
32
    BucketAlreadyExists,
33
    BucketAlreadyOwnedByYou,
34
    BucketCannedACL,
35
    BucketLifecycleConfiguration,
36
    BucketLoggingStatus,
37
    BucketName,
38
    BucketNotEmpty,
39
    BucketRegion,
40
    BucketVersioningStatus,
41
    BypassGovernanceRetention,
42
    ChecksumAlgorithm,
43
    ChecksumCRC32,
44
    ChecksumCRC32C,
45
    ChecksumCRC64NVME,
46
    ChecksumSHA1,
47
    ChecksumSHA256,
48
    ChecksumType,
49
    CommonPrefix,
50
    CompletedMultipartUpload,
51
    CompleteMultipartUploadOutput,
52
    ConditionalRequestConflict,
53
    ConfirmRemoveSelfBucketAccess,
54
    ContentMD5,
55
    CopyObjectOutput,
56
    CopyObjectRequest,
57
    CopyObjectResult,
58
    CopyPartResult,
59
    CORSConfiguration,
60
    CreateBucketOutput,
61
    CreateBucketRequest,
62
    CreateMultipartUploadOutput,
63
    CreateMultipartUploadRequest,
64
    CrossLocationLoggingProhibitted,
65
    Delete,
66
    DeletedObject,
67
    DeleteMarkerEntry,
68
    DeleteObjectOutput,
69
    DeleteObjectsOutput,
70
    DeleteObjectTaggingOutput,
71
    Delimiter,
72
    EncodingType,
73
    Error,
74
    Expiration,
75
    FetchOwner,
76
    GetBucketAccelerateConfigurationOutput,
77
    GetBucketAclOutput,
78
    GetBucketAnalyticsConfigurationOutput,
79
    GetBucketCorsOutput,
80
    GetBucketEncryptionOutput,
81
    GetBucketIntelligentTieringConfigurationOutput,
82
    GetBucketInventoryConfigurationOutput,
83
    GetBucketLifecycleConfigurationOutput,
84
    GetBucketLocationOutput,
85
    GetBucketLoggingOutput,
86
    GetBucketMetricsConfigurationOutput,
87
    GetBucketOwnershipControlsOutput,
88
    GetBucketPolicyOutput,
89
    GetBucketPolicyStatusOutput,
90
    GetBucketReplicationOutput,
91
    GetBucketRequestPaymentOutput,
92
    GetBucketTaggingOutput,
93
    GetBucketVersioningOutput,
94
    GetBucketWebsiteOutput,
95
    GetObjectAclOutput,
96
    GetObjectAttributesOutput,
97
    GetObjectAttributesParts,
98
    GetObjectAttributesRequest,
99
    GetObjectLegalHoldOutput,
100
    GetObjectLockConfigurationOutput,
101
    GetObjectOutput,
102
    GetObjectRequest,
103
    GetObjectRetentionOutput,
104
    GetObjectTaggingOutput,
105
    GetObjectTorrentOutput,
106
    GetPublicAccessBlockOutput,
107
    HeadBucketOutput,
108
    HeadObjectOutput,
109
    HeadObjectRequest,
110
    IfMatch,
111
    IfMatchInitiatedTime,
112
    IfMatchLastModifiedTime,
113
    IfMatchSize,
114
    IfNoneMatch,
115
    IntelligentTieringConfiguration,
116
    IntelligentTieringId,
117
    InvalidArgument,
118
    InvalidBucketName,
119
    InvalidDigest,
120
    InvalidLocationConstraint,
121
    InvalidObjectState,
122
    InvalidPartNumber,
123
    InvalidPartOrder,
124
    InvalidStorageClass,
125
    InvalidTargetBucketForLogging,
126
    InventoryConfiguration,
127
    InventoryId,
128
    KeyMarker,
129
    LifecycleRules,
130
    ListBucketAnalyticsConfigurationsOutput,
131
    ListBucketIntelligentTieringConfigurationsOutput,
132
    ListBucketInventoryConfigurationsOutput,
133
    ListBucketMetricsConfigurationsOutput,
134
    ListBucketsOutput,
135
    ListMultipartUploadsOutput,
136
    ListObjectsOutput,
137
    ListObjectsV2Output,
138
    ListObjectVersionsOutput,
139
    ListPartsOutput,
140
    Marker,
141
    MaxBuckets,
142
    MaxKeys,
143
    MaxParts,
144
    MaxUploads,
145
    MethodNotAllowed,
146
    MetricsConfiguration,
147
    MetricsId,
148
    MissingSecurityHeader,
149
    MpuObjectSize,
150
    MultipartUpload,
151
    MultipartUploadId,
152
    NoSuchBucket,
153
    NoSuchBucketPolicy,
154
    NoSuchCORSConfiguration,
155
    NoSuchKey,
156
    NoSuchLifecycleConfiguration,
157
    NoSuchPublicAccessBlockConfiguration,
158
    NoSuchTagSet,
159
    NoSuchUpload,
160
    NoSuchWebsiteConfiguration,
161
    NotificationConfiguration,
162
    Object,
163
    ObjectIdentifier,
164
    ObjectKey,
165
    ObjectLockConfiguration,
166
    ObjectLockConfigurationNotFoundError,
167
    ObjectLockEnabled,
168
    ObjectLockLegalHold,
169
    ObjectLockMode,
170
    ObjectLockRetention,
171
    ObjectLockToken,
172
    ObjectOwnership,
173
    ObjectPart,
174
    ObjectVersion,
175
    ObjectVersionId,
176
    ObjectVersionStorageClass,
177
    OptionalObjectAttributesList,
178
    Owner,
179
    OwnershipControls,
180
    OwnershipControlsNotFoundError,
181
    Part,
182
    PartNumber,
183
    PartNumberMarker,
184
    Policy,
185
    PostResponse,
186
    PreconditionFailed,
187
    Prefix,
188
    PublicAccessBlockConfiguration,
189
    PutBucketAclRequest,
190
    PutBucketLifecycleConfigurationOutput,
191
    PutObjectAclOutput,
192
    PutObjectAclRequest,
193
    PutObjectLegalHoldOutput,
194
    PutObjectLockConfigurationOutput,
195
    PutObjectOutput,
196
    PutObjectRequest,
197
    PutObjectRetentionOutput,
198
    PutObjectTaggingOutput,
199
    ReplicationConfiguration,
200
    ReplicationConfigurationNotFoundError,
201
    RequestPayer,
202
    RequestPaymentConfiguration,
203
    RestoreObjectOutput,
204
    RestoreRequest,
205
    S3Api,
206
    ServerSideEncryption,
207
    ServerSideEncryptionConfiguration,
208
    SkipValidation,
209
    SSECustomerAlgorithm,
210
    SSECustomerKey,
211
    SSECustomerKeyMD5,
212
    StartAfter,
213
    StorageClass,
214
    Tagging,
215
    Token,
216
    TransitionDefaultMinimumObjectSize,
217
    UploadIdMarker,
218
    UploadPartCopyOutput,
219
    UploadPartCopyRequest,
220
    UploadPartOutput,
221
    UploadPartRequest,
222
    VersionIdMarker,
223
    VersioningConfiguration,
224
    WebsiteConfiguration,
225
)
226
from localstack.aws.api.s3 import NotImplemented as NotImplementedException
1✔
227
from localstack.aws.handlers import (
1✔
228
    modify_service_response,
229
    preprocess_request,
230
    serve_custom_service_request_handlers,
231
)
232
from localstack.constants import AWS_REGION_US_EAST_1
1✔
233
from localstack.services.edge import ROUTER
1✔
234
from localstack.services.plugins import ServiceLifecycleHook
1✔
235
from localstack.services.s3.codec import AwsChunkedDecoder
1✔
236
from localstack.services.s3.constants import (
1✔
237
    ALLOWED_HEADER_OVERRIDES,
238
    ARCHIVES_STORAGE_CLASSES,
239
    CHECKSUM_ALGORITHMS,
240
    DEFAULT_BUCKET_ENCRYPTION,
241
    S3_HOST_ID,
242
)
243
from localstack.services.s3.cors import S3CorsHandler, s3_cors_request_handler
1✔
244
from localstack.services.s3.exceptions import (
1✔
245
    InvalidBucketOwnerAWSAccountID,
246
    InvalidBucketState,
247
    InvalidRequest,
248
    MalformedPolicy,
249
    MalformedXML,
250
    NoSuchConfiguration,
251
    NoSuchObjectLockConfiguration,
252
    TooManyConfigurations,
253
    UnexpectedContent,
254
)
255
from localstack.services.s3.models import (
1✔
256
    BucketCorsIndex,
257
    EncryptionParameters,
258
    ObjectLockParameters,
259
    S3Bucket,
260
    S3DeleteMarker,
261
    S3Multipart,
262
    S3Object,
263
    S3Part,
264
    S3Store,
265
    VersionedKeyStore,
266
    s3_stores,
267
)
268
from localstack.services.s3.notifications import NotificationDispatcher, S3EventNotificationContext
1✔
269
from localstack.services.s3.presigned_url import validate_post_policy
1✔
270
from localstack.services.s3.storage.core import LimitedIterableStream, S3ObjectStore
1✔
271
from localstack.services.s3.storage.ephemeral import EphemeralS3ObjectStore
1✔
272
from localstack.services.s3.utils import (
1✔
273
    ObjectRange,
274
    add_expiration_days_to_datetime,
275
    base_64_content_md5_to_etag,
276
    create_redirect_for_post_request,
277
    create_s3_kms_managed_key_for_region,
278
    etag_to_base_64_content_md5,
279
    extract_bucket_key_version_id_from_copy_source,
280
    generate_safe_version_id,
281
    get_canned_acl,
282
    get_class_attrs_from_spec_class,
283
    get_failed_precondition_copy_source,
284
    get_failed_upload_part_copy_source_preconditions,
285
    get_full_default_bucket_location,
286
    get_kms_key_arn,
287
    get_lifecycle_rule_from_object,
288
    get_owner_for_account_id,
289
    get_permission_from_header,
290
    get_retention_from_now,
291
    get_s3_checksum_algorithm_from_request,
292
    get_s3_checksum_algorithm_from_trailing_headers,
293
    get_system_metadata_from_request,
294
    get_unique_key_id,
295
    is_bucket_name_valid,
296
    is_version_older_than_other,
297
    parse_copy_source_range_header,
298
    parse_post_object_tagging_xml,
299
    parse_range_header,
300
    parse_tagging_header,
301
    s3_response_handler,
302
    serialize_expiration_header,
303
    str_to_rfc_1123_datetime,
304
    validate_dict_fields,
305
    validate_failed_precondition,
306
    validate_kms_key_id,
307
    validate_tag_set,
308
)
309
from localstack.services.s3.validation import (
1✔
310
    parse_grants_in_headers,
311
    validate_acl_acp,
312
    validate_bucket_analytics_configuration,
313
    validate_bucket_intelligent_tiering_configuration,
314
    validate_canned_acl,
315
    validate_checksum_value,
316
    validate_cors_configuration,
317
    validate_inventory_configuration,
318
    validate_lifecycle_configuration,
319
    validate_object_key,
320
    validate_sse_c,
321
    validate_website_configuration,
322
)
323
from localstack.services.s3.website_hosting import register_website_hosting_routes
1✔
324
from localstack.state import AssetDirectory, StateVisitor
1✔
325
from localstack.utils.aws.arns import s3_bucket_name
1✔
326
from localstack.utils.aws.aws_stack import get_valid_regions_for_service
1✔
327
from localstack.utils.collections import select_from_typed_dict
1✔
328
from localstack.utils.strings import short_uid, to_bytes, to_str
1✔
329

330
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Values derived from the S3 API spec classes via `get_class_attrs_from_spec_class`.
# They are used below for membership checks on request parameters (e.g. `StorageClass`, `ObjectOwnership`).
# NOTE(review): presumably these are collections of the allowed string values - confirm against the helper.
STORAGE_CLASSES = get_class_attrs_from_spec_class(StorageClass)
SSE_ALGORITHMS = get_class_attrs_from_spec_class(ServerSideEncryption)
OBJECT_OWNERSHIPS = get_class_attrs_from_spec_class(ObjectOwnership)
OBJECT_LOCK_MODES = get_class_attrs_from_spec_class(ObjectLockMode)

# Directory handed to `EphemeralS3ObjectStore` when the provider is created without a storage backend.
DEFAULT_S3_TMP_DIR = "/tmp/localstack-s3-storage"
338

339

340
class S3Provider(S3Api, ServiceLifecycleHook):
1✔
341
    def __init__(self, storage_backend: S3ObjectStore = None) -> None:
        """
        Set up the collaborators and runtime caches used by the provider.

        :param storage_backend: the object store to use; if none is given (or it is falsy), an
            ``EphemeralS3ObjectStore`` rooted at ``DEFAULT_S3_TMP_DIR`` is created instead
        """
        super().__init__()

        if not storage_backend:
            storage_backend = EphemeralS3ObjectStore(DEFAULT_S3_TMP_DIR)
        self._storage_backend = storage_backend

        self._notification_dispatcher = NotificationDispatcher()

        cors_index = BucketCorsIndex()
        self._cors_handler = S3CorsHandler(cors_index)

        # TODO: add lock for keys for PutObject, only way to support precondition writes for versioned buckets
        # two-level mapping (bucket name -> key -> RLock); entries are created lazily on first access
        self._preconditions_locks = defaultdict(lambda: defaultdict(RLock))

        # runtime cache of Lifecycle Expiration headers, as they need to be calculated everytime we fetch an object
        # in case the rules have changed
        self._expiration_cache: dict[BucketName, dict[ObjectKey, Expiration]] = defaultdict(dict)
352

353
    def on_after_init(self):
        """
        Lifecycle hook: wire the S3-specific handlers into the request/response handler chains and register
        the website hosting routes on the router.
        """
        # CORS handling for S3 is done by the provider's own handler instance
        preprocess_request.append(self._cors_handler)
        serve_custom_service_request_handlers.append(s3_cors_request_handler)
        # the response handler is registered for this service only
        modify_service_response.append(self.service, s3_response_handler)
        register_website_hosting_routes(router=ROUTER)
358

359
    def accept_state_visitor(self, visitor: StateVisitor):
        """
        Present the provider's state to the given visitor: the ``s3_stores`` and the root directory of the
        storage backend, wrapped in an ``AssetDirectory``.

        :param visitor: the StateVisitor to call
        """
        visitor.visit(s3_stores)
        visitor.visit(AssetDirectory(self.service, self._storage_backend.root_directory))
362

363
    def on_before_state_save(self):
        """Lifecycle hook: flush the storage backend before the state is saved."""
        self._storage_backend.flush()
365

366
    def on_after_state_reset(self):
        """Lifecycle hook: invalidate the CORS handler cache once the state has been reset."""
        self._cors_handler.invalidate_cache()
368

369
    def on_after_state_load(self):
        """Lifecycle hook: invalidate the CORS handler cache once a state has been loaded."""
        self._cors_handler.invalidate_cache()
371

372
    def on_before_stop(self):
        """Lifecycle hook: shut down the notification dispatcher, then close the storage backend."""
        self._notification_dispatcher.shutdown()
        self._storage_backend.close()
375

376
    def _notify(
        self,
        context: RequestContext,
        s3_bucket: S3Bucket,
        s3_object: S3Object | S3DeleteMarker = None,
        s3_notif_ctx: S3EventNotificationContext = None,
    ):
        """
        Dispatch event notifications for a bucket, if it has a notification configuration.

        :param context: the RequestContext, used to build the notification context when none is supplied
        :param s3_bucket: the S3Bucket the event happened on
        :param s3_object: the S3Object (or delete marker) concerned, used only when `s3_notif_ctx` is not given
        :param s3_notif_ctx: a pre-built S3EventNotificationContext, for callers that need to provide data only
            available in their API call
        :return: None
        """
        # nothing to send for buckets without a notification configuration
        if not s3_bucket.notification_configuration:
            return

        notification_context = s3_notif_ctx
        if not notification_context:
            notification_context = S3EventNotificationContext.from_request_context(
                context,
                s3_bucket=s3_bucket,
                s3_object=s3_object,
            )

        self._notification_dispatcher.send_notifications(
            notification_context, s3_bucket.notification_configuration
        )
401

402
    def _verify_notification_configuration(
        self,
        notification_configuration: NotificationConfiguration,
        skip_destination_validation: SkipValidation,
        context: RequestContext,
        bucket_name: str,
    ):
        """
        Delegate the verification of a notification configuration to the notification dispatcher.

        :param notification_configuration: the NotificationConfiguration to verify
        :param skip_destination_validation: forwarded as-is to the dispatcher
        :param context: the RequestContext of the incoming request
        :param bucket_name: the name of the bucket the configuration is for
        """
        self._notification_dispatcher.verify_configuration(
            notification_configuration, skip_destination_validation, context, bucket_name
        )
412

413
    def _get_expiration_header(
        self,
        lifecycle_rules: LifecycleRules,
        bucket: BucketName,
        s3_object: S3Object,
        object_tags: dict[str, str],
    ) -> Expiration:
        """
        Return the serialized Expiration header for an object if one of the bucket Lifecycle rules matches it.
        The result is stored in a per-bucket, per-key runtime cache, and a cached value is returned directly
        when present. `lru_cache` cannot be used here because the parameters are not hashable.

        :param lifecycle_rules: the bucket LifecycleRules
        :param bucket: the bucket name, used as the first-level cache key
        :param s3_object: the S3Object to evaluate
        :param object_tags: the object tags
        :return: the Expiration header if there's a rule matching, None otherwise
        """
        object_key = s3_object.key

        # read through `.get` so that a lookup miss does not create an entry in the defaultdict
        bucket_cache = self._expiration_cache.get(bucket, {})
        cached_header = bucket_cache.get(object_key)
        if cached_header:
            return cached_header

        matching_rule = get_lifecycle_rule_from_object(
            lifecycle_rules, object_key, s3_object.size, object_tags
        )
        if not matching_rule:
            return None

        header = serialize_expiration_header(
            matching_rule["ID"],
            matching_rule["Expiration"],
            s3_object.last_modified,
        )
        self._expiration_cache[bucket][object_key] = header
        return header
442

443
    def _get_cross_account_bucket(
        self,
        context: RequestContext,
        bucket_name: BucketName,
        *,
        expected_bucket_owner: AccountId = None,
    ) -> tuple[S3Store, S3Bucket]:
        """
        Resolve a bucket by name, looking first in the store of the requesting account and then, through the
        global bucket map, in the store of the account owning the bucket.

        :param context: the RequestContext, providing the requesting account id and region
        :param bucket_name: the name of the bucket to resolve
        :param expected_bucket_owner: optional account id the bucket is expected to belong to
        :raises InvalidBucketOwnerAWSAccountID: if `expected_bucket_owner` is set but is not 12 word characters
        :raises NoSuchBucket: if the bucket cannot be found
        :raises AccessDenied: if `expected_bucket_owner` is set and differs from the bucket's account id
        :return: the store holding the bucket, and the bucket itself
        """
        if expected_bucket_owner:
            if not re.fullmatch(r"\w{12}", expected_bucket_owner):
                raise InvalidBucketOwnerAWSAccountID(
                    f"The value of the expected bucket owner parameter must be an AWS Account ID... [{expected_bucket_owner}]",
                )

        store = self.get_store(context.account_id, context.region)
        s3_bucket = store.buckets.get(bucket_name)

        if not s3_bucket:
            # not in the requester's store: find out which account owns the bucket
            owner_account_id = store.global_bucket_map.get(bucket_name)
            if not owner_account_id:
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)

            store = self.get_store(owner_account_id, context.region)
            s3_bucket = store.buckets.get(bucket_name)
            if not s3_bucket:
                raise NoSuchBucket("The specified bucket does not exist", BucketName=bucket_name)

        if expected_bucket_owner and expected_bucket_owner != s3_bucket.bucket_account_id:
            raise AccessDenied("Access Denied")

        return store, s3_bucket
468

469
    @staticmethod
    def get_store(account_id: str, region_name: str) -> S3Store:
        """
        Return the S3Store for the given account and region, looked up in the module-level ``s3_stores``.

        :param account_id: the account id to look the store up for
        :param region_name: the region name to look the store up for
        :return: the matching S3Store
        """
        # Use default account id for external access? would need an anonymous one
        return s3_stores[account_id][region_name]
473

474
    @handler("CreateBucket", expand=False)
    def create_bucket(
        self,
        context: RequestContext,
        request: CreateBucketRequest,
    ) -> CreateBucketOutput:
        """
        Create a bucket after validating the request region, the bucket name, the location constraint and the
        object ownership, then register it in the store, the global bucket map and the storage backend.

        :param context: the RequestContext, providing the account id, the region and the raw request body
        :param request: the CreateBucketRequest
        :raises AuthorizationHeaderMalformed: if the request region is 'aws-global'
        :raises InvalidBucketName: if the bucket name is not valid
        :raises MalformedXML: if a CreateBucketConfiguration is sent without a LocationConstraint
        :raises InvalidLocationConstraint: if 'us-east-1' or 'aws-global' is given as constraint in us-east-1
        :raises CommonServiceException: (IllegalLocationConstraintException) if the constraint, or its absence,
            is incompatible with the region the request was sent to
        :raises BucketAlreadyExists: if the name is already taken by another account
        :raises BucketAlreadyOwnedByYou: if the caller already owns the bucket and the region is not us-east-1
        :raises InvalidArgument: if the ObjectOwnership value is not a known one
        :return: CreateBucketOutput containing the bucket Location
        """
        if context.region == "aws-global":
            # TODO: extend this logic to probably all the provider, and maybe all services. S3 is the most impacted
            #  right now so this will help users to properly set a region in their config
            # See the `TestS3.test_create_bucket_aws_global` test
            raise AuthorizationHeaderMalformed(
                f"The authorization header is malformed; the region 'aws-global' is wrong; expecting '{AWS_REGION_US_EAST_1}'",
                HostId=S3_HOST_ID,
                Region=AWS_REGION_US_EAST_1,
            )

        bucket_name = request["Bucket"]
        if not is_bucket_name_valid(bucket_name):
            raise InvalidBucketName("The specified bucket is not valid.", BucketName=bucket_name)

        # the XML parser returns an empty dict if the body contains the following:
        # <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/" />
        # but it also returns an empty dict if the body is fully empty. We need to differentiate the 2 cases by checking
        # if the body is empty or not
        location_configuration = (
            request.get("CreateBucketConfiguration") if context.request.data else None
        )

        if location_configuration is None:
            # no configuration sent: the bucket can only be created in us-east-1
            bucket_region = AWS_REGION_US_EAST_1
            if context.region != bucket_region:
                raise CommonServiceException(
                    code="IllegalLocationConstraintException",
                    message="The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.",
                )
        else:
            bucket_region = location_configuration.get("LocationConstraint")
            if not bucket_region:
                raise MalformedXML()

            if context.region == AWS_REGION_US_EAST_1:
                if bucket_region in ("us-east-1", "aws-global"):
                    raise InvalidLocationConstraint(
                        "The specified location-constraint is not valid",
                        LocationConstraint=bucket_region,
                    )
            elif context.region != bucket_region:
                raise CommonServiceException(
                    code="IllegalLocationConstraintException",
                    message=f"The {bucket_region} location constraint is incompatible for the region specific endpoint this request was sent to.",
                )

        is_us_east_1 = bucket_region == "us-east-1"
        store = self.get_store(context.account_id, bucket_region)

        if bucket_name in store.global_bucket_map:
            if store.global_bucket_map[bucket_name] != context.account_id:
                raise BucketAlreadyExists()

            # if the existing bucket has the same owner, the behaviour will depend on the region
            if is_us_east_1:
                # CreateBucket is idempotent in us-east-1
                return CreateBucketOutput(Location=f"/{bucket_name}")

            raise BucketAlreadyOwnedByYou(
                "Your previous request to create the named bucket succeeded and you already own it.",
                BucketName=bucket_name,
            )

        object_ownership = request.get("ObjectOwnership")
        if object_ownership is not None and object_ownership not in OBJECT_OWNERSHIPS:
            raise InvalidArgument(
                f"Invalid x-amz-object-ownership header: {object_ownership}",
                ArgumentName="x-amz-object-ownership",
            )

        # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_Owner.html
        owner = get_owner_for_account_id(context.account_id)
        acl = get_access_control_policy_for_new_resource_request(request, owner=owner)
        s3_bucket = S3Bucket(
            name=bucket_name,
            account_id=context.account_id,
            bucket_region=bucket_region,
            owner=owner,
            acl=acl,
            object_ownership=object_ownership,
            object_lock_enabled_for_bucket=request.get("ObjectLockEnabledForBucket"),
        )

        store.buckets[bucket_name] = s3_bucket
        store.global_bucket_map[bucket_name] = s3_bucket.bucket_account_id
        self._cors_handler.invalidate_cache()
        self._storage_backend.create_bucket(bucket_name)

        # Location is always contained in response -> full url for LocationConstraint outside us-east-1
        if is_us_east_1:
            location = f"/{bucket_name}"
        else:
            location = get_full_default_bucket_location(bucket_name)

        return CreateBucketOutput(Location=location)
574

575
    def delete_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Delete an empty bucket, together with the provider-side state attached to it (CORS cache, expiration
        cache, precondition locks) and its data in the storage backend.

        :param context: the RequestContext of the incoming request
        :param bucket: the name of the bucket to delete
        :param expected_bucket_owner: optional account id of the expected bucket owner; it is validated by
            `_get_cross_account_bucket`, which rejects the request if it does not match the bucket's account
        :raises NoSuchBucket: if the bucket does not exist
        :raises AccessDenied: if `expected_bucket_owner` is set and does not match the bucket's account id
        :raises BucketNotEmpty: if the bucket still contains objects
        """
        # forward the expected owner so that the parameter is enforced instead of being silently ignored
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        # the bucket still contains objects
        if not s3_bucket.objects.is_empty():
            message = "The bucket you tried to delete is not empty"
            if s3_bucket.versioning_status:
                message += ". You must delete all versions in the bucket."
            raise BucketNotEmpty(
                message,
                BucketName=bucket,
            )

        store.buckets.pop(bucket)
        store.global_bucket_map.pop(bucket)
        self._cors_handler.invalidate_cache()
        # drop the runtime caches kept for this bucket; they may not have an entry for it
        self._expiration_cache.pop(bucket, None)
        self._preconditions_locks.pop(bucket, None)
        # clean up the storage backend
        self._storage_backend.delete_bucket(bucket)
601

602
    def list_buckets(
        self,
        context: RequestContext,
        max_buckets: MaxBuckets = None,
        continuation_token: Token = None,
        prefix: Prefix = None,
        bucket_region: BucketRegion = None,
        **kwargs,
    ) -> ListBucketsOutput:
        """
        List the buckets of the requesting account's store, sorted by name, with optional filtering by prefix
        and region and optional pagination.

        :param context: the RequestContext, providing the account id and region
        :param max_buckets: maximum number of buckets to return; when reached, a continuation token is returned
        :param continuation_token: url-safe base64 encoded bucket name to resume the listing from (inclusive)
        :param prefix: only return buckets whose name starts with this prefix
        :param bucket_region: only return buckets located in this region
        :raises InvalidArgument: if `bucket_region` is not a valid region for the service and non-standard
            regions are not allowed by the configuration
        :return: ListBucketsOutput with the owner, the buckets, the prefix and the next continuation token
        """
        if (
            bucket_region
            and not config.ALLOW_NONSTANDARD_REGIONS
            and bucket_region not in get_valid_regions_for_service(self.service)
        ):
            raise InvalidArgument(
                f"Argument value {bucket_region} is not a valid AWS Region",
                ArgumentName="bucket-region",
            )

        owner = get_owner_for_account_id(context.account_id)
        store = self.get_store(context.account_id, context.region)

        # the continuation token is the encoded name of the first bucket of the next page
        resume_from_name = None
        if continuation_token:
            resume_from_name = to_str(base64.urlsafe_b64decode(continuation_token.encode()))

        listed_buckets: list[Bucket] = []
        next_continuation_token = None

        # Comparing strings with case sensitivity since AWS is case-sensitive
        for s3_bucket in sorted(store.buckets.values(), key=lambda b: b.name):
            bucket_name = s3_bucket.name

            if resume_from_name is not None and bucket_name < resume_from_name:
                continue

            if prefix and not bucket_name.startswith(prefix):
                continue

            if bucket_region and s3_bucket.bucket_region != bucket_region:
                continue

            if max_buckets and len(listed_buckets) >= max_buckets:
                # page is full: this bucket becomes the starting point of the next page
                next_continuation_token = to_str(base64.urlsafe_b64encode(bucket_name.encode()))
                break

            listed_buckets.append(
                Bucket(
                    Name=bucket_name,
                    CreationDate=s3_bucket.creation_date,
                    BucketRegion=s3_bucket.bucket_region,
                )
            )

        return ListBucketsOutput(
            Owner=owner,
            Buckets=listed_buckets,
            Prefix=prefix,
            ContinuationToken=next_continuation_token,
        )
657

658
    def head_bucket(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> HeadBucketOutput:
        """
        Check that a bucket exists and return its region. The bucket is looked up in the requesting account's
        store first, then in the store of the account registered for it in the global bucket map.

        :param context: the RequestContext, providing the account id and region
        :param bucket: the name of the bucket
        :param expected_bucket_owner: accepted for API compatibility, not used by this implementation
        :raises AuthorizationHeaderMalformed: if the request region is 'aws-global'
        :raises NoSuchBucket: if the bucket cannot be found (raised without a message)
        :return: HeadBucketOutput containing the bucket region
        """
        if context.region == "aws-global":
            # TODO: extend this logic to probably all the provider, and maybe all services. S3 is the most impacted
            #  right now so this will help users to properly set a region in their config
            # See the `TestS3.test_create_bucket_aws_global` test
            raise AuthorizationHeaderMalformed(
                f"The authorization header is malformed; the region 'aws-global' is wrong; expecting '{AWS_REGION_US_EAST_1}'",
                HostId=S3_HOST_ID,
                Region=AWS_REGION_US_EAST_1,
            )

        store = self.get_store(context.account_id, context.region)
        s3_bucket = store.buckets.get(bucket)

        if not s3_bucket:
            owner_account_id = store.global_bucket_map.get(bucket)
            if not owner_account_id:
                # just to return the 404 error message
                raise NoSuchBucket()

            store = self.get_store(owner_account_id, context.region)
            s3_bucket = store.buckets.get(bucket)
            if not s3_bucket:
                # just to return the 404 error message
                raise NoSuchBucket()

        # TODO: this call is also used to check if the user has access/authorization for the bucket
        #  it can return 403
        return HeadBucketOutput(BucketRegion=s3_bucket.bucket_region)
689

690
    def get_bucket_location(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLocationOutput:
        """
        Return the location constraint of a bucket as a raw XML payload.

        When implementing the ASF provider, this operation is implemented because:
        - The spec defines a root element GetBucketLocationOutput containing a LocationConstraint member, where
          S3 actually just returns the LocationConstraint on the root level (only operation so far that we know of).
        - We circumvent the root level element here by patching the spec such that this operation returns a
          single "payload" (the XML body response), which causes the serializer to directly take the payload element.
        - The above "hack" causes the fix in the serializer to not be picked up here as we're passing the XML body as
          the payload, which is why we need to manually do this here by manipulating the string.
        Botocore implements this hack for parsing the response in `botocore.handlers.py#parse_get_bucket_location`

        :param context: the RequestContext of the incoming request
        :param bucket: the name of the bucket
        :param expected_bucket_owner: optional account id of the expected bucket owner; it is validated by
            `_get_cross_account_bucket`, which rejects the request if it does not match the bucket's account
        :raises NoSuchBucket: if the bucket does not exist
        :raises AccessDenied: if `expected_bucket_owner` is set and does not match the bucket's account id
        :return: GetBucketLocationOutput whose LocationConstraint is the full XML body
        """
        # forward the expected owner so that the parameter is enforced instead of being silently ignored
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        # buckets in us-east-1 are reported with an empty LocationConstraint
        location = s3_bucket.bucket_region if s3_bucket.bucket_region != "us-east-1" else ""
        location_constraint = (
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            f'<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">{location}</LocationConstraint>'
        )

        return GetBucketLocationOutput(LocationConstraint=location_constraint)
719

720
    @handler("PutObject", expand=False)
    def put_object(
        self,
        context: RequestContext,
        request: PutObjectRequest,
    ) -> PutObjectOutput:
        """
        Handle PutObject: validate the request, write the body into the storage backend and register the
        resulting object in the bucket.

        :param context: the request context; its HTTP headers are used to detect `aws-chunked` uploads
        :param request: the PutObject request parameters
        :return: a PutObjectOutput containing the ETag and, when applicable, the version id, checksum,
            expiration and encryption related fields
        :raises InvalidStorageClass: if the requested storage class is unknown or is OUTPOSTS
        :raises NotImplementedException: if both If-Match and If-None-Match are set, or if one of them carries
            a value that is not accepted by this operation
        :raises InvalidDigest: if the provided Content-MD5 cannot be converted to an ETag
        :raises PreconditionFailed: if If-None-Match is set and the object already exists
        :raises InvalidRequest: if the provided checksum value is not valid for the checksum algorithm
        :raises BadDigest: if the provided checksum or Content-MD5 does not match the stored data
        """
        # TODO: validate order of validation
        # TODO: still need to handle following parameters
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        key = request["Key"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        if (storage_class := request.get("StorageClass")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and (sse_kms_key_id := request.get("SSEKMSKeyId")):
            validate_kms_key_id(sse_kms_key_id, s3_bucket)

        validate_object_key(key)

        if_match = request.get("IfMatch")
        if (if_none_match := request.get("IfNoneMatch")) and if_match:
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header="If-Match,If-None-Match",
                additionalMessage="Multiple conditional request headers present in the request",
            )

        elif (if_none_match and if_none_match != "*") or (if_match and if_match == "*"):
            header_name = "If-None-Match" if if_none_match else "If-Match"
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header=header_name,
                additionalMessage=f"We don't accept the provided value of {header_name} header for this API",
            )

        system_metadata = get_system_metadata_from_request(request)
        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        version_id = generate_version_id(s3_bucket.versioning_status)
        if version_id != "null":
            # if we are in a versioned bucket, we need to lock around the full key (all the versions)
            # because object versions have locks per version
            precondition_lock = self._preconditions_locks[bucket_name][key]
        else:
            precondition_lock = contextlib.nullcontext()

        etag_content_md5 = ""
        if content_md5 := request.get("ContentMD5"):
            # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
            etag_content_md5 = base_64_content_md5_to_etag(content_md5)
            if not etag_content_md5:
                raise InvalidDigest(
                    "The Content-MD5 you specified was invalid.",
                    Content_MD5=content_md5,
                )

        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
        checksum_value = (
            request.get(f"Checksum{checksum_algorithm.upper()}") if checksum_algorithm else None
        )

        # TODO: we're not encrypting the object with the provided key for now
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
            server_side_encryption=request.get("ServerSideEncryption"),
        )

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            s3_bucket,
            store,
        )

        lock_parameters = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)

        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)

        if tagging := request.get("Tagging"):
            tagging = parse_tagging_header(tagging)

        s3_object = S3Object(
            key=key,
            version_id=version_id,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=request.get("Metadata"),
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            sse_key_hash=sse_c_key_md5,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=request.get("WebsiteRedirectLocation"),
            acl=acl,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depends on Bucket settings
        )

        body = request.get("Body")
        # check if chunked request
        headers = context.request.headers
        is_aws_chunked = headers.get("x-amz-content-sha256", "").startswith(
            "STREAMING-"
        ) or "aws-chunked" in headers.get("content-encoding", "")
        if is_aws_chunked:
            checksum_algorithm = (
                checksum_algorithm
                or get_s3_checksum_algorithm_from_trailing_headers(headers.get("x-amz-trailer", ""))
            )
            if checksum_algorithm:
                s3_object.checksum_algorithm = checksum_algorithm

            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
            body = AwsChunkedDecoder(body, decoded_content_length, s3_object=s3_object)

            # S3 removes the `aws-chunked` value from ContentEncoding
            if content_encoding := s3_object.system_metadata.pop("ContentEncoding", None):
                # the header is a comma separated list that may contain optional whitespace around the
                # separators (`gzip, aws-chunked`), so the tokens are compared once stripped
                remaining_encodings = ",".join(
                    enc for enc in content_encoding.split(",") if enc.strip() != "aws-chunked"
                ).strip()
                if remaining_encodings:
                    s3_object.system_metadata["ContentEncoding"] = remaining_encodings

        with (
            precondition_lock,
            self._storage_backend.open(bucket_name, s3_object, mode="w") as s3_stored_object,
        ):
            # as we are inside the lock here, if multiple concurrent requests happen for the same object, it's the first
            # one to finish to succeed, and subsequent will raise exceptions. Once the first write finishes, we're
            # opening the lock and other requests can check this condition
            if if_none_match and object_exists_for_precondition_write(s3_bucket, key):
                raise PreconditionFailed(
                    "At least one of the pre-conditions you specified did not hold",
                    Condition="If-None-Match",
                )

            elif if_match:
                verify_object_equality_precondition_write(s3_bucket, key, if_match)

            s3_stored_object.write(body)

            if s3_object.checksum_algorithm:
                if not s3_object.checksum_value:
                    # no checksum was provided by the client: keep the one calculated while writing
                    s3_object.checksum_value = s3_stored_object.checksum
                elif not validate_checksum_value(s3_object.checksum_value, checksum_algorithm):
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise InvalidRequest(
                        f"Value for x-amz-checksum-{s3_object.checksum_algorithm.lower()} header is invalid."
                    )
                elif s3_object.checksum_value != s3_stored_object.checksum:
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise BadDigest(
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
                    )

            # TODO: handle ContentMD5 and ChecksumAlgorithm in a handler for all requests except requests with a
            #  streaming body. We can use the specs to verify which operations needs to have the checksum validated
            if content_md5:
                calculated_md5 = etag_to_base_64_content_md5(s3_stored_object.etag)
                if calculated_md5 != content_md5:
                    self._storage_backend.remove(bucket_name, s3_object)
                    raise BadDigest(
                        "The Content-MD5 you specified did not match what we received.",
                        ExpectedDigest=etag_content_md5,
                        CalculatedDigest=calculated_md5,
                    )

            s3_bucket.objects.set(key, s3_object)

        # in case we are overriding an object, delete the tags entry
        key_id = get_unique_key_id(bucket_name, key, version_id)
        store.TAGS.tags.pop(key_id, None)
        if tagging:
            store.TAGS.tags[key_id] = tagging

        # RequestCharged: Optional[RequestCharged]  # TODO
        response = PutObjectOutput(
            ETag=s3_object.quoted_etag,
        )
        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            # upper-cased to build the response key the same way GetObject and HeadObject do
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
            response["ChecksumType"] = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)

        if s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                store.TAGS.tags.get(key_id, {}),
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)
        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response
1✔
936

937
    @handler("GetObject", expand=False)
    def get_object(
        self,
        context: RequestContext,
        request: GetObjectRequest,
    ) -> GetObjectOutput:
        """
        Handle GetObject: validate the request against the stored object and return its metadata together
        with a stream over the stored data (or over the requested range / part).

        :param context: the request context
        :param request: the GetObject request parameters
        :return: a GetObjectOutput whose `Body` is the opened stored object, or a length-limited stream over
            it when a range or a part number is requested
        :raises InvalidObjectState: if the object is in an archive storage class and has no restore value
        :raises InvalidRequest: if the object was stored with an SSE-C key hash and none is provided, or if
            both `Range` and `PartNumber` are specified
        :raises AccessDenied: if the provided SSE-C key hash does not match the stored one
        """
        # TODO: missing handling parameters:
        #  request_payer: RequestPayer = None,
        #  expected_bucket_owner: AccountId = None,

        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="GET",
        )

        if s3_object.storage_class in ARCHIVES_STORAGE_CLASSES and not s3_object.restore:
            raise InvalidObjectState(
                "The operation is not valid for the object's storage class",
                StorageClass=s3_object.storage_class,
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION and s3_object.kms_key_id:
            validate_kms_key_id(kms_key=s3_object.kms_key_id, bucket=s3_bucket)

        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        # we're using getattr access because when restoring, the field might not exist
        # TODO: cleanup at next major release
        if sse_key_hash := getattr(s3_object, "sse_key_hash", None):
            if not sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif sse_key_hash != sse_c_key_md5:
                raise AccessDenied(
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
                )

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

        range_header = request.get("Range")
        part_number = request.get("PartNumber")
        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)

        # we deliberately do not call `.close()` on the s3_stored_object on the success path to keep the read lock
        # acquired. When passing the object to Werkzeug, the handler will call `.close()` after finishing iterating
        # over `__iter__`.
        s3_stored_object = self._storage_backend.open(bucket_name, s3_object, mode="r")

        try:
            # this is a hacky way to verify the object hasn't been modified between `s3_object = s3_bucket.get_object`
            # and the storage backend call. If it has been modified, now that we're in the read lock, we can safely
            # fetch the object again
            if s3_stored_object.last_modified != s3_object.internal_last_modified:
                s3_object = s3_bucket.get_object(
                    key=object_key,
                    version_id=version_id,
                    http_method="GET",
                )

            response = GetObjectOutput(
                AcceptRanges="bytes",
                **s3_object.get_system_metadata_fields(),
            )
            if s3_object.user_metadata:
                response["Metadata"] = s3_object.user_metadata

            if s3_object.parts and part_number:
                response["PartsCount"] = len(s3_object.parts)

            if s3_object.version_id:
                response["VersionId"] = s3_object.version_id

            if s3_object.website_redirect_location:
                response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

            if s3_object.restore:
                response["Restore"] = s3_object.restore

            # the checksum is only returned when the client explicitly enables the checksum mode
            checksum_value = None
            checksum_type = None
            if checksum_algorithm := s3_object.checksum_algorithm:
                if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                    checksum_value = s3_object.checksum_value
                    checksum_type = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)

            if range_data:
                s3_stored_object.seek(range_data.begin)
                response["Body"] = LimitedIterableStream(
                    s3_stored_object, max_length=range_data.content_length
                )
                response["ContentRange"] = range_data.content_range
                response["ContentLength"] = range_data.content_length
                response["StatusCode"] = 206
                if checksum_value:
                    if s3_object.parts and part_number and checksum_type == ChecksumType.COMPOSITE:
                        part_data = s3_object.parts[part_number]
                        checksum_key = f"Checksum{checksum_algorithm.upper()}"
                        response[checksum_key] = part_data.get(checksum_key)
                        response["ChecksumType"] = ChecksumType.COMPOSITE

                    # it means either the range header means the whole object, or that a multipart upload with
                    # `FULL_OBJECT` only had one part
                    elif range_data.content_length == s3_object.size:
                        response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                        response["ChecksumType"] = checksum_type
            else:
                response["Body"] = s3_stored_object
                if checksum_value:
                    response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                    response["ChecksumType"] = checksum_type

            add_encryption_to_response(response, s3_object=s3_object)

            # tags are stored under the version id of the object itself (see `put_object`), which differs from
            # the requested `VersionId` when the request targets the current version without specifying it
            if object_tags := store.TAGS.tags.get(
                get_unique_key_id(bucket_name, object_key, s3_object.version_id)
            ):
                response["TagCount"] = len(object_tags)

            if s3_object.is_current and s3_bucket.lifecycle_rules:
                if expiration_header := self._get_expiration_header(
                    s3_bucket.lifecycle_rules,
                    bucket_name,
                    s3_object,
                    object_tags,
                ):
                    # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need
                    #  to apply them everytime we get/head an object
                    response["Expiration"] = expiration_header

            # TODO: missing returned fields
            #     RequestCharged: Optional[RequestCharged]
            #     ReplicationStatus: Optional[ReplicationStatus]

            if s3_object.lock_mode:
                response["ObjectLockMode"] = s3_object.lock_mode
                if s3_object.lock_until:
                    response["ObjectLockRetainUntilDate"] = s3_object.lock_until
            if s3_object.lock_legal_status:
                response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

            if sse_c_key_md5:
                response["SSECustomerAlgorithm"] = "AES256"
                response["SSECustomerKeyMD5"] = sse_c_key_md5

            for request_param, response_param in ALLOWED_HEADER_OVERRIDES.items():
                if request_param_value := request.get(request_param):
                    response[response_param] = request_param_value
        except Exception:
            # the response will never be handed over to Werkzeug, so nothing else would call `.close()` and the
            # read lock would stay acquired: release it here before propagating the error
            s3_stored_object.close()
            raise

        return response
1✔
1105

1106
    @handler("HeadObject", expand=False)
    def head_object(
        self,
        context: RequestContext,
        request: HeadObjectRequest,
    ) -> HeadObjectOutput:
        """
        Handle HeadObject: return the metadata of an object without opening its stored data.

        :param context: the request context
        :param request: the HeadObject request parameters
        :return: a HeadObjectOutput with the object system metadata and, when applicable, user metadata,
            checksum, range, versioning, expiration, object lock and encryption related fields
        :raises InvalidRequest: if the object was stored with an SSE-C key hash and none is provided, or if
            both `Range` and `PartNumber` are specified
        :raises AccessDenied: if the provided SSE-C key hash does not match the stored one
        """
        bucket_name = request["Bucket"]
        object_key = request["Key"]
        version_id = request.get("VersionId")
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        s3_object = s3_bucket.get_object(
            key=object_key,
            version_id=version_id,
            http_method="HEAD",
        )

        validate_failed_precondition(request, s3_object.last_modified, s3_object.etag)

        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        # same access pattern as in `get_object`: the field might not exist on restored objects, so it is
        # read with getattr instead of failing with an AttributeError
        if sse_key_hash := getattr(s3_object, "sse_key_hash", None):
            if not sse_c_key_md5:
                raise InvalidRequest(
                    "The object was stored using a form of Server Side Encryption. "
                    "The correct parameters must be provided to retrieve the object."
                )
            elif sse_key_hash != sse_c_key_md5:
                raise AccessDenied(
                    "Requests specifying Server Side Encryption with Customer provided keys must provide the correct secret key."
                )

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        range_header = request.get("Range")
        part_number = request.get("PartNumber")

        response = HeadObjectOutput(
            AcceptRanges="bytes",
            **s3_object.get_system_metadata_fields(),
        )
        if s3_object.user_metadata:
            response["Metadata"] = s3_object.user_metadata

        # the checksum is only returned when the client explicitly enables the checksum mode
        checksum_value = None
        checksum_type = None
        if checksum_algorithm := s3_object.checksum_algorithm:
            if (request.get("ChecksumMode") or "").upper() == "ENABLED":
                checksum_value = s3_object.checksum_value
                checksum_type = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)

        if s3_object.parts and part_number:
            response["PartsCount"] = len(s3_object.parts)

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.website_redirect_location:
            response["WebsiteRedirectLocation"] = s3_object.website_redirect_location

        if s3_object.restore:
            response["Restore"] = s3_object.restore

        if range_header and part_number:
            raise InvalidRequest("Cannot specify both Range header and partNumber query parameter")
        range_data = None
        if range_header:
            range_data = parse_range_header(range_header, s3_object.size)
        elif part_number:
            range_data = get_part_range(s3_object, part_number)

        if range_data:
            response["ContentLength"] = range_data.content_length
            response["ContentRange"] = range_data.content_range
            response["StatusCode"] = 206
            if checksum_value:
                if s3_object.parts and part_number and checksum_type == ChecksumType.COMPOSITE:
                    part_data = s3_object.parts[part_number]
                    checksum_key = f"Checksum{checksum_algorithm.upper()}"
                    response[checksum_key] = part_data.get(checksum_key)
                    response["ChecksumType"] = ChecksumType.COMPOSITE

                # it means either the range header means the whole object, or that a multipart upload with `FULL_OBJECT`
                # only had one part
                elif range_data.content_length == s3_object.size:
                    response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
                    response["ChecksumType"] = checksum_type
        elif checksum_value:
            response[f"Checksum{checksum_algorithm.upper()}"] = checksum_value
            response["ChecksumType"] = checksum_type

        add_encryption_to_response(response, s3_object=s3_object)

        # if you specify the VersionId, AWS won't return the Expiration header, even if that's the current version
        if not version_id and s3_bucket.lifecycle_rules:
            object_tags = store.TAGS.tags.get(
                get_unique_key_id(bucket_name, object_key, s3_object.version_id)
            )
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket_name,
                s3_object,
                object_tags,
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        if s3_object.lock_mode:
            response["ObjectLockMode"] = s3_object.lock_mode
            if s3_object.lock_until:
                response["ObjectLockRetainUntilDate"] = s3_object.lock_until
        if s3_object.lock_legal_status:
            response["ObjectLockLegalHoldStatus"] = s3_object.lock_legal_status

        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        # TODO: missing return fields:
        #  ArchiveStatus: Optional[ArchiveStatus]
        #  RequestCharged: Optional[RequestCharged]
        #  ReplicationStatus: Optional[ReplicationStatus]

        return response
1✔
1233

1234
    def delete_object(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        mfa: MFA = None,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        expected_bucket_owner: AccountId = None,
        if_match: IfMatch = None,
        if_match_last_modified_time: IfMatchLastModifiedTime = None,
        if_match_size: IfMatchSize = None,
        **kwargs,
    ) -> DeleteObjectOutput:
        """
        Delete an object, a specific object version, or create a delete marker, depending on the bucket
        versioning status and on whether a version id is provided.

        :param context: the request context
        :param bucket: name of the bucket containing the object
        :param key: key of the object to delete
        :param version_id: optional version of the object to delete
        :param bypass_governance_retention: only accepted if the bucket has Object Lock enabled
        :param if_match: not supported, raises NotImplementedException when set
        :param if_match_last_modified_time: not supported, raises NotImplementedException when set
        :param if_match_size: not supported, raises NotImplementedException when set
        :return: a DeleteObjectOutput, empty or containing `VersionId` and/or `DeleteMarker`
        :raises InvalidArgument: if the bypass governance value is set on a bucket without Object Lock, or if
            the specified version id is invalid
        :raises NotImplementedException: if any of the conditional parameters is set
        :raises AccessDenied: if the targeted object version is protected by an object lock
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        bypass_requested = bypass_governance_retention is not None
        if bypass_requested and not s3_bucket.object_lock_enabled:
            raise InvalidArgument(
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
                ArgumentName="x-amz-bypass-governance-retention",
            )

        # TODO: this is only supported for Directory Buckets
        # when several conditional parameters are set, the last one of this list is the one reported
        unsupported_header = None
        for header_name, header_value in (
            ("If-Match", if_match),
            ("x-amz-if-match-size", if_match_size),
            ("x-amz-if-match-last-modified-time", if_match_last_modified_time),
        ):
            if header_value:
                unsupported_header = header_name

        if unsupported_header:
            LOG.warning(
                "DeleteObject Preconditions is only supported for Directory Buckets. "
                "LocalStack does not support Directory Buckets yet."
            )
            raise NotImplementedException(
                "A header you provided implies functionality that is not implemented",
                Header=unsupported_header,
            )

        # bucket whose versioning status is unset: the object is removed right away
        if s3_bucket.versioning_status is None:
            if version_id and version_id != "null":
                raise InvalidArgument(
                    "Invalid version id specified",
                    ArgumentName="versionId",
                    ArgumentValue=version_id,
                )

            removed_object = s3_bucket.objects.pop(key, None)
            # TODO: RequestCharged
            if removed_object:
                self._storage_backend.remove(bucket, removed_object)
                self._notify(context, s3_bucket=s3_bucket, s3_object=removed_object)
                tags_key = get_unique_key_id(bucket, key, version_id)
                store.TAGS.tags.pop(tags_key, None)

            return DeleteObjectOutput()

        # versioning status is set but no version is targeted: a delete marker is set for the key
        if not version_id:
            marker = S3DeleteMarker(
                key=key,
                version_id=generate_version_id(s3_bucket.versioning_status),
            )
            s3_bucket.objects.set(key, marker)

            notification_ctx = S3EventNotificationContext.from_request_context(
                context,
                s3_bucket=s3_bucket,
                s3_object=marker,
            )
            notification_ctx.event_type = f"{notification_ctx.event_type}MarkerCreated"
            self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=notification_ctx)

            return DeleteObjectOutput(VersionId=marker.version_id, DeleteMarker=True)

        # a specific version is targeted
        if key not in s3_bucket.objects:
            return DeleteObjectOutput()

        s3_object = s3_bucket.objects.get(key, version_id)
        if not s3_object:
            raise InvalidArgument(
                "Invalid version id specified",
                ArgumentName="versionId",
                ArgumentValue=version_id,
            )

        if s3_object.is_locked(bypass_governance_retention):
            raise AccessDenied("Access Denied because object protected by object lock.")

        s3_bucket.objects.pop(object_key=key, version_id=version_id)
        response = DeleteObjectOutput(VersionId=s3_object.version_id)

        if not isinstance(s3_object, S3DeleteMarker):
            # only real object versions have stored data and tags to clean up
            self._storage_backend.remove(bucket, s3_object)
            store.TAGS.tags.pop(get_unique_key_id(bucket, key, version_id), None)
        else:
            response["DeleteMarker"] = True
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        if key not in s3_bucket.objects:
            # we clean up keys that do not have any object versions in them anymore
            self._preconditions_locks[bucket].pop(key, None)

        return response
1✔
1334

1335
    def delete_objects(
1✔
1336
        self,
1337
        context: RequestContext,
1338
        bucket: BucketName,
1339
        delete: Delete,
1340
        mfa: MFA = None,
1341
        request_payer: RequestPayer = None,
1342
        bypass_governance_retention: BypassGovernanceRetention = None,
1343
        expected_bucket_owner: AccountId = None,
1344
        checksum_algorithm: ChecksumAlgorithm = None,
1345
        **kwargs,
1346
    ) -> DeleteObjectsOutput:
1347
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
1348

1349
        if bypass_governance_retention is not None and not s3_bucket.object_lock_enabled:
1✔
1350
            raise InvalidArgument(
1✔
1351
                "x-amz-bypass-governance-retention is only applicable to Object Lock enabled buckets.",
1352
                ArgumentName="x-amz-bypass-governance-retention",
1353
            )
1354

1355
        objects: list[ObjectIdentifier] = delete.get("Objects")
1✔
1356
        if not objects:
1✔
UNCOV
1357
            raise MalformedXML()
×
1358

1359
        # TODO: max 1000 delete at once? test against AWS?
1360

1361
        quiet = delete.get("Quiet", False)
1✔
1362
        deleted = []
1✔
1363
        errors = []
1✔
1364

1365
        to_remove = []
1✔
1366
        versioned_keys = set()
1✔
1367
        for to_delete_object in objects:
1✔
1368
            object_key = to_delete_object.get("Key")
1✔
1369
            version_id = to_delete_object.get("VersionId")
1✔
1370
            if s3_bucket.versioning_status is None:
1✔
1371
                if version_id and version_id != "null":
1✔
1372
                    errors.append(
1✔
1373
                        Error(
1374
                            Code="NoSuchVersion",
1375
                            Key=object_key,
1376
                            Message="The specified version does not exist.",
1377
                            VersionId=version_id,
1378
                        )
1379
                    )
1380
                    continue
1✔
1381

1382
                found_object = s3_bucket.objects.pop(object_key, None)
1✔
1383
                if found_object:
1✔
1384
                    to_remove.append(found_object)
1✔
1385
                    self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
1✔
1386
                    store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)
1✔
1387
                # small hack to not create a fake object for nothing
1388
                elif s3_bucket.notification_configuration:
1✔
1389
                    # DeleteObjects is a bit weird, even if the object didn't exist, S3 will trigger a notification
1390
                    # for a non-existing object being deleted
1391
                    self._notify(
1✔
1392
                        context, s3_bucket=s3_bucket, s3_object=S3Object(key=object_key, etag="")
1393
                    )
1394

1395
                if not quiet:
1✔
1396
                    deleted.append(DeletedObject(Key=object_key))
1✔
1397

1398
                continue
1✔
1399

1400
            if not version_id:
1✔
1401
                delete_marker_id = generate_version_id(s3_bucket.versioning_status)
1✔
1402
                delete_marker = S3DeleteMarker(key=object_key, version_id=delete_marker_id)
1✔
1403
                s3_bucket.objects.set(object_key, delete_marker)
1✔
1404
                s3_notif_ctx = S3EventNotificationContext.from_request_context(
1✔
1405
                    context,
1406
                    s3_bucket=s3_bucket,
1407
                    s3_object=delete_marker,
1408
                )
1409
                s3_notif_ctx.event_type = f"{s3_notif_ctx.event_type}MarkerCreated"
1✔
1410
                self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx)
1✔
1411

1412
                if not quiet:
1✔
1413
                    deleted.append(
1✔
1414
                        DeletedObject(
1415
                            DeleteMarker=True,
1416
                            DeleteMarkerVersionId=delete_marker_id,
1417
                            Key=object_key,
1418
                        )
1419
                    )
1420
                continue
1✔
1421

1422
            if not (
1✔
1423
                found_object := s3_bucket.objects.get(object_key=object_key, version_id=version_id)
1424
            ):
1425
                errors.append(
1✔
1426
                    Error(
1427
                        Code="NoSuchVersion",
1428
                        Key=object_key,
1429
                        Message="The specified version does not exist.",
1430
                        VersionId=version_id,
1431
                    )
1432
                )
1433
                continue
1✔
1434

1435
            if found_object.is_locked(bypass_governance_retention):
1✔
1436
                errors.append(
1✔
1437
                    Error(
1438
                        Code="AccessDenied",
1439
                        Key=object_key,
1440
                        Message="Access Denied because object protected by object lock.",
1441
                        VersionId=version_id,
1442
                    )
1443
                )
1444
                continue
1✔
1445

1446
            s3_bucket.objects.pop(object_key=object_key, version_id=version_id)
1✔
1447
            versioned_keys.add(object_key)
1✔
1448

1449
            if not quiet:
1✔
1450
                deleted_object = DeletedObject(
1✔
1451
                    Key=object_key,
1452
                    VersionId=version_id,
1453
                )
1454
                if isinstance(found_object, S3DeleteMarker):
1✔
1455
                    deleted_object["DeleteMarker"] = True
1✔
1456
                    deleted_object["DeleteMarkerVersionId"] = found_object.version_id
1✔
1457

1458
                deleted.append(deleted_object)
1✔
1459

1460
            if isinstance(found_object, S3Object):
1✔
1461
                to_remove.append(found_object)
1✔
1462

1463
            self._notify(context, s3_bucket=s3_bucket, s3_object=found_object)
1✔
1464
            store.TAGS.tags.pop(get_unique_key_id(bucket, object_key, version_id), None)
1✔
1465

1466
        for versioned_key in versioned_keys:
1✔
1467
            # we clean up keys that do not have any object versions in them anymore
1468
            if versioned_key not in s3_bucket.objects:
1✔
1469
                self._preconditions_locks[bucket].pop(versioned_key, None)
1✔
1470

1471
        # TODO: request charged
1472
        self._storage_backend.remove(bucket, to_remove)
1✔
1473
        response: DeleteObjectsOutput = {}
1✔
1474
        # AWS validated: the list of Deleted objects is unordered, multiple identical calls can return different results
1475
        if errors:
1✔
1476
            response["Errors"] = errors
1✔
1477
        if not quiet:
1✔
1478
            response["Deleted"] = deleted
1✔
1479

1480
        return response
1✔
1481

1482
    @handler("CopyObject", expand=False)
    def copy_object(
        self,
        context: RequestContext,
        request: CopyObjectRequest,
    ) -> CopyObjectOutput:
        """
        Copy a source object (optionally a specific version) to ``Bucket``/``Key`` of the request.

        All validations run before anything is written: destination key, optional KMS key, source object
        lookup, archive state, copy-source preconditions, source and target SSE-C parameters and the
        "copy onto itself" rule. Afterwards the new object is created through the storage backend,
        registered in the destination bucket, tagged, and a notification is sent.

        :param context: the request context, used to resolve the buckets and to send notifications
        :param request: the CopyObject request
        :return: the CopyObject response, containing the ``CopyObjectResult`` of the new object
        :raises InvalidRequest: if the source is a delete marker addressed by version id, if the source is
            SSE-C encrypted and no source key MD5 was provided, or if the copy targets the object itself
            without changing any of its attributes
        :raises InvalidObjectState: if the source has an archive storage class and is not restored
        :raises PreconditionFailed: if one of the copy-source preconditions does not hold
        :raises AccessDenied: if the provided source SSE-C key MD5 does not match the stored one
        """
        # request_payer: RequestPayer = None,  # TODO:
        dest_bucket = request["Bucket"]
        dest_key = request["Key"]
        validate_object_key(dest_key)
        store, dest_s3_bucket = self._get_cross_account_bucket(context, dest_bucket)

        copy_source = request.get("CopySource")
        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
            copy_source
        )
        _, src_s3_bucket = self._get_cross_account_bucket(context, src_bucket)

        if not config.S3_SKIP_KMS_KEY_VALIDATION:
            requested_kms_key_id = request.get("SSEKMSKeyId")
            if requested_kms_key_id:
                validate_kms_key_id(requested_kms_key_id, dest_s3_bucket)

        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
        try:
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
        except MethodNotAllowed:
            raise InvalidRequest(
                "The source of a copy request may not specifically refer to a delete marker by version id."
            )

        source_is_archived = src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES
        if source_is_archived and not src_s3_object.restore:
            raise InvalidObjectState(
                "Operation is not valid for the source object's storage class",
                StorageClass=src_s3_object.storage_class,
            )

        failed_condition = get_failed_precondition_copy_source(
            request, src_s3_object.last_modified, src_s3_object.etag
        )
        if failed_condition:
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition=failed_condition,
            )

        # an SSE-C encrypted source can only be read with the matching customer key
        source_sse_c_key_md5 = request.get("CopySourceSSECustomerKeyMD5")
        stored_sse_key_hash = src_s3_object.sse_key_hash
        if stored_sse_key_hash and not source_sse_c_key_md5:
            raise InvalidRequest(
                "The object was stored using a form of Server Side Encryption. "
                "The correct parameters must be provided to retrieve the object."
            )
        if stored_sse_key_hash and stored_sse_key_hash != source_sse_c_key_md5:
            raise AccessDenied("Access Denied")

        # validate source SSE-C parameters
        validate_sse_c(
            algorithm=request.get("CopySourceSSECustomerAlgorithm"),
            encryption_key=request.get("CopySourceSSECustomerKey"),
            encryption_key_md5=source_sse_c_key_md5,
        )

        target_sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        server_side_encryption = request.get("ServerSideEncryption")
        # validate target SSE-C parameters
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=target_sse_c_key_md5,
            server_side_encryption=server_side_encryption,
        )

        # TODO validate order of validation
        storage_class = request.get("StorageClass")
        replace_metadata = request.get("MetadataDirective") == "REPLACE"
        website_redirect_location = request.get("WebsiteRedirectLocation")
        # we need to check for identity of the object, to see if the default one has been changed
        is_default_encryption = (
            dest_s3_bucket.encryption_rule is DEFAULT_BUCKET_ENCRYPTION
            and src_s3_object.encryption == "AES256"
        )
        if src_bucket == dest_bucket and src_key == dest_key:
            # copying an object onto itself is only legal if at least one attribute changes
            attribute_changes = (
                storage_class,
                server_side_encryption,
                target_sse_c_key_md5,
                replace_metadata,
                website_redirect_location,
                # S3 will allow copy in place if the bucket has encryption configured
                dest_s3_bucket.encryption_rule and not is_default_encryption,
                src_s3_object.restore,
            )
            if not any(attribute_changes):
                raise InvalidRequest(
                    "This copy request is illegal because it is trying to copy an object to itself without changing the "
                    "object's metadata, storage class, website redirect location or encryption attributes."
                )

        # only parse the header if one was sent; a falsy value is kept untouched
        tagging = request.get("Tagging")
        if tagging:
            tagging = parse_tagging_header(tagging)

        if replace_metadata:
            user_metadata = request.get("Metadata")
            system_metadata = get_system_metadata_from_request(request)
            if not system_metadata.get("ContentType"):
                system_metadata["ContentType"] = "binary/octet-stream"
        else:
            # default directive: the metadata is taken over from the source object
            user_metadata = src_s3_object.user_metadata
            system_metadata = src_s3_object.system_metadata

        dest_version_id = generate_version_id(dest_s3_bucket.versioning_status)

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            request,
            dest_s3_bucket,
            store,
        )
        lock_parameters = get_object_lock_parameters_from_bucket_and_request(
            request, dest_s3_bucket
        )

        acl = get_access_control_policy_for_new_resource_request(
            request, owner=dest_s3_bucket.owner
        )
        requested_checksum_algorithm = request.get("ChecksumAlgorithm")

        s3_object = S3Object(
            key=dest_key,
            size=src_s3_object.size,
            version_id=dest_version_id,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            # fall back to the algorithm of the source if the request does not specify one
            checksum_algorithm=requested_checksum_algorithm or src_s3_object.checksum_algorithm,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            # CopyObject does not inherit from the bucket here
            bucket_key_enabled=request.get("BucketKeyEnabled"),
            sse_key_hash=target_sse_c_key_md5,
            lock_mode=lock_parameters.lock_mode,
            lock_legal_status=lock_parameters.lock_legal_status,
            lock_until=lock_parameters.lock_until,
            website_redirect_location=website_redirect_location,
            expiration=None,  # TODO, from lifecycle
            acl=acl,
            owner=dest_s3_bucket.owner,
        )

        with self._storage_backend.copy(
            src_bucket=src_bucket,
            src_object=src_s3_object,
            dest_bucket=dest_bucket,
            dest_object=s3_object,
        ) as s3_stored_object:
            # values not provided by the stored copy are taken over from the source object
            s3_object.checksum_value = s3_stored_object.checksum or src_s3_object.checksum_value
            s3_object.etag = s3_stored_object.etag or src_s3_object.etag

            dest_s3_bucket.objects.set(dest_key, s3_object)

        dest_key_id = get_unique_key_id(dest_bucket, dest_key, dest_version_id)

        if request.get("TaggingDirective") == "REPLACE":
            store.TAGS.tags[dest_key_id] = tagging or {}
        else:
            # default directive: the destination gets a copy of the tags of the source
            src_key_id = get_unique_key_id(src_bucket, src_key, src_s3_object.version_id)
            src_tags = store.TAGS.tags.get(src_key_id, {})
            store.TAGS.tags[dest_key_id] = copy.copy(src_tags)

        copy_object_result = CopyObjectResult(
            ETag=s3_object.quoted_etag,
            LastModified=s3_object.last_modified,
        )
        if s3_object.checksum_algorithm:
            checksum_field = f"Checksum{s3_object.checksum_algorithm.upper()}"
            copy_object_result[checksum_field] = s3_object.checksum_value

        response = CopyObjectOutput(
            CopyObjectResult=copy_object_result,
        )

        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        if s3_object.expiration:
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime

        add_encryption_to_response(response, s3_object=s3_object)
        if target_sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = target_sse_c_key_md5

        # the source version is only reported for a real (non "null") version of a versioned bucket
        source_version_id = src_s3_object.version_id
        if src_s3_bucket.versioning_status and source_version_id and source_version_id != "null":
            response["CopySourceVersionId"] = source_version_id

        # RequestCharged: Optional[RequestCharged] # TODO
        self._notify(context, s3_bucket=dest_s3_bucket, s3_object=s3_object)

        return response
1✔
1687

1688
    def list_objects(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        marker: Marker = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectsOutput:
        """
        List the objects of a bucket (ListObjects, version 1), sorted by key.

        Keys sharing the same string between ``prefix`` and the first occurrence of ``delimiter`` are
        rolled up into a single ``CommonPrefixes`` entry. Every returned object and every common prefix
        counts towards ``max_keys``.

        :param context: the request context
        :param bucket: name of the bucket to list
        :param delimiter: string used to group keys into common prefixes
        :param encoding_type: if set, keys, prefix and delimiter are URL-quoted in the response
        :param marker: only keys sorting strictly after this value are returned
        :param max_keys: maximum number of entries to return, 1000 if not provided
        :param prefix: only keys starting with this value are returned
        :return: the listing; ``NextMarker`` is only set for a truncated listing using a delimiter
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)

        seen_prefixes = set()
        s3_objects: list[Object] = []
        count = 0
        is_truncated = False
        next_key_marker = None

        # sort by key
        all_keys = sorted(s3_bucket.objects.values(), key=lambda r: r.key)
        last_key = all_keys[-1] if all_keys else None

        for s3_object in all_keys:
            key = s3_object.key
            if encoding_type:
                key = urlparse.quote(key)

            # skip all keys that alphabetically come before key_marker
            if marker and key <= marker:
                continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            prefix_including_delimiter = None
            if delimiter:
                pre_delimiter, found_delimiter, _ = key.removeprefix(prefix).partition(delimiter)
                if found_delimiter:
                    prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                    already_listed = prefix_including_delimiter in seen_prefixes
                    if already_listed or (
                        marker and marker.startswith(prefix_including_delimiter)
                    ):
                        continue

            if prefix_including_delimiter:
                seen_prefixes.add(prefix_including_delimiter)
            else:
                # TODO: add RestoreStatus if present
                object_data = Object(
                    Key=key,
                    ETag=s3_object.quoted_etag,
                    Owner=s3_bucket.owner,  # TODO: verify reality
                    Size=s3_object.size,
                    LastModified=s3_object.last_modified,
                    StorageClass=s3_object.storage_class,
                )

                if s3_object.checksum_algorithm:
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
                    object_data["ChecksumType"] = getattr(
                        s3_object, "checksum_type", ChecksumType.FULL_OBJECT
                    )

                s3_objects.append(object_data)

            # we just added a CommonPrefix or an Object, increase the counter
            count += 1
            # keep going while there is room left, or if this was the very last key of the bucket
            if count < max_keys or last_key.key == s3_object.key:
                continue

            is_truncated = True
            if prefix_including_delimiter:
                next_key_marker = prefix_including_delimiter
            elif s3_objects:
                next_key_marker = s3_objects[-1]["Key"]
            break

        common_prefixes = [CommonPrefix(Prefix=entry) for entry in sorted(seen_prefixes)]

        response = ListObjectsOutput(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix,
            Marker=marker or "",
        )
        # optional fields are only part of the response when they carry a value
        if s3_objects:
            response["Contents"] = s3_objects
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if delimiter and next_key_marker:
            response["NextMarker"] = next_key_marker
        if s3_bucket.bucket_region != "us-east-1":
            response["BucketRegion"] = s3_bucket.bucket_region

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response
1✔
1798

1799
    def list_objects_v2(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        continuation_token: Token = None,
        fetch_owner: FetchOwner = None,
        start_after: StartAfter = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectsV2Output:
        """
        List the objects of a bucket (ListObjectsV2), sorted by key.

        Keys sharing the same string between ``prefix`` and the first occurrence of ``delimiter`` are
        rolled up into a single ``CommonPrefixes`` entry. Every returned object and every common prefix
        counts towards ``max_keys``. When the listing is truncated, ``NextContinuationToken`` is the
        url-safe base64 encoding of the (unquoted) key the next page has to start with.

        :param context: the request context
        :param bucket: name of the bucket to list
        :param delimiter: string used to group keys into common prefixes
        :param encoding_type: if set, keys, prefix and delimiter are URL-quoted in the response
        :param max_keys: maximum number of entries to return, 1000 if not provided
        :param prefix: only keys starting with this value are returned
        :param continuation_token: token returned by a previous truncated listing; takes precedence over
            ``start_after``
        :param fetch_owner: if truthy, the bucket owner is added to every returned object
        :param start_after: only keys sorting strictly after this value are returned
        :return: the listing
        :raises InvalidArgument: if the continuation token is empty or cannot be decoded
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if continuation_token == "":
            raise InvalidArgument(
                "The continuation token provided is incorrect",
                ArgumentName="continuation-token",
            )

        common_prefixes = set()
        count = 0
        is_truncated = False
        next_continuation_token = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)

        decoded_continuation_token = None
        if continuation_token:
            try:
                decoded_continuation_token = to_str(
                    base64.urlsafe_b64decode(continuation_token.encode())
                )
            except ValueError:
                # binascii.Error (bad base64) and UnicodeError (bad payload/encoding) are both ValueError
                # subclasses: report a malformed token the same way as an empty one instead of crashing
                raise InvalidArgument(
                    "The continuation token provided is incorrect",
                    ArgumentName="continuation-token",
                )

        s3_objects: list[Object] = []

        # sort by key
        for s3_object in sorted(s3_bucket.objects.values(), key=lambda r: r.key):
            key = urlparse.quote(s3_object.key) if encoding_type else s3_object.key

            # skip all keys that alphabetically come before continuation_token
            if continuation_token:
                # the token always encodes the raw key and the iteration is ordered by the raw key, so the
                # comparison must use the raw key as well: the URL-quoted form can sort differently
                # (e.g. "a%3Db" < "a=b") and would wrongly skip the entry the token points to
                if s3_object.key < decoded_continuation_token:
                    continue

            elif start_after:
                if key <= start_after:
                    continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # separate keys that contain the same string between the prefix and the first occurrence of the delimiter
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                # if the CommonPrefix is already in the CommonPrefixes, it doesn't count towards MaxKey, we can skip
                # the entry without increasing the counter. We need to iterate over all of these entries before
                # returning the next continuation marker, to properly start at the next key after this CommonPrefix
                if prefix_including_delimiter in common_prefixes:
                    continue

            # After skipping all entries, verify we're not over the MaxKeys before adding a new entry
            if count >= max_keys:
                is_truncated = True
                next_continuation_token = to_str(base64.urlsafe_b64encode(s3_object.key.encode()))
                break

            # if we found a new CommonPrefix, add it to the CommonPrefixes
            # else, it means it's a new Object, add it to the Contents
            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)
            else:
                # TODO: add RestoreStatus if present
                object_data = Object(
                    Key=key,
                    ETag=s3_object.quoted_etag,
                    Size=s3_object.size,
                    LastModified=s3_object.last_modified,
                    StorageClass=s3_object.storage_class,
                )

                if fetch_owner:
                    object_data["Owner"] = s3_bucket.owner

                if s3_object.checksum_algorithm:
                    object_data["ChecksumAlgorithm"] = [s3_object.checksum_algorithm]
                    object_data["ChecksumType"] = getattr(
                        s3_object, "checksum_type", ChecksumType.FULL_OBJECT
                    )

                s3_objects.append(object_data)

            # we just added either a CommonPrefix or an Object to the List, increase the counter by one
            count += 1

        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]

        response = ListObjectsV2Output(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix or "",
            KeyCount=count,
        )
        # optional fields are only part of the response when they carry a value
        if s3_objects:
            response["Contents"] = s3_objects
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if next_continuation_token:
            response["NextContinuationToken"] = next_continuation_token

        # only the pagination parameter that was actually used is echoed back
        if continuation_token:
            response["ContinuationToken"] = continuation_token
        elif start_after:
            response["StartAfter"] = start_after

        if s3_bucket.bucket_region != "us-east-1":
            response["BucketRegion"] = s3_bucket.bucket_region

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response
1✔
1934

1935
    def list_object_versions(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        key_marker: KeyMarker = None,
        max_keys: MaxKeys = None,
        prefix: Prefix = None,
        version_id_marker: VersionIdMarker = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        optional_object_attributes: OptionalObjectAttributesList = None,
        **kwargs,
    ) -> ListObjectVersionsOutput:
        """
        List all object versions and delete markers of a bucket, sorted by key, newest version first.

        Keys sharing the same string between ``prefix`` and the first occurrence of ``delimiter`` are
        rolled up into a single ``CommonPrefixes`` entry. Every returned version, delete marker and
        common prefix counts towards ``max_keys``.

        :param context: the request context
        :param bucket: name of the bucket to list
        :param delimiter: string used to group keys into common prefixes
        :param encoding_type: if set, keys, prefix and delimiter are URL-quoted in the response
        :param key_marker: keys sorting before this value are skipped; without ``version_id_marker`` the
            key itself is skipped as well
        :param max_keys: maximum number of entries to return, 1000 if not provided
        :param prefix: only keys starting with this value are returned
        :param version_id_marker: for the key equal to ``key_marker``, the listing resumes after this version
        :return: the listing; ``NextKeyMarker``/``NextVersionIdMarker`` are set when it is truncated
        :raises InvalidArgument: if ``version_id_marker`` is provided without ``key_marker``
        """
        if version_id_marker and not key_marker:
            raise InvalidArgument(
                "A version-id marker cannot be specified without a key marker.",
                ArgumentName="version-id-marker",
                ArgumentValue=version_id_marker,
            )

        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
        common_prefixes = set()
        count = 0
        is_truncated = False
        next_key_marker = None
        next_version_id_marker = None
        max_keys = max_keys or 1000
        prefix = prefix or ""
        delimiter = delimiter or ""
        if encoding_type:
            prefix = urlparse.quote(prefix)
            delimiter = urlparse.quote(delimiter)
        version_key_marker_found = False

        object_versions: list[ObjectVersion] = []
        delete_markers: list[DeleteMarkerEntry] = []

        all_versions = s3_bucket.objects.values(with_versions=True)
        # sort by key, and last-modified-date, to get the last version first
        all_versions.sort(key=lambda r: (r.key, -r.last_modified.timestamp()))
        last_version = all_versions[-1] if all_versions else None

        for version in all_versions:
            key = urlparse.quote(version.key) if encoding_type else version.key
            # skip all keys that alphabetically come before key_marker
            if key_marker:
                if key < key_marker:
                    continue
                elif key == key_marker:
                    if not version_id_marker:
                        continue
                    # as the keys are ordered by time, once we found the key marker, we can return the next one
                    if version.version_id == version_id_marker:
                        version_key_marker_found = True
                        continue

                    # it is possible that the version_id_marker related object has been deleted, in that case, start
                    # as soon as the next version id is older than the version id marker (meaning this version was
                    # next after the now-deleted version)
                    elif is_version_older_than_other(version.version_id, version_id_marker):
                        version_key_marker_found = True

                    elif not version_key_marker_found:
                        # as long as we have not passed the version_key_marker, skip the versions
                        continue

            # Filter for keys that start with prefix
            if prefix and not key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            prefix_including_delimiter = None
            if delimiter and delimiter in (key_no_prefix := key.removeprefix(prefix)):
                pre_delimiter, _, _ = key_no_prefix.partition(delimiter)
                prefix_including_delimiter = f"{prefix}{pre_delimiter}{delimiter}"

                if prefix_including_delimiter in common_prefixes or (
                    key_marker and key_marker.startswith(prefix_including_delimiter)
                ):
                    continue

            if prefix_including_delimiter:
                common_prefixes.add(prefix_including_delimiter)

            elif isinstance(version, S3DeleteMarker):
                delete_marker = DeleteMarkerEntry(
                    Key=key,
                    Owner=s3_bucket.owner,
                    VersionId=version.version_id,
                    IsLatest=version.is_current,
                    LastModified=version.last_modified,
                )
                delete_markers.append(delete_marker)
            else:
                # TODO: add RestoreStatus if present
                object_version = ObjectVersion(
                    Key=key,
                    ETag=version.quoted_etag,
                    Owner=s3_bucket.owner,  # TODO: verify reality
                    Size=version.size,
                    VersionId=version.version_id or "null",
                    LastModified=version.last_modified,
                    IsLatest=version.is_current,
                    # TODO: verify this, are other class possible?
                    # StorageClass=version.storage_class,
                    StorageClass=ObjectVersionStorageClass.STANDARD,
                )

                if version.checksum_algorithm:
                    object_version["ChecksumAlgorithm"] = [version.checksum_algorithm]
                    object_version["ChecksumType"] = getattr(
                        version, "checksum_type", ChecksumType.FULL_OBJECT
                    )

                object_versions.append(object_version)

            # we just added a CommonPrefix, an Object or a DeleteMarker, increase the counter
            count += 1
            # Stop once the page is full, unless this entry is the very last one of the bucket. The check
            # relies on object identity: version ids are not unique across keys (every entry without a
            # version id shares the same falsy value, reported as "null"), so comparing ids would treat
            # such entries as "the last one" and never truncate the listing.
            if count >= max_keys and version is not last_version:
                is_truncated = True
                if prefix_including_delimiter:
                    next_key_marker = prefix_including_delimiter
                else:
                    next_key_marker = version.key
                    next_version_id_marker = version.version_id
                break

        common_prefixes = [CommonPrefix(Prefix=prefix) for prefix in sorted(common_prefixes)]

        response = ListObjectVersionsOutput(
            IsTruncated=is_truncated,
            Name=bucket,
            MaxKeys=max_keys,
            Prefix=prefix,
            KeyMarker=key_marker or "",
            VersionIdMarker=version_id_marker or "",
        )
        # optional fields are only part of the response when they carry a value
        if object_versions:
            response["Versions"] = object_versions
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if delete_markers:
            response["DeleteMarkers"] = delete_markers
        if delimiter:
            response["Delimiter"] = delimiter
        if common_prefixes:
            response["CommonPrefixes"] = common_prefixes
        if next_key_marker:
            response["NextKeyMarker"] = next_key_marker
        if next_version_id_marker:
            response["NextVersionIdMarker"] = next_version_id_marker

        # RequestCharged: Optional[RequestCharged]  # TODO
        return response
1✔
2091

2092
    @handler("GetObjectAttributes", expand=False)
    def get_object_attributes(
        self,
        context: RequestContext,
        request: GetObjectAttributesRequest,
    ) -> GetObjectAttributesOutput:
        """
        Return the subset of object metadata listed in the request's ``ObjectAttributes``.

        ``LastModified`` is always part of the response, and ``VersionId`` is added whenever the bucket has a
        versioning status. ``ObjectParts`` is only returned for objects that have parts: a bare part count when the
        object checksum type is FULL_OBJECT, otherwise a paginated listing driven by ``PartNumberMarker`` and
        ``MaxParts``.

        :param context: the request context, used to resolve the (possibly cross-account) bucket
        :param request: the GetObjectAttributes request
        :return: the requested object attributes
        :raises InvalidRequest: if the object has an SSE-C key hash and the request carries no SSECustomerKeyMD5
        :raises AccessDenied: if the provided SSECustomerKeyMD5 differs from the hash stored on the object
        """
        _, s3_bucket = self._get_cross_account_bucket(context, request["Bucket"])

        s3_object = s3_bucket.get_object(
            key=request["Key"],
            version_id=request.get("VersionId"),
            http_method="GET",
        )

        # the stored key hash is compared against the request before the SSE-C parameters themselves are validated
        provided_key_md5 = request.get("SSECustomerKeyMD5")
        stored_key_hash = s3_object.sse_key_hash
        if stored_key_hash and not provided_key_md5:
            raise InvalidRequest(
                "The object was stored using a form of Server Side Encryption. "
                "The correct parameters must be provided to retrieve the object."
            )
        if stored_key_hash and stored_key_hash != provided_key_md5:
            raise AccessDenied("Access Denied")

        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=provided_key_md5,
        )

        requested = request.get("ObjectAttributes", [])
        response = GetObjectAttributesOutput()
        checksum_type = getattr(s3_object, "checksum_type", ChecksumType.FULL_OBJECT)

        # scalar attributes map one-to-one onto fields of the stored object
        scalar_attributes = (
            ("ETag", "etag"),
            ("StorageClass", "storage_class"),
            ("ObjectSize", "size"),
        )
        for attribute_name, object_field in scalar_attributes:
            if attribute_name in requested:
                response[attribute_name] = getattr(s3_object, object_field)

        if "Checksum" in requested and (algorithm := s3_object.checksum_algorithm):
            # for objects with parts, only the portion of the checksum before the first "-" is returned
            stored_checksum = s3_object.checksum_value
            response["Checksum"] = {
                f"Checksum{algorithm.upper()}": (
                    stored_checksum.split("-")[0] if s3_object.parts else stored_checksum
                ),
                "ChecksumType": checksum_type,
            }

        response["LastModified"] = s3_object.last_modified

        if s3_bucket.versioning_status:
            response["VersionId"] = s3_object.version_id

        if "ObjectParts" not in requested or not s3_object.parts:
            return response

        total_parts = len(s3_object.parts)
        if checksum_type == ChecksumType.FULL_OBJECT:
            response["ObjectParts"] = GetObjectAttributesParts(TotalPartsCount=total_parts)
            return response

        # this is basically a simplified `ListParts` call on the object, only returned when the checksum type is
        # COMPOSITE
        ordered_parts = sorted(s3_object.parts.items())
        final_part_number, final_part = ordered_parts[-1]

        # TODO: remove this backward compatibility hack needed for state created with <= 4.5
        #  the parts would only be a tuple and would not store the proper state for 4.5 and earlier, so we need
        #  to return early
        if isinstance(final_part, tuple):
            response["ObjectParts"] = GetObjectAttributesParts(TotalPartsCount=total_parts)
            return response

        marker = request.get("PartNumberMarker") or 0
        page_size = request.get("MaxParts") or 1000

        truncated = False
        page = []
        for number, part in ordered_parts:
            if number <= marker:
                continue

            page.append(select_from_typed_dict(ObjectPart, part))

            # the page is full; it is only truncated if the part just added is not the very last one
            if len(page) >= page_size and part["PartNumber"] != final_part_number:
                truncated = True
                break

        object_parts = GetObjectAttributesParts(
            TotalPartsCount=total_parts,
            IsTruncated=truncated,
            MaxParts=page_size,
            PartNumberMarker=marker,
            NextPartNumberMarker=page[-1]["PartNumber"] if page else 0,
        )
        if page:
            object_parts["Parts"] = page

        response["ObjectParts"] = object_parts
        return response
1✔
2200

2201
    def restore_object(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        restore_request: RestoreRequest = None,
        request_payer: RequestPayer = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> RestoreObjectOutput:
        """
        Restore an object stored in one of the archive storage classes.

        The restore is instantaneous: the object's ``restore`` field is set with ``ongoing-request="false"`` and
        both the initiated and the completed notifications are sent right away. Only the ``Days`` field of the
        restore request is honored; a request without ``Days`` is logged and answered with an empty output.

        :param context: the request context, used to resolve the (possibly cross-account) bucket
        :param bucket: name of the bucket holding the object
        :param key: key of the object to restore
        :param version_id: optional version of the object to restore
        :param restore_request: the restore parameters; may be omitted
        :return: an output whose ``StatusCode`` is 200 if the object already had a restore value, 202 otherwise
        :raises InvalidObjectState: if the object's storage class is not an archive storage class
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="GET",  # TODO: verify http method
        )
        if s3_object.storage_class not in ARCHIVES_STORAGE_CLASSES:
            raise InvalidObjectState(StorageClass=s3_object.storage_class)

        # TODO: moto only supported the "Days" parameter of RestoreRequest and ignored the others;
        #  we only implement the same functionality for now

        # if a request was already done and the object was available, and we're updating it, set the status code to 200
        status_code = 200 if s3_object.restore else 202

        # `restore_request` defaults to None: treat a missing request like one without `Days` instead of failing
        # with an AttributeError on `None.get`
        restore_days = (restore_request or {}).get("Days")
        if not restore_days:
            LOG.debug("LocalStack does not support restore SELECT requests yet.")
            return RestoreObjectOutput()

        restore_expiration_date = add_expiration_days_to_datetime(
            datetime.datetime.now(datetime.UTC), restore_days
        )
        # TODO: add a way to transition from ongoing-request=true to false? for now it is instant
        s3_object.restore = f'ongoing-request="false", expiry-date="{restore_expiration_date}"'

        s3_notif_ctx_initiated = S3EventNotificationContext.from_request_context(
            context,
            s3_bucket=s3_bucket,
            s3_object=s3_object,
        )
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_initiated)
        # But because it's instant in LocalStack, we can directly send the Completed notification as well
        # We just need to copy the context so that we don't mutate the first context while it could be sent
        # And modify its event type from `ObjectRestore:Post` to `ObjectRestore:Completed`
        s3_notif_ctx_completed = copy.copy(s3_notif_ctx_initiated)
        s3_notif_ctx_completed.event_type = s3_notif_ctx_completed.event_type.replace(
            "Post", "Completed"
        )
        self._notify(context, s3_bucket=s3_bucket, s3_notif_ctx=s3_notif_ctx_completed)

        # TODO: request charged
        return RestoreObjectOutput(StatusCode=status_code)
1✔
2256

2257
    @handler("CreateMultipartUpload", expand=False)
    def create_multipart_upload(
        self,
        context: RequestContext,
        request: CreateMultipartUploadRequest,
    ) -> CreateMultipartUploadOutput:
        """
        Validate the request, create a new ``S3Multipart`` and register it on the bucket under its upload id.

        When a checksum algorithm is given without a checksum type, the type defaults to FULL_OBJECT for CRC64NVME
        and to COMPOSITE for every other algorithm.

        :param context: the request context, used to resolve the bucket and the upload initiator
        :param request: the CreateMultipartUpload request
        :return: the bucket, key and upload id, plus the checksum and encryption fields that apply
        :raises InvalidStorageClass: if the storage class is not a known one, or is OUTPOSTS
        :raises InvalidRequest: if the checksum algorithm is unsupported, or the checksum type / algorithm
            combination is not allowed
        """
        # TODO: handle missing parameters:
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        store, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        storage_class = request.get("StorageClass")
        if storage_class is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        if not config.S3_SKIP_KMS_KEY_VALIDATION:
            requested_kms_key_id = request.get("SSEKMSKeyId")
            if requested_kms_key_id:
                validate_kms_key_id(requested_kms_key_id, s3_bucket)

        tagging = request.get("Tagging")
        if tagging:
            tagging = parse_tagging_header(tagging_header=tagging)

        key = request["Key"]

        system_metadata = get_system_metadata_from_request(request)
        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        checksum_algorithm = request.get("ChecksumAlgorithm")
        if checksum_algorithm and checksum_algorithm not in CHECKSUM_ALGORITHMS:
            raise InvalidRequest(
                "Checksum algorithm provided is unsupported. Please try again with any of the valid types: [CRC32, CRC32C, SHA1, SHA256]"
            )

        checksum_type = request.get("ChecksumType")
        if checksum_algorithm and not checksum_type:
            # no explicit type: derive the default from the algorithm
            checksum_type = (
                ChecksumType.FULL_OBJECT
                if checksum_algorithm == ChecksumAlgorithm.CRC64NVME
                else ChecksumType.COMPOSITE
            )
        elif checksum_type and not checksum_algorithm:
            raise InvalidRequest(
                "The x-amz-checksum-type header can only be used with the x-amz-checksum-algorithm header."
            )

        # past this point, a set checksum type always comes with a checksum algorithm
        if (
            checksum_type == ChecksumType.COMPOSITE
            and checksum_algorithm == ChecksumAlgorithm.CRC64NVME
        ):
            raise InvalidRequest(
                "The COMPOSITE checksum type cannot be used with the crc64nvme checksum algorithm."
            )
        if checksum_type == ChecksumType.FULL_OBJECT and checksum_algorithm.upper().startswith(
            "SHA"
        ):
            raise InvalidRequest(
                f"The FULL_OBJECT checksum type cannot be used with the {checksum_algorithm.lower()} checksum algorithm."
            )

        # TODO: we're not encrypting the object with the provided key for now
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
            server_side_encryption=request.get("ServerSideEncryption"),
        )

        encryption = get_encryption_parameters_from_request_and_bucket(request, s3_bucket, store)
        object_lock = get_object_lock_parameters_from_bucket_and_request(request, s3_bucket)
        acl = get_access_control_policy_for_new_resource_request(request, owner=s3_bucket.owner)

        # build the in-progress upload from everything gathered above
        s3_multipart = S3Multipart(
            key=key,
            storage_class=storage_class,
            expires=request.get("Expires"),
            user_metadata=request.get("Metadata"),
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_type=checksum_type,
            encryption=encryption.encryption,
            kms_key_id=encryption.kms_key_id,
            bucket_key_enabled=encryption.bucket_key_enabled,
            sse_key_hash=sse_c_key_md5,
            lock_mode=object_lock.lock_mode,
            lock_legal_status=object_lock.lock_legal_status,
            lock_until=object_lock.lock_until,
            website_redirect_location=request.get("WebsiteRedirectLocation"),
            expiration=None,  # TODO, from lifecycle, or should it be updated with config?
            acl=acl,
            initiator=get_owner_for_account_id(context.account_id),
            tagging=tagging,
            owner=s3_bucket.owner,
            precondition=object_exists_for_precondition_write(s3_bucket, key),
        )
        # it seems if there is SSE-C on the multipart, AWS S3 will override the default Checksum behavior (but not on
        # PutObject)
        if sse_c_key_md5:
            s3_multipart.object.checksum_algorithm = None

        upload_id = s3_multipart.id
        s3_bucket.multiparts[upload_id] = s3_multipart

        response = CreateMultipartUploadOutput(Bucket=bucket_name, Key=key, UploadId=upload_id)

        if checksum_algorithm:
            response["ChecksumAlgorithm"] = checksum_algorithm
            response["ChecksumType"] = checksum_type

        add_encryption_to_response(response, s3_object=s3_multipart.object)
        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        # TODO: missing response fields we're not currently supporting
        # - AbortDate: lifecycle related,not currently supported, todo
        # - AbortRuleId: lifecycle related, not currently supported, todo
        # - RequestCharged: todo

        return response
1✔
2385

2386
    @handler("UploadPart", expand=False)
    def upload_part(
        self,
        context: RequestContext,
        request: UploadPartRequest,
    ) -> UploadPartOutput:
        """
        Store one part of an in-progress multipart upload.

        The part body is written through the storage backend; if the write, the checksum verification or the
        Content-MD5 verification fails, the stored part is removed again before the error is raised. The part is
        only registered on the multipart upload once all verifications passed.

        :param context: the request context; its HTTP headers are inspected to detect aws-chunked bodies
        :param request: the UploadPart request
        :return: the ETag of the part, plus the encryption and checksum fields that apply
        :raises NoSuchUpload: if the upload id is unknown or belongs to a different key
        :raises InvalidArgument: if the part number is not between 1 and 10000
        :raises InvalidDigest: if the Content-MD5 value is not valid
        :raises InvalidRequest: on SSE-C mismatch with the upload, checksum algorithm mismatch with the upload, or
            an invalid checksum value
        :raises BadDigest: if the provided checksum or Content-MD5 does not match the stored data
        """
        # TODO: missing following parameters:
        #  content_length: ContentLength = None, ->validate?
        #  content_md5: ContentMD5 = None, -> validate?
        #  request_payer: RequestPayer = None,
        bucket_name = request["Bucket"]
        _, s3_bucket = self._get_cross_account_bucket(context, bucket_name)

        upload_id = request.get("UploadId")
        s3_multipart = s3_bucket.multiparts.get(upload_id)
        if not s3_multipart or s3_multipart.object.key != request.get("Key"):
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        part_number = request.get("PartNumber", 0)
        if not 1 <= part_number <= 10000:
            raise InvalidArgument(
                "Part number must be an integer between 1 and 10000, inclusive",
                ArgumentName="partNumber",
                ArgumentValue=part_number,
            )

        content_md5 = request.get("ContentMD5")
        # assert that the received ContentMD5 is a properly b64 encoded value that fits a MD5 hash length
        if content_md5 and not base_64_content_md5_to_etag(content_md5):
            raise InvalidDigest(
                "The Content-MD5 you specified was invalid.",
                Content_MD5=content_md5,
            )

        checksum_algorithm = get_s3_checksum_algorithm_from_request(request)
        checksum_value = None
        if checksum_algorithm:
            checksum_value = request.get(f"Checksum{checksum_algorithm.upper()}")

        # TODO: we're not encrypting the object with the provided key for now
        sse_c_key_md5 = request.get("SSECustomerKeyMD5")
        validate_sse_c(
            algorithm=request.get("SSECustomerAlgorithm"),
            encryption_key=request.get("SSECustomerKey"),
            encryption_key_md5=sse_c_key_md5,
        )

        # SSE-C must be used either by both the upload and the part, or by neither of them
        upload_key_hash = s3_multipart.object.sse_key_hash
        if bool(upload_key_hash) != bool(sse_c_key_md5):
            raise InvalidRequest(
                "The multipart upload initiate requested encryption. "
                "Subsequent part requests must include the appropriate encryption parameters."
            )
        if upload_key_hash and upload_key_hash != sse_c_key_md5:
            raise InvalidRequest(
                "The provided encryption parameters did not match the ones used originally."
            )

        s3_part = S3Part(
            part_number=part_number,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
        )
        body = request.get("Body")
        headers = context.request.headers

        # check if chunked request
        has_streaming_payload = headers.get("x-amz-content-sha256", "").startswith("STREAMING-")
        if has_streaming_payload or "aws-chunked" in headers.get("content-encoding", ""):
            if not checksum_algorithm:
                # the algorithm may only be announced through the trailing headers
                checksum_algorithm = get_s3_checksum_algorithm_from_trailing_headers(
                    headers.get("x-amz-trailer", "")
                )
            if checksum_algorithm:
                s3_part.checksum_algorithm = checksum_algorithm

            decoded_content_length = int(headers.get("x-amz-decoded-content-length", 0))
            body = AwsChunkedDecoder(body, decoded_content_length, s3_part)

        upload_algorithm = s3_multipart.checksum_algorithm
        if upload_algorithm and s3_part.checksum_algorithm != upload_algorithm:
            actual = checksum_algorithm.lower() if checksum_algorithm else "null"
            # the expected value is read from the multipart's object, not from the multipart itself
            object_algorithm = s3_multipart.object.checksum_algorithm
            expected = object_algorithm.lower() if object_algorithm else "null"
            if expected != "null":
                raise InvalidRequest(
                    f"Checksum Type mismatch occurred, expected checksum Type: {expected}, actual checksum Type: {actual}"
                )

        stored_multipart = self._storage_backend.get_multipart(bucket_name, s3_multipart)
        with stored_multipart.open(s3_part, mode="w") as stored_s3_part:
            try:
                stored_s3_part.write(body)
            except Exception:
                # do not leave a partially written part behind
                stored_multipart.remove_part(s3_part)
                raise

            if checksum_algorithm:
                if not validate_checksum_value(s3_part.checksum_value, checksum_algorithm):
                    stored_multipart.remove_part(s3_part)
                    raise InvalidRequest(
                        f"Value for x-amz-checksum-{s3_part.checksum_algorithm.lower()} header is invalid."
                    )
                if s3_part.checksum_value != stored_s3_part.checksum:
                    stored_multipart.remove_part(s3_part)
                    raise BadDigest(
                        f"The {checksum_algorithm.upper()} you specified did not match the calculated checksum."
                    )

            if content_md5:
                calculated_md5 = etag_to_base_64_content_md5(s3_part.etag)
                if calculated_md5 != content_md5:
                    stored_multipart.remove_part(s3_part)
                    raise BadDigest(
                        "The Content-MD5 you specified did not match what we received.",
                        ExpectedDigest=content_md5,
                        CalculatedDigest=calculated_md5,
                    )

            s3_multipart.parts[part_number] = s3_part

        response = UploadPartOutput(ETag=s3_part.quoted_etag)

        add_encryption_to_response(response, s3_object=s3_multipart.object)
        if sse_c_key_md5:
            response["SSECustomerAlgorithm"] = "AES256"
            response["SSECustomerKeyMD5"] = sse_c_key_md5

        part_algorithm = s3_part.checksum_algorithm
        if part_algorithm:
            response[f"Checksum{part_algorithm.upper()}"] = s3_part.checksum_value

        # TODO: RequestCharged: Optional[RequestCharged]
        return response
1✔
2535

2536
    @handler("UploadPartCopy", expand=False)
    def upload_part_copy(
        self,
        context: RequestContext,
        request: UploadPartCopyRequest,
    ) -> UploadPartCopyOutput:
        """
        Create a part of an in-progress multipart upload by copying data from an existing object.

        Both the source and the destination bucket are looked up in the store of the requesting account and
        region. An optional ``CopySourceRange`` restricts the copy to a range of the source object.

        :param context: the request context, used to select the store
        :param request: the UploadPartCopy request
        :return: the copy result (ETag, LastModified and checksum if any), plus the source version id and
            encryption fields that apply
        :raises NoSuchBucket: if the source or destination bucket does not exist in the store
        :raises InvalidRequest: if fetching the source object raises MethodNotAllowed
        :raises InvalidObjectState: if the source object is in an archive storage class and has no restore value
        :raises NoSuchUpload: if the upload id is unknown or belongs to a different key
        :raises InvalidArgument: if the part number is not between 1 and 10000
        :raises PreconditionFailed: if one of the copy source preconditions failed
        """
        # TODO: handle following parameters:
        #  SSECustomerAlgorithm: Optional[SSECustomerAlgorithm]
        #  SSECustomerKey: Optional[SSECustomerKey]
        #  SSECustomerKeyMD5: Optional[SSECustomerKeyMD5]
        #  CopySourceSSECustomerAlgorithm: Optional[CopySourceSSECustomerAlgorithm]
        #  CopySourceSSECustomerKey: Optional[CopySourceSSECustomerKey]
        #  CopySourceSSECustomerKeyMD5: Optional[CopySourceSSECustomerKeyMD5]
        #  RequestPayer: Optional[RequestPayer]
        #  ExpectedBucketOwner: Optional[AccountId]
        #  ExpectedSourceBucketOwner: Optional[AccountId]
        dest_bucket = request["Bucket"]
        dest_key = request["Key"]
        store = self.get_store(context.account_id, context.region)

        # TODO: validate cross-account UploadPartCopy
        dest_s3_bucket = store.buckets.get(dest_bucket)
        if not dest_s3_bucket:
            raise NoSuchBucket("The specified bucket does not exist", BucketName=dest_bucket)

        src_bucket, src_key, src_version_id = extract_bucket_key_version_id_from_copy_source(
            request.get("CopySource")
        )

        src_s3_bucket = store.buckets.get(src_bucket)
        if not src_s3_bucket:
            raise NoSuchBucket("The specified bucket does not exist", BucketName=src_bucket)

        # if the object is a delete marker, get_object will raise NotFound if no versionId, like AWS
        try:
            src_s3_object = src_s3_bucket.get_object(key=src_key, version_id=src_version_id)
        except MethodNotAllowed:
            raise InvalidRequest(
                "The source of a copy request may not specifically refer to a delete marker by version id."
            )

        source_is_archived = src_s3_object.storage_class in ARCHIVES_STORAGE_CLASSES
        if source_is_archived and not src_s3_object.restore:
            raise InvalidObjectState(
                "Operation is not valid for the source object's storage class",
                StorageClass=src_s3_object.storage_class,
            )

        upload_id = request.get("UploadId")
        s3_multipart = dest_s3_bucket.multiparts.get(upload_id)
        if not s3_multipart or s3_multipart.object.key != dest_key:
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        part_number = request.get("PartNumber", 0)
        if not 1 <= part_number <= 10000:
            raise InvalidArgument(
                "Part number must be an integer between 1 and 10000, inclusive",
                ArgumentName="partNumber",
                ArgumentValue=part_number,
            )

        # TODO implement copy source IF
        source_range = request.get("CopySourceRange")
        range_data: ObjectRange | None = (
            parse_copy_source_range_header(source_range, src_s3_object.size)
            if source_range
            else None
        )

        failed_precondition = get_failed_upload_part_copy_source_preconditions(
            request, src_s3_object.last_modified, src_s3_object.etag
        )
        if failed_precondition:
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition=failed_precondition,
            )

        s3_part = S3Part(part_number=part_number)
        # the copied part takes over the checksum algorithm of the upload, if it has one
        upload_algorithm = s3_multipart.checksum_algorithm
        if upload_algorithm:
            s3_part.checksum_algorithm = upload_algorithm

        stored_multipart = self._storage_backend.get_multipart(dest_bucket, s3_multipart)
        stored_multipart.copy_from_object(s3_part, src_bucket, src_s3_object, range_data)

        s3_multipart.parts[part_number] = s3_part

        # TODO: return those fields
        #     RequestCharged: Optional[RequestCharged]

        copy_result = CopyPartResult(
            ETag=s3_part.quoted_etag,
            LastModified=s3_part.last_modified,
        )
        part_algorithm = s3_part.checksum_algorithm
        if part_algorithm:
            copy_result[f"Checksum{part_algorithm.upper()}"] = s3_part.checksum_value

        response = UploadPartCopyOutput(CopyPartResult=copy_result)

        if src_s3_bucket.versioning_status and src_s3_object.version_id:
            response["CopySourceVersionId"] = src_s3_object.version_id

        add_encryption_to_response(response, s3_object=s3_multipart.object)

        return response
1✔
2643

2644
    def complete_multipart_upload(
1✔
2645
        self,
2646
        context: RequestContext,
2647
        bucket: BucketName,
2648
        key: ObjectKey,
2649
        upload_id: MultipartUploadId,
2650
        multipart_upload: CompletedMultipartUpload = None,
2651
        checksum_crc32: ChecksumCRC32 = None,
2652
        checksum_crc32_c: ChecksumCRC32C = None,
2653
        checksum_crc64_nvme: ChecksumCRC64NVME = None,
2654
        checksum_sha1: ChecksumSHA1 = None,
2655
        checksum_sha256: ChecksumSHA256 = None,
2656
        checksum_type: ChecksumType = None,
2657
        mpu_object_size: MpuObjectSize = None,
2658
        request_payer: RequestPayer = None,
2659
        expected_bucket_owner: AccountId = None,
2660
        if_match: IfMatch = None,
2661
        if_none_match: IfNoneMatch = None,
2662
        sse_customer_algorithm: SSECustomerAlgorithm = None,
2663
        sse_customer_key: SSECustomerKey = None,
2664
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
2665
        **kwargs,
2666
    ) -> CompleteMultipartUploadOutput:
2667
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)
1✔
2668

2669
        if (
1✔
2670
            not (s3_multipart := s3_bucket.multiparts.get(upload_id))
2671
            or s3_multipart.object.key != key
2672
        ):
2673
            raise NoSuchUpload(
1✔
2674
                "The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.",
2675
                UploadId=upload_id,
2676
            )
2677

2678
        if if_none_match and if_match:
1✔
2679
            raise NotImplementedException(
2680
                "A header you provided implies functionality that is not implemented",
2681
                Header="If-Match,If-None-Match",
2682
                additionalMessage="Multiple conditional request headers present in the request",
2683
            )
2684

2685
        elif if_none_match:
1✔
2686
            # TODO: improve concurrency mechanism for `if_none_match` and `if_match`
2687
            if if_none_match != "*":
1✔
2688
                raise NotImplementedException(
2689
                    "A header you provided implies functionality that is not implemented",
2690
                    Header="If-None-Match",
2691
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2692
                )
2693
            if object_exists_for_precondition_write(s3_bucket, key):
1✔
2694
                raise PreconditionFailed(
1✔
2695
                    "At least one of the pre-conditions you specified did not hold",
2696
                    Condition="If-None-Match",
2697
                )
2698
            elif s3_multipart.precondition:
1✔
2699
                raise ConditionalRequestConflict(
1✔
2700
                    "The conditional request cannot succeed due to a conflicting operation against this resource.",
2701
                    Condition="If-None-Match",
2702
                    Key=key,
2703
                )
2704

2705
        elif if_match:
1✔
2706
            if if_match == "*":
1✔
2707
                raise NotImplementedException(
2708
                    "A header you provided implies functionality that is not implemented",
2709
                    Header="If-None-Match",
2710
                    additionalMessage="We don't accept the provided value of If-None-Match header for this API",
2711
                )
2712
            verify_object_equality_precondition_write(
1✔
2713
                s3_bucket, key, if_match, initiated=s3_multipart.initiated
2714
            )
2715

2716
        parts = multipart_upload.get("Parts", [])
1✔
2717
        if not parts:
1✔
2718
            raise InvalidRequest("You must specify at least one part")
1✔
2719

2720
        parts_numbers = [part.get("PartNumber") for part in parts]
1✔
2721
        # TODO: it seems that with new S3 data integrity, sorting might not be mandatory depending on checksum type
2722
        # see https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
2723
        # sorted is very fast (fastest) if the list is already sorted, which should be the case
2724
        if sorted(parts_numbers) != parts_numbers:
1✔
2725
            raise InvalidPartOrder(
1✔
2726
                "The list of parts was not in ascending order. Parts must be ordered by part number.",
2727
                UploadId=upload_id,
2728
            )
2729

2730
        mpu_checksum_algorithm = s3_multipart.checksum_algorithm
1✔
2731
        mpu_checksum_type = getattr(s3_multipart, "checksum_type", None)
1✔
2732

2733
        if checksum_type and checksum_type != mpu_checksum_type:
1✔
2734
            raise InvalidRequest(
1✔
2735
                f"The upload was created using the {mpu_checksum_type or 'null'} checksum mode. "
2736
                f"The complete request must use the same checksum mode."
2737
            )
2738

2739
        # generate the versionId before completing, in case the bucket versioning status has changed between
2740
        # creation and completion? AWS validate this
2741
        version_id = generate_version_id(s3_bucket.versioning_status)
1✔
2742
        s3_multipart.object.version_id = version_id
1✔
2743

2744
        # we're inspecting the signature of `complete_multipart`, in case the multipart has been restored from
2745
        # persistence. if we do not have a new version, do not validate those parameters
2746
        # TODO: remove for next major version (minor?)
2747
        if signature(s3_multipart.complete_multipart).parameters.get("mpu_size"):
1✔
2748
            checksum_algorithm = mpu_checksum_algorithm.lower() if mpu_checksum_algorithm else None
1✔
2749
            checksum_map = {
1✔
2750
                "crc32": checksum_crc32,
2751
                "crc32c": checksum_crc32_c,
2752
                "crc64nvme": checksum_crc64_nvme,
2753
                "sha1": checksum_sha1,
2754
                "sha256": checksum_sha256,
2755
            }
2756
            checksum_value = checksum_map.get(checksum_algorithm)
1✔
2757
            s3_multipart.complete_multipart(
1✔
2758
                parts, mpu_size=mpu_object_size, validation_checksum=checksum_value
2759
            )
2760
            if mpu_checksum_algorithm and (
1✔
2761
                (
2762
                    checksum_value
2763
                    and mpu_checksum_type == ChecksumType.FULL_OBJECT
2764
                    and not checksum_type
2765
                )
2766
                or any(
2767
                    checksum_value
2768
                    for checksum_type, checksum_value in checksum_map.items()
2769
                    if checksum_type != checksum_algorithm
2770
                )
2771
            ):
2772
                # this is not ideal, but this validation comes last... after the validation of individual parts
2773
                s3_multipart.object.parts.clear()
1✔
2774
                raise BadDigest(
1✔
2775
                    f"The {mpu_checksum_algorithm.lower()} you specified did not match the calculated checksum."
2776
                )
2777
        else:
UNCOV
2778
            s3_multipart.complete_multipart(parts)
×
2779

2780
        stored_multipart = self._storage_backend.get_multipart(bucket, s3_multipart)
1✔
2781
        stored_multipart.complete_multipart(
1✔
2782
            [s3_multipart.parts.get(part_number) for part_number in parts_numbers]
2783
        )
2784
        if not s3_multipart.checksum_algorithm and s3_multipart.object.checksum_algorithm:
1✔
2785
            with self._storage_backend.open(
1✔
2786
                bucket, s3_multipart.object, mode="r"
2787
            ) as s3_stored_object:
2788
                s3_multipart.object.checksum_value = s3_stored_object.checksum
1✔
2789
                s3_multipart.object.checksum_type = ChecksumType.FULL_OBJECT
1✔
2790

2791
        s3_object = s3_multipart.object
1✔
2792

2793
        s3_bucket.objects.set(key, s3_object)
1✔
2794

2795
        # remove the multipart now that it's complete
2796
        self._storage_backend.remove_multipart(bucket, s3_multipart)
1✔
2797
        s3_bucket.multiparts.pop(s3_multipart.id, None)
1✔
2798

2799
        key_id = get_unique_key_id(bucket, key, version_id)
1✔
2800
        store.TAGS.tags.pop(key_id, None)
1✔
2801
        if s3_multipart.tagging:
1✔
UNCOV
2802
            store.TAGS.tags[key_id] = s3_multipart.tagging
×
2803

2804
        # RequestCharged: Optional[RequestCharged] TODO
2805

2806
        response = CompleteMultipartUploadOutput(
1✔
2807
            Bucket=bucket,
2808
            Key=key,
2809
            ETag=s3_object.quoted_etag,
2810
            Location=f"{get_full_default_bucket_location(bucket)}{key}",
2811
        )
2812

2813
        if s3_object.version_id:
1✔
UNCOV
2814
            response["VersionId"] = s3_object.version_id
×
2815

2816
        # it seems AWS is not returning checksum related fields if the object has KMS encryption ¯\_(ツ)_/¯
2817
        # but it still generates them, and they can be retrieved with regular GetObject and such operations
2818
        if s3_object.checksum_algorithm and not s3_object.kms_key_id:
1✔
2819
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
1✔
2820
            response["ChecksumType"] = s3_object.checksum_type
1✔
2821

2822
        if s3_object.expiration:
1✔
UNCOV
2823
            response["Expiration"] = s3_object.expiration  # TODO: properly parse the datetime
×
2824

2825
        add_encryption_to_response(response, s3_object=s3_object)
1✔
2826

2827
        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)
1✔
2828

2829
        return response
1✔
2830

2831
    def abort_multipart_upload(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        upload_id: MultipartUploadId,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        if_match_initiated_time: IfMatchInitiatedTime = None,
        **kwargs,
    ) -> AbortMultipartUploadOutput:
        """
        Abort an in-progress multipart upload.

        The upload is looked up by ``upload_id`` in the bucket and must have been initiated for ``key``.
        It is then removed from the bucket's multipart registry and from the storage backend.
        ``request_payer``, ``expected_bucket_owner`` and ``if_match_initiated_time`` are accepted but not
        evaluated in this method.

        :raises NoSuchUpload: if no upload with this ID exists in the bucket, or it targets another key
        :return: an empty ``AbortMultipartUploadOutput``
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_multipart = s3_bucket.multiparts.get(upload_id)
        if not s3_multipart or s3_multipart.object.key != key:
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        # unregister the upload from the bucket first, then discard it from the storage backend
        s3_bucket.multiparts.pop(upload_id, None)
        self._storage_backend.remove_multipart(bucket, s3_multipart)

        # TODO: requestCharged
        return AbortMultipartUploadOutput()
2859

2860
    def list_parts(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        upload_id: MultipartUploadId,
        max_parts: MaxParts = None,
        part_number_marker: PartNumberMarker = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        sse_customer_algorithm: SSECustomerAlgorithm = None,
        sse_customer_key: SSECustomerKey = None,
        sse_customer_key_md5: SSECustomerKeyMD5 = None,
        **kwargs,
    ) -> ListPartsOutput:
        """
        List the parts uploaded so far for a multipart upload, in ascending part number order.

        :param max_parts: maximum number of parts to return, 1000 when not provided
        :param part_number_marker: only parts with a strictly greater part number are returned
        :raises NoSuchUpload: if no upload with this ID exists in the bucket, or it targets another key
        :return: the listing; ``IsTruncated`` is set when ``max_parts`` was reached before the last part,
            and ``NextPartNumberMarker`` is the number of the last returned part (0 if none)
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_multipart = s3_bucket.multiparts.get(upload_id)
        if not s3_multipart or s3_multipart.object.key != key:
            raise NoSuchUpload(
                "The specified upload does not exist. "
                "The upload ID may be invalid, or the upload may have been aborted or completed.",
                UploadId=upload_id,
            )

        marker = part_number_marker or 0
        max_parts = max_parts or 1000

        ordered_parts = sorted(s3_multipart.parts.items())
        final_part_number = ordered_parts[-1][0] if ordered_parts else None

        listed_parts = []
        truncated = False
        for number, stored_part in ordered_parts:
            # everything up to and including the marker was returned by a previous page
            if number <= marker:
                continue

            entry = Part(
                ETag=stored_part.quoted_etag,
                LastModified=stored_part.last_modified,
                PartNumber=number,
                Size=stored_part.size,
            )
            # per-part checksums are only reported when the upload itself declares a checksum algorithm
            if s3_multipart.checksum_algorithm and stored_part.checksum_algorithm:
                checksum_field = f"Checksum{stored_part.checksum_algorithm.upper()}"
                entry[checksum_field] = stored_part.checksum_value
            listed_parts.append(entry)

            # the page is full: it is only truncated if there are parts left after this one
            if len(listed_parts) >= max_parts and stored_part.part_number != final_part_number:
                truncated = True
                break

        response = ListPartsOutput(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            Initiator=s3_multipart.initiator,
            Owner=s3_multipart.initiator,
            StorageClass=s3_multipart.object.storage_class,
            IsTruncated=truncated,
            MaxParts=max_parts,
            PartNumberMarker=marker,
            NextPartNumberMarker=listed_parts[-1]["PartNumber"] if listed_parts else 0,
        )
        if listed_parts:
            response["Parts"] = listed_parts

        if s3_multipart.checksum_algorithm:
            response["ChecksumAlgorithm"] = s3_multipart.object.checksum_algorithm
            # safe access: the attribute may be missing on the multipart object
            response["ChecksumType"] = getattr(s3_multipart, "checksum_type", None)

        #     AbortDate: Optional[AbortDate] TODO: lifecycle
        #     AbortRuleId: Optional[AbortRuleId] TODO: lifecycle
        #     RequestCharged: Optional[RequestCharged]

        return response
2942

2943
    def list_multipart_uploads(
        self,
        context: RequestContext,
        bucket: BucketName,
        delimiter: Delimiter = None,
        encoding_type: EncodingType = None,
        key_marker: KeyMarker = None,
        max_uploads: MaxUploads = None,
        prefix: Prefix = None,
        upload_id_marker: UploadIdMarker = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> ListMultipartUploadsOutput:
        """
        List the in-progress multipart uploads of a bucket, ordered by object key then initiation time.

        :param delimiter: keys containing it after ``prefix`` are rolled up into ``CommonPrefixes``
        :param encoding_type: when set, keys, prefix and delimiter are URL-quoted before being compared
        :param key_marker: uploads whose key sorts before it are skipped; uploads for exactly this key
            are skipped too, unless ``upload_id_marker`` is set
        :param max_uploads: maximum number of entries (uploads plus common prefixes), 1000 by default
        :param prefix: only uploads whose key starts with it are listed
        :param upload_id_marker: with ``key_marker``, resume after this upload of the marker key
        :raises InvalidArgument: if both markers are given and ``upload_id_marker`` does not identify an
            upload whose key equals ``key_marker``
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        def _encode(value: str) -> str:
            # values are only URL-quoted when the caller requested an encoding type
            return urlparse.quote(value) if encoding_type else value

        max_uploads = max_uploads or 1000
        prefix = _encode(prefix or "")
        delimiter = _encode(delimiter or "")

        if key_marker and upload_id_marker:
            marker_upload = s3_bucket.multiparts.get(upload_id_marker)
            # an unknown upload ID yields None, which never equals the (non-empty) key marker
            marker_key = _encode(marker_upload.object.key) if marker_upload else None
            if key_marker != marker_key:
                raise InvalidArgument(
                    "Invalid uploadId marker",
                    ArgumentName="upload-id-marker",
                    ArgumentValue=upload_id_marker,
                )

        # sort by key and initiated
        ordered_uploads = sorted(
            s3_bucket.multiparts.values(),
            key=lambda upload: (upload.object.key, upload.initiated.timestamp()),
        )
        final_upload = ordered_uploads[-1] if ordered_uploads else None

        uploads = []
        seen_prefixes = set()
        listed = 0
        truncated = False
        passed_upload_id_marker = False

        for multipart in ordered_uploads:
            upload_key = _encode(multipart.object.key)

            if key_marker:
                # skip every key sorting before the key marker
                if upload_key < key_marker:
                    continue
                if upload_key == key_marker:
                    # without an upload ID marker, all uploads of the marker key are skipped
                    if not upload_id_marker:
                        continue
                    # uploads of one key are ordered by initiation time: skip up to and including the
                    # upload matching the marker, and resume with the ones that come after it
                    if multipart.id == upload_id_marker:
                        passed_upload_id_marker = True
                        continue
                    if not passed_upload_id_marker:
                        continue

            # Filter for keys that start with prefix
            if prefix and not upload_key.startswith(prefix):
                continue

            # see ListObjectsV2 for the logic comments (shared logic here)
            if delimiter and delimiter in (remainder := upload_key.removeprefix(prefix)):
                head, _, _ = remainder.partition(delimiter)
                rolled_up_prefix = f"{prefix}{head}{delimiter}"
                if rolled_up_prefix in seen_prefixes or (
                    key_marker and key_marker.startswith(rolled_up_prefix)
                ):
                    continue
                seen_prefixes.add(rolled_up_prefix)
            else:
                entry = MultipartUpload(
                    UploadId=multipart.id,
                    Key=multipart.object.key,
                    Initiated=multipart.initiated,
                    StorageClass=multipart.object.storage_class,
                    Owner=multipart.initiator,  # TODO: check the difference
                    Initiator=multipart.initiator,
                )
                if multipart.checksum_algorithm:
                    entry["ChecksumAlgorithm"] = multipart.checksum_algorithm
                    entry["ChecksumType"] = getattr(multipart, "checksum_type", None)
                uploads.append(entry)

            # both uploads and newly added common prefixes count towards the page size
            listed += 1
            if listed >= max_uploads and final_upload.id != multipart.id:
                truncated = True
                break

        response = ListMultipartUploadsOutput(
            Bucket=bucket,
            IsTruncated=truncated,
            MaxUploads=max_uploads,
            KeyMarker=key_marker or "",
            UploadIdMarker=(upload_id_marker or "") if key_marker else "",
            NextKeyMarker=uploads[-1]["Key"] if uploads else "",
            NextUploadIdMarker=uploads[-1]["UploadId"] if uploads else "",
        )
        if uploads:
            response["Uploads"] = uploads
        if delimiter:
            response["Delimiter"] = delimiter
        if prefix:
            response["Prefix"] = prefix
        if encoding_type:
            response["EncodingType"] = EncodingType.url
        if seen_prefixes:
            response["CommonPrefixes"] = [
                CommonPrefix(Prefix=common_prefix) for common_prefix in sorted(seen_prefixes)
            ]

        return response
3074

3075
    def put_bucket_versioning(
        self,
        context: RequestContext,
        bucket: BucketName,
        versioning_configuration: VersioningConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        mfa: MFA = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Set the versioning status of a bucket.

        :param versioning_configuration: must carry a ``Status`` of ``Enabled`` or ``Suspended``
        :raises CommonServiceException: ``IllegalVersioningConfigurationException`` if ``Status`` is
            missing or empty
        :raises MalformedXML: if ``Status`` has any other value
        :raises InvalidBucketState: if versioning would be suspended on a bucket with Object Lock enabled
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        requested_status = versioning_configuration.get("Status")
        if not requested_status:
            raise CommonServiceException(
                code="IllegalVersioningConfigurationException",
                message="The Versioning element must be specified",
            )

        if requested_status not in ("Enabled", "Suspended"):
            raise MalformedXML()

        if requested_status == "Suspended" and s3_bucket.object_lock_enabled:
            raise InvalidBucketState(
                "An Object Lock configuration is present on this bucket, so the versioning state cannot be changed."
            )

        # the bucket has no versioning status yet: rebuild its objects as a versioned key store
        if not s3_bucket.versioning_status:
            s3_bucket.objects = VersionedKeyStore.from_key_store(s3_bucket.objects)

        s3_bucket.versioning_status = requested_status
3105

3106
    def get_bucket_versioning(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketVersioningOutput:
        """
        Return the versioning status of a bucket.

        :return: an output carrying ``Status`` when the bucket has a versioning status, an empty
            output otherwise
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        current_status = s3_bucket.versioning_status
        if current_status:
            return GetBucketVersioningOutput(Status=current_status)

        return GetBucketVersioningOutput()
3119

3120
    def get_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketEncryptionOutput:
        """
        Return the default encryption configuration of a bucket.

        AWS now encrypts bucket by default with AES256, see:
        https://docs.aws.amazon.com/AmazonS3/latest/userguide/default-bucket-encryption.html

        :return: the bucket's encryption rule wrapped in a single-rule configuration, or an empty
            output if the bucket has no encryption rule
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        encryption_rule = s3_bucket.encryption_rule
        if encryption_rule:
            return GetBucketEncryptionOutput(
                ServerSideEncryptionConfiguration={"Rules": [encryption_rule]}
            )

        return GetBucketEncryptionOutput()
3137

3138
    def put_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        server_side_encryption_configuration: ServerSideEncryptionConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Set the default encryption rule of a bucket.

        :param server_side_encryption_configuration: must contain exactly one rule, which must define
            ``ApplyServerSideEncryptionByDefault`` with an ``SSEAlgorithm`` listed in ``SSE_ALGORITHMS``
        :raises MalformedXML: if the rules are missing, there is not exactly one rule, or the rule has no
            default encryption or no supported ``SSEAlgorithm``
        :raises InvalidArgument: if ``KMSMasterKeyID`` is given for an algorithm that is neither
            ``aws:kms`` nor ``aws:kms:dsse``
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        rules = server_side_encryption_configuration.get("Rules")
        if not rules or len(rules) != 1:
            raise MalformedXML()

        encryption = rules[0].get("ApplyServerSideEncryptionByDefault")
        if not encryption:
            raise MalformedXML()

        sse_algorithm = encryption.get("SSEAlgorithm")
        if not sse_algorithm or sse_algorithm not in SSE_ALGORITHMS:
            raise MalformedXML()

        # A KMS key ID is valid for both KMS-based algorithms, as the error message below states.
        # Previously only `aws:kms` was accepted, so a DSSE-KMS rule with a key ID was rejected.
        kms_based_algorithms = (ServerSideEncryption.aws_kms, "aws:kms:dsse")
        if "KMSMasterKeyID" in encryption and sse_algorithm not in kms_based_algorithms:
            raise InvalidArgument(
                "a KMSMasterKeyID is not applicable if the default sse algorithm is not aws:kms or aws:kms:dsse",
                ArgumentName="ApplyServerSideEncryptionByDefault",
            )
        # TODO: validate KMS key? not currently done in moto
        # You can pass either the KeyId or the KeyArn. If cross-account, it has to be the ARN.
        # It's always saved as the ARN in the bucket configuration.

        s3_bucket.encryption_rule = rules[0]
3177

3178
    def delete_bucket_encryption(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the encryption rule of a bucket by resetting it to ``None``."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)
        target_bucket.encryption_rule = None
3188

3189
    def put_bucket_notification_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        notification_configuration: NotificationConfiguration,
        expected_bucket_owner: AccountId = None,
        skip_destination_validation: SkipValidation = None,
        **kwargs,
    ) -> None:
        """
        Store a notification configuration on a bucket.

        The configuration is passed through ``_verify_notification_configuration`` first and is only
        stored if that call returns without raising.

        :param skip_destination_validation: forwarded as-is to the verification step
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        self._verify_notification_configuration(
            notification_configuration,
            skip_destination_validation,
            context,
            bucket,
        )

        target_bucket.notification_configuration = notification_configuration
3204

3205
    def get_bucket_notification_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> NotificationConfiguration:
        """
        Return the notification configuration of a bucket.

        :return: the stored configuration, or a new empty ``NotificationConfiguration`` if the bucket
            has none (or an empty one)
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        stored_configuration = target_bucket.notification_configuration
        if stored_configuration:
            return stored_configuration

        return NotificationConfiguration()
3215

3216
    def put_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        tagging: Tagging,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Replace the tags of a bucket with the provided tag set.

        :param tagging: must contain a ``TagSet`` key; a falsy value is treated as an empty tag set
        :raises MalformedXML: if ``tagging`` has no ``TagSet`` key
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if "TagSet" not in tagging:
            raise MalformedXML()

        new_tags = tagging["TagSet"] or []
        validate_tag_set(new_tags, type_set="bucket")

        # remove the previous tags before setting the new ones, it overwrites the whole TagSet
        bucket_arn = s3_bucket.bucket_arn
        store.TAGS.tags.pop(bucket_arn, None)
        store.TAGS.tag_resource(s3_bucket.bucket_arn, tags=new_tags)
3237

3238
    def get_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketTaggingOutput:
        """
        Return the tags stored for a bucket.

        :raises NoSuchTagSet: if no tags are stored for the bucket
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        listing = store.TAGS.list_tags_for_resource(s3_bucket.bucket_arn, root_name="Tags")
        bucket_tags = listing["Tags"]
        if bucket_tags:
            return GetBucketTaggingOutput(TagSet=bucket_tags)

        raise NoSuchTagSet(
            "The TagSet does not exist",
            BucketName=bucket,
        )
3254

3255
    def delete_bucket_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Drop all tags stored for a bucket; a bucket without tags is left untouched."""
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        bucket_arn = s3_bucket.bucket_arn
        store.TAGS.tags.pop(bucket_arn, None)
3265

3266
    def put_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        tagging: Tagging,
        version_id: ObjectVersionId = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> PutObjectTaggingOutput:
        """
        Replace the tags of an object (or of one of its versions) with the provided tag set.

        The object is resolved before the tagging payload is validated, so lookup errors take
        precedence over ``MalformedXML``. A notification is sent once the tags are stored.

        :param tagging: must contain a ``TagSet`` key; a falsy value is treated as an empty tag set
        :raises MalformedXML: if ``tagging`` has no ``TagSet`` key
        :return: output carrying ``VersionId`` when the resolved object has one
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="PUT")

        if "TagSet" not in tagging:
            raise MalformedXML()

        new_tags = tagging["TagSet"] or []
        validate_tag_set(new_tags, type_set="object")

        # remove the previous tags before setting the new ones, it overwrites the whole TagSet
        tags_key = get_unique_key_id(bucket, key, s3_object.version_id)
        store.TAGS.tags.pop(tags_key, None)
        store.TAGS.tag_resource(tags_key, tags=new_tags)

        response = PutObjectTaggingOutput()
        object_version = s3_object.version_id
        if object_version:
            response["VersionId"] = object_version

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response
3300

3301
    def get_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> GetObjectTaggingOutput:
        """
        Return the tags of an object (or of one of its versions).

        :raises MethodNotAllowed: if the object lookup raised ``NoSuchKey`` but, on a bucket with a
            versioning status, an entry still exists for the key/version (reported as a delete marker)
        :raises NoSuchKey: if the object does not exist; the reported ``Key`` is ``<bucket>/<key>``
        :return: output with the ``TagSet`` and, when the object has one, its ``VersionId``
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        try:
            s3_object = s3_bucket.get_object(key=key, version_id=version_id)
        except NoSuchKey as no_such_key:
            # it seems GetObjectTagging does not work like all other operations, so a different exception
            # has to be raised here. As the exception must be caught anyway because of the format of the
            # Key, it is not worth modifying the `S3Bucket.get_object` signature for one operation.
            if s3_bucket.versioning_status:
                existing_entry = s3_bucket.objects.get(key, version_id)
                if existing_entry:
                    raise MethodNotAllowed(
                        "The specified method is not allowed against this resource.",
                        Method="GET",
                        ResourceType="DeleteMarker",
                        DeleteMarker=True,
                        Allow="DELETE",
                        VersionId=existing_entry.version_id,
                    )

            # There is a weird AWS validated bug in S3: the returned key contains the bucket name as well
            # follow AWS on this one
            no_such_key.Key = f"{bucket}/{key}"
            raise no_such_key

        tags_key = get_unique_key_id(bucket, key, s3_object.version_id)
        object_tags = store.TAGS.list_tags_for_resource(tags_key)["Tags"]

        response = GetObjectTaggingOutput(TagSet=object_tags)
        if s3_object.version_id:
            response["VersionId"] = s3_object.version_id

        return response
3344

3345
    def delete_object_tagging(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> DeleteObjectTaggingOutput:
        """
        Drop all tags of an object (or of one of its versions) and send a notification.

        :return: output carrying ``VersionId`` when the resolved object has one
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        s3_object = s3_bucket.get_object(key=key, version_id=version_id, http_method="DELETE")

        tags_key = get_unique_key_id(bucket, key, s3_object.version_id)
        store.TAGS.tags.pop(tags_key, None)

        response = DeleteObjectTaggingOutput()
        object_version = s3_object.version_id
        if object_version:
            response["VersionId"] = object_version

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        return response
3366

3367
    def put_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        cors_configuration: CORSConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Store a CORS configuration on a bucket.

        The configuration is passed through ``validate_cors_configuration`` first; once stored, the
        CORS handler cache is invalidated.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        validate_cors_configuration(cors_configuration)
        target_bucket.cors_rules = cors_configuration

        # invalidate only after the new rules are in place
        self._cors_handler.invalidate_cache()
3381

3382
    def get_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketCorsOutput:
        """
        Return the CORS rules of a bucket.

        :raises NoSuchCORSConfiguration: if the bucket has no CORS configuration
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        cors_configuration = target_bucket.cors_rules
        if cors_configuration:
            return GetBucketCorsOutput(CORSRules=target_bucket.cors_rules["CORSRules"])

        raise NoSuchCORSConfiguration(
            "The CORS configuration does not exist",
            BucketName=bucket,
        )
3397

3398
    def delete_bucket_cors(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Remove the CORS configuration of a bucket.

        Nothing happens if the bucket has no CORS configuration. Otherwise the rules are cleared and
        the CORS handler cache is invalidated.
        """
        _, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not s3_bucket.cors_rules:
            return

        # Clear the rules before invalidating the cache, the same order `put_bucket_cors` uses.
        # NOTE(review): presumably the cache is rebuilt from bucket state on the next lookup, so
        # invalidating first could let a concurrent rebuild pick up the stale rules - confirm.
        s3_bucket.cors_rules = None
        self._cors_handler.invalidate_cache()
3410

3411
    def get_bucket_lifecycle_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLifecycleConfigurationOutput:
        """Return the lifecycle rules and transition minimum object size of ``bucket``.

        :raises NoSuchLifecycleConfiguration: if the bucket has no lifecycle rules
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        rules = target_bucket.lifecycle_rules
        if not rules:
            raise NoSuchLifecycleConfiguration(
                "The lifecycle configuration does not exist",
                BucketName=bucket,
            )

        # TODO: remove for next major version, safe access to new attribute
        minimum_object_size = getattr(
            target_bucket,
            "transition_default_minimum_object_size",
            TransitionDefaultMinimumObjectSize.all_storage_classes_128K,
        )
        return GetBucketLifecycleConfigurationOutput(
            Rules=rules,
            TransitionDefaultMinimumObjectSize=minimum_object_size,
        )
3435

3436
    def put_bucket_lifecycle_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        checksum_algorithm: ChecksumAlgorithm = None,
        lifecycle_configuration: BucketLifecycleConfiguration = None,
        expected_bucket_owner: AccountId = None,
        transition_default_minimum_object_size: TransitionDefaultMinimumObjectSize = None,
        **kwargs,
    ) -> PutBucketLifecycleConfigurationOutput:
        """Validate and store the lifecycle rules of ``bucket``.

        When no transition minimum object size is given, ``all_storage_classes_128K``
        is used. The per-bucket expiration cache is cleared after the rules are stored.

        :raises InvalidRequest: if the transition minimum object size is not a known value
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        fallback_size = TransitionDefaultMinimumObjectSize.all_storage_classes_128K
        accepted_sizes = (
            fallback_size,
            TransitionDefaultMinimumObjectSize.varies_by_storage_class,
        )

        transition_min_obj_size = transition_default_minimum_object_size or fallback_size
        if transition_min_obj_size not in accepted_sizes:
            raise InvalidRequest(
                f"Invalid TransitionDefaultMinimumObjectSize found: {transition_min_obj_size}"
            )

        validate_lifecycle_configuration(lifecycle_configuration)
        # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to apply them
        #  everytime we get/head an object
        # for now, we keep a cache and get it everytime we fetch an object
        target_bucket.lifecycle_rules = lifecycle_configuration["Rules"]
        target_bucket.transition_default_minimum_object_size = transition_min_obj_size
        self._expiration_cache[bucket].clear()

        output = PutBucketLifecycleConfigurationOutput(
            TransitionDefaultMinimumObjectSize=transition_min_obj_size
        )
        return output
3471

3472
    def delete_bucket_lifecycle(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Drop the lifecycle rules of ``bucket`` and clear its expiration cache."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        target_bucket.lifecycle_rules = None
        bucket_expiration_cache = self._expiration_cache[bucket]
        bucket_expiration_cache.clear()
1✔
3483

3484
    def put_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        analytics_configuration: AnalyticsConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate an analytics configuration and store it on ``bucket`` under ``id``."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        validate_bucket_analytics_configuration(
            id=id,
            analytics_configuration=analytics_configuration,
        )

        configurations = target_bucket.analytics_configurations
        configurations[id] = analytics_configuration
1✔
3500

3501
    def get_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketAnalyticsConfigurationOutput:
        """Return the analytics configuration stored on ``bucket`` under ``id``.

        :raises NoSuchConfiguration: if no (non-empty) configuration is stored for ``id``
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        found_config = target_bucket.analytics_configurations.get(id)
        if found_config:
            return GetBucketAnalyticsConfigurationOutput(AnalyticsConfiguration=found_config)

        raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3515

3516
    def list_bucket_analytics_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketAnalyticsConfigurationsOutput:
        """List every analytics configuration of ``bucket``, sorted by ``Id``.

        The result is never truncated; ``continuation_token`` is not used.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        by_id = itemgetter("Id")
        ordered_configs = sorted(target_bucket.analytics_configurations.values(), key=by_id)

        return ListBucketAnalyticsConfigurationsOutput(
            IsTruncated=False,
            AnalyticsConfigurationList=ordered_configs,
        )
3533

3534
    def delete_bucket_analytics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: AnalyticsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the analytics configuration stored on ``bucket`` under ``id``.

        :raises NoSuchConfiguration: if nothing (or an empty value) was stored for ``id``
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        removed_config = target_bucket.analytics_configurations.pop(id, None)
        if removed_config:
            return

        raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3546

3547
    def put_bucket_intelligent_tiering_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: IntelligentTieringId,
        intelligent_tiering_configuration: IntelligentTieringConfiguration,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> None:
        """Validate an Intelligent-Tiering configuration and store it on ``bucket`` under ``id``.

        ``expected_bucket_owner`` is forwarded to ``_get_cross_account_bucket`` (as
        ``get_bucket_policy`` does) instead of being silently ignored.
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        validate_bucket_intelligent_tiering_configuration(id, intelligent_tiering_configuration)

        s3_bucket.intelligent_tiering_configurations[id] = intelligent_tiering_configuration
1✔
3562

3563
    def get_bucket_intelligent_tiering_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: IntelligentTieringId,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> GetBucketIntelligentTieringConfigurationOutput:
        """Return the Intelligent-Tiering configuration stored on ``bucket`` under ``id``.

        ``expected_bucket_owner`` is forwarded to ``_get_cross_account_bucket`` (as
        ``get_bucket_policy`` does) instead of being silently ignored.

        :raises NoSuchConfiguration: if no (non-empty) configuration is stored for ``id``
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not (itier_config := s3_bucket.intelligent_tiering_configurations.get(id)):
            raise NoSuchConfiguration("The specified configuration does not exist.")

        return GetBucketIntelligentTieringConfigurationOutput(
            IntelligentTieringConfiguration=itier_config
        )
3580

3581
    def delete_bucket_intelligent_tiering_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: IntelligentTieringId,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> None:
        """Remove the Intelligent-Tiering configuration stored on ``bucket`` under ``id``.

        ``expected_bucket_owner`` is forwarded to ``_get_cross_account_bucket`` (as
        ``get_bucket_policy`` does) instead of being silently ignored.

        :raises NoSuchConfiguration: if nothing (or an empty value) was stored for ``id``
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not s3_bucket.intelligent_tiering_configurations.pop(id, None):
            raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3594

3595
    def list_bucket_intelligent_tiering_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token | None = None,
        expected_bucket_owner: AccountId | None = None,
        **kwargs,
    ) -> ListBucketIntelligentTieringConfigurationsOutput:
        """List every Intelligent-Tiering configuration of ``bucket``, sorted by ``Id``.

        The result is never truncated; ``continuation_token`` is not used.
        ``expected_bucket_owner`` is forwarded to ``_get_cross_account_bucket`` (as
        ``get_bucket_policy`` does) instead of being silently ignored.
        """
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        return ListBucketIntelligentTieringConfigurationsOutput(
            IsTruncated=False,
            IntelligentTieringConfigurationList=sorted(
                s3_bucket.intelligent_tiering_configurations.values(),
                key=itemgetter("Id"),
            ),
        )
3613

3614
    def put_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        inventory_configuration: InventoryConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate an inventory configuration and store it on ``bucket`` under ``id``."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        validate_inventory_configuration(
            config_id=id,
            inventory_configuration=inventory_configuration,
        )

        configurations = target_bucket.inventory_configurations
        configurations[id] = inventory_configuration
1✔
3629

3630
    def get_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketInventoryConfigurationOutput:
        """Return the inventory configuration stored on ``bucket`` under ``id``.

        :raises NoSuchConfiguration: if no (non-empty) configuration is stored for ``id``
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        found_config = target_bucket.inventory_configurations.get(id)
        if found_config:
            return GetBucketInventoryConfigurationOutput(InventoryConfiguration=found_config)

        raise NoSuchConfiguration("The specified configuration does not exist.")
1✔
3643

3644
    def list_bucket_inventory_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketInventoryConfigurationsOutput:
        """List every inventory configuration of ``bucket``, sorted by ``Id``.

        The result is never truncated; ``continuation_token`` is not used.
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        by_id = itemgetter("Id")
        ordered_configs = sorted(target_bucket.inventory_configurations.values(), key=by_id)

        return ListBucketInventoryConfigurationsOutput(
            IsTruncated=False,
            InventoryConfigurationList=ordered_configs,
        )
3660

3661
    def delete_bucket_inventory_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: InventoryId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Remove the inventory configuration stored on ``bucket`` under ``id``.

        :raises NoSuchConfiguration: if nothing (or an empty value) was stored for ``id``
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        removed_config = target_bucket.inventory_configurations.pop(id, None)
        if removed_config:
            return

        raise NoSuchConfiguration("The specified configuration does not exist.")
×
3673

3674
    def get_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketWebsiteOutput:
        """Return the website configuration stored on ``bucket``.

        The stored configuration object is returned as-is.

        :raises NoSuchWebsiteConfiguration: if the bucket has no website configuration
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        website_config = target_bucket.website_configuration
        if website_config:
            return website_config

        raise NoSuchWebsiteConfiguration(
            "The specified bucket does not have a website configuration",
            BucketName=bucket,
        )
1✔
3689

3690
    def put_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        website_configuration: WebsiteConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Validate and store the website configuration of ``bucket``."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        # validation happens before the bucket is touched, so a rejected
        # configuration leaves the previous one in place
        validate_website_configuration(website_configuration)

        target_bucket.website_configuration = website_configuration
1✔
3704

3705
    def delete_bucket_website(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Reset the website configuration of ``bucket`` to ``None``."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        # a bucket without a website configuration is not an error: the
        # attribute is reset unconditionally
        target_bucket.website_configuration = None
1✔
3715

3716
    def get_object_lock_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectLockConfigurationOutput:
        """Return the Object Lock configuration of ``bucket``.

        The ``Rule`` entry is only included when the bucket has a default retention.

        :raises ObjectLockConfigurationNotFoundError: if Object Lock is not enabled on the bucket
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if not target_bucket.object_lock_enabled:
            raise ObjectLockConfigurationNotFoundError(
                "Object Lock configuration does not exist for this bucket",
                BucketName=bucket,
            )

        lock_configuration = ObjectLockConfiguration(ObjectLockEnabled=ObjectLockEnabled.Enabled)

        default_retention = target_bucket.object_lock_default_retention
        if default_retention:
            lock_configuration["Rule"] = {"DefaultRetention": default_retention}

        return GetObjectLockConfigurationOutput(ObjectLockConfiguration=lock_configuration)
1✔
3741

3742
    def put_object_lock_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        object_lock_configuration: ObjectLockConfiguration = None,
        request_payer: RequestPayer = None,
        token: ObjectLockToken = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectLockConfigurationOutput:
        """Enable Object Lock on ``bucket`` and set (or clear) its default retention.

        A configuration without a ``Rule`` clears the default retention. A
        configuration with a ``Rule`` must carry a ``DefaultRetention`` holding a
        known ``Mode`` and exactly one of ``Days`` / ``Years``.

        :raises InvalidBucketState: if versioning is not ``Enabled`` on the bucket
        :raises MalformedXML: if the configuration is missing, not ``Enabled``, or its rule is invalid
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if target_bucket.versioning_status != "Enabled":
            raise InvalidBucketState(
                "Versioning must be 'Enabled' on the bucket to apply a Object Lock configuration"
            )

        lock_requested = bool(
            object_lock_configuration
            and object_lock_configuration.get("ObjectLockEnabled") == "Enabled"
        )
        if not lock_requested:
            raise MalformedXML()

        if "Rule" in object_lock_configuration:
            rule = object_lock_configuration["Rule"]
            if not rule:
                raise MalformedXML()

            new_default_retention = rule.get("DefaultRetention")
            if not new_default_retention:
                raise MalformedXML()

            if "Mode" not in new_default_retention:
                raise MalformedXML()

            # exactly one of Days / Years must be provided: both or neither is rejected
            has_days = "Days" in new_default_retention
            has_years = "Years" in new_default_retention
            if has_days == has_years:
                raise MalformedXML()

            if new_default_retention["Mode"] not in OBJECT_LOCK_MODES:
                raise MalformedXML()
        else:
            # no rule provided: the default retention is removed
            new_default_retention = None

        target_bucket.object_lock_default_retention = new_default_retention
        if not target_bucket.object_lock_enabled:
            target_bucket.object_lock_enabled = True

        return PutObjectLockConfigurationOutput()
1✔
3791

3792
    def get_object_legal_hold(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectLegalHoldOutput:
        """Return the legal hold status of an object.

        :raises InvalidRequest: if Object Lock is not enabled on the bucket
        :raises NoSuchObjectLockConfiguration: if the object has no legal hold status
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if not target_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = target_bucket.get_object(key=key, version_id=version_id, http_method="GET")

        legal_status = s3_object.lock_legal_status
        if legal_status:
            hold = ObjectLockLegalHold(Status=legal_status)
            return GetObjectLegalHoldOutput(LegalHold=hold)

        raise NoSuchObjectLockConfiguration(
            "The specified object does not have a ObjectLock configuration"
        )
3819

3820
    def put_object_legal_hold(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        legal_hold: ObjectLockLegalHold = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectLegalHoldOutput:
        """Set the legal hold status (``ON`` / ``OFF``) of an object.

        :raises MalformedXML: if ``legal_hold`` is missing or its ``Status`` is not ``ON``/``OFF``
        :raises InvalidRequest: if Object Lock is not enabled on the bucket
        """
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        if not legal_hold:
            raise MalformedXML()

        if not s3_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        # the object is resolved before the status is validated, so a missing
        # object is reported ahead of an invalid status
        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
            http_method="PUT",
        )
        # TODO: check casing
        if not (status := legal_hold.get("Status")) or status not in ("ON", "OFF"):
            raise MalformedXML()

        s3_object.lock_legal_status = status

        # TODO: return RequestCharged
        # Return the output type this operation declares, not the retention one.
        return PutObjectLegalHoldOutput()
1✔
3854

3855
    def get_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectRetentionOutput:
        """Return the retention mode and retain-until date of an object.

        :raises InvalidRequest: if Object Lock is not enabled on the bucket
        :raises NoSuchObjectLockConfiguration: if the object has no lock mode
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if not target_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = target_bucket.get_object(key=key, version_id=version_id, http_method="GET")

        if not s3_object.lock_mode:
            raise NoSuchObjectLockConfiguration(
                "The specified object does not have a ObjectLock configuration"
            )

        current_retention = ObjectLockRetention(
            Mode=s3_object.lock_mode,
            RetainUntilDate=s3_object.lock_until,
        )
        return GetObjectRetentionOutput(Retention=current_retention)
3885

3886
    def put_object_retention(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        retention: ObjectLockRetention = None,
        request_payer: RequestPayer = None,
        version_id: ObjectVersionId = None,
        bypass_governance_retention: BypassGovernanceRetention = None,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> PutObjectRetentionOutput:
        """Set, replace or remove the retention of an object.

        An empty/missing ``retention`` removes the lock. A request that weakens an
        existing lock is rejected for ``COMPLIANCE`` locks, and for ``GOVERNANCE``
        locks unless ``bypass_governance_retention`` is set.

        :raises InvalidRequest: if Object Lock is not enabled on the bucket
        :raises MalformedXML: if ``retention`` lacks required fields or has an unknown mode
        :raises InvalidArgument: if the retain-until date is in the past
        :raises AccessDenied: if the existing lock forbids the change
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if not target_bucket.object_lock_enabled:
            raise InvalidRequest("Bucket is missing Object Lock Configuration")

        s3_object = target_bucket.get_object(key=key, version_id=version_id, http_method="PUT")

        if retention:
            fields_ok = validate_dict_fields(
                retention, required_fields={"Mode", "RetainUntilDate"}
            )
            if not fields_ok or retention["Mode"] not in OBJECT_LOCK_MODES:
                raise MalformedXML()

            requested_mode = retention["Mode"]
            requested_until = retention["RetainUntilDate"]

            if requested_until < datetime.datetime.now(datetime.UTC):
                # weirdly, this date is format as following: Tue Dec 31 16:00:00 PST 2019
                # it contains the timezone as PST, even if you target a bucket in Europe or Asia
                pst_datetime = requested_until.astimezone(tz=ZoneInfo("America/Los_Angeles"))
                raise InvalidArgument(
                    "The retain until date must be in the future!",
                    ArgumentName="RetainUntilDate",
                    ArgumentValue=pst_datetime.strftime("%a %b %d %H:%M:%S %Z %Y"),
                )

            # the lock is weakened when it would end earlier than it currently does,
            # or when a COMPLIANCE lock would be downgraded to GOVERNANCE
            shortens_lock = s3_object.lock_until and s3_object.lock_until > requested_until
            downgrades_mode = (
                requested_mode == ObjectLockMode.GOVERNANCE
                and s3_object.lock_mode == ObjectLockMode.COMPLIANCE
            )
            weakens_lock = bool(shortens_lock or downgrades_mode)
        else:
            # no retention in the request means the lock is removed altogether
            requested_mode = None
            requested_until = None
            weakens_lock = True

        if weakens_lock:
            governance_without_bypass = (
                s3_object.lock_mode == ObjectLockMode.GOVERNANCE
                and not bypass_governance_retention
            )
            if s3_object.lock_mode == ObjectLockMode.COMPLIANCE or governance_without_bypass:
                raise AccessDenied("Access Denied because object protected by object lock.")

        s3_object.lock_mode = requested_mode
        s3_object.lock_until = requested_until

        # TODO: return RequestCharged
        return PutObjectRetentionOutput()
1✔
3949

3950
    def put_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        request_payment_configuration: RequestPaymentConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Store who pays for requests on ``bucket`` (``Requester`` or ``BucketOwner``).

        :raises MalformedXML: if ``Payer`` is missing or not one of the two accepted values
        """
        # TODO: this currently only mock the operation, but its actual effect is not emulated
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        accepted_payers = ("Requester", "BucketOwner")
        payer = request_payment_configuration.get("Payer")
        if payer in accepted_payers:
            target_bucket.payer = payer
            return

        raise MalformedXML()
1✔
3968

3969
    def get_bucket_request_payment(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketRequestPaymentOutput:
        """Return the payer value stored on ``bucket``."""
        # TODO: this currently only mock the operation, but its actual effect is not emulated
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        current_payer = target_bucket.payer
        return GetBucketRequestPaymentOutput(Payer=current_payer)
1✔
3980

3981
    def get_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketOwnershipControlsOutput:
        """Return the object ownership setting of ``bucket`` as a single-rule list.

        :raises OwnershipControlsNotFoundError: if the bucket has no object ownership set
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if not target_bucket.object_ownership:
            raise OwnershipControlsNotFoundError(
                "The bucket ownership controls were not found",
                BucketName=bucket,
            )

        ownership_rule = {"ObjectOwnership": target_bucket.object_ownership}
        return GetBucketOwnershipControlsOutput(OwnershipControls={"Rules": [ownership_rule]})
3999

4000
    def put_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        ownership_controls: OwnershipControls,
        content_md5: ContentMD5 | None = None,
        expected_bucket_owner: AccountId | None = None,
        checksum_algorithm: ChecksumAlgorithm | None = None,
        **kwargs,
    ) -> None:
        """Store the object ownership setting of ``bucket``.

        Exactly one rule is accepted, and its ``ObjectOwnership`` must be one of
        ``OBJECT_OWNERSHIPS``.

        :raises MalformedXML: if there is not exactly one rule or the ownership value is unknown
        """
        # TODO: this currently only mock the operation, but its actual effect is not emulated
        #  it for example almost forbid ACL usage when set to BucketOwnerEnforced
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        rules = ownership_controls.get("Rules")
        if not rules or len(rules) > 1:
            raise MalformedXML()

        object_ownership = rules[0].get("ObjectOwnership")
        if object_ownership not in OBJECT_OWNERSHIPS:
            raise MalformedXML()

        target_bucket.object_ownership = object_ownership
1✔
4022

4023
    def delete_bucket_ownership_controls(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Reset the object ownership setting of ``bucket`` to ``None``."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        # reset unconditionally, whether or not a value was set before
        target_bucket.object_ownership = None
1✔
4033

4034
    def get_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetPublicAccessBlockOutput:
        """Return the public access block configuration stored on ``bucket``.

        :raises NoSuchPublicAccessBlockConfiguration: if the bucket has none
        """
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        if not target_bucket.public_access_block:
            raise NoSuchPublicAccessBlockConfiguration(
                "The public access block configuration was not found", BucketName=bucket
            )

        block_configuration = target_bucket.public_access_block
        return GetPublicAccessBlockOutput(PublicAccessBlockConfiguration=block_configuration)
4051

4052
    def put_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        public_access_block_configuration: PublicAccessBlockConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Store the public access block configuration of ``bucket``.

        Every known field that is missing or ``None`` is set to ``False`` on the
        passed configuration (which is modified in place) before it is stored.

        :raises MalformedXML: if ``validate_dict_fields`` rejects the configuration
        """
        # TODO: this currently only mock the operation, but its actual effect is not emulated
        #  as we do not enforce ACL directly. Also, this should take the most restrictive between S3Control and the
        #  bucket configuration. See s3control
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        known_fields = {
            "BlockPublicAcls",
            "BlockPublicPolicy",
            "IgnorePublicAcls",
            "RestrictPublicBuckets",
        }
        is_valid = validate_dict_fields(
            public_access_block_configuration,
            required_fields=set(),
            optional_fields=known_fields,
        )
        if not is_valid:
            raise MalformedXML()

        unset_fields = [
            name for name in known_fields if public_access_block_configuration.get(name) is None
        ]
        for name in unset_fields:
            public_access_block_configuration[name] = False

        target_bucket.public_access_block = public_access_block_configuration
1✔
4085

4086
    def delete_public_access_block(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Reset the public access block configuration of ``bucket`` to ``None``."""
        _, target_bucket = self._get_cross_account_bucket(context, bucket)

        # reset unconditionally, whether or not a configuration was set before
        target_bucket.public_access_block = None
1✔
4096

4097
    def get_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyOutput:
        """Return the policy document stored on ``bucket``.

        ``expected_bucket_owner`` is forwarded to ``_get_cross_account_bucket``.

        :raises NoSuchBucketPolicy: if the bucket has no policy
        """
        _, target_bucket = self._get_cross_account_bucket(
            context,
            bucket,
            expected_bucket_owner=expected_bucket_owner,
        )

        stored_policy = target_bucket.policy
        if stored_policy:
            return GetBucketPolicyOutput(Policy=stored_policy)

        raise NoSuchBucketPolicy(
            "The bucket policy does not exist",
            BucketName=bucket,
        )
1✔
4113

4114
    def put_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        policy: Policy,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        confirm_remove_self_bucket_access: ConfirmRemoveSelfBucketAccess = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Store ``policy`` on ``bucket`` after a minimal JSON sanity check.

        The policy must start with ``{``, parse as JSON and not be empty once
        parsed. The original policy string (not the parsed form) is what gets stored.
        ``expected_bucket_owner`` is forwarded to ``_get_cross_account_bucket``.

        :raises MalformedPolicy: if the policy fails any of the checks above
        """
        _, target_bucket = self._get_cross_account_bucket(
            context,
            bucket,
            expected_bucket_owner=expected_bucket_owner,
        )

        starts_like_json_object = bool(policy) and policy[0] == "{"
        if not starts_like_json_object:
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")

        try:
            parsed_policy = json.loads(policy)
            if not parsed_policy:
                # TODO: add more validation around the policy?
                raise MalformedPolicy("Missing required field Statement")
        except ValueError:
            raise MalformedPolicy("Policies must be valid JSON and the first byte must be '{'")

        target_bucket.policy = policy
1✔
4140

4141
    def delete_bucket_policy(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """Reset the policy of ``bucket`` to ``None``.

        ``expected_bucket_owner`` is forwarded to ``_get_cross_account_bucket``.
        """
        _, target_bucket = self._get_cross_account_bucket(
            context,
            bucket,
            expected_bucket_owner=expected_bucket_owner,
        )

        # reset unconditionally, whether or not a policy was set before
        target_bucket.policy = None
1✔
4153

4154
    def get_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        request_payer: RequestPayer = None,
        **kwargs,
    ) -> GetBucketAccelerateConfigurationOutput:
        """
        Return the Transfer Acceleration status stored on the bucket.

        :param context: The request context.
        :param bucket: The name of the bucket to read the accelerate status from.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :param request_payer: Accepted for API compatibility; not used here.
        :return: An output carrying ``Status`` only when an accelerate status has been set on the bucket.
        """
        # forward the expected owner to the bucket lookup, like the policy and metrics operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        response = GetBucketAccelerateConfigurationOutput()
        if s3_bucket.accelerate_status:
            response["Status"] = s3_bucket.accelerate_status

        return response
4169

4170
    def put_bucket_accelerate_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        accelerate_configuration: AccelerateConfiguration,
        expected_bucket_owner: AccountId = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        **kwargs,
    ) -> None:
        """
        Set the Transfer Acceleration status of a bucket.

        :param context: The request context.
        :param bucket: The name of the bucket to configure.
        :param accelerate_configuration: The configuration; its ``Status`` must be ``Enabled`` or ``Suspended``.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :param checksum_algorithm: Accepted for API compatibility; not used here.
        :return: None
        :raises InvalidRequest: If the bucket name contains a period.
        :raises MalformedXML: If ``Status`` is missing or is not one of the two accepted values.
        """
        # forward the expected owner to the bucket lookup, like the policy and metrics operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if "." in bucket:
            raise InvalidRequest(
                "S3 Transfer Acceleration is not supported for buckets with periods (.) in their names"
            )

        # a missing or empty status is not in the tuple either, so a single membership test covers both cases
        status = accelerate_configuration.get("Status")
        if status not in ("Enabled", "Suspended"):
            raise MalformedXML()

        s3_bucket.accelerate_status = status
4193

4194
    def put_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        bucket_logging_status: BucketLoggingStatus,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Enable, update or disable server access logging for a bucket.

        When ``bucket_logging_status`` has no ``LoggingEnabled`` entry, the stored logging configuration is reset
        to an empty dict. Otherwise the target bucket is validated and the configuration is stored as-is, with
        ``TargetPrefix`` defaulted to an empty string.

        :param context: The request context.
        :param bucket: The name of the bucket whose logging configuration is set.
        :param bucket_logging_status: The logging status container sent by the client.
        :param content_md5: Accepted for API compatibility; not used here.
        :param checksum_algorithm: Accepted for API compatibility; not used here.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        :raises MalformedXML: If ``LoggingEnabled`` is present without a ``TargetBucket``.
        :raises InvalidTargetBucketForLogging: If the target bucket is not found in the store.
        :raises CrossLocationLoggingProhibitted: If source and target bucket are in different regions.
        """
        # forward the expected owner to the bucket lookup, like the policy and metrics operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not (logging_config := bucket_logging_status.get("LoggingEnabled")):
            s3_bucket.logging = {}
            return

        # the target bucket must be in the same account
        if not (target_bucket_name := logging_config.get("TargetBucket")):
            raise MalformedXML()

        if not logging_config.get("TargetPrefix"):
            logging_config["TargetPrefix"] = ""

        # TODO: validate Grants

        if not (target_s3_bucket := store.buckets.get(target_bucket_name)):
            raise InvalidTargetBucketForLogging(
                "The target bucket for logging does not exist",
                TargetBucket=target_bucket_name,
            )

        source_bucket_region = s3_bucket.bucket_region
        if target_s3_bucket.bucket_region != source_bucket_region:
            # the source location is only part of the error when the source bucket is not in us-east-1
            locations = {}
            if source_bucket_region != AWS_REGION_US_EAST_1:
                locations["SourceBucketLocation"] = source_bucket_region
            locations["TargetBucketLocation"] = target_s3_bucket.bucket_region
            raise CrossLocationLoggingProhibitted(
                "Cross S3 location logging not allowed. ",
                **locations,
            )

        s3_bucket.logging = logging_config
4241

4242
    def get_bucket_logging(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketLoggingOutput:
        """
        Return the logging configuration stored on the bucket.

        :param context: The request context.
        :param bucket: The name of the bucket to read the logging configuration from.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: An empty output when no logging configuration is stored, otherwise one carrying it under
            ``LoggingEnabled``.
        """
        # forward the expected owner to the bucket lookup, like the policy and metrics operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not s3_bucket.logging:
            return GetBucketLoggingOutput()

        return GetBucketLoggingOutput(LoggingEnabled=s3_bucket.logging)
4255

4256
    def put_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        replication_configuration: ReplicationConfiguration,
        content_md5: ContentMD5 = None,
        checksum_algorithm: ChecksumAlgorithm = None,
        token: ObjectLockToken = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Store a replication configuration on a versioned bucket.

        Rules without an ``ID`` get a generated one, and every rule's destination bucket must exist in the store
        with versioning enabled.

        :param context: The request context.
        :param bucket: The name of the source bucket.
        :param replication_configuration: The replication configuration; must contain at least one rule.
        :param content_md5: Accepted for API compatibility; not used here.
        :param checksum_algorithm: Accepted for API compatibility; not used here.
        :param token: Accepted for API compatibility; not used here.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        :raises InvalidRequest: If versioning is not enabled on the source bucket, or a destination bucket is
            missing or does not have versioning enabled.
        :raises MalformedXML: If the configuration contains no rules.
        """
        # forward the expected owner to the bucket lookup, like the policy and metrics operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )
        if s3_bucket.versioning_status != BucketVersioningStatus.Enabled:
            raise InvalidRequest(
                "Versioning must be 'Enabled' on the bucket to apply a replication configuration"
            )

        if not (rules := replication_configuration.get("Rules")):
            raise MalformedXML()

        for rule in rules:
            if "ID" not in rule:
                rule["ID"] = short_uid()

            dest_bucket_arn = rule.get("Destination", {}).get("Bucket")
            dest_bucket_name = s3_bucket_name(dest_bucket_arn)
            if (
                not (dest_s3_bucket := store.buckets.get(dest_bucket_name))
                or dest_s3_bucket.versioning_status != BucketVersioningStatus.Enabled
            ):
                # according to AWS testing the same exception is raised if the bucket does not exist
                # or if versioning was disabled
                raise InvalidRequest("Destination bucket must have versioning enabled.")

        # TODO more validation on input
        s3_bucket.replication = replication_configuration
4292

4293
    def get_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketReplicationOutput:
        """
        Return the replication configuration stored on the bucket.

        :param context: The request context.
        :param bucket: The name of the bucket to read the replication configuration from.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: The stored replication configuration.
        :raises ReplicationConfigurationNotFoundError: If no replication configuration is stored on the bucket.
        """
        # forward the expected owner to the bucket lookup, like the policy and metrics operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if not s3_bucket.replication:
            raise ReplicationConfigurationNotFoundError(
                "The replication configuration was not found",
                BucketName=bucket,
            )

        return GetBucketReplicationOutput(ReplicationConfiguration=s3_bucket.replication)
4309

4310
    def delete_bucket_replication(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Remove the replication configuration stored on a bucket by resetting it to ``None``.

        :param context: The request context.
        :param bucket: The name of the bucket whose replication configuration is removed.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        """
        # forward the expected owner to the bucket lookup, like the policy and metrics operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        s3_bucket.replication = None
4320

4321
    @handler("PutBucketAcl", expand=False)
    def put_bucket_acl(
        self,
        context: RequestContext,
        request: PutBucketAclRequest,
    ) -> None:
        """
        Replace the ACL of a bucket with the access control policy derived from the request.

        :param context: The request context; its raw request body is handed to the ACL parser.
        :param request: The unexpanded PutBucketAcl request.
        :return: None
        """
        _, target_bucket = self._get_cross_account_bucket(context, request["Bucket"])
        target_bucket.acl = get_access_control_policy_from_acl_request(
            request=request,
            owner=target_bucket.owner,
            request_body=context.request.data,
        )
4333

4334
    def get_bucket_acl(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketAclOutput:
        """
        Return the owner and grants of the ACL stored on the bucket.

        :param context: The request context.
        :param bucket: The name of the bucket to read the ACL from.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: The ``Owner`` and ``Grants`` entries of the bucket ACL.
        """
        # forward the expected owner to the bucket lookup, like the policy and metrics operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        return GetBucketAclOutput(Owner=s3_bucket.acl["Owner"], Grants=s3_bucket.acl["Grants"])
4344

4345
    @handler("PutObjectAcl", expand=False)
    def put_object_acl(
        self,
        context: RequestContext,
        request: PutObjectAclRequest,
    ) -> PutObjectAclOutput:
        """
        Replace the ACL of an object and send a notification when the ACL actually changed.

        :param context: The request context; its raw request body is handed to the ACL parser.
        :param request: The unexpanded PutObjectAcl request (``Bucket``, ``Key`` and optional ``VersionId``).
        :return: An empty PutObjectAcl output.
        """
        _, s3_bucket = self._get_cross_account_bucket(context, request["Bucket"])

        target_object = s3_bucket.get_object(
            key=request["Key"],
            version_id=request.get("VersionId"),
            http_method="PUT",
        )
        new_acl = get_access_control_policy_from_acl_request(
            request=request,
            owner=target_object.owner,
            request_body=context.request.data,
        )

        # compare against the current ACL before it is overwritten
        acl_changed = target_object.acl != new_acl
        target_object.acl = new_acl
        if acl_changed:
            self._notify(context, s3_bucket=s3_bucket, s3_object=target_object)

        # TODO: RequestCharged
        return PutObjectAclOutput()
4370

4371
    def get_object_acl(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        version_id: ObjectVersionId = None,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectAclOutput:
        """
        Return the owner and grants of the ACL stored on an object.

        :param context: The request context.
        :param bucket: The name of the bucket containing the object.
        :param key: The key of the object.
        :param version_id: The optional version of the object to look up.
        :param request_payer: Accepted for API compatibility; not used here.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: The ``Owner`` and ``Grants`` entries of the object ACL.
        """
        # forward the expected owner to the bucket lookup, like the policy and metrics operations do,
        # instead of silently dropping it
        store, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        s3_object = s3_bucket.get_object(
            key=key,
            version_id=version_id,
        )
        # TODO: RequestCharged
        return GetObjectAclOutput(Owner=s3_object.acl["Owner"], Grants=s3_object.acl["Grants"])
4389

4390
    def get_bucket_policy_status(
        self,
        context: RequestContext,
        bucket: BucketName,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketPolicyStatusOutput:
        """
        Placeholder for GetBucketPolicyStatus; the operation is not implemented by this provider.

        :param context: The request context.
        :param bucket: The name of the bucket.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError()
4398

4399
    def get_object_torrent(
        self,
        context: RequestContext,
        bucket: BucketName,
        key: ObjectKey,
        request_payer: RequestPayer = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetObjectTorrentOutput:
        """
        Placeholder for GetObjectTorrent; the operation is not implemented by this provider.

        :param context: The request context.
        :param bucket: The name of the bucket.
        :param key: The key of the object.
        :param request_payer: Accepted for API compatibility; not used here.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError()
4409

4410
    def post_object(
        self, context: RequestContext, bucket: BucketName, body: IO[Body] = None, **kwargs
    ) -> PostResponse:
        """
        Handle a browser-style ``POST`` upload, where all object parameters are passed as multipart form fields.

        The object content is taken from the ``file`` form field (as a string) or from the uploaded ``file`` part.
        The form is validated with ``validate_post_policy``, the object is written to the storage backend and
        registered in the bucket, and the response status/location are derived from the ``success_action_redirect``
        and ``success_action_status`` form fields.

        :param context: The request context; the form, files and headers are read from ``context.request``.
        :param bucket: The name of the bucket the object is uploaded to.
        :param body: Not used here; the payload is read from the multipart form instead.
        :return: The POST response, including an XML-backed body when the status code is ``"201"``.
        :raises PreconditionFailed: If the request is not ``multipart/form-data``.
        :raises InvalidStorageClass: If ``x-amz-storage-class`` is set to an unsupported value.
        :raises InvalidRequest: If a client-provided checksum does not match the stored content.
        """
        if "multipart/form-data" not in context.request.headers.get("Content-Type", ""):
            raise PreconditionFailed(
                "At least one of the pre-conditions you specified did not hold",
                Condition="Bucket POST must be of the enclosure-type multipart/form-data",
            )
        # see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html
        # TODO: signature validation is not implemented for pre-signed POST
        # policy validation is not implemented either, except expiration and mandatory fields
        # This operation is the only one using form for storing the request data. We will have to do some manual
        # parsing here, as no specs are present for this, as no client directly implements this operation.
        store, s3_bucket = self._get_cross_account_bucket(context, bucket)

        form = context.request.form
        object_key = form.get("key")

        if "file" in form:
            # in AWS, you can pass the file content as a string in the form field and not as a file object
            file_data = to_bytes(form["file"])
            object_content_length = len(file_data)
            stream = BytesIO(file_data)
        else:
            # this is the default behaviour
            fileobj = context.request.files["file"]
            stream = fileobj.stream
            # stream is a SpooledTemporaryFile, so we can seek the stream to know its length, necessary for policy
            # validation
            original_pos = stream.tell()
            object_content_length = stream.seek(0, 2)
            # reset the stream and put it back at its original position
            stream.seek(original_pos, 0)

            # `key` may be absent from the form: do not fail with a TypeError here, the form is validated by
            # `validate_post_policy` right below
            if object_key and "${filename}" in object_key:
                # TODO: ${filename} is actually usable in all form fields
                # See https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/PresignedPost.html
                # > The string ${filename} is automatically replaced with the name of the file provided by the user and
                # is recognized by all form fields.
                object_key = object_key.replace("${filename}", fileobj.filename)

        # TODO: see if we need to pass additional metadata not contained in the policy from the table under
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions
        additional_policy_metadata = {
            "bucket": bucket,
            "content_length": object_content_length,
        }
        validate_post_policy(form, additional_policy_metadata)

        if canned_acl := form.get("acl"):
            validate_canned_acl(canned_acl)
            acp = get_canned_acl(canned_acl, owner=s3_bucket.owner)
        else:
            acp = get_canned_acl(BucketCannedACL.private, owner=s3_bucket.owner)

        post_system_settable_headers = [
            "Cache-Control",
            "Content-Type",
            "Content-Disposition",
            "Content-Encoding",
        ]
        system_metadata = {}
        for system_metadata_field in post_system_settable_headers:
            if field_value := form.get(system_metadata_field):
                # the metadata key is the header name without dashes, e.g. `Content-Type` -> `ContentType`
                system_metadata[system_metadata_field.replace("-", "")] = field_value

        if not system_metadata.get("ContentType"):
            system_metadata["ContentType"] = "binary/octet-stream"

        user_metadata = {
            field.removeprefix("x-amz-meta-").lower(): form.get(field)
            for field in form
            if field.startswith("x-amz-meta-")
        }

        if tagging := form.get("tagging"):
            # this is weird, as it's direct XML in the form, we need to parse it directly
            tagging = parse_post_object_tagging_xml(tagging)

        if (storage_class := form.get("x-amz-storage-class")) is not None and (
            storage_class not in STORAGE_CLASSES or storage_class == StorageClass.OUTPOSTS
        ):
            raise InvalidStorageClass(
                "The storage class you specified is not valid", StorageClassRequested=storage_class
            )

        encryption_request = {
            "ServerSideEncryption": form.get("x-amz-server-side-encryption"),
            "SSEKMSKeyId": form.get("x-amz-server-side-encryption-aws-kms-key-id"),
            "BucketKeyEnabled": form.get("x-amz-server-side-encryption-bucket-key-enabled"),
        }

        encryption_parameters = get_encryption_parameters_from_request_and_bucket(
            encryption_request,
            s3_bucket,
            store,
        )

        checksum_algorithm = form.get("x-amz-checksum-algorithm")
        checksum_value = (
            form.get(f"x-amz-checksum-{checksum_algorithm.lower()}") if checksum_algorithm else None
        )
        expires = (
            str_to_rfc_1123_datetime(expires_str) if (expires_str := form.get("Expires")) else None
        )

        version_id = generate_version_id(s3_bucket.versioning_status)

        s3_object = S3Object(
            key=object_key,
            version_id=version_id,
            storage_class=storage_class,
            expires=expires,
            user_metadata=user_metadata,
            system_metadata=system_metadata,
            checksum_algorithm=checksum_algorithm,
            checksum_value=checksum_value,
            encryption=encryption_parameters.encryption,
            kms_key_id=encryption_parameters.kms_key_id,
            bucket_key_enabled=encryption_parameters.bucket_key_enabled,
            website_redirect_location=form.get("x-amz-website-redirect-location"),
            acl=acp,
            owner=s3_bucket.owner,  # TODO: for now we only have one owner, but it can depends on Bucket settings
        )

        with self._storage_backend.open(bucket, s3_object, mode="w") as s3_stored_object:
            s3_stored_object.write(stream)

            if not s3_object.checksum_value:
                s3_object.checksum_value = s3_stored_object.checksum

            elif checksum_algorithm and s3_object.checksum_value != s3_stored_object.checksum:
                # the client-provided checksum does not match what was written: drop the stored data again
                self._storage_backend.remove(bucket, s3_object)
                raise InvalidRequest(
                    f"Value for x-amz-checksum-{checksum_algorithm.lower()} header is invalid."
                )

            s3_bucket.objects.set(object_key, s3_object)

        # in case we are overriding an object, delete the tags entry
        key_id = get_unique_key_id(bucket, object_key, version_id)
        store.TAGS.tags.pop(key_id, None)
        if tagging:
            store.TAGS.tags[key_id] = tagging

        response = PostResponse()
        # hacky way to set the etag in the headers as well: two locations for one value
        response["ETagHeader"] = s3_object.quoted_etag

        if redirect := form.get("success_action_redirect"):
            # we need to create the redirect, as the parser could not return the moto-calculated one
            try:
                redirect = create_redirect_for_post_request(
                    base_redirect=redirect,
                    bucket=bucket,
                    object_key=object_key,
                    etag=s3_object.quoted_etag,
                )
                response["LocationHeader"] = redirect
                response["StatusCode"] = 303
            except ValueError:
                # If S3 cannot interpret the URL, it acts as if the field is not present.
                # An empty `success_action_status` falls back to 204, exactly like the branches below.
                response["StatusCode"] = form.get("success_action_status") or 204

        elif status_code := form.get("success_action_status"):
            response["StatusCode"] = status_code
        else:
            response["StatusCode"] = 204

        response["LocationHeader"] = response.get(
            "LocationHeader", f"{get_full_default_bucket_location(bucket)}{object_key}"
        )

        if s3_bucket.versioning_status == "Enabled":
            response["VersionId"] = s3_object.version_id

        if s3_object.checksum_algorithm:
            response[f"Checksum{s3_object.checksum_algorithm.upper()}"] = s3_object.checksum_value
            response["ChecksumType"] = ChecksumType.FULL_OBJECT

        if s3_bucket.lifecycle_rules:
            if expiration_header := self._get_expiration_header(
                s3_bucket.lifecycle_rules,
                bucket,
                s3_object,
                store.TAGS.tags.get(key_id, {}),
            ):
                # TODO: we either apply the lifecycle to existing objects when we set the new rules, or we need to
                #  apply them everytime we get/head an object
                response["Expiration"] = expiration_header

        add_encryption_to_response(response, s3_object=s3_object)

        self._notify(context, s3_bucket=s3_bucket, s3_object=s3_object)

        # the status taken from the form is a string, hence the comparison with "201"
        if response["StatusCode"] == "201":
            # if the StatusCode is 201, S3 returns an XML body with additional information
            response["ETag"] = s3_object.quoted_etag
            response["Bucket"] = bucket
            response["Key"] = object_key
            response["Location"] = response["LocationHeader"]

        return response
4613

4614
    def put_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        metrics_configuration: MetricsConfiguration,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Add a metrics configuration, or overwrite the one already stored under the same `id`.

        A bucket holds at most 1000 metrics configurations: once that limit is reached, only configurations whose
        `id` already exists can still be written.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: Identifies the metrics configuration being added or updated.
        :param metrics_configuration: A new or updated configuration associated with the given metrics identifier.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        :raises TooManyConfigurations: If the bucket already holds 1000 or more metrics configurations AND the
            provided `id` does not already exist.
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        stored_configurations = s3_bucket.metric_configurations
        limit_reached = len(stored_configurations) >= 1000
        if limit_reached and id not in stored_configurations:
            raise TooManyConfigurations("Too many metrics configurations")

        stored_configurations[id] = metrics_configuration
4647

4648
    def get_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> GetBucketMetricsConfigurationOutput:
        """
        Look up a single metrics configuration by its identifier.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: The unique identifier of the metrics configuration to retrieve.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: The metrics configuration stored under the given identifier.
        :raises NoSuchConfiguration: If nothing is stored under the given identifier.
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        if found_configuration := s3_bucket.metric_configurations.get(id):
            return GetBucketMetricsConfigurationOutput(MetricsConfiguration=found_configuration)

        raise NoSuchConfiguration("The specified configuration does not exist.")
4674

4675
    def list_bucket_metrics_configurations(
        self,
        context: RequestContext,
        bucket: BucketName,
        continuation_token: Token = None,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> ListBucketMetricsConfigurationsOutput:
        """
        Return one page of the bucket's metrics configurations, ordered by their `Id`.

        A page holds at most 100 configurations. When more are available, the base64-encoded `Id` of the first
        configuration left out is returned as the next continuation token.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param continuation_token: An optional base64-encoded token, as returned by a previous call, marking the
            `Id` from which the listing resumes.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: The page of metric configurations, plus a continuation token when the listing is truncated.
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        resume_from_id = None
        if continuation_token:
            resume_from_id = to_str(base64.urlsafe_b64decode(continuation_token.encode()))

        candidates = sorted(s3_bucket.metric_configurations.values(), key=itemgetter("Id"))
        if continuation_token:
            # resume at the configuration the token points to (inclusive)
            candidates = [config for config in candidates if config["Id"] >= resume_from_id]

        page: list[MetricsConfiguration] = candidates[:100]
        first_left_out = candidates[100:101]

        next_continuation_token = None
        if first_left_out:
            next_continuation_token = to_str(
                base64.urlsafe_b64encode(first_left_out[0]["Id"].encode())
            )

        return ListBucketMetricsConfigurationsOutput(
            IsTruncated=next_continuation_token is not None,
            ContinuationToken=continuation_token,
            NextContinuationToken=next_continuation_token,
            MetricsConfigurationList=page,
        )
4724

4725
    def delete_bucket_metrics_configuration(
        self,
        context: RequestContext,
        bucket: BucketName,
        id: MetricsId,
        expected_bucket_owner: AccountId = None,
        **kwargs,
    ) -> None:
        """
        Delete the metrics configuration stored under the given identifier.

        :param context: The request context.
        :param bucket: The name of the bucket associated with the metrics configuration.
        :param id: The unique identifier of the metrics configuration to delete.
        :param expected_bucket_owner: The expected account ID of the bucket owner.
        :return: None
        :raises NoSuchConfiguration: If nothing is stored under the given identifier.
        """
        _, s3_bucket = self._get_cross_account_bucket(
            context, bucket, expected_bucket_owner=expected_bucket_owner
        )

        # pop with a default, so that a missing id is reported as a service error and not as a KeyError
        if not s3_bucket.metric_configurations.pop(id, None):
            raise NoSuchConfiguration("The specified configuration does not exist.")
4750

4751

4752
def generate_version_id(bucket_versioning_status: str) -> str | None:
1✔
4753
    if not bucket_versioning_status:
1✔
4754
        return None
1✔
4755
    elif bucket_versioning_status.lower() == "enabled":
1✔
4756
        return generate_safe_version_id()
1✔
4757
    else:
4758
        return "null"
1✔
4759

4760

4761
def add_encryption_to_response(response: dict, s3_object: S3Object):
    """
    Copy the encryption settings of an object into a response, in place.

    :param response: the response dict to update
    :param s3_object: the object whose `encryption`, `kms_key_id` and `bucket_key_enabled` are read
    :return: None
    """
    encryption = s3_object.encryption
    if not encryption:
        # nothing to report for an object without encryption
        return

    response["ServerSideEncryption"] = encryption
    if encryption != ServerSideEncryption.aws_kms:
        # the KMS specific fields are only set for aws:kms
        return

    response["SSEKMSKeyId"] = s3_object.kms_key_id
    if s3_object.bucket_key_enabled:
        response["BucketKeyEnabled"] = s3_object.bucket_key_enabled
4768

4769

4770
def get_encryption_parameters_from_request_and_bucket(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
    store: S3Store,
) -> EncryptionParameters:
    """
    Resolve the encryption settings of a new object from the request and the bucket encryption rule.

    Values set in the request win over the ones of the bucket encryption rule. For `aws:kms`, the key is resolved
    with `get_kms_key_arn`; if that yields nothing, the managed key saved in the store is used, and created first
    if the store does not have one yet.

    :param request: the request (or a dict with the same keys) carrying the encryption fields
    :param s3_bucket: the bucket the object is written to
    :param store: the store in which the managed KMS key id is saved
    :return: the EncryptionParameters (encryption, KMS key id, bucket key enabled) to apply to the object
    """
    if request.get("SSECustomerKey"):
        # we return early, because ServerSideEncryption does not apply if the request has SSE-C
        return EncryptionParameters(None, None, False)

    encryption = request.get("ServerSideEncryption")
    kms_key_id = request.get("SSEKMSKeyId")
    bucket_key_enabled = request.get("BucketKeyEnabled")

    bucket_rule = s3_bucket.encryption_rule
    if not bucket_rule:
        return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)

    bucket_key_enabled = bucket_key_enabled or bucket_rule.get("BucketKeyEnabled")
    if not encryption:
        encryption = bucket_rule["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]

    if encryption != ServerSideEncryption.aws_kms:
        return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)

    requested_key_id = kms_key_id or bucket_rule["ApplyServerSideEncryptionByDefault"].get(
        "KMSMasterKeyID"
    )
    kms_key_id = get_kms_key_arn(
        requested_key_id, s3_bucket.bucket_account_id, s3_bucket.bucket_region
    )
    if not kms_key_id:
        # if not key is provided, AWS will use an AWS managed KMS key
        # create it if it doesn't already exist, and save it in the store per region
        if not store.aws_managed_kms_key_id:
            store.aws_managed_kms_key_id = create_s3_kms_managed_key_for_region(
                s3_bucket.bucket_account_id, s3_bucket.bucket_region
            )

        kms_key_id = store.aws_managed_kms_key_id

    return EncryptionParameters(encryption, kms_key_id, bucket_key_enabled)
4807

4808

4809
def get_object_lock_parameters_from_bucket_and_request(
    request: PutObjectRequest | CopyObjectRequest | CreateMultipartUploadRequest,
    s3_bucket: S3Bucket,
):
    """
    Resolve the Object Lock settings of a new object from the request and the bucket default retention.

    :param request: the request carrying the optional `ObjectLockMode`, `ObjectLockLegalHoldStatus` and
        `ObjectLockRetainUntilDate` fields
    :param s3_bucket: the bucket, whose default retention applies when the request sets no lock mode
    :return: the ObjectLockParameters (retain until date, legal hold status, lock mode)
    :raises InvalidArgument: if only one of lock mode / retain until date is provided, or if the lock mode is not
        one of OBJECT_LOCK_MODES
    """
    lock_mode = request.get("ObjectLockMode")
    lock_legal_status = request.get("ObjectLockLegalHoldStatus")
    lock_until = request.get("ObjectLockRetainUntilDate")

    if bool(lock_mode) != bool(lock_until):
        # exactly one of the two was provided: report the header which is missing
        missing_header = (
            "x-amz-object-lock-retain-until-date" if lock_mode else "x-amz-object-lock-mode"
        )
        raise InvalidArgument(
            "x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied",
            ArgumentName=missing_header,
        )

    if lock_mode:
        if lock_mode not in OBJECT_LOCK_MODES:
            raise InvalidArgument(
                "Unknown wormMode directive.",
                ArgumentName="x-amz-object-lock-mode",
                ArgumentValue=lock_mode,
            )
    elif default_retention := s3_bucket.object_lock_default_retention:
        # no lock in the request: fall back to the default retention of the bucket
        lock_mode = default_retention["Mode"]
        lock_until = get_retention_from_now(
            days=default_retention.get("Days"),
            years=default_retention.get("Years"),
        )

    return ObjectLockParameters(lock_until, lock_legal_status, lock_mode)
4843

4844

4845
def get_part_range(s3_object: S3Object, part_number: PartNumber) -> ObjectRange:
    """
    Translate a part number into the byte range it covers inside an S3 Object
    :param s3_object: S3Object to slice
    :param part_number: the part requested from the S3Object
    :raises InvalidPartNumber: if the object does not have the requested part
    :return: an ObjectRange describing the slice of the Object to return
    """
    parts = s3_object.parts
    if not parts:
        # no parts recorded on the object: it is treated as one single part spanning the whole object
        if part_number > 1:
            raise InvalidPartNumber(
                "The requested partnumber is not satisfiable",
                PartNumberRequested=part_number,
                ActualPartCount=1,
            )
        total_size = s3_object.size
        last_byte = total_size - 1
        return ObjectRange(
            begin=0,
            end=last_byte,
            content_length=total_size,
            content_range=f"bytes 0-{last_byte}/{total_size}",
        )

    part_data = parts.get(part_number)
    if not part_data:
        raise InvalidPartNumber(
            "The requested partnumber is not satisfiable",
            PartNumberRequested=part_number,
            ActualPartCount=len(parts),
        )

    # TODO: remove for next major version 5.0, compatibility for <= 4.5
    # older part entries are (begin, length) tuples, newer ones are dicts
    if isinstance(part_data, tuple):
        begin, part_length = part_data
    else:
        begin, part_length = part_data["_position"], part_data["Size"]

    end = begin + part_length - 1
    return ObjectRange(
        begin=begin,
        end=end,
        content_length=part_length,
        content_range=f"bytes {begin}-{end}/{s3_object.size}",
    )
4886

4887

4888
def get_acl_headers_from_request(
    request: PutObjectRequest
    | CreateMultipartUploadRequest
    | CopyObjectRequest
    | CreateBucketRequest
    | PutBucketAclRequest
    | PutObjectAclRequest,
) -> list[tuple[str, str]]:
    """
    Collect the grant entries set on a request
    :param request: the incoming request, read for the Grant* keys
    :return: a list of (request key, value) tuples, in a fixed key order, containing only the Grant* keys
        that have a non-empty value
    """
    found_grants: list[tuple[str, str]] = []
    for grant_key in (
        "GrantFullControl",
        "GrantRead",
        "GrantReadACP",
        "GrantWrite",
        "GrantWriteACP",
    ):
        header_value = request.get(grant_key)
        # missing and empty values are both skipped
        if header_value:
            found_grants.append((grant_key, header_value))

    return found_grants
1✔
4909

4910

4911
def get_access_control_policy_from_acl_request(
    request: PutBucketAclRequest | PutObjectAclRequest,
    owner: Owner,
    request_body: bytes,
) -> AccessControlPolicy:
    """
    Build the AccessControlPolicy described by a PutBucketAcl / PutObjectAcl request.
    The policy can come from exactly one of: a canned ACL, Grant* headers, or the request body.
    :param request: the ACL request, read for ACL, the Grant* keys and AccessControlPolicy
    :param owner: the Owner used when building the policy from a canned ACL or from Grant* headers
    :param request_body: the raw request body, only used to know whether a body was sent
    :raises MissingSecurityHeader: if none of the three sources is present
    :raises InvalidRequest: if both a canned ACL and Grant* headers are present
    :raises UnexpectedContent: if a body is sent together with a canned ACL or Grant* headers
    :return: the resulting AccessControlPolicy
    """
    canned_acl = request.get("ACL")
    grant_headers = get_acl_headers_from_request(request)

    # FIXME: this is very dirty, but the parser does not differentiate between an empty body and an empty XML node
    # errors are different depending on that data, so we need to access the context. Modifying the parser for this
    # use case seems dangerous
    has_body = request_body

    if not canned_acl and not grant_headers and not has_body:
        raise MissingSecurityHeader(
            "Your request was missing a required header", MissingHeaderName="x-amz-acl"
        )

    if canned_acl and grant_headers:
        raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")

    if (canned_acl or grant_headers) and has_body:
        raise UnexpectedContent("This request does not support content")

    if canned_acl:
        validate_canned_acl(canned_acl)
        return get_canned_acl(canned_acl, owner=owner)

    if grant_headers:
        grants = []
        for header_name, grantees_values in grant_headers:
            header_permission = get_permission_from_header(header_name)
            grants.extend(parse_grants_in_headers(header_permission, grantees_values))

        return AccessControlPolicy(Owner=owner, Grants=grants)

    # neither canned ACL nor grant headers: the policy was sent in the body
    policy = request.get("AccessControlPolicy")
    validate_acl_acp(policy)
    # backfill the owner's DisplayName on the first grantee when that grantee does not carry one
    if owner.get("DisplayName") and policy["Grants"]:
        first_grantee = policy["Grants"][0]["Grantee"]
        if "DisplayName" not in first_grantee:
            first_grantee["DisplayName"] = owner["DisplayName"]

    return policy
1✔
4958

4959

4960
def get_access_control_policy_for_new_resource_request(
    request: PutObjectRequest
    | CreateMultipartUploadRequest
    | CopyObjectRequest
    | CreateBucketRequest,
    owner: Owner,
) -> AccessControlPolicy:
    """
    Build the AccessControlPolicy of a resource being created, from the canned ACL or the Grant* headers
    of the request. Falls back to the `private` canned ACL when the request carries neither.
    :param request: the creation request, read for ACL and the Grant* keys
    :param owner: the Owner set on the resulting policy
    :raises InvalidRequest: if both a canned ACL and Grant* headers are present
    :return: the resulting AccessControlPolicy
    """
    # TODO: this is basic ACL, not taking into account Bucket settings. Revisit once we really implement ACLs.
    canned_acl = request.get("ACL")
    grant_headers = get_acl_headers_from_request(request)

    if canned_acl:
        if grant_headers:
            raise InvalidRequest("Specifying both Canned ACLs and Header Grants is not allowed")

        validate_canned_acl(canned_acl)
        return get_canned_acl(canned_acl, owner=owner)

    if not grant_headers:
        return get_canned_acl(BucketCannedACL.private, owner=owner)

    # one header can yield several grants: flatten them all into a single list
    grants = [
        grant
        for header_name, grantees_values in grant_headers
        for grant in parse_grants_in_headers(
            get_permission_from_header(header_name), grantees_values
        )
    ]

    return AccessControlPolicy(Owner=owner, Grants=grants)
×
4988

4989

4990
def object_exists_for_precondition_write(s3_bucket: S3Bucket, key: ObjectKey) -> bool:
    """
    Check whether the bucket currently holds an object under the key, for conditional write checks
    :param s3_bucket: the bucket whose `objects` store is looked up
    :param key: the object key to look up
    :return: True if an entry exists for the key and it is not an S3DeleteMarker, False otherwise
    """
    existing = s3_bucket.objects.get(key)
    # coerce explicitly: a bare `existing and ...` returns the falsy lookup result itself (None) when the key
    # is absent, which contradicts the annotated `bool` return type
    return bool(existing) and not isinstance(existing, S3DeleteMarker)
1✔
4992

4993

4994
def verify_object_equality_precondition_write(
    s3_bucket: S3Bucket,
    key: ObjectKey,
    etag: str,
    initiated: datetime.datetime | None = None,
) -> None:
    """
    Validate an If-Match style precondition against the object currently stored under the key
    :param s3_bucket: the bucket whose `objects` store is looked up
    :param key: the object key the write targets
    :param etag: the expected ETag, surrounding double quotes are ignored
    :param initiated: optional timestamp compared against the `last_modified` of the stored object
    :raises NoSuchKey: if there is no entry for the key, or the entry is an S3DeleteMarker
    :raises PreconditionFailed: if the ETag of the stored object differs from the expected one
    :raises ConditionalRequestConflict: if `initiated` is set and is earlier than the stored object's
        `last_modified`
    :return: None
    """
    current = s3_bucket.objects.get(key)
    key_is_live = bool(current) and not isinstance(current, S3DeleteMarker)
    if not key_is_live:
        raise NoSuchKey("The specified key does not exist.", Key=key)

    expected_etag = etag.strip('"')
    if current.etag != expected_etag:
        raise PreconditionFailed(
            "At least one of the pre-conditions you specified did not hold",
            Condition="If-Match",
        )

    # the stored object changed after the `initiated` timestamp: report a conflict rather than overwrite it
    modified_since_initiated = initiated and initiated < current.last_modified
    if modified_since_initiated:
        raise ConditionalRequestConflict(
            "The conditional request cannot succeed due to a conflicting operation against this resource.",
            Condition="If-Match",
            Key=key,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc